base_from_ths_hf.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/logic"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/services/alarm_msg"
  7. "eta/eta_index_lib/utils"
  8. "fmt"
  9. "github.com/rdlucklib/rdluck_tools/http"
  10. "github.com/shopspring/decimal"
  11. "net/url"
  12. "strings"
  13. "time"
  14. )
  15. const (
  16. ThsHfApiUrl = "https://quantapi.51ifind.com/api/v1/high_frequency"
  17. )
  18. // GetEdbDataFromThsHf 获取高频数据
  19. func GetEdbDataFromThsHf(thsParams models.ThsHfSearchEdbReq, terminalCode string) (indexes []*models.ThsHfIndexWithData, err error) {
  20. terminal, e := GetTerminal(utils.DATA_SOURCE_THS, terminalCode)
  21. if e != nil {
  22. err = fmt.Errorf("获取同花顺终端配置失败, %v", e)
  23. return
  24. }
  25. if thsParams.EndTime == "" {
  26. thsParams.EndTime = time.Now().Local().Format(utils.FormatDateTime)
  27. }
  28. // 走API
  29. if utils.ThsDataMethod == "" || utils.ThsDataMethod == "api" {
  30. var token string
  31. token, e = GetAccessToken(false, terminal.Value)
  32. if e != nil {
  33. err = fmt.Errorf("获取同花顺API-AccessToken失败, %v", e)
  34. return
  35. }
  36. // TEST
  37. //token = "9eba1634116ea2aed9a5b12b6e12b0b5fcbe0847.signs_NTc2NjQ4MTA5"
  38. return getEdbDataFromThsHfHttp(thsParams, terminal.Value, token)
  39. }
  40. // 走公用机
  41. if terminal.ServerUrl == "" {
  42. err = fmt.Errorf("同花顺终端地址未配置")
  43. return
  44. }
  45. return getEdbDataFromThsHfApp(thsParams, 0, terminal.ServerUrl)
  46. }
  47. // getEdbDataFromThsHfHttp API-获取高频指标数据
  48. func getEdbDataFromThsHfHttp(thsParams models.ThsHfSearchEdbReq, refreshToken, accessToken string) (indexes []*models.ThsHfIndexWithData, err error) {
  49. defer func() {
  50. if err != nil {
  51. tips := fmt.Sprintf("同花顺高频指标API-getEdbDataFromThsHfHttp err: %v", err)
  52. utils.FileLog.Info(tips)
  53. }
  54. }()
  55. // 请求参数参考
  56. //dataMap := map[string]interface{}{
  57. // "codes": "CU2407.SHF,CU2408.SHF",
  58. // "indicators": "pct_chg",
  59. // "starttime": "2024-06-06 09:15:00",
  60. // "endtime": "2024-06-11 15:15:00",
  61. // "functionpara": map[string]interface {
  62. // }{
  63. // "Limitstart": "10:00:00",
  64. // "Limitend": "14:15:00",
  65. // "Interval": 60,
  66. // "Fill": "Previous",
  67. // "CPS": "forward4",
  68. // "Timeformat": "LocalTime",
  69. // "BaseDate": "2024-01-01",
  70. // },
  71. //}
  72. // 额外参数
  73. funcParams := map[string]interface{}{}
  74. funcParams["Interval"] = thsParams.Interval
  75. if thsParams.Fill != "" {
  76. funcParams["Fill"] = thsParams.Fill
  77. }
  78. if thsParams.CPS != "" {
  79. funcParams["CPS"] = thsParams.CPS
  80. }
  81. if thsParams.BaseDate != "" {
  82. funcParams["BaseDate"] = thsParams.BaseDate
  83. }
  84. // TEST
  85. //funcParams["Limitstart"] = "10:00:00"
  86. //funcParams["Limitend"] = "14:15:00"
  87. //funcParams["Fill"] = "Previous"
  88. //funcParams["CPS"] = "forward4"
  89. //funcParams["Timeformat"] = "LocalTime"
  90. //funcParams["BaseDate"] = "2024-01-01"
  91. dataMap := map[string]interface{}{
  92. "codes": thsParams.StockCode,
  93. "indicators": thsParams.EdbCode,
  94. "starttime": thsParams.StartTime,
  95. "endtime": thsParams.EndTime,
  96. "functionpara": funcParams,
  97. }
  98. // 请求接口
  99. body, e, _ := postCurl(ThsHfApiUrl, dataMap, 0, refreshToken, accessToken)
  100. if e != nil {
  101. utils.FileLog.Info(string(body))
  102. err = fmt.Errorf("同花顺API-请求失败, %v", e)
  103. return
  104. }
  105. apiResp := new(models.ThsHfApiResp)
  106. if e = json.Unmarshal(body, &apiResp); e != nil {
  107. err = fmt.Errorf("同花顺API-解析响应失败, %v", e)
  108. return
  109. }
  110. if apiResp.ErrorCode != 0 {
  111. err = fmt.Errorf("同花顺高频API-状态码: %d, 提示信息: %s", apiResp.ErrorCode, apiResp.ErrMsg)
  112. return
  113. }
  114. indexes = make([]*models.ThsHfIndexWithData, 0)
  115. if len(apiResp.Tables) == 0 {
  116. utils.FileLog.Info("同花顺高频API-无数据")
  117. return
  118. }
  119. // 结果示例
  120. // {
  121. // "errorcode": 0,
  122. // "errmsg": "Success!",
  123. // "tables": [{
  124. // "thscode": "CU2407.SHF",
  125. // "time": ["2024-07-01 10:00", "2024-07-01 11:15", "2024-07-01 14:15", "2024-07-01 15:00"],
  126. // "table": {
  127. // "open": [77930.000000, 77980.000000, 77910.000000, 77850.000000],
  128. // "close": [77980.000000, 77920.000000, 77850.000000, 77780.000000]
  129. // }
  130. // }, {
  131. // "thscode": "CU2408.SHF",
  132. // "time": ["2024-07-01 10:00", "2024-07-01 11:15", "2024-07-01 14:15", "2024-07-01 15:00"],
  133. // "table": {
  134. // "open": [78180.000000, 78280.000000, 78220.000000, 78110.000000],
  135. // "close": [78280.000000, 78220.000000, 78110.000000, 78060.000000]
  136. // }
  137. // }]
  138. // }
  139. // Tables中的每一个对应一个证券代码
  140. for _, v := range apiResp.Tables {
  141. if len(v.Time) == 0 || len(v.Table) == 0 {
  142. continue
  143. }
  144. // Table中的K-V对应指标代码-数据值序列
  145. for tk, tv := range v.Table {
  146. index := new(models.ThsHfIndexWithData)
  147. index.StockCode = v.ThsCode
  148. index.EdbCode = tk
  149. td := make([]*models.ThsHfIndexData, 0)
  150. tvl := len(tv)
  151. for k, t := range v.Time {
  152. if k >= tvl {
  153. continue
  154. }
  155. dt, e := time.ParseInLocation("2006-01-02 15:04", t, time.Local)
  156. if e != nil {
  157. utils.FileLog.Info(fmt.Sprintf("同花顺API-time parse t: %s, err: %v", t, e))
  158. continue
  159. }
  160. td = append(td, &models.ThsHfIndexData{
  161. DataTime: dt,
  162. Value: tv[k],
  163. })
  164. }
  165. index.IndexData = td
  166. indexes = append(indexes, index)
  167. }
  168. }
  169. return
  170. }
  171. // getEdbDataFromThsHfApp 公用机-获取高频指标数据
  172. func getEdbDataFromThsHfApp(thsParams models.ThsHfSearchEdbReq, num int, serverUrl string) (indexes []*models.ThsHfIndexWithData, err error) {
  173. var requestUrl string
  174. defer func() {
  175. if err != nil {
  176. utils.FileLog.Info(fmt.Sprintf("requestUrl: %s", requestUrl))
  177. utils.FileLog.Info(fmt.Sprintf("getEdbDataFromThsHfApp: %v", err))
  178. }
  179. }()
  180. //serverUrl = "http://wxmsgsen1.hzinsights.com:8040/"
  181. baseUrl := fmt.Sprintf("%s%s", serverUrl, "edbInfo/ths/hf?")
  182. // 额外参数
  183. var funcParam string
  184. if thsParams.Interval > 0 {
  185. funcParam += fmt.Sprintf("Interval:%d,", thsParams.Interval)
  186. }
  187. if thsParams.Fill != "" {
  188. funcParam += fmt.Sprintf("Fill:%s,", thsParams.Fill)
  189. }
  190. if thsParams.CPS != "" {
  191. funcParam += fmt.Sprintf("CPS:%s,", thsParams.CPS)
  192. }
  193. if thsParams.BaseDate != "" {
  194. funcParam += fmt.Sprintf("BaseDate:%s,", thsParams.BaseDate)
  195. }
  196. funcParam = strings.TrimRight(funcParam, ",")
  197. params := url.Values{}
  198. params.Add("codes", thsParams.StockCode)
  199. params.Add("indicators", thsParams.EdbCode)
  200. params.Add("function_para", funcParam)
  201. params.Add("start_time", thsParams.StartTime)
  202. params.Add("end_time", thsParams.EndTime)
  203. // 请求终端
  204. requestUrl = baseUrl + params.Encode()
  205. body, e := http.Get(requestUrl)
  206. if e != nil {
  207. err = fmt.Errorf("")
  208. return
  209. }
  210. dataBody := strings.TrimLeft(string(body), `"`)
  211. dataBody = strings.TrimRight(dataBody, `"`)
  212. dataBody = strings.ReplaceAll(dataBody, `\`, ``)
  213. //utils.FileLog.Info(dataBody)
  214. appResp := new(TerminalResponse)
  215. if e = json.Unmarshal([]byte(dataBody), &appResp); e != nil {
  216. err = fmt.Errorf("同花顺APP-解析响应失败, %v", e)
  217. return
  218. }
  219. if appResp.ErrorCode != 0 {
  220. //如果是同花顺登录session失效了,那么就重新请求获取数据
  221. if appResp.ErrorCode == -1020 && num == 0 {
  222. return getEdbDataFromThsHfApp(thsParams, 1, serverUrl)
  223. }
  224. err = fmt.Errorf("同花顺APP-状态码: %d, 提示信息: %s", appResp.ErrorCode, appResp.ErrMsg)
  225. return
  226. }
  227. // 响应结果示例
  228. // {
  229. // "errorcode": 0,
  230. // "errmsg": "Success!",
  231. // "data": [{
  232. // "time": "2024-06-04 09:30",
  233. // "thscode": "CU2406.SHF",
  234. // "open": 81900.0,
  235. // "close": 81820.0
  236. // }, {
  237. // "time": "2024-06-04 10:00",
  238. // "thscode": "CU2406.SHF",
  239. // "open": 81820.0,
  240. // "close": 81790.0
  241. // }, {
  242. // "time": "2024-06-04 10:45",
  243. // "thscode": "CU2406.SHF",
  244. // "open": 81820.0,
  245. // "close": 81950.0
  246. // }]
  247. // }
  248. indexes = make([]*models.ThsHfIndexWithData, 0)
  249. indexMap := make(map[string]*models.ThsHfIndexWithData)
  250. for _, stockData := range appResp.Data {
  251. strTime := stockData["time"].(string)
  252. dataTime, e := time.ParseInLocation("2006-01-02 15:04", strTime, time.Local)
  253. if e != nil {
  254. utils.FileLog.Info("数据日期格式有误, time: %s, %v", strTime, e)
  255. continue
  256. }
  257. stockCode := stockData["thscode"].(string)
  258. // 指标代码+数据
  259. for k, v := range stockData {
  260. if k == "time" || k == "thscode" {
  261. continue
  262. }
  263. if v == nil {
  264. continue
  265. }
  266. val, ok := v.(float64)
  267. if !ok {
  268. continue
  269. }
  270. mk := fmt.Sprintf("%s-%s", stockCode, k)
  271. if indexMap[mk] == nil {
  272. indexMap[mk] = new(models.ThsHfIndexWithData)
  273. indexMap[mk].StockCode = stockCode
  274. indexMap[mk].EdbCode = k
  275. indexMap[mk].IndexData = make([]*models.ThsHfIndexData, 0)
  276. }
  277. indexMap[mk].IndexData = append(indexMap[mk].IndexData, &models.ThsHfIndexData{
  278. DataTime: dataTime,
  279. Value: val,
  280. })
  281. }
  282. }
  283. for _, v := range indexMap {
  284. indexes = append(indexes, v)
  285. }
  286. return
  287. }
  288. // WriteRefreshBaseThsHfIndex 源指标刷新
  289. func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
  290. defer func() {
  291. if err != nil {
  292. tips := fmt.Sprintf("WriteRefreshBaseThsHfIndex-更新失败, %v", err)
  293. utils.FileLog.Info(tips)
  294. go alarm_msg.SendAlarmMsg(tips, 3)
  295. }
  296. }()
  297. if indexItem == nil {
  298. err = fmt.Errorf("指标不存在")
  299. return
  300. }
  301. if len(codeWithData.IndexData) == 0 {
  302. return
  303. }
  304. // 获取源指标数据
  305. dataOb := new(models.BaseFromThsHfData)
  306. originData := make([]*models.BaseFromThsHfData, 0)
  307. {
  308. cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().IndexCode)
  309. pars := make([]interface{}, 0)
  310. pars = append(pars, indexItem.IndexCode)
  311. if startTime != "" {
  312. cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
  313. pars = append(pars, startTime)
  314. }
  315. list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
  316. if e != nil {
  317. err = fmt.Errorf("获取源指标数据失败, %v", e)
  318. return
  319. }
  320. originData = list
  321. }
  322. // 更新指标数据
  323. dateExist := make(map[string]*models.BaseFromThsHfData)
  324. newValExist := make(map[string]bool)
  325. if len(originData) > 0 {
  326. // unicode去重
  327. for _, d := range originData {
  328. uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format("2006-01-02 15:04")))
  329. dateExist[uni] = d
  330. }
  331. }
  332. // 筛选新增/更新数据
  333. updateData := make([]*models.BaseFromThsHfData, 0)
  334. insertData := make([]*models.BaseFromThsHfData, 0)
  335. for _, d := range codeWithData.IndexData {
  336. uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format("2006-01-02 15:04")))
  337. origin := dateExist[uni]
  338. // unicode检验是否存在
  339. strNewVal := decimal.NewFromFloat(d.Value).Round(4).String()
  340. di, _ := decimal.NewFromString(strNewVal)
  341. newVal, _ := di.Float64()
  342. if origin != nil {
  343. strExistVal := decimal.NewFromFloat(origin.Value).Round(4).String()
  344. if strNewVal == strExistVal {
  345. continue
  346. }
  347. origin.Value = newVal
  348. origin.ModifyTime = time.Now().Local()
  349. updateData = append(updateData, origin)
  350. }
  351. // 新增的数据去重
  352. if newValExist[uni] {
  353. continue
  354. }
  355. newValExist[uni] = true
  356. newData := new(models.BaseFromThsHfData)
  357. newData.BaseFromThsHfIndexId = indexItem.BaseFromThsHfIndexId
  358. newData.IndexCode = indexItem.IndexCode
  359. newData.DataTime = d.DataTime
  360. newData.Value = newVal
  361. newData.CreateTime = time.Now()
  362. newData.ModifyTime = time.Now()
  363. newData.UniqueCode = uni
  364. newData.DataTimestamp = d.DataTime.UnixNano() / 1e6
  365. insertData = append(insertData, newData)
  366. }
  367. if e := dataOb.MultiInsertOrUpdate(insertData, updateData); e != nil {
  368. err = fmt.Errorf("新增/更新源指标数据失败, %v", e)
  369. return
  370. }
  371. // 更新指标开始结束时间
  372. minMax, e := dataOb.GetIndexMinMax(indexItem.IndexCode)
  373. if e == nil && minMax != nil {
  374. minDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MinDate, time.Local)
  375. if e != nil {
  376. err = fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minMax.MinDate, e)
  377. return
  378. }
  379. maxDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MaxDate, time.Local)
  380. if e != nil {
  381. err = fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", minMax.MaxDate, e)
  382. return
  383. }
  384. indexItem.StartDate = minDate
  385. indexItem.EndDate = maxDate
  386. indexItem.ModifyTime = time.Now().Local()
  387. updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
  388. if e = indexItem.Update(updateCols); e != nil {
  389. err = fmt.Errorf("更新源指标开始结束时间失败, %v", e)
  390. return
  391. }
  392. }
  393. // 同步刷新指标库
  394. go func() {
  395. _ = RefreshThsHfIndexFromBase(indexItem.IndexCode, startTime)
  396. }()
  397. return
  398. }
  399. // RefreshThsHfIndexFromBase 根据源指标刷新指标库
  400. func RefreshThsHfIndexFromBase(baseCode, startTime string) (err error) {
  401. defer func() {
  402. if err != nil {
  403. tips := fmt.Sprintf("RefreshThsHfIndexFromBase-刷新指标库失败, %v", err)
  404. utils.FileLog.Info(tips)
  405. go alarm_msg.SendAlarmMsg(tips, 3)
  406. }
  407. }()
  408. // 获取指标关联信息
  409. mappings := make([]*models.BaseFromEdbMapping, 0)
  410. {
  411. ob := new(models.BaseFromEdbMapping)
  412. cond := fmt.Sprintf(" AND %s = ?", ob.Cols().BaseIndexCode)
  413. pars := make([]interface{}, 0)
  414. pars = append(pars, baseCode)
  415. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  416. if e != nil {
  417. err = fmt.Errorf("获取源指标关联失败, %v", e)
  418. return
  419. }
  420. mappings = list
  421. }
  422. if len(mappings) == 0 {
  423. return
  424. }
  425. codeMapping := make(map[string]*models.BaseFromEdbMapping)
  426. edbInfoIds := make([]int, 0)
  427. for _, v := range mappings {
  428. if codeMapping[v.EdbCode] == nil {
  429. codeMapping[v.EdbCode] = v
  430. }
  431. edbInfoIds = append(edbInfoIds, v.EdbInfoId)
  432. }
  433. // 指标信息
  434. edbInfoList, e := models.GetEdbInfoByIdList(edbInfoIds)
  435. if e != nil {
  436. err = fmt.Errorf("获取指标信息列表失败, %v", e)
  437. return
  438. }
  439. codeEdb := make(map[string]*models.EdbInfo)
  440. for _, v := range edbInfoList {
  441. if codeEdb[v.EdbCode] == nil {
  442. codeEdb[v.EdbCode] = v
  443. }
  444. }
  445. thsOb := new(models.EdbThsHf)
  446. source := thsOb.GetSource()
  447. subSource := thsOb.GetSubSource()
  448. for _, v := range mappings {
  449. cacheKey := fmt.Sprintf("%s_%d_%d_%s", utils.CACHE_EDB_DATA_REFRESH, source, subSource, v.EdbCode)
  450. if utils.Rc.IsExist(cacheKey) {
  451. continue
  452. }
  453. utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)
  454. edb := codeEdb[v.EdbCode]
  455. if edb == nil {
  456. utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-指标信息有误, EdbCode: %s", v.EdbCode))
  457. continue
  458. }
  459. // 刷新指标
  460. if e := thsOb.Refresh(edb, codeMapping[v.EdbCode], startTime); e != nil {
  461. utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-刷新指标失败, %v", e))
  462. _ = utils.Rc.Delete(cacheKey)
  463. continue
  464. }
  465. // 更新指标最值
  466. e, _ = models.UnifiedModifyEdbInfoMaxAndMinInfo(edb)
  467. if e != nil {
  468. utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-更新指标最值失败, %v", e))
  469. _ = utils.Rc.Delete(cacheKey)
  470. continue
  471. }
  472. _ = utils.Rc.Delete(cacheKey)
  473. // 更新ES
  474. go logic.UpdateEs(edb.EdbInfoId)
  475. }
  476. return
  477. }