edb_refresh.go 29 KB


  1. package services
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta/eta_task/models"
  6. "eta/eta_task/models/data_manage"
  7. "eta/eta_task/models/data_manage/edb_refresh"
  8. "eta/eta_task/services/alarm_msg"
  9. "eta/eta_task/services/data"
  10. "eta/eta_task/utils"
  11. "fmt"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. // ConfigRefreshData
  17. // @Description: 配置刷新数据
  18. // @author: Roc
  19. // @datetime 2024-01-10 13:55:05
  20. // @param cont context.Context
  21. // @return err error
  22. func ConfigRefreshData(cont context.Context) (err error) {
  23. errMsgList := make([]string, 0)
  24. defer func() {
  25. if err != nil {
  26. fmt.Println(err)
  27. }
  28. }()
  29. // 一期是只做wind、同花顺、钢联、有色
  30. now := time.Now()
  31. //now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
  32. //now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
  33. defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
  34. if err != nil {
  35. errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
  36. }
  37. sourceEdbInfoListMap, err := getConfigRefreshData(now)
  38. if err != nil {
  39. errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
  40. }
  41. // 将两个合并
  42. allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
  43. wgNum := len(allSourceEdbInfoListMap)
  44. if wgNum <= 0 {
  45. return
  46. }
  47. wg := sync.WaitGroup{}
  48. wg.Add(wgNum)
  49. for _, edbList := range allSourceEdbInfoListMap {
  50. if edbList == nil {
  51. wg.Done()
  52. continue
  53. }
  54. if len(edbList) != 0 {
  55. // debug环境仅测试刷新钢联
  56. if edbList[0].Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL && utils.RunMode == "debug" {
  57. go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
  58. }
  59. }
  60. }
  61. wg.Wait()
  62. fmt.Println("Refresh End")
  63. return
  64. }
  65. // Function to merge two maps
  66. func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) (newMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) {
  67. if dst == nil {
  68. return src
  69. }
  70. if src == nil {
  71. return dst
  72. }
  73. newMap = dst
  74. for k, v := range src {
  75. if newK, ok := newMap[k]; ok {
  76. newK = append(newK, v...)
  77. newMap[k] = newK
  78. } else {
  79. newMap[k] = v
  80. }
  81. }
  82. return newMap
  83. }
  84. // getDefaultRefreshData
  85. // @Description: 根据默认配置获取需要刷新的指标列表
  86. // @author: Roc
  87. // @datetime 2024-01-10 13:55:38
  88. // @param now time.Time
  89. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  90. // @return err error
  91. func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  92. errMsgList := make([]string, 0)
  93. defer func() {
  94. if err != nil {
  95. fmt.Println(err)
  96. }
  97. }()
  98. // 一期是只做wind、同花顺、钢联、有色
  99. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  100. currTimeStr := getPreviousHalfHour(now)
  101. fmt.Println(currTimeStr)
  102. // 所有默认配置刷新项
  103. list := make([]*edb_refresh.EdbRefreshDefaultConfig, 0)
  104. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  105. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  106. conf, err := models.GetBusinessConf()
  107. if err != nil {
  108. fmt.Println(err)
  109. utils.FileLog.Info("获取业务配置失败,Err:" + err.Error())
  110. return
  111. }
  112. // 获取钢联化工的数据获取方式
  113. mySteelChemicalDataMethod := "excel"
  114. if v, ok := conf["MySteelDataMethod"]; ok {
  115. if v == "api" {
  116. mySteelChemicalDataMethod = v
  117. }
  118. }
  119. utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod)
  120. // 获取各个刷新频率的配置
  121. for _, refreshFrequency := range refreshFrequencyList {
  122. // 获取刷新频率条件
  123. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  124. if !isHandler {
  125. // 可能是非交易日,所以过滤不处理
  126. continue
  127. }
  128. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  129. pars = append(pars, refreshFrequency, currTimeStr)
  130. // 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
  131. if mySteelChemicalDataMethod == "api" {
  132. // 钢联化工使用api的方式获取数据的,不需要过滤
  133. condition += ` AND source not in (?)`
  134. pars = append(pars, utils.DATA_SOURCE_YS)
  135. } else {
  136. condition += ` AND source not in (?,?)`
  137. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
  138. }
  139. tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
  140. if tmpErr != nil {
  141. err = tmpErr
  142. return
  143. }
  144. list = append(list, tmpList...)
  145. }
  146. // 更新的单元格数
  147. refreshDataNumMap := make(map[string]*edb_refresh.EdbRefreshDefaultConfig)
  148. // 数据源刷新频度的列表数组
  149. refreshDataFrequencyListMap := make(map[int]map[int][]string)
  150. wgNum := 0
  151. // 处理待刷新的数据源,整理成数组,方便获取对应的指标
  152. for _, item := range list {
  153. // 更新的单元格数
  154. key := fmt.Sprintf("%d_%d_%s", item.Source, item.SubSource, item.Frequency)
  155. refreshDataNumMap[key] = item
  156. // 数据源刷新频度的列表数组
  157. subSourceFrequencyList, ok := refreshDataFrequencyListMap[item.Source]
  158. if !ok {
  159. subSourceFrequencyList = make(map[int][]string)
  160. }
  161. frequencyList, ok := subSourceFrequencyList[item.SubSource]
  162. if !ok {
  163. wgNum++
  164. frequencyList = make([]string, 0)
  165. }
  166. subSourceFrequencyList[item.SubSource] = append(frequencyList, item.Frequency)
  167. refreshDataFrequencyListMap[item.Source] = subSourceFrequencyList
  168. }
  169. for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
  170. switch source {
  171. case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
  172. // 只处理钢联化工使用api方式获取数据的情况
  173. if mySteelChemicalDataMethod == "api" {
  174. for subSource, frequencyList := range subSourceFrequencyListMap {
  175. items, tmpErr := data_manage.GetBaseFromMysteelChemicalIndexItems(frequencyList)
  176. if tmpErr != nil {
  177. errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
  178. }
  179. indexList := make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
  180. for _, v := range items {
  181. tmpConf := new(edb_refresh.EdbInfoListAndRefreshConfig)
  182. // 数据刷新的期数
  183. dataRefreshNum := utils.DATA_REFRESH
  184. key := fmt.Sprintf("%d_%d_%s", utils.DATA_SOURCE_MYSTEEL_CHEMICAL, 0, v.Frequency)
  185. if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
  186. if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
  187. dataRefreshNum = 0
  188. } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
  189. dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
  190. }
  191. }
  192. tmpConf.EdbCode = v.IndexCode
  193. tmpConf.EdbName = v.IndexName
  194. tmpConf.Source = utils.DATA_SOURCE_MYSTEEL_CHEMICAL
  195. tmpConf.Frequency = v.Frequency
  196. tmpConf.Unit = v.Unit
  197. tmpConf.StartDate, err = time.Parse(utils.FormatDate, v.StartDate)
  198. tmpConf.EndDate, err = time.Parse(utils.FormatDate, v.EndDate)
  199. tmpConf.ClassifyId = int(v.BaseFromMysteelChemicalClassifyId)
  200. tmpConf.DataRefreshNum = dataRefreshNum
  201. tmpConf.EdbInfoId = v.EdbInfoId
  202. indexList = append(indexList, tmpConf)
  203. }
  204. key := fmt.Sprint(source, "_", subSource)
  205. sourceEdbInfoListMap[key] = indexList
  206. }
  207. }
  208. // 其他情况不处理
  209. default:
  210. for subSource, frequencyList := range subSourceFrequencyListMap {
  211. edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
  212. if tmpErr != nil {
  213. errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
  214. }
  215. for _, v := range edbList {
  216. // 数据刷新的期数
  217. dataRefreshNum := utils.DATA_REFRESH
  218. key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency)
  219. if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
  220. if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
  221. dataRefreshNum = 0
  222. } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
  223. dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
  224. }
  225. }
  226. v.DataRefreshNum = dataRefreshNum
  227. }
  228. key := fmt.Sprint(source, "_", subSource)
  229. sourceEdbInfoListMap[key] = edbList
  230. }
  231. }
  232. }
  233. fmt.Println("Get Refresh End")
  234. return
  235. }
  236. // getConfigRefreshData
  237. // @Description: 根据指标配置获取需要刷新的指标列表
  238. // @author: Roc
  239. // @datetime 2024-01-10 13:55:59
  240. // @param now time.Time
  241. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  242. // @return err error
  243. func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  244. defer func() {
  245. if err != nil {
  246. fmt.Println(err)
  247. }
  248. }()
  249. // 一期是只做wind、同花顺、钢联、有色
  250. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  251. currTimeStr := getPreviousHalfHour(now)
  252. // 所有默认配置刷新项
  253. list := make([]*edb_refresh.EdbRefreshConfig, 0)
  254. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  255. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  256. // 获取各个刷新频率的配置
  257. for _, refreshFrequency := range refreshFrequencyList {
  258. // 获取刷新频率条件
  259. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  260. if !isHandler {
  261. // 可能是非交易日,所以过滤不处理
  262. continue
  263. }
  264. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  265. pars = append(pars, refreshFrequency, currTimeStr)
  266. tmpList, tmpErr := edb_refresh.GetEdbRefreshConfigListByCondition(condition, pars)
  267. if tmpErr != nil {
  268. err = tmpErr
  269. return
  270. }
  271. list = append(list, tmpList...)
  272. }
  273. // 配置列表
  274. configIdEdbRefreshConfigMap := make(map[int]*edb_refresh.EdbRefreshConfig)
  275. configIdList := make([]int, 0)
  276. for _, v := range list {
  277. configIdList = append(configIdList, v.EdbRefreshConfigId)
  278. configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
  279. }
  280. conf, err := models.GetBusinessConf()
  281. if err != nil {
  282. fmt.Println(err)
  283. return
  284. }
  285. // 获取钢联化工的数据获取方式
  286. mySteelChemicalDataMethod := "excel"
  287. if v, ok := conf["MySteelDataMethod"]; ok {
  288. if v == "api" {
  289. mySteelChemicalDataMethod = v
  290. }
  291. }
  292. // 当钢联的数据获取方式是api时,不用过滤
  293. var sourceList []int
  294. if mySteelChemicalDataMethod == "api" {
  295. sourceList = []int{utils.DATA_SOURCE_YS}
  296. } else {
  297. sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}
  298. }
  299. edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList)
  300. if err != nil {
  301. return
  302. }
  303. for _, v := range edbInfoList {
  304. key := fmt.Sprint(v.Source, "_", v.SubSource)
  305. tmpList, ok := sourceEdbInfoListMap[key]
  306. if !ok {
  307. tmpList = make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
  308. }
  309. // 数据刷新的期数
  310. dataRefreshNum := utils.DATA_REFRESH
  311. if edbRefreshConfig, ok2 := configIdEdbRefreshConfigMap[v.EdbRefreshConfigId]; ok2 {
  312. if edbRefreshConfig.RefreshAllData == 1 { // 刷新所有数据期数
  313. dataRefreshNum = 0
  314. } else if edbRefreshConfig.RefreshDataNum > 0 { //
  315. dataRefreshNum = edbRefreshConfig.RefreshDataNum
  316. }
  317. }
  318. v.DataRefreshNum = dataRefreshNum
  319. sourceEdbInfoListMap[key] = append(tmpList, v)
  320. }
  321. fmt.Println("Get ConfigRefreshData End")
  322. return
  323. }
  324. // BaseRefreshData
  325. // @Description: 基础数据刷新
  326. // @author: Roc
  327. // @datetime 2024-01-09 16:27:45
  328. // @param wg *sync.WaitGroup
  329. // @return err error
  330. func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_refresh.EdbInfoListAndRefreshConfig) (err error) {
  331. errMsgList := make([]string, 0)
  332. defer func() {
  333. if err != nil {
  334. fmt.Println("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+err.Error())
  335. go alarm_msg.SendAlarmMsg(fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData ErrMsg:"+err.Error()), 3)
  336. }
  337. if len(errMsgList) > 0 {
  338. errMsg := fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+strings.Join(errMsgList, "\n"))
  339. fmt.Println(errMsg)
  340. go alarm_msg.SendAlarmMsg(errMsg, 3)
  341. }
  342. wg.Done()
  343. }()
  344. // 数据刷新的期数
  345. dataRefreshNum := utils.DATA_REFRESH
  346. // 是否从最开始的日期更新
  347. var isRefreshByStartDate bool
  348. if source != utils.DATA_SOURCE_THS {
  349. for _, v := range items {
  350. // 如果暂停更新,那就过滤
  351. if v.NoUpdate == 1 {
  352. continue
  353. }
  354. if v.DataRefreshNum > 0 {
  355. dataRefreshNum = v.DataRefreshNum
  356. }
  357. startDate := ""
  358. if isRefreshByStartDate {
  359. startDate = v.StartDate.Format(utils.FormatDate)
  360. } else {
  361. if v.Frequency == "日度" {
  362. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  363. } else if v.Frequency == "周度" {
  364. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  365. } else if v.Frequency == "旬度" {
  366. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  367. } else if v.Frequency == "月度" {
  368. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  369. } else if v.Frequency == "季度" {
  370. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  371. } else if v.Frequency == "半年度" {
  372. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  373. } else if v.Frequency == "年度" {
  374. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  375. } else {
  376. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  377. }
  378. }
  379. fmt.Println(startDate)
  380. // 数据更新
  381. resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  382. if tmpErr != nil {
  383. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  384. continue
  385. }
  386. if resp.Ret != 200 {
  387. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  388. continue
  389. }
  390. }
  391. }
  392. // 同花顺控制速率, 每秒最多4次请求(同花顺EDB函数限制为5, 考虑到可能存在用户同时在使用, 这里批量刷新设置为4, 话说5确实是有点少了吧=_=!)
  393. if source == utils.DATA_SOURCE_THS {
  394. ticker := time.NewTicker(250 * time.Millisecond)
  395. defer ticker.Stop()
  396. for _, v := range items {
  397. <-ticker.C
  398. // 如果暂停更新,那就过滤
  399. if v.NoUpdate == 1 {
  400. continue
  401. }
  402. if v.DataRefreshNum > 0 {
  403. dataRefreshNum = v.DataRefreshNum
  404. }
  405. startDate := ""
  406. if isRefreshByStartDate {
  407. startDate = v.StartDate.Format(utils.FormatDate)
  408. } else {
  409. if v.Frequency == "日度" {
  410. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  411. } else if v.Frequency == "周度" {
  412. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  413. } else if v.Frequency == "旬度" {
  414. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  415. } else if v.Frequency == "月度" {
  416. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  417. } else if v.Frequency == "季度" {
  418. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  419. } else if v.Frequency == "半年度" {
  420. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  421. } else if v.Frequency == "年度" {
  422. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  423. } else {
  424. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  425. }
  426. }
  427. fmt.Println(startDate)
  428. // 数据更新
  429. resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  430. if tmpErr != nil {
  431. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  432. continue
  433. }
  434. if resp.Ret != 200 {
  435. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  436. continue
  437. }
  438. }
  439. }
  440. fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束")
  441. return err
  442. }
  443. // getRefreshFrequencyCondition
  444. // @Description: 根据时间和刷新频率获取条件
  445. // @author: Roc
  446. // @datetime 2024-01-09 16:27:11
  447. // @param now time.Time
  448. // @param refreshFrequency string
  449. // @return condition string
  450. // @return pars []interface{}
  451. // @return isHandler bool
  452. func getRefreshFrequencyCondition(now time.Time, refreshFrequency string) (condition string, pars []interface{}, isHandler bool) {
  453. isHandler = true
  454. var dayNum int
  455. var isLastDay bool
  456. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  457. switch refreshFrequency {
  458. case "每自然日":
  459. // 自然日不需要额外条件
  460. return
  461. case "每交易日":
  462. // 周六日不处理
  463. if now.Weekday() == time.Saturday || now.Weekday() == time.Sunday {
  464. isHandler = false
  465. }
  466. return
  467. case "每周":
  468. currWeekDay := now.Weekday()
  469. if currWeekDay == time.Sunday {
  470. currWeekDay = 7
  471. isLastDay = true
  472. }
  473. dayNum = int(currWeekDay)
  474. case "每旬":
  475. currDay := now.Day()
  476. if currDay <= 10 {
  477. dayNum = currDay
  478. // 如果是这旬的最后一天
  479. if currDay == 10 {
  480. isLastDay = true
  481. }
  482. } else if currDay <= 20 {
  483. dayNum = currDay - 10
  484. // 如果是这旬的最后一天
  485. if currDay == 20 {
  486. isLastDay = true
  487. }
  488. } else {
  489. dayNum = currDay - 20
  490. // 当月的最后一天
  491. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  492. // 如果是这旬的最后一天
  493. if currDay == monthLastDay.Day() {
  494. isLastDay = true
  495. }
  496. }
  497. case "每月":
  498. // 当前日期
  499. currDay := now.Day()
  500. dayNum = currDay
  501. // 当期的最后一天
  502. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  503. // 如果是这期的最后一天
  504. if currDay == monthLastDay.Day() {
  505. isLastDay = true
  506. }
  507. case "每季":
  508. // 当期的第一天 ; 当期的最后一天
  509. var startDay, endDay time.Time
  510. currMonth := now.Month()
  511. currDay := now.Day()
  512. if currMonth <= 3 {
  513. // 当季的第一天
  514. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  515. // 当季的最后一天
  516. endDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  517. } else if currMonth <= 6 {
  518. // 当期的第一天
  519. startDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local)
  520. // 当期的最后一天
  521. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  522. } else if currMonth <= 9 {
  523. // 当期的第一天
  524. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  525. // 当期的最后一天
  526. endDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  527. } else {
  528. // 当期的第一天
  529. startDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local)
  530. // 当期的最后一天
  531. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  532. }
  533. // 计算这期的第一天和当日的天数
  534. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  535. // 如果是这期的最后一天
  536. if currMonth == endDay.Month() && currDay == endDay.Day() {
  537. isLastDay = true
  538. }
  539. case "每半年":
  540. // 当期的第一天 ; 当期的最后一天
  541. var startDay, endDay time.Time
  542. currMonth := now.Month()
  543. currDay := now.Day()
  544. if currMonth <= 6 {
  545. // 当期的第一天
  546. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  547. // 当期的最后一天
  548. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  549. } else {
  550. // 当期的第一天
  551. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  552. // 当期的最后一天
  553. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  554. }
  555. // 计算这期的第一天和当日的天数
  556. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  557. // 如果是这期的最后一天
  558. if currMonth == endDay.Month() && currDay == endDay.Day() {
  559. isLastDay = true
  560. }
  561. case "每年":
  562. currMonth := now.Month()
  563. currDay := now.Day()
  564. // 当期的第一天
  565. startDay := time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  566. // 当期的最后一天
  567. endDay := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  568. // 计算这期的第一天和当日的天数
  569. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  570. // 如果是这期的最后一天
  571. if currMonth == endDay.Month() && currDay == endDay.Day() {
  572. isLastDay = true
  573. }
  574. }
  575. // 如果是这期的最后一天,那么就是判断refresh_frequency_day是否配置为0,或者配置的天数大于这期的最大天数
  576. if isLastDay {
  577. condition += ` AND ( refresh_frequency_day = ? OR refresh_frequency_day >= ? )`
  578. pars = append(pars, 0, dayNum)
  579. } else {
  580. // 如果不是这期的最后一天,那么就是判断refresh_frequency_day是否等于配置的天数
  581. condition += ` AND refresh_frequency_day = ? `
  582. pars = append(pars, dayNum)
  583. }
  584. return
  585. }
  586. // getPreviousHalfHour
  587. // @Description: 根据传入的时间获取该时间的前整半小时的时间字符串
  588. // @author: Roc
  589. // @datetime 2024-01-09 14:27:34
  590. // @param now time.Time
  591. // @return string
  592. func getPreviousHalfHour(now time.Time) string {
  593. minute := now.Minute()
  594. if minute >= 30 {
  595. return fmt.Sprintf("%02d:%02d", now.Hour(), 30)
  596. }
  597. return fmt.Sprintf("%02d:%02d", now.Hour(), 0)
  598. }
  599. // 根据配置把钢联化工和wind指标设置成禁止刷新
  600. func DisableEdbRefresh(cont context.Context) (err error) {
  601. //设置刷新key,如果没有执行完 报错提示
  602. cacheKey := "eta_task:DisableEdbRefresh"
  603. deleteCache := true
  604. defer func() {
  605. if deleteCache {
  606. utils.Rc.Delete(cacheKey)
  607. }
  608. if err != nil {
  609. tips := "DisableEdbRefresh-钢联化工和wind指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
  610. utils.FileLog.Info(tips)
  611. go alarm_msg.SendAlarmMsg(tips, 3)
  612. }
  613. }()
  614. if !utils.Rc.SetNX(cacheKey, 1, 2*time.Minute) {
  615. deleteCache = false
  616. err = fmt.Errorf("系统处理中,请稍后重试!")
  617. return
  618. }
  619. //查询配置,如果未开启自动设置禁止刷新,则无需处理
  620. obj := new(models.BusinessConf)
  621. conf, err := obj.GetItemByConfKey("EdbStopRefreshRule")
  622. if err != nil {
  623. if err.Error() == utils.ErrNoRow() {
  624. err = fmt.Errorf("未找到配置项,无需处理")
  625. return
  626. }
  627. return
  628. }
  629. //将json转为结构体
  630. rule := new(models.EdbStopRefreshRule)
  631. err = json.Unmarshal([]byte(conf.ConfVal), rule)
  632. if err != nil {
  633. return
  634. }
  635. //判断是否开启自动设置禁止刷新
  636. if rule.IsOpen == 0 {
  637. return
  638. }
  639. //获取当前时间
  640. now := time.Now()
  641. if rule.BaseIndexStopDays > 0 { //设置数据源钢联化工指标禁止更新
  642. baseIndexEndDate := now.AddDate(0, 0, -rule.BaseIndexStopDays+1).Format(utils.FormatDate)
  643. // 查询钢联化工指标,查询创建时间在baseIndexStartDate前,的所有钢联化工指标,分批查询,先查总数,再查列表
  644. totalCount, e := data_manage.GetCountRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate)
  645. if e != nil {
  646. err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
  647. return
  648. }
  649. //分页查询
  650. pageSize := 100
  651. pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
  652. stopRefreshIds := make([]int32, 0)
  653. for i := 0; i < pageNum; i++ {
  654. start := i * pageSize
  655. indexItems, e := data_manage.GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate, start, pageSize)
  656. if e != nil {
  657. err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
  658. return
  659. }
  660. if len(indexItems) == 0 {
  661. continue
  662. }
  663. indexCodeList := make([]string, 0)
  664. for _, indexItem := range indexItems {
  665. indexCodeList = append(indexCodeList, indexItem.IndexCode)
  666. }
  667. condition := ` AND source=? AND edb_code in (` + utils.GetOrmInReplace(len(indexCodeList)) + `)`
  668. var pars []interface{}
  669. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexCodeList)
  670. // 查询指标库里这些指标是否已创建
  671. edbList, e := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  672. if e != nil {
  673. err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
  674. return
  675. }
  676. edbMap := make(map[string]bool)
  677. for _, edb := range edbList {
  678. edbMap[edb.EdbCode] = true
  679. }
  680. for _, indexItem := range indexItems {
  681. // 判断指标是否被创建
  682. if _, ok := edbMap[indexItem.IndexCode]; !ok {
  683. stopRefreshIds = append(stopRefreshIds, indexItem.BaseFromMysteelChemicalIndexId)
  684. if len(stopRefreshIds) > 100 {
  685. err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
  686. if err != nil {
  687. err = fmt.Errorf("设置禁止刷新失败:%v", err)
  688. return
  689. }
  690. stopRefreshIds = make([]int32, 0)
  691. }
  692. }
  693. }
  694. }
  695. // 未被创建,则设置禁止刷新
  696. if len(stopRefreshIds) > 0 {
  697. err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
  698. if err != nil {
  699. err = fmt.Errorf("设置禁止刷新失败:%v", err)
  700. return
  701. }
  702. }
  703. }
  704. if rule.EdbStopDays > 0 {
  705. // 查询钢联和wind来源的指标
  706. edbEndDate := now.AddDate(0, 0, -rule.EdbStopDays+1).Format(utils.FormatDate)
  707. condition := ` AND no_update=0 AND source in (?,?) AND create_time < ?`
  708. var pars []interface{}
  709. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbEndDate)
  710. // 查询钢联化工指标和wind指标 分批查询,先查总数,再查列表
  711. totalCount, e := data_manage.GetEdbInfoCountByCondition(condition, pars)
  712. if e != nil {
  713. err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
  714. return
  715. }
  716. //分页查询
  717. pageSize := 100
  718. pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
  719. stopRefreshIds := make([]int, 0)
  720. stopRefreshMysteelCode := make([]string, 0)
  721. fromEdbIdList := make([]int, 0)
  722. for i := 0; i < pageNum; i++ {
  723. start := i * pageSize
  724. edbItems, e := data_manage.GetEdbInfoPageByCondition(condition, pars, start, pageSize)
  725. if e != nil {
  726. err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
  727. return
  728. }
  729. if len(edbItems) == 0 {
  730. continue
  731. }
  732. edbInfoIds := make([]int, 0)
  733. fromEdbIdList = make([]int, 0)
  734. for _, item := range edbItems {
  735. edbInfoIds = append(edbInfoIds, item.EdbInfoId)
  736. }
  737. // 查询指标库里这些指标 引用情况
  738. relationList, e := data_manage.GetEdbInfoRelationByEdbInfoIds(edbInfoIds)
  739. if e != nil {
  740. err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
  741. return
  742. }
  743. edbMap := make(map[int]struct{})
  744. for _, item := range relationList {
  745. edbMap[item] = struct{}{}
  746. }
  747. for _, item := range edbItems {
  748. if _, ok := edbMap[item.EdbInfoId]; !ok {
  749. stopRefreshIds = append(stopRefreshIds, item.EdbInfoId)
  750. if item.Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL {
  751. stopRefreshMysteelCode = append(stopRefreshMysteelCode, item.EdbCode)
  752. }
  753. if item.EdbInfoType == 0 && item.EdbType == 1 {
  754. fromEdbIdList = append(fromEdbIdList, item.EdbInfoId)
  755. }
  756. // 更新指标禁止刷新状态
  757. if len(stopRefreshIds) > 100 {
  758. // 查询相关的计算指标
  759. calculateEdbIdList := make([]int, 0)
  760. if len(fromEdbIdList) > 0 {
  761. hasFind := make(map[int]struct{})
  762. calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind)
  763. if err != nil {
  764. err = fmt.Errorf("查询计算指标信息失败:%v", err)
  765. return
  766. }
  767. }
  768. err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList)
  769. if err != nil {
  770. err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
  771. return
  772. }
  773. stopRefreshIds = []int{}
  774. stopRefreshMysteelCode = []string{}
  775. }
  776. }
  777. }
  778. }
  779. // 更新指标禁止刷新状态
  780. if len(stopRefreshIds) > 0 {
  781. // 查询相关的计算指标
  782. calculateEdbIdList := make([]int, 0)
  783. if len(fromEdbIdList) > 0 {
  784. hasFind := make(map[int]struct{})
  785. calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind)
  786. if err != nil {
  787. err = fmt.Errorf("查询计算指标信息失败:%v", err)
  788. return
  789. }
  790. }
  791. err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList)
  792. if err != nil {
  793. err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
  794. return
  795. }
  796. }
  797. }
  798. return
  799. }