edb_info.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. package data
  2. import (
  3. "context"
  4. "errors"
  5. "eta_gn/eta_task/models/data_manage"
  6. "eta_gn/eta_task/models/data_manage/edb_refresh"
  7. "eta_gn/eta_task/services/alarm_msg"
  8. "eta_gn/eta_task/utils"
  9. "fmt"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. // RefreshDataFromCalculateAll 刷新所有计算指标
  15. func RefreshDataFromCalculateAll() (err error) {
  16. errMsgList := make([]string, 0)
  17. defer func() {
  18. if err != nil {
  19. errMsg := "刷新所有计算指标失败 ErrMsg:" + err.Error()
  20. utils.FileLog.Info(errMsg)
  21. go alarm_msg.SendAlarmMsg(errMsg, 3)
  22. }
  23. if len(errMsgList) > 0 {
  24. utils.FileLog.Info("刷新所有计算指标失败 ErrMsg:" + strings.Join(errMsgList, "\n"))
  25. go alarm_msg.SendAlarmMsg("刷新所有计算指标失败 ErrMsg:"+strings.Join(errMsgList, "\n"), 3)
  26. }
  27. }()
  28. var condition string
  29. var pars []interface{}
  30. // 查询 普通指标的计算指标
  31. condition += " AND edb_type=? AND edb_info_type=? AND no_update=0"
  32. pars = append(pars, 2, 0)
  33. items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  34. if err != nil {
  35. return err
  36. }
  37. nowStr := time.Now().AddDate(0, 0, -1).Format(utils.FormatDate)
  38. startDateOfWeek := utils.GetNowWeekMonday()
  39. endDateOfWeek := utils.GetNowWeekLastDay()
  40. for _, v := range items {
  41. if v.Frequency == "日度" {
  42. if v.EndDate.Format(utils.FormatDate) == nowStr {
  43. continue
  44. }
  45. } else if v.Frequency == "周度" {
  46. if !v.EndDate.Before(startDateOfWeek) && !v.EndDate.After(endDateOfWeek) {
  47. continue
  48. }
  49. }
  50. source := v.Source
  51. startDate := v.StartDate.Format(utils.FormatDate)
  52. if startDate == "0001-01-01" {
  53. continue
  54. }
  55. fmt.Println(v.EdbInfoId, v.EdbCode, v.EdbName, v.SourceName, source)
  56. fmt.Println("RefreshEdbCalculateData", v.EdbInfoId, v.EdbCode, startDate)
  57. result, tmpErr := RefreshEdbCalculateData(v.EdbInfoId, v.EdbCode, startDate)
  58. if tmpErr != nil {
  59. errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;err:%s", v.EdbInfoId, v.EdbCode, tmpErr.Error()))
  60. continue
  61. }
  62. if result.Ret != 200 {
  63. errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;msg:%s;errMsg:%s", v.EdbInfoId, v.EdbCode, result.Msg, result.ErrMsg))
  64. //return err
  65. continue
  66. }
  67. }
  68. return err
  69. }
  70. // RefreshBasePredictDataAll 刷新所有的基础预测指标
  71. func RefreshBasePredictDataAll() (err error) {
  72. errMsgList := make([]string, 0)
  73. defer func() {
  74. if err != nil {
  75. errMsg := "刷新所有计算指标失败 ErrMsg:" + err.Error()
  76. utils.FileLog.Info(errMsg)
  77. go alarm_msg.SendAlarmMsg(errMsg, 3)
  78. }
  79. if len(errMsgList) > 0 {
  80. utils.FileLog.Info("刷新所有基础预测指标失败 ErrMsg:" + strings.Join(errMsgList, "\n"))
  81. go alarm_msg.SendAlarmMsg("刷新所有基础预测指标失败 ErrMsg:"+strings.Join(errMsgList, "\n"), 3)
  82. }
  83. }()
  84. var condition string
  85. var pars []interface{}
  86. // 查询 普通指标的计算指标
  87. condition += " AND edb_type=? AND edb_info_type=? "
  88. pars = append(pars, 1, 1)
  89. items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  90. if err != nil {
  91. errMsgList = append(errMsgList, fmt.Sprintf("获取基础预测指标列表失败;err:%s", err.Error()))
  92. return err
  93. }
  94. for _, v := range items {
  95. source := v.Source
  96. startDate := v.StartDate.Format(utils.FormatDate)
  97. if startDate == "0001-01-01" {
  98. continue
  99. }
  100. fmt.Println(v.EdbInfoId, v.EdbCode, v.EdbName, v.SourceName, source, "======RefreshBasePredictDataAll:", startDate)
  101. result, tmpErr := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  102. if tmpErr != nil {
  103. errMsgList = append(errMsgList, fmt.Sprintf("刷新基础预测指标失败1,指标ID:%d;指标编码:%s;err:%s", v.EdbInfoId, v.EdbCode, tmpErr.Error()))
  104. continue
  105. }
  106. if result.Ret != 200 {
  107. errMsgList = append(errMsgList, fmt.Sprintf("刷新基础预测指标失败2,指标ID:%d;指标编码:%s;报错提示信息msg:%s;报错信息err:%s", v.EdbInfoId, v.EdbCode, result.ErrMsg, result.Msg))
  108. continue
  109. }
  110. }
  111. return err
  112. }
  113. // RefreshPredictDataFromCalculateAll 刷新所有预测计算指标
  114. func RefreshPredictDataFromCalculateAll() (err error) {
  115. errMsgList := make([]string, 0)
  116. defer func() {
  117. if err != nil {
  118. errMsg := "刷新所有计算预测指标失败 ErrMsg:" + err.Error()
  119. utils.FileLog.Info(errMsg)
  120. go alarm_msg.SendAlarmMsg(errMsg, 3)
  121. }
  122. if len(errMsgList) > 0 {
  123. utils.FileLog.Info("刷新所有计算预测指标失败 Err:" + strings.Join(errMsgList, "\n"))
  124. go alarm_msg.SendAlarmMsg("刷新所有计算预测指标失败 Err:"+strings.Join(errMsgList, "\n"), 3)
  125. }
  126. }()
  127. var condition string
  128. var pars []interface{}
  129. // 查询 普通指标的计算指标
  130. condition += " AND edb_type=? AND edb_info_type=? "
  131. pars = append(pars, 2, 1)
  132. items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  133. if err != nil {
  134. return err
  135. }
  136. for _, v := range items {
  137. source := v.Source
  138. startDate := v.StartDate.Format(utils.FormatDate)
  139. if startDate == "0001-01-01" {
  140. continue
  141. }
  142. fmt.Println(v.EdbInfoId, v.EdbCode, v.EdbName, v.SourceName, source, "======RefreshPredictEdbCalculateData:", startDate)
  143. result, tmpErr := RefreshPredictEdbCalculateData(v.EdbInfoId, v.EdbCode, startDate)
  144. if tmpErr != nil {
  145. errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;err:%s", v.EdbInfoId, v.EdbCode, tmpErr.Error()))
  146. continue
  147. }
  148. if result.Ret != 200 {
  149. errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;msg:%s;errMsg:%s", v.EdbInfoId, v.EdbCode, result.Msg, result.ErrMsg))
  150. continue
  151. }
  152. }
  153. return err
  154. }
  155. // RefreshDataFromManual 刷新手工指标数据
  156. func RefreshDataFromManual(wg *sync.WaitGroup) (err error) {
  157. errMsgList := make([]string, 0)
  158. defer func() {
  159. if err != nil {
  160. fmt.Println("RefreshDataFromManual Err:" + err.Error())
  161. go alarm_msg.SendAlarmMsg("RefreshDataFromManual ErrMsg:"+err.Error(), 3)
  162. }
  163. if len(errMsgList) > 0 {
  164. errMsg := "RefreshDataFromManual Err:" + strings.Join(errMsgList, "\n")
  165. fmt.Println(errMsg)
  166. go alarm_msg.SendAlarmMsg(errMsg, 3)
  167. }
  168. wg.Done()
  169. }()
  170. var condition string
  171. var pars []interface{}
  172. condition += " AND source=? "
  173. pars = append(pars, utils.DATA_SOURCE_MANUAL)
  174. items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  175. if err != nil {
  176. return errors.New("GetEdbInfoByCondition:" + err.Error())
  177. }
  178. for _, v := range items {
  179. startDate := v.StartDate.Format(utils.FormatDate)
  180. resp, err := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  181. if err != nil {
  182. errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+err.Error())
  183. continue
  184. }
  185. if resp.Ret != 200 {
  186. errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
  187. continue
  188. }
  189. }
  190. return err
  191. }
  192. func ResetEdbInfoIsUpdate(cont context.Context) (err error) {
  193. go data_manage.ResetEdbInfoIsUpdate()
  194. return nil
  195. }
  196. // RefreshBaseDataFromSource
  197. // @Description: 刷新基础数据
  198. // @author: Roc
  199. // @datetime 2024-08-01 18:10:03
  200. // @param wg *sync.WaitGroup
  201. // @param source int
  202. // @return err error
  203. func RefreshBaseDataFromSource(wg *sync.WaitGroup, source int) (err error) {
  204. errMsgList := make([]string, 0)
  205. defer func() {
  206. if err != nil {
  207. errMsg := fmt.Sprintf("刷新基础数据失败,来源:%d,ErrMsg:%s", source, err.Error())
  208. utils.FileLog.Info(errMsg)
  209. go alarm_msg.SendAlarmMsg(errMsg, 3)
  210. }
  211. if len(errMsgList) > 0 {
  212. errMsg := fmt.Sprintf("刷新基础数据失败,来源:%d,ErrMsg:%s", source, strings.Join(errMsgList, "\n"))
  213. utils.FileLog.Info(errMsg)
  214. go alarm_msg.SendAlarmMsg(errMsg, 3)
  215. }
  216. wg.Done()
  217. }()
  218. var condition string
  219. var pars []interface{}
  220. condition += " AND source=? "
  221. pars = append(pars, source)
  222. items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  223. if err != nil {
  224. return errors.New("GetEdbInfoByCondition:" + err.Error())
  225. }
  226. for _, v := range items {
  227. startDate := ""
  228. if v.Frequency == "日度" {
  229. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  230. } else if v.Frequency == "周度" {
  231. startDate = v.EndDate.AddDate(0, 0, -(utils.DATA_REFRESH * 7)).Format(utils.FormatDate)
  232. } else if v.Frequency == "月度" {
  233. startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH, 0).Format(utils.FormatDate)
  234. } else if v.Frequency == "季度" {
  235. startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH*3, 0).Format(utils.FormatDate)
  236. } else if v.Frequency == "年度" {
  237. startDate = v.EndDate.AddDate(-utils.DATA_REFRESH, 0, 0).Format(utils.FormatDate)
  238. } else {
  239. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  240. }
  241. resp, tmpErr := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  242. if err != nil {
  243. errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;err:%s", v.EdbInfoId, v.EdbCode, tmpErr.Error()))
  244. continue
  245. }
  246. if resp.Ret != 200 {
  247. errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;msg:%s;errMsg:%s", v.EdbInfoId, v.EdbCode, resp.Msg, resp.ErrMsg))
  248. continue
  249. }
  250. }
  251. return err
  252. }
  253. // RefreshDataFromTradeAnalysis 刷新持仓分析指标
  254. func RefreshDataFromTradeAnalysis(wg *sync.WaitGroup) (err error) {
  255. utils.FileLog.Info(fmt.Sprintf("持仓分析指标刷新开始: %s", time.Now().Format(utils.FormatDateTime)))
  256. errMsgList := make([]string, 0)
  257. defer func() {
  258. if err != nil {
  259. tips := fmt.Sprintf("RefreshDataFromTradeAnalysis err: %v", err)
  260. utils.FileLog.Info(tips)
  261. go alarm_msg.SendAlarmMsg(tips, 3)
  262. }
  263. if len(errMsgList) > 0 {
  264. tips := fmt.Sprintf("RefreshDataFromTradeAnalysis ErrMsg: %s", strings.Join(errMsgList, "\n"))
  265. utils.FileLog.Info(tips)
  266. go alarm_msg.SendAlarmMsg(tips, 3)
  267. }
  268. wg.Done()
  269. }()
  270. var condition string
  271. var pars []interface{}
  272. condition += ` AND source = ? AND no_update = 0 `
  273. pars = append(pars, utils.DATA_SOURCE_TRADE_ANALYSIS)
  274. items, e := data_manage.GetEdbInfoByCondition(condition, pars, 0)
  275. if e != nil {
  276. err = fmt.Errorf("获取持仓分析指标失败, %v", e)
  277. return
  278. }
  279. for _, v := range items {
  280. // 持仓分析指标只有日度
  281. startDate := v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  282. resp, e := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  283. if e != nil {
  284. errMsgList = append(errMsgList, fmt.Sprintf("EdbCode: %s, RefreshEdbData err: %v", v.EdbCode, e))
  285. continue
  286. }
  287. if resp.Ret != 200 {
  288. errMsgList = append(errMsgList, fmt.Sprintf("EdbCode: %s, RefreshEdbData err: %v, errMsg: %s", v.EdbCode, e, resp.ErrMsg))
  289. continue
  290. }
  291. }
  292. utils.FileLog.Info(fmt.Sprintf("持仓分析指标刷新结束: %s", time.Now().Format(utils.FormatDateTime)))
  293. return err
  294. }
  295. // NoneConfigRefreshDataGn 刷新未配置的所有指标来源
  296. func NoneConfigRefreshDataGn(wg *sync.WaitGroup) (err error) {
  297. errMsgList := make([]string, 0)
  298. defer func() {
  299. if err != nil {
  300. tips := fmt.Sprintf("NoneConfigRefreshDataGn ErrMsg: %v", err)
  301. utils.FileLog.Info(tips)
  302. }
  303. if len(errMsgList) > 0 {
  304. errMsg := "NoneConfigRefreshDataGn Err:" + strings.Join(errMsgList, "\n")
  305. utils.FileLog.Info(errMsg)
  306. }
  307. wg.Done()
  308. }()
  309. // 过滤出无刷新配置的指标数据来源
  310. sourceRefreshIds := make([]int, 0)
  311. {
  312. cond := fmt.Sprintf(" AND is_base = ?")
  313. pars := make([]interface{}, 0)
  314. pars = append(pars, 1)
  315. list, e := data_manage.GetEdbSourceItemsByCondition(cond, pars, []string{}, "")
  316. if e != nil {
  317. err = fmt.Errorf("获取需要刷新的基础指标来源失败, %v", e)
  318. return
  319. }
  320. sourceBaseIds := make([]int, 0)
  321. for _, v := range list {
  322. sourceBaseIds = append(sourceBaseIds, v.EdbSourceId)
  323. }
  324. // 获取默认刷新中配置过的数据来源
  325. hasConfigIds, e := edb_refresh.GetDistinctDefaultRefreshSourceIds()
  326. if e != nil {
  327. err = fmt.Errorf("获取已配置过刷新的指标来源失败, %v", e)
  328. return
  329. }
  330. // 取两个[]int的差集
  331. sourceRefreshIds = utils.MinusInt(sourceBaseIds, hasConfigIds)
  332. }
  333. if len(sourceRefreshIds) == 0 {
  334. utils.FileLog.Info("无未配置刷新时间的指标来源需要刷新")
  335. return
  336. }
  337. // 根据数据源依次刷新
  338. for _, sourceId := range sourceRefreshIds {
  339. cond := ` AND source = ? `
  340. pars := make([]interface{}, 0)
  341. pars = append(pars, sourceId)
  342. // 获取指标
  343. items, e := data_manage.GetEdbInfoByCondition(cond, pars, 0)
  344. if e != nil {
  345. err = fmt.Errorf("获取指标失败, Source: %d, Err: %v", sourceId, e)
  346. return
  347. }
  348. for _, v := range items {
  349. startDate := ""
  350. if v.EndDate.IsZero() {
  351. startDate = utils.BaseEdbRefreshStartDate
  352. }
  353. if v.Frequency == "日度" {
  354. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  355. } else if v.Frequency == "周度" {
  356. startDate = v.EndDate.AddDate(0, 0, -(utils.DATA_REFRESH * 7)).Format(utils.FormatDate)
  357. } else if v.Frequency == "月度" {
  358. startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH, 0).Format(utils.FormatDate)
  359. } else if v.Frequency == "季度" {
  360. startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH*3, 0).Format(utils.FormatDate)
  361. } else if v.Frequency == "年度" {
  362. startDate = v.EndDate.AddDate(-utils.DATA_REFRESH, 0, 0).Format(utils.FormatDate)
  363. } else {
  364. startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
  365. }
  366. resp, e := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
  367. if e != nil {
  368. errMsgList = append(errMsgList, fmt.Sprintf("RefreshEdbData Code: %s Err: %v\n", v.EdbCode, e))
  369. continue
  370. }
  371. if resp.Ret != 200 {
  372. errMsgList = append(errMsgList, fmt.Sprintf("RefreshEdbData Code: %s Err: %s, ErrMsg: %s\n", v.EdbCode, resp.Msg, resp.ErrMsg))
  373. continue
  374. }
  375. }
  376. }
  377. return
  378. }