package services import ( "encoding/json" "eta/eta_index_lib/logic" "eta/eta_index_lib/models" "eta/eta_index_lib/services/alarm_msg" "eta/eta_index_lib/utils" "fmt" "io/ioutil" "net/http" "strings" "time" ) var ( PCSGBloombergGeneralIndexDataUrl = "/api/pcsg/bloomberg/index_data/general" // 通用指标API ) type PCSGBloombergApiReq struct { TaskKey string `description:"任务key"` Frequency string `description:"指标频度"` } type PCSGBloombergTask struct { TaskKey string `json:"TaskKey"` Frequency string `json:"Frequency"` VCode bool `json:"VCode"` ExtraLetter string `json:"ExtraLetter"` IndexNamePrefix string `json:"IndexNamePrefix" description:"指标名称前缀"` IndexCodeSuffix string `json:"IndexCodeSuffix" description:"指标编码后缀"` } // LoadPCSGBloombergTask 加载配置 func LoadPCSGBloombergTask() (tasks []*PCSGBloombergTask, err error) { filePath := "./static/pcsg_task.json" b, e := ioutil.ReadFile(filePath) if e != nil { err = fmt.Errorf("读取配置失败, err: %v", e) return } if e = json.Unmarshal(b, &tasks); e != nil { err = fmt.Errorf("解析配置失败, err: %v", e) return } return } // GetPCSGBloombergGeneralIndexFromBridge 获取通用数据类型指标 func GetPCSGBloombergGeneralIndexFromBridge(params PCSGBloombergApiReq) (indexes []models.BaseFromBloombergApiIndexAndData, err error) { defer func() { if err != nil { tips := fmt.Sprintf("GetPCSGBloombergGeneralIndexFromBridge-获取指标数据失败, err: %s", err.Error()) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } }() p, e := json.Marshal(params) if e != nil { err = fmt.Errorf("params json marshal err: %v", e) return } url := fmt.Sprint(utils.EtaBridgeUrl, PCSGBloombergGeneralIndexDataUrl) body := ioutil.NopCloser(strings.NewReader(string(p))) client := &http.Client{} req, e := http.NewRequest("POST", url, body) if e != nil { err = fmt.Errorf("http create request err: %s", e.Error()) return } checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key) contentType := "application/json;charset=utf-8" req.Header.Set("Content-Type", contentType) req.Header.Set("Authorization", checkToken) resp, e := client.Do(req) if e != nil { err = fmt.Errorf("http client do err: %s", e.Error()) return } defer func() { _ = resp.Body.Close() }() b, e := ioutil.ReadAll(resp.Body) if e != nil { err = fmt.Errorf("resp body read err: %s", e.Error()) return } if len(b) == 0 { err = fmt.Errorf("resp body is empty") return } // 生产环境解密 if utils.RunMode == "release" { str := string(b) str = strings.Trim(str, `"`) b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey) } result := new(models.BridgePCSGBloombergResultData) if e = json.Unmarshal(b, &result); e != nil { err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b)) return } if result.Code != 200 { err = fmt.Errorf("result: %s", string(b)) return } indexes = result.Data return } // PCSGWrite2BaseBloomberg 写入彭博数据源 func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData, isVCode bool, extraLetter, namePrefix, codeSuffix string) (err error) { defer func() { if err != nil { tips := fmt.Sprintf("PCSGWrite2BaseBloomberg-写入彭博数据源失败, err: %s", err.Error()) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } }() // 这里挡一下...万一没限制加进库了不好删... if isVCode && extraLetter == "" { err = fmt.Errorf("中间字母有误") return } for _, v := range indexes { if v.IndexCode == "" { continue } // 无数据的情况不处理 if len(v.Data) == 0 { continue } if isVCode { v.IndexCode = utils.InsertStr2StrIdx(v.IndexCode, " ", 1, extraLetter) } // 指标编码后缀 if codeSuffix != "" { v.IndexCode = fmt.Sprintf("%s %s", v.IndexCode, codeSuffix) } // 指标是否存在 index, e := models.GetBaseFromBloombergIndexByCode(v.IndexCode) if e != nil && e.Error() != utils.ErrNoRow() { err = fmt.Errorf("获取Bloomberg原始指标失败, err: %s", e.Error()) return } // 指标名称+前缀 indexName := v.IndexName if indexName != "" && namePrefix != "" { indexName = fmt.Sprint(namePrefix, indexName) } // 新增指标 if index == nil { newIndex := new(models.BaseFromBloombergIndex) newIndex.IndexCode = v.IndexCode newIndex.IndexName = indexName newIndex.Unit = v.Unit newIndex.Source = utils.DATA_SOURCE_BLOOMBERG newIndex.Frequency = v.Frequency newIndex.CreateTime = time.Now().Local() newIndex.ModifyTime = time.Now().Local() if e = newIndex.Create(); e != nil { err = fmt.Errorf("新增Bloomberg原始指标失败, err: %s", e.Error()) return } index = newIndex } else { // 无指标名称的情况下更新指标基础信息 if index.IndexName == "" { index.IndexName = indexName index.Unit = v.Unit index.Frequency = v.Frequency index.ModifyTime = time.Now().Local() if e = index.Update([]string{"IndexName", "Unit", "Frequency", "ModifyTime"}); e != nil { err = fmt.Errorf("更新Bloomberg原始指标失败, err: %s", e.Error()) return } } } // 更新指标数据 var cond string var pars []interface{} cond += ` AND index_code = ? ` pars = append(pars, v.IndexCode) indexData, e := models.GetBaseFromBloombergDataByCondition(cond, pars) if e != nil { err = fmt.Errorf("获取Bloomberg历史数据失败, err: %s", e.Error()) return } dateExist := make(map[string]*models.BaseFromBloombergData) newValExist := make(map[string]bool) if len(indexData) > 0 { for _, d := range indexData { strDate := d.DataTime.Format(utils.FormatDate) dateExist[strDate] = d } } // 筛选新增/更新数据 updateData := make([]*models.BaseFromBloombergData, 0) insertData := make([]*models.BaseFromBloombergData, 0) for _, d := range v.Data { strDate := d.DataTime.Format(utils.FormatDate) originData := dateExist[strDate] if originData != nil { if utils.FloatAlmostEqual(originData.Value, d.Value) { continue } originData.Value = d.Value originData.ModifyTime = time.Now().Local() updateData = append(updateData, originData) } else { // 新增的数据去重 if newValExist[strDate] { continue } newValExist[strDate] = true newData := new(models.BaseFromBloombergData) newData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId newData.IndexCode = index.IndexCode newData.DataTime = d.DataTime newData.Value = d.Value newData.CreateTime = time.Now() newData.ModifyTime = time.Now() timestamp := d.DataTime.UnixNano() / 1e6 newData.DataTimestamp = int(timestamp) insertData = append(insertData, newData) } } if e = models.MultiInsertOrUpdateBaseFromBloombergData(insertData, updateData); e != nil { err = fmt.Errorf("新增/更新Bloomberg指标数据失败, err: %s", e.Error()) return } // 更新指标开始结束时间 minMax, e := models.GetBaseFromBloombergIndexMinMax(index.IndexCode) if e == nil && minMax != nil { e = models.ModifyBaseFromBloombergIndexMinMax(index.IndexCode, minMax) if e != nil { err = fmt.Errorf("更新Bloomberg开始结束时间失败, err: %s", e.Error()) return } } // 同步刷新指标库 go func() { edb, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BLOOMBERG, index.IndexCode) if e != nil && e.Error() != utils.ErrNoRow() { utils.FileLog.Info("获取Bloomberg指标库信息失败, err: " + e.Error()) return } if edb != nil { _, _, e = logic.RefreshBaseEdbInfo(edb, ``) if e != nil { utils.FileLog.Info(fmt.Sprintf("Bloomberg RefreshBaseEdbInfo, edbCode: %s, err: %v", index.IndexCode, e)) return } } }() } return }