base_from_ths_hf.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/logic"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/models/mgo"
  7. "eta/eta_index_lib/services/alarm_msg"
  8. "eta/eta_index_lib/utils"
  9. "fmt"
  10. "github.com/rdlucklib/rdluck_tools/http"
  11. "github.com/shopspring/decimal"
  12. "go.mongodb.org/mongo-driver/bson"
  13. "net/url"
  14. "strings"
  15. "time"
  16. )
  17. const (
  18. ThsHfApiUrl = "https://quantapi.51ifind.com/api/v1/high_frequency"
  19. )
  20. // GetEdbDataFromThsHf 获取高频数据
  21. func GetEdbDataFromThsHf(thsParams models.ThsHfSearchEdbReq, terminalCode string) (indexes []*models.ThsHfIndexWithData, err error) {
  22. terminal, e := GetTerminal(utils.DATA_SOURCE_THS, terminalCode)
  23. if e != nil {
  24. err = fmt.Errorf("获取同花顺终端配置失败, %v", e)
  25. return
  26. }
  27. if thsParams.EndTime == "" {
  28. thsParams.EndTime = time.Now().Local().Format(utils.FormatDateTime)
  29. }
  30. // 走API
  31. if utils.ThsDataMethod == "" || utils.ThsDataMethod == "api" {
  32. var token string
  33. token, e = GetAccessToken(false, terminal.Value)
  34. if e != nil {
  35. err = fmt.Errorf("获取同花顺API-AccessToken失败, %v", e)
  36. return
  37. }
  38. // TEST
  39. //token = "9eba1634116ea2aed9a5b12b6e12b0b5fcbe0847.signs_NTc2NjQ4MTA5"
  40. return getEdbDataFromThsHfHttp(thsParams, terminal.Value, token)
  41. }
  42. // 走公用机
  43. if terminal.ServerUrl == "" {
  44. err = fmt.Errorf("同花顺终端地址未配置")
  45. return
  46. }
  47. return getEdbDataFromThsHfApp(thsParams, 0, terminal.ServerUrl)
  48. }
  49. // getEdbDataFromThsHfHttp API-获取高频指标数据
  50. func getEdbDataFromThsHfHttp(thsParams models.ThsHfSearchEdbReq, refreshToken, accessToken string) (indexes []*models.ThsHfIndexWithData, err error) {
  51. defer func() {
  52. if err != nil {
  53. tips := fmt.Sprintf("同花顺高频指标API-getEdbDataFromThsHfHttp err: %v", err)
  54. utils.FileLog.Info(tips)
  55. }
  56. }()
  57. // 请求参数参考
  58. //dataMap := map[string]interface{}{
  59. // "codes": "CU2407.SHF,CU2408.SHF",
  60. // "indicators": "pct_chg",
  61. // "starttime": "2024-06-06 09:15:00",
  62. // "endtime": "2024-06-11 15:15:00",
  63. // "functionpara": map[string]interface {
  64. // }{
  65. // "Limitstart": "10:00:00",
  66. // "Limitend": "14:15:00",
  67. // "Interval": 60,
  68. // "Fill": "Previous",
  69. // "CPS": "forward4",
  70. // "Timeformat": "LocalTime",
  71. // "BaseDate": "2024-01-01",
  72. // },
  73. //}
  74. // 额外参数
  75. funcParams := map[string]interface{}{}
  76. funcParams["Interval"] = thsParams.Interval
  77. if thsParams.Fill != "" {
  78. funcParams["Fill"] = thsParams.Fill
  79. }
  80. if thsParams.CPS != "" {
  81. funcParams["CPS"] = thsParams.CPS
  82. }
  83. if thsParams.BaseDate != "" {
  84. funcParams["BaseDate"] = thsParams.BaseDate
  85. }
  86. // TEST
  87. //funcParams["Limitstart"] = "10:00:00"
  88. //funcParams["Limitend"] = "14:15:00"
  89. //funcParams["Fill"] = "Previous"
  90. //funcParams["CPS"] = "forward4"
  91. //funcParams["Timeformat"] = "LocalTime"
  92. //funcParams["BaseDate"] = "2024-01-01"
  93. dataMap := map[string]interface{}{
  94. "codes": thsParams.StockCode,
  95. "indicators": thsParams.EdbCode,
  96. "starttime": thsParams.StartTime,
  97. "endtime": thsParams.EndTime,
  98. "functionpara": funcParams,
  99. }
  100. // 请求接口
  101. body, e, _ := postCurl(ThsHfApiUrl, dataMap, 0, refreshToken, accessToken)
  102. if e != nil {
  103. utils.FileLog.Info(string(body))
  104. err = fmt.Errorf("同花顺API-请求失败, %v", e)
  105. return
  106. }
  107. apiResp := new(models.ThsHfApiResp)
  108. if e = json.Unmarshal(body, &apiResp); e != nil {
  109. err = fmt.Errorf("同花顺API-解析响应失败, %v", e)
  110. return
  111. }
  112. if apiResp.ErrorCode != 0 {
  113. err = fmt.Errorf("同花顺高频API-状态码: %d, 提示信息: %s", apiResp.ErrorCode, apiResp.ErrMsg)
  114. return
  115. }
  116. indexes = make([]*models.ThsHfIndexWithData, 0)
  117. if len(apiResp.Tables) == 0 {
  118. utils.FileLog.Info("同花顺高频API-无数据")
  119. return
  120. }
  121. // 结果示例
  122. // {
  123. // "errorcode": 0,
  124. // "errmsg": "Success!",
  125. // "tables": [{
  126. // "thscode": "CU2407.SHF",
  127. // "time": ["2024-07-01 10:00", "2024-07-01 11:15", "2024-07-01 14:15", "2024-07-01 15:00"],
  128. // "table": {
  129. // "open": [77930.000000, 77980.000000, 77910.000000, 77850.000000],
  130. // "close": [77980.000000, 77920.000000, 77850.000000, 77780.000000]
  131. // }
  132. // }, {
  133. // "thscode": "CU2408.SHF",
  134. // "time": ["2024-07-01 10:00", "2024-07-01 11:15", "2024-07-01 14:15", "2024-07-01 15:00"],
  135. // "table": {
  136. // "open": [78180.000000, 78280.000000, 78220.000000, 78110.000000],
  137. // "close": [78280.000000, 78220.000000, 78110.000000, 78060.000000]
  138. // }
  139. // }]
  140. // }
  141. // Tables中的每一个对应一个证券代码
  142. for _, v := range apiResp.Tables {
  143. if len(v.Time) == 0 || len(v.Table) == 0 {
  144. continue
  145. }
  146. // Table中的K-V对应指标代码-数据值序列
  147. for tk, tv := range v.Table {
  148. index := new(models.ThsHfIndexWithData)
  149. index.StockCode = v.ThsCode
  150. index.EdbCode = tk
  151. td := make([]*models.ThsHfIndexData, 0)
  152. tvl := len(tv)
  153. for k, t := range v.Time {
  154. if k >= tvl {
  155. continue
  156. }
  157. dt, e := time.ParseInLocation("2006-01-02 15:04", t, time.Local)
  158. if e != nil {
  159. utils.FileLog.Info(fmt.Sprintf("同花顺API-time parse t: %s, err: %v", t, e))
  160. continue
  161. }
  162. td = append(td, &models.ThsHfIndexData{
  163. DataTime: dt,
  164. Value: tv[k],
  165. })
  166. }
  167. index.IndexData = td
  168. indexes = append(indexes, index)
  169. }
  170. }
  171. return
  172. }
  173. // getEdbDataFromThsHfApp 公用机-获取高频指标数据
  174. func getEdbDataFromThsHfApp(thsParams models.ThsHfSearchEdbReq, num int, serverUrl string) (indexes []*models.ThsHfIndexWithData, err error) {
  175. var requestUrl string
  176. defer func() {
  177. if err != nil {
  178. utils.FileLog.Info(fmt.Sprintf("requestUrl: %s", requestUrl))
  179. utils.FileLog.Info(fmt.Sprintf("getEdbDataFromThsHfApp: %v", err))
  180. }
  181. }()
  182. //serverUrl = "http://wxmsgsen1.hzinsights.com:8040/"
  183. baseUrl := fmt.Sprintf("%s%s", serverUrl, "edbInfo/ths/hf?")
  184. // 额外参数
  185. var funcParam string
  186. if thsParams.Interval > 0 {
  187. funcParam += fmt.Sprintf("Interval:%d,", thsParams.Interval)
  188. }
  189. if thsParams.Fill != "" {
  190. funcParam += fmt.Sprintf("Fill:%s,", thsParams.Fill)
  191. }
  192. if thsParams.CPS != "" {
  193. funcParam += fmt.Sprintf("CPS:%s,", thsParams.CPS)
  194. }
  195. if thsParams.BaseDate != "" {
  196. funcParam += fmt.Sprintf("BaseDate:%s,", thsParams.BaseDate)
  197. }
  198. funcParam = strings.TrimRight(funcParam, ",")
  199. params := url.Values{}
  200. params.Add("codes", thsParams.StockCode)
  201. params.Add("indicators", thsParams.EdbCode)
  202. params.Add("function_para", funcParam)
  203. params.Add("start_time", thsParams.StartTime)
  204. params.Add("end_time", thsParams.EndTime)
  205. // 请求终端
  206. requestUrl = baseUrl + params.Encode()
  207. body, e := http.Get(requestUrl)
  208. if e != nil {
  209. err = fmt.Errorf("")
  210. return
  211. }
  212. dataBody := strings.TrimLeft(string(body), `"`)
  213. dataBody = strings.TrimRight(dataBody, `"`)
  214. dataBody = strings.ReplaceAll(dataBody, `\`, ``)
  215. //utils.FileLog.Info(dataBody)
  216. appResp := new(TerminalResponse)
  217. if e = json.Unmarshal([]byte(dataBody), &appResp); e != nil {
  218. err = fmt.Errorf("同花顺APP-解析响应失败, %v", e)
  219. return
  220. }
  221. if appResp.ErrorCode != 0 {
  222. //如果是同花顺登录session失效了,那么就重新请求获取数据
  223. if appResp.ErrorCode == -1020 && num == 0 {
  224. return getEdbDataFromThsHfApp(thsParams, 1, serverUrl)
  225. }
  226. err = fmt.Errorf("同花顺APP-状态码: %d, 提示信息: %s", appResp.ErrorCode, appResp.ErrMsg)
  227. return
  228. }
  229. // 响应结果示例
  230. // {
  231. // "errorcode": 0,
  232. // "errmsg": "Success!",
  233. // "data": [{
  234. // "time": "2024-06-04 09:30",
  235. // "thscode": "CU2406.SHF",
  236. // "open": 81900.0,
  237. // "close": 81820.0
  238. // }, {
  239. // "time": "2024-06-04 10:00",
  240. // "thscode": "CU2406.SHF",
  241. // "open": 81820.0,
  242. // "close": 81790.0
  243. // }, {
  244. // "time": "2024-06-04 10:45",
  245. // "thscode": "CU2406.SHF",
  246. // "open": 81820.0,
  247. // "close": 81950.0
  248. // }]
  249. // }
  250. indexes = make([]*models.ThsHfIndexWithData, 0)
  251. indexMap := make(map[string]*models.ThsHfIndexWithData)
  252. for _, stockData := range appResp.Data {
  253. strTime := stockData["time"].(string)
  254. dataTime, e := time.ParseInLocation("2006-01-02 15:04", strTime, time.Local)
  255. if e != nil {
  256. utils.FileLog.Info("数据日期格式有误, time: %s, %v", strTime, e)
  257. continue
  258. }
  259. stockCode := stockData["thscode"].(string)
  260. // 指标代码+数据
  261. for k, v := range stockData {
  262. if k == "time" || k == "thscode" {
  263. continue
  264. }
  265. if v == nil {
  266. continue
  267. }
  268. val, ok := v.(float64)
  269. if !ok {
  270. continue
  271. }
  272. mk := fmt.Sprintf("%s-%s", stockCode, k)
  273. if indexMap[mk] == nil {
  274. indexMap[mk] = new(models.ThsHfIndexWithData)
  275. indexMap[mk].StockCode = stockCode
  276. indexMap[mk].EdbCode = k
  277. indexMap[mk].IndexData = make([]*models.ThsHfIndexData, 0)
  278. }
  279. indexMap[mk].IndexData = append(indexMap[mk].IndexData, &models.ThsHfIndexData{
  280. DataTime: dataTime,
  281. Value: val,
  282. })
  283. }
  284. }
  285. for _, v := range indexMap {
  286. indexes = append(indexes, v)
  287. }
  288. return
  289. }
  290. // RefreshThsHfBaseIndex 源指标刷新
  291. func RefreshThsHfBaseIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
  292. defer func() {
  293. if err != nil {
  294. tips := fmt.Sprintf("RefreshThsHfBaseIndex-更新失败, %v", err)
  295. utils.FileLog.Info(tips)
  296. go alarm_msg.SendAlarmMsg(tips, 3)
  297. }
  298. }()
  299. if indexItem == nil {
  300. err = fmt.Errorf("指标不存在")
  301. return
  302. }
  303. if len(codeWithData.IndexData) == 0 {
  304. return
  305. }
  306. // 获取源指标数据
  307. dataOb := new(models.BaseFromThsHfData)
  308. originData := make([]*models.BaseFromThsHfData, 0)
  309. {
  310. cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().IndexCode)
  311. pars := make([]interface{}, 0)
  312. pars = append(pars, indexItem.IndexCode)
  313. if startTime != "" {
  314. cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
  315. pars = append(pars, startTime)
  316. }
  317. list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
  318. if e != nil {
  319. err = fmt.Errorf("获取源指标数据失败, %v", e)
  320. return
  321. }
  322. originData = list
  323. }
  324. // 更新指标数据
  325. dateExist := make(map[string]*models.BaseFromThsHfData)
  326. newValExist := make(map[string]bool)
  327. if len(originData) > 0 {
  328. // unicode去重
  329. for _, d := range originData {
  330. dateExist[d.UniqueCode] = d
  331. }
  332. }
  333. // 筛选新增/更新数据
  334. updateData := make([]*models.BaseFromThsHfData, 0)
  335. insertData := make([]*models.BaseFromThsHfData, 0)
  336. for _, d := range codeWithData.IndexData {
  337. uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format(utils.FormatDateTimeMinute)))
  338. origin := dateExist[uni]
  339. // unicode检验是否存在
  340. strNewVal := decimal.NewFromFloat(d.Value).Round(4).String()
  341. di, _ := decimal.NewFromString(strNewVal)
  342. newVal, _ := di.Float64()
  343. if origin != nil {
  344. strExistVal := decimal.NewFromFloat(origin.Value).Round(4).String()
  345. if strNewVal == strExistVal {
  346. continue
  347. }
  348. origin.Value = newVal
  349. origin.ModifyTime = time.Now().Local()
  350. updateData = append(updateData, origin)
  351. }
  352. // 新增的数据去重
  353. if newValExist[uni] {
  354. continue
  355. }
  356. newValExist[uni] = true
  357. newData := new(models.BaseFromThsHfData)
  358. newData.BaseFromThsHfIndexId = indexItem.BaseFromThsHfIndexId
  359. newData.IndexCode = indexItem.IndexCode
  360. newData.DataTime = d.DataTime
  361. newData.Value = newVal
  362. newData.CreateTime = time.Now()
  363. newData.ModifyTime = time.Now()
  364. newData.UniqueCode = uni
  365. newData.DataTimestamp = d.DataTime.UnixNano() / 1e6
  366. insertData = append(insertData, newData)
  367. }
  368. if e := dataOb.MultiInsertOrUpdate(insertData, updateData); e != nil {
  369. err = fmt.Errorf("新增/更新源指标数据失败, %v", e)
  370. return
  371. }
  372. // 更新指标开始结束时间
  373. minMax, e := dataOb.GetIndexMinMax(indexItem.IndexCode)
  374. if e == nil && minMax != nil {
  375. minDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MinDate, time.Local)
  376. if e != nil {
  377. err = fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minMax.MinDate, e)
  378. return
  379. }
  380. maxDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MaxDate, time.Local)
  381. if e != nil {
  382. err = fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", minMax.MaxDate, e)
  383. return
  384. }
  385. indexItem.StartDate = minDate
  386. indexItem.EndDate = maxDate
  387. indexItem.ModifyTime = time.Now().Local()
  388. updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
  389. if e = indexItem.Update(updateCols); e != nil {
  390. err = fmt.Errorf("更新源指标开始结束时间失败, %v", e)
  391. return
  392. }
  393. }
  394. // 同步刷新指标库
  395. go func() {
  396. _ = RefreshEdbFromThsHfBaseIndex(indexItem.IndexCode, startTime)
  397. }()
  398. return
  399. }
  400. // RefreshThsHfBaseIndexMgo 源指标刷新-Mongo
  401. func RefreshThsHfBaseIndexMgo(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
  402. defer func() {
  403. if err != nil {
  404. tips := fmt.Sprintf("RefreshThsHfBaseIndexMgo-更新失败, %v", err)
  405. utils.FileLog.Info(tips)
  406. go alarm_msg.SendAlarmMsg(tips, 3)
  407. }
  408. }()
  409. if indexItem == nil {
  410. err = fmt.Errorf("指标不存在")
  411. return
  412. }
  413. if len(codeWithData.IndexData) == 0 {
  414. return
  415. }
  416. mogDataObj := new(mgo.BaseFromThsHfData)
  417. // 获取已存在的所有数据
  418. existCond := bson.M{
  419. "index_code": indexItem.IndexCode,
  420. }
  421. if startTime != "" {
  422. st, e := time.ParseInLocation(utils.FormatDateTime, startTime, time.Local)
  423. if e != nil {
  424. err = fmt.Errorf("start time parse err: %v", e)
  425. return
  426. }
  427. existCond["data_time"] = bson.M{
  428. "$gte": st,
  429. }
  430. }
  431. exitDataList, e := mogDataObj.GetAllDataList(existCond, []string{"data_time"})
  432. if e != nil {
  433. err = fmt.Errorf("GetAllDataList err: %v", e)
  434. return
  435. }
  436. // 已经存在的数据集
  437. exitDataMap := make(map[string]*mgo.BaseFromThsHfData)
  438. for _, v := range exitDataList {
  439. exitDataMap[v.UniqueCode] = v
  440. }
  441. // 待添加的数据集
  442. addDataList := make([]interface{}, 0)
  443. updateDataList := make([]mgo.BaseFromThsHfData, 0)
  444. for _, data := range codeWithData.IndexData {
  445. strNewVal := decimal.NewFromFloat(data.Value).Round(4).String()
  446. di, _ := decimal.NewFromString(strNewVal)
  447. newVal, _ := di.Float64()
  448. // unicode检验是否存在
  449. uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, data.DataTime.Format(utils.FormatDateTimeMinute)))
  450. findData, ok := exitDataMap[uni]
  451. if !ok {
  452. addDataList = append(addDataList, mgo.BaseFromThsHfData{
  453. BaseFromThsHfIndexId: int64(indexItem.BaseFromThsHfIndexId),
  454. IndexCode: indexItem.IndexCode,
  455. DataTime: data.DataTime,
  456. Value: newVal,
  457. UniqueCode: uni,
  458. CreateTime: time.Now(),
  459. ModifyTime: time.Now(),
  460. DataTimestamp: data.DataTime.UnixNano() / 1e6,
  461. })
  462. continue
  463. }
  464. // 值不匹配,修改数据
  465. strExistVal := decimal.NewFromFloat(findData.Value).Round(4).String()
  466. if strNewVal == strExistVal {
  467. continue
  468. }
  469. findData.Value = newVal
  470. updateDataList = append(updateDataList, *findData)
  471. }
  472. // 入库
  473. {
  474. coll := mogDataObj.GetCollection()
  475. if len(addDataList) > 0 {
  476. if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
  477. err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
  478. return
  479. }
  480. }
  481. if len(updateDataList) > 0 {
  482. for _, v := range updateDataList {
  483. if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
  484. err = fmt.Errorf("UpdateDataByColl, err: %v", e)
  485. return
  486. }
  487. }
  488. }
  489. }
  490. // 修改最大最小日期
  491. minMax, err := indexItem.GetEdbInfoMaxAndMinInfo(indexItem.IndexCode)
  492. if err != nil {
  493. return
  494. }
  495. if err == nil && minMax != nil {
  496. minDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MinDate, time.Local)
  497. if e != nil {
  498. err = fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minMax.MinDate, e)
  499. return
  500. }
  501. maxDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MaxDate, time.Local)
  502. if e != nil {
  503. err = fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", minMax.MaxDate, e)
  504. return
  505. }
  506. indexItem.StartDate = minDate
  507. indexItem.EndDate = maxDate
  508. indexItem.ModifyTime = time.Now().Local()
  509. updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
  510. if e = indexItem.Update(updateCols); e != nil {
  511. err = fmt.Errorf("更新源指标开始结束时间失败, %v", e)
  512. return
  513. }
  514. }
  515. // 同步刷新指标库
  516. go func() {
  517. _ = RefreshEdbFromThsHfBaseIndex(indexItem.IndexCode, startTime)
  518. }()
  519. return
  520. }
  521. // RefreshEdbFromThsHfBaseIndex 根据源指标刷新指标库
  522. func RefreshEdbFromThsHfBaseIndex(baseCode, startTime string) (err error) {
  523. defer func() {
  524. if err != nil {
  525. tips := fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标库失败, %v", err)
  526. utils.FileLog.Info(tips)
  527. go alarm_msg.SendAlarmMsg(tips, 3)
  528. }
  529. }()
  530. // 获取指标关联信息
  531. mappings := make([]*models.BaseFromEdbMapping, 0)
  532. {
  533. ob := new(models.BaseFromEdbMapping)
  534. cond := fmt.Sprintf(" AND %s = ?", ob.Cols().BaseIndexCode)
  535. pars := make([]interface{}, 0)
  536. pars = append(pars, baseCode)
  537. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  538. if e != nil {
  539. err = fmt.Errorf("获取源指标关联失败, %v", e)
  540. return
  541. }
  542. mappings = list
  543. }
  544. if len(mappings) == 0 {
  545. return
  546. }
  547. codeMapping := make(map[string]*models.BaseFromEdbMapping)
  548. edbInfoIds := make([]int, 0)
  549. for _, v := range mappings {
  550. if codeMapping[v.EdbCode] == nil {
  551. codeMapping[v.EdbCode] = v
  552. }
  553. edbInfoIds = append(edbInfoIds, v.EdbInfoId)
  554. }
  555. // 指标信息
  556. edbInfoList, e := models.GetEdbInfoByIdList(edbInfoIds)
  557. if e != nil {
  558. err = fmt.Errorf("获取指标信息列表失败, %v", e)
  559. return
  560. }
  561. codeEdb := make(map[string]*models.EdbInfo)
  562. for _, v := range edbInfoList {
  563. if codeEdb[v.EdbCode] == nil {
  564. codeEdb[v.EdbCode] = v
  565. }
  566. }
  567. thsOb := new(models.EdbThsHf)
  568. source := thsOb.GetSource()
  569. subSource := thsOb.GetSubSource()
  570. for _, v := range mappings {
  571. cacheKey := fmt.Sprintf("%s_%d_%d_%s", utils.CACHE_EDB_DATA_REFRESH, source, subSource, v.EdbCode)
  572. if utils.Rc.IsExist(cacheKey) {
  573. continue
  574. }
  575. utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)
  576. edb := codeEdb[v.EdbCode]
  577. if edb == nil {
  578. utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-指标信息有误, EdbCode: %s", v.EdbCode))
  579. continue
  580. }
  581. // 刷新指标
  582. if e = thsOb.Refresh(edb, codeMapping[v.EdbCode], startTime); e != nil {
  583. utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标失败, %v", e))
  584. _ = utils.Rc.Delete(cacheKey)
  585. continue
  586. }
  587. // 更新指标最值
  588. if e = thsOb.UnifiedModifyEdbInfoMaxAndMinInfo(edb); e != nil {
  589. utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-更新指标最值失败, %v", e))
  590. _ = utils.Rc.Delete(cacheKey)
  591. return
  592. }
  593. _ = utils.Rc.Delete(cacheKey)
  594. // 更新ES
  595. go logic.UpdateEs(edb.EdbInfoId)
  596. }
  597. return
  598. }