edb_refresh.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796
  1. package services
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta/eta_task/models"
  6. "eta/eta_task/models/data_manage"
  7. "eta/eta_task/models/data_manage/edb_refresh"
  8. "eta/eta_task/services/alarm_msg"
  9. "eta/eta_task/services/data"
  10. "eta/eta_task/utils"
  11. "fmt"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. // ConfigRefreshData
  17. // @Description: 配置刷新数据
  18. // @author: Roc
  19. // @datetime 2024-01-10 13:55:05
  20. // @param cont context.Context
  21. // @return err error
  22. func ConfigRefreshData(cont context.Context) (err error) {
  23. errMsgList := make([]string, 0)
  24. defer func() {
  25. if err != nil {
  26. fmt.Println(err)
  27. }
  28. }()
  29. // 一期是只做wind、同花顺、钢联、有色
  30. now := time.Now()
  31. //now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
  32. //now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
  33. defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
  34. if err != nil {
  35. errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
  36. }
  37. sourceEdbInfoListMap, err := getConfigRefreshData(now)
  38. if err != nil {
  39. errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
  40. }
  41. // 将两个合并
  42. allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
  43. wgNum := len(allSourceEdbInfoListMap)
  44. if wgNum <= 0 {
  45. return
  46. }
  47. wg := sync.WaitGroup{}
  48. wg.Add(wgNum)
  49. for _, edbList := range allSourceEdbInfoListMap {
  50. if edbList == nil {
  51. wg.Done()
  52. continue
  53. }
  54. go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
  55. }
  56. wg.Wait()
  57. fmt.Println("Refresh End")
  58. return
  59. }
  60. // Function to merge two maps
  61. func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) (newMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) {
  62. if dst == nil {
  63. return src
  64. }
  65. if src == nil {
  66. return dst
  67. }
  68. newMap = dst
  69. for k, v := range src {
  70. if newK, ok := newMap[k]; ok {
  71. newK = append(newK, v...)
  72. newMap[k] = newK
  73. } else {
  74. newMap[k] = v
  75. }
  76. }
  77. return newMap
  78. }
  79. // getDefaultRefreshData
  80. // @Description: 根据默认配置获取需要刷新的指标列表
  81. // @author: Roc
  82. // @datetime 2024-01-10 13:55:38
  83. // @param now time.Time
  84. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  85. // @return err error
  86. func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  87. errMsgList := make([]string, 0)
  88. defer func() {
  89. if err != nil {
  90. fmt.Println(err)
  91. }
  92. }()
  93. // 一期是只做wind、同花顺、钢联、有色
  94. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  95. currTimeStr := getPreviousHalfHour(now)
  96. fmt.Println(currTimeStr)
  97. // 所有默认配置刷新项
  98. list := make([]*edb_refresh.EdbRefreshDefaultConfig, 0)
  99. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  100. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  101. // 获取各个刷新频率的配置
  102. for _, refreshFrequency := range refreshFrequencyList {
  103. // 获取刷新频率条件
  104. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  105. if !isHandler {
  106. // 可能是非交易日,所以过滤不处理
  107. continue
  108. }
  109. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  110. pars = append(pars, refreshFrequency, currTimeStr)
  111. // 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
  112. condition += ` AND source not in (?,?)`
  113. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
  114. tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
  115. if tmpErr != nil {
  116. err = tmpErr
  117. return
  118. }
  119. list = append(list, tmpList...)
  120. }
  121. // 更新的单元格数
  122. refreshDataNumMap := make(map[string]*edb_refresh.EdbRefreshDefaultConfig)
  123. // 数据源刷新频度的列表数组
  124. refreshDataFrequencyListMap := make(map[int]map[int][]string)
  125. wgNum := 0
  126. // 处理待刷新的数据源,整理成数组,方便获取对应的指标
  127. for _, item := range list {
  128. // 更新的单元格数
  129. key := fmt.Sprintf("%d_%d_%s", item.Source, item.SubSource, item.Frequency)
  130. refreshDataNumMap[key] = item
  131. // 数据源刷新频度的列表数组
  132. subSourceFrequencyList, ok := refreshDataFrequencyListMap[item.Source]
  133. if !ok {
  134. subSourceFrequencyList = make(map[int][]string)
  135. }
  136. frequencyList, ok := subSourceFrequencyList[item.SubSource]
  137. if !ok {
  138. wgNum++
  139. frequencyList = make([]string, 0)
  140. }
  141. subSourceFrequencyList[item.SubSource] = append(frequencyList, item.Frequency)
  142. refreshDataFrequencyListMap[item.Source] = subSourceFrequencyList
  143. }
  144. for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
  145. switch source {
  146. case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
  147. // 这种不处理
  148. default:
  149. for subSource, frequencyList := range subSourceFrequencyListMap {
  150. edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
  151. if tmpErr != nil {
  152. errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
  153. }
  154. for _, v := range edbList {
  155. // 数据刷新的期数
  156. dataRefreshNum := utils.DATA_REFRESH
  157. key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency)
  158. if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
  159. if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
  160. dataRefreshNum = 0
  161. } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
  162. dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
  163. }
  164. }
  165. v.DataRefreshNum = dataRefreshNum
  166. }
  167. key := fmt.Sprint(source, "_", subSource)
  168. sourceEdbInfoListMap[key] = edbList
  169. }
  170. }
  171. }
  172. fmt.Println("Get Refresh End")
  173. return
  174. }
  175. // getConfigRefreshData
  176. // @Description: 根据指标配置获取需要刷新的指标列表
  177. // @author: Roc
  178. // @datetime 2024-01-10 13:55:59
  179. // @param now time.Time
  180. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  181. // @return err error
  182. func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  183. defer func() {
  184. if err != nil {
  185. fmt.Println(err)
  186. }
  187. }()
  188. // 一期是只做wind、同花顺、钢联、有色
  189. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  190. currTimeStr := getPreviousHalfHour(now)
  191. // 所有默认配置刷新项
  192. list := make([]*edb_refresh.EdbRefreshConfig, 0)
  193. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  194. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  195. // 获取各个刷新频率的配置
  196. for _, refreshFrequency := range refreshFrequencyList {
  197. // 获取刷新频率条件
  198. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  199. if !isHandler {
  200. // 可能是非交易日,所以过滤不处理
  201. continue
  202. }
  203. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  204. pars = append(pars, refreshFrequency, currTimeStr)
  205. tmpList, tmpErr := edb_refresh.GetEdbRefreshConfigListByCondition(condition, pars)
  206. if tmpErr != nil {
  207. err = tmpErr
  208. return
  209. }
  210. list = append(list, tmpList...)
  211. }
  212. // 配置列表
  213. configIdEdbRefreshConfigMap := make(map[int]*edb_refresh.EdbRefreshConfig)
  214. configIdList := make([]int, 0)
  215. for _, v := range list {
  216. configIdList = append(configIdList, v.EdbRefreshConfigId)
  217. configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
  218. }
  219. edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource([]int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}, configIdList)
  220. if err != nil {
  221. return
  222. }
  223. for _, v := range edbInfoList {
  224. key := fmt.Sprint(v.Source, "_", v.SubSource)
  225. tmpList, ok := sourceEdbInfoListMap[key]
  226. if !ok {
  227. tmpList = make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
  228. }
  229. // 数据刷新的期数
  230. dataRefreshNum := utils.DATA_REFRESH
  231. if edbRefreshConfig, ok2 := configIdEdbRefreshConfigMap[v.EdbRefreshConfigId]; ok2 {
  232. if edbRefreshConfig.RefreshAllData == 1 { // 刷新所有数据期数
  233. dataRefreshNum = 0
  234. } else if edbRefreshConfig.RefreshDataNum > 0 { //
  235. dataRefreshNum = edbRefreshConfig.RefreshDataNum
  236. }
  237. }
  238. v.DataRefreshNum = dataRefreshNum
  239. sourceEdbInfoListMap[key] = append(tmpList, v)
  240. }
  241. fmt.Println("Get ConfigRefreshData End")
  242. return
  243. }
  244. // BaseRefreshData
  245. // @Description: 基础数据刷新
  246. // @author: Roc
  247. // @datetime 2024-01-09 16:27:45
  248. // @param wg *sync.WaitGroup
  249. // @return err error
  250. func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_refresh.EdbInfoListAndRefreshConfig) (err error) {
  251. errMsgList := make([]string, 0)
  252. defer func() {
  253. if err != nil {
  254. fmt.Println("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+err.Error())
  255. go alarm_msg.SendAlarmMsg(fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData ErrMsg:"+err.Error()), 3)
  256. }
  257. if len(errMsgList) > 0 {
  258. errMsg := fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+strings.Join(errMsgList, "\n"))
  259. fmt.Println(errMsg)
  260. go alarm_msg.SendAlarmMsg(errMsg, 3)
  261. }
  262. wg.Done()
  263. }()
  264. // 数据刷新的期数
  265. dataRefreshNum := utils.DATA_REFRESH
  266. // 是否从最开始的日期更新
  267. var isRefreshByStartDate bool
  268. if source != utils.DATA_SOURCE_THS {
  269. for _, v := range items {
  270. // 如果暂停更新,那就过滤
  271. if v.NoUpdate == 1 {
  272. continue
  273. }
  274. if v.DataRefreshNum > 0 {
  275. dataRefreshNum = v.DataRefreshNum
  276. }
  277. startDate := ""
  278. if isRefreshByStartDate {
  279. startDate = v.StartDate.Format(utils.FormatDate)
  280. } else {
  281. if v.Frequency == "日度" {
  282. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  283. } else if v.Frequency == "周度" {
  284. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  285. } else if v.Frequency == "旬度" {
  286. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  287. } else if v.Frequency == "月度" {
  288. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  289. } else if v.Frequency == "季度" {
  290. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  291. } else if v.Frequency == "半年度" {
  292. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  293. } else if v.Frequency == "年度" {
  294. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  295. } else {
  296. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  297. }
  298. }
  299. fmt.Println(startDate)
  300. // 数据更新
  301. resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  302. if tmpErr != nil {
  303. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  304. continue
  305. }
  306. if resp.Ret != 200 {
  307. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  308. continue
  309. }
  310. }
  311. }
  312. // 同花顺控制速率, 每秒最多4次请求(同花顺EDB函数限制为5, 考虑到可能存在用户同时在使用, 这里批量刷新设置为4, 话说5确实是有点少了吧=_=!)
  313. if source == utils.DATA_SOURCE_THS {
  314. ticker := time.NewTicker(250 * time.Millisecond)
  315. defer ticker.Stop()
  316. for _, v := range items {
  317. <-ticker.C
  318. // 如果暂停更新,那就过滤
  319. if v.NoUpdate == 1 {
  320. continue
  321. }
  322. if v.DataRefreshNum > 0 {
  323. dataRefreshNum = v.DataRefreshNum
  324. }
  325. startDate := ""
  326. if isRefreshByStartDate {
  327. startDate = v.StartDate.Format(utils.FormatDate)
  328. } else {
  329. if v.Frequency == "日度" {
  330. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  331. } else if v.Frequency == "周度" {
  332. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  333. } else if v.Frequency == "旬度" {
  334. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  335. } else if v.Frequency == "月度" {
  336. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  337. } else if v.Frequency == "季度" {
  338. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  339. } else if v.Frequency == "半年度" {
  340. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  341. } else if v.Frequency == "年度" {
  342. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  343. } else {
  344. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  345. }
  346. }
  347. fmt.Println(startDate)
  348. // 数据更新
  349. resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  350. if tmpErr != nil {
  351. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  352. continue
  353. }
  354. if resp.Ret != 200 {
  355. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  356. continue
  357. }
  358. }
  359. }
  360. fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束")
  361. return err
  362. }
  363. // getRefreshFrequencyCondition
  364. // @Description: 根据时间和刷新频率获取条件
  365. // @author: Roc
  366. // @datetime 2024-01-09 16:27:11
  367. // @param now time.Time
  368. // @param refreshFrequency string
  369. // @return condition string
  370. // @return pars []interface{}
  371. // @return isHandler bool
  372. func getRefreshFrequencyCondition(now time.Time, refreshFrequency string) (condition string, pars []interface{}, isHandler bool) {
  373. isHandler = true
  374. var dayNum int
  375. var isLastDay bool
  376. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  377. switch refreshFrequency {
  378. case "每自然日":
  379. // 自然日不需要额外条件
  380. return
  381. case "每交易日":
  382. // 周六日不处理
  383. if now.Weekday() == time.Saturday || now.Weekday() == time.Sunday {
  384. isHandler = false
  385. }
  386. return
  387. case "每周":
  388. currWeekDay := now.Weekday()
  389. if currWeekDay == time.Sunday {
  390. currWeekDay = 7
  391. isLastDay = true
  392. }
  393. dayNum = int(currWeekDay)
  394. case "每旬":
  395. currDay := now.Day()
  396. if currDay <= 10 {
  397. dayNum = currDay
  398. // 如果是这旬的最后一天
  399. if currDay == 10 {
  400. isLastDay = true
  401. }
  402. } else if currDay <= 20 {
  403. dayNum = currDay - 10
  404. // 如果是这旬的最后一天
  405. if currDay == 20 {
  406. isLastDay = true
  407. }
  408. } else {
  409. dayNum = currDay - 20
  410. // 当月的最后一天
  411. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  412. // 如果是这旬的最后一天
  413. if currDay == monthLastDay.Day() {
  414. isLastDay = true
  415. }
  416. }
  417. case "每月":
  418. // 当前日期
  419. currDay := now.Day()
  420. dayNum = currDay
  421. // 当期的最后一天
  422. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  423. // 如果是这期的最后一天
  424. if currDay == monthLastDay.Day() {
  425. isLastDay = true
  426. }
  427. case "每季":
  428. // 当期的第一天 ; 当期的最后一天
  429. var startDay, endDay time.Time
  430. currMonth := now.Month()
  431. currDay := now.Day()
  432. if currMonth <= 3 {
  433. // 当季的第一天
  434. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  435. // 当季的最后一天
  436. endDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  437. } else if currMonth <= 6 {
  438. // 当期的第一天
  439. startDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local)
  440. // 当期的最后一天
  441. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  442. } else if currMonth <= 9 {
  443. // 当期的第一天
  444. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  445. // 当期的最后一天
  446. endDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  447. } else {
  448. // 当期的第一天
  449. startDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local)
  450. // 当期的最后一天
  451. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  452. }
  453. // 计算这期的第一天和当日的天数
  454. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  455. // 如果是这期的最后一天
  456. if currMonth == endDay.Month() && currDay == endDay.Day() {
  457. isLastDay = true
  458. }
  459. case "每半年":
  460. // 当期的第一天 ; 当期的最后一天
  461. var startDay, endDay time.Time
  462. currMonth := now.Month()
  463. currDay := now.Day()
  464. if currMonth <= 6 {
  465. // 当期的第一天
  466. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  467. // 当期的最后一天
  468. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  469. } else {
  470. // 当期的第一天
  471. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  472. // 当期的最后一天
  473. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  474. }
  475. // 计算这期的第一天和当日的天数
  476. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  477. // 如果是这期的最后一天
  478. if currMonth == endDay.Month() && currDay == endDay.Day() {
  479. isLastDay = true
  480. }
  481. case "每年":
  482. currMonth := now.Month()
  483. currDay := now.Day()
  484. // 当期的第一天
  485. startDay := time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  486. // 当期的最后一天
  487. endDay := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  488. // 计算这期的第一天和当日的天数
  489. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  490. // 如果是这期的最后一天
  491. if currMonth == endDay.Month() && currDay == endDay.Day() {
  492. isLastDay = true
  493. }
  494. }
  495. // 如果是这期的最后一天,那么就是判断refresh_frequency_day是否配置为0,或者配置的天数大于这期的最大天数
  496. if isLastDay {
  497. condition += ` AND ( refresh_frequency_day = ? OR refresh_frequency_day >= ? )`
  498. pars = append(pars, 0, dayNum)
  499. } else {
  500. // 如果不是这期的最后一天,那么就是判断refresh_frequency_day是否等于配置的天数
  501. condition += ` AND refresh_frequency_day = ? `
  502. pars = append(pars, dayNum)
  503. }
  504. return
  505. }
  506. // getPreviousHalfHour
  507. // @Description: 根据传入的时间获取该时间的前整半小时的时间字符串
  508. // @author: Roc
  509. // @datetime 2024-01-09 14:27:34
  510. // @param now time.Time
  511. // @return string
  512. func getPreviousHalfHour(now time.Time) string {
  513. minute := now.Minute()
  514. if minute >= 30 {
  515. return fmt.Sprintf("%02d:%02d", now.Hour(), 30)
  516. }
  517. return fmt.Sprintf("%02d:%02d", now.Hour(), 0)
  518. }
  519. // 根据配置把钢联化工和wind指标设置成禁止刷新
  520. func DisableEdbRefresh(cont context.Context) (err error) {
  521. //设置刷新key,如果没有执行完 报错提示
  522. cacheKey := "eta_task:DisableEdbRefresh"
  523. deleteCache := true
  524. defer func() {
  525. if deleteCache {
  526. utils.Rc.Delete(cacheKey)
  527. }
  528. if err != nil {
  529. tips := "DisableEdbRefresh-钢联化工和wind指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
  530. utils.FileLog.Info(tips)
  531. go alarm_msg.SendAlarmMsg(tips, 3)
  532. }
  533. }()
  534. if !utils.Rc.SetNX(cacheKey, 1, 2*time.Minute) {
  535. deleteCache = false
  536. err = fmt.Errorf("系统处理中,请稍后重试!")
  537. return
  538. }
  539. //查询配置,如果未开启自动设置禁止刷新,则无需处理
  540. obj := new(models.BusinessConf)
  541. conf, err := obj.GetItemByConfKey("EdbStopRefreshRule")
  542. if err != nil {
  543. if err.Error() == utils.ErrNoRow() {
  544. err = fmt.Errorf("未找到配置项,无需处理")
  545. return
  546. }
  547. return
  548. }
  549. //将json转为结构体
  550. rule := new(models.EdbStopRefreshRule)
  551. err = json.Unmarshal([]byte(conf.ConfVal), rule)
  552. if err != nil {
  553. return
  554. }
  555. //判断是否开启自动设置禁止刷新
  556. if rule.IsOpen == 0 {
  557. return
  558. }
  559. //获取当前时间
  560. now := time.Now()
  561. if rule.BaseIndexStopDays > 0 { //设置数据源钢联化工指标禁止更新
  562. baseIndexEndDate := now.AddDate(0, 0, -rule.BaseIndexStopDays+1).Format(utils.FormatDate)
  563. // 查询钢联化工指标,查询创建时间在baseIndexStartDate前,的所有钢联化工指标,分批查询,先查总数,再查列表
  564. totalCount, e := data_manage.GetCountRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate)
  565. if e != nil {
  566. err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
  567. return
  568. }
  569. //分页查询
  570. pageSize := 100
  571. pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
  572. stopRefreshIds := make([]int32, 0)
  573. for i := 0; i < pageNum; i++ {
  574. start := i * pageSize
  575. indexItems, e := data_manage.GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate, start, pageSize)
  576. if e != nil {
  577. err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
  578. return
  579. }
  580. if len(indexItems) == 0 {
  581. continue
  582. }
  583. indexCodeList := make([]string, 0)
  584. for _, indexItem := range indexItems {
  585. indexCodeList = append(indexCodeList, indexItem.IndexCode)
  586. }
  587. condition := ` AND source=? AND edb_code in (` + utils.GetOrmInReplace(len(indexCodeList)) + `)`
  588. var pars []interface{}
  589. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexCodeList)
  590. // 查询指标库里这些指标是否已创建
  591. edbList, e := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  592. if e != nil {
  593. err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
  594. return
  595. }
  596. edbMap := make(map[string]bool)
  597. for _, edb := range edbList {
  598. edbMap[edb.EdbCode] = true
  599. }
  600. for _, indexItem := range indexItems {
  601. // 判断指标是否被创建
  602. if _, ok := edbMap[indexItem.IndexCode]; !ok {
  603. stopRefreshIds = append(stopRefreshIds, indexItem.BaseFromMysteelChemicalIndexId)
  604. if len(stopRefreshIds) > 100 {
  605. err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
  606. if err != nil {
  607. err = fmt.Errorf("设置禁止刷新失败:%v", err)
  608. return
  609. }
  610. stopRefreshIds = make([]int32, 0)
  611. }
  612. }
  613. }
  614. }
  615. // 未被创建,则设置禁止刷新
  616. if len(stopRefreshIds) > 0 {
  617. err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
  618. if err != nil {
  619. err = fmt.Errorf("设置禁止刷新失败:%v", err)
  620. return
  621. }
  622. }
  623. }
  624. if rule.EdbStopDays > 0 {
  625. // 查询钢联和wind来源的指标
  626. edbEndDate := now.AddDate(0, 0, -rule.EdbStopDays+1).Format(utils.FormatDate)
  627. condition := ` AND no_update=0 AND source in (?,?) AND create_time < ?`
  628. var pars []interface{}
  629. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbEndDate)
  630. // 查询钢联化工指标和wind指标 分批查询,先查总数,再查列表
  631. totalCount, e := data_manage.GetEdbInfoCountByCondition(condition, pars)
  632. if e != nil {
  633. err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
  634. return
  635. }
  636. //分页查询
  637. pageSize := 100
  638. pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
  639. stopRefreshIds := make([]int, 0)
  640. stopRefreshMysteelCode := make([]string, 0)
  641. fromEdbIdList := make([]int, 0)
  642. for i := 0; i < pageNum; i++ {
  643. start := i * pageSize
  644. edbItems, e := data_manage.GetEdbInfoPageByCondition(condition, pars, start, pageSize)
  645. if e != nil {
  646. err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
  647. return
  648. }
  649. if len(edbItems) == 0 {
  650. continue
  651. }
  652. edbInfoIds := make([]int, 0)
  653. fromEdbIdList = make([]int, 0)
  654. for _, item := range edbItems {
  655. edbInfoIds = append(edbInfoIds, item.EdbInfoId)
  656. }
  657. // 查询指标库里这些指标 引用情况
  658. relationList, e := data_manage.GetEdbInfoRelationByEdbInfoIds(edbInfoIds)
  659. if e != nil {
  660. err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
  661. return
  662. }
  663. edbMap := make(map[int]struct{})
  664. for _, item := range relationList {
  665. edbMap[item] = struct{}{}
  666. }
  667. for _, item := range edbItems {
  668. if _, ok := edbMap[item.EdbInfoId]; !ok {
  669. stopRefreshIds = append(stopRefreshIds, item.EdbInfoId)
  670. if item.Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL {
  671. stopRefreshMysteelCode = append(stopRefreshMysteelCode, item.EdbCode)
  672. }
  673. if item.EdbInfoType == 0 && item.EdbType == 1 {
  674. fromEdbIdList = append(fromEdbIdList, item.EdbInfoId)
  675. }
  676. // 更新指标禁止刷新状态
  677. if len(stopRefreshIds) > 100 {
  678. // 查询相关的计算指标
  679. calculateEdbIdList := make([]int, 0)
  680. if len(fromEdbIdList) > 0 {
  681. hasFind := make(map[int]struct{})
  682. calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind)
  683. if err != nil {
  684. err = fmt.Errorf("查询计算指标信息失败:%v", err)
  685. return
  686. }
  687. }
  688. err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList)
  689. if err != nil {
  690. err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
  691. return
  692. }
  693. stopRefreshIds = []int{}
  694. stopRefreshMysteelCode = []string{}
  695. }
  696. }
  697. }
  698. }
  699. // 更新指标禁止刷新状态
  700. if len(stopRefreshIds) > 0 {
  701. // 查询相关的计算指标
  702. calculateEdbIdList := make([]int, 0)
  703. if len(fromEdbIdList) > 0 {
  704. hasFind := make(map[int]struct{})
  705. calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind)
  706. if err != nil {
  707. err = fmt.Errorf("查询计算指标信息失败:%v", err)
  708. return
  709. }
  710. }
  711. err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList)
  712. if err != nil {
  713. err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
  714. return
  715. }
  716. }
  717. }
  718. return
  719. }