edb_info_stat.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package services
  2. import (
  3. "eta/eta_index_lib/models"
  4. "eta/eta_index_lib/models/data_stat"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "time"
  9. )
  10. // AddEdbInfoUpdateLog 添加指标编辑/刷新日志
  11. func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason string, dataUpdateResult int, dataUpdateFailedReason string, isSourceRefresh int) (err error) {
  12. var edbInfo *models.EdbInfo
  13. if edbInfoId > 0 {
  14. // 获取指标详情
  15. edbInfo, err = models.GetEdbInfoById(edbInfoId)
  16. if err != nil {
  17. err = fmt.Errorf("指标不存在")
  18. return
  19. }
  20. log := new(data_stat.EdbInfoUpdateLog)
  21. log.EdbInfoId = edbInfo.EdbInfoId
  22. log.SourceName = edbInfo.SourceName
  23. log.Source = edbInfo.Source
  24. log.EdbCode = edbInfo.EdbCode
  25. log.EdbName = edbInfo.EdbName
  26. log.EdbNameSource = edbInfo.EdbNameSource
  27. log.Frequency = edbInfo.Frequency
  28. log.Unit = edbInfo.Unit
  29. log.StartDate = edbInfo.StartDate
  30. log.EndDate = edbInfo.EndDate
  31. log.SysUserId = edbInfo.SysUserId
  32. log.SysUserRealName = edbInfo.SysUserRealName
  33. log.UniqueCode = edbInfo.UniqueCode
  34. log.EdbCreateTime = edbInfo.CreateTime
  35. log.EdbModifyTime = edbInfo.ModifyTime
  36. log.CreateTime = time.Now()
  37. log.LatestDate = edbInfo.LatestDate
  38. log.LatestValue = edbInfo.LatestValue
  39. log.TerminalCode = edbInfo.TerminalCode
  40. log.UpdateResult = updateResult
  41. log.UpdateFailedReason = updateFailedReason
  42. log.DataUpdateTime = edbInfo.DataUpdateTime
  43. log.ErDataUpdateDate = edbInfo.ErDataUpdateDate
  44. log.DataUpdateResult = dataUpdateResult
  45. log.DataUpdateFailedReason = dataUpdateFailedReason
  46. log.IsSourceRefresh = isSourceRefresh
  47. _, err = data_stat.AddEdbUpdateLog(log)
  48. if err != nil {
  49. err = fmt.Errorf("新增指标更新日志失败,Err: %s", err)
  50. return
  51. }
  52. }
  53. return
  54. }
  55. // SetMysteelChemicalEdbInfoUpdateStat 定时统计钢联化工的数据源明细表
  56. func SetMysteelChemicalEdbInfoUpdateStat(needStat bool) (err error) {
  57. defer func() {
  58. if err != nil {
  59. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
  60. alarm_msg.SendAlarmMsg(tips, 3)
  61. }
  62. }()
  63. //查询钢联的所有指标信息
  64. condition := " and source = ? "
  65. var pars []interface{}
  66. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL)
  67. edbList, err := models.GetEdbInfoByCondition(condition, pars, 0)
  68. if err != nil {
  69. err = fmt.Errorf("查询钢联化工指标信息出错,err: %s", err)
  70. return
  71. }
  72. nowTime := time.Now()
  73. today := time.Now().Format(utils.FormatDate)
  74. todayT, _ := time.ParseInLocation(utils.FormatDate, today, time.Local)
  75. nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
  76. //查询当日所有钢联指标的终端更新记录
  77. updateLogList, err := data_stat.GetEdbUpdateSourceLogByCreateDate(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
  78. if err != nil {
  79. err = fmt.Errorf("查询钢联化工指标终端更新日志报错,err: %s", err)
  80. return
  81. }
  82. if !needStat && len(updateLogList) == 0 {
  83. return
  84. }
  85. updateLogMap := make(map[int]*data_stat.EdbInfoUpdateLog)
  86. if len(updateLogList) > 0 {
  87. for _, v := range updateLogList {
  88. if _, ok := updateLogMap[v.EdbInfoId]; !ok {
  89. updateLogMap[v.EdbInfoId] = v
  90. }
  91. }
  92. }
  93. statCond := " and source = ? and create_time >= ? and create_time < ?"
  94. var statPars []interface{}
  95. statPars = append(statPars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
  96. //查询当日钢联所有的刷新记录
  97. updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
  98. if err != nil {
  99. err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
  100. return
  101. }
  102. updateStatMap := make(map[int]*data_stat.EdbInfoUpdateStat)
  103. if len(updateStatList) > 0 {
  104. for _, v := range updateStatList {
  105. updateStatMap[v.EdbInfoId] = v
  106. }
  107. }
  108. logStat := new(data_stat.EdbInfoUpdateStat)
  109. //组装新增数据
  110. addList := make([]*data_stat.EdbInfoUpdateStat, 0)
  111. modifyList := make([]*data_stat.EdbInfoUpdateStat, 0)
  112. if len(edbList) > 0 {
  113. for _, v := range edbList {
  114. tmp := &data_stat.EdbInfoUpdateStat{
  115. EdbInfoId: v.EdbInfoId,
  116. SourceName: v.SourceName,
  117. Source: v.Source,
  118. EdbCode: v.EdbCode,
  119. EdbName: v.EdbName,
  120. EdbNameSource: v.EdbNameSource,
  121. Frequency: v.Frequency,
  122. Unit: v.Unit,
  123. StartDate: v.StartDate,
  124. EndDate: v.EndDate,
  125. SysUserId: v.SysUserId,
  126. SysUserRealName: v.SysUserRealName,
  127. UniqueCode: v.UniqueCode,
  128. EdbCreateTime: v.CreateTime,
  129. EdbModifyTime: v.ModifyTime,
  130. LatestDate: v.LatestDate,
  131. LatestValue: v.LatestValue,
  132. TerminalCode: v.TerminalCode,
  133. DataUpdateTime: v.DataUpdateTime,
  134. ErDataUpdateDate: v.ErDataUpdateDate,
  135. ModifyTime: nowTime,
  136. }
  137. needRefresh, _ := checkMySteelEdbInfoNeedRefresh(v.Frequency)
  138. tmp.NeedRefresh = needRefresh
  139. // 判断是否当日新增
  140. if v.CreateTime.After(todayT) || v.CreateTime == todayT {
  141. tmp.IsAdd = 1
  142. } else {
  143. tmp.IsAdd = 2
  144. }
  145. if up, ok := updateLogMap[v.EdbInfoId]; ok {
  146. tmp.DataUpdateTime = up.DataUpdateTime
  147. tmp.ErDataUpdateDate = up.ErDataUpdateDate
  148. tmp.DataUpdateResult = up.DataUpdateResult
  149. tmp.DataUpdateFailedReason = up.DataUpdateFailedReason
  150. tmp.HasRefresh = 1
  151. tmp.UpdateResult = up.UpdateResult
  152. tmp.UpdateFailedReason = up.UpdateFailedReason
  153. tmp.UpdateTime = up.CreateTime.Format(utils.FormatDateTime)
  154. } else if needRefresh == 1 {
  155. tmp.HasRefresh = 0
  156. tmp.DataUpdateResult = 2
  157. tmp.DataUpdateFailedReason = "服务异常"
  158. }
  159. // 判断是否需要新增还是更新
  160. if exist, ok := updateStatMap[v.EdbInfoId]; ok {
  161. tmp.Id = exist.Id
  162. modifyList = append(modifyList, tmp)
  163. } else {
  164. tmp.CreateTime = nowTime
  165. addList = append(addList, tmp)
  166. }
  167. }
  168. }
  169. //判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
  170. if len(addList) > 0 {
  171. err = logStat.Add(addList)
  172. }
  173. if len(modifyList) > 0 {
  174. err = data_stat.UpdateEdbUpdateStatMulti(modifyList)
  175. }
  176. return
  177. }
  178. func checkMySteelEdbInfoNeedRefresh(frequency string) (needRefresh int, err error) {
  179. now := time.Now()
  180. week := int(now.Weekday())
  181. //日度
  182. if week >= 1 && week <= 6 {
  183. if frequency == "日度" {
  184. needRefresh = 1
  185. return
  186. }
  187. }
  188. //周度
  189. if week >= 3 && week <= 6 {
  190. if frequency == "周度" {
  191. needRefresh = 1
  192. return
  193. }
  194. }
  195. day := now.Day() //季度,月度,年度都是每个月1号刷新
  196. if day == 1 {
  197. needRefresh = 1
  198. }
  199. return
  200. }
  201. // SetEdbSourceStat 定时统计数据源汇总表
  202. func SetEdbSourceStat(needStat bool) (err error) {
  203. defer func() {
  204. if err != nil {
  205. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
  206. alarm_msg.SendAlarmMsg(tips, 3)
  207. }
  208. }()
  209. //查询钢联的所有指标信息
  210. nowTime := time.Now()
  211. today := time.Now().Format(utils.FormatDate)
  212. nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
  213. statCond := " and create_time >= ? and create_time < ?"
  214. var statPars []interface{}
  215. statPars = append(statPars, today, nextDay)
  216. //查询当日钢联所有的统计数据
  217. updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
  218. if err != nil {
  219. err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
  220. return
  221. }
  222. if !needStat && len(updateStatList) == 0 {
  223. return
  224. }
  225. updateStatMap := make(map[string][]*data_stat.EdbInfoUpdateStat)
  226. if len(updateStatList) > 0 {
  227. for _, v := range updateStatList {
  228. updateStatMap[v.TerminalCode] = append(updateStatMap[v.TerminalCode], v)
  229. }
  230. }
  231. cond := " and create_time >= ? and create_time < ?"
  232. var pars []interface{}
  233. pars = append(pars, today, nextDay)
  234. //查询当日钢联所有的统计数据
  235. statList, err := data_stat.GetEdbSourceStatByCondition(cond, pars)
  236. if err != nil {
  237. err = fmt.Errorf("查询钢联化工数据源统计报错,err: %s", err)
  238. return
  239. }
  240. statMap := make(map[string]*data_stat.EdbSourceStat)
  241. if len(statList) > 0 {
  242. for _, v := range statList {
  243. statMap[v.TerminalCode] = v
  244. }
  245. }
  246. // 查询今日被删除的指标数
  247. delList, err := data_stat.GetEdbDeleteLogNumByCreateTime(today, nextDay)
  248. if err != nil {
  249. err = fmt.Errorf("查询今日被删除指标数目报错,err: %s", err)
  250. return
  251. }
  252. delMap := make(map[string]int)
  253. if len(delList) > 0 {
  254. for _, v := range delList {
  255. delMap[v.TerminalCode] = v.Num
  256. }
  257. }
  258. logStat := new(data_stat.EdbSourceStat)
  259. //组装新增数据
  260. addList := make([]*data_stat.EdbSourceStat, 0)
  261. modifyList := make([]*data_stat.EdbSourceStat, 0)
  262. for terminalCode, list := range updateStatMap {
  263. tmp := new(data_stat.EdbSourceStat)
  264. for k, v := range list {
  265. if k == 0 {
  266. tmp.SourceName = v.SourceName
  267. tmp.Source = v.Source
  268. tmp.TerminalCode = v.TerminalCode
  269. tmp.ModifyTime = nowTime
  270. }
  271. tmp.EdbNum = tmp.EdbNum + 1
  272. if v.IsAdd == 1 {
  273. tmp.EdbNewNum = tmp.EdbNewNum + 1
  274. }
  275. if v.NeedRefresh == 1 {
  276. tmp.NeedRefreshNum = tmp.NeedRefreshNum + 1
  277. }
  278. if v.HasRefresh == 1 {
  279. tmp.HasRefreshNum = tmp.HasRefreshNum + 1
  280. }
  281. // 区分刷新成功和更新成功
  282. if v.DataUpdateResult == 1 { //处理更新结果
  283. tmp.UpdateSuccessNum = tmp.UpdateSuccessNum + 1
  284. } else if v.NeedRefresh == 1 {
  285. tmp.UpdateFailedNum = tmp.UpdateFailedNum + 1
  286. }
  287. if v.UpdateResult == 1 { //刷新结果
  288. tmp.RefreshSuccessNum = tmp.RefreshSuccessNum + 1
  289. } else if v.NeedRefresh == 1 {
  290. tmp.RefreshFailedNum = tmp.RefreshFailedNum + 1
  291. }
  292. }
  293. // 处理今天删除的指标数量
  294. if dn, ok := delMap[terminalCode]; ok {
  295. tmp.EdbDelNum = dn
  296. }
  297. // 判断是否需要新增还是更新
  298. if exist, ok := statMap[terminalCode]; ok {
  299. tmp.Id = exist.Id
  300. modifyList = append(modifyList, tmp)
  301. } else {
  302. tmp.CreateTime = nowTime
  303. addList = append(addList, tmp)
  304. }
  305. }
  306. //判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
  307. if len(addList) > 0 {
  308. err = logStat.Add(addList)
  309. }
  310. if len(modifyList) > 0 {
  311. err = data_stat.UpdateEdbSourceStatMulti(modifyList)
  312. }
  313. return
  314. }