factor_edb_series.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package data
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/models/data_manage"
  5. "eta/eta_api/services/alarm_msg"
  6. "eta/eta_api/utils"
  7. "fmt"
  8. "sync"
  9. "time"
  10. )
  11. // FactorEdbStepCalculate 因子指标-多公式计算
  12. func FactorEdbStepCalculate(seriesId int, edbArr []*data_manage.EdbInfo, calculates []data_manage.FactorEdbSeriesCalculatePars, lang string, recalculate bool) (calculateResp data_manage.FactorEdbSeriesStepCalculateResp, err error) {
  13. if len(edbArr) == 0 || len(calculates) == 0 {
  14. return
  15. }
  16. defer func() {
  17. if err != nil {
  18. tips := fmt.Sprintf("StepCalculate计算失败, ErrMsg: %v", err)
  19. fmt.Println(tips)
  20. utils.FileLog.Info(tips)
  21. go alarm_msg.SendAlarmMsg(tips, 3)
  22. }
  23. if len(calculateResp.Fail) > 0 {
  24. tips := "StepCalculate计算失败, ErrMsg: "
  25. for _, f := range calculateResp.Fail {
  26. tips += fmt.Sprintf("code: %s, err: %s\n", f.EdbCode, f.ErrMsg)
  27. }
  28. fmt.Println(tips)
  29. utils.FileLog.Info(tips)
  30. go alarm_msg.SendAlarmMsg(tips, 2)
  31. }
  32. }()
  33. // 重新计算-先清除原数据
  34. calculateDataOb := new(data_manage.FactorEdbSeriesCalculateData)
  35. if recalculate {
  36. cond := fmt.Sprintf("%s = ?", calculateDataOb.Cols().FactorEdbSeriesId)
  37. pars := make([]interface{}, 0)
  38. pars = append(pars, seriesId)
  39. if e := calculateDataOb.RemoveByCondition(cond, pars); e != nil {
  40. err = fmt.Errorf("清除原数据失败, err: %v", e)
  41. return
  42. }
  43. }
  44. wg := sync.WaitGroup{}
  45. calculateWorkers := make(chan struct{}, 10)
  46. for _, edb := range edbArr {
  47. wg.Add(1)
  48. go func(v *data_manage.EdbInfo) {
  49. defer func() {
  50. wg.Done()
  51. <-calculateWorkers
  52. }()
  53. calculateWorkers <- struct{}{}
  54. var result data_manage.FactorEdbSeriesStepCalculateResult
  55. result.EdbInfoId = v.EdbInfoId
  56. result.EdbCode = v.EdbCode
  57. result.Msg = "计算失败"
  58. // 获取基础数据
  59. edbData, e := data_manage.GetEdbDataAllByEdbCode(v.EdbCode, v.Source, v.SubSource, 0)
  60. if e != nil {
  61. result.ErrMsg = fmt.Sprintf("获取基础数据失败, edbCode: %s, err: %v", v.EdbCode, e)
  62. calculateResp.Fail = append(calculateResp.Fail, result)
  63. return
  64. }
  65. if len(edbData) == 0 {
  66. result.Msg = "该指标无基础数据"
  67. result.ErrMsg = fmt.Sprintf("该指标无基础数据, edbCode: %s", v.EdbCode)
  68. calculateResp.Fail = append(calculateResp.Fail, result)
  69. return
  70. }
  71. // 请求指标服务进行计算
  72. j, e := json.Marshal(data_manage.BaseStepCalculateReq{
  73. DataList: edbData,
  74. Calculates: calculates,
  75. })
  76. if e != nil {
  77. result.ErrMsg = fmt.Sprintf("请求体JSON格式化失败, edbCode: %s, err: %v", v.EdbCode, e)
  78. calculateResp.Fail = append(calculateResp.Fail, result)
  79. return
  80. }
  81. requestRes, e := BaseStepCalculate(string(j), lang)
  82. if e != nil {
  83. result.ErrMsg = fmt.Sprintf("指标计算响应失败, edbCode: %s, err: %v", v.EdbCode, e)
  84. calculateResp.Fail = append(calculateResp.Fail, result)
  85. return
  86. }
  87. if requestRes.Ret != 200 {
  88. result.Msg = requestRes.Msg
  89. result.ErrMsg = requestRes.ErrMsg
  90. calculateResp.Fail = append(calculateResp.Fail, result)
  91. return
  92. }
  93. // 计算成功的保存结果
  94. dataArr := make([]*data_manage.FactorEdbSeriesCalculateData, 0)
  95. for _, d := range requestRes.Data.DateList {
  96. val, ok := requestRes.Data.DataMap[d]
  97. if !ok {
  98. continue
  99. }
  100. dataTime, e := time.ParseInLocation(time.DateOnly, d, time.Local)
  101. if e != nil {
  102. result.ErrMsg = fmt.Sprintf("解析计算结果日期失败, edbCode: %s, date: %s, err: %v, ", v.EdbCode, d, e)
  103. calculateResp.Fail = append(calculateResp.Fail, result)
  104. return
  105. }
  106. dataArr = append(dataArr, &data_manage.FactorEdbSeriesCalculateData{
  107. FactorEdbSeriesId: seriesId,
  108. EdbInfoId: v.EdbInfoId,
  109. EdbCode: v.EdbCode,
  110. DataTime: dataTime,
  111. Value: val,
  112. CreateTime: time.Now().Local(),
  113. ModifyTime: time.Now().Local(),
  114. DataTimestamp: dataTime.UnixNano() / 1e6,
  115. })
  116. }
  117. if len(dataArr) == 0 {
  118. result.Msg = "计算结果无数据"
  119. result.ErrMsg = fmt.Sprintf("计算结果无数据, edbCode: %s", v.EdbCode)
  120. calculateResp.Fail = append(calculateResp.Fail, result)
  121. return
  122. }
  123. if e = calculateDataOb.CreateMulti(dataArr); e != nil {
  124. result.ErrMsg = fmt.Sprintf("保存计算结果失败, edbCode: %s, err: %v, ", v.EdbCode, e)
  125. calculateResp.Fail = append(calculateResp.Fail, result)
  126. return
  127. }
  128. result.Msg = "计算成功"
  129. calculateResp.Success = append(calculateResp.Success, result)
  130. }(edb)
  131. }
  132. wg.Wait()
  133. return
  134. }