factor_edb_series.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package data
  2. import (
  3. "encoding/json"
  4. "eta_gn/eta_api/models"
  5. "eta_gn/eta_api/models/data_manage"
  6. "eta_gn/eta_api/services/alarm_msg"
  7. "eta_gn/eta_api/utils"
  8. "fmt"
  9. "sync"
  10. "time"
  11. )
  12. func FactorEdbStepCalculate(seriesId int, edbArr []*data_manage.EdbInfo, calculates []data_manage.FactorEdbSeriesCalculatePars, lang string, recalculate bool) (calculateResp data_manage.FactorEdbSeriesStepCalculateResp, err error) {
  13. if len(edbArr) == 0 || len(calculates) == 0 {
  14. return
  15. }
  16. defer func() {
  17. if err != nil {
  18. tips := fmt.Sprintf("StepCalculate计算失败, ErrMsg: %v", err)
  19. fmt.Println(tips)
  20. utils.FileLog.Info(tips)
  21. go alarm_msg.SendAlarmMsg(tips, 3)
  22. }
  23. if len(calculateResp.Fail) > 0 {
  24. tips := "StepCalculate计算失败, ErrMsg: "
  25. for _, f := range calculateResp.Fail {
  26. tips += fmt.Sprintf("code: %s, err: %s\n", f.EdbCode, f.ErrMsg)
  27. }
  28. fmt.Println(tips)
  29. utils.FileLog.Info(tips)
  30. go alarm_msg.SendAlarmMsg(tips, 2)
  31. }
  32. }()
  33. calculateDataOb := new(data_manage.FactorEdbSeriesCalculateData)
  34. if recalculate {
  35. cond := fmt.Sprintf("%s = ?", calculateDataOb.Cols().FactorEdbSeriesId)
  36. pars := make([]interface{}, 0)
  37. pars = append(pars, seriesId)
  38. if e := calculateDataOb.RemoveByCondition(cond, pars); e != nil {
  39. err = fmt.Errorf("清除原数据失败, err: %v", e)
  40. return
  41. }
  42. }
  43. wg := sync.WaitGroup{}
  44. calculateWorkers := make(chan struct{}, 10)
  45. for _, edb := range edbArr {
  46. wg.Add(1)
  47. go func(v *data_manage.EdbInfo) {
  48. defer func() {
  49. wg.Done()
  50. <-calculateWorkers
  51. }()
  52. calculateWorkers <- struct{}{}
  53. var result data_manage.FactorEdbSeriesStepCalculateResult
  54. result.EdbInfoId = v.EdbInfoId
  55. result.EdbCode = v.EdbCode
  56. result.Msg = "计算失败"
  57. edbData, e := data_manage.GetEdbDataAllByEdbCode(v.EdbCode, v.Source, v.SubSource, 0)
  58. if e != nil {
  59. result.ErrMsg = fmt.Sprintf("获取基础数据失败, edbCode: %s, err: %v", v.EdbCode, e)
  60. calculateResp.Fail = append(calculateResp.Fail, result)
  61. return
  62. }
  63. if len(edbData) == 0 {
  64. result.Msg = "该指标无基础数据"
  65. result.ErrMsg = fmt.Sprintf("该指标无基础数据, edbCode: %s", v.EdbCode)
  66. calculateResp.Fail = append(calculateResp.Fail, result)
  67. return
  68. }
  69. j, e := json.Marshal(data_manage.BaseStepCalculateReq{
  70. DataList: edbData,
  71. Calculates: calculates,
  72. })
  73. if e != nil {
  74. result.ErrMsg = fmt.Sprintf("请求体JSON格式化失败, edbCode: %s, err: %v", v.EdbCode, e)
  75. calculateResp.Fail = append(calculateResp.Fail, result)
  76. return
  77. }
  78. requestRes, e := BaseStepCalculate(string(j), lang)
  79. if e != nil {
  80. result.ErrMsg = fmt.Sprintf("指标计算响应失败, edbCode: %s, err: %v", v.EdbCode, e)
  81. calculateResp.Fail = append(calculateResp.Fail, result)
  82. return
  83. }
  84. if requestRes.Ret != 200 {
  85. result.Msg = requestRes.Msg
  86. result.ErrMsg = requestRes.ErrMsg
  87. calculateResp.Fail = append(calculateResp.Fail, result)
  88. return
  89. }
  90. dataArr := make([]*data_manage.FactorEdbSeriesCalculateData, 0)
  91. for _, d := range requestRes.Data.DateList {
  92. val, ok := requestRes.Data.DataMap[d]
  93. if !ok {
  94. continue
  95. }
  96. dataTime, e := time.ParseInLocation(time.DateOnly, d, time.Local)
  97. if e != nil {
  98. result.ErrMsg = fmt.Sprintf("解析计算结果日期失败, edbCode: %s, date: %s, err: %v, ", v.EdbCode, d, e)
  99. calculateResp.Fail = append(calculateResp.Fail, result)
  100. return
  101. }
  102. dataArr = append(dataArr, &data_manage.FactorEdbSeriesCalculateData{
  103. FactorEdbSeriesId: seriesId,
  104. EdbInfoId: v.EdbInfoId,
  105. EdbCode: v.EdbCode,
  106. DataTime: dataTime,
  107. Value: val,
  108. CreateTime: time.Now().Local(),
  109. ModifyTime: time.Now().Local(),
  110. DataTimestamp: dataTime.UnixNano() / 1e6,
  111. })
  112. }
  113. if len(dataArr) == 0 {
  114. result.Msg = "计算结果无数据"
  115. result.ErrMsg = fmt.Sprintf("计算结果无数据, edbCode: %s", v.EdbCode)
  116. calculateResp.Fail = append(calculateResp.Fail, result)
  117. return
  118. }
  119. if e = calculateDataOb.CreateMulti(dataArr); e != nil {
  120. result.ErrMsg = fmt.Sprintf("保存计算结果失败, edbCode: %s, err: %v, ", v.EdbCode, e)
  121. calculateResp.Fail = append(calculateResp.Fail, result)
  122. return
  123. }
  124. result.Msg = "计算成功"
  125. calculateResp.Success = append(calculateResp.Success, result)
  126. }(edb)
  127. }
  128. wg.Wait()
  129. return
  130. }
  131. func PostRefreshFactorEdbRecalculate(edbInfoId int, edbCode string) (resp *models.BaseResponse, err error) {
  132. param := make(map[string]interface{})
  133. param["EdbInfoId"] = edbInfoId
  134. param["EdbCode"] = edbCode
  135. postUrl := fmt.Sprintf("%s%s", utils.EDB_LIB_URL, "factor_edb_series/recalculate")
  136. postData, e := json.Marshal(param)
  137. if e != nil {
  138. err = fmt.Errorf("param json err: %v", e)
  139. return
  140. }
  141. result, e := HttpPost(postUrl, string(postData), utils.ZhLangVersion, "application/json")
  142. if e != nil {
  143. err = fmt.Errorf("http post err: %v", e)
  144. return
  145. }
  146. utils.FileLog.Info("PostRefreshFactorEdbRecalculate:" + postUrl + ";" + string(postData) + ";result:" + string(result))
  147. if e = json.Unmarshal(result, &resp); e != nil {
  148. err = fmt.Errorf("resp unmarshal err: %v", e)
  149. return
  150. }
  151. return
  152. }
  153. func PostRefreshFactorEdbChartRecalculate(chartInfoId int) (resp *models.BaseResponse, err error) {
  154. param := make(map[string]interface{})
  155. param["ChartInfoId"] = chartInfoId
  156. postUrl := fmt.Sprintf("%s%s", utils.EDB_LIB_URL, "factor_edb_series/chart_recalculate")
  157. postData, e := json.Marshal(param)
  158. if e != nil {
  159. err = fmt.Errorf("param json err: %v", e)
  160. return
  161. }
  162. result, e := HttpPost(postUrl, string(postData), utils.ZhLangVersion, "application/json")
  163. if e != nil {
  164. err = fmt.Errorf("http post err: %v", e)
  165. return
  166. }
  167. utils.FileLog.Info("PostRefreshFactorEdbChartRecalculate:" + postUrl + ";" + string(postData) + ";result:" + string(result))
  168. if e = json.Unmarshal(result, &resp); e != nil {
  169. err = fmt.Errorf("resp unmarshal err: %v", e)
  170. return
  171. }
  172. return
  173. }