base_from_pcsg.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/logic"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/services/alarm_msg"
  7. "eta/eta_index_lib/utils"
  8. "fmt"
  9. "io/ioutil"
  10. "net/http"
  11. "strings"
  12. "time"
  13. )
  14. var (
  15. PCSGBloombergGeneralIndexDataUrl = "/api/pcsg/bloomberg/index_data/general" // 通用指标API
  16. )
  17. type PCSGBloombergApiReq struct {
  18. TaskKey string `description:"任务key"`
  19. Frequency string `description:"指标频度"`
  20. }
  21. type PCSGBloombergTask struct {
  22. TaskKey string `json:"TaskKey"`
  23. Frequency string `json:"Frequency"`
  24. VCode bool `json:"VCode"`
  25. }
  26. // LoadPCSGBloombergTask 加载配置
  27. func LoadPCSGBloombergTask() (tasks []*PCSGBloombergTask, err error) {
  28. filePath := "./static/pcsg_task.json"
  29. b, e := ioutil.ReadFile(filePath)
  30. if e != nil {
  31. err = fmt.Errorf("读取配置失败, err: %v", e)
  32. return
  33. }
  34. if e = json.Unmarshal(b, &tasks); e != nil {
  35. err = fmt.Errorf("解析配置失败, err: %v", e)
  36. return
  37. }
  38. return
  39. }
  40. // GetPCSGBloombergGeneralIndexFromBridge 获取通用数据类型指标
  41. func GetPCSGBloombergGeneralIndexFromBridge(params PCSGBloombergApiReq) (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
  42. defer func() {
  43. if err != nil {
  44. tips := fmt.Sprintf("GetPCSGBloombergGeneralIndexFromBridge-获取指标数据失败, err: %s", err.Error())
  45. utils.FileLog.Info(tips)
  46. go alarm_msg.SendAlarmMsg(tips, 3)
  47. }
  48. }()
  49. p, e := json.Marshal(params)
  50. if e != nil {
  51. err = fmt.Errorf("params json marshal err: %v", e)
  52. return
  53. }
  54. url := fmt.Sprint(utils.EtaBridgeUrl, PCSGBloombergGeneralIndexDataUrl)
  55. body := ioutil.NopCloser(strings.NewReader(string(p)))
  56. client := &http.Client{}
  57. req, e := http.NewRequest("POST", url, body)
  58. if e != nil {
  59. err = fmt.Errorf("http create request err: %s", e.Error())
  60. return
  61. }
  62. checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
  63. contentType := "application/json;charset=utf-8"
  64. req.Header.Set("Content-Type", contentType)
  65. req.Header.Set("Authorization", checkToken)
  66. resp, e := client.Do(req)
  67. if e != nil {
  68. err = fmt.Errorf("http client do err: %s", e.Error())
  69. return
  70. }
  71. defer func() {
  72. _ = resp.Body.Close()
  73. }()
  74. b, e := ioutil.ReadAll(resp.Body)
  75. if e != nil {
  76. err = fmt.Errorf("resp body read err: %s", e.Error())
  77. return
  78. }
  79. if len(b) == 0 {
  80. err = fmt.Errorf("resp body is empty")
  81. return
  82. }
  83. // 生产环境解密
  84. if utils.RunMode == "release" {
  85. str := string(b)
  86. str = strings.Trim(str, `"`)
  87. b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
  88. }
  89. result := new(models.BridgePCSGBloombergResultData)
  90. if e = json.Unmarshal(b, &result); e != nil {
  91. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
  92. return
  93. }
  94. if result.Code != 200 {
  95. err = fmt.Errorf("result: %s", string(b))
  96. return
  97. }
  98. indexes = result.Data
  99. return
  100. }
  101. // PCSGWrite2BaseBloomberg 写入彭博数据源
  102. func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData, isVCode bool) (err error) {
  103. defer func() {
  104. if err != nil {
  105. tips := fmt.Sprintf("PCSGWrite2BaseBloomberg-写入彭博数据源失败, err: %s", err.Error())
  106. utils.FileLog.Info(tips)
  107. go alarm_msg.SendAlarmMsg(tips, 3)
  108. }
  109. }()
  110. for _, v := range indexes {
  111. if v.IndexCode == "" {
  112. continue
  113. }
  114. // 无数据的情况不处理
  115. if len(v.Data) == 0 {
  116. continue
  117. }
  118. if isVCode {
  119. v.IndexCode = utils.InsertStr2StrIdx(v.IndexCode, " ", 1, "V")
  120. }
  121. // 指标是否存在
  122. index, e := models.GetBaseFromBloombergIndexByCode(v.IndexCode)
  123. if e != nil && e.Error() != utils.ErrNoRow() {
  124. err = fmt.Errorf("获取Bloomberg原始指标失败, err: %s", e.Error())
  125. return
  126. }
  127. // 新增指标
  128. if index == nil {
  129. newIndex := new(models.BaseFromBloombergIndex)
  130. newIndex.IndexCode = v.IndexCode
  131. newIndex.IndexName = v.IndexName
  132. newIndex.Unit = v.Unit
  133. newIndex.Source = utils.DATA_SOURCE_BLOOMBERG
  134. newIndex.Frequency = v.Frequency
  135. newIndex.CreateTime = time.Now().Local()
  136. newIndex.ModifyTime = time.Now().Local()
  137. if e = newIndex.Create(); e != nil {
  138. err = fmt.Errorf("新增Bloomberg原始指标失败, err: %s", e.Error())
  139. return
  140. }
  141. index = newIndex
  142. } else {
  143. // 无指标名称的情况下更新指标基础信息
  144. if index.IndexName == "" {
  145. index.IndexName = v.IndexName
  146. index.Unit = v.Unit
  147. index.Frequency = v.Frequency
  148. index.ModifyTime = time.Now().Local()
  149. if e = index.Update([]string{"IndexName", "Unit", "Frequency", "ModifyTime"}); e != nil {
  150. err = fmt.Errorf("更新Bloomberg原始指标失败, err: %s", e.Error())
  151. return
  152. }
  153. }
  154. }
  155. // 更新指标数据
  156. var cond string
  157. var pars []interface{}
  158. cond += ` AND index_code = ? `
  159. pars = append(pars, v.IndexCode)
  160. indexData, e := models.GetBaseFromBloombergDataByCondition(cond, pars)
  161. if e != nil {
  162. err = fmt.Errorf("获取Bloomberg历史数据失败, err: %s", e.Error())
  163. return
  164. }
  165. dateExist := make(map[string]*models.BaseFromBloombergData)
  166. newValExist := make(map[string]bool)
  167. if len(indexData) > 0 {
  168. for _, d := range indexData {
  169. strDate := d.DataTime.Format(utils.FormatDate)
  170. dateExist[strDate] = d
  171. }
  172. }
  173. // 筛选新增/更新数据
  174. updateData := make([]*models.BaseFromBloombergData, 0)
  175. insertData := make([]*models.BaseFromBloombergData, 0)
  176. for _, d := range v.Data {
  177. strDate := d.DataTime.Format(utils.FormatDate)
  178. originData := dateExist[strDate]
  179. if originData != nil {
  180. if utils.FloatAlmostEqual(originData.Value, d.Value) {
  181. continue
  182. }
  183. originData.Value = d.Value
  184. originData.ModifyTime = time.Now().Local()
  185. updateData = append(updateData, originData)
  186. } else {
  187. // 新增的数据去重
  188. if newValExist[strDate] {
  189. continue
  190. }
  191. newValExist[strDate] = true
  192. newData := new(models.BaseFromBloombergData)
  193. newData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
  194. newData.IndexCode = index.IndexCode
  195. newData.DataTime = d.DataTime
  196. newData.Value = d.Value
  197. newData.CreateTime = time.Now()
  198. newData.ModifyTime = time.Now()
  199. timestamp := d.DataTime.UnixNano() / 1e6
  200. newData.DataTimestamp = int(timestamp)
  201. insertData = append(insertData, newData)
  202. }
  203. }
  204. if e = models.MultiInsertOrUpdateBaseFromBloombergData(insertData, updateData); e != nil {
  205. err = fmt.Errorf("新增/更新Bloomberg指标数据失败, err: %s", e.Error())
  206. return
  207. }
  208. // 更新指标开始结束时间
  209. minMax, e := models.GetBaseFromBloombergIndexMinMax(index.IndexCode)
  210. if e == nil && minMax != nil {
  211. e = models.ModifyBaseFromBloombergIndexMinMax(index.IndexCode, minMax)
  212. if e != nil {
  213. err = fmt.Errorf("更新Bloomberg开始结束时间失败, err: %s", e.Error())
  214. return
  215. }
  216. }
  217. // 同步刷新指标库
  218. go func() {
  219. edb, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BLOOMBERG, index.IndexCode)
  220. if e != nil && e.Error() != utils.ErrNoRow() {
  221. utils.FileLog.Info("获取Bloomberg指标库信息失败, err: " + e.Error())
  222. return
  223. }
  224. if edb != nil {
  225. logic.RefreshBaseEdbInfo(edb, ``)
  226. }
  227. }()
  228. }
  229. return
  230. }