base_from_wind.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package services
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/services/alarm_msg"
  7. "eta/eta_index_lib/utils"
  8. "fmt"
  9. "github.com/rdlucklib/rdluck_tools/http"
  10. "time"
  11. )
  // WindNoAuthCode is the wind error code reported when an indicator has been
  // delisted or the account has no permission to read it.
  const WindNoAuthCode = -40522015
  15. // GetEdbDataFromWind 获取wind数据
  16. func GetEdbDataFromWind(edbCode, startDate, endDate string) (item *models.EdbDataFromWind, errorCode int, err error) {
  17. windUrl, err := GetWindUrl(edbCode)
  18. if err != nil {
  19. errorCode = 421
  20. go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取wind服务器地址失败,err:%s", err.Error()), 3)
  21. return
  22. }
  23. thsUrl := windUrl + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s`
  24. thsUrl = fmt.Sprintf(thsUrl, edbCode, startDate, endDate)
  25. utils.FileLog.Info(fmt.Sprintf("windUrl:%s", thsUrl))
  26. body, err := http.Get(thsUrl)
  27. fmt.Println("GetEdbDataByWind body:")
  28. fmt.Println(string(body))
  29. utils.FileLog.Info(fmt.Sprint("指标编码:", edbCode, ";wind result:", string(body)))
  30. if err != nil {
  31. return
  32. }
  33. item = new(models.EdbDataFromWind)
  34. err = json.Unmarshal(body, &item)
  35. //异常的话,需要邮件通知
  36. if len(item.ErrorCode) > 0 {
  37. if item.ErrorCode["0"] != 0 {
  38. if item.ErrorCode["0"] == -40522017 {
  39. //{
  40. //DT: {
  41. //0: 1654646400000
  42. //},
  43. //CLOSE: {
  44. //0: "CEDBService:: quota exceeded."
  45. //},
  46. //ErrorCode: {
  47. //0: -40522017
  48. //}
  49. //}
  50. // 设置服务器已超限
  51. SetIsLimitEdbCodeInWindUrl(windUrl)
  52. err = DeleteEdbCodeInWindUrl(edbCode)
  53. if err != nil {
  54. return
  55. }
  56. return GetEdbDataFromWind(edbCode, startDate, endDate)
  57. } else if item.ErrorCode["0"] == -40520005 {
  58. //.ErrorCode=-40520005
  59. //.Data=[No Python API Authority
  60. SetIsLimitEdbCodeInWindUrl(windUrl)
  61. err = DeleteEdbCodeInWindUrl(edbCode)
  62. if err != nil {
  63. return
  64. }
  65. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  66. return GetEdbDataFromWind(edbCode, startDate, endDate)
  67. } else if item.ErrorCode["0"] == WindNoAuthCode {
  68. // 指标下架, 无权限
  69. return nil, WindNoAuthCode, nil
  70. } else {
  71. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  72. }
  73. }
  74. }
  75. return
  76. }
  77. // GetEdbDataFromWindUrl 通过url获取wind数据
  78. func GetEdbDataFromWindUrl(windUrl, edbCode, startDate, endDate string) (item *models.EdbDataFromWind, errorCode int, err error) {
  79. if windUrl == `` {
  80. return GetEdbDataFromWind(edbCode, startDate, endDate)
  81. }
  82. requestWindUrl := windUrl + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s`
  83. requestWindUrl = fmt.Sprintf(requestWindUrl, edbCode, startDate, endDate)
  84. utils.FileLog.Info(fmt.Sprintf("windUrl:%s", requestWindUrl))
  85. body, err := http.Get(requestWindUrl)
  86. fmt.Println("GetEdbDataByWind body:")
  87. fmt.Println(string(body))
  88. utils.FileLog.Info(fmt.Sprint("wind result:", string(body)))
  89. if err != nil {
  90. return
  91. }
  92. item = new(models.EdbDataFromWind)
  93. err = json.Unmarshal(body, &item)
  94. //异常的话,需要邮件通知
  95. if len(item.ErrorCode) > 0 {
  96. if item.ErrorCode["0"] != 0 {
  97. if item.ErrorCode["0"] == -40522017 {
  98. //{
  99. //DT: {
  100. //0: 1654646400000
  101. //},
  102. //CLOSE: {
  103. //0: "CEDBService:: quota exceeded."
  104. //},
  105. //ErrorCode: {
  106. //0: -40522017
  107. //}
  108. //}
  109. // 设置服务器已超限
  110. errorCode = 421
  111. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据接口超限,地址:%s", requestWindUrl), 2)
  112. //go utils.SendEmail("wind数据接口超限", "地址:"+requestWindUrl, utils.EmailSendToUsers)
  113. return
  114. } else if item.ErrorCode["0"] == -40520005 {
  115. //.ErrorCode=-40520005
  116. //.Data=[No Python API Authority
  117. err = errors.New("No Python API Authority")
  118. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  119. return
  120. } else if item.ErrorCode["0"] == WindNoAuthCode {
  121. // 指标下架, 无权限
  122. return nil, WindNoAuthCode, nil
  123. } else {
  124. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  125. }
  126. }
  127. }
  128. return
  129. }
  130. // GetWindUrl 获取wind的url
  131. func GetWindUrl(edbCode string) (windUrl string, err error) {
  132. defer func() {
  133. if err == nil && windUrl == "" {
  134. err = errors.New("获取wind服务器地址失败,指标超限了")
  135. }
  136. }()
  137. //从缓存中获取
  138. cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
  139. windUrl, _ = utils.Rc.RedisString(cacheKey)
  140. if windUrl != "" {
  141. return
  142. }
  143. //如果缓存中没有的话,那么从配置中获取
  144. for _, windUrlMap := range utils.Hz_Wind_Data_Url_LIST {
  145. //判断该url是否被占满了
  146. //count, tmpErr := GetCountEdbCodeInWindUrl(windUrlMap.Url)
  147. //if tmpErr != nil && tmpErr.Error() != "nil returned" {
  148. // err = tmpErr
  149. // return
  150. //}
  151. //if count < windUrlMap.Num {
  152. // windUrl = windUrlMap.Url
  153. // AddEdbCodeInWindUrl(windUrlMap.Url, edbCode)
  154. // return
  155. //}
  156. //如果超限了,那么进入下一循环
  157. isLimit, tmpErr := GetIsLimitEdbCodeInWindUrl(windUrlMap.Url)
  158. if isLimit {
  159. err = tmpErr
  160. continue
  161. }
  162. windUrl = windUrlMap.Url
  163. AddEdbCodeInWindUrl(windUrlMap.Url, edbCode)
  164. return
  165. }
  166. return
  167. }
  168. // GetCountEdbCodeInWindUrl 从缓存key中获取已经插入入的指标数
  169. func GetCountEdbCodeInWindUrl(windUrl string) (num int, err error) {
  170. cacheKey := utils.CACHE_WIND_URL + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
  171. num, err = utils.Rc.RedisInt(cacheKey)
  172. if err != nil && err.Error() == "redigo: nil returned" {
  173. err = nil
  174. }
  175. return
  176. }
  177. // GetIsLimitEdbCodeInWindUrl 从缓存key中获取是否超限
  178. func GetIsLimitEdbCodeInWindUrl(windUrl string) (isLimit bool, err error) {
  179. cacheKey := utils.CACHE_WIND_URL + ":limit:" + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
  180. num, err := utils.Rc.RedisInt(cacheKey)
  181. if err != nil && err.Error() == "redigo: nil returned" {
  182. err = nil
  183. }
  184. if num > 0 {
  185. isLimit = true
  186. }
  187. return
  188. }
  189. // SetIsLimitEdbCodeInWindUrl 设置服务器已超限
  190. func SetIsLimitEdbCodeInWindUrl(windUrl string) {
  191. cacheKey := utils.CACHE_WIND_URL + ":limit:" + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
  192. _ = utils.Rc.SetNX(cacheKey, 1, utils.GetTodayLastSecond())
  193. return
  194. }
  195. // AddEdbCodeInWindUrl 将指标插入到缓存key中
  196. // @return isInsert bool 是否插入数据,true时为插入数据,false表示数据已存在
  197. func AddEdbCodeInWindUrl(windUrl, edbCode string) (isInsert bool) {
  198. cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
  199. isInsert = utils.Rc.SetNX(cacheKey, windUrl, utils.GetTodayLastSecond())
  200. cacheKey2 := utils.CACHE_WIND_URL + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
  201. utils.Rc.Incrby(cacheKey2, 1)
  202. return
  203. }
  204. // DeleteEdbCodeInWindUrl 删除指标编码 服务器归属 缓存
  205. func DeleteEdbCodeInWindUrl(edbCode string) (err error) {
  206. cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
  207. err = utils.Rc.Delete(cacheKey)
  208. return
  209. }