package data import ( "encoding/json" "eta/eta_api/models" "eta/eta_api/models/data_manage" "eta/eta_api/models/mgo" "eta/eta_api/services/alarm_msg" "eta/eta_api/utils" "fmt" "go.mongodb.org/mongo-driver/bson" "time" ) // CheckExistThsHfEdb 校验已存在的同花顺高频指标 func CheckExistThsHfEdb(stockCodes, edbCodes []string) (checkResp data_manage.ThsHfExistCheckResp, existMap map[string]bool, err error) { // 待校验的指标编码 var indexCode []string prefix := utils.ThsHf for _, sv := range stockCodes { for _, ev := range edbCodes { t := fmt.Sprintf("%s%s%s", prefix, sv, ev) indexCode = append(indexCode, t) } } baseIndexes := make([]*data_manage.BaseFromThsHfIndex, 0) { ob := new(data_manage.BaseFromThsHfIndex) fields := []string{ob.Cols().PrimaryId, ob.Cols().IndexCode, ob.Cols().IndexName, ob.Cols().StockCode, ob.Cols().Indicator} list, e := ob.GetItemsByCondition(``, make([]interface{}, 0), fields, "") if e != nil { err = fmt.Errorf("获取高频指标列表失败, %v", e) return } baseIndexes = list } var existNum int existMap = make(map[string]bool) for _, bv := range baseIndexes { code := fmt.Sprintf("%s%s%s", prefix, bv.StockCode, bv.Indicator) for _, iv := range indexCode { if code != iv { continue } existMap[fmt.Sprintf("%s-%s", bv.StockCode, bv.Indicator)] = true existNum += 1 checkResp.ExistIndex = append(checkResp.ExistIndex, data_manage.ThsHfExistCheckIndex{ IndexId: bv.BaseFromThsHfIndexId, IndexCode: bv.IndexCode, IndexName: bv.IndexName, }) break } } if existNum > 0 && existNum == len(indexCode) { checkResp.ExistAll = true } return } // GetEdbDataThsHf 获取同花顺高频数据指标 func GetEdbDataThsHf(req data_manage.ThsHfSearchEdbReq) (indexes []*data_manage.ThsHfIndexWithData, err error) { param := make(map[string]interface{}) param["StockCode"] = req.StockCode param["EdbCode"] = req.EdbCode param["StartTime"] = req.StartTime param["EndTime"] = req.EndTime param["Interval"] = req.Interval param["Fill"] = req.Fill param["CPS"] = req.CPS param["BaseDate"] = req.BaseDate uri := `ths/hf/edb_data` resp, e := postThsHfEdbData(param, uri) if e != nil { err = fmt.Errorf("postThsHfEdbData, %v", e) return } if resp.Ret == 200 { indexes = resp.Data } return } // postThsHfEdbData 刷新指标数据 func postThsHfEdbData(param map[string]interface{}, urlStr string) (resp *data_manage.ThsHfIndexDataLibResp, err error) { postUrl := utils.EDB_LIB_URL + urlStr postData, err := json.Marshal(param) if err != nil { return } result, err := HttpPost(postUrl, string(postData), utils.ZhLangVersion, "application/json") if err != nil { return } utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";" + string(postData) + ";result:" + string(result)) err = json.Unmarshal(result, &resp) if err != nil { return } return resp, nil } // BaseAddThsHf 新增数据源 func BaseAddThsHf(req data_manage.ThsHfBaseAddReq) (resp *models.BaseResponse, err error) { param := make(map[string]interface{}) param["StartTime"] = req.StartTime param["EndTime"] = req.EndTime param["Interval"] = req.Interval param["Fill"] = req.Fill param["CPS"] = req.CPS param["BaseDate"] = req.BaseDate param["SysAdminId"] = req.SysAdminId param["SysAdminName"] = req.SysAdminName param["ClassifyId"] = req.ClassifyId param["Unit"] = req.Unit param["IndexName"] = req.IndexName param["Frequency"] = req.Frequency param["StockCode"] = req.StockCode param["EdbCode"] = req.EdbCode uri := `ths/hf/base/add` res, e := postRefreshEdbData(param, uri) if e != nil { err = fmt.Errorf("postRefreshEdbData, %v", e) return } resp = res return } // RefreshBaseThsHfIndex 刷新源指标 func RefreshBaseThsHfIndex(indexIds []int, refreshType int) (isAsync bool, err error) { if len(indexIds) == 0 { return } defer func() { if err != nil { tips := fmt.Sprintf("RefreshBaseThsHfIndex-刷新同花顺高频指标失败, %v", err) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } }() indexes := make([]*data_manage.BaseFromThsHfIndex, 0) { ob := new(data_manage.BaseFromThsHfIndex) cond := fmt.Sprintf(" AND %s IN (%s)", ob.Cols().PrimaryId, utils.GetOrmInReplace(len(indexIds))) pars := make([]interface{}, 0) pars = append(pars, indexIds) list, e := ob.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取源指标失败, %v", e) return } if len(list) == 0 { return } indexes = list } refreshUrl := "ths/hf/base/refresh" // 异步刷新 if len(indexes) > 10 { isAsync = true go func() { for _, v := range indexes { param := make(map[string]interface{}) param["BaseIndexCode"] = v.IndexCode param["RefreshType"] = refreshType resp, e := postRefreshEdbData(param, refreshUrl) if e != nil { utils.FileLog.Info(fmt.Sprintf("thsHf-postRefreshEdbData, code: %s, err: %v", v.IndexCode, e)) continue } if resp != nil && resp.Ret != 200 { utils.FileLog.Info(fmt.Sprintf("thsHf-postRefreshEdbData, code: %s, Ret: %d, ErrMsg: %s", v.IndexCode, resp.Ret, resp.ErrMsg)) continue } } }() return } // 同步刷新 for _, v := range indexes { param := make(map[string]interface{}) param["BaseIndexCode"] = v.IndexCode param["RefreshType"] = refreshType resp, e := postRefreshEdbData(param, refreshUrl) if e != nil { utils.FileLog.Info(fmt.Sprintf("thsHf-postRefreshEdbData, code: %s, err: %v", v.IndexCode, e)) continue } if resp.Ret != 200 { utils.FileLog.Info(fmt.Sprintf("thsHf-postRefreshEdbData, code: %s, Ret: %d, ErrMsg: %s", v.IndexCode, resp.Ret, resp.ErrMsg)) continue } } return } // GetThsHfBaseIndexData 获取指标数据 func GetThsHfBaseIndexData(indexCode, startTime, endTime string) (dataList []*data_manage.BaseFromThsHfDataItem, err error) { if utils.UseMongo { list, e := getThsHfBaseIndexDataByMongo(indexCode, startTime, endTime) if e != nil { err = fmt.Errorf("获取指标数据失败-Mongo, %v", e) return } dataList = list return } // MySQL var ( cond string pars []interface{} ) dataOb := new(data_manage.BaseFromThsHfData) if startTime != "" && endTime != "" { cond += fmt.Sprintf(" AND %s = ? AND (%s BETWEEN ? AND ?)", dataOb.Cols().IndexCode, dataOb.Cols().DataTime) pars = append(pars, indexCode, startTime, endTime) } if startTime != "" && endTime == "" { cond += fmt.Sprintf(" AND %s = ? AND %s > ?)", dataOb.Cols().IndexCode, dataOb.Cols().DataTime) pars = append(pars, indexCode, startTime) } list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s DESC", dataOb.Cols().DataTime)) if e != nil { err = fmt.Errorf("获取指标数据失败-MySQL, %v", e) return } for _, v := range list { dataList = append(dataList, v.Format2Item()) } return } // getThsHfBaseIndexDataByMongo func getThsHfBaseIndexDataByMongo(indexCode, startTime, endTime string) (dataList []*data_manage.BaseFromThsHfDataItem, err error) { dataList = make([]*data_manage.BaseFromThsHfDataItem, 0) mogDataObj := mgo.BaseFromThsHfData{} // 构建查询条件 queryConditions := bson.M{ "index_code": indexCode, } // 时间区间 if startTime != "" && endTime != "" { st, e := time.ParseInLocation(utils.FormatDateTime, startTime, time.Local) if e != nil { err = fmt.Errorf("start time parse err: %v", e) return } ed, e := time.ParseInLocation(utils.FormatDateTime, endTime, time.Local) if e != nil { err = fmt.Errorf("end time parse err: %v", e) return } queryConditions["data_time"] = bson.M{ "$gte": st, "$lte": ed, } } if startTime != "" && endTime == "" { st, e := time.ParseInLocation(utils.FormatDateTime, startTime, time.Local) if e != nil { err = fmt.Errorf("start time parse err: %v", e) return } queryConditions["data_time"] = bson.M{ "$gte": st, } } // 获取列表数据 list, e := mogDataObj.GetAllDataList(queryConditions, []string{"-data_time"}) if e != nil { err = fmt.Errorf("GetAllDataList err: %v", e) return } for _, v := range list { dataList = append(dataList, formatMgoBaseThsHfData2Item(v)) } return } func formatMgoBaseThsHfData2Item(origin *mgo.BaseFromThsHfData) (item *data_manage.BaseFromThsHfDataItem) { if origin == nil { return } item = new(data_manage.BaseFromThsHfDataItem) item.DataId = int(origin.BaseFromThsHfDataId) item.IndexId = int(origin.BaseFromThsHfIndexId) item.IndexCode = origin.IndexCode item.DataTime = utils.TimeTransferString(utils.FormatDateTime, origin.DataTime) item.Value = origin.Value item.UniqueCode = origin.UniqueCode return }