package data

import (
	"encoding/json"
	"eta_gn/eta_api/models"
	"eta_gn/eta_api/models/data_manage"
	"eta_gn/eta_api/services/alarm_msg"
	"eta_gn/eta_api/utils"
	"fmt"
	"sync"
	"time"
)

// FactorEdbStepCalculate runs the multi-formula (step) calculation for every
// indicator in edbArr and stores the computed series under seriesId.
//
// Parameters:
//   - seriesId: factor series the calculated rows are attached to.
//   - edbArr: indicators to calculate; each one is processed in its own goroutine,
//     with at most 10 running the calculation at the same time.
//   - calculates: the calculation steps forwarded to BaseStepCalculate.
//   - lang: language version forwarded to BaseStepCalculate.
//   - recalculate: when true, existing calculated rows of the series are removed first.
//
// Returns the per-indicator outcome (Success / Fail lists). err is only set when
// clearing the previous data fails; per-indicator failures are reported through
// calculateResp.Fail. If edbArr or calculates is empty the function returns
// immediately with zero values.
//
// On exit, a non-nil err or a non-empty Fail list is printed, written to the
// file log and sent as an alarm message.
func FactorEdbStepCalculate(seriesId int, edbArr []*data_manage.EdbInfo, calculates []data_manage.FactorEdbSeriesCalculatePars, lang string, recalculate bool) (calculateResp data_manage.FactorEdbSeriesStepCalculateResp, err error) {
	if len(edbArr) == 0 || len(calculates) == 0 {
		return
	}
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("StepCalculate计算失败, ErrMsg: %v", err)
			fmt.Println(tips)
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
		if len(calculateResp.Fail) > 0 {
			tips := "StepCalculate计算失败, ErrMsg: "
			for _, f := range calculateResp.Fail {
				tips += fmt.Sprintf("code: %s, err: %s\n", f.EdbCode, f.ErrMsg)
			}
			fmt.Println(tips)
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 2)
		}
	}()

	// Recalculation: clear the previously stored data of this series first.
	calculateDataOb := new(data_manage.FactorEdbSeriesCalculateData)
	if recalculate {
		cond := fmt.Sprintf("%s = ?", calculateDataOb.Cols().FactorEdbSeriesId)
		pars := []interface{}{seriesId}
		if e := calculateDataOb.RemoveByCondition(cond, pars); e != nil {
			err = fmt.Errorf("清除原数据失败, err: %w", e)
			return
		}
	}

	var (
		wg sync.WaitGroup
		// mu guards calculateResp.Fail / calculateResp.Success, which are
		// appended to from several worker goroutines at once.
		mu sync.Mutex
	)
	recordFail := func(r data_manage.FactorEdbSeriesStepCalculateResult) {
		mu.Lock()
		defer mu.Unlock()
		calculateResp.Fail = append(calculateResp.Fail, r)
	}
	recordSuccess := func(r data_manage.FactorEdbSeriesStepCalculateResult) {
		mu.Lock()
		defer mu.Unlock()
		calculateResp.Success = append(calculateResp.Success, r)
	}

	// Buffered channel used as a semaphore: at most 10 calculations in flight.
	calculateWorkers := make(chan struct{}, 10)
	for _, edb := range edbArr {
		wg.Add(1)
		go func(v *data_manage.EdbInfo) {
			defer wg.Done()
			// Acquire the slot before deferring its release so the pair always matches.
			calculateWorkers <- struct{}{}
			defer func() { <-calculateWorkers }()

			var result data_manage.FactorEdbSeriesStepCalculateResult
			result.EdbInfoId = v.EdbInfoId
			result.EdbCode = v.EdbCode
			result.Msg = "计算失败"

			// Load the indicator's base data.
			edbData, e := data_manage.GetEdbDataAllByEdbCode(v.EdbCode, v.Source, v.SubSource, 0)
			if e != nil {
				result.ErrMsg = fmt.Sprintf("获取基础数据失败, edbCode: %s, err: %v", v.EdbCode, e)
				recordFail(result)
				return
			}
			if len(edbData) == 0 {
				result.Msg = "该指标无基础数据"
				result.ErrMsg = fmt.Sprintf("该指标无基础数据, edbCode: %s", v.EdbCode)
				recordFail(result)
				return
			}

			// Ask the indicator service to perform the calculation.
			j, e := json.Marshal(data_manage.BaseStepCalculateReq{
				DataList:   edbData,
				Calculates: calculates,
			})
			if e != nil {
				result.ErrMsg = fmt.Sprintf("请求体JSON格式化失败, edbCode: %s, err: %v", v.EdbCode, e)
				recordFail(result)
				return
			}
			requestRes, e := BaseStepCalculate(string(j), lang)
			if e != nil {
				result.ErrMsg = fmt.Sprintf("指标计算响应失败, edbCode: %s, err: %v", v.EdbCode, e)
				recordFail(result)
				return
			}
			if requestRes.Ret != 200 {
				result.Msg = requestRes.Msg
				result.ErrMsg = requestRes.ErrMsg
				recordFail(result)
				return
			}

			// Calculation succeeded: convert the response into rows and save them.
			now := time.Now().Local()
			dataArr := make([]*data_manage.FactorEdbSeriesCalculateData, 0, len(requestRes.Data.DateList))
			for _, d := range requestRes.Data.DateList {
				val, ok := requestRes.Data.DataMap[d]
				if !ok {
					// Dates without a value in the result map are skipped.
					continue
				}
				dataTime, e := time.ParseInLocation(time.DateOnly, d, time.Local)
				if e != nil {
					result.ErrMsg = fmt.Sprintf("解析计算结果日期失败, edbCode: %s, date: %s, err: %v, ", v.EdbCode, d, e)
					recordFail(result)
					return
				}
				dataArr = append(dataArr, &data_manage.FactorEdbSeriesCalculateData{
					FactorEdbSeriesId: seriesId,
					EdbInfoId:         v.EdbInfoId,
					EdbCode:           v.EdbCode,
					DataTime:          dataTime,
					Value:             val,
					CreateTime:        now,
					ModifyTime:        now,
					// Millisecond timestamp of the data date.
					DataTimestamp: dataTime.UnixNano() / 1e6,
				})
			}
			if len(dataArr) == 0 {
				result.Msg = "计算结果无数据"
				result.ErrMsg = fmt.Sprintf("计算结果无数据, edbCode: %s", v.EdbCode)
				recordFail(result)
				return
			}
			// NOTE(review): calculateDataOb is shared by all workers; this assumes
			// CreateMulti does not mutate its receiver — confirm.
			if e = calculateDataOb.CreateMulti(dataArr); e != nil {
				result.ErrMsg = fmt.Sprintf("保存计算结果失败, edbCode: %s, err: %v, ", v.EdbCode, e)
				recordFail(result)
				return
			}
			result.Msg = "计算成功"
			recordSuccess(result)
		}(edb)
	}
	wg.Wait()
	return
}

