package data

import (
	"context"
	"errors"
	"eta_gn/eta_task/models/data_manage"
	"eta_gn/eta_task/models/data_manage/edb_refresh"
	"eta_gn/eta_task/services/alarm_msg"
	"eta_gn/eta_task/utils"
	"fmt"
	"strings"
	"sync"
	"time"
)

// calcRefreshStartDateByFrequency returns the formatted date from which an
// indicator should be re-fetched: endDate moved back by utils.DATA_REFRESH
// periods of the given frequency. "日度" (daily) and any unrecognised
// frequency both step back by days.
func calcRefreshStartDateByFrequency(frequency string, endDate time.Time) string {
	var start time.Time
	switch frequency {
	case "周度": // weekly
		start = endDate.AddDate(0, 0, -(utils.DATA_REFRESH * 7))
	case "月度": // monthly
		start = endDate.AddDate(0, -utils.DATA_REFRESH, 0)
	case "季度": // quarterly
		start = endDate.AddDate(0, -utils.DATA_REFRESH*3, 0)
	case "年度": // yearly
		start = endDate.AddDate(-utils.DATA_REFRESH, 0, 0)
	default: // daily, or anything else
		start = endDate.AddDate(0, 0, -utils.DATA_REFRESH)
	}
	return start.Format(utils.FormatDate)
}

