edb_refresh.go 29 KB


  1. package services
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta/eta_task/models"
  6. "eta/eta_task/models/data_manage"
  7. "eta/eta_task/models/data_manage/edb_refresh"
  8. "eta/eta_task/services/alarm_msg"
  9. "eta/eta_task/services/data"
  10. "eta/eta_task/utils"
  11. "fmt"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. // ConfigRefreshData
  17. // @Description: 配置刷新数据
  18. // @author: Roc
  19. // @datetime 2024-01-10 13:55:05
  20. // @param cont context.Context
  21. // @return err error
  22. func ConfigRefreshData(cont context.Context) (err error) {
  23. errMsgList := make([]string, 0)
  24. defer func() {
  25. if err != nil {
  26. fmt.Println(err)
  27. }
  28. }()
  29. // 一期是只做wind、同花顺、钢联、有色
  30. now := time.Now()
  31. //now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
  32. //now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
  33. defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
  34. if err != nil {
  35. errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
  36. }
  37. sourceEdbInfoListMap, err := getConfigRefreshData(now)
  38. if err != nil {
  39. errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
  40. }
  41. // 将两个合并
  42. allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
  43. wgNum := len(allSourceEdbInfoListMap)
  44. if wgNum <= 0 {
  45. return
  46. }
  47. wg := sync.WaitGroup{}
  48. wg.Add(wgNum)
  49. for _, edbList := range allSourceEdbInfoListMap {
  50. if edbList == nil {
  51. wg.Done()
  52. continue
  53. }
  54. //if len(edbList) != 0 {
  55. // // debug环境仅测试刷新钢联
  56. // if edbList[0].Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL && utils.RunMode == "debug" {
  57. //
  58. // }
  59. //}
  60. go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
  61. }
  62. wg.Wait()
  63. fmt.Println("Refresh End")
  64. return
  65. }
  66. // Function to merge two maps
  67. func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) (newMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) {
  68. if dst == nil {
  69. return src
  70. }
  71. if src == nil {
  72. return dst
  73. }
  74. newMap = dst
  75. for k, v := range src {
  76. if newK, ok := newMap[k]; ok {
  77. newK = append(newK, v...)
  78. newMap[k] = newK
  79. } else {
  80. newMap[k] = v
  81. }
  82. }
  83. return newMap
  84. }
  85. // getDefaultRefreshData
  86. // @Description: 根据默认配置获取需要刷新的指标列表
  87. // @author: Roc
  88. // @datetime 2024-01-10 13:55:38
  89. // @param now time.Time
  90. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  91. // @return err error
  92. func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  93. errMsgList := make([]string, 0)
  94. defer func() {
  95. if err != nil {
  96. fmt.Println(err)
  97. }
  98. }()
  99. // 一期是只做wind、同花顺、钢联、有色
  100. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  101. currTimeStr := getPreviousHalfHour(now)
  102. fmt.Println(currTimeStr)
  103. // 所有默认配置刷新项
  104. list := make([]*edb_refresh.EdbRefreshDefaultConfig, 0)
  105. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  106. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  107. conf, err := models.GetBusinessConf()
  108. if err != nil {
  109. fmt.Println(err)
  110. utils.FileLog.Info("获取业务配置失败,Err:" + err.Error())
  111. return
  112. }
  113. // 获取钢联化工的数据获取方式
  114. mySteelChemicalDataMethod := "excel"
  115. if v, ok := conf["MySteelDataMethod"]; ok {
  116. if v == "api" {
  117. mySteelChemicalDataMethod = v
  118. }
  119. }
  120. utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod)
  121. // 获取各个刷新频率的配置
  122. for _, refreshFrequency := range refreshFrequencyList {
  123. // 获取刷新频率条件
  124. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  125. if !isHandler {
  126. // 可能是非交易日,所以过滤不处理
  127. continue
  128. }
  129. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  130. pars = append(pars, refreshFrequency, currTimeStr)
  131. // 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
  132. if mySteelChemicalDataMethod == "api" {
  133. // 钢联化工使用api的方式获取数据的,不需要过滤
  134. condition += ` AND source not in (?)`
  135. pars = append(pars, utils.DATA_SOURCE_YS)
  136. } else {
  137. condition += ` AND source not in (?,?)`
  138. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
  139. }
  140. tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
  141. if tmpErr != nil {
  142. err = tmpErr
  143. return
  144. }
  145. list = append(list, tmpList...)
  146. }
  147. // 更新的单元格数
  148. refreshDataNumMap := make(map[string]*edb_refresh.EdbRefreshDefaultConfig)
  149. // 数据源刷新频度的列表数组
  150. refreshDataFrequencyListMap := make(map[int]map[int][]string)
  151. wgNum := 0
  152. // 处理待刷新的数据源,整理成数组,方便获取对应的指标
  153. for _, item := range list {
  154. // 更新的单元格数
  155. key := fmt.Sprintf("%d_%d_%s", item.Source, item.SubSource, item.Frequency)
  156. refreshDataNumMap[key] = item
  157. // 数据源刷新频度的列表数组
  158. subSourceFrequencyList, ok := refreshDataFrequencyListMap[item.Source]
  159. if !ok {
  160. subSourceFrequencyList = make(map[int][]string)
  161. }
  162. frequencyList, ok := subSourceFrequencyList[item.SubSource]
  163. if !ok {
  164. wgNum++
  165. frequencyList = make([]string, 0)
  166. }
  167. subSourceFrequencyList[item.SubSource] = append(frequencyList, item.Frequency)
  168. refreshDataFrequencyListMap[item.Source] = subSourceFrequencyList
  169. }
  170. for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
  171. switch source {
  172. case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
  173. // 只处理钢联化工使用api方式获取数据的情况
  174. if mySteelChemicalDataMethod == "api" {
  175. for subSource, frequencyList := range subSourceFrequencyListMap {
  176. items, tmpErr := data_manage.GetBaseFromMysteelChemicalIndexItems(frequencyList)
  177. if tmpErr != nil {
  178. errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
  179. }
  180. indexList := make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
  181. for _, v := range items {
  182. tmpConf := new(edb_refresh.EdbInfoListAndRefreshConfig)
  183. // 数据刷新的期数
  184. dataRefreshNum := utils.DATA_REFRESH
  185. key := fmt.Sprintf("%d_%d_%s", utils.DATA_SOURCE_MYSTEEL_CHEMICAL, 0, v.Frequency)
  186. if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
  187. if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
  188. dataRefreshNum = 0
  189. } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
  190. dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
  191. }
  192. }
  193. tmpConf.EdbCode = v.IndexCode
  194. tmpConf.EdbName = v.IndexName
  195. tmpConf.Source = utils.DATA_SOURCE_MYSTEEL_CHEMICAL
  196. tmpConf.Frequency = v.Frequency
  197. tmpConf.Unit = v.Unit
  198. tmpConf.StartDate, err = time.Parse(utils.FormatDate, v.StartDate)
  199. tmpConf.EndDate, err = time.Parse(utils.FormatDate, v.EndDate)
  200. tmpConf.ClassifyId = int(v.BaseFromMysteelChemicalClassifyId)
  201. tmpConf.DataRefreshNum = dataRefreshNum
  202. tmpConf.EdbInfoId = v.EdbInfoId
  203. indexList = append(indexList, tmpConf)
  204. }
  205. key := fmt.Sprint(source, "_", subSource)
  206. sourceEdbInfoListMap[key] = indexList
  207. }
  208. }
  209. // 其他情况不处理
  210. default:
  211. for subSource, frequencyList := range subSourceFrequencyListMap {
  212. edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
  213. if tmpErr != nil {
  214. errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
  215. }
  216. for _, v := range edbList {
  217. // 数据刷新的期数
  218. dataRefreshNum := utils.DATA_REFRESH
  219. key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency)
  220. if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
  221. if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
  222. dataRefreshNum = 0
  223. } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
  224. dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
  225. }
  226. }
  227. v.DataRefreshNum = dataRefreshNum
  228. }
  229. key := fmt.Sprint(source, "_", subSource)
  230. sourceEdbInfoListMap[key] = edbList
  231. }
  232. }
  233. }
  234. fmt.Println("Get Refresh End")
  235. return
  236. }
  237. // getConfigRefreshData
  238. // @Description: 根据指标配置获取需要刷新的指标列表
  239. // @author: Roc
  240. // @datetime 2024-01-10 13:55:59
  241. // @param now time.Time
  242. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  243. // @return err error
  244. func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  245. defer func() {
  246. if err != nil {
  247. fmt.Println(err)
  248. }
  249. }()
  250. // 一期是只做wind、同花顺、钢联、有色
  251. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  252. currTimeStr := getPreviousHalfHour(now)
  253. // 所有默认配置刷新项
  254. list := make([]*edb_refresh.EdbRefreshConfig, 0)
  255. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  256. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  257. // 获取各个刷新频率的配置
  258. for _, refreshFrequency := range refreshFrequencyList {
  259. // 获取刷新频率条件
  260. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  261. if !isHandler {
  262. // 可能是非交易日,所以过滤不处理
  263. continue
  264. }
  265. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  266. pars = append(pars, refreshFrequency, currTimeStr)
  267. tmpList, tmpErr := edb_refresh.GetEdbRefreshConfigListByCondition(condition, pars)
  268. if tmpErr != nil {
  269. err = tmpErr
  270. return
  271. }
  272. list = append(list, tmpList...)
  273. }
  274. // 配置列表
  275. configIdEdbRefreshConfigMap := make(map[int]*edb_refresh.EdbRefreshConfig)
  276. configIdList := make([]int, 0)
  277. for _, v := range list {
  278. configIdList = append(configIdList, v.EdbRefreshConfigId)
  279. configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
  280. }
  281. conf, err := models.GetBusinessConf()
  282. if err != nil {
  283. fmt.Println(err)
  284. return
  285. }
  286. // 获取钢联化工的数据获取方式
  287. mySteelChemicalDataMethod := "excel"
  288. if v, ok := conf["MySteelDataMethod"]; ok {
  289. if v == "api" {
  290. mySteelChemicalDataMethod = v
  291. }
  292. }
  293. // 当钢联的数据获取方式是api时,不用过滤
  294. var sourceList []int
  295. if mySteelChemicalDataMethod == "api" {
  296. sourceList = []int{utils.DATA_SOURCE_YS}
  297. } else {
  298. sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}
  299. }
  300. edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList)
  301. if err != nil {
  302. return
  303. }
  304. for _, v := range edbInfoList {
  305. key := fmt.Sprint(v.Source, "_", v.SubSource)
  306. tmpList, ok := sourceEdbInfoListMap[key]
  307. if !ok {
  308. tmpList = make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
  309. }
  310. // 数据刷新的期数
  311. dataRefreshNum := utils.DATA_REFRESH
  312. if edbRefreshConfig, ok2 := configIdEdbRefreshConfigMap[v.EdbRefreshConfigId]; ok2 {
  313. if edbRefreshConfig.RefreshAllData == 1 { // 刷新所有数据期数
  314. dataRefreshNum = 0
  315. } else if edbRefreshConfig.RefreshDataNum > 0 { //
  316. dataRefreshNum = edbRefreshConfig.RefreshDataNum
  317. }
  318. }
  319. v.DataRefreshNum = dataRefreshNum
  320. sourceEdbInfoListMap[key] = append(tmpList, v)
  321. }
  322. fmt.Println("Get ConfigRefreshData End")
  323. return
  324. }
  325. // BaseRefreshData
  326. // @Description: 基础数据刷新
  327. // @author: Roc
  328. // @datetime 2024-01-09 16:27:45
  329. // @param wg *sync.WaitGroup
  330. // @return err error
  331. func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_refresh.EdbInfoListAndRefreshConfig) (err error) {
  332. errMsgList := make([]string, 0)
  333. defer func() {
  334. if err != nil {
  335. fmt.Println("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+err.Error())
  336. go alarm_msg.SendAlarmMsg(fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData ErrMsg:"+err.Error()), 3)
  337. }
  338. if len(errMsgList) > 0 {
  339. errMsg := fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+strings.Join(errMsgList, "\n"))
  340. fmt.Println(errMsg)
  341. go alarm_msg.SendAlarmMsg(errMsg, 3)
  342. }
  343. wg.Done()
  344. }()
  345. // 数据刷新的期数
  346. dataRefreshNum := utils.DATA_REFRESH
  347. // 是否从最开始的日期更新
  348. var isRefreshByStartDate bool
  349. if source != utils.DATA_SOURCE_THS {
  350. for _, v := range items {
  351. // 如果暂停更新,那就过滤
  352. if v.NoUpdate == 1 {
  353. continue
  354. }
  355. if v.DataRefreshNum > 0 {
  356. dataRefreshNum = v.DataRefreshNum
  357. }
  358. startDate := ""
  359. if isRefreshByStartDate {
  360. startDate = v.StartDate.Format(utils.FormatDate)
  361. } else {
  362. if v.Frequency == "日度" {
  363. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  364. } else if v.Frequency == "周度" {
  365. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  366. } else if v.Frequency == "旬度" {
  367. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  368. } else if v.Frequency == "月度" {
  369. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  370. } else if v.Frequency == "季度" {
  371. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  372. } else if v.Frequency == "半年度" {
  373. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  374. } else if v.Frequency == "年度" {
  375. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  376. } else {
  377. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  378. }
  379. }
  380. fmt.Println(startDate)
  381. // 数据更新
  382. resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  383. if tmpErr != nil {
  384. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  385. continue
  386. }
  387. if resp.Ret != 200 {
  388. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  389. continue
  390. }
  391. }
  392. }
  393. // 同花顺控制速率, 每秒最多4次请求(同花顺EDB函数限制为5, 考虑到可能存在用户同时在使用, 这里批量刷新设置为4, 话说5确实是有点少了吧=_=!)
  394. if source == utils.DATA_SOURCE_THS {
  395. ticker := time.NewTicker(250 * time.Millisecond)
  396. defer ticker.Stop()
  397. for _, v := range items {
  398. <-ticker.C
  399. // 如果暂停更新,那就过滤
  400. if v.NoUpdate == 1 {
  401. continue
  402. }
  403. if v.DataRefreshNum > 0 {
  404. dataRefreshNum = v.DataRefreshNum
  405. }
  406. startDate := ""
  407. if isRefreshByStartDate {
  408. startDate = v.StartDate.Format(utils.FormatDate)
  409. } else {
  410. if v.Frequency == "日度" {
  411. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  412. } else if v.Frequency == "周度" {
  413. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  414. } else if v.Frequency == "旬度" {
  415. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  416. } else if v.Frequency == "月度" {
  417. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  418. } else if v.Frequency == "季度" {
  419. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  420. } else if v.Frequency == "半年度" {
  421. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  422. } else if v.Frequency == "年度" {
  423. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  424. } else {
  425. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  426. }
  427. }
  428. fmt.Println(startDate)
  429. // 数据更新
  430. resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  431. if tmpErr != nil {
  432. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  433. continue
  434. }
  435. if resp.Ret != 200 {
  436. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  437. continue
  438. }
  439. }
  440. }
  441. fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束")
  442. return err
  443. }
  444. // getRefreshFrequencyCondition
  445. // @Description: 根据时间和刷新频率获取条件
  446. // @author: Roc
  447. // @datetime 2024-01-09 16:27:11
  448. // @param now time.Time
  449. // @param refreshFrequency string
  450. // @return condition string
  451. // @return pars []interface{}
  452. // @return isHandler bool
  453. func getRefreshFrequencyCondition(now time.Time, refreshFrequency string) (condition string, pars []interface{}, isHandler bool) {
  454. isHandler = true
  455. var dayNum int
  456. var isLastDay bool
  457. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  458. switch refreshFrequency {
  459. case "每自然日":
  460. // 自然日不需要额外条件
  461. return
  462. case "每交易日":
  463. // 周六日不处理
  464. if now.Weekday() == time.Saturday || now.Weekday() == time.Sunday {
  465. isHandler = false
  466. }
  467. return
  468. case "每周":
  469. currWeekDay := now.Weekday()
  470. if currWeekDay == time.Sunday {
  471. currWeekDay = 7
  472. isLastDay = true
  473. }
  474. dayNum = int(currWeekDay)
  475. case "每旬":
  476. currDay := now.Day()
  477. if currDay <= 10 {
  478. dayNum = currDay
  479. // 如果是这旬的最后一天
  480. if currDay == 10 {
  481. isLastDay = true
  482. }
  483. } else if currDay <= 20 {
  484. dayNum = currDay - 10
  485. // 如果是这旬的最后一天
  486. if currDay == 20 {
  487. isLastDay = true
  488. }
  489. } else {
  490. dayNum = currDay - 20
  491. // 当月的最后一天
  492. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  493. // 如果是这旬的最后一天
  494. if currDay == monthLastDay.Day() {
  495. isLastDay = true
  496. }
  497. }
  498. case "每月":
  499. // 当前日期
  500. currDay := now.Day()
  501. dayNum = currDay
  502. // 当期的最后一天
  503. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  504. // 如果是这期的最后一天
  505. if currDay == monthLastDay.Day() {
  506. isLastDay = true
  507. }
  508. case "每季":
  509. // 当期的第一天 ; 当期的最后一天
  510. var startDay, endDay time.Time
  511. currMonth := now.Month()
  512. currDay := now.Day()
  513. if currMonth <= 3 {
  514. // 当季的第一天
  515. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  516. // 当季的最后一天
  517. endDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  518. } else if currMonth <= 6 {
  519. // 当期的第一天
  520. startDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local)
  521. // 当期的最后一天
  522. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  523. } else if currMonth <= 9 {
  524. // 当期的第一天
  525. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  526. // 当期的最后一天
  527. endDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  528. } else {
  529. // 当期的第一天
  530. startDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local)
  531. // 当期的最后一天
  532. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  533. }
  534. // 计算这期的第一天和当日的天数
  535. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  536. // 如果是这期的最后一天
  537. if currMonth == endDay.Month() && currDay == endDay.Day() {
  538. isLastDay = true
  539. }
  540. case "每半年":
  541. // 当期的第一天 ; 当期的最后一天
  542. var startDay, endDay time.Time
  543. currMonth := now.Month()
  544. currDay := now.Day()
  545. if currMonth <= 6 {
  546. // 当期的第一天
  547. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  548. // 当期的最后一天
  549. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  550. } else {
  551. // 当期的第一天
  552. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  553. // 当期的最后一天
  554. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  555. }
  556. // 计算这期的第一天和当日的天数
  557. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  558. // 如果是这期的最后一天
  559. if currMonth == endDay.Month() && currDay == endDay.Day() {
  560. isLastDay = true
  561. }
  562. case "每年":
  563. currMonth := now.Month()
  564. currDay := now.Day()
  565. // 当期的第一天
  566. startDay := time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  567. // 当期的最后一天
  568. endDay := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  569. // 计算这期的第一天和当日的天数
  570. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  571. // 如果是这期的最后一天
  572. if currMonth == endDay.Month() && currDay == endDay.Day() {
  573. isLastDay = true
  574. }
  575. }
  576. // 如果是这期的最后一天,那么就是判断refresh_frequency_day是否配置为0,或者配置的天数大于这期的最大天数
  577. if isLastDay {
  578. condition += ` AND ( refresh_frequency_day = ? OR refresh_frequency_day >= ? )`
  579. pars = append(pars, 0, dayNum)
  580. } else {
  581. // 如果不是这期的最后一天,那么就是判断refresh_frequency_day是否等于配置的天数
  582. condition += ` AND refresh_frequency_day = ? `
  583. pars = append(pars, dayNum)
  584. }
  585. return
  586. }
  587. // getPreviousHalfHour
  588. // @Description: 根据传入的时间获取该时间的前整半小时的时间字符串
  589. // @author: Roc
  590. // @datetime 2024-01-09 14:27:34
  591. // @param now time.Time
  592. // @return string
  593. func getPreviousHalfHour(now time.Time) string {
  594. minute := now.Minute()
  595. if minute >= 30 {
  596. return fmt.Sprintf("%02d:%02d", now.Hour(), 30)
  597. }
  598. return fmt.Sprintf("%02d:%02d", now.Hour(), 0)
  599. }
  600. // 根据配置把钢联化工和wind指标设置成禁止刷新
  601. func DisableEdbRefresh(cont context.Context) (err error) {
  602. //设置刷新key,如果没有执行完 报错提示
  603. cacheKey := "eta_task:DisableEdbRefresh"
  604. deleteCache := true
  605. defer func() {
  606. if deleteCache {
  607. utils.Rc.Delete(cacheKey)
  608. }
  609. if err != nil {
  610. tips := "DisableEdbRefresh-钢联化工和wind指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
  611. utils.FileLog.Info(tips)
  612. go alarm_msg.SendAlarmMsg(tips, 3)
  613. }
  614. }()
  615. if !utils.Rc.SetNX(cacheKey, 1, 2*time.Minute) {
  616. deleteCache = false
  617. err = fmt.Errorf("系统处理中,请稍后重试!")
  618. return
  619. }
  620. //查询配置,如果未开启自动设置禁止刷新,则无需处理
  621. obj := new(models.BusinessConf)
  622. conf, err := obj.GetItemByConfKey("EdbStopRefreshRule")
  623. if err != nil {
  624. if err.Error() == utils.ErrNoRow() {
  625. err = fmt.Errorf("未找到配置项,无需处理")
  626. return
  627. }
  628. return
  629. }
  630. //将json转为结构体
  631. rule := new(models.EdbStopRefreshRule)
  632. err = json.Unmarshal([]byte(conf.ConfVal), rule)
  633. if err != nil {
  634. return
  635. }
  636. //判断是否开启自动设置禁止刷新
  637. if rule.IsOpen == 0 {
  638. return
  639. }
  640. //获取当前时间
  641. now := time.Now()
  642. if rule.BaseIndexStopDays > 0 { //设置数据源钢联化工指标禁止更新
  643. baseIndexEndDate := now.AddDate(0, 0, -rule.BaseIndexStopDays+1).Format(utils.FormatDate)
  644. // 查询钢联化工指标,查询创建时间在baseIndexStartDate前,的所有钢联化工指标,分批查询,先查总数,再查列表
  645. totalCount, e := data_manage.GetCountRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate)
  646. if e != nil {
  647. err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
  648. return
  649. }
  650. //分页查询
  651. pageSize := 100
  652. pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
  653. stopRefreshIds := make([]int32, 0)
  654. for i := 0; i < pageNum; i++ {
  655. start := i * pageSize
  656. indexItems, e := data_manage.GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate, start, pageSize)
  657. if e != nil {
  658. err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
  659. return
  660. }
  661. if len(indexItems) == 0 {
  662. continue
  663. }
  664. indexCodeList := make([]string, 0)
  665. for _, indexItem := range indexItems {
  666. indexCodeList = append(indexCodeList, indexItem.IndexCode)
  667. }
  668. condition := ` AND source=? AND edb_code in (` + utils.GetOrmInReplace(len(indexCodeList)) + `)`
  669. var pars []interface{}
  670. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexCodeList)
  671. // 查询指标库里这些指标是否已创建
  672. edbList, e := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  673. if e != nil {
  674. err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
  675. return
  676. }
  677. edbMap := make(map[string]bool)
  678. for _, edb := range edbList {
  679. edbMap[edb.EdbCode] = true
  680. }
  681. for _, indexItem := range indexItems {
  682. // 判断指标是否被创建
  683. if _, ok := edbMap[indexItem.IndexCode]; !ok {
  684. // 判断大写的指标编码是否存在
  685. if _, ok1 := edbMap[strings.ToLower(indexItem.IndexCode)]; !ok1 {
  686. stopRefreshIds = append(stopRefreshIds, indexItem.BaseFromMysteelChemicalIndexId)
  687. if len(stopRefreshIds) > 100 {
  688. err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
  689. if err != nil {
  690. err = fmt.Errorf("设置禁止刷新失败:%v", err)
  691. return
  692. }
  693. stopRefreshIds = make([]int32, 0)
  694. }
  695. }
  696. }
  697. }
  698. }
  699. // 未被创建,则设置禁止刷新
  700. if len(stopRefreshIds) > 0 {
  701. err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
  702. if err != nil {
  703. err = fmt.Errorf("设置禁止刷新失败:%v", err)
  704. return
  705. }
  706. }
  707. }
  708. if rule.EdbStopDays > 0 {
  709. // 查询钢联和wind来源的指标
  710. edbEndDate := now.AddDate(0, 0, -rule.EdbStopDays+1).Format(utils.FormatDate)
  711. condition := ` AND no_update=0 AND source in (?,?) AND ((create_time < ? and set_update_time is null) or set_update_time < ? )`
  712. var pars []interface{}
  713. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbEndDate, edbEndDate)
  714. // 查询钢联化工指标和wind指标 分批查询,先查总数,再查列表
  715. totalCount, e := data_manage.GetEdbInfoCountByCondition(condition, pars)
  716. if e != nil {
  717. err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
  718. return
  719. }
  720. //分页查询
  721. pageSize := 100
  722. pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
  723. stopRefreshIds := make([]int, 0)
  724. stopRefreshMysteelCode := make([]string, 0)
  725. fromEdbIdList := make([]int, 0)
  726. for i := 0; i < pageNum; i++ {
  727. start := i * pageSize
  728. edbItems, e := data_manage.GetEdbInfoPageByCondition(condition, pars, start, pageSize)
  729. if e != nil {
  730. err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
  731. return
  732. }
  733. if len(edbItems) == 0 {
  734. continue
  735. }
  736. edbInfoIds := make([]int, 0)
  737. fromEdbIdList = make([]int, 0)
  738. for _, item := range edbItems {
  739. edbInfoIds = append(edbInfoIds, item.EdbInfoId)
  740. }
  741. // 查询指标库里这些指标 引用情况
  742. relationList, e := data_manage.GetEdbInfoRelationByEdbInfoIds(edbInfoIds)
  743. if e != nil {
  744. err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
  745. return
  746. }
  747. edbMap := make(map[int]struct{})
  748. for _, item := range relationList {
  749. edbMap[item] = struct{}{}
  750. }
  751. for _, item := range edbItems {
  752. if _, ok := edbMap[item.EdbInfoId]; !ok {
  753. stopRefreshIds = append(stopRefreshIds, item.EdbInfoId)
  754. if item.Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL {
  755. stopRefreshMysteelCode = append(stopRefreshMysteelCode, item.EdbCode)
  756. }
  757. if item.EdbInfoType == 0 && item.EdbType == 1 {
  758. fromEdbIdList = append(fromEdbIdList, item.EdbInfoId)
  759. }
  760. // 更新指标禁止刷新状态
  761. if len(stopRefreshIds) > 100 {
  762. // 查询相关的计算指标
  763. calculateEdbIdList := make([]int, 0)
  764. if len(fromEdbIdList) > 0 {
  765. hasFind := make(map[int]struct{})
  766. calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind)
  767. if err != nil {
  768. err = fmt.Errorf("查询计算指标信息失败:%v", err)
  769. return
  770. }
  771. }
  772. err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList)
  773. if err != nil {
  774. err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
  775. return
  776. }
  777. stopRefreshIds = []int{}
  778. stopRefreshMysteelCode = []string{}
  779. }
  780. }
  781. }
  782. }
  783. // 更新指标禁止刷新状态
  784. if len(stopRefreshIds) > 0 {
  785. // 查询相关的计算指标
  786. calculateEdbIdList := make([]int, 0)
  787. if len(fromEdbIdList) > 0 {
  788. hasFind := make(map[int]struct{})
  789. calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind)
  790. if err != nil {
  791. err = fmt.Errorf("查询计算指标信息失败:%v", err)
  792. return
  793. }
  794. }
  795. err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList)
  796. if err != nil {
  797. err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
  798. return
  799. }
  800. }
  801. }
  802. return
  803. }