base_from_ths_hf.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package data
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/models"
  5. "eta/eta_api/models/data_manage"
  6. "eta/eta_api/services/alarm_msg"
  7. "eta/eta_api/utils"
  8. "fmt"
  9. )
  10. // CheckExistThsHfEdb 校验已存在的同花顺高频指标
  11. func CheckExistThsHfEdb(stockCodes, edbCodes []string) (checkResp data_manage.ThsHfExistCheckResp, existMap map[string]bool, err error) {
  12. // 待校验的指标编码
  13. var indexCode []string
  14. prefix := utils.ThsHf
  15. for _, sv := range stockCodes {
  16. for _, ev := range edbCodes {
  17. t := fmt.Sprintf("%s%s%s", prefix, sv, ev)
  18. indexCode = append(indexCode, t)
  19. }
  20. }
  21. baseIndexes := make([]*data_manage.BaseFromThsHfIndex, 0)
  22. {
  23. ob := new(data_manage.BaseFromThsHfIndex)
  24. fields := []string{ob.Cols().PrimaryId, ob.Cols().IndexCode, ob.Cols().IndexName, ob.Cols().StockCode, ob.Cols().Indicator}
  25. list, e := ob.GetItemsByCondition(``, make([]interface{}, 0), fields, "")
  26. if e != nil {
  27. err = fmt.Errorf("获取高频指标列表失败, %v", e)
  28. return
  29. }
  30. baseIndexes = list
  31. }
  32. var existNum int
  33. existMap = make(map[string]bool)
  34. for _, bv := range baseIndexes {
  35. code := fmt.Sprintf("%s%s%s", prefix, bv.StockCode, bv.Indicator)
  36. for _, iv := range indexCode {
  37. if code != iv {
  38. continue
  39. }
  40. existMap[fmt.Sprintf("%s-%s", bv.StockCode, bv.Indicator)] = true
  41. existNum += 1
  42. checkResp.ExistIndex = append(checkResp.ExistIndex, data_manage.ThsHfExistCheckIndex{
  43. IndexId: bv.BaseFromThsHfIndexId,
  44. IndexCode: bv.IndexCode,
  45. IndexName: bv.IndexName,
  46. })
  47. break
  48. }
  49. }
  50. if existNum > 0 && existNum == len(indexCode) {
  51. checkResp.ExistAll = true
  52. }
  53. return
  54. }
  55. // GetEdbDataThsHf 获取同花顺高频数据指标
  56. func GetEdbDataThsHf(req data_manage.ThsHfSearchEdbReq) (indexes []*data_manage.ThsHfIndexWithData, err error) {
  57. param := make(map[string]interface{})
  58. param["StockCode"] = req.StockCode
  59. param["EdbCode"] = req.EdbCode
  60. param["StartTime"] = req.StartTime
  61. param["EndTime"] = req.EndTime
  62. param["Interval"] = req.Interval
  63. param["Fill"] = req.Fill
  64. param["CPS"] = req.CPS
  65. param["BaseDate"] = req.BaseDate
  66. uri := `ths/hf/edb_data`
  67. resp, e := postThsHfEdbData(param, uri)
  68. if e != nil {
  69. err = fmt.Errorf("postThsHfEdbData, %v", e)
  70. return
  71. }
  72. if resp.Ret == 200 {
  73. indexes = resp.Data
  74. }
  75. return
  76. }
  77. // postThsHfEdbData 刷新指标数据
  78. func postThsHfEdbData(param map[string]interface{}, urlStr string) (resp *data_manage.ThsHfIndexDataLibResp, err error) {
  79. postUrl := utils.EDB_LIB_URL + urlStr
  80. postData, err := json.Marshal(param)
  81. if err != nil {
  82. return
  83. }
  84. result, err := HttpPost(postUrl, string(postData), utils.ZhLangVersion, "application/json")
  85. if err != nil {
  86. return
  87. }
  88. utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";" + string(postData) + ";result:" + string(result))
  89. err = json.Unmarshal(result, &resp)
  90. if err != nil {
  91. return
  92. }
  93. return resp, nil
  94. }
  95. // BaseAddThsHf 新增数据源
  96. func BaseAddThsHf(req data_manage.ThsHfBaseAddReq) (resp *models.BaseResponse, err error) {
  97. param := make(map[string]interface{})
  98. param["StartTime"] = req.StartTime
  99. param["EndTime"] = req.EndTime
  100. param["Interval"] = req.Interval
  101. param["Fill"] = req.Fill
  102. param["CPS"] = req.CPS
  103. param["BaseDate"] = req.BaseDate
  104. param["SysAdminId"] = req.SysAdminId
  105. param["SysAdminName"] = req.SysAdminName
  106. param["ClassifyId"] = req.ClassifyId
  107. param["Unit"] = req.Unit
  108. param["IndexName"] = req.IndexName
  109. param["Frequency"] = req.Frequency
  110. param["StockCode"] = req.StockCode
  111. param["EdbCode"] = req.EdbCode
  112. uri := `ths/hf/base/add`
  113. res, e := postRefreshEdbData(param, uri)
  114. if e != nil {
  115. err = fmt.Errorf("postRefreshEdbData, %v", e)
  116. return
  117. }
  118. resp = res
  119. return
  120. }
  121. // RefreshBaseThsHfIndex 刷新源指标
  122. func RefreshBaseThsHfIndex(indexIds []int, refreshType int) (isAsync bool, err error) {
  123. if len(indexIds) == 0 {
  124. return
  125. }
  126. defer func() {
  127. if err != nil {
  128. tips := fmt.Sprintf("RefreshBaseThsHfIndex-刷新同花顺高频指标失败, %v", err)
  129. utils.FileLog.Info(tips)
  130. go alarm_msg.SendAlarmMsg(tips, 3)
  131. }
  132. }()
  133. indexes := make([]*data_manage.BaseFromThsHfIndex, 0)
  134. {
  135. ob := new(data_manage.BaseFromThsHfIndex)
  136. cond := fmt.Sprintf(" AND %s IN (%s)", ob.Cols().PrimaryId, utils.GetOrmInReplace(len(indexIds)))
  137. pars := make([]interface{}, 0)
  138. pars = append(pars, indexIds)
  139. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  140. if e != nil {
  141. err = fmt.Errorf("获取源指标失败, %v", e)
  142. return
  143. }
  144. if len(list) == 0 {
  145. return
  146. }
  147. indexes = list
  148. }
  149. refreshUrl := "ths/hf/base/refresh"
  150. // 异步刷新
  151. if len(indexes) > 10 {
  152. isAsync = true
  153. go func() {
  154. for _, v := range indexes {
  155. param := make(map[string]interface{})
  156. param["BaseIndexCode"] = v.IndexCode
  157. param["RefreshType"] = refreshType
  158. resp, e := postRefreshEdbData(param, refreshUrl)
  159. if e != nil {
  160. utils.FileLog.Info(fmt.Sprintf("thsHf-postRefreshEdbData, code: %s, err: %v", v.IndexCode, e))
  161. continue
  162. }
  163. if resp != nil && resp.Ret != 200 {
  164. utils.FileLog.Info(fmt.Sprintf("thsHf-postRefreshEdbData, code: %s, Ret: %d, ErrMsg: %s", v.IndexCode, resp.Ret, resp.ErrMsg))
  165. continue
  166. }
  167. }
  168. }()
  169. return
  170. }
  171. // 同步刷新
  172. for _, v := range indexes {
  173. param := make(map[string]interface{})
  174. param["BaseIndexCode"] = v.IndexCode
  175. param["RefreshType"] = refreshType
  176. resp, e := postRefreshEdbData(param, refreshUrl)
  177. if e != nil {
  178. utils.FileLog.Info(fmt.Sprintf("thsHf-postRefreshEdbData, code: %s, err: %v", v.IndexCode, e))
  179. continue
  180. }
  181. if resp.Ret != 200 {
  182. utils.FileLog.Info(fmt.Sprintf("thsHf-postRefreshEdbData, code: %s, Ret: %d, ErrMsg: %s", v.IndexCode, resp.Ret, resp.ErrMsg))
  183. continue
  184. }
  185. }
  186. return
  187. }