// RefreshDataFromCalculateAll refreshes all calculated indicators, i.e. the
// rows selected with edb_type=2, edb_info_type=0 and no_update=0.
//
// An indicator is skipped when it is daily and its EndDate equals yesterday,
// when it is weekly and its EndDate lies within the current week, or when its
// StartDate formats as "0001-01-01". Failures of individual indicators are
// collected and reported in one alarm when the function returns; they do not
// stop the loop. The returned error is non-nil only when the indicator list
// itself cannot be loaded.
func RefreshDataFromCalculateAll() (err error) {
	errMsgList := make([]string, 0)
	defer func() {
		if err != nil {
			errMsg := "刷新所有计算指标失败 ErrMsg:" + err.Error()
			utils.FileLog.Info(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
		if len(errMsgList) > 0 {
			errMsg := "刷新所有计算指标失败 ErrMsg:" + strings.Join(errMsgList, "\n")
			utils.FileLog.Info(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
	}()

	// Select the calculated indicators that are still flagged for updating.
	condition := " AND edb_type=? AND edb_info_type=? AND no_update=0"
	pars := []interface{}{2, 0}
	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
	if err != nil {
		return err
	}

	yesterdayStr := time.Now().AddDate(0, 0, -1).Format(utils.FormatDate)
	startDateOfWeek := utils.GetNowWeekMonday()
	endDateOfWeek := utils.GetNowWeekLastDay()

	for _, v := range items {
		switch v.Frequency {
		case "日度":
			// Daily indicator already ends on yesterday: nothing to do.
			if v.EndDate.Format(utils.FormatDate) == yesterdayStr {
				continue
			}
		case "周度":
			// Weekly indicator already ends inside the current week.
			if !v.EndDate.Before(startDateOfWeek) && !v.EndDate.After(endDateOfWeek) {
				continue
			}
		}

		startDate := v.StartDate.Format(utils.FormatDate)
		if startDate == "0001-01-01" {
			continue
		}

		fmt.Println(v.EdbInfoId, v.EdbCode, v.EdbName, v.SourceName, v.Source)
		fmt.Println("RefreshEdbCalculateData", v.EdbInfoId, v.EdbCode, startDate)

		result, tmpErr := RefreshEdbCalculateData(v.EdbInfoId, v.EdbCode, startDate)
		if tmpErr != nil {
			errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;err:%s", v.EdbInfoId, v.EdbCode, tmpErr.Error()))
			continue
		}
		if result.Ret != 200 {
			errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;msg:%s;errMsg:%s", v.EdbInfoId, v.EdbCode, result.Msg, result.ErrMsg))
			continue
		}
	}
	return nil
}

// RefreshBasePredictDataAll refreshes all base predict indicators, i.e. the
// rows selected with edb_type=1 and edb_info_type=1.
//
// Indicators whose StartDate formats as "0001-01-01" are skipped. Failures of
// individual indicators are collected and reported in one alarm when the
// function returns. The returned error is non-nil only when the indicator
// list cannot be loaded.
func RefreshBasePredictDataAll() (err error) {
	errMsgList := make([]string, 0)
	defer func() {
		if err != nil {
			errMsg := "刷新所有基础预测指标失败 ErrMsg:" + err.Error()
			utils.FileLog.Info(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
		if len(errMsgList) > 0 {
			errMsg := "刷新所有基础预测指标失败 ErrMsg:" + strings.Join(errMsgList, "\n")
			utils.FileLog.Info(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
	}()

	condition := " AND edb_type=? AND edb_info_type=? "
	pars := []interface{}{1, 1}
	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
	if err != nil {
		errMsgList = append(errMsgList, fmt.Sprintf("获取基础预测指标列表失败;err:%s", err.Error()))
		return err
	}

	for _, v := range items {
		startDate := v.StartDate.Format(utils.FormatDate)
		if startDate == "0001-01-01" {
			continue
		}
		fmt.Println(v.EdbInfoId, v.EdbCode, v.EdbName, v.SourceName, v.Source, "======RefreshBasePredictDataAll:", startDate)

		result, tmpErr := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
		if tmpErr != nil {
			errMsgList = append(errMsgList, fmt.Sprintf("刷新基础预测指标失败1,指标ID:%d;指标编码:%s;err:%s", v.EdbInfoId, v.EdbCode, tmpErr.Error()))
			continue
		}
		if result.Ret != 200 {
			// Msg goes under the "msg" label and ErrMsg under the "err" label.
			errMsgList = append(errMsgList, fmt.Sprintf("刷新基础预测指标失败2,指标ID:%d;指标编码:%s;报错提示信息msg:%s;报错信息err:%s", v.EdbInfoId, v.EdbCode, result.Msg, result.ErrMsg))
			continue
		}
	}
	return nil
}

// RefreshPredictDataFromCalculateAll refreshes all calculated predict
// indicators, i.e. the rows selected with edb_type=2 and edb_info_type=1.
//
// Indicators whose StartDate formats as "0001-01-01" are skipped. Failures of
// individual indicators are collected and reported in one alarm when the
// function returns. The returned error is non-nil only when the indicator
// list cannot be loaded.
func RefreshPredictDataFromCalculateAll() (err error) {
	errMsgList := make([]string, 0)
	defer func() {
		if err != nil {
			errMsg := "刷新所有计算预测指标失败 ErrMsg:" + err.Error()
			utils.FileLog.Info(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
		if len(errMsgList) > 0 {
			errMsg := "刷新所有计算预测指标失败 Err:" + strings.Join(errMsgList, "\n")
			utils.FileLog.Info(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
	}()

	condition := " AND edb_type=? AND edb_info_type=? "
	pars := []interface{}{2, 1}
	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
	if err != nil {
		return err
	}

	for _, v := range items {
		startDate := v.StartDate.Format(utils.FormatDate)
		if startDate == "0001-01-01" {
			continue
		}
		fmt.Println(v.EdbInfoId, v.EdbCode, v.EdbName, v.SourceName, v.Source, "======RefreshPredictEdbCalculateData:", startDate)

		result, tmpErr := RefreshPredictEdbCalculateData(v.EdbInfoId, v.EdbCode, startDate)
		if tmpErr != nil {
			errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;err:%s", v.EdbInfoId, v.EdbCode, tmpErr.Error()))
			continue
		}
		if result.Ret != 200 {
			errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;msg:%s;errMsg:%s", v.EdbInfoId, v.EdbCode, result.Msg, result.ErrMsg))
			continue
		}
	}
	return nil
}

// RefreshDataFromManual refreshes the indicators whose source is
// utils.DATA_SOURCE_MANUAL, each from its own StartDate.
//
// wg.Done is called once when the function returns. Failures of individual
// indicators are collected and reported in one alarm; the returned error is
// non-nil only when the indicator list cannot be loaded.
func RefreshDataFromManual(wg *sync.WaitGroup) (err error) {
	errMsgList := make([]string, 0)
	defer func() {
		if err != nil {
			fmt.Println("RefreshDataFromManual Err:" + err.Error())
			go alarm_msg.SendAlarmMsg("RefreshDataFromManual ErrMsg:"+err.Error(), 3)
		}
		if len(errMsgList) > 0 {
			errMsg := "RefreshDataFromManual Err:" + strings.Join(errMsgList, "\n")
			fmt.Println(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
		wg.Done()
	}()

	condition := " AND source=? "
	pars := []interface{}{utils.DATA_SOURCE_MANUAL}
	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
	if err != nil {
		return errors.New("GetEdbInfoByCondition:" + err.Error())
	}

	for _, v := range items {
		startDate := v.StartDate.Format(utils.FormatDate)
		// Use a distinct name so the named result err is not shadowed.
		resp, tmpErr := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
		if tmpErr != nil {
			errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
			continue
		}
		if resp.Ret != 200 {
			errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
			continue
		}
	}
	return nil
}

// ResetEdbInfoIsUpdate starts data_manage.ResetEdbInfoIsUpdate in a new
// goroutine and returns nil immediately; it does not wait for that call to
// finish and does not observe its result. The context argument is unused.
func ResetEdbInfoIsUpdate(cont context.Context) (err error) {
	go data_manage.ResetEdbInfoIsUpdate()
	return nil
}

// RefreshBaseDataFromSource refreshes the base data of every indicator that
// belongs to the given source.
//
// For each indicator the refresh starts utils.DATA_REFRESH periods (of the
// indicator's frequency) before its EndDate. wg.Done is called once when the
// function returns. Failures of individual indicators are collected and
// reported in one alarm; the returned error is non-nil only when the
// indicator list cannot be loaded.
//
// @author: Roc
// @datetime 2024-08-01 18:10:03
// @param wg *sync.WaitGroup
// @param source int
// @return err error
func RefreshBaseDataFromSource(wg *sync.WaitGroup, source int) (err error) {
	errMsgList := make([]string, 0)
	defer func() {
		if err != nil {
			errMsg := fmt.Sprintf("刷新基础数据失败,来源:%d,ErrMsg:%s", source, err.Error())
			utils.FileLog.Info(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
		if len(errMsgList) > 0 {
			errMsg := fmt.Sprintf("刷新基础数据失败,来源:%d,ErrMsg:%s", source, strings.Join(errMsgList, "\n"))
			utils.FileLog.Info(errMsg)
			go alarm_msg.SendAlarmMsg(errMsg, 3)
		}
		wg.Done()
	}()

	condition := " AND source=? "
	pars := []interface{}{source}
	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
	if err != nil {
		return errors.New("GetEdbInfoByCondition:" + err.Error())
	}

	for _, v := range items {
		startDate := calcRefreshStartDateByFrequency(v.Frequency, v.EndDate)

		resp, tmpErr := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
		// Check the error of this call (tmpErr), not the outer err, which is
		// always nil at this point.
		if tmpErr != nil {
			errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;err:%s", v.EdbInfoId, v.EdbCode, tmpErr.Error()))
			continue
		}
		if resp.Ret != 200 {
			errMsgList = append(errMsgList, fmt.Sprintf("指标ID:%d;指标编码:%s;msg:%s;errMsg:%s", v.EdbInfoId, v.EdbCode, resp.Msg, resp.ErrMsg))
			continue
		}
	}
	return nil
}

// RefreshDataFromTradeAnalysis refreshes the indicators whose source is
// utils.DATA_SOURCE_TRADE_ANALYSIS and whose no_update flag is 0.
//
// Each refresh starts utils.DATA_REFRESH days before the indicator's EndDate.
// Start and end of the run are written to the file log. wg.Done is called
// once when the function returns. Failures of individual indicators are
// collected and reported in one alarm; the returned error is non-nil only
// when the indicator list cannot be loaded.
func RefreshDataFromTradeAnalysis(wg *sync.WaitGroup) (err error) {
	utils.FileLog.Info(fmt.Sprintf("持仓分析指标刷新开始: %s", time.Now().Format(utils.FormatDateTime)))
	errMsgList := make([]string, 0)
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("RefreshDataFromTradeAnalysis err: %v", err)
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
		if len(errMsgList) > 0 {
			tips := fmt.Sprintf("RefreshDataFromTradeAnalysis ErrMsg: %s", strings.Join(errMsgList, "\n"))
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
		wg.Done()
	}()

	condition := ` AND source = ? AND no_update = 0 `
	pars := []interface{}{utils.DATA_SOURCE_TRADE_ANALYSIS}
	items, e := data_manage.GetEdbInfoByCondition(condition, pars, 0)
	if e != nil {
		return fmt.Errorf("获取持仓分析指标失败, %v", e)
	}

	for _, v := range items {
		// Trade-analysis indicators are daily only, so always step back by days.
		startDate := v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)

		resp, tmpErr := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
		if tmpErr != nil {
			errMsgList = append(errMsgList, fmt.Sprintf("EdbCode: %s, RefreshEdbData err: %v", v.EdbCode, tmpErr))
			continue
		}
		if resp.Ret != 200 {
			// The call itself succeeded here, so report the response message
			// rather than the (nil) error value.
			errMsgList = append(errMsgList, fmt.Sprintf("EdbCode: %s, RefreshEdbData err: %s, errMsg: %s", v.EdbCode, resp.Msg, resp.ErrMsg))
			continue
		}
	}

	utils.FileLog.Info(fmt.Sprintf("持仓分析指标刷新结束: %s", time.Now().Format(utils.FormatDateTime)))
	return nil
}

// NoneConfigRefreshDataGn refreshes the indicators of every base data source
// (is_base = 1) that has no default refresh configuration.
//
// The sources to refresh are the base source IDs minus the IDs returned by
// edb_refresh.GetDistinctDefaultRefreshSourceIds. For each indicator the
// refresh starts at utils.BaseEdbRefreshStartDate when its EndDate is zero,
// otherwise utils.DATA_REFRESH periods (of its frequency) before EndDate.
// wg.Done is called once when the function returns. Failures are written to
// the file log only. The returned error is non-nil when a source list or an
// indicator list cannot be loaded, which also stops the run.
func NoneConfigRefreshDataGn(wg *sync.WaitGroup) (err error) {
	errMsgList := make([]string, 0)
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("NoneConfigRefreshDataGn ErrMsg: %v", err)
			utils.FileLog.Info(tips)
		}
		if len(errMsgList) > 0 {
			errMsg := "NoneConfigRefreshDataGn Err:" + strings.Join(errMsgList, "\n")
			utils.FileLog.Info(errMsg)
		}
		wg.Done()
	}()

	// Work out which base sources have no refresh configuration.
	list, e := data_manage.GetEdbSourceItemsByCondition(" AND is_base = ?", []interface{}{1}, []string{}, "")
	if e != nil {
		return fmt.Errorf("获取需要刷新的基础指标来源失败, %v", e)
	}
	sourceBaseIds := make([]int, 0, len(list))
	for _, v := range list {
		sourceBaseIds = append(sourceBaseIds, v.EdbSourceId)
	}

	// Sources that already have a default refresh configuration.
	hasConfigIds, e := edb_refresh.GetDistinctDefaultRefreshSourceIds()
	if e != nil {
		return fmt.Errorf("获取已配置过刷新的指标来源失败, %v", e)
	}

	// Difference of the two ID sets: base sources without configuration.
	sourceRefreshIds := utils.MinusInt(sourceBaseIds, hasConfigIds)
	if len(sourceRefreshIds) == 0 {
		utils.FileLog.Info("无未配置刷新时间的指标来源需要刷新")
		return nil
	}

	// Refresh source by source.
	for _, sourceId := range sourceRefreshIds {
		cond := ` AND source = ? `
		pars := []interface{}{sourceId}
		items, e := data_manage.GetEdbInfoByCondition(cond, pars, 0)
		if e != nil {
			return fmt.Errorf("获取指标失败, Source: %d, Err: %v", sourceId, e)
		}

		for _, v := range items {
			// An indicator without any data yet starts from the global base
			// date; otherwise look back from its EndDate by frequency.
			var startDate string
			if v.EndDate.IsZero() {
				startDate = utils.BaseEdbRefreshStartDate
			} else {
				startDate = calcRefreshStartDateByFrequency(v.Frequency, v.EndDate)
			}

			resp, tmpErr := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
			if tmpErr != nil {
				errMsgList = append(errMsgList, fmt.Sprintf("RefreshEdbData Code: %s Err: %v\n", v.EdbCode, tmpErr))
				continue
			}
			if resp.Ret != 200 {
				errMsgList = append(errMsgList, fmt.Sprintf("RefreshEdbData Code: %s Err: %s, ErrMsg: %s\n", v.EdbCode, resp.Msg, resp.ErrMsg))
				continue
			}
		}
	}
	return nil
}