base_from_ths_hf.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/logic"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/models/mgo"
  7. "eta/eta_index_lib/services/alarm_msg"
  8. "eta/eta_index_lib/utils"
  9. "fmt"
  10. "github.com/rdlucklib/rdluck_tools/http"
  11. "github.com/shopspring/decimal"
  12. "go.mongodb.org/mongo-driver/bson"
  13. "net/url"
  14. "strings"
  15. "time"
  16. )
  17. const (
  18. ThsHfApiUrl = "https://quantapi.51ifind.com/api/v1/high_frequency"
  19. )
  20. // GetEdbDataFromThsHf 获取高频数据
  21. func GetEdbDataFromThsHf(thsParams models.ThsHfSearchEdbReq, terminalCode string) (indexes []*models.ThsHfIndexWithData, err error) {
  22. terminal, e := GetTerminal(utils.DATA_SOURCE_THS, terminalCode)
  23. if e != nil {
  24. err = fmt.Errorf("获取同花顺终端配置失败, %v", e)
  25. return
  26. }
  27. if thsParams.EndTime == "" {
  28. thsParams.EndTime = time.Now().Local().Format(utils.FormatDateTime)
  29. }
  30. // 走API
  31. if utils.ThsDataMethod == "" || utils.ThsDataMethod == "api" {
  32. var token string
  33. token, e = GetAccessToken(false, terminal.Value)
  34. if e != nil {
  35. err = fmt.Errorf("获取同花顺API-AccessToken失败, %v", e)
  36. return
  37. }
  38. // TEST
  39. //token = "9eba1634116ea2aed9a5b12b6e12b0b5fcbe0847.signs_NTc2NjQ4MTA5"
  40. return getEdbDataFromThsHfHttp(thsParams, terminal.Value, token)
  41. }
  42. // 走公用机
  43. if terminal.ServerUrl == "" {
  44. err = fmt.Errorf("同花顺终端地址未配置")
  45. return
  46. }
  47. return getEdbDataFromThsHfApp(thsParams, 0, terminal.ServerUrl)
  48. }
  49. // getEdbDataFromThsHfHttp API-获取高频指标数据
  50. func getEdbDataFromThsHfHttp(thsParams models.ThsHfSearchEdbReq, refreshToken, accessToken string) (indexes []*models.ThsHfIndexWithData, err error) {
  51. defer func() {
  52. if err != nil {
  53. tips := fmt.Sprintf("同花顺高频指标API-getEdbDataFromThsHfHttp err: %v", err)
  54. utils.FileLog.Info(tips)
  55. }
  56. }()
  57. // 请求参数参考
  58. //dataMap := map[string]interface{}{
  59. // "codes": "CU2407.SHF,CU2408.SHF",
  60. // "indicators": "pct_chg",
  61. // "starttime": "2024-06-06 09:15:00",
  62. // "endtime": "2024-06-11 15:15:00",
  63. // "functionpara": map[string]interface {
  64. // }{
  65. // "Limitstart": "10:00:00",
  66. // "Limitend": "14:15:00",
  67. // "Interval": 60,
  68. // "Fill": "Previous",
  69. // "CPS": "forward4",
  70. // "Timeformat": "LocalTime",
  71. // "BaseDate": "2024-01-01",
  72. // },
  73. //}
  74. // 额外参数
  75. funcParams := map[string]interface{}{}
  76. funcParams["Interval"] = thsParams.Interval
  77. if thsParams.Fill != "" {
  78. funcParams["Fill"] = thsParams.Fill
  79. }
  80. if thsParams.CPS != "" {
  81. funcParams["CPS"] = thsParams.CPS
  82. }
  83. if thsParams.BaseDate != "" {
  84. funcParams["BaseDate"] = thsParams.BaseDate
  85. }
  86. // TEST
  87. //funcParams["Limitstart"] = "10:00:00"
  88. //funcParams["Limitend"] = "14:15:00"
  89. //funcParams["Fill"] = "Previous"
  90. //funcParams["CPS"] = "forward4"
  91. //funcParams["Timeformat"] = "LocalTime"
  92. //funcParams["BaseDate"] = "2024-01-01"
  93. dataMap := map[string]interface{}{
  94. "codes": thsParams.StockCode,
  95. "indicators": thsParams.EdbCode,
  96. "starttime": thsParams.StartTime,
  97. "endtime": thsParams.EndTime,
  98. "functionpara": funcParams,
  99. }
  100. // 请求接口
  101. body, e, _ := postCurl(ThsHfApiUrl, dataMap, 0, refreshToken, accessToken)
  102. if e != nil {
  103. utils.FileLog.Info(string(body))
  104. err = fmt.Errorf("同花顺API-请求失败, %v", e)
  105. return
  106. }
  107. apiResp := new(models.ThsHfApiResp)
  108. if e = json.Unmarshal(body, &apiResp); e != nil {
  109. err = fmt.Errorf("同花顺API-解析响应失败, %v", e)
  110. return
  111. }
  112. if apiResp.ErrorCode != 0 {
  113. err = fmt.Errorf("同花顺高频API-状态码: %d, 提示信息: %s", apiResp.ErrorCode, apiResp.ErrMsg)
  114. return
  115. }
  116. indexes = make([]*models.ThsHfIndexWithData, 0)
  117. if len(apiResp.Tables) == 0 {
  118. utils.FileLog.Info("同花顺高频API-无数据")
  119. return
  120. }
  121. // 结果示例
  122. // {
  123. // "errorcode": 0,
  124. // "errmsg": "Success!",
  125. // "tables": [{
  126. // "thscode": "CU2407.SHF",
  127. // "time": ["2024-07-01 10:00", "2024-07-01 11:15", "2024-07-01 14:15", "2024-07-01 15:00"],
  128. // "table": {
  129. // "open": [77930.000000, 77980.000000, 77910.000000, 77850.000000],
  130. // "close": [77980.000000, 77920.000000, 77850.000000, 77780.000000]
  131. // }
  132. // }, {
  133. // "thscode": "CU2408.SHF",
  134. // "time": ["2024-07-01 10:00", "2024-07-01 11:15", "2024-07-01 14:15", "2024-07-01 15:00"],
  135. // "table": {
  136. // "open": [78180.000000, 78280.000000, 78220.000000, 78110.000000],
  137. // "close": [78280.000000, 78220.000000, 78110.000000, 78060.000000]
  138. // }
  139. // }]
  140. // }
  141. // Tables中的每一个对应一个证券代码
  142. for _, v := range apiResp.Tables {
  143. if len(v.Time) == 0 || len(v.Table) == 0 {
  144. continue
  145. }
  146. // Table中的K-V对应指标代码-数据值序列
  147. for tk, tv := range v.Table {
  148. index := new(models.ThsHfIndexWithData)
  149. index.StockCode = v.ThsCode
  150. index.EdbCode = tk
  151. td := make([]*models.ThsHfIndexData, 0)
  152. tvl := len(tv)
  153. for k, t := range v.Time {
  154. if k >= tvl {
  155. continue
  156. }
  157. dt, e := time.ParseInLocation("2006-01-02 15:04", t, time.Local)
  158. if e != nil {
  159. utils.FileLog.Info(fmt.Sprintf("同花顺API-time parse t: %s, err: %v", t, e))
  160. continue
  161. }
  162. td = append(td, &models.ThsHfIndexData{
  163. DataTime: dt,
  164. Value: tv[k],
  165. })
  166. }
  167. index.IndexData = td
  168. indexes = append(indexes, index)
  169. }
  170. }
  171. return
  172. }
  173. // getEdbDataFromThsHfApp 公用机-获取高频指标数据
  174. func getEdbDataFromThsHfApp(thsParams models.ThsHfSearchEdbReq, num int, serverUrl string) (indexes []*models.ThsHfIndexWithData, err error) {
  175. var requestUrl string
  176. defer func() {
  177. if err != nil {
  178. utils.FileLog.Info(fmt.Sprintf("requestUrl: %s", requestUrl))
  179. utils.FileLog.Info(fmt.Sprintf("getEdbDataFromThsHfApp: %v", err))
  180. }
  181. }()
  182. //serverUrl = "http://wxmsgsen1.hzinsights.com:8040/"
  183. baseUrl := fmt.Sprintf("%s%s", serverUrl, "edbInfo/ths/hf?")
  184. // 额外参数
  185. var funcParam string
  186. if thsParams.Interval > 0 {
  187. funcParam += fmt.Sprintf("Interval:%d,", thsParams.Interval)
  188. }
  189. if thsParams.Fill != "" {
  190. funcParam += fmt.Sprintf("Fill:%s,", thsParams.Fill)
  191. }
  192. if thsParams.CPS != "" {
  193. funcParam += fmt.Sprintf("CPS:%s,", thsParams.CPS)
  194. }
  195. if thsParams.BaseDate != "" {
  196. funcParam += fmt.Sprintf("BaseDate:%s,", thsParams.BaseDate)
  197. }
  198. funcParam = strings.TrimRight(funcParam, ",")
  199. params := url.Values{}
  200. params.Add("codes", thsParams.StockCode)
  201. params.Add("indicators", thsParams.EdbCode)
  202. params.Add("function_para", funcParam)
  203. params.Add("start_time", thsParams.StartTime)
  204. params.Add("end_time", thsParams.EndTime)
  205. // 请求终端
  206. requestUrl = baseUrl + params.Encode()
  207. body, e := http.Get(requestUrl)
  208. if e != nil {
  209. err = fmt.Errorf("")
  210. return
  211. }
  212. dataBody := strings.TrimLeft(string(body), `"`)
  213. dataBody = strings.TrimRight(dataBody, `"`)
  214. dataBody = strings.ReplaceAll(dataBody, `\`, ``)
  215. //utils.FileLog.Info(dataBody)
  216. appResp := new(TerminalResponse)
  217. if e = json.Unmarshal([]byte(dataBody), &appResp); e != nil {
  218. err = fmt.Errorf("同花顺APP-解析响应失败, %v", e)
  219. return
  220. }
  221. if appResp.ErrorCode != 0 {
  222. //如果是同花顺登录session失效了,那么就重新请求获取数据
  223. if appResp.ErrorCode == -1020 && num == 0 {
  224. return getEdbDataFromThsHfApp(thsParams, 1, serverUrl)
  225. }
  226. err = fmt.Errorf("同花顺APP-状态码: %d, 提示信息: %s", appResp.ErrorCode, appResp.ErrMsg)
  227. return
  228. }
  229. // 响应结果示例
  230. // {
  231. // "errorcode": 0,
  232. // "errmsg": "Success!",
  233. // "data": [{
  234. // "time": "2024-06-04 09:30",
  235. // "thscode": "CU2406.SHF",
  236. // "open": 81900.0,
  237. // "close": 81820.0
  238. // }, {
  239. // "time": "2024-06-04 10:00",
  240. // "thscode": "CU2406.SHF",
  241. // "open": 81820.0,
  242. // "close": 81790.0
  243. // }, {
  244. // "time": "2024-06-04 10:45",
  245. // "thscode": "CU2406.SHF",
  246. // "open": 81820.0,
  247. // "close": 81950.0
  248. // }]
  249. // }
  250. indexes = make([]*models.ThsHfIndexWithData, 0)
  251. indexMap := make(map[string]*models.ThsHfIndexWithData)
  252. for _, stockData := range appResp.Data {
  253. strTime := stockData["time"].(string)
  254. dataTime, e := time.ParseInLocation("2006-01-02 15:04", strTime, time.Local)
  255. if e != nil {
  256. utils.FileLog.Info("数据日期格式有误, time: %s, %v", strTime, e)
  257. continue
  258. }
  259. stockCode := stockData["thscode"].(string)
  260. // 指标代码+数据
  261. for k, v := range stockData {
  262. if k == "time" || k == "thscode" {
  263. continue
  264. }
  265. if v == nil {
  266. continue
  267. }
  268. val, ok := v.(float64)
  269. if !ok {
  270. continue
  271. }
  272. mk := fmt.Sprintf("%s-%s", stockCode, k)
  273. if indexMap[mk] == nil {
  274. indexMap[mk] = new(models.ThsHfIndexWithData)
  275. indexMap[mk].StockCode = stockCode
  276. indexMap[mk].EdbCode = k
  277. indexMap[mk].IndexData = make([]*models.ThsHfIndexData, 0)
  278. }
  279. indexMap[mk].IndexData = append(indexMap[mk].IndexData, &models.ThsHfIndexData{
  280. DataTime: dataTime,
  281. Value: val,
  282. })
  283. }
  284. }
  285. for _, v := range indexMap {
  286. indexes = append(indexes, v)
  287. }
  288. return
  289. }
  290. // RefreshThsHfBaseIndex 源指标刷新
  291. func RefreshThsHfBaseIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
  292. defer func() {
  293. if err != nil {
  294. tips := fmt.Sprintf("RefreshThsHfBaseIndex-更新失败, %v", err)
  295. utils.FileLog.Info(tips)
  296. go alarm_msg.SendAlarmMsg(tips, 3)
  297. }
  298. }()
  299. if indexItem == nil {
  300. err = fmt.Errorf("指标不存在")
  301. return
  302. }
  303. if len(codeWithData.IndexData) == 0 {
  304. return
  305. }
  306. // 获取源指标数据
  307. dataOb := new(models.BaseFromThsHfData)
  308. originData := make([]*models.BaseFromThsHfData, 0)
  309. {
  310. cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().IndexCode)
  311. pars := make([]interface{}, 0)
  312. pars = append(pars, indexItem.IndexCode)
  313. if startTime != "" {
  314. cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
  315. pars = append(pars, startTime)
  316. }
  317. list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
  318. if e != nil {
  319. err = fmt.Errorf("获取源指标数据失败, %v", e)
  320. return
  321. }
  322. originData = list
  323. }
  324. // 更新指标数据
  325. dateExist := make(map[string]*models.BaseFromThsHfData)
  326. newValExist := make(map[string]bool)
  327. if len(originData) > 0 {
  328. // unicode去重
  329. for _, d := range originData {
  330. dateExist[d.UniqueCode] = d
  331. }
  332. }
  333. // 筛选新增/更新数据
  334. updateData := make([]*models.BaseFromThsHfData, 0)
  335. insertData := make([]*models.BaseFromThsHfData, 0)
  336. for _, d := range codeWithData.IndexData {
  337. uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format(utils.FormatDateTimeMinute)))
  338. origin := dateExist[uni]
  339. // unicode检验是否存在
  340. strNewVal := decimal.NewFromFloat(d.Value).Round(4).String()
  341. di, _ := decimal.NewFromString(strNewVal)
  342. newVal, _ := di.Float64()
  343. if origin != nil {
  344. strExistVal := decimal.NewFromFloat(origin.Value).Round(4).String()
  345. if strNewVal == strExistVal {
  346. continue
  347. }
  348. origin.Value = newVal
  349. origin.ModifyTime = time.Now().Local()
  350. updateData = append(updateData, origin)
  351. continue
  352. }
  353. // 新增的数据去重
  354. if newValExist[uni] {
  355. continue
  356. }
  357. newValExist[uni] = true
  358. newData := new(models.BaseFromThsHfData)
  359. newData.BaseFromThsHfIndexId = indexItem.BaseFromThsHfIndexId
  360. newData.IndexCode = indexItem.IndexCode
  361. newData.DataTime = d.DataTime
  362. newData.Value = newVal
  363. newData.CreateTime = time.Now()
  364. newData.ModifyTime = time.Now()
  365. newData.UniqueCode = uni
  366. newData.DataTimestamp = d.DataTime.UnixNano() / 1e6
  367. insertData = append(insertData, newData)
  368. }
  369. if e := dataOb.MultiInsertOrUpdate(insertData, updateData); e != nil {
  370. err = fmt.Errorf("新增/更新源指标数据失败, %v", e)
  371. return
  372. }
  373. // 更新指标开始结束时间
  374. minMax, e := dataOb.GetIndexMinMax(indexItem.IndexCode)
  375. if e == nil && minMax != nil {
  376. minDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MinDate, time.Local)
  377. if e != nil {
  378. err = fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minMax.MinDate, e)
  379. return
  380. }
  381. maxDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MaxDate, time.Local)
  382. if e != nil {
  383. err = fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", minMax.MaxDate, e)
  384. return
  385. }
  386. indexItem.StartDate = minDate
  387. indexItem.EndDate = maxDate
  388. indexItem.ModifyTime = time.Now().Local()
  389. updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
  390. if e = indexItem.Update(updateCols); e != nil {
  391. err = fmt.Errorf("更新源指标开始结束时间失败, %v", e)
  392. return
  393. }
  394. }
  395. // 同步刷新指标库
  396. go func() {
  397. _ = RefreshEdbFromThsHfBaseIndex(indexItem.IndexCode, startTime)
  398. }()
  399. return
  400. }
  401. // RefreshThsHfBaseIndexMgo 源指标刷新-Mongo
  402. func RefreshThsHfBaseIndexMgo(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
  403. defer func() {
  404. if err != nil {
  405. tips := fmt.Sprintf("RefreshThsHfBaseIndexMgo-更新失败, %v", err)
  406. utils.FileLog.Info(tips)
  407. go alarm_msg.SendAlarmMsg(tips, 3)
  408. }
  409. }()
  410. if indexItem == nil {
  411. err = fmt.Errorf("指标不存在")
  412. return
  413. }
  414. if len(codeWithData.IndexData) == 0 {
  415. return
  416. }
  417. mogDataObj := new(mgo.BaseFromThsHfData)
  418. // 获取已存在的所有数据
  419. existCond := bson.M{
  420. "index_code": indexItem.IndexCode,
  421. }
  422. if startTime != "" {
  423. st, e := time.ParseInLocation(utils.FormatDateTime, startTime, time.Local)
  424. if e != nil {
  425. err = fmt.Errorf("start time parse err: %v", e)
  426. return
  427. }
  428. existCond["data_time"] = bson.M{
  429. "$gte": st,
  430. }
  431. }
  432. exitDataList, e := mogDataObj.GetAllDataList(existCond, []string{"data_time"})
  433. if e != nil {
  434. err = fmt.Errorf("GetAllDataList err: %v", e)
  435. return
  436. }
  437. // 已经存在的数据集
  438. exitDataMap := make(map[string]*mgo.BaseFromThsHfData)
  439. for _, v := range exitDataList {
  440. exitDataMap[v.UniqueCode] = v
  441. }
  442. // 待添加的数据集
  443. addDataList := make([]interface{}, 0)
  444. updateDataList := make([]mgo.BaseFromThsHfData, 0)
  445. for _, data := range codeWithData.IndexData {
  446. strNewVal := decimal.NewFromFloat(data.Value).Round(4).String()
  447. di, _ := decimal.NewFromString(strNewVal)
  448. newVal, _ := di.Float64()
  449. // unicode检验是否存在
  450. uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, data.DataTime.Format(utils.FormatDateTimeMinute)))
  451. findData, ok := exitDataMap[uni]
  452. if !ok {
  453. addDataList = append(addDataList, mgo.BaseFromThsHfData{
  454. BaseFromThsHfIndexId: int64(indexItem.BaseFromThsHfIndexId),
  455. IndexCode: indexItem.IndexCode,
  456. DataTime: data.DataTime,
  457. Value: newVal,
  458. UniqueCode: uni,
  459. CreateTime: time.Now(),
  460. ModifyTime: time.Now(),
  461. DataTimestamp: data.DataTime.UnixNano() / 1e6,
  462. })
  463. continue
  464. }
  465. // 值不匹配,修改数据
  466. strExistVal := decimal.NewFromFloat(findData.Value).Round(4).String()
  467. if strNewVal == strExistVal {
  468. continue
  469. }
  470. findData.Value = newVal
  471. updateDataList = append(updateDataList, *findData)
  472. }
  473. // 入库
  474. {
  475. coll := mogDataObj.GetCollection()
  476. if len(addDataList) > 0 {
  477. if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
  478. err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
  479. return
  480. }
  481. }
  482. if len(updateDataList) > 0 {
  483. for _, v := range updateDataList {
  484. if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
  485. err = fmt.Errorf("UpdateDataByColl, err: %v", e)
  486. return
  487. }
  488. }
  489. }
  490. }
  491. // 修改最大最小日期
  492. minMax, err := indexItem.GetEdbInfoMaxAndMinInfo(indexItem.IndexCode)
  493. if err != nil {
  494. return
  495. }
  496. if err == nil && minMax != nil {
  497. minDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MinDate, time.Local)
  498. if e != nil {
  499. err = fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minMax.MinDate, e)
  500. return
  501. }
  502. maxDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MaxDate, time.Local)
  503. if e != nil {
  504. err = fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", minMax.MaxDate, e)
  505. return
  506. }
  507. indexItem.StartDate = minDate
  508. indexItem.EndDate = maxDate
  509. indexItem.ModifyTime = time.Now().Local()
  510. updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
  511. if e = indexItem.Update(updateCols); e != nil {
  512. err = fmt.Errorf("更新源指标开始结束时间失败, %v", e)
  513. return
  514. }
  515. }
  516. // 同步刷新指标库
  517. go func() {
  518. _ = RefreshEdbFromThsHfBaseIndex(indexItem.IndexCode, startTime)
  519. }()
  520. return
  521. }
  522. // RefreshEdbFromThsHfBaseIndex 根据源指标刷新指标库
  523. func RefreshEdbFromThsHfBaseIndex(baseCode, startTime string) (err error) {
  524. defer func() {
  525. if err != nil {
  526. tips := fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标库失败, %v", err)
  527. utils.FileLog.Info(tips)
  528. go alarm_msg.SendAlarmMsg(tips, 3)
  529. }
  530. }()
  531. // 获取指标关联信息
  532. mappings := make([]*models.BaseFromEdbMapping, 0)
  533. {
  534. ob := new(models.BaseFromEdbMapping)
  535. cond := fmt.Sprintf(" AND %s = ?", ob.Cols().BaseIndexCode)
  536. pars := make([]interface{}, 0)
  537. pars = append(pars, baseCode)
  538. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  539. if e != nil {
  540. err = fmt.Errorf("获取源指标关联失败, %v", e)
  541. return
  542. }
  543. mappings = list
  544. }
  545. if len(mappings) == 0 {
  546. return
  547. }
  548. codeMapping := make(map[string]*models.BaseFromEdbMapping)
  549. edbInfoIds := make([]int, 0)
  550. for _, v := range mappings {
  551. if codeMapping[v.EdbCode] == nil {
  552. codeMapping[v.EdbCode] = v
  553. }
  554. edbInfoIds = append(edbInfoIds, v.EdbInfoId)
  555. }
  556. // 指标信息
  557. edbInfoList, e := models.GetEdbInfoByIdList(edbInfoIds)
  558. if e != nil {
  559. err = fmt.Errorf("获取指标信息列表失败, %v", e)
  560. return
  561. }
  562. codeEdb := make(map[string]*models.EdbInfo)
  563. for _, v := range edbInfoList {
  564. if codeEdb[v.EdbCode] == nil {
  565. codeEdb[v.EdbCode] = v
  566. }
  567. }
  568. thsOb := new(models.EdbThsHf)
  569. source := thsOb.GetSource()
  570. subSource := thsOb.GetSubSource()
  571. for _, v := range mappings {
  572. cacheKey := fmt.Sprintf("%s_%d_%d_%s", utils.CACHE_EDB_DATA_REFRESH, source, subSource, v.EdbCode)
  573. if utils.Rc.IsExist(cacheKey) {
  574. continue
  575. }
  576. utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)
  577. edb := codeEdb[v.EdbCode]
  578. if edb == nil {
  579. utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-指标信息有误, EdbCode: %s", v.EdbCode))
  580. continue
  581. }
  582. // 刷新指标
  583. if e = thsOb.Refresh(edb, codeMapping[v.EdbCode], startTime); e != nil {
  584. utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标失败, %v", e))
  585. _ = utils.Rc.Delete(cacheKey)
  586. continue
  587. }
  588. // 更新指标最值
  589. if e = thsOb.UnifiedModifyEdbInfoMaxAndMinInfo(edb); e != nil {
  590. utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-更新指标最值失败, %v", e))
  591. _ = utils.Rc.Delete(cacheKey)
  592. return
  593. }
  594. _ = utils.Rc.Delete(cacheKey)
  595. // 更新ES
  596. go logic.UpdateEs(edb.EdbInfoId)
  597. }
  598. return
  599. }