package services

import (
	"encoding/json"
	"fmt"
	"net/url"
	"strings"
	"time"

	"eta/eta_index_lib/logic"
	"eta/eta_index_lib/models"
	"eta/eta_index_lib/models/mgo"
	"eta/eta_index_lib/services/alarm_msg"
	"eta/eta_index_lib/utils"

	"github.com/rdlucklib/rdluck_tools/http"
	"github.com/shopspring/decimal"
	"go.mongodb.org/mongo-driver/bson"
)

const (
	// ThsHfApiUrl is the HTTP endpoint used to request THS high-frequency data.
	ThsHfApiUrl = "https://quantapi.51ifind.com/api/v1/high_frequency"

	// thsHfRespTimeLayout is the layout of the timestamps found in both the
	// API and the terminal responses (minute precision, e.g. "2024-07-01 10:00").
	thsHfRespTimeLayout = "2006-01-02 15:04"
)

// roundThsHfValue rounds v to 4 decimal places and returns the rounded float
// together with its decimal string form. Callers compare the string forms to
// decide whether a stored value has changed.
func roundThsHfValue(v float64) (float64, string) {
	s := decimal.NewFromFloat(v).Round(4).String()
	// s was produced by decimal itself, so re-parsing it cannot fail.
	d, _ := decimal.NewFromString(s)
	// The second result only reports whether the conversion was exact.
	f, _ := d.Float64()
	return f, s
}

// updateThsHfIndexDateRange parses the min/max date strings (layout
// utils.FormatDateTime, local time zone) and persists them as the start/end
// date of indexItem, also bumping its modify time.
func updateThsHfIndexDateRange(indexItem *models.BaseFromThsHfIndex, minDateStr, maxDateStr string) error {
	minDate, e := time.ParseInLocation(utils.FormatDateTime, minDateStr, time.Local)
	if e != nil {
		return fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minDateStr, e)
	}
	maxDate, e := time.ParseInLocation(utils.FormatDateTime, maxDateStr, time.Local)
	if e != nil {
		return fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", maxDateStr, e)
	}
	indexItem.StartDate = minDate
	indexItem.EndDate = maxDate
	indexItem.ModifyTime = time.Now().Local()
	updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
	if e = indexItem.Update(updateCols); e != nil {
		return fmt.Errorf("更新源指标开始结束时间失败, %v", e)
	}
	return nil
}

// GetEdbDataFromThsHf fetches THS high-frequency index data.
//
// The terminal configuration is looked up by terminalCode. When
// utils.ThsDataMethod is empty or "api", the data is requested over the HTTP
// API using an access token obtained for terminal.Value; otherwise it is
// requested from the terminal server at terminal.ServerUrl. An empty
// thsParams.EndTime defaults to the current local time.
func GetEdbDataFromThsHf(thsParams models.ThsHfSearchEdbReq, terminalCode string) (indexes []*models.ThsHfIndexWithData, err error) {
	terminal, e := GetTerminal(utils.DATA_SOURCE_THS, terminalCode)
	if e != nil {
		err = fmt.Errorf("获取同花顺终端配置失败, %v", e)
		return
	}
	if thsParams.EndTime == "" {
		thsParams.EndTime = time.Now().Local().Format(utils.FormatDateTime)
	}

	// API mode.
	if utils.ThsDataMethod == "" || utils.ThsDataMethod == "api" {
		var token string
		token, e = GetAccessToken(false, terminal.Value)
		if e != nil {
			err = fmt.Errorf("获取同花顺API-AccessToken失败, %v", e)
			return
		}
		return getEdbDataFromThsHfHttp(thsParams, terminal.Value, token)
	}

	// Terminal-server mode.
	if terminal.ServerUrl == "" {
		err = fmt.Errorf("同花顺终端地址未配置")
		return
	}
	return getEdbDataFromThsHfApp(thsParams, 0, terminal.ServerUrl)
}

