base_from_pcsg.go 11 KB


  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/logic"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/services/alarm_msg"
  7. "eta/eta_index_lib/utils"
  8. "fmt"
  9. "io/ioutil"
  10. "net/http"
  11. "strings"
  12. "time"
  13. )
  // Bridge API paths for the PCSG Bloomberg endpoints. Each path is appended to
  // utils.EtaBridgeUrl by the matching GetPCSGBloomberg*FromBridge function.
  var (
  	BridgeApiPCSGBloombergDailyUrl = "/api/pcsg/bloomberg/daily_index" // daily index API
  	BridgeApiPCSGBloombergWeeklyUrl = "/api/pcsg/bloomberg/weekly_index" // weekly index API
  	BridgeApiPCSGBloombergMonthlyUrl = "/api/pcsg/bloomberg/monthly_index" // monthly index API
  	BridgeApiPCSGBloombergDailyRun3Url = "/api/pcsg/bloomberg/daily_index_run3" // daily index API, "run3" endpoint (used by GetPCSGBloombergDailyFromBridgeRun3)
  )
  20. // GetPCSGBloombergDailyFromBridge 获取彭博日度指标
  21. func GetPCSGBloombergDailyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
  22. defer func() {
  23. if err != nil {
  24. tips := fmt.Sprintf("GetPCSGBloombergDailyFromBridge-获取彭博日度指标失败, err: %s", err.Error())
  25. utils.FileLog.Info(tips)
  26. go alarm_msg.SendAlarmMsg(tips, 3)
  27. }
  28. }()
  29. url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergDailyUrl)
  30. body := ioutil.NopCloser(strings.NewReader(""))
  31. client := &http.Client{}
  32. req, e := http.NewRequest("POST", url, body)
  33. if e != nil {
  34. err = fmt.Errorf("http create request err: %s", e.Error())
  35. return
  36. }
  37. checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
  38. contentType := "application/json;charset=utf-8"
  39. req.Header.Set("Content-Type", contentType)
  40. req.Header.Set("Authorization", checkToken)
  41. resp, e := client.Do(req)
  42. if e != nil {
  43. err = fmt.Errorf("http client do err: %s", e.Error())
  44. return
  45. }
  46. defer func() {
  47. _ = resp.Body.Close()
  48. }()
  49. b, e := ioutil.ReadAll(resp.Body)
  50. if e != nil {
  51. err = fmt.Errorf("resp body read err: %s", e.Error())
  52. return
  53. }
  54. if len(b) == 0 {
  55. err = fmt.Errorf("resp body is empty")
  56. return
  57. }
  58. // 生产环境解密
  59. if utils.RunMode == "release" {
  60. str := string(b)
  61. str = strings.Trim(str, `"`)
  62. b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
  63. }
  64. result := new(models.BridgePCSGBloombergResultData)
  65. if e = json.Unmarshal(b, &result); e != nil {
  66. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
  67. return
  68. }
  69. if result.Code != 200 {
  70. err = fmt.Errorf("result: %s", string(b))
  71. return
  72. }
  73. indexes = result.Data
  74. return
  75. }
  76. // GetPCSGBloombergWeeklyFromBridge 获取彭博周度指标
  77. func GetPCSGBloombergWeeklyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
  78. defer func() {
  79. if err != nil {
  80. tips := fmt.Sprintf("GetPCSGBloombergWeeklyFromBridge-获取彭博周度指标失败, err: %s", err.Error())
  81. utils.FileLog.Info(tips)
  82. go alarm_msg.SendAlarmMsg(tips, 3)
  83. }
  84. }()
  85. url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergWeeklyUrl)
  86. body := ioutil.NopCloser(strings.NewReader(""))
  87. client := &http.Client{}
  88. req, e := http.NewRequest("POST", url, body)
  89. if e != nil {
  90. err = fmt.Errorf("http create request err: %s", e.Error())
  91. return
  92. }
  93. checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
  94. contentType := "application/json;charset=utf-8"
  95. req.Header.Set("Content-Type", contentType)
  96. req.Header.Set("Authorization", checkToken)
  97. resp, e := client.Do(req)
  98. if e != nil {
  99. err = fmt.Errorf("http client do err: %s", e.Error())
  100. return
  101. }
  102. defer func() {
  103. _ = resp.Body.Close()
  104. }()
  105. b, e := ioutil.ReadAll(resp.Body)
  106. if e != nil {
  107. err = fmt.Errorf("resp body read err: %s", e.Error())
  108. return
  109. }
  110. if len(b) == 0 {
  111. err = fmt.Errorf("resp body is empty")
  112. return
  113. }
  114. // 生产环境解密
  115. if utils.RunMode == "release" {
  116. str := string(b)
  117. str = strings.Trim(str, `"`)
  118. b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
  119. }
  120. result := new(models.BridgePCSGBloombergResultData)
  121. if e = json.Unmarshal(b, &result); e != nil {
  122. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
  123. return
  124. }
  125. if result.Code != 200 {
  126. err = fmt.Errorf("result: %s", string(b))
  127. return
  128. }
  129. indexes = result.Data
  130. return
  131. }
  132. // GetPCSGBloombergMonthlyFromBridge 获取彭博月度指标
  133. func GetPCSGBloombergMonthlyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
  134. defer func() {
  135. if err != nil {
  136. tips := fmt.Sprintf("GetPCSGBloombergMonthlyFromBridge-获取彭博月度指标失败, err: %s", err.Error())
  137. utils.FileLog.Info(tips)
  138. go alarm_msg.SendAlarmMsg(tips, 3)
  139. }
  140. }()
  141. url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergMonthlyUrl)
  142. body := ioutil.NopCloser(strings.NewReader(""))
  143. client := &http.Client{}
  144. req, e := http.NewRequest("POST", url, body)
  145. if e != nil {
  146. err = fmt.Errorf("http create request err: %s", e.Error())
  147. return
  148. }
  149. checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
  150. contentType := "application/json;charset=utf-8"
  151. req.Header.Set("Content-Type", contentType)
  152. req.Header.Set("Authorization", checkToken)
  153. resp, e := client.Do(req)
  154. if e != nil {
  155. err = fmt.Errorf("http client do err: %s", e.Error())
  156. return
  157. }
  158. defer func() {
  159. _ = resp.Body.Close()
  160. }()
  161. b, e := ioutil.ReadAll(resp.Body)
  162. if e != nil {
  163. err = fmt.Errorf("resp body read err: %s", e.Error())
  164. return
  165. }
  166. if len(b) == 0 {
  167. err = fmt.Errorf("resp body is empty")
  168. return
  169. }
  170. // 生产环境解密
  171. if utils.RunMode == "release" {
  172. str := string(b)
  173. str = strings.Trim(str, `"`)
  174. b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
  175. }
  176. result := new(models.BridgePCSGBloombergResultData)
  177. if e = json.Unmarshal(b, &result); e != nil {
  178. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
  179. return
  180. }
  181. if result.Code != 200 {
  182. err = fmt.Errorf("result: %s", string(b))
  183. return
  184. }
  185. indexes = result.Data
  186. return
  187. }
  188. // GetPCSGBloombergDailyFromBridgeRun3 获取彭博日度指标
  189. func GetPCSGBloombergDailyFromBridgeRun3() (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
  190. defer func() {
  191. if err != nil {
  192. tips := fmt.Sprintf("GetPCSGBloombergDailyFromBridgeRun3-获取彭博日度指标失败, err: %s", err.Error())
  193. utils.FileLog.Info(tips)
  194. go alarm_msg.SendAlarmMsg(tips, 3)
  195. }
  196. }()
  197. url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergDailyRun3Url)
  198. body := ioutil.NopCloser(strings.NewReader(""))
  199. client := &http.Client{}
  200. req, e := http.NewRequest("POST", url, body)
  201. if e != nil {
  202. err = fmt.Errorf("http create request err: %s", e.Error())
  203. return
  204. }
  205. checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
  206. contentType := "application/json;charset=utf-8"
  207. req.Header.Set("Content-Type", contentType)
  208. req.Header.Set("Authorization", checkToken)
  209. resp, e := client.Do(req)
  210. if e != nil {
  211. err = fmt.Errorf("http client do err: %s", e.Error())
  212. return
  213. }
  214. defer func() {
  215. _ = resp.Body.Close()
  216. }()
  217. b, e := ioutil.ReadAll(resp.Body)
  218. if e != nil {
  219. err = fmt.Errorf("resp body read err: %s", e.Error())
  220. return
  221. }
  222. if len(b) == 0 {
  223. err = fmt.Errorf("resp body is empty")
  224. return
  225. }
  226. // 生产环境解密
  227. if utils.RunMode == "release" {
  228. str := string(b)
  229. str = strings.Trim(str, `"`)
  230. b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
  231. }
  232. result := new(models.BridgePCSGBloombergResultData)
  233. if e = json.Unmarshal(b, &result); e != nil {
  234. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
  235. return
  236. }
  237. if result.Code != 200 {
  238. err = fmt.Errorf("result: %s", string(b))
  239. return
  240. }
  241. indexes = result.Data
  242. return
  243. }
  244. // PCSGWrite2BaseBloomberg 写入彭博数据源
  245. func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData, isVCode bool) (err error) {
  246. defer func() {
  247. if err != nil {
  248. tips := fmt.Sprintf("PCSGWrite2BaseBloomberg-写入彭博数据源失败, err: %s", err.Error())
  249. utils.FileLog.Info(tips)
  250. go alarm_msg.SendAlarmMsg(tips, 3)
  251. }
  252. }()
  253. for _, v := range indexes {
  254. if v.IndexCode == "" {
  255. continue
  256. }
  257. // 无数据的情况不处理
  258. if len(v.Data) == 0 {
  259. continue
  260. }
  261. if isVCode {
  262. v.IndexCode = utils.InsertStr2StrIdx(v.IndexCode, " ", 1, "V")
  263. }
  264. // 指标是否存在
  265. index, e := models.GetBaseFromBloombergIndexByCode(v.IndexCode)
  266. if e != nil && e.Error() != utils.ErrNoRow() {
  267. err = fmt.Errorf("获取Bloomberg原始指标失败, err: %s", e.Error())
  268. return
  269. }
  270. // 新增指标
  271. if index == nil {
  272. newIndex := new(models.BaseFromBloombergIndex)
  273. newIndex.IndexCode = v.IndexCode
  274. newIndex.IndexName = v.IndexName
  275. newIndex.Unit = v.Unit
  276. newIndex.Source = utils.DATA_SOURCE_BLOOMBERG
  277. newIndex.Frequency = v.Frequency
  278. newIndex.CreateTime = time.Now().Local()
  279. newIndex.ModifyTime = time.Now().Local()
  280. if e = newIndex.Create(); e != nil {
  281. err = fmt.Errorf("新增Bloomberg原始指标失败, err: %s", e.Error())
  282. return
  283. }
  284. index = newIndex
  285. } else {
  286. // 无指标名称的情况下更新指标基础信息
  287. if index.IndexName == "" {
  288. index.IndexName = v.IndexName
  289. index.Unit = v.Unit
  290. index.Frequency = v.Frequency
  291. index.ModifyTime = time.Now().Local()
  292. if e = index.Update([]string{"IndexName", "Unit", "Frequency", "ModifyTime"}); e != nil {
  293. err = fmt.Errorf("更新Bloomberg原始指标失败, err: %s", e.Error())
  294. return
  295. }
  296. }
  297. }
  298. // 更新指标数据
  299. var cond string
  300. var pars []interface{}
  301. cond += ` AND index_code = ? `
  302. pars = append(pars, v.IndexCode)
  303. indexData, e := models.GetBaseFromBloombergDataByCondition(cond, pars)
  304. if e != nil {
  305. err = fmt.Errorf("获取Bloomberg历史数据失败, err: %s", e.Error())
  306. return
  307. }
  308. dateExist := make(map[string]*models.BaseFromBloombergData)
  309. newValExist := make(map[string]bool)
  310. if len(indexData) > 0 {
  311. for _, d := range indexData {
  312. strDate := d.DataTime.Format(utils.FormatDate)
  313. dateExist[strDate] = d
  314. }
  315. }
  316. // 筛选新增/更新数据
  317. updateData := make([]*models.BaseFromBloombergData, 0)
  318. insertData := make([]*models.BaseFromBloombergData, 0)
  319. for _, d := range v.Data {
  320. strDate := d.DataTime.Format(utils.FormatDate)
  321. originData := dateExist[strDate]
  322. if originData != nil {
  323. if utils.FloatAlmostEqual(originData.Value, d.Value) {
  324. continue
  325. }
  326. originData.Value = d.Value
  327. originData.ModifyTime = time.Now().Local()
  328. updateData = append(updateData, originData)
  329. } else {
  330. // 新增的数据去重
  331. if newValExist[strDate] {
  332. continue
  333. }
  334. newValExist[strDate] = true
  335. newData := new(models.BaseFromBloombergData)
  336. newData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
  337. newData.IndexCode = index.IndexCode
  338. newData.DataTime = d.DataTime
  339. newData.Value = d.Value
  340. newData.CreateTime = time.Now()
  341. newData.ModifyTime = time.Now()
  342. timestamp := d.DataTime.UnixNano() / 1e6
  343. newData.DataTimestamp = int(timestamp)
  344. insertData = append(insertData, newData)
  345. }
  346. }
  347. if e = models.MultiInsertOrUpdateBaseFromBloombergData(insertData, updateData); e != nil {
  348. err = fmt.Errorf("新增/更新Bloomberg指标数据失败, err: %s", e.Error())
  349. return
  350. }
  351. // 更新指标开始结束时间
  352. minMax, e := models.GetBaseFromBloombergIndexMinMax(index.IndexCode)
  353. if e == nil && minMax != nil {
  354. e = models.ModifyBaseFromBloombergIndexMinMax(index.IndexCode, minMax)
  355. if e != nil {
  356. err = fmt.Errorf("更新Bloomberg开始结束时间失败, err: %s", e.Error())
  357. return
  358. }
  359. }
  360. // 同步刷新指标库
  361. go func() {
  362. edb, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BLOOMBERG, index.IndexCode)
  363. if e != nil && e.Error() != utils.ErrNoRow() {
  364. utils.FileLog.Info("获取Bloomberg指标库信息失败, err: " + e.Error())
  365. return
  366. }
  367. if edb != nil {
  368. logic.RefreshBaseEdbInfo(edb, ``)
  369. }
  370. }()
  371. }
  372. return
  373. }