edb_info_stat.go 13 KB


  1. package services
  2. import (
  3. "eta/eta_index_lib/models"
  4. "eta/eta_index_lib/models/data_stat"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "time"
  9. )
  10. // AddEdbInfoUpdateLog 添加指标编辑/刷新日志
  11. func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason string, dataUpdateResult int, dataUpdateFailedReason string, isSourceRefresh int, updateType int) (err error) {
  12. var edbInfo *models.EdbInfo
  13. if edbInfoId > 0 {
  14. // 获取指标详情
  15. edbInfo, err = models.GetEdbInfoById(edbInfoId)
  16. if err != nil {
  17. err = fmt.Errorf("指标不存在")
  18. return
  19. }
  20. log := new(data_stat.EdbInfoUpdateLog)
  21. log.EdbInfoId = edbInfo.EdbInfoId
  22. log.SourceName = edbInfo.SourceName
  23. log.Source = edbInfo.Source
  24. log.EdbCode = edbInfo.EdbCode
  25. log.EdbName = edbInfo.EdbName
  26. log.EdbNameSource = edbInfo.SourceIndexName
  27. log.Frequency = edbInfo.Frequency
  28. log.Unit = edbInfo.Unit
  29. log.StartDate = edbInfo.StartDate
  30. log.EndDate = edbInfo.EndDate
  31. log.SysUserId = edbInfo.SysUserId
  32. log.SysUserRealName = edbInfo.SysUserRealName
  33. log.UniqueCode = edbInfo.UniqueCode
  34. log.EdbCreateTime = edbInfo.CreateTime
  35. log.EdbModifyTime = edbInfo.ModifyTime
  36. log.CreateTime = time.Now()
  37. log.LatestDate = edbInfo.LatestDate
  38. log.LatestValue = edbInfo.LatestValue
  39. log.TerminalCode = edbInfo.TerminalCode
  40. log.UpdateResult = updateResult
  41. log.UpdateFailedReason = updateFailedReason
  42. log.DataUpdateTime = edbInfo.DataUpdateTime
  43. log.ErDataUpdateDate = edbInfo.ErDataUpdateDate
  44. log.DataUpdateResult = dataUpdateResult
  45. log.DataUpdateFailedReason = dataUpdateFailedReason
  46. log.IsSourceRefresh = isSourceRefresh
  47. log.UpdateType = updateType
  48. _, err = data_stat.AddEdbUpdateLog(log)
  49. if err != nil {
  50. err = fmt.Errorf("新增指标更新日志失败,Err: %s", err)
  51. return
  52. }
  53. }
  54. return
  55. }
  56. // SetMysteelChemicalEdbInfoUpdateStat 定时统计钢联化工的数据源明细表
  57. func SetMysteelChemicalEdbInfoUpdateStat(needStat bool) (err error) {
  58. defer func() {
  59. if err != nil {
  60. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
  61. utils.FileLog.Info(tips)
  62. alarm_msg.SendAlarmMsg(tips, 3)
  63. }
  64. }()
  65. //查询钢联的所有在更新的指标信息
  66. condition := " and source = ? and no_update=0"
  67. var pars []interface{}
  68. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL)
  69. edbList, err := models.GetEdbInfoByCondition(condition, pars, 0)
  70. if err != nil {
  71. err = fmt.Errorf("查询钢联化工指标信息出错,err: %s", err)
  72. return
  73. }
  74. nowTime := time.Now()
  75. today := time.Now().Format(utils.FormatDate)
  76. todayT, _ := time.ParseInLocation(utils.FormatDate, today, time.Local)
  77. nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
  78. //查询当日所有钢联指标的终端更新记录
  79. updateLogList, err := data_stat.GetEdbUpdateSourceLogByCreateDate(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
  80. if err != nil {
  81. err = fmt.Errorf("查询钢联化工指标终端更新日志报错,err: %s", err)
  82. return
  83. }
  84. fmt.Println(len(updateLogList))
  85. if !needStat && len(updateLogList) == 0 { //如果不存在变更记录 则不进行汇总
  86. return
  87. }
  88. updateLogMap := make(map[int]*data_stat.EdbInfoUpdateLog)
  89. if len(updateLogList) > 0 {
  90. for _, v := range updateLogList {
  91. if _, ok := updateLogMap[v.EdbInfoId]; !ok {
  92. updateLogMap[v.EdbInfoId] = v
  93. }
  94. }
  95. }
  96. statCond := " and source = ? and create_time >= ? and create_time < ?"
  97. var statPars []interface{}
  98. statPars = append(statPars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
  99. //查询当日钢联所有的刷新记录
  100. updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
  101. if err != nil {
  102. err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
  103. return
  104. }
  105. updateStatMap := make(map[int]*data_stat.EdbInfoUpdateStat)
  106. if len(updateStatList) > 0 {
  107. for _, v := range updateStatList {
  108. updateStatMap[v.EdbInfoId] = v
  109. }
  110. }
  111. indexObj := new(models.BaseFromMysteelChemicalIndex)
  112. week := int(nowTime.Weekday())
  113. weekNeedRefreshMap := make(map[string]struct{})
  114. if week >= 3 && week <= 6 {
  115. endDate := utils.GetNowWeekMonday().Format(utils.FormatDate)
  116. nowDate := time.Now().Format(utils.FormatDate)
  117. cond := ` AND frequency = ? AND (end_date < ? or end_date=?) AND is_stop = 0`
  118. var tmpPars []interface{}
  119. tmpPars = append(tmpPars, "周度", endDate, nowDate)
  120. //查询所有需要当日刷新的周度指标
  121. indexTotal, tErr := indexObj.GetIndexByCondition(cond, tmpPars)
  122. if tErr != nil {
  123. err = fmt.Errorf("查询钢联化工原始指标报错,err: %s", tErr)
  124. return
  125. }
  126. for _, v := range indexTotal {
  127. weekNeedRefreshMap[v.IndexCode] = struct{}{}
  128. }
  129. }
  130. //查询所有停更指标
  131. stopRefreshMap := make(map[string]struct{})
  132. tmpCond := ` AND is_stop = 1`
  133. //查询所有需要当日刷新的周度指标
  134. indexStop, tErr := indexObj.GetIndexByCondition(tmpCond, []interface{}{})
  135. if tErr != nil {
  136. err = fmt.Errorf("查询钢联化工原始指标报错,err: %s", tErr)
  137. return
  138. }
  139. for _, v := range indexStop {
  140. stopRefreshMap[v.IndexCode] = struct{}{}
  141. }
  142. logStat := new(data_stat.EdbInfoUpdateStat)
  143. //组装新增数据
  144. addList := make([]*data_stat.EdbInfoUpdateStat, 0)
  145. modifyList := make([]*data_stat.EdbInfoUpdateStat, 0)
  146. if len(edbList) > 0 {
  147. for _, v := range edbList {
  148. if _, ok := stopRefreshMap[v.EdbCode]; ok {
  149. continue
  150. }
  151. tmp := &data_stat.EdbInfoUpdateStat{
  152. EdbInfoId: v.EdbInfoId,
  153. SourceName: v.SourceName,
  154. Source: v.Source,
  155. EdbCode: v.EdbCode,
  156. EdbName: v.EdbName,
  157. EdbNameSource: v.EdbNameSource,
  158. Frequency: v.Frequency,
  159. Unit: v.Unit,
  160. StartDate: v.StartDate,
  161. EndDate: v.EndDate,
  162. SysUserId: v.SysUserId,
  163. SysUserRealName: v.SysUserRealName,
  164. UniqueCode: v.UniqueCode,
  165. EdbCreateTime: v.CreateTime,
  166. EdbModifyTime: v.ModifyTime,
  167. LatestDate: v.LatestDate,
  168. LatestValue: v.LatestValue,
  169. TerminalCode: v.TerminalCode,
  170. DataUpdateTime: v.DataUpdateTime,
  171. ErDataUpdateDate: v.ErDataUpdateDate,
  172. ModifyTime: nowTime,
  173. }
  174. exist, existOk := updateStatMap[v.EdbInfoId]
  175. frequency := v.Frequency
  176. if v.Frequency == "旬度" { //特殊处理指标库里和数据源里频度不一致的情况
  177. //查询源指标库的频度
  178. indexTmp, e := indexObj.GetIndexItem(v.EdbCode)
  179. if e == nil {
  180. frequency = indexTmp.Frequency
  181. }
  182. }
  183. if existOk {
  184. tmp.NeedRefresh = exist.NeedRefresh
  185. } else {
  186. needRefresh, _ := checkMySteelEdbInfoNeedRefresh(v.EdbCode, frequency, weekNeedRefreshMap)
  187. tmp.NeedRefresh = needRefresh
  188. }
  189. // 判断是否当日新增
  190. if v.CreateTime.After(todayT) || v.CreateTime == todayT {
  191. tmp.IsAdd = 1
  192. } else {
  193. tmp.IsAdd = 2
  194. }
  195. if up, ok := updateLogMap[v.EdbInfoId]; ok {
  196. tmp.DataUpdateTime = up.DataUpdateTime
  197. tmp.ErDataUpdateDate = up.ErDataUpdateDate
  198. tmp.DataUpdateResult = up.DataUpdateResult
  199. tmp.DataUpdateFailedReason = up.DataUpdateFailedReason
  200. tmp.HasRefresh = 1
  201. tmp.UpdateResult = up.UpdateResult
  202. tmp.UpdateFailedReason = up.UpdateFailedReason
  203. tmp.UpdateTime = up.CreateTime.Format(utils.FormatDateTime)
  204. } else if tmp.NeedRefresh == 1 {
  205. tmp.HasRefresh = 0
  206. tmp.DataUpdateResult = 2
  207. tmp.DataUpdateFailedReason = "服务异常"
  208. }
  209. // 判断是否需要新增还是更新
  210. if existOk {
  211. tmp.Id = exist.Id
  212. modifyList = append(modifyList, tmp)
  213. } else {
  214. tmp.CreateTime = nowTime
  215. addList = append(addList, tmp)
  216. }
  217. if len(addList) >= 500 {
  218. err = logStat.Add(addList)
  219. if err != nil {
  220. err = fmt.Errorf("新增钢联化工明细记录报错,err: %s", err)
  221. return
  222. }
  223. addList = addList[:0]
  224. }
  225. if len(modifyList) >= 500 {
  226. err = data_stat.UpdateEdbUpdateStatMulti(modifyList)
  227. if err != nil {
  228. err = fmt.Errorf("更新钢联化工明细记录报错,err: %s", err)
  229. return
  230. }
  231. modifyList = modifyList[:0]
  232. }
  233. }
  234. }
  235. //判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
  236. if len(addList) > 0 {
  237. err = logStat.Add(addList)
  238. }
  239. if len(modifyList) > 0 {
  240. err = data_stat.UpdateEdbUpdateStatMulti(modifyList)
  241. }
  242. return
  243. }
  244. func checkMySteelEdbInfoNeedRefresh(edbCode, frequency string, weekNeedRefreshMap map[string]struct{}) (needRefresh int, err error) {
  245. now := time.Now()
  246. week := int(now.Weekday())
  247. //日度
  248. if week >= 1 && week <= 6 {
  249. if frequency == "日度" {
  250. needRefresh = 1
  251. return
  252. }
  253. }
  254. //周度
  255. if week >= 3 && week <= 6 {
  256. _, ok := weekNeedRefreshMap[edbCode]
  257. if frequency == "周度" && ok {
  258. needRefresh = 1
  259. return
  260. }
  261. }
  262. //季度,月度,年度都是每个周末刷新
  263. if week == 0 {
  264. if frequency == "旬度" || frequency == "月度" || frequency == "季度" || frequency == "年度" {
  265. needRefresh = 1
  266. return
  267. }
  268. }
  269. return
  270. }
  271. // SetEdbSourceStat 定时统计数据源汇总表
  272. func SetEdbSourceStat(needStat bool) (err error) {
  273. defer func() {
  274. if err != nil {
  275. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
  276. alarm_msg.SendAlarmMsg(tips, 3)
  277. }
  278. }()
  279. //查询钢联的所有指标信息
  280. nowTime := time.Now()
  281. today := time.Now().Format(utils.FormatDate)
  282. nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
  283. statCond := " and create_time >= ? and create_time < ?"
  284. var statPars []interface{}
  285. statPars = append(statPars, today, nextDay)
  286. //查询当日钢联所有的统计数据
  287. updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
  288. if err != nil {
  289. err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
  290. return
  291. }
  292. if !needStat && len(updateStatList) == 0 {
  293. return
  294. }
  295. updateStatMap := make(map[string][]*data_stat.EdbInfoUpdateStat)
  296. if len(updateStatList) > 0 {
  297. for _, v := range updateStatList {
  298. updateStatMap[v.TerminalCode] = append(updateStatMap[v.TerminalCode], v)
  299. }
  300. }
  301. cond := " and create_time >= ? and create_time < ?"
  302. var pars []interface{}
  303. pars = append(pars, today, nextDay)
  304. //查询当日钢联所有的统计数据
  305. statList, err := data_stat.GetEdbSourceStatByCondition(cond, pars)
  306. if err != nil {
  307. err = fmt.Errorf("查询钢联化工数据源统计报错,err: %s", err)
  308. return
  309. }
  310. statMap := make(map[string]*data_stat.EdbSourceStat)
  311. if len(statList) > 0 {
  312. for _, v := range statList {
  313. statMap[v.TerminalCode] = v
  314. }
  315. }
  316. // 查询今日被删除的指标数
  317. delList, err := data_stat.GetEdbDeleteLogNumByCreateTime(today, nextDay)
  318. if err != nil {
  319. err = fmt.Errorf("查询今日被删除指标数目报错,err: %s", err)
  320. return
  321. }
  322. delMap := make(map[string]int)
  323. if len(delList) > 0 {
  324. for _, v := range delList {
  325. delMap[v.TerminalCode] = v.Num
  326. }
  327. }
  328. logStat := new(data_stat.EdbSourceStat)
  329. //组装新增数据
  330. addList := make([]*data_stat.EdbSourceStat, 0)
  331. modifyList := make([]*data_stat.EdbSourceStat, 0)
  332. for terminalCode, list := range updateStatMap {
  333. tmp := new(data_stat.EdbSourceStat)
  334. for k, v := range list {
  335. if k == 0 {
  336. tmp.SourceName = v.SourceName
  337. tmp.Source = v.Source
  338. tmp.TerminalCode = v.TerminalCode
  339. tmp.ModifyTime = nowTime
  340. }
  341. tmp.EdbNum = tmp.EdbNum + 1
  342. if v.IsAdd == 1 {
  343. tmp.EdbNewNum = tmp.EdbNewNum + 1
  344. }
  345. if v.NeedRefresh == 1 {
  346. tmp.NeedRefreshNum = tmp.NeedRefreshNum + 1
  347. }
  348. if v.HasRefresh == 1 {
  349. tmp.HasRefreshNum = tmp.HasRefreshNum + 1
  350. }
  351. // 区分刷新成功和更新成功
  352. if v.DataUpdateResult == 1 { //处理更新结果
  353. tmp.UpdateSuccessNum = tmp.UpdateSuccessNum + 1
  354. } else if v.NeedRefresh == 1 {
  355. tmp.UpdateFailedNum = tmp.UpdateFailedNum + 1
  356. }
  357. if v.UpdateResult == 1 { //刷新结果
  358. tmp.RefreshSuccessNum = tmp.RefreshSuccessNum + 1
  359. } else if v.HasRefresh == 1 {
  360. tmp.RefreshFailedNum = tmp.RefreshFailedNum + 1
  361. }
  362. }
  363. // 处理今天删除的指标数量
  364. if dn, ok := delMap[terminalCode]; ok {
  365. tmp.EdbDelNum = dn
  366. }
  367. // 判断是否需要新增还是更新
  368. if exist, ok := statMap[terminalCode]; ok {
  369. tmp.Id = exist.Id
  370. modifyList = append(modifyList, tmp)
  371. } else {
  372. tmp.CreateTime = nowTime
  373. addList = append(addList, tmp)
  374. }
  375. if len(addList) >= 500 {
  376. err = logStat.Add(addList)
  377. if err != nil {
  378. err = fmt.Errorf("新增钢联化工统计表报错,err: %s", err)
  379. return
  380. }
  381. addList = addList[:0]
  382. }
  383. if len(modifyList) >= 500 {
  384. err = data_stat.UpdateEdbSourceStatMulti(modifyList)
  385. if err != nil {
  386. err = fmt.Errorf("更新钢联化工统计表报错,err: %s", err)
  387. return
  388. }
  389. modifyList = modifyList[:0]
  390. }
  391. }
  392. //判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
  393. if len(addList) > 0 {
  394. err = logStat.Add(addList)
  395. }
  396. if len(modifyList) > 0 {
  397. err = data_stat.UpdateEdbSourceStatMulti(modifyList)
  398. }
  399. return
  400. }