// getEdbDataFromThsHfHttp requests high-frequency index data over the HTTP API
// and converts the response into one series per (security code, indicator).
//
// Request payload shape (example):
//
//	{
//	  "codes": "CU2407.SHF,CU2408.SHF",
//	  "indicators": "pct_chg",
//	  "starttime": "2024-06-06 09:15:00",
//	  "endtime": "2024-06-11 15:15:00",
//	  "functionpara": {"Interval": 60, "Fill": "Previous", "CPS": "forward4", "BaseDate": "2024-01-01"}
//	}
//
// Response shape (example):
//
//	{
//	  "errorcode": 0,
//	  "errmsg": "Success!",
//	  "tables": [{
//	    "thscode": "CU2407.SHF",
//	    "time": ["2024-07-01 10:00", "2024-07-01 11:15"],
//	    "table": {"open": [77930.0, 77980.0], "close": [77980.0, 77920.0]}
//	  }]
//	}
//
// Any returned error is also written to the file log.
func getEdbDataFromThsHfHttp(thsParams models.ThsHfSearchEdbReq, refreshToken, accessToken string) (indexes []*models.ThsHfIndexWithData, err error) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("同花顺高频指标API-getEdbDataFromThsHfHttp err: %v", err)
			utils.FileLog.Info(tips)
		}
	}()

	// Optional function parameters; Interval is always sent.
	funcParams := map[string]interface{}{}
	funcParams["Interval"] = thsParams.Interval
	if thsParams.Fill != "" {
		funcParams["Fill"] = thsParams.Fill
	}
	if thsParams.CPS != "" {
		funcParams["CPS"] = thsParams.CPS
	}
	if thsParams.BaseDate != "" {
		funcParams["BaseDate"] = thsParams.BaseDate
	}
	dataMap := map[string]interface{}{
		"codes":        thsParams.StockCode,
		"indicators":   thsParams.EdbCode,
		"starttime":    thsParams.StartTime,
		"endtime":      thsParams.EndTime,
		"functionpara": funcParams,
	}

	// Call the API.
	body, e, _ := postCurl(ThsHfApiUrl, dataMap, 0, refreshToken, accessToken)
	if e != nil {
		utils.FileLog.Info(string(body))
		err = fmt.Errorf("同花顺API-请求失败, %v", e)
		return
	}
	apiResp := new(models.ThsHfApiResp)
	if e = json.Unmarshal(body, apiResp); e != nil {
		err = fmt.Errorf("同花顺API-解析响应失败, %v", e)
		return
	}
	if apiResp.ErrorCode != 0 {
		err = fmt.Errorf("同花顺高频API-状态码: %d, 提示信息: %s", apiResp.ErrorCode, apiResp.ErrMsg)
		return
	}
	indexes = make([]*models.ThsHfIndexWithData, 0)
	if len(apiResp.Tables) == 0 {
		utils.FileLog.Info("同花顺高频API-无数据")
		return
	}

	// Each entry of Tables corresponds to one security code.
	for _, tbl := range apiResp.Tables {
		if len(tbl.Time) == 0 || len(tbl.Table) == 0 {
			continue
		}
		// Each key/value of Table is an indicator code and its value series.
		for edbCode, values := range tbl.Table {
			// Only timestamps that have a matching value are usable.
			n := len(tbl.Time)
			if len(values) < n {
				n = len(values)
			}
			points := make([]*models.ThsHfIndexData, 0, n)
			for k, t := range tbl.Time[:n] {
				dt, pe := time.ParseInLocation(thsHfRespTimeLayout, t, time.Local)
				if pe != nil {
					utils.FileLog.Info(fmt.Sprintf("同花顺API-time parse t: %s, err: %v", t, pe))
					continue
				}
				points = append(points, &models.ThsHfIndexData{
					DataTime: dt,
					Value:    values[k],
				})
			}
			indexes = append(indexes, &models.ThsHfIndexWithData{
				StockCode: tbl.ThsCode,
				EdbCode:   edbCode,
				IndexData: points,
			})
		}
	}
	return
}

