edb_refresh.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875
  1. package services
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta/eta_task/models"
  6. "eta/eta_task/models/data_manage"
  7. "eta/eta_task/models/data_manage/edb_refresh"
  8. "eta/eta_task/services/alarm_msg"
  9. "eta/eta_task/services/data"
  10. "eta/eta_task/utils"
  11. "fmt"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. // ConfigRefreshData
  17. // @Description: 配置刷新数据
  18. // @author: Roc
  19. // @datetime 2024-01-10 13:55:05
  20. // @param cont context.Context
  21. // @return err error
  22. func ConfigRefreshData(cont context.Context) (err error) {
  23. errMsgList := make([]string, 0)
  24. defer func() {
  25. if err != nil {
  26. fmt.Println(err)
  27. }
  28. }()
  29. // 一期是只做wind、同花顺、钢联、有色
  30. now := time.Now()
  31. //now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
  32. //now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
  33. defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
  34. if err != nil {
  35. errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
  36. }
  37. sourceEdbInfoListMap, err := getConfigRefreshData(now)
  38. if err != nil {
  39. errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
  40. }
  41. // 将两个合并
  42. allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
  43. wgNum := len(allSourceEdbInfoListMap)
  44. if wgNum <= 0 {
  45. return
  46. }
  47. wg := sync.WaitGroup{}
  48. wg.Add(wgNum)
  49. for _, edbList := range allSourceEdbInfoListMap {
  50. if edbList == nil {
  51. wg.Done()
  52. continue
  53. }
  54. if len(edbList) != 0 {
  55. go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
  56. }
  57. }
  58. wg.Wait()
  59. fmt.Println("Refresh End")
  60. return
  61. }
  62. // Function to merge two maps
  63. func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) (newMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) {
  64. if dst == nil {
  65. return src
  66. }
  67. if src == nil {
  68. return dst
  69. }
  70. newMap = dst
  71. for k, v := range src {
  72. if newK, ok := newMap[k]; ok {
  73. newK = append(newK, v...)
  74. newMap[k] = newK
  75. } else {
  76. newMap[k] = v
  77. }
  78. }
  79. return newMap
  80. }
  81. // getDefaultRefreshData
  82. // @Description: 根据默认配置获取需要刷新的指标列表
  83. // @author: Roc
  84. // @datetime 2024-01-10 13:55:38
  85. // @param now time.Time
  86. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  87. // @return err error
  88. func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  89. errMsgList := make([]string, 0)
  90. defer func() {
  91. if err != nil {
  92. fmt.Println(err)
  93. }
  94. }()
  95. // 一期是只做wind、同花顺、钢联、有色
  96. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  97. currTimeStr := getPreviousHalfHour(now)
  98. fmt.Println(currTimeStr)
  99. // 所有默认配置刷新项
  100. list := make([]*edb_refresh.EdbRefreshDefaultConfig, 0)
  101. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  102. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  103. conf, err := models.GetBusinessConf()
  104. if err != nil {
  105. fmt.Println(err)
  106. utils.FileLog.Info("获取业务配置失败,Err:" + err.Error())
  107. return
  108. }
  109. // 获取钢联化工的数据获取方式
  110. mySteelChemicalDataMethod := "excel"
  111. if v, ok := conf["MySteelDataMethod"]; ok {
  112. if v == "api" {
  113. mySteelChemicalDataMethod = v
  114. }
  115. }
  116. utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod)
  117. // 获取各个刷新频率的配置
  118. for _, refreshFrequency := range refreshFrequencyList {
  119. // 获取刷新频率条件
  120. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  121. if !isHandler {
  122. // 可能是非交易日,所以过滤不处理
  123. continue
  124. }
  125. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  126. pars = append(pars, refreshFrequency, currTimeStr)
  127. // 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
  128. if mySteelChemicalDataMethod == "api" {
  129. // 钢联化工使用api的方式获取数据的,不需要过滤
  130. condition += ` AND source not in (?)`
  131. pars = append(pars, utils.DATA_SOURCE_YS)
  132. } else {
  133. condition += ` AND source not in (?,?)`
  134. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
  135. }
  136. tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
  137. if tmpErr != nil {
  138. err = tmpErr
  139. return
  140. }
  141. list = append(list, tmpList...)
  142. }
  143. // 更新的单元格数
  144. refreshDataNumMap := make(map[string]*edb_refresh.EdbRefreshDefaultConfig)
  145. // 数据源刷新频度的列表数组
  146. refreshDataFrequencyListMap := make(map[int]map[int][]string)
  147. wgNum := 0
  148. // 处理待刷新的数据源,整理成数组,方便获取对应的指标
  149. for _, item := range list {
  150. // 更新的单元格数
  151. key := fmt.Sprintf("%d_%d_%s", item.Source, item.SubSource, item.Frequency)
  152. refreshDataNumMap[key] = item
  153. // 数据源刷新频度的列表数组
  154. subSourceFrequencyList, ok := refreshDataFrequencyListMap[item.Source]
  155. if !ok {
  156. subSourceFrequencyList = make(map[int][]string)
  157. }
  158. frequencyList, ok := subSourceFrequencyList[item.SubSource]
  159. if !ok {
  160. wgNum++
  161. frequencyList = make([]string, 0)
  162. }
  163. subSourceFrequencyList[item.SubSource] = append(frequencyList, item.Frequency)
  164. refreshDataFrequencyListMap[item.Source] = subSourceFrequencyList
  165. }
  166. for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
  167. switch source {
  168. case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
  169. // 只处理钢联化工使用api方式获取数据的情况
  170. if mySteelChemicalDataMethod == "api" {
  171. for subSource, frequencyList := range subSourceFrequencyListMap {
  172. items, tmpErr := data_manage.GetBaseFromMysteelChemicalIndexItems(frequencyList)
  173. if tmpErr != nil {
  174. errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
  175. }
  176. indexList := make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
  177. for _, v := range items {
  178. tmpConf := new(edb_refresh.EdbInfoListAndRefreshConfig)
  179. // 数据刷新的期数
  180. dataRefreshNum := utils.DATA_REFRESH
  181. key := fmt.Sprintf("%d_%d_%s", utils.DATA_SOURCE_MYSTEEL_CHEMICAL, 0, v.Frequency)
  182. if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
  183. if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
  184. dataRefreshNum = 0
  185. } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
  186. dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
  187. }
  188. }
  189. tmpConf.EdbCode = v.IndexCode
  190. tmpConf.EdbName = v.IndexName
  191. tmpConf.Source = utils.DATA_SOURCE_MYSTEEL_CHEMICAL
  192. tmpConf.Frequency = v.Frequency
  193. tmpConf.Unit = v.Unit
  194. tmpConf.StartDate, err = time.Parse(utils.FormatDate, v.StartDate)
  195. tmpConf.EndDate, err = time.Parse(utils.FormatDate, v.EndDate)
  196. tmpConf.ClassifyId = int(v.BaseFromMysteelChemicalClassifyId)
  197. tmpConf.DataRefreshNum = dataRefreshNum
  198. tmpConf.EdbInfoId = v.EdbInfoId
  199. indexList = append(indexList, tmpConf)
  200. }
  201. key := fmt.Sprint(source, "_", subSource)
  202. sourceEdbInfoListMap[key] = indexList
  203. }
  204. }
  205. // 其他情况不处理
  206. default:
  207. for subSource, frequencyList := range subSourceFrequencyListMap {
  208. edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
  209. if tmpErr != nil {
  210. errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
  211. }
  212. for _, v := range edbList {
  213. // 数据刷新的期数
  214. dataRefreshNum := utils.DATA_REFRESH
  215. key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency)
  216. if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
  217. if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
  218. dataRefreshNum = 0
  219. } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
  220. dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
  221. }
  222. }
  223. v.DataRefreshNum = dataRefreshNum
  224. }
  225. key := fmt.Sprint(source, "_", subSource)
  226. sourceEdbInfoListMap[key] = edbList
  227. }
  228. }
  229. }
  230. fmt.Println("Get Refresh End")
  231. return
  232. }
  233. // getConfigRefreshData
  234. // @Description: 根据指标配置获取需要刷新的指标列表
  235. // @author: Roc
  236. // @datetime 2024-01-10 13:55:59
  237. // @param now time.Time
  238. // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
  239. // @return err error
  240. func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
  241. defer func() {
  242. if err != nil {
  243. fmt.Println(err)
  244. }
  245. }()
  246. // 一期是只做wind、同花顺、钢联、有色
  247. sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
  248. currTimeStr := getPreviousHalfHour(now)
  249. // 所有默认配置刷新项
  250. list := make([]*edb_refresh.EdbRefreshConfig, 0)
  251. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  252. refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
  253. // 获取各个刷新频率的配置
  254. for _, refreshFrequency := range refreshFrequencyList {
  255. // 获取刷新频率条件
  256. condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
  257. if !isHandler {
  258. // 可能是非交易日,所以过滤不处理
  259. continue
  260. }
  261. condition += ` AND refresh_frequency = ? AND refresh_time = ?`
  262. pars = append(pars, refreshFrequency, currTimeStr)
  263. tmpList, tmpErr := edb_refresh.GetEdbRefreshConfigListByCondition(condition, pars)
  264. if tmpErr != nil {
  265. err = tmpErr
  266. return
  267. }
  268. list = append(list, tmpList...)
  269. }
  270. // 配置列表
  271. configIdEdbRefreshConfigMap := make(map[int]*edb_refresh.EdbRefreshConfig)
  272. configIdList := make([]int, 0)
  273. for _, v := range list {
  274. configIdList = append(configIdList, v.EdbRefreshConfigId)
  275. configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
  276. }
  277. conf, err := models.GetBusinessConf()
  278. if err != nil {
  279. fmt.Println(err)
  280. return
  281. }
  282. // 获取钢联化工的数据获取方式
  283. mySteelChemicalDataMethod := "excel"
  284. if v, ok := conf["MySteelDataMethod"]; ok {
  285. if v == "api" {
  286. mySteelChemicalDataMethod = v
  287. }
  288. }
  289. // 当钢联的数据获取方式是api时,不用过滤
  290. var sourceList []int
  291. if mySteelChemicalDataMethod == "api" {
  292. sourceList = []int{utils.DATA_SOURCE_YS}
  293. } else {
  294. sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}
  295. }
  296. edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList)
  297. if err != nil {
  298. return
  299. }
  300. for _, v := range edbInfoList {
  301. key := fmt.Sprint(v.Source, "_", v.SubSource)
  302. tmpList, ok := sourceEdbInfoListMap[key]
  303. if !ok {
  304. tmpList = make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
  305. }
  306. // 数据刷新的期数
  307. dataRefreshNum := utils.DATA_REFRESH
  308. if edbRefreshConfig, ok2 := configIdEdbRefreshConfigMap[v.EdbRefreshConfigId]; ok2 {
  309. if edbRefreshConfig.RefreshAllData == 1 { // 刷新所有数据期数
  310. dataRefreshNum = 0
  311. } else if edbRefreshConfig.RefreshDataNum > 0 { //
  312. dataRefreshNum = edbRefreshConfig.RefreshDataNum
  313. }
  314. }
  315. v.DataRefreshNum = dataRefreshNum
  316. sourceEdbInfoListMap[key] = append(tmpList, v)
  317. }
  318. fmt.Println("Get ConfigRefreshData End")
  319. return
  320. }
  321. // BaseRefreshData
  322. // @Description: 基础数据刷新
  323. // @author: Roc
  324. // @datetime 2024-01-09 16:27:45
  325. // @param wg *sync.WaitGroup
  326. // @return err error
  327. func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_refresh.EdbInfoListAndRefreshConfig) (err error) {
  328. errMsgList := make([]string, 0)
  329. defer func() {
  330. if err != nil {
  331. fmt.Println("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+err.Error())
  332. go alarm_msg.SendAlarmMsg(fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData ErrMsg:"+err.Error()), 3)
  333. }
  334. if len(errMsgList) > 0 {
  335. errMsg := fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+strings.Join(errMsgList, "\n"))
  336. fmt.Println(errMsg)
  337. go alarm_msg.SendAlarmMsg(errMsg, 3)
  338. }
  339. wg.Done()
  340. }()
  341. // 数据刷新的期数
  342. dataRefreshNum := utils.DATA_REFRESH
  343. // 是否从最开始的日期更新
  344. var isRefreshByStartDate bool
  345. if source != utils.DATA_SOURCE_THS {
  346. for _, v := range items {
  347. // 如果暂停更新,那就过滤
  348. if v.NoUpdate == 1 {
  349. continue
  350. }
  351. if v.DataRefreshNum > 0 {
  352. dataRefreshNum = v.DataRefreshNum
  353. }
  354. startDate := ""
  355. if isRefreshByStartDate {
  356. startDate = v.StartDate.Format(utils.FormatDate)
  357. } else {
  358. if v.Frequency == "日度" {
  359. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  360. } else if v.Frequency == "周度" {
  361. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  362. } else if v.Frequency == "旬度" {
  363. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  364. } else if v.Frequency == "月度" {
  365. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  366. } else if v.Frequency == "季度" {
  367. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  368. } else if v.Frequency == "半年度" {
  369. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  370. } else if v.Frequency == "年度" {
  371. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  372. } else {
  373. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  374. }
  375. }
  376. fmt.Println(startDate)
  377. // 数据更新
  378. resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  379. if tmpErr != nil {
  380. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  381. continue
  382. }
  383. if resp.Ret != 200 {
  384. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  385. continue
  386. }
  387. }
  388. }
  389. // 同花顺控制速率, 每秒最多4次请求(同花顺EDB函数限制为5, 考虑到可能存在用户同时在使用, 这里批量刷新设置为4, 话说5确实是有点少了吧=_=!)
  390. if source == utils.DATA_SOURCE_THS {
  391. ticker := time.NewTicker(250 * time.Millisecond)
  392. defer ticker.Stop()
  393. for _, v := range items {
  394. <-ticker.C
  395. // 如果暂停更新,那就过滤
  396. if v.NoUpdate == 1 {
  397. continue
  398. }
  399. if v.DataRefreshNum > 0 {
  400. dataRefreshNum = v.DataRefreshNum
  401. }
  402. startDate := ""
  403. if isRefreshByStartDate {
  404. startDate = v.StartDate.Format(utils.FormatDate)
  405. } else {
  406. if v.Frequency == "日度" {
  407. startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
  408. } else if v.Frequency == "周度" {
  409. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
  410. } else if v.Frequency == "旬度" {
  411. startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
  412. } else if v.Frequency == "月度" {
  413. startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
  414. } else if v.Frequency == "季度" {
  415. startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
  416. } else if v.Frequency == "半年度" {
  417. startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
  418. } else if v.Frequency == "年度" {
  419. startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
  420. } else {
  421. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  422. }
  423. }
  424. fmt.Println(startDate)
  425. // 数据更新
  426. resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  427. if tmpErr != nil {
  428. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
  429. continue
  430. }
  431. if resp.Ret != 200 {
  432. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  433. continue
  434. }
  435. }
  436. }
  437. fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束")
  438. return err
  439. }
  440. // getRefreshFrequencyCondition
  441. // @Description: 根据时间和刷新频率获取条件
  442. // @author: Roc
  443. // @datetime 2024-01-09 16:27:11
  444. // @param now time.Time
  445. // @param refreshFrequency string
  446. // @return condition string
  447. // @return pars []interface{}
  448. // @return isHandler bool
  449. func getRefreshFrequencyCondition(now time.Time, refreshFrequency string) (condition string, pars []interface{}, isHandler bool) {
  450. isHandler = true
  451. var dayNum int
  452. var isLastDay bool
  453. //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
  454. switch refreshFrequency {
  455. case "每自然日":
  456. // 自然日不需要额外条件
  457. return
  458. case "每交易日":
  459. // 周六日不处理
  460. if now.Weekday() == time.Saturday || now.Weekday() == time.Sunday {
  461. isHandler = false
  462. }
  463. return
  464. case "每周":
  465. currWeekDay := now.Weekday()
  466. if currWeekDay == time.Sunday {
  467. currWeekDay = 7
  468. isLastDay = true
  469. }
  470. dayNum = int(currWeekDay)
  471. case "每旬":
  472. currDay := now.Day()
  473. if currDay <= 10 {
  474. dayNum = currDay
  475. // 如果是这旬的最后一天
  476. if currDay == 10 {
  477. isLastDay = true
  478. }
  479. } else if currDay <= 20 {
  480. dayNum = currDay - 10
  481. // 如果是这旬的最后一天
  482. if currDay == 20 {
  483. isLastDay = true
  484. }
  485. } else {
  486. dayNum = currDay - 20
  487. // 当月的最后一天
  488. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  489. // 如果是这旬的最后一天
  490. if currDay == monthLastDay.Day() {
  491. isLastDay = true
  492. }
  493. }
  494. case "每月":
  495. // 当前日期
  496. currDay := now.Day()
  497. dayNum = currDay
  498. // 当期的最后一天
  499. monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  500. // 如果是这期的最后一天
  501. if currDay == monthLastDay.Day() {
  502. isLastDay = true
  503. }
  504. case "每季":
  505. // 当期的第一天 ; 当期的最后一天
  506. var startDay, endDay time.Time
  507. currMonth := now.Month()
  508. currDay := now.Day()
  509. if currMonth <= 3 {
  510. // 当季的第一天
  511. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  512. // 当季的最后一天
  513. endDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  514. } else if currMonth <= 6 {
  515. // 当期的第一天
  516. startDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local)
  517. // 当期的最后一天
  518. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  519. } else if currMonth <= 9 {
  520. // 当期的第一天
  521. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  522. // 当期的最后一天
  523. endDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  524. } else {
  525. // 当期的第一天
  526. startDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local)
  527. // 当期的最后一天
  528. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  529. }
  530. // 计算这期的第一天和当日的天数
  531. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  532. // 如果是这期的最后一天
  533. if currMonth == endDay.Month() && currDay == endDay.Day() {
  534. isLastDay = true
  535. }
  536. case "每半年":
  537. // 当期的第一天 ; 当期的最后一天
  538. var startDay, endDay time.Time
  539. currMonth := now.Month()
  540. currDay := now.Day()
  541. if currMonth <= 6 {
  542. // 当期的第一天
  543. startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  544. // 当期的最后一天
  545. endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  546. } else {
  547. // 当期的第一天
  548. startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
  549. // 当期的最后一天
  550. endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  551. }
  552. // 计算这期的第一天和当日的天数
  553. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  554. // 如果是这期的最后一天
  555. if currMonth == endDay.Month() && currDay == endDay.Day() {
  556. isLastDay = true
  557. }
  558. case "每年":
  559. currMonth := now.Month()
  560. currDay := now.Day()
  561. // 当期的第一天
  562. startDay := time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
  563. // 当期的最后一天
  564. endDay := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
  565. // 计算这期的第一天和当日的天数
  566. dayNum = utils.GetTimeSubDay(startDay, now) + 1
  567. // 如果是这期的最后一天
  568. if currMonth == endDay.Month() && currDay == endDay.Day() {
  569. isLastDay = true
  570. }
  571. }
  572. // 如果是这期的最后一天,那么就是判断refresh_frequency_day是否配置为0,或者配置的天数大于这期的最大天数
  573. if isLastDay {
  574. condition += ` AND ( refresh_frequency_day = ? OR refresh_frequency_day >= ? )`
  575. pars = append(pars, 0, dayNum)
  576. } else {
  577. // 如果不是这期的最后一天,那么就是判断refresh_frequency_day是否等于配置的天数
  578. condition += ` AND refresh_frequency_day = ? `
  579. pars = append(pars, dayNum)
  580. }
  581. return
  582. }
  583. // getPreviousHalfHour
  584. // @Description: 根据传入的时间获取该时间的前整半小时的时间字符串
  585. // @author: Roc
  586. // @datetime 2024-01-09 14:27:34
  587. // @param now time.Time
  588. // @return string
  589. func getPreviousHalfHour(now time.Time) string {
  590. minute := now.Minute()
  591. if minute >= 30 {
  592. return fmt.Sprintf("%02d:%02d", now.Hour(), 30)
  593. }
  594. return fmt.Sprintf("%02d:%02d", now.Hour(), 0)
  595. }
  596. // 根据配置把钢联化工和wind指标设置成禁止刷新
  597. func DisableEdbRefresh(cont context.Context) (err error) {
  598. //设置刷新key,如果没有执行完 报错提示
  599. cacheKey := "eta_task:DisableEdbRefresh"
  600. deleteCache := true
  601. defer func() {
  602. if deleteCache {
  603. utils.Rc.Delete(cacheKey)
  604. }
  605. if err != nil {
  606. tips := "DisableEdbRefresh-钢联化工和wind指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
  607. utils.FileLog.Info(tips)
  608. go alarm_msg.SendAlarmMsg(tips, 3)
  609. }
  610. }()
  611. if !utils.Rc.SetNX(cacheKey, 1, 2*time.Minute) {
  612. deleteCache = false
  613. err = fmt.Errorf("系统处理中,请稍后重试!")
  614. return
  615. }
  616. //查询配置,如果未开启自动设置禁止刷新,则无需处理
  617. obj := new(models.BusinessConf)
  618. conf, err := obj.GetItemByConfKey("EdbStopRefreshRule")
  619. if err != nil {
  620. if err.Error() == utils.ErrNoRow() {
  621. err = fmt.Errorf("未找到配置项,无需处理")
  622. return
  623. }
  624. return
  625. }
  626. //将json转为结构体
  627. rule := new(models.EdbStopRefreshRule)
  628. err = json.Unmarshal([]byte(conf.ConfVal), rule)
  629. if err != nil {
  630. return
  631. }
  632. //判断是否开启自动设置禁止刷新
  633. if rule.IsOpen == 0 {
  634. return
  635. }
  636. //获取当前时间
  637. now := time.Now()
  638. if rule.BaseIndexStopDays > 0 { //设置数据源钢联化工指标禁止更新
  639. baseIndexEndDate := now.AddDate(0, 0, -rule.BaseIndexStopDays+1).Format(utils.FormatDate)
  640. // 查询钢联化工指标,查询创建时间在baseIndexStartDate前,的所有钢联化工指标,分批查询,先查总数,再查列表
  641. totalCount, e := data_manage.GetCountRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate)
  642. if e != nil {
  643. err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
  644. return
  645. }
  646. //分页查询
  647. pageSize := 100
  648. pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
  649. stopRefreshIds := make([]int32, 0)
  650. for i := 0; i < pageNum; i++ {
  651. start := i * pageSize
  652. indexItems, e := data_manage.GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate, start, pageSize)
  653. if e != nil {
  654. err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
  655. return
  656. }
  657. if len(indexItems) == 0 {
  658. continue
  659. }
  660. indexCodeList := make([]string, 0)
  661. for _, indexItem := range indexItems {
  662. indexCodeList = append(indexCodeList, indexItem.IndexCode)
  663. }
  664. condition := ` AND source=? AND edb_code in (` + utils.GetOrmInReplace(len(indexCodeList)) + `)`
  665. var pars []interface{}
  666. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexCodeList)
  667. // 查询指标库里这些指标是否已创建
  668. edbList, e := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  669. if e != nil {
  670. err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
  671. return
  672. }
  673. edbMap := make(map[string]bool)
  674. for _, edb := range edbList {
  675. edbMap[edb.EdbCode] = true
  676. }
  677. for _, indexItem := range indexItems {
  678. // 判断指标是否被创建
  679. if _, ok := edbMap[indexItem.IndexCode]; !ok {
  680. stopRefreshIds = append(stopRefreshIds, indexItem.BaseFromMysteelChemicalIndexId)
  681. if len(stopRefreshIds) > 100 {
  682. err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
  683. if err != nil {
  684. err = fmt.Errorf("设置禁止刷新失败:%v", err)
  685. return
  686. }
  687. stopRefreshIds = make([]int32, 0)
  688. }
  689. }
  690. }
  691. }
  692. // 未被创建,则设置禁止刷新
  693. if len(stopRefreshIds) > 0 {
  694. err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
  695. if err != nil {
  696. err = fmt.Errorf("设置禁止刷新失败:%v", err)
  697. return
  698. }
  699. }
  700. }
  701. if rule.EdbStopDays > 0 {
  702. // 查询钢联和wind来源的指标
  703. edbEndDate := now.AddDate(0, 0, -rule.EdbStopDays+1).Format(utils.FormatDate)
  704. condition := ` AND no_update=0 AND source in (?,?) AND create_time < ?`
  705. var pars []interface{}
  706. pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbEndDate)
  707. // 查询钢联化工指标和wind指标 分批查询,先查总数,再查列表
  708. totalCount, e := data_manage.GetEdbInfoCountByCondition(condition, pars)
  709. if e != nil {
  710. err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
  711. return
  712. }
  713. //分页查询
  714. pageSize := 100
  715. pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
  716. stopRefreshIds := make([]int, 0)
  717. stopRefreshMysteelCode := make([]string, 0)
  718. fromEdbIdList := make([]int, 0)
  719. for i := 0; i < pageNum; i++ {
  720. start := i * pageSize
  721. edbItems, e := data_manage.GetEdbInfoPageByCondition(condition, pars, start, pageSize)
  722. if e != nil {
  723. err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
  724. return
  725. }
  726. if len(edbItems) == 0 {
  727. continue
  728. }
  729. edbInfoIds := make([]int, 0)
  730. fromEdbIdList = make([]int, 0)
  731. for _, item := range edbItems {
  732. edbInfoIds = append(edbInfoIds, item.EdbInfoId)
  733. }
  734. // 查询指标库里这些指标 引用情况
  735. relationList, e := data_manage.GetEdbInfoRelationByEdbInfoIds(edbInfoIds)
  736. if e != nil {
  737. err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
  738. return
  739. }
  740. edbMap := make(map[int]struct{})
  741. for _, item := range relationList {
  742. edbMap[item] = struct{}{}
  743. }
  744. for _, item := range edbItems {
  745. if _, ok := edbMap[item.EdbInfoId]; !ok {
  746. stopRefreshIds = append(stopRefreshIds, item.EdbInfoId)
  747. if item.Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL {
  748. stopRefreshMysteelCode = append(stopRefreshMysteelCode, item.EdbCode)
  749. }
  750. if item.EdbInfoType == 0 && item.EdbType == 1 {
  751. fromEdbIdList = append(fromEdbIdList, item.EdbInfoId)
  752. }
  753. // 更新指标禁止刷新状态
  754. if len(stopRefreshIds) > 100 {
  755. // 查询相关的计算指标
  756. calculateEdbIdList := make([]int, 0)
  757. if len(fromEdbIdList) > 0 {
  758. hasFind := make(map[int]struct{})
  759. calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind)
  760. if err != nil {
  761. err = fmt.Errorf("查询计算指标信息失败:%v", err)
  762. return
  763. }
  764. }
  765. err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList)
  766. if err != nil {
  767. err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
  768. return
  769. }
  770. stopRefreshIds = []int{}
  771. stopRefreshMysteelCode = []string{}
  772. }
  773. }
  774. }
  775. }
  776. // 更新指标禁止刷新状态
  777. if len(stopRefreshIds) > 0 {
  778. // 查询相关的计算指标
  779. calculateEdbIdList := make([]int, 0)
  780. if len(fromEdbIdList) > 0 {
  781. hasFind := make(map[int]struct{})
  782. calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind)
  783. if err != nil {
  784. err = fmt.Errorf("查询计算指标信息失败:%v", err)
  785. return
  786. }
  787. }
  788. err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList)
  789. if err != nil {
  790. err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
  791. return
  792. }
  793. }
  794. }
  795. return
  796. }