base_from_wind.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package services
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/rdlucklib/rdluck_tools/http"
  7. "hongze/hongze_edb_lib/services/alarm_msg"
  8. "hongze/hongze_edb_lib/utils"
  9. "time"
  10. )
  11. type EdbDataFromWind struct {
  12. Close map[string]float64 `json:"CLOSE"`
  13. Dt map[string]int64 `json:"DT"`
  14. ErrorCode map[string]int64 `json:"ErrorCode"`
  15. ErrMsg string
  16. }
  17. // GetEdbDataFromWind 获取wind数据
  18. func GetEdbDataFromWind(edbCode, startDate, endDate string) (item *EdbDataFromWind, errorCode int, err error) {
  19. windUrl, err := GetWindUrl(edbCode)
  20. if err != nil {
  21. errorCode = 421
  22. go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取wind服务器地址失败,err:%s", err.Error()), 3)
  23. return
  24. }
  25. thsUrl := windUrl + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s`
  26. thsUrl = fmt.Sprintf(thsUrl, edbCode, startDate, endDate)
  27. utils.FileLog.Info(fmt.Sprintf("windUrl:%s", thsUrl))
  28. body, err := http.Get(thsUrl)
  29. fmt.Println("GetEdbDataByWind body:")
  30. fmt.Println(string(body))
  31. utils.FileLog.Info(fmt.Sprint("指标编码:", edbCode, ";wind result:", string(body)))
  32. if err != nil {
  33. return
  34. }
  35. item = new(EdbDataFromWind)
  36. err = json.Unmarshal(body, &item)
  37. //异常的话,需要邮件通知
  38. if len(item.ErrorCode) > 0 {
  39. if item.ErrorCode["0"] != 0 {
  40. if item.ErrorCode["0"] == -40522017 {
  41. //{
  42. //DT: {
  43. //0: 1654646400000
  44. //},
  45. //CLOSE: {
  46. //0: "CEDBService:: quota exceeded."
  47. //},
  48. //ErrorCode: {
  49. //0: -40522017
  50. //}
  51. //}
  52. // 设置服务器已超限
  53. SetIsLimitEdbCodeInWindUrl(windUrl)
  54. err = DeleteEdbCodeInWindUrl(edbCode)
  55. if err != nil {
  56. return
  57. }
  58. return GetEdbDataFromWind(edbCode, startDate, endDate)
  59. } else if item.ErrorCode["0"] == -40520005 {
  60. //.ErrorCode=-40520005
  61. //.Data=[No Python API Authority
  62. SetIsLimitEdbCodeInWindUrl(windUrl)
  63. err = DeleteEdbCodeInWindUrl(edbCode)
  64. if err != nil {
  65. return
  66. }
  67. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  68. return GetEdbDataFromWind(edbCode, startDate, endDate)
  69. } else {
  70. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  71. }
  72. }
  73. }
  74. return
  75. }
  76. // GetEdbDataFromWindUrl 通过url获取wind数据
  77. func GetEdbDataFromWindUrl(windUrl, edbCode, startDate, endDate string) (item *EdbDataFromWind, errorCode int, err error) {
  78. if windUrl == `` {
  79. return GetEdbDataFromWind(edbCode, startDate, endDate)
  80. }
  81. requestWindUrl := windUrl + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s`
  82. requestWindUrl = fmt.Sprintf(requestWindUrl, edbCode, startDate, endDate)
  83. utils.FileLog.Info(fmt.Sprintf("windUrl:%s", requestWindUrl))
  84. body, err := http.Get(requestWindUrl)
  85. fmt.Println("GetEdbDataByWind body:")
  86. fmt.Println(string(body))
  87. utils.FileLog.Info(fmt.Sprint("wind result:", string(body)))
  88. if err != nil {
  89. return
  90. }
  91. item = new(EdbDataFromWind)
  92. err = json.Unmarshal(body, &item)
  93. //异常的话,需要邮件通知
  94. if len(item.ErrorCode) > 0 {
  95. if item.ErrorCode["0"] != 0 {
  96. if item.ErrorCode["0"] == -40522017 {
  97. //{
  98. //DT: {
  99. //0: 1654646400000
  100. //},
  101. //CLOSE: {
  102. //0: "CEDBService:: quota exceeded."
  103. //},
  104. //ErrorCode: {
  105. //0: -40522017
  106. //}
  107. //}
  108. // 设置服务器已超限
  109. errorCode = 421
  110. go utils.SendEmail("wind数据接口超限", "地址:"+requestWindUrl, utils.EmailSendToUsers)
  111. return
  112. } else if item.ErrorCode["0"] == -40520005 {
  113. //.ErrorCode=-40520005
  114. //.Data=[No Python API Authority
  115. err = errors.New("No Python API Authority")
  116. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  117. return
  118. } else {
  119. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  120. }
  121. }
  122. }
  123. return
  124. }
  125. // FutureGoodDataFromWind wind期货数据
  126. type FutureGoodDataFromWind struct {
  127. ErrorCode map[string]int64 `json:"ErrorCode"`
  128. Dt map[string]int64 `json:"DT"`
  129. TradeCode map[string]string `json:"TRADE_CODE"`
  130. Close map[string]float64 `json:"CLOSE"`
  131. Open map[string]float64 `json:"OPEN"`
  132. High map[string]float64 `json:"HIGH"`
  133. Low map[string]float64 `json:"LOW"`
  134. Volume map[string]float64 `json:"VOLUME"`
  135. Amt map[string]float64 `json:"AMT"`
  136. Oi map[string]float64 `json:"OI"`
  137. Settle map[string]float64 `json:"SETTLE"`
  138. ErrMsg string
  139. }
  140. // GetFutureGoodDataFromWindUrl 通过url获取wind的商品数据
  141. func GetFutureGoodDataFromWindUrl(windUrl, edbCode, startDate, endDate string) (item *FutureGoodDataFromWind, errorCode int, err error) {
  142. requestWindUrl := windUrl + `edbInfo/wind/future_good?FutureGoodEdbCode=%s&StartDate=%s&EndDate=%s`
  143. requestWindUrl = fmt.Sprintf(requestWindUrl, edbCode, startDate, endDate)
  144. utils.FileLog.Info(fmt.Sprintf("windUrl:%s", requestWindUrl))
  145. body, err := http.Get(requestWindUrl)
  146. fmt.Println("GetFutureGoodDataFromWindUrl body:")
  147. fmt.Println(string(body))
  148. utils.FileLog.Info(fmt.Sprint("wind result:", string(body)))
  149. if err != nil {
  150. return
  151. }
  152. item = new(FutureGoodDataFromWind)
  153. err = json.Unmarshal(body, &item)
  154. //异常的话,需要邮件通知
  155. if len(item.ErrorCode) > 0 {
  156. if item.ErrorCode["0"] != 0 {
  157. if item.ErrorCode["0"] == -40522017 {
  158. //{
  159. //DT: {
  160. //0: 1654646400000
  161. //},
  162. //CLOSE: {
  163. //0: "CEDBService:: quota exceeded."
  164. //},
  165. //ErrorCode: {
  166. //0: -40522017
  167. //}
  168. //}
  169. // 设置服务器已超限
  170. errorCode = 421
  171. go utils.SendEmail("wind数据接口超限", "地址:"+requestWindUrl, utils.EmailSendToUsers)
  172. return
  173. } else if item.ErrorCode["0"] == -40520005 {
  174. //.ErrorCode=-40520005
  175. //.Data=[No Python API Authority
  176. err = errors.New("No Python API Authority")
  177. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind商品数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  178. return
  179. } else {
  180. go alarm_msg.SendAlarmMsg(fmt.Sprintf("wind商品数据服务异常,edbCode:%s,ErrorCode:%d,result:%s", edbCode, item.ErrorCode["0"], string(body)), 3)
  181. }
  182. }
  183. }
  184. return
  185. }
  186. // GetWindUrl 获取wind的url
  187. func GetWindUrl(edbCode string) (windUrl string, err error) {
  188. defer func() {
  189. if err == nil && windUrl == "" {
  190. err = errors.New("获取wind服务器地址失败,指标超限了")
  191. }
  192. }()
  193. //从缓存中获取
  194. cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
  195. windUrl, _ = utils.Rc.RedisString(cacheKey)
  196. if windUrl != "" {
  197. return
  198. }
  199. //如果缓存中没有的话,那么从配置中获取
  200. for _, windUrlMap := range utils.Hz_Wind_Data_Url_LIST {
  201. //判断该url是否被占满了
  202. //count, tmpErr := GetCountEdbCodeInWindUrl(windUrlMap.Url)
  203. //if tmpErr != nil && tmpErr.Error() != "nil returned" {
  204. // err = tmpErr
  205. // return
  206. //}
  207. //if count < windUrlMap.Num {
  208. // windUrl = windUrlMap.Url
  209. // AddEdbCodeInWindUrl(windUrlMap.Url, edbCode)
  210. // return
  211. //}
  212. //如果超限了,那么进入下一循环
  213. isLimit, tmpErr := GetIsLimitEdbCodeInWindUrl(windUrlMap.Url)
  214. if isLimit {
  215. err = tmpErr
  216. continue
  217. }
  218. windUrl = windUrlMap.Url
  219. AddEdbCodeInWindUrl(windUrlMap.Url, edbCode)
  220. return
  221. }
  222. return
  223. }
  224. // GetCountEdbCodeInWindUrl 从缓存key中获取已经插入入的指标数
  225. func GetCountEdbCodeInWindUrl(windUrl string) (num int, err error) {
  226. cacheKey := utils.CACHE_WIND_URL + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
  227. num, err = utils.Rc.RedisInt(cacheKey)
  228. if err != nil && err.Error() == "redigo: nil returned" {
  229. err = nil
  230. }
  231. return
  232. }
  233. // GetIsLimitEdbCodeInWindUrl 从缓存key中获取是否超限
  234. func GetIsLimitEdbCodeInWindUrl(windUrl string) (isLimit bool, err error) {
  235. cacheKey := utils.CACHE_WIND_URL + ":limit:" + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
  236. num, err := utils.Rc.RedisInt(cacheKey)
  237. if err != nil && err.Error() == "redigo: nil returned" {
  238. err = nil
  239. }
  240. if num > 0 {
  241. isLimit = true
  242. }
  243. return
  244. }
  245. // SetIsLimitEdbCodeInWindUrl 设置服务器已超限
  246. func SetIsLimitEdbCodeInWindUrl(windUrl string) {
  247. cacheKey := utils.CACHE_WIND_URL + ":limit:" + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
  248. _ = utils.Rc.SetNX(cacheKey, 1, utils.GetTodayLastSecond())
  249. return
  250. }
  251. // AddEdbCodeInWindUrl 将指标插入到缓存key中
  252. // @return isInsert bool 是否插入数据,true时为插入数据,false表示数据已存在
  253. func AddEdbCodeInWindUrl(windUrl, edbCode string) (isInsert bool) {
  254. cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
  255. isInsert = utils.Rc.SetNX(cacheKey, windUrl, utils.GetTodayLastSecond())
  256. cacheKey2 := utils.CACHE_WIND_URL + time.Now().Format(utils.FormatDateUnSpace) + ":" + utils.MD5(windUrl)
  257. utils.Rc.Incrby(cacheKey2, 1)
  258. return
  259. }
  260. // DeleteEdbCodeInWindUrl 删除指标编码 服务器归属 缓存
  261. func DeleteEdbCodeInWindUrl(edbCode string) (err error) {
  262. cacheKey := utils.CACHE_WIND_URL + ":" + edbCode
  263. err = utils.Rc.Delete(cacheKey)
  264. return
  265. }