package services import ( "encoding/json" "errors" "eta/eta_index_lib/models" "eta/eta_index_lib/services/alarm_msg" "eta/eta_index_lib/utils" "fmt" "github.com/rdlucklib/rdluck_tools/http" "time" ) const ( WindNoAuthCode = -40522015 ) // GetEdbDataFromWind 获取wind数据 func GetEdbDataFromWind(edbCode, startDate, endDate string) (item *models.EdbDataFromWind, errorCode int, err error) { windUrl, err := GetWindUrl(edbCode) if err != nil { errorCode = 421 go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取wind服务器地址失败,err:%s", err.Error()), 3) return } thsUrl := windUrl + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s` thsUrl = fmt.Sprintf(thsUrl, edbCode, startDate, endDate) utils.FileLog.Info(fmt.Sprintf("windUrl:%s", thsUrl)) body, err := http.Get(thsUrl) fmt.Println("GetEdbDataByWind body:") fmt.Println(string(body)) utils.FileLog.Info(fmt.Sprint("指标编码:", edbCode, ";wind result:", string(body))) if err != nil { return } item = new(models.EdbDataFromWind) err = json.Unmarshal(body, &item) //异常的话,需要邮件通知 if len(item.ErrorCode) > 0 { if item.ErrorCode["0"] != 0 { if item.ErrorCode["0"] == -40522017 { //{ //DT: { //0: 1654646400000 //}, //CLOSE: { //0: "CEDBService:: quota exceeded." //}, //ErrorCode: { //0: -40522017 //} //} // 设置服务器已超限 SetIsLimitEdbCodeInWindUrl(windUrl) err = DeleteEdbCodeInWindUrl(edbCode) if err != nil { return } return GetEdbDataFromWind(edbCode, startDate, endDate) } else if item.ErrorCode["0"] == -40520005 { //.ErrorCode=-40520005 //.Data=[No Python API Authority SetIsLimitEdbCodeInWindUrl(windUrl) err = DeleteEdbCodeInWindUrl(edbCode) if err != nil { return } go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3) return GetEdbDataFromWind(edbCode, startDate, endDate) } else if item.ErrorCode["0"] == WindNoAuthCode { // 指标下架, 无权限 return nil, WindNoAuthCode, nil } else { go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3) } } } return } // GetEdbDataFromWindUrl 通过url获取wind数据 func GetEdbDataFromWindUrl(windUrl, edbCode, startDate, endDate string) (item *models.EdbDataFromWind, errorCode int, err error) { if windUrl == `` { return GetEdbDataFromWind(edbCode, startDate, endDate) } requestWindUrl := windUrl + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s` requestWindUrl = fmt.Sprintf(requestWindUrl, edbCode, startDate, endDate) utils.FileLog.Info(fmt.Sprintf("windUrl:%s", requestWindUrl)) body, err := http.Get(requestWindUrl) fmt.Println("GetEdbDataByWind body:") fmt.Println(string(body)) utils.FileLog.Info(fmt.Sprint("wind result:", string(body))) if err != nil { return } item = new(models.EdbDataFromWind) err = json.Unmarshal(body, &item) //异常的话,需要邮件通知 if len(item.ErrorCode) > 0 { if item.ErrorCode["0"] != 0 { if item.ErrorCode["0"] == -40522017 { //{ //DT: { //0: 1654646400000 //}, //CLOSE: { //0: "CEDBService:: quota exceeded." //}, //ErrorCode: { //0: -40522017 //} //} // 设置服务器已超限 errorCode = 421 go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据接口超限,地址:%s", requestWindUrl), 2) //go utils.SendEmail("wind数据接口超限", "地址:"+requestWindUrl, utils.EmailSendToUsers) return } else if item.ErrorCode["0"] == -40520005 { //.ErrorCode=-40520005 //.Data=[No Python API Authority err = errors.New("No Python API Authority") go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3) return } else if item.ErrorCode["0"] == WindNoAuthCode { // 指标下架, 无权限 return nil, WindNoAuthCode, nil } else { go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3) } } } return } // GetWindUrl 获取wind的url func GetWindUrl(edbCode string) (windUrl string, err error) { defer func() { if err == nil && windUrl == "" { err = errors.New("获取wind服务器地址失败,指标超限了") } }() //从缓存中获取 cacheKey := utils.CACHE_WIND_URL + ":" + edbCode windUrl, _ = utils.Rc.RedisString(cacheKey) if windUrl != "" { return } //如果缓存中没有的话,那么从配置中获取 for _, windUrlMap := range utils.Hz_Wind_Data_Url_LIST { //判断该url是否被占满了 //count, tmpErr := GetCountEdbCodeInWindUrl(windUrlMap.Url) //if tmpErr != nil && tmpErr.Error() != "nil returned" { // err = tmpErr // return //} //if count < windUrlMap.Num { // windUrl = windUrlMap.Url // AddEdbCodeInWindUrl(windUrlMap.Url, edbCode) // return //} //如果超限了,那么进入下一循环 isLimit, tmpErr := GetIsLimitEdbCodeInWindUrl(windUrlMap.Url) if isLimit { err = tmpErr continue } windUrl = windUrlMap.Url AddEdbCodeInWindUrl(windUrlMap.Url, edbCode) return } return } // GetCountEdbCodeInWindUrl 从缓存key中获取已经插入入的指标数 func GetCountEdbCodeInWindUrl(windUrl string) (num int, err error) { cacheKey := utils.CACHE_WIND_URL + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl) num, err = utils.Rc.RedisInt(cacheKey) if err != nil && err.Error() == "redigo: nil returned" { err = nil } return } // GetIsLimitEdbCodeInWindUrl 从缓存key中获取是否超限 func GetIsLimitEdbCodeInWindUrl(windUrl string) (isLimit bool, err error) { cacheKey := utils.CACHE_WIND_URL + ":limit:" + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl) num, err := utils.Rc.RedisInt(cacheKey) if err != nil && err.Error() == "redigo: nil returned" { err = nil } if num > 0 { isLimit = true } return } // SetIsLimitEdbCodeInWindUrl 设置服务器已超限 func SetIsLimitEdbCodeInWindUrl(windUrl string) { cacheKey := utils.CACHE_WIND_URL + ":limit:" + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl) _ = utils.Rc.SetNX(cacheKey, 1, utils.GetTodayLastSecond()) return } // AddEdbCodeInWindUrl 将指标插入到缓存key中 // @return isInsert bool 是否插入数据,true时为插入数据,false表示数据已存在 func AddEdbCodeInWindUrl(windUrl, edbCode string) (isInsert bool) { cacheKey := utils.CACHE_WIND_URL + ":" + edbCode isInsert = utils.Rc.SetNX(cacheKey, windUrl, utils.GetTodayLastSecond()) cacheKey2 := utils.CACHE_WIND_URL + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl) utils.Rc.Incrby(cacheKey2, 1) return } // DeleteEdbCodeInWindUrl 删除指标编码 服务器归属 缓存 func DeleteEdbCodeInWindUrl(edbCode string) (err error) { cacheKey := utils.CACHE_WIND_URL + ":" + edbCode err = utils.Rc.Delete(cacheKey) return } func GetEdbDataFromBridge(windUrl string, req models.AddEdbInfoReq, endDate string) (list []*models.EdbDataFromWindSimple, err error, errMsg string) { dataItem, err := GetEdbDataFromBridgeJy(windUrl, req.EdbCode, utils.BASE_START_DATE, endDate) if err != nil { errMsg = "桥接服务里获取指标信息失败 GetEdbDataFromBridgeJy,Err:" + err.Error() err = errors.New("获取指标信息失败!") return } for _, v := range dataItem.IndexData { tmp := new(models.EdbDataFromWindSimple) dataTime, tErr := time.Parse(utils.FormatDate, v.DataTime) if tErr != nil { err = tErr return } timestamp := dataTime.UnixNano() / 1e6 timeStr := fmt.Sprintf("%d", timestamp) saveVal := utils.SubFloatToString(v.Val, 30) tmp.TimestampStr = timeStr tmp.DateTime = v.DataTime tmp.Val = saveVal list = append(list, tmp) } return } // RefreshEdbDataFromWindBridge 桥接服务刷新wind指标数据 func RefreshEdbDataFromWindBridge(windUrl string, req models.RefreshEdbInfoReq, edbInfo *models.EdbInfo, endDate string) (list []*models.EdbDataFromWindSimple, err error, errMsg string) { dataItem, err := GetEdbDataFromBridgeJy(windUrl, edbInfo.EdbCode, req.StartDate, endDate) if err != nil { errMsg = "获取指标信息失败 GetEdbDataFromWind,Err:" + err.Error() err = errors.New("获取指标信息失败!") return } // 忽略掉指标下架的错误, 并更新指标为停止更新 if dataItem.Status == -1 { edbInfo.NoUpdate = 1 edbInfo.ModifyTime = time.Now().Local() if e := edbInfo.Update([]string{"NoUpdate", "ModifyTime"}); e != nil { errMsg = "更新wind指标停更失败, Err: " + e.Error() err = errors.New("刷新失败") return } return } for _, v := range dataItem.IndexData { tmp := new(models.EdbDataFromWindSimple) dataTime, tErr := time.Parse(utils.FormatDate, v.DataTime) if tErr != nil { err = tErr } timestamp := dataTime.UnixNano() / 1e6 timeStr := fmt.Sprintf("%d", timestamp) saveVal := utils.SubFloatToString(v.Val, 20) tmp.TimestampStr = timeStr tmp.DateTime = v.DataTime tmp.Val = saveVal list = append(list, tmp) } return }