// getEdbDataFromThsHfApp requests high-frequency index data from the terminal
// server at serverUrl and groups the returned rows into one series per
// (security code, indicator).
//
// num is the retry counter: when the server answers with error code -1020 and
// num is 0, the request is retried exactly once.
//
// Response shape (example):
//
//	{
//	  "errorcode": 0,
//	  "errmsg": "Success!",
//	  "data": [
//	    {"time": "2024-06-04 09:30", "thscode": "CU2406.SHF", "open": 81900.0, "close": 81820.0},
//	    {"time": "2024-06-04 10:00", "thscode": "CU2406.SHF", "open": 81820.0, "close": 81790.0}
//	  ]
//	}
//
// On error the request URL and the error are written to the file log.
func getEdbDataFromThsHfApp(thsParams models.ThsHfSearchEdbReq, num int, serverUrl string) (indexes []*models.ThsHfIndexWithData, err error) {
	var requestUrl string
	defer func() {
		if err != nil {
			utils.FileLog.Info(fmt.Sprintf("requestUrl: %s", requestUrl))
			utils.FileLog.Info(fmt.Sprintf("getEdbDataFromThsHfApp: %v", err))
		}
	}()
	baseUrl := fmt.Sprintf("%s%s", serverUrl, "edbInfo/ths/hf?")

	// Optional function parameters, encoded as "Key:Value" pairs joined by commas.
	funcParts := make([]string, 0, 4)
	if thsParams.Interval > 0 {
		funcParts = append(funcParts, fmt.Sprintf("Interval:%d", thsParams.Interval))
	}
	if thsParams.Fill != "" {
		funcParts = append(funcParts, fmt.Sprintf("Fill:%s", thsParams.Fill))
	}
	if thsParams.CPS != "" {
		funcParts = append(funcParts, fmt.Sprintf("CPS:%s", thsParams.CPS))
	}
	if thsParams.BaseDate != "" {
		funcParts = append(funcParts, fmt.Sprintf("BaseDate:%s", thsParams.BaseDate))
	}
	funcParam := strings.Join(funcParts, ",")

	params := url.Values{}
	params.Add("codes", thsParams.StockCode)
	params.Add("indicators", thsParams.EdbCode)
	params.Add("function_para", funcParam)
	params.Add("start_time", thsParams.StartTime)
	params.Add("end_time", thsParams.EndTime)

	// Call the terminal server.
	requestUrl = baseUrl + params.Encode()
	body, e := http.Get(requestUrl)
	if e != nil {
		err = fmt.Errorf("同花顺APP-请求失败, %v", e)
		return
	}

	// The payload is unwrapped before decoding: surrounding double quotes are
	// trimmed and every backslash is removed.
	dataBody := strings.Trim(string(body), `"`)
	dataBody = strings.ReplaceAll(dataBody, `\`, ``)

	appResp := new(TerminalResponse)
	if e = json.Unmarshal([]byte(dataBody), appResp); e != nil {
		err = fmt.Errorf("同花顺APP-解析响应失败, %v", e)
		return
	}
	if appResp.ErrorCode != 0 {
		// -1020: the THS login session has expired; retry the request once.
		if appResp.ErrorCode == -1020 && num == 0 {
			return getEdbDataFromThsHfApp(thsParams, 1, serverUrl)
		}
		err = fmt.Errorf("同花顺APP-状态码: %d, 提示信息: %s", appResp.ErrorCode, appResp.ErrMsg)
		return
	}

	indexes = make([]*models.ThsHfIndexWithData, 0)
	indexMap := make(map[string]*models.ThsHfIndexWithData)
	for _, stockData := range appResp.Data {
		// A row without a string time/thscode cannot be attributed; skip it
		// instead of panicking on a failed type assertion.
		strTime, ok := stockData["time"].(string)
		if !ok {
			utils.FileLog.Info(fmt.Sprintf("同花顺APP-数据缺少time字段, %v", stockData))
			continue
		}
		stockCode, ok := stockData["thscode"].(string)
		if !ok {
			utils.FileLog.Info(fmt.Sprintf("同花顺APP-数据缺少thscode字段, %v", stockData))
			continue
		}
		dataTime, pe := time.ParseInLocation(thsHfRespTimeLayout, strTime, time.Local)
		if pe != nil {
			utils.FileLog.Info("数据日期格式有误, time: %s, %v", strTime, pe)
			continue
		}

		// Every remaining key is an indicator code with its value.
		for k, v := range stockData {
			if k == "time" || k == "thscode" {
				continue
			}
			val, isNum := v.(float64) // nil and non-numeric values are skipped
			if !isNum {
				continue
			}
			mk := fmt.Sprintf("%s-%s", stockCode, k)
			item := indexMap[mk]
			if item == nil {
				item = &models.ThsHfIndexWithData{
					StockCode: stockCode,
					EdbCode:   k,
					IndexData: make([]*models.ThsHfIndexData, 0),
				}
				indexMap[mk] = item
				// Collected when first seen; the relative order of indicators
				// within one row is still unspecified (map iteration order).
				indexes = append(indexes, item)
			}
			item.IndexData = append(item.IndexData, &models.ThsHfIndexData{
				DataTime: dataTime,
				Value:    val,
			})
		}
	}
	return
}

// RefreshThsHfBaseIndex refreshes the stored data of a source index from
// codeWithData, then updates the index's start/end dates and asynchronously
// refreshes the dependent indexes via RefreshEdbFromThsHfBaseIndex.
//
// Existing rows (optionally limited to those at or after startTime) are
// matched by unique code (MD5 of index code + minute-formatted data time).
// Values are compared after rounding to 4 decimals: changed rows are updated,
// unknown rows are inserted once. A nil or empty codeWithData is a no-op.
//
// Any returned error is logged and sent as an alarm message.
func RefreshThsHfBaseIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("RefreshThsHfBaseIndex-更新失败, %v", err)
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
	}()
	if indexItem == nil {
		err = fmt.Errorf("指标不存在")
		return
	}
	if codeWithData == nil || len(codeWithData.IndexData) == 0 {
		return
	}

	// Load the existing source data.
	dataOb := new(models.BaseFromThsHfData)
	cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().IndexCode)
	pars := make([]interface{}, 0)
	pars = append(pars, indexItem.IndexCode)
	if startTime != "" {
		cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
		pars = append(pars, startTime)
	}
	originData, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
	if e != nil {
		err = fmt.Errorf("获取源指标数据失败, %v", e)
		return
	}

	// Index existing rows by unique code.
	dateExist := make(map[string]*models.BaseFromThsHfData, len(originData))
	for _, d := range originData {
		dateExist[d.UniqueCode] = d
	}

	// Split incoming points into rows to update and rows to insert.
	newValExist := make(map[string]bool)
	updateData := make([]*models.BaseFromThsHfData, 0)
	insertData := make([]*models.BaseFromThsHfData, 0)
	for _, d := range codeWithData.IndexData {
		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format(utils.FormatDateTimeMinute)))
		newVal, strNewVal := roundThsHfValue(d.Value)

		if origin := dateExist[uni]; origin != nil {
			_, strExistVal := roundThsHfValue(origin.Value)
			if strNewVal == strExistVal {
				continue
			}
			origin.Value = newVal
			origin.ModifyTime = time.Now().Local()
			updateData = append(updateData, origin)
			continue
		}

		// De-duplicate new rows that share the same unique code.
		if newValExist[uni] {
			continue
		}
		newValExist[uni] = true
		insertData = append(insertData, &models.BaseFromThsHfData{
			BaseFromThsHfIndexId: indexItem.BaseFromThsHfIndexId,
			IndexCode:            indexItem.IndexCode,
			DataTime:             d.DataTime,
			Value:                newVal,
			CreateTime:           time.Now(),
			ModifyTime:           time.Now(),
			UniqueCode:           uni,
			DataTimestamp:        d.DataTime.UnixNano() / 1e6, // milliseconds
		})
	}
	if e = dataOb.MultiInsertOrUpdate(insertData, updateData); e != nil {
		err = fmt.Errorf("新增/更新源指标数据失败, %v", e)
		return
	}

	// Update the index start/end dates. A failure to read the min/max is
	// tolerated (best-effort) but leaves a trace in the log.
	minMax, e := dataOb.GetIndexMinMax(indexItem.IndexCode)
	if e != nil {
		utils.FileLog.Info(fmt.Sprintf("RefreshThsHfBaseIndex-获取源指标最值失败, %v", e))
	} else if minMax != nil {
		if e = updateThsHfIndexDateRange(indexItem, minMax.MinDate, minMax.MaxDate); e != nil {
			err = e
			return
		}
	}

	// Refresh the dependent indexes in the background; that function logs and
	// alarms on its own errors, so the result is intentionally ignored here.
	indexCode := indexItem.IndexCode
	go func() {
		_ = RefreshEdbFromThsHfBaseIndex(indexCode, startTime)
	}()
	return
}

// RefreshThsHfBaseIndexMgo is the MongoDB counterpart of RefreshThsHfBaseIndex:
// it refreshes the stored data of a source index from codeWithData, updates
// the index's start/end dates and asynchronously refreshes the dependent
// indexes via RefreshEdbFromThsHfBaseIndex.
//
// startTime, when non-empty, must be in utils.FormatDateTime layout and limits
// the existing rows considered to those with data_time >= startTime. A nil or
// empty codeWithData is a no-op.
//
// Any returned error is logged and sent as an alarm message.
func RefreshThsHfBaseIndexMgo(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("RefreshThsHfBaseIndexMgo-更新失败, %v", err)
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
	}()
	if indexItem == nil {
		err = fmt.Errorf("指标不存在")
		return
	}
	if codeWithData == nil || len(codeWithData.IndexData) == 0 {
		return
	}
	mogDataObj := new(mgo.BaseFromThsHfData)

	// Load the existing rows.
	existCond := bson.M{
		"index_code": indexItem.IndexCode,
	}
	if startTime != "" {
		st, pe := time.ParseInLocation(utils.FormatDateTime, startTime, time.Local)
		if pe != nil {
			err = fmt.Errorf("start time parse err: %v", pe)
			return
		}
		existCond["data_time"] = bson.M{
			"$gte": st,
		}
	}
	exitDataList, e := mogDataObj.GetAllDataList(existCond, []string{"data_time"})
	if e != nil {
		err = fmt.Errorf("GetAllDataList err: %v", e)
		return
	}

	// Index existing rows by unique code.
	exitDataMap := make(map[string]*mgo.BaseFromThsHfData, len(exitDataList))
	for _, v := range exitDataList {
		exitDataMap[v.UniqueCode] = v
	}

	// Split incoming points into rows to insert and rows to update.
	addDataList := make([]interface{}, 0)
	updateDataList := make([]mgo.BaseFromThsHfData, 0)
	added := make(map[string]bool)
	for _, data := range codeWithData.IndexData {
		newVal, strNewVal := roundThsHfValue(data.Value)
		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, data.DataTime.Format(utils.FormatDateTimeMinute)))

		findData, ok := exitDataMap[uni]
		if !ok {
			// De-duplicate new rows that share the same unique code, as the
			// SQL variant does.
			if added[uni] {
				continue
			}
			added[uni] = true
			addDataList = append(addDataList, mgo.BaseFromThsHfData{
				BaseFromThsHfIndexId: int64(indexItem.BaseFromThsHfIndexId),
				IndexCode:            indexItem.IndexCode,
				DataTime:             data.DataTime,
				Value:                newVal,
				UniqueCode:           uni,
				CreateTime:           time.Now(),
				ModifyTime:           time.Now(),
				DataTimestamp:        data.DataTime.UnixNano() / 1e6, // milliseconds
			})
			continue
		}

		// Value differs after rounding: schedule an update.
		_, strExistVal := roundThsHfValue(findData.Value)
		if strNewVal == strExistVal {
			continue
		}
		findData.Value = newVal
		// Refresh the modify time so the $set below does not write back the
		// stale timestamp loaded from the database.
		findData.ModifyTime = time.Now()
		updateDataList = append(updateDataList, *findData)
	}

	// Persist.
	coll := mogDataObj.GetCollection()
	if len(addDataList) > 0 {
		if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
			err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
			return
		}
	}
	for _, v := range updateDataList {
		if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
			err = fmt.Errorf("UpdateDataByColl, err: %v", e)
			return
		}
	}

	// Update the index start/end dates.
	minMax, e := indexItem.GetEdbInfoMaxAndMinInfo(indexItem.IndexCode)
	if e != nil {
		err = e
		return
	}
	if minMax != nil {
		if e = updateThsHfIndexDateRange(indexItem, minMax.MinDate, minMax.MaxDate); e != nil {
			err = e
			return
		}
	}

	// Refresh the dependent indexes in the background; that function logs and
	// alarms on its own errors, so the result is intentionally ignored here.
	indexCode := indexItem.IndexCode
	go func() {
		_ = RefreshEdbFromThsHfBaseIndex(indexCode, startTime)
	}()
	return
}

// RefreshEdbFromThsHfBaseIndex refreshes every index that is mapped to the
// source index baseCode.
//
// For each mapping a one-minute cache key guards against concurrent refreshes
// of the same index code; mappings whose key already exists are skipped. A
// failure on one mapping is logged and does not stop the remaining mappings.
// After a successful refresh the index's min/max info is updated and an ES
// update is started in the background.
//
// Only failures to load the mappings or the index list are returned as an
// error (and then logged and sent as an alarm message).
func RefreshEdbFromThsHfBaseIndex(baseCode, startTime string) (err error) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标库失败, %v", err)
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
	}()

	// Load the mappings of the source index.
	mappingOb := new(models.BaseFromEdbMapping)
	cond := fmt.Sprintf(" AND %s = ?", mappingOb.Cols().BaseIndexCode)
	pars := make([]interface{}, 0)
	pars = append(pars, baseCode)
	mappings, e := mappingOb.GetItemsByCondition(cond, pars, []string{}, "")
	if e != nil {
		err = fmt.Errorf("获取源指标关联失败, %v", e)
		return
	}
	if len(mappings) == 0 {
		return
	}

	// First mapping per index code, plus the ids of all mapped indexes.
	codeMapping := make(map[string]*models.BaseFromEdbMapping)
	edbInfoIds := make([]int, 0, len(mappings))
	for _, v := range mappings {
		if codeMapping[v.EdbCode] == nil {
			codeMapping[v.EdbCode] = v
		}
		edbInfoIds = append(edbInfoIds, v.EdbInfoId)
	}

	// Load the index records and keep the first one per index code.
	edbInfoList, e := models.GetEdbInfoByIdList(edbInfoIds)
	if e != nil {
		err = fmt.Errorf("获取指标信息列表失败, %v", e)
		return
	}
	codeEdb := make(map[string]*models.EdbInfo)
	for _, v := range edbInfoList {
		if codeEdb[v.EdbCode] == nil {
			codeEdb[v.EdbCode] = v
		}
	}

	thsOb := new(models.EdbThsHf)
	source := thsOb.GetSource()
	subSource := thsOb.GetSubSource()
	for _, v := range mappings {
		// Validate before taking the refresh lock so a missing index record
		// does not leave the lock key behind.
		edb := codeEdb[v.EdbCode]
		if edb == nil {
			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-指标信息有误, EdbCode: %s", v.EdbCode))
			continue
		}

		cacheKey := fmt.Sprintf("%s_%d_%d_%s", utils.CACHE_EDB_DATA_REFRESH, source, subSource, v.EdbCode)
		if utils.Rc.IsExist(cacheKey) {
			continue
		}
		utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)

		// Refresh the index data.
		if e = thsOb.Refresh(edb, codeMapping[v.EdbCode], startTime); e != nil {
			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标失败, %v", e))
			_ = utils.Rc.Delete(cacheKey)
			continue
		}

		// Update the index min/max info. On failure move on to the next
		// mapping rather than silently abandoning the rest.
		if e = thsOb.UnifiedModifyEdbInfoMaxAndMinInfo(edb); e != nil {
			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-更新指标最值失败, %v", e))
			_ = utils.Rc.Delete(cacheKey)
			continue
		}
		_ = utils.Rc.Delete(cacheKey)

		// Update ES in the background.
		go logic.UpdateEs(edb.EdbInfoId)
	}
	return
}