factor_edb_series.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package data
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/models"
  5. "eta/eta_api/models/data_manage"
  6. "eta/eta_api/services/alarm_msg"
  7. "eta/eta_api/utils"
  8. "fmt"
  9. "sync"
  10. "time"
  11. )
  12. // FactorEdbStepCalculate 因子指标-多公式计算
  13. func FactorEdbStepCalculate(seriesId int, edbArr []*data_manage.EdbInfo, calculates []data_manage.FactorEdbSeriesCalculatePars, lang string, recalculate bool) (calculateResp data_manage.FactorEdbSeriesStepCalculateResp, err error) {
  14. if len(edbArr) == 0 || len(calculates) == 0 {
  15. return
  16. }
  17. defer func() {
  18. if err != nil {
  19. tips := fmt.Sprintf("StepCalculate计算失败, ErrMsg: %v", err)
  20. fmt.Println(tips)
  21. utils.FileLog.Info(tips)
  22. go alarm_msg.SendAlarmMsg(tips, 3)
  23. }
  24. if len(calculateResp.Fail) > 0 {
  25. tips := "StepCalculate计算失败, ErrMsg: "
  26. for _, f := range calculateResp.Fail {
  27. tips += fmt.Sprintf("code: %s, err: %s\n", f.EdbCode, f.ErrMsg)
  28. }
  29. fmt.Println(tips)
  30. utils.FileLog.Info(tips)
  31. go alarm_msg.SendAlarmMsg(tips, 2)
  32. }
  33. }()
  34. // 重新计算-先清除原数据
  35. calculateDataOb := new(data_manage.FactorEdbSeriesCalculateData)
  36. if recalculate {
  37. cond := fmt.Sprintf("%s = ?", calculateDataOb.Cols().FactorEdbSeriesId)
  38. pars := make([]interface{}, 0)
  39. pars = append(pars, seriesId)
  40. if e := calculateDataOb.RemoveByCondition(cond, pars); e != nil {
  41. err = fmt.Errorf("清除原数据失败, err: %v", e)
  42. return
  43. }
  44. }
  45. wg := sync.WaitGroup{}
  46. calculateWorkers := make(chan struct{}, 10)
  47. for _, edb := range edbArr {
  48. wg.Add(1)
  49. go func(v *data_manage.EdbInfo) {
  50. defer func() {
  51. wg.Done()
  52. <-calculateWorkers
  53. }()
  54. calculateWorkers <- struct{}{}
  55. var result data_manage.FactorEdbSeriesStepCalculateResult
  56. result.EdbInfoId = v.EdbInfoId
  57. result.EdbCode = v.EdbCode
  58. result.Msg = "计算失败"
  59. // 获取基础数据
  60. edbData, e := data_manage.GetEdbDataAllByEdbCode(v.EdbCode, v.Source, v.SubSource, 0)
  61. if e != nil {
  62. result.ErrMsg = fmt.Sprintf("获取基础数据失败, edbCode: %s, err: %v", v.EdbCode, e)
  63. calculateResp.Fail = append(calculateResp.Fail, result)
  64. return
  65. }
  66. if len(edbData) == 0 {
  67. result.Msg = "该指标无基础数据"
  68. result.ErrMsg = fmt.Sprintf("该指标无基础数据, edbCode: %s", v.EdbCode)
  69. calculateResp.Fail = append(calculateResp.Fail, result)
  70. return
  71. }
  72. // 请求指标服务进行计算
  73. j, e := json.Marshal(data_manage.BaseStepCalculateReq{
  74. DataList: edbData,
  75. Calculates: calculates,
  76. })
  77. if e != nil {
  78. result.ErrMsg = fmt.Sprintf("请求体JSON格式化失败, edbCode: %s, err: %v", v.EdbCode, e)
  79. calculateResp.Fail = append(calculateResp.Fail, result)
  80. return
  81. }
  82. requestRes, e := BaseStepCalculate(string(j), lang)
  83. if e != nil {
  84. result.ErrMsg = fmt.Sprintf("指标计算响应失败, edbCode: %s, err: %v", v.EdbCode, e)
  85. calculateResp.Fail = append(calculateResp.Fail, result)
  86. return
  87. }
  88. if requestRes.Ret != 200 {
  89. result.Msg = requestRes.Msg
  90. result.ErrMsg = requestRes.ErrMsg
  91. calculateResp.Fail = append(calculateResp.Fail, result)
  92. return
  93. }
  94. // 计算成功的保存结果
  95. dataArr := make([]*data_manage.FactorEdbSeriesCalculateData, 0)
  96. for _, d := range requestRes.Data.DateList {
  97. val, ok := requestRes.Data.DataMap[d]
  98. if !ok {
  99. continue
  100. }
  101. dataTime, e := time.ParseInLocation(time.DateOnly, d, time.Local)
  102. if e != nil {
  103. result.ErrMsg = fmt.Sprintf("解析计算结果日期失败, edbCode: %s, date: %s, err: %v, ", v.EdbCode, d, e)
  104. calculateResp.Fail = append(calculateResp.Fail, result)
  105. return
  106. }
  107. dataArr = append(dataArr, &data_manage.FactorEdbSeriesCalculateData{
  108. FactorEdbSeriesId: seriesId,
  109. EdbInfoId: v.EdbInfoId,
  110. EdbCode: v.EdbCode,
  111. DataTime: dataTime,
  112. Value: val,
  113. CreateTime: time.Now().Local(),
  114. ModifyTime: time.Now().Local(),
  115. DataTimestamp: dataTime.UnixNano() / 1e6,
  116. })
  117. }
  118. if len(dataArr) == 0 {
  119. result.Msg = "计算结果无数据"
  120. result.ErrMsg = fmt.Sprintf("计算结果无数据, edbCode: %s", v.EdbCode)
  121. calculateResp.Fail = append(calculateResp.Fail, result)
  122. return
  123. }
  124. if e = calculateDataOb.CreateMulti(dataArr); e != nil {
  125. result.ErrMsg = fmt.Sprintf("保存计算结果失败, edbCode: %s, err: %v, ", v.EdbCode, e)
  126. calculateResp.Fail = append(calculateResp.Fail, result)
  127. return
  128. }
  129. result.Msg = "计算成功"
  130. calculateResp.Success = append(calculateResp.Success, result)
  131. }(edb)
  132. }
  133. wg.Wait()
  134. return
  135. }
  136. // PostRefreshFactorEdbRecalculate 因子指标重计算
  137. func PostRefreshFactorEdbRecalculate(edbInfoId int, edbCode string) (resp *models.BaseResponse, err error) {
  138. param := make(map[string]interface{})
  139. param["EdbInfoId"] = edbInfoId
  140. param["EdbCode"] = edbCode
  141. postUrl := fmt.Sprintf("%s%s", utils.EDB_LIB_URL, "factor_edb_series/recalculate")
  142. postData, e := json.Marshal(param)
  143. if e != nil {
  144. err = fmt.Errorf("param json err: %v", e)
  145. return
  146. }
  147. result, e := HttpPost(postUrl, string(postData), utils.ZhLangVersion, "application/json")
  148. if e != nil {
  149. err = fmt.Errorf("http post err: %v", e)
  150. return
  151. }
  152. utils.FileLog.Info("PostRefreshFactorEdbRecalculate:" + postUrl + ";" + string(postData) + ";result:" + string(result))
  153. if e = json.Unmarshal(result, &resp); e != nil {
  154. err = fmt.Errorf("resp unmarshal err: %v", e)
  155. return
  156. }
  157. return
  158. }
  159. // PostRefreshFactorEdbChartRecalculate 因子指标图表重计算
  160. func PostRefreshFactorEdbChartRecalculate(chartInfoId int) (resp *models.BaseResponse, err error) {
  161. param := make(map[string]interface{})
  162. param["ChartInfoId"] = chartInfoId
  163. postUrl := fmt.Sprintf("%s%s", utils.EDB_LIB_URL, "factor_edb_series/chart_recalculate")
  164. postData, e := json.Marshal(param)
  165. if e != nil {
  166. err = fmt.Errorf("param json err: %v", e)
  167. return
  168. }
  169. result, e := HttpPost(postUrl, string(postData), utils.ZhLangVersion, "application/json")
  170. if e != nil {
  171. err = fmt.Errorf("http post err: %v", e)
  172. return
  173. }
  174. utils.FileLog.Info("PostRefreshFactorEdbChartRecalculate:" + postUrl + ";" + string(postData) + ";result:" + string(result))
  175. if e = json.Unmarshal(result, &resp); e != nil {
  176. err = fmt.Errorf("resp unmarshal err: %v", e)
  177. return
  178. }
  179. return
  180. }