package services import ( "encoding/json" "eta/eta_index_lib/logic" "eta/eta_index_lib/models" "eta/eta_index_lib/services/alarm_msg" "eta/eta_index_lib/utils" "fmt" "io/ioutil" "net/http" "strings" "time" ) var ( BridgeApiPCSGBloombergDailyUrl = "/api/pcsg/bloomberg/daily_index" // 日度指标API BridgeApiPCSGBloombergWeeklyUrl = "/api/pcsg/bloomberg/weekly_index" // 周度指标API BridgeApiPCSGBloombergMonthlyUrl = "/api/pcsg/bloomberg/monthly_index" // 月度指标API ) // GetPCSGBloombergDailyFromBridge 获取彭博日度指标 func GetPCSGBloombergDailyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) { defer func() { if err != nil { tips := fmt.Sprintf("GetPCSGBloombergDailyFromBridge-获取彭博日度指标失败, err: %s", err.Error()) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } }() url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergDailyUrl) body := ioutil.NopCloser(strings.NewReader("")) client := &http.Client{} req, e := http.NewRequest("POST", url, body) if e != nil { err = fmt.Errorf("http create request err: %s", e.Error()) return } checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key) contentType := "application/json;charset=utf-8" req.Header.Set("Content-Type", contentType) req.Header.Set("Authorization", checkToken) resp, e := client.Do(req) if e != nil { err = fmt.Errorf("http client do err: %s", e.Error()) return } defer func() { _ = resp.Body.Close() }() b, e := ioutil.ReadAll(resp.Body) if e != nil { err = fmt.Errorf("resp body read err: %s", e.Error()) return } if len(b) == 0 { err = fmt.Errorf("resp body is empty") return } // 生产环境解密 if utils.RunMode == "release" { str := string(b) str = strings.Trim(str, `"`) b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey) } result := new(models.BridgePCSGBloombergResultData) if e = json.Unmarshal(b, &result); e != nil { err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b)) return } if result.Code != 200 { err = fmt.Errorf("result: %s", string(b)) return } indexes = result.Data return } // GetPCSGBloombergWeeklyFromBridge 获取彭博周度指标 func GetPCSGBloombergWeeklyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) { defer func() { if err != nil { tips := fmt.Sprintf("GetPCSGBloombergWeeklyFromBridge-获取彭博周度指标失败, err: %s", err.Error()) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } }() url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergWeeklyUrl) body := ioutil.NopCloser(strings.NewReader("")) client := &http.Client{} req, e := http.NewRequest("POST", url, body) if e != nil { err = fmt.Errorf("http create request err: %s", e.Error()) return } checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key) contentType := "application/json;charset=utf-8" req.Header.Set("Content-Type", contentType) req.Header.Set("Authorization", checkToken) resp, e := client.Do(req) if e != nil { err = fmt.Errorf("http client do err: %s", e.Error()) return } defer func() { _ = resp.Body.Close() }() b, e := ioutil.ReadAll(resp.Body) if e != nil { err = fmt.Errorf("resp body read err: %s", e.Error()) return } if len(b) == 0 { err = fmt.Errorf("resp body is empty") return } // 生产环境解密 if utils.RunMode == "release" { str := string(b) str = strings.Trim(str, `"`) b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey) } result := new(models.BridgePCSGBloombergResultData) if e = json.Unmarshal(b, &result); e != nil { err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b)) return } if result.Code != 200 { err = fmt.Errorf("result: %s", string(b)) return } indexes = result.Data return } // GetPCSGBloombergMonthlyFromBridge 获取彭博月度指标 func GetPCSGBloombergMonthlyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) { defer func() { if err != nil { tips := fmt.Sprintf("GetPCSGBloombergMonthlyFromBridge-获取彭博月度指标失败, err: %s", err.Error()) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } }() url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergMonthlyUrl) body := ioutil.NopCloser(strings.NewReader("")) client := &http.Client{} req, e := http.NewRequest("POST", url, body) if e != nil { err = fmt.Errorf("http create request err: %s", e.Error()) return } checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key) contentType := "application/json;charset=utf-8" req.Header.Set("Content-Type", contentType) req.Header.Set("Authorization", checkToken) resp, e := client.Do(req) if e != nil { err = fmt.Errorf("http client do err: %s", e.Error()) return } defer func() { _ = resp.Body.Close() }() b, e := ioutil.ReadAll(resp.Body) if e != nil { err = fmt.Errorf("resp body read err: %s", e.Error()) return } if len(b) == 0 { err = fmt.Errorf("resp body is empty") return } // 生产环境解密 if utils.RunMode == "release" { str := string(b) str = strings.Trim(str, `"`) b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey) } result := new(models.BridgePCSGBloombergResultData) if e = json.Unmarshal(b, &result); e != nil { err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b)) return } if result.Code != 200 { err = fmt.Errorf("result: %s", string(b)) return } indexes = result.Data return } // PCSGWrite2BaseBloomberg 写入彭博数据源 func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData) (err error) { defer func() { if err != nil { tips := fmt.Sprintf("PCSGWrite2BaseBloomberg-写入彭博数据源失败, err: %s", err.Error()) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } }() for _, v := range indexes { if v.IndexCode == "" { continue } // 无数据的情况不处理 if len(v.Data) == 0 { continue } // 指标是否存在 index, e := models.GetBaseFromBloombergIndexByCode(v.IndexCode) if e != nil && e.Error() != utils.ErrNoRow() { err = fmt.Errorf("获取Bloomberg原始指标失败, err: %s", e.Error()) return } // 新增指标 if index == nil { newIndex := new(models.BaseFromBloombergIndex) newIndex.IndexCode = v.IndexCode newIndex.IndexName = v.IndexName newIndex.Unit = v.Unit newIndex.Source = utils.DATA_SOURCE_BLOOMBERG newIndex.Frequency = v.Frequency newIndex.CreateTime = time.Now().Local() newIndex.ModifyTime = time.Now().Local() if e = newIndex.Create(); e != nil { err = fmt.Errorf("新增Bloomberg原始指标失败, err: %s", e.Error()) return } index = newIndex } else { // 无指标名称的情况下更新指标基础信息 if index.IndexName == "" { index.IndexName = v.IndexName index.Unit = v.Unit index.Frequency = v.Frequency index.ModifyTime = time.Now().Local() if e = index.Update([]string{"IndexName", "Unit", "Frequency", "ModifyTime"}); e != nil { err = fmt.Errorf("更新Bloomberg原始指标失败, err: %s", e.Error()) return } } } // 更新指标数据 var cond string var pars []interface{} cond += ` AND index_code = ? ` pars = append(pars, v.IndexCode) indexData, e := models.GetBaseFromBloombergDataByCondition(cond, pars) if e != nil { err = fmt.Errorf("获取Bloomberg历史数据失败, err: %s", e.Error()) return } dateExist := make(map[string]*models.BaseFromBloombergData) newValExist := make(map[string]bool) if len(indexData) > 0 { for _, d := range indexData { strDate := d.DataTime.Format(utils.FormatDate) dateExist[strDate] = d } } // 筛选新增/更新数据 updateData := make([]*models.BaseFromBloombergData, 0) insertData := make([]*models.BaseFromBloombergData, 0) for _, d := range v.Data { strDate := d.DataTime.Format(utils.FormatDate) originData := dateExist[strDate] if originData != nil { if utils.FloatAlmostEqual(originData.Value, d.Value) { continue } originData.Value = d.Value originData.ModifyTime = time.Now().Local() updateData = append(updateData, originData) } else { // 新增的数据去重 if newValExist[strDate] { continue } newValExist[strDate] = true newData := new(models.BaseFromBloombergData) newData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId newData.IndexCode = index.IndexCode newData.DataTime = d.DataTime newData.Value = d.Value newData.CreateTime = time.Now() newData.ModifyTime = time.Now() timestamp := d.DataTime.UnixNano() / 1e6 newData.DataTimestamp = int(timestamp) insertData = append(insertData, newData) } } if e = models.MultiInsertOrUpdateBaseFromBloombergData(insertData, updateData); e != nil { err = fmt.Errorf("新增/更新Bloomberg指标数据失败, err: %s", e.Error()) return } // 更新指标开始结束时间 minMax, e := models.GetBaseFromBloombergIndexMinMax(index.IndexCode) if e == nil && minMax != nil { e = models.ModifyBaseFromBloombergIndexMinMax(index.IndexCode, minMax) if e != nil { err = fmt.Errorf("更新Bloomberg开始结束时间失败, err: %s", e.Error()) return } } // 同步刷新指标库 go func() { edb, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BLOOMBERG, index.IndexCode) if e != nil && e.Error() != utils.ErrNoRow() { utils.FileLog.Info("获取Bloomberg指标库信息失败, err: " + e.Error()) return } if edb != nil { logic.RefreshBaseEdbInfo(edb, ``) } }() } return }