edb_refresh.go 18 KB


  1. package edb_refresh
  2. import (
  3. "context"
  4. "eta/eta_index_lib/models/edb_refresh"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. // GetPreviousHalfHourDefaultConfigList
  13. // @Description: 获取上半个小时的默认配置列表
  14. // @author: Roc
  15. // @datetime 2024-01-11 14:46:54
  16. // @param source int
  17. // @param subSource int
  18. // @return list []*edb_refresh.EdbRefreshDefaultConfig
  19. // @return err error
  20. func GetPreviousHalfHourDefaultConfigList(source, subSource int) (list []*edb_refresh.EdbRefreshDefaultConfig, err error) {
  21. defer func() {
  22. if err != nil {
  23. fmt.Println(err)
  24. }
  25. }()
  26. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  27. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  28. now := time.Now()
  29. //now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
  30. //now = time.Date(2023, 12, 31, 04, 10, 59, 0, time.Local)
  31. currTimeStr := getPreviousHalfHour(now)
  32. fmt.Println(currTimeStr)
  33. // 所有默认配置刷新项
  34. list = make([]*edb_refresh.EdbRefreshDefaultConfig, 0)
  35. // 获取各个刷新频率的配置
  36. for _, refreshFrequency := range refreshFrequencyList {
  37. // 获取刷新频率条件
  38. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  39. if !isHandler {
  40. // 可能是非交易日,所以过滤不处理
  41. continue
  42. }
  43. condition += ` AND refresh_frequency = ? AND refresh_time = ? AND source = ? AND sub_source = ? `
  44. pars = append(pars, refreshFrequency, currTimeStr, source, subSource)
  45. tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
  46. if tmpErr != nil {
  47. err = tmpErr
  48. return
  49. }
  50. list = append(list, tmpList...)
  51. }
  52. return
  53. }
  54. // ConfigRefreshData
  55. // @Description: 配置刷新数据
  56. // @author: Roc
  57. // @datetime 2024-01-10 13:55:05
  58. // @param cont context.Context
  59. // @return err error
  60. func ConfigRefreshData(cont context.Context) (err error) {
  61. errMsgList := make([]string, 0)
  62. defer func() {
  63. if err != nil {
  64. fmt.Println(err)
  65. }
  66. }()
  67. // 一期是只做wind、同花顺、钢联、有色
  68. now := time.Now()
  69. //now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
  70. //now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
  71. defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
  72. if err != nil {
  73. errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
  74. }
  75. sourceEdbInfoListMap, err := getConfigRefreshData(now)
  76. if err != nil {
  77. errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
  78. }
  79. // 将两个合并
  80. allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
  81. wgNum := len(allSourceEdbInfoListMap)
  82. if wgNum <= 0 {
  83. return
  84. }
  85. wg := sync.WaitGroup{}
  86. wg.Add(wgNum)
  87. for _, edbList := range allSourceEdbInfoListMap {
  88. go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
  89. }
  90. wg.Wait()
  91. fmt.Println("Refresh End")
  92. return
  93. }
  94. // Function to merge two maps
  95. func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) map[string][]*edb_refresh.EdbInfoListAndRefreshConfig {
  96. if dst == nil {
  97. return src
  98. }
  99. if src == nil {
  100. return dst
  101. }
  102. for k, v := range src {
  103. if dstk, ok := dst[k]; ok {
  104. dstk = append(dstk, v...)
  105. dst[k] = dstk
  106. } else {
  107. dst[k] = v
  108. }
  109. }
  110. return dst
  111. }
  112. // getDefaultRefreshData
  113. // @Description: 根据默认配置获取需要刷新的指标列表
  114. // @author: Roc
  115. // @datetime 2024-01-10 13:55:38
  116. // @param now time.Time
  117. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  118. // @return err error
  119. func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  120. errMsgList := make([]string, 0)
  121. defer func() {
  122. if err != nil {
  123. fmt.Println(err)
  124. }
  125. }()
  126. // 一期是只做wind、同花顺、钢联、有色
  127. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  128. currTimeStr := getPreviousHalfHour(now)
  129. fmt.Println(currTimeStr)
  130. // 所有默认配置刷新项
  131. list := make([]*edb_refresh.EdbRefreshDefaultConfig, 0)
  132. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  133. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  134. // 获取各个刷新频率的配置
  135. for _, refreshFrequency := range refreshFrequencyList {
  136. // 获取刷新频率条件
  137. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  138. if !isHandler {
  139. // 可能是非交易日,所以过滤不处理
  140. continue
  141. }
  142. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  143. pars = append(pars, refreshFrequency, currTimeStr)
  144. // 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
  145. condition += ` AND source not in (?,?)`
  146. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
  147. tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
  148. if tmpErr != nil {
  149. err = tmpErr
  150. return
  151. }
  152. list = append(list, tmpList...)
  153. }
  154. // 更新的单元格数
  155. refreshDataNumMap := make(map[string]*edb_refresh.EdbRefreshDefaultConfig)
  156. // 数据源刷新频度的列表数组
  157. refreshDataFrequencyListMap := make(map[int]map[int][]string)
  158. wgNum := 0
  159. // 处理待刷新的数据源,整理成数组,方便获取对应的指标
  160. for _, item := range list {
  161. // 更新的单元格数
  162. key := fmt.Sprintf("%d_%d_%s", item.Source, item.SubSource, item.Frequency)
  163. refreshDataNumMap[key] = item
  164. // 数据源刷新频度的列表数组
  165. subSourceFrequencyList, ok := refreshDataFrequencyListMap[item.Source]
  166. if !ok {
  167. subSourceFrequencyList = make(map[int][]string)
  168. }
  169. frequencyList, ok := subSourceFrequencyList[item.SubSource]
  170. if !ok {
  171. wgNum++
  172. frequencyList = make([]string, 0)
  173. }
  174. subSourceFrequencyList[item.SubSource] = append(frequencyList, item.Frequency)
  175. refreshDataFrequencyListMap[item.Source] = subSourceFrequencyList
  176. }
  177. for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
  178. switch source {
  179. case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
  180. // 这种不处理
  181. default:
  182. for subSource, frequencyList := range subSourceFrequencyListMap {
  183. edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
  184. if tmpErr != nil {
  185. errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
  186. }
  187. for _, v := range edbList {
  188. // 数据刷新的期数
  189. dataRefreshNum := utils.DATA_REFRESH
  190. key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency)
  191. if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
  192. if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
  193. dataRefreshNum = 0
  194. } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
  195. dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
  196. }
  197. }
  198. v.DataRefreshNum = dataRefreshNum
  199. }
  200. key := fmt.Sprint(source, "_", subSource)
  201. sourceEdbInfoListMap[key] = edbList
  202. }
  203. }
  204. }
  205. fmt.Println("Get Refresh End")
  206. return
  207. }
  208. // getConfigRefreshData
  209. // @Description: 根据指标配置获取需要刷新的指标列表
  210. // @author: Roc
  211. // @datetime 2024-01-10 13:55:59
  212. // @param now time.Time
  213. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  214. // @return err error
  215. func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  216. defer func() {
  217. if err != nil {
  218. fmt.Println(err)
  219. }
  220. }()
  221. // 一期是只做wind、同花顺、钢联、有色
  222. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  223. currTimeStr := getPreviousHalfHour(now)
  224. // 所有默认配置刷新项
  225. list := make([]*edb_refresh.EdbRefreshConfig, 0)
  226. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  227. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  228. // 获取各个刷新频率的配置
  229. for _, refreshFrequency := range refreshFrequencyList {
  230. // 获取刷新频率条件
  231. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  232. if !isHandler {
  233. // 可能是非交易日,所以过滤不处理
  234. continue
  235. }
  236. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  237. pars = append(pars, refreshFrequency, currTimeStr)
  238. tmpList, tmpErr := edb_refresh.GetEdbRefreshConfigListByCondition(condition, pars)
  239. if tmpErr != nil {
  240. err = tmpErr
  241. return
  242. }
  243. list = append(list, tmpList...)
  244. }
  245. // 配置列表
  246. configIdEdbRefreshConfigMap := make(map[int]*edb_refresh.EdbRefreshConfig)
  247. configIdList := make([]int, 0)
  248. for _, v := range list {
  249. configIdList = append(configIdList, v.EdbRefreshConfigId)
  250. configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
  251. }
  252. edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource([]int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}, configIdList)
  253. if err != nil {
  254. return
  255. }
  256. for _, v := range edbInfoList {
  257. key := fmt.Sprint(v.Source, "_", v.SubSource)
  258. tmpList, ok := sourceEdbInfoListMap[key]
  259. if !ok {
  260. tmpList = make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
  261. }
  262. // 数据刷新的期数
  263. dataRefreshNum := utils.DATA_REFRESH
  264. if edbRefreshConfig, ok2 := configIdEdbRefreshConfigMap[v.EdbRefreshConfigId]; ok2 {
  265. if edbRefreshConfig.RefreshAllData == 1 { // 刷新所有数据期数
  266. dataRefreshNum = 0
  267. } else if edbRefreshConfig.RefreshDataNum > 0 { //
  268. dataRefreshNum = edbRefreshConfig.RefreshDataNum
  269. }
  270. }
  271. v.DataRefreshNum = dataRefreshNum
  272. sourceEdbInfoListMap[key] = append(tmpList, v)
  273. }
  274. fmt.Println("Get ConfigRefreshData End")
  275. return
  276. }
  277. // BaseRefreshData
  278. // @Description: 基础数据刷新
  279. // @author: Roc
  280. // @datetime 2024-01-09 16:27:45
  281. // @param wg *sync.WaitGroup
  282. // @return err error
  283. func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_refresh.EdbInfoListAndRefreshConfig) (err error) {
  284. errMsgList := make([]string, 0)
  285. defer func() {
  286. if err != nil {
  287. fmt.Println("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+err.Error())
  288. go alarm_msg.SendAlarmMsg(fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData ErrMsg:"+err.Error()), 3)
  289. }
  290. if len(errMsgList) > 0 {
  291. errMsg := fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+strings.Join(errMsgList, "\n"))
  292. fmt.Println(errMsg)
  293. go alarm_msg.SendAlarmMsg(errMsg, 3)
  294. }
  295. wg.Done()
  296. }()
  297. // 数据刷新的期数
  298. dataRefreshNum := utils.DATA_REFRESH
  299. // 是否从最开始的日期更新
  300. var isRefreshByStartDate bool
  301. for _, v := range items {
  302. if v.DataRefreshNum > 0 {
  303. dataRefreshNum = v.DataRefreshNum
  304. }
  305. startDate := ""
  306. if isRefreshByStartDate {
  307. startDate = v.StartDate.Format(utils.FormatDate)
  308. } else {
  309. if v.Frequency == "日度" {
  310. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  311. } else if v.Frequency == "周度" {
  312. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  313. } else if v.Frequency == "旬度" {
  314. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  315. } else if v.Frequency == "月度" {
  316. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  317. } else if v.Frequency == "季度" {
  318. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  319. } else if v.Frequency == "半年度" {
  320. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  321. } else if v.Frequency == "年度" {
  322. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  323. } else {
  324. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  325. }
  326. }
  327. fmt.Println(startDate)
  328. // 数据更新
  329. //resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  330. //if tmpErr != nil {
  331. // errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  332. // continue
  333. //}
  334. //if resp.Ret != 200 {
  335. // errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  336. // continue
  337. //}
  338. }
  339. fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束")
  340. return err
  341. }
  342. // getRefreshFrequencyCondition
  343. // @Description: 根据时间和刷新频率获取条件
  344. // @author: Roc
  345. // @datetime 2024-01-09 16:27:11
  346. // @param now time.Time
  347. // @param refreshFrequency string
  348. // @return condition string
  349. // @return pars []interface{}
  350. // @return isHandler bool
  351. func getRefreshFrequencyCondition(now time.Time, refreshFrequency string) (condition string, pars []interface{}, isHandler bool) {
  352. isHandler = true
  353. var dayNum int
  354. var isLastDay bool
  355. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  356. switch refreshFrequency {
  357. case "每自然日":
  358. // 自然日不需要额外条件
  359. return
  360. case "每交易日":
  361. // 周六日不处理
  362. if now.Weekday() == time.Saturday || now.Weekday() == time.Sunday {
  363. isHandler = false
  364. }
  365. return
  366. case "每周":
  367. currWeekDay := now.Weekday()
  368. if currWeekDay == time.Sunday {
  369. currWeekDay = 7
  370. isLastDay = true
  371. }
  372. dayNum = int(currWeekDay)
  373. case "每旬":
  374. currDay := now.Day()
  375. if currDay <= 10 {
  376. dayNum = currDay
  377. // 如果是这旬的最后一天
  378. if currDay == 10 {
  379. isLastDay = true
  380. }
  381. } else if currDay <= 20 {
  382. dayNum = currDay - 10
  383. // 如果是这旬的最后一天
  384. if currDay == 20 {
  385. isLastDay = true
  386. }
  387. } else {
  388. dayNum = currDay - 20
  389. // 当月的最后一天
  390. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  391. // 如果是这旬的最后一天
  392. if currDay == monthLastDay.Day() {
  393. isLastDay = true
  394. }
  395. }
  396. case "每月":
  397. // 当前日期
  398. currDay := now.Day()
  399. dayNum = currDay
  400. // 当期的最后一天
  401. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  402. // 如果是这期的最后一天
  403. if currDay == monthLastDay.Day() {
  404. isLastDay = true
  405. }
  406. case "每季":
  407. // 当期的第一天 ; 当期的最后一天
  408. var startDay, endDay time.Time
  409. currMonth := now.Month()
  410. currDay := now.Day()
  411. if currMonth <= 3 {
  412. // 当季的第一天
  413. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  414. // 当季的最后一天
  415. endDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  416. } else if currMonth <= 6 {
  417. // 当期的第一天
  418. startDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local)
  419. // 当期的最后一天
  420. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  421. } else if currMonth <= 9 {
  422. // 当期的第一天
  423. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  424. // 当期的最后一天
  425. endDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  426. } else {
  427. // 当期的第一天
  428. startDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local)
  429. // 当期的最后一天
  430. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  431. }
  432. // 计算这期的第一天和当日的天数
  433. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  434. // 如果是这期的最后一天
  435. if currMonth == endDay.Month() && currDay == endDay.Day() {
  436. isLastDay = true
  437. }
  438. case "每半年":
  439. // 当期的第一天 ; 当期的最后一天
  440. var startDay, endDay time.Time
  441. currMonth := now.Month()
  442. currDay := now.Day()
  443. if currMonth <= 6 {
  444. // 当期的第一天
  445. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  446. // 当期的最后一天
  447. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  448. } else {
  449. // 当期的第一天
  450. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  451. // 当期的最后一天
  452. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  453. }
  454. // 计算这期的第一天和当日的天数
  455. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  456. // 如果是这期的最后一天
  457. if currMonth == endDay.Month() && currDay == endDay.Day() {
  458. isLastDay = true
  459. }
  460. case "每年":
  461. currMonth := now.Month()
  462. currDay := now.Day()
  463. // 当期的第一天
  464. startDay := time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  465. // 当期的最后一天
  466. endDay := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  467. // 计算这期的第一天和当日的天数
  468. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  469. // 如果是这期的最后一天
  470. if currMonth == endDay.Month() && currDay == endDay.Day() {
  471. isLastDay = true
  472. }
  473. }
  474. // 如果是这期的最后一天,那么就是判断refresh_frequency_day是否配置为0,或者配置的天数大于这期的最大天数
  475. if isLastDay {
  476. condition += ` AND ( refresh_frequency_day = ? OR refresh_frequency_day >= ? )`
  477. pars = append(pars, 0, dayNum)
  478. } else {
  479. // 如果不是这期的最后一天,那么就是判断refresh_frequency_day是否等于配置的天数
  480. condition += ` AND refresh_frequency_day = ? `
  481. pars = append(pars, dayNum)
  482. }
  483. return
  484. }
  485. // getPreviousHalfHour
  486. // @Description: 根据传入的时间获取该时间的前整半小时的时间字符串
  487. // @author: Roc
  488. // @datetime 2024-01-09 14:27:34
  489. // @param now time.Time
  490. // @return string
  491. func getPreviousHalfHour(now time.Time) string {
  492. minute := now.Minute()
  493. if minute >= 30 {
  494. return fmt.Sprintf("%02d:%02d", now.Hour(), 30)
  495. }
  496. return fmt.Sprintf("%02d:%02d", now.Hour(), 0)
  497. }