// postFactorEdbSeriesRequest JSON-encodes param, POSTs it to utils.EDB_LIB_URL+path,
// logs the exchange under the logName prefix and decodes the reply into a BaseResponse.
func postFactorEdbSeriesRequest(logName, path string, param map[string]interface{}) (resp *models.BaseResponse, err error) {
	postUrl := fmt.Sprintf("%s%s", utils.EDB_LIB_URL, path)
	postData, e := json.Marshal(param)
	if e != nil {
		err = fmt.Errorf("param json err: %w", e)
		return
	}
	result, e := HttpPost(postUrl, string(postData), utils.ZhLangVersion, "application/json")
	if e != nil {
		err = fmt.Errorf("http post err: %w", e)
		return
	}
	utils.FileLog.Info(logName + ":" + postUrl + ";" + string(postData) + ";result:" + string(result))
	if e = json.Unmarshal(result, &resp); e != nil {
		err = fmt.Errorf("resp unmarshal err: %w", e)
		return
	}
	return
}

// PostRefreshFactorEdbRecalculate asks the indicator library service to
// recalculate the factor series data of one indicator.
//
// It posts {"EdbInfoId", "EdbCode"} to the "factor_edb_series/recalculate"
// endpoint and returns the decoded response, or an error if encoding, the HTTP
// call, or decoding fails.
func PostRefreshFactorEdbRecalculate(edbInfoId int, edbCode string) (resp *models.BaseResponse, err error) {
	param := map[string]interface{}{
		"EdbInfoId": edbInfoId,
		"EdbCode":   edbCode,
	}
	return postFactorEdbSeriesRequest("PostRefreshFactorEdbRecalculate", "factor_edb_series/recalculate", param)
}

// PostRefreshFactorEdbChartRecalculate asks the indicator library service to
// recalculate the factor series data behind one chart.
//
// It posts {"ChartInfoId"} to the "factor_edb_series/chart_recalculate"
// endpoint and returns the decoded response, or an error if encoding, the HTTP
// call, or decoding fails.
func PostRefreshFactorEdbChartRecalculate(chartInfoId int) (resp *models.BaseResponse, err error) {
	param := map[string]interface{}{
		"ChartInfoId": chartInfoId,
	}
	return postFactorEdbSeriesRequest("PostRefreshFactorEdbChartRecalculate", "factor_edb_series/chart_recalculate", param)
}