123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- package services
- import (
- "encoding/json"
- "errors"
- "eta/eta_index_lib/models"
- "eta/eta_index_lib/services/alarm_msg"
- "eta/eta_index_lib/utils"
- "fmt"
- "github.com/rdlucklib/rdluck_tools/http"
- "time"
- )
// WindNoAuthCode is the value of the wind service's ErrorCode field that the
// fetch functions in this file treat as "indicator delisted / no permission";
// they hand it back to the caller as errorCode with a nil item and nil error.
const WindNoAuthCode = -40522015
- // GetEdbDataFromWind 获取wind数据
- func GetEdbDataFromWind(edbCode, startDate, endDate string) (item *models.EdbDataFromWind, errorCode int, err error) {
- windUrl, err := GetWindUrl(edbCode)
- if err != nil {
- errorCode = 421
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取wind服务器地址失败,err:%s", err.Error()), 3)
- return
- }
- thsUrl := windUrl + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s`
- thsUrl = fmt.Sprintf(thsUrl, edbCode, startDate, endDate)
- utils.FileLog.Info(fmt.Sprintf("windUrl:%s", thsUrl))
- body, err := http.Get(thsUrl)
- fmt.Println("GetEdbDataByWind body:")
- fmt.Println(string(body))
- utils.FileLog.Info(fmt.Sprint("指标编码:", edbCode, ";wind result:", string(body)))
- if err != nil {
- return
- }
- item = new(models.EdbDataFromWind)
- err = json.Unmarshal(body, &item)
- //异常的话,需要邮件通知
- if len(item.ErrorCode) > 0 {
- if item.ErrorCode["0"] != 0 {
- if item.ErrorCode["0"] == -40522017 {
- //{
- //DT: {
- //0: 1654646400000
- //},
- //CLOSE: {
- //0: "CEDBService:: quota exceeded."
- //},
- //ErrorCode: {
- //0: -40522017
- //}
- //}
- // 设置服务器已超限
- SetIsLimitEdbCodeInWindUrl(windUrl)
- err = DeleteEdbCodeInWindUrl(edbCode)
- if err != nil {
- return
- }
- return GetEdbDataFromWind(edbCode, startDate, endDate)
- } else if item.ErrorCode["0"] == -40520005 {
- //.ErrorCode=-40520005
- //.Data=[No Python API Authority
- SetIsLimitEdbCodeInWindUrl(windUrl)
- err = DeleteEdbCodeInWindUrl(edbCode)
- if err != nil {
- return
- }
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
- return GetEdbDataFromWind(edbCode, startDate, endDate)
- } else if item.ErrorCode["0"] == WindNoAuthCode {
- // 指标下架, 无权限
- return nil, WindNoAuthCode, nil
- } else {
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
- }
- }
- }
- return
- }
- // GetEdbDataFromWindUrl 通过url获取wind数据
- func GetEdbDataFromWindUrl(windUrl, edbCode, startDate, endDate string) (item *models.EdbDataFromWind, errorCode int, err error) {
- if windUrl == `` {
- return GetEdbDataFromWind(edbCode, startDate, endDate)
- }
- requestWindUrl := windUrl + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s`
- requestWindUrl = fmt.Sprintf(requestWindUrl, edbCode, startDate, endDate)
- utils.FileLog.Info(fmt.Sprintf("windUrl:%s", requestWindUrl))
- body, err := http.Get(requestWindUrl)
- fmt.Println("GetEdbDataByWind body:")
- fmt.Println(string(body))
- utils.FileLog.Info(fmt.Sprint("wind result:", string(body)))
- if err != nil {
- return
- }
- item = new(models.EdbDataFromWind)
- err = json.Unmarshal(body, &item)
- //异常的话,需要邮件通知
- if len(item.ErrorCode) > 0 {
- if item.ErrorCode["0"] != 0 {
- if item.ErrorCode["0"] == -40522017 {
- //{
- //DT: {
- //0: 1654646400000
- //},
- //CLOSE: {
- //0: "CEDBService:: quota exceeded."
- //},
- //ErrorCode: {
- //0: -40522017
- //}
- //}
- // 设置服务器已超限
- errorCode = 421
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据接口超限,地址:%s", requestWindUrl), 2)
- //go utils.SendEmail("wind数据接口超限", "地址:"+requestWindUrl, utils.EmailSendToUsers)
- return
- } else if item.ErrorCode["0"] == -40520005 {
- //.ErrorCode=-40520005
- //.Data=[No Python API Authority
- err = errors.New("No Python API Authority")
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
- return
- } else if item.ErrorCode["0"] == WindNoAuthCode {
- // 指标下架, 无权限
- return nil, WindNoAuthCode, nil
- } else {
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
- }
- }
- }
- return
- }
- // GetWindUrl 获取wind的url
- func GetWindUrl(edbCode string) (windUrl string, err error) {
- defer func() {
- if err == nil && windUrl == "" {
- err = errors.New("获取wind服务器地址失败,指标超限了")
- }
- }()
- //从缓存中获取
- cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
- windUrl, _ = utils.Rc.RedisString(cacheKey)
- if windUrl != "" {
- return
- }
- //如果缓存中没有的话,那么从配置中获取
- for _, windUrlMap := range utils.Hz_Wind_Data_Url_LIST {
- //判断该url是否被占满了
- //count, tmpErr := GetCountEdbCodeInWindUrl(windUrlMap.Url)
- //if tmpErr != nil && tmpErr.Error() != "nil returned" {
- // err = tmpErr
- // return
- //}
- //if count < windUrlMap.Num {
- // windUrl = windUrlMap.Url
- // AddEdbCodeInWindUrl(windUrlMap.Url, edbCode)
- // return
- //}
- //如果超限了,那么进入下一循环
- isLimit, tmpErr := GetIsLimitEdbCodeInWindUrl(windUrlMap.Url)
- if isLimit {
- err = tmpErr
- continue
- }
- windUrl = windUrlMap.Url
- AddEdbCodeInWindUrl(windUrlMap.Url, edbCode)
- return
- }
- return
- }
- // GetCountEdbCodeInWindUrl 从缓存key中获取已经插入入的指标数
- func GetCountEdbCodeInWindUrl(windUrl string) (num int, err error) {
- cacheKey := utils.CACHE_WIND_URL + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
- num, err = utils.Rc.RedisInt(cacheKey)
- if err != nil && err.Error() == "redigo: nil returned" {
- err = nil
- }
- return
- }
- // GetIsLimitEdbCodeInWindUrl 从缓存key中获取是否超限
- func GetIsLimitEdbCodeInWindUrl(windUrl string) (isLimit bool, err error) {
- cacheKey := utils.CACHE_WIND_URL + ":limit:" + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
- num, err := utils.Rc.RedisInt(cacheKey)
- if err != nil && err.Error() == "redigo: nil returned" {
- err = nil
- }
- if num > 0 {
- isLimit = true
- }
- return
- }
- // SetIsLimitEdbCodeInWindUrl 设置服务器已超限
- func SetIsLimitEdbCodeInWindUrl(windUrl string) {
- cacheKey := utils.CACHE_WIND_URL + ":limit:" + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
- _ = utils.Rc.SetNX(cacheKey, 1, utils.GetTodayLastSecond())
- return
- }
- // AddEdbCodeInWindUrl 将指标插入到缓存key中
- // @return isInsert bool 是否插入数据,true时为插入数据,false表示数据已存在
- func AddEdbCodeInWindUrl(windUrl, edbCode string) (isInsert bool) {
- cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
- isInsert = utils.Rc.SetNX(cacheKey, windUrl, utils.GetTodayLastSecond())
- cacheKey2 := utils.CACHE_WIND_URL + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
- utils.Rc.Incrby(cacheKey2, 1)
- return
- }
- // DeleteEdbCodeInWindUrl 删除指标编码 服务器归属 缓存
- func DeleteEdbCodeInWindUrl(edbCode string) (err error) {
- cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
- err = utils.Rc.Delete(cacheKey)
- return
- }
- func GetEdbDataFromBridge(windUrl string, req models.AddEdbInfoReq, endDate string) (list []*models.EdbDataFromWindSimple, err error, errMsg string) {
- dataItem, err := GetEdbDataFromBridgeJy(windUrl, req.EdbCode, utils.BASE_START_DATE, endDate)
- if err != nil {
- errMsg = "桥接服务里获取指标信息失败 GetEdbDataFromBridgeJy,Err:" + err.Error()
- err = errors.New("获取指标信息失败!")
- return
- }
- for _, v := range dataItem.IndexData {
- tmp := new(models.EdbDataFromWindSimple)
- dataTime, tErr := time.Parse(utils.FormatDate, v.DataTime)
- if tErr != nil {
- err = tErr
- return
- }
- timestamp := dataTime.UnixNano() / 1e6
- timeStr := fmt.Sprintf("%d", timestamp)
- saveVal := utils.SubFloatToString(v.Val, 30)
- tmp.TimestampStr = timeStr
- tmp.DateTime = v.DataTime
- tmp.Val = saveVal
- list = append(list, tmp)
- }
- return
- }
- // RefreshEdbDataFromWindBridge 桥接服务刷新wind指标数据
- func RefreshEdbDataFromWindBridge(windUrl string, req models.RefreshEdbInfoReq, edbInfo *models.EdbInfo, endDate string) (list []*models.EdbDataFromWindSimple, err error, errMsg string) {
- dataItem, err := GetEdbDataFromBridgeJy(windUrl, edbInfo.EdbCode, req.StartDate, endDate)
- if err != nil {
- errMsg = "获取指标信息失败 GetEdbDataFromWind,Err:" + err.Error()
- err = errors.New("获取指标信息失败!")
- return
- }
- // 忽略掉指标下架的错误, 并更新指标为停止更新
- if dataItem.Status == -1 {
- edbInfo.NoUpdate = 1
- edbInfo.ModifyTime = time.Now().Local()
- if e := edbInfo.Update([]string{"NoUpdate", "ModifyTime"}); e != nil {
- errMsg = "更新wind指标停更失败, Err: " + e.Error()
- err = errors.New("刷新失败")
- return
- }
- return
- }
- for _, v := range dataItem.IndexData {
- tmp := new(models.EdbDataFromWindSimple)
- dataTime, tErr := time.Parse(utils.FormatDate, v.DataTime)
- if tErr != nil {
- err = tErr
- }
- timestamp := dataTime.UnixNano() / 1e6
- timeStr := fmt.Sprintf("%d", timestamp)
- saveVal := utils.SubFloatToString(v.Val, 20)
- tmp.TimestampStr = timeStr
- tmp.DateTime = v.DataTime
- tmp.Val = saveVal
- list = append(list, tmp)
- }
- return
- }
|