base_from_pcsg.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/logic"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/services/alarm_msg"
  7. "eta/eta_index_lib/utils"
  8. "fmt"
  9. "io/ioutil"
  10. "net/http"
  11. "strings"
  12. "time"
  13. )
  14. var (
  15. PCSGBloombergGeneralIndexDataUrl = "/api/pcsg/bloomberg/index_data/general" // 通用指标API
  16. )
  17. type PCSGBloombergApiReq struct {
  18. TaskKey string `description:"任务key"`
  19. Frequency string `description:"指标频度"`
  20. }
  21. type PCSGBloombergTask struct {
  22. TaskKey string `json:"TaskKey"`
  23. Frequency string `json:"Frequency"`
  24. VCode bool `json:"VCode"`
  25. ExtraLetter string `json:"ExtraLetter"`
  26. IndexNamePrefix string `json:"IndexNamePrefix"`
  27. }
  28. // LoadPCSGBloombergTask 加载配置
  29. func LoadPCSGBloombergTask() (tasks []*PCSGBloombergTask, err error) {
  30. filePath := "./static/pcsg_task.json"
  31. b, e := ioutil.ReadFile(filePath)
  32. if e != nil {
  33. err = fmt.Errorf("读取配置失败, err: %v", e)
  34. return
  35. }
  36. if e = json.Unmarshal(b, &tasks); e != nil {
  37. err = fmt.Errorf("解析配置失败, err: %v", e)
  38. return
  39. }
  40. return
  41. }
  42. // GetPCSGBloombergGeneralIndexFromBridge 获取通用数据类型指标
  43. func GetPCSGBloombergGeneralIndexFromBridge(params PCSGBloombergApiReq) (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
  44. defer func() {
  45. if err != nil {
  46. tips := fmt.Sprintf("GetPCSGBloombergGeneralIndexFromBridge-获取指标数据失败, err: %s", err.Error())
  47. utils.FileLog.Info(tips)
  48. go alarm_msg.SendAlarmMsg(tips, 3)
  49. }
  50. }()
  51. p, e := json.Marshal(params)
  52. if e != nil {
  53. err = fmt.Errorf("params json marshal err: %v", e)
  54. return
  55. }
  56. url := fmt.Sprint(utils.EtaBridgeUrl, PCSGBloombergGeneralIndexDataUrl)
  57. body := ioutil.NopCloser(strings.NewReader(string(p)))
  58. client := &http.Client{}
  59. req, e := http.NewRequest("POST", url, body)
  60. if e != nil {
  61. err = fmt.Errorf("http create request err: %s", e.Error())
  62. return
  63. }
  64. checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
  65. contentType := "application/json;charset=utf-8"
  66. req.Header.Set("Content-Type", contentType)
  67. req.Header.Set("Authorization", checkToken)
  68. resp, e := client.Do(req)
  69. if e != nil {
  70. err = fmt.Errorf("http client do err: %s", e.Error())
  71. return
  72. }
  73. defer func() {
  74. _ = resp.Body.Close()
  75. }()
  76. b, e := ioutil.ReadAll(resp.Body)
  77. if e != nil {
  78. err = fmt.Errorf("resp body read err: %s", e.Error())
  79. return
  80. }
  81. if len(b) == 0 {
  82. err = fmt.Errorf("resp body is empty")
  83. return
  84. }
  85. // 生产环境解密
  86. if utils.RunMode == "release" {
  87. str := string(b)
  88. str = strings.Trim(str, `"`)
  89. b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
  90. }
  91. result := new(models.BridgePCSGBloombergResultData)
  92. if e = json.Unmarshal(b, &result); e != nil {
  93. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
  94. return
  95. }
  96. if result.Code != 200 {
  97. err = fmt.Errorf("result: %s", string(b))
  98. return
  99. }
  100. indexes = result.Data
  101. return
  102. }
  103. // PCSGWrite2BaseBloomberg 写入彭博数据源
  104. func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData, isVCode bool, extraLetter, namePrefix string) (err error) {
  105. defer func() {
  106. if err != nil {
  107. tips := fmt.Sprintf("PCSGWrite2BaseBloomberg-写入彭博数据源失败, err: %s", err.Error())
  108. utils.FileLog.Info(tips)
  109. go alarm_msg.SendAlarmMsg(tips, 3)
  110. }
  111. }()
  112. // 这里挡一下...万一没限制加进库了不好删...
  113. if isVCode && extraLetter == "" {
  114. err = fmt.Errorf("中间字母有误")
  115. return
  116. }
  117. for _, v := range indexes {
  118. if v.IndexCode == "" {
  119. continue
  120. }
  121. // 无数据的情况不处理
  122. if len(v.Data) == 0 {
  123. continue
  124. }
  125. if isVCode {
  126. v.IndexCode = utils.InsertStr2StrIdx(v.IndexCode, " ", 1, extraLetter)
  127. }
  128. // 指标是否存在
  129. index, e := models.GetBaseFromBloombergIndexByCode(v.IndexCode)
  130. if e != nil && e.Error() != utils.ErrNoRow() {
  131. err = fmt.Errorf("获取Bloomberg原始指标失败, err: %s", e.Error())
  132. return
  133. }
  134. // 指标名称+前缀
  135. indexName := v.IndexName
  136. if indexName != "" && namePrefix != "" {
  137. indexName = fmt.Sprint(namePrefix, indexName)
  138. }
  139. // 新增指标
  140. if index == nil {
  141. newIndex := new(models.BaseFromBloombergIndex)
  142. newIndex.IndexCode = v.IndexCode
  143. newIndex.IndexName = indexName
  144. newIndex.Unit = v.Unit
  145. newIndex.Source = utils.DATA_SOURCE_BLOOMBERG
  146. newIndex.Frequency = v.Frequency
  147. newIndex.CreateTime = time.Now().Local()
  148. newIndex.ModifyTime = time.Now().Local()
  149. if e = newIndex.Create(); e != nil {
  150. err = fmt.Errorf("新增Bloomberg原始指标失败, err: %s", e.Error())
  151. return
  152. }
  153. index = newIndex
  154. } else {
  155. // 无指标名称的情况下更新指标基础信息
  156. if index.IndexName == "" {
  157. index.IndexName = indexName
  158. index.Unit = v.Unit
  159. index.Frequency = v.Frequency
  160. index.ModifyTime = time.Now().Local()
  161. if e = index.Update([]string{"IndexName", "Unit", "Frequency", "ModifyTime"}); e != nil {
  162. err = fmt.Errorf("更新Bloomberg原始指标失败, err: %s", e.Error())
  163. return
  164. }
  165. }
  166. }
  167. // 更新指标数据
  168. var cond string
  169. var pars []interface{}
  170. cond += ` AND index_code = ? `
  171. pars = append(pars, v.IndexCode)
  172. indexData, e := models.GetBaseFromBloombergDataByCondition(cond, pars)
  173. if e != nil {
  174. err = fmt.Errorf("获取Bloomberg历史数据失败, err: %s", e.Error())
  175. return
  176. }
  177. dateExist := make(map[string]*models.BaseFromBloombergData)
  178. newValExist := make(map[string]bool)
  179. if len(indexData) > 0 {
  180. for _, d := range indexData {
  181. strDate := d.DataTime.Format(utils.FormatDate)
  182. dateExist[strDate] = d
  183. }
  184. }
  185. // 筛选新增/更新数据
  186. updateData := make([]*models.BaseFromBloombergData, 0)
  187. insertData := make([]*models.BaseFromBloombergData, 0)
  188. for _, d := range v.Data {
  189. strDate := d.DataTime.Format(utils.FormatDate)
  190. originData := dateExist[strDate]
  191. if originData != nil {
  192. if utils.FloatAlmostEqual(originData.Value, d.Value) {
  193. continue
  194. }
  195. originData.Value = d.Value
  196. originData.ModifyTime = time.Now().Local()
  197. updateData = append(updateData, originData)
  198. } else {
  199. // 新增的数据去重
  200. if newValExist[strDate] {
  201. continue
  202. }
  203. newValExist[strDate] = true
  204. newData := new(models.BaseFromBloombergData)
  205. newData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
  206. newData.IndexCode = index.IndexCode
  207. newData.DataTime = d.DataTime
  208. newData.Value = d.Value
  209. newData.CreateTime = time.Now()
  210. newData.ModifyTime = time.Now()
  211. timestamp := d.DataTime.UnixNano() / 1e6
  212. newData.DataTimestamp = int(timestamp)
  213. insertData = append(insertData, newData)
  214. }
  215. }
  216. if e = models.MultiInsertOrUpdateBaseFromBloombergData(insertData, updateData); e != nil {
  217. err = fmt.Errorf("新增/更新Bloomberg指标数据失败, err: %s", e.Error())
  218. return
  219. }
  220. // 更新指标开始结束时间
  221. minMax, e := models.GetBaseFromBloombergIndexMinMax(index.IndexCode)
  222. if e == nil && minMax != nil {
  223. e = models.ModifyBaseFromBloombergIndexMinMax(index.IndexCode, minMax)
  224. if e != nil {
  225. err = fmt.Errorf("更新Bloomberg开始结束时间失败, err: %s", e.Error())
  226. return
  227. }
  228. }
  229. // 同步刷新指标库
  230. go func() {
  231. edb, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BLOOMBERG, index.IndexCode)
  232. if e != nil && e.Error() != utils.ErrNoRow() {
  233. utils.FileLog.Info("获取Bloomberg指标库信息失败, err: " + e.Error())
  234. return
  235. }
  236. if edb != nil {
  237. _, _, e = logic.RefreshBaseEdbInfo(edb, ``)
  238. if e != nil {
  239. utils.FileLog.Info(fmt.Sprintf("Bloomberg RefreshBaseEdbInfo, edbCode: %s, err: %v", index.IndexCode, e))
  240. return
  241. }
  242. }
  243. }()
  244. }
  245. return
  246. }