edb_info_stat.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. package services
  2. import (
  3. "eta/eta_index_lib/models"
  4. "eta/eta_index_lib/models/data_stat"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "time"
  9. )
  10. // AddEdbInfoUpdateLog 添加指标编辑/刷新日志
  11. func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason string, dataUpdateResult int, dataUpdateFailedReason string, isSourceRefresh int, updateType int) (err error) {
  12. var edbInfo *models.EdbInfo
  13. if edbInfoId > 0 {
  14. // 获取指标详情
  15. edbInfo, err = models.GetEdbInfoById(edbInfoId)
  16. if err != nil {
  17. err = fmt.Errorf("指标不存在")
  18. return
  19. }
  20. log := new(data_stat.EdbInfoUpdateLog)
  21. log.EdbInfoId = edbInfo.EdbInfoId
  22. log.SourceName = edbInfo.SourceName
  23. log.Source = edbInfo.Source
  24. log.EdbCode = edbInfo.EdbCode
  25. log.EdbName = edbInfo.EdbName
  26. log.EdbNameSource = edbInfo.SourceIndexName
  27. log.Frequency = edbInfo.Frequency
  28. log.Unit = edbInfo.Unit
  29. log.StartDate = edbInfo.StartDate
  30. log.EndDate = edbInfo.EndDate
  31. log.SysUserId = edbInfo.SysUserId
  32. log.SysUserRealName = edbInfo.SysUserRealName
  33. log.UniqueCode = edbInfo.UniqueCode
  34. log.EdbCreateTime = edbInfo.CreateTime
  35. log.EdbModifyTime = edbInfo.ModifyTime
  36. log.CreateTime = time.Now()
  37. log.LatestDate = edbInfo.LatestDate
  38. log.LatestValue = edbInfo.LatestValue
  39. log.TerminalCode = edbInfo.TerminalCode
  40. log.UpdateResult = updateResult
  41. log.UpdateFailedReason = updateFailedReason
  42. log.DataUpdateTime = edbInfo.DataUpdateTime
  43. log.ErDataUpdateDate = edbInfo.ErDataUpdateDate
  44. log.DataUpdateResult = dataUpdateResult
  45. log.DataUpdateFailedReason = dataUpdateFailedReason
  46. log.IsSourceRefresh = isSourceRefresh
  47. log.UpdateType = updateType
  48. _, err = data_stat.AddEdbUpdateLog(log)
  49. if err != nil {
  50. err = fmt.Errorf("新增指标更新日志失败,Err: %s", err)
  51. return
  52. }
  53. }
  54. return
  55. }
  56. // SetMysteelChemicalEdbInfoUpdateStat 定时统计钢联化工的数据源明细表
  57. func SetMysteelChemicalEdbInfoUpdateStat(needStat bool) (err error) {
  58. defer func() {
  59. if err != nil {
  60. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
  61. alarm_msg.SendAlarmMsg(tips, 3)
  62. }
  63. }()
  64. //查询钢联的所有在更新的指标信息
  65. condition := " and source = ? and no_update=0"
  66. var pars []interface{}
  67. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL)
  68. edbList, err := models.GetEdbInfoByCondition(condition, pars, 0)
  69. if err != nil {
  70. err = fmt.Errorf("查询钢联化工指标信息出错,err: %s", err)
  71. return
  72. }
  73. nowTime := time.Now()
  74. today := time.Now().Format(utils.FormatDate)
  75. todayT, _ := time.ParseInLocation(utils.FormatDate, today, time.Local)
  76. nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
  77. //查询当日所有钢联指标的终端更新记录
  78. updateLogList, err := data_stat.GetEdbUpdateSourceLogByCreateDate(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
  79. if err != nil {
  80. err = fmt.Errorf("查询钢联化工指标终端更新日志报错,err: %s", err)
  81. return
  82. }
  83. if !needStat && len(updateLogList) == 0 {
  84. return
  85. }
  86. updateLogMap := make(map[int]*data_stat.EdbInfoUpdateLog)
  87. if len(updateLogList) > 0 {
  88. for _, v := range updateLogList {
  89. if _, ok := updateLogMap[v.EdbInfoId]; !ok {
  90. updateLogMap[v.EdbInfoId] = v
  91. }
  92. }
  93. }
  94. statCond := " and source = ? and create_time >= ? and create_time < ?"
  95. var statPars []interface{}
  96. statPars = append(statPars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
  97. //查询当日钢联所有的刷新记录
  98. updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
  99. if err != nil {
  100. err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
  101. return
  102. }
  103. updateStatMap := make(map[int]*data_stat.EdbInfoUpdateStat)
  104. if len(updateStatList) > 0 {
  105. for _, v := range updateStatList {
  106. updateStatMap[v.EdbInfoId] = v
  107. }
  108. }
  109. indexObj := new(models.BaseFromMysteelChemicalIndex)
  110. week := int(nowTime.Weekday())
  111. weekNeedRefreshMap := make(map[string]struct{})
  112. if week >= 3 && week <= 6 {
  113. yesterday := nowTime.AddDate(0, 0, -1).Format(utils.FormatDate)
  114. cond := ` AND frequency = ? AND end_date < ? AND is_stop = 0`
  115. var tmpPars []interface{}
  116. tmpPars = append(tmpPars, "周度", yesterday)
  117. //查询所有需要当日刷新的周度指标
  118. indexTotal, tErr := indexObj.GetIndexByCondition(cond, tmpPars)
  119. if tErr != nil {
  120. err = fmt.Errorf("查询钢联化工原始指标报错,err: %s", tErr)
  121. return
  122. }
  123. for _, v := range indexTotal {
  124. weekNeedRefreshMap[v.IndexCode] = struct{}{}
  125. }
  126. }
  127. //查询所有停更指标
  128. stopRefreshMap := make(map[string]struct{})
  129. tmpCond := ` AND is_stop = 1`
  130. var tmpPars []interface{}
  131. //查询所有需要当日刷新的周度指标
  132. indexStop, tErr := indexObj.GetIndexByCondition(tmpCond, tmpPars)
  133. if tErr != nil {
  134. err = fmt.Errorf("查询钢联化工原始指标报错,err: %s", tErr)
  135. return
  136. }
  137. for _, v := range indexStop {
  138. stopRefreshMap[v.IndexCode] = struct{}{}
  139. }
  140. logStat := new(data_stat.EdbInfoUpdateStat)
  141. //组装新增数据
  142. addList := make([]*data_stat.EdbInfoUpdateStat, 0)
  143. modifyList := make([]*data_stat.EdbInfoUpdateStat, 0)
  144. if len(edbList) > 0 {
  145. for _, v := range edbList {
  146. if _, ok := stopRefreshMap[v.EdbCode]; ok {
  147. continue
  148. }
  149. tmp := &data_stat.EdbInfoUpdateStat{
  150. EdbInfoId: v.EdbInfoId,
  151. SourceName: v.SourceName,
  152. Source: v.Source,
  153. EdbCode: v.EdbCode,
  154. EdbName: v.EdbName,
  155. EdbNameSource: v.EdbNameSource,
  156. Frequency: v.Frequency,
  157. Unit: v.Unit,
  158. StartDate: v.StartDate,
  159. EndDate: v.EndDate,
  160. SysUserId: v.SysUserId,
  161. SysUserRealName: v.SysUserRealName,
  162. UniqueCode: v.UniqueCode,
  163. EdbCreateTime: v.CreateTime,
  164. EdbModifyTime: v.ModifyTime,
  165. LatestDate: v.LatestDate,
  166. LatestValue: v.LatestValue,
  167. TerminalCode: v.TerminalCode,
  168. DataUpdateTime: v.DataUpdateTime,
  169. ErDataUpdateDate: v.ErDataUpdateDate,
  170. ModifyTime: nowTime,
  171. }
  172. frequency := v.Frequency
  173. if v.Frequency == "旬度" { //特殊处理指标库里和数据源里频度不一致的情况
  174. //查询源指标库的频度
  175. indexTmp, e := indexObj.GetIndexItem(v.EdbCode)
  176. if e == nil {
  177. frequency = indexTmp.Frequency
  178. }
  179. }
  180. needRefresh, _ := checkMySteelEdbInfoNeedRefresh(v.EdbCode, frequency, weekNeedRefreshMap)
  181. tmp.NeedRefresh = needRefresh
  182. // 判断是否当日新增
  183. if v.CreateTime.After(todayT) || v.CreateTime == todayT {
  184. tmp.IsAdd = 1
  185. } else {
  186. tmp.IsAdd = 2
  187. }
  188. if up, ok := updateLogMap[v.EdbInfoId]; ok {
  189. tmp.DataUpdateTime = up.DataUpdateTime
  190. tmp.ErDataUpdateDate = up.ErDataUpdateDate
  191. tmp.DataUpdateResult = up.DataUpdateResult
  192. tmp.DataUpdateFailedReason = up.DataUpdateFailedReason
  193. tmp.HasRefresh = 1
  194. tmp.UpdateResult = up.UpdateResult
  195. tmp.UpdateFailedReason = up.UpdateFailedReason
  196. tmp.UpdateTime = up.CreateTime.Format(utils.FormatDateTime)
  197. } else if needRefresh == 1 {
  198. tmp.HasRefresh = 0
  199. tmp.DataUpdateResult = 2
  200. tmp.DataUpdateFailedReason = "服务异常"
  201. }
  202. // 判断是否需要新增还是更新
  203. if exist, ok := updateStatMap[v.EdbInfoId]; ok {
  204. tmp.Id = exist.Id
  205. modifyList = append(modifyList, tmp)
  206. } else {
  207. tmp.CreateTime = nowTime
  208. addList = append(addList, tmp)
  209. }
  210. }
  211. }
  212. //判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
  213. if len(addList) > 0 {
  214. err = logStat.Add(addList)
  215. }
  216. if len(modifyList) > 0 {
  217. err = data_stat.UpdateEdbUpdateStatMulti(modifyList)
  218. }
  219. return
  220. }
  221. func checkMySteelEdbInfoNeedRefresh(edbCode, frequency string, weekNeedRefreshMap map[string]struct{}) (needRefresh int, err error) {
  222. now := time.Now()
  223. week := int(now.Weekday())
  224. //日度
  225. if week >= 1 && week <= 6 {
  226. if frequency == "日度" {
  227. needRefresh = 1
  228. return
  229. }
  230. }
  231. //周度
  232. if week >= 3 && week <= 6 {
  233. _, ok := weekNeedRefreshMap[edbCode]
  234. if frequency == "周度" && ok {
  235. needRefresh = 1
  236. return
  237. }
  238. }
  239. //季度,月度,年度都是每个周末刷新
  240. if week == 0 {
  241. needRefresh = 1
  242. }
  243. return
  244. }
  245. // SetEdbSourceStat 定时统计数据源汇总表
  246. func SetEdbSourceStat(needStat bool) (err error) {
  247. defer func() {
  248. if err != nil {
  249. tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
  250. alarm_msg.SendAlarmMsg(tips, 3)
  251. }
  252. }()
  253. //查询钢联的所有指标信息
  254. nowTime := time.Now()
  255. today := time.Now().Format(utils.FormatDate)
  256. nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
  257. statCond := " and create_time >= ? and create_time < ?"
  258. var statPars []interface{}
  259. statPars = append(statPars, today, nextDay)
  260. //查询当日钢联所有的统计数据
  261. updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
  262. if err != nil {
  263. err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
  264. return
  265. }
  266. if !needStat && len(updateStatList) == 0 {
  267. return
  268. }
  269. updateStatMap := make(map[string][]*data_stat.EdbInfoUpdateStat)
  270. if len(updateStatList) > 0 {
  271. for _, v := range updateStatList {
  272. updateStatMap[v.TerminalCode] = append(updateStatMap[v.TerminalCode], v)
  273. }
  274. }
  275. cond := " and create_time >= ? and create_time < ?"
  276. var pars []interface{}
  277. pars = append(pars, today, nextDay)
  278. //查询当日钢联所有的统计数据
  279. statList, err := data_stat.GetEdbSourceStatByCondition(cond, pars)
  280. if err != nil {
  281. err = fmt.Errorf("查询钢联化工数据源统计报错,err: %s", err)
  282. return
  283. }
  284. statMap := make(map[string]*data_stat.EdbSourceStat)
  285. if len(statList) > 0 {
  286. for _, v := range statList {
  287. statMap[v.TerminalCode] = v
  288. }
  289. }
  290. // 查询今日被删除的指标数
  291. delList, err := data_stat.GetEdbDeleteLogNumByCreateTime(today, nextDay)
  292. if err != nil {
  293. err = fmt.Errorf("查询今日被删除指标数目报错,err: %s", err)
  294. return
  295. }
  296. delMap := make(map[string]int)
  297. if len(delList) > 0 {
  298. for _, v := range delList {
  299. delMap[v.TerminalCode] = v.Num
  300. }
  301. }
  302. logStat := new(data_stat.EdbSourceStat)
  303. //组装新增数据
  304. addList := make([]*data_stat.EdbSourceStat, 0)
  305. modifyList := make([]*data_stat.EdbSourceStat, 0)
  306. for terminalCode, list := range updateStatMap {
  307. tmp := new(data_stat.EdbSourceStat)
  308. for k, v := range list {
  309. if k == 0 {
  310. tmp.SourceName = v.SourceName
  311. tmp.Source = v.Source
  312. tmp.TerminalCode = v.TerminalCode
  313. tmp.ModifyTime = nowTime
  314. }
  315. tmp.EdbNum = tmp.EdbNum + 1
  316. if v.IsAdd == 1 {
  317. tmp.EdbNewNum = tmp.EdbNewNum + 1
  318. }
  319. if v.NeedRefresh == 1 {
  320. tmp.NeedRefreshNum = tmp.NeedRefreshNum + 1
  321. }
  322. if v.HasRefresh == 1 {
  323. tmp.HasRefreshNum = tmp.HasRefreshNum + 1
  324. }
  325. // 区分刷新成功和更新成功
  326. if v.DataUpdateResult == 1 { //处理更新结果
  327. tmp.UpdateSuccessNum = tmp.UpdateSuccessNum + 1
  328. } else if v.NeedRefresh == 1 {
  329. tmp.UpdateFailedNum = tmp.UpdateFailedNum + 1
  330. }
  331. if v.UpdateResult == 1 { //刷新结果
  332. tmp.RefreshSuccessNum = tmp.RefreshSuccessNum + 1
  333. } else if v.HasRefresh == 1 {
  334. tmp.RefreshFailedNum = tmp.RefreshFailedNum + 1
  335. }
  336. }
  337. // 处理今天删除的指标数量
  338. if dn, ok := delMap[terminalCode]; ok {
  339. tmp.EdbDelNum = dn
  340. }
  341. // 判断是否需要新增还是更新
  342. if exist, ok := statMap[terminalCode]; ok {
  343. tmp.Id = exist.Id
  344. modifyList = append(modifyList, tmp)
  345. } else {
  346. tmp.CreateTime = nowTime
  347. addList = append(addList, tmp)
  348. }
  349. }
  350. //判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
  351. if len(addList) > 0 {
  352. err = logStat.Add(addList)
  353. }
  354. if len(modifyList) > 0 {
  355. err = data_stat.UpdateEdbSourceStatMulti(modifyList)
  356. }
  357. return
  358. }