edb_ths_hf.go 17 KB


  1. package models
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/utils"
  5. "fmt"
  6. "github.com/beego/beego/v2/client/orm"
  7. "github.com/shopspring/decimal"
  8. "reflect"
  9. "sort"
  10. "time"
  11. )
  12. // EdbThsHf 自有数据
  13. type EdbThsHf struct{}
  14. // GetSource 获取来源编码id
  15. func (obj EdbThsHf) GetSource() int {
  16. return utils.DATA_SOURCE_THS
  17. }
  18. // GetSubSource 获取子来源编码id
  19. func (obj EdbThsHf) GetSubSource() int {
  20. return utils.DATA_SUB_SOURCE_HIGH_FREQUENCY
  21. }
  22. // GetSourceName 获取来源名称
  23. func (obj EdbThsHf) GetSourceName() string {
  24. return utils.DATA_SOURCE_NAME_THS
  25. }
  26. // GetEdbType 获取指标类型
  27. func (obj EdbThsHf) GetEdbType() int {
  28. return utils.DEFAULT_EDB_TYPE
  29. }
  30. // ThsHfAddBaseParams
  31. // @Description: 基础指标的添加参数
  32. type ThsHfAddBaseParams struct {
  33. EdbCode string `description:"指标编码"`
  34. EdbName string `description:"指标名称"`
  35. Unit string `description:"单位"`
  36. Frequency string `description:"频度"`
  37. Sort int `description:"排序"`
  38. ClassifyId int `description:"所属分类"`
  39. SysUserId int `description:"用户id"`
  40. SysUserRealName string `description:"用户真实名称"`
  41. UniqueCode string `description:"唯一编码"`
  42. ConvertRule string `description:"转换规则"`
  43. }
  44. // ThsHfEditBaseParams
  45. // @Description: 基础指标的修改参数
  46. type ThsHfEditBaseParams struct {
  47. EdbCode string `description:"指标编码"`
  48. EdbName string `description:"指标名称"`
  49. EdbNameEn string `description:"指标名称(英文)"`
  50. Unit string `description:"单位"`
  51. UnitEn string `description:"单位(英文)"`
  52. ClassifyId int `description:"所属分类"`
  53. SysUserId int `description:"用户id"`
  54. SysUserRealName string `description:"用户真实名称"`
  55. UniqueCode string `description:"编码"`
  56. Lang string `description:"语言版本"`
  57. EdbInfo *EdbInfo `description:"指标信息"`
  58. }
  59. type ThsHfRefreshBaseParams struct {
  60. EdbInfo *EdbInfo
  61. StartDate string
  62. EndDate string
  63. }
  64. // Add
  65. // @Description: 添加指标
  66. // @author: Roc
  67. // @receiver obj
  68. // @datetime 2024-04-30 17:35:14
  69. // @param params ThsHfAddBaseParams
  70. // @param businessIndexItem *BaseFromBusinessIndex
  71. // @return edbInfo *EdbInfo
  72. // @return err error
  73. // @return errMsg string
  74. func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
  75. o := orm.NewOrm()
  76. tx, e := o.Begin()
  77. if e != nil {
  78. err = fmt.Errorf("orm begin err: %v", e)
  79. return
  80. }
  81. defer func() {
  82. if err != nil {
  83. _ = tx.Rollback()
  84. utils.FileLog.Info(fmt.Sprintf("%s err: %v", reflect.TypeOf(obj).Name(), err))
  85. return
  86. }
  87. _ = tx.Commit()
  88. }()
  89. // 新增指标
  90. edbInfo = new(EdbInfo)
  91. edbInfo.Source = obj.GetSource()
  92. edbInfo.SubSource = obj.GetSubSource()
  93. edbInfo.SourceName = obj.GetSourceName()
  94. edbInfo.EdbType = obj.GetEdbType()
  95. edbInfo.EdbCode = params.EdbCode
  96. edbInfo.EdbName = params.EdbName
  97. edbInfo.EdbNameEn = params.EdbName
  98. edbInfo.EdbNameSource = params.EdbName
  99. edbInfo.Frequency = params.Frequency
  100. edbInfo.Unit = params.Unit
  101. edbInfo.UnitEn = params.Unit
  102. edbInfo.StartDate = baseIndex.StartDate.Format(utils.FormatDate) // 默认取源指标的时间, 刷新完成后更新
  103. edbInfo.EndDate = baseIndex.EndDate.Format(utils.FormatDate)
  104. edbInfo.ClassifyId = params.ClassifyId
  105. edbInfo.SysUserId = params.SysUserId
  106. edbInfo.SysUserRealName = params.SysUserRealName
  107. edbInfo.Sort = params.Sort
  108. edbInfo.TerminalCode = baseIndex.TerminalCode
  109. edbInfo.UniqueCode = params.UniqueCode
  110. edbInfo.CreateTime = time.Now()
  111. edbInfo.ModifyTime = time.Now()
  112. edbInfoId, e := tx.Insert(edbInfo)
  113. if e != nil {
  114. err = fmt.Errorf("insert edb err: %v", e)
  115. return
  116. }
  117. edbInfo.EdbInfoId = int(edbInfoId)
  118. // 新增指标关联
  119. edbMapping := new(BaseFromEdbMapping)
  120. edbMapping.BaseFromIndexId = baseIndex.BaseFromThsHfIndexId
  121. edbMapping.BaseIndexCode = baseIndex.IndexCode
  122. edbMapping.EdbInfoId = edbInfo.EdbInfoId
  123. edbMapping.EdbCode = edbInfo.EdbCode
  124. edbMapping.Source = obj.GetSource()
  125. edbMapping.SubSource = obj.GetSubSource()
  126. edbMapping.ConvertRule = params.ConvertRule
  127. edbMapping.CreateTime = time.Now().Local()
  128. edbMapping.ModifyTime = time.Now().Local()
  129. edbMappingId, e := tx.Insert(edbMapping)
  130. if e != nil {
  131. err = fmt.Errorf("insert base edb mapping err: %v", e)
  132. return
  133. }
  134. edbMapping.Id = int(edbMappingId)
  135. // 刷新数据
  136. err = obj.Refresh(edbInfo, edbMapping, "")
  137. return
  138. }
  139. func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  140. if edbInfo == nil || edbBaseMapping == nil {
  141. err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
  142. return
  143. }
  144. // 真实数据的最大日期, 插入规则配置的日期
  145. var realDataMaxDate, edbDataInsertConfigDate time.Time
  146. var edbDataInsertConfig *EdbDataInsertConfig
  147. var isFindConfigDateRealData bool
  148. {
  149. conf, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  150. if e != nil && e.Error() != utils.ErrNoRow() {
  151. err = fmt.Errorf("GetEdbDataInsertConfigByEdbId err: %v", e)
  152. return
  153. }
  154. edbDataInsertConfig = conf
  155. if edbDataInsertConfig != nil {
  156. edbDataInsertConfigDate = edbDataInsertConfig.Date
  157. }
  158. }
  159. // 源指标数据
  160. baseDataList := make([]*BaseFromThsHfData, 0)
  161. {
  162. ob := new(BaseFromThsHfData)
  163. cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
  164. pars := make([]interface{}, 0)
  165. pars = append(pars, edbBaseMapping.BaseIndexCode)
  166. // 开始时间-取-3d
  167. if startDate != "" {
  168. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  169. if e != nil {
  170. err = fmt.Errorf("刷新开始时间有误, %v", e)
  171. return
  172. }
  173. cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime)
  174. pars = append(pars, st.AddDate(0, 0, -3).Format(utils.FormatDate))
  175. }
  176. list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime))
  177. if e != nil {
  178. err = fmt.Errorf("获取数据源数据失败, %v", e)
  179. return
  180. }
  181. baseDataList = list
  182. }
  183. convertRule := new(ThsHfIndexConvert2EdbRule)
  184. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  185. err = fmt.Errorf("转换规则有误, %v", e)
  186. return
  187. }
  188. // 转换数据
  189. convertData, e := ThsHfConvertData2DayByRule(baseDataList, convertRule)
  190. if e != nil {
  191. err = fmt.Errorf("转换数据失败, %v", e)
  192. return
  193. }
  194. if len(convertData) == 0 {
  195. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  196. return
  197. }
  198. // 获取已有数据
  199. dataOb := new(EdbDataThsHf)
  200. dataExists := make(map[string]*EdbDataThsHf)
  201. searchExistMap := make(map[string]*EdbInfoSearchData)
  202. {
  203. cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId)
  204. pars := make([]interface{}, 0)
  205. pars = append(pars, edbInfo.EdbInfoId)
  206. if startDate != "" {
  207. cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
  208. pars = append(pars, startDate)
  209. }
  210. list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
  211. if e != nil {
  212. err = fmt.Errorf("获取指标数据失败, %v", e)
  213. return
  214. }
  215. for _, v := range list {
  216. dataExists[v.DataTime.Format(utils.FormatDate)] = v
  217. searchExistMap[v.DataTime.Format(utils.FormatDate)] = &EdbInfoSearchData{
  218. EdbDataId: v.EdbDataId,
  219. EdbInfoId: v.EdbInfoId,
  220. DataTime: v.DataTime.Format(utils.FormatDate),
  221. Value: v.Value,
  222. EdbCode: v.EdbCode,
  223. DataTimestamp: v.DataTimestamp,
  224. }
  225. }
  226. }
  227. // 比对数据
  228. insertExist := make(map[string]bool)
  229. insertData := make([]*EdbDataThsHf, 0)
  230. updateData := make([]*EdbDataThsHf, 0)
  231. for k, v := range convertData {
  232. strDate := k.Format(utils.FormatDate)
  233. // 手动插入数据的判断
  234. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  235. realDataMaxDate = k
  236. }
  237. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  238. isFindConfigDateRealData = true
  239. }
  240. // 入库值
  241. saveVal := decimal.NewFromFloat(v).Round(4).String()
  242. d, e := decimal.NewFromString(saveVal)
  243. if e != nil {
  244. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  245. continue
  246. }
  247. saveFloat, _ := d.Float64()
  248. // 更新
  249. exists := dataExists[strDate]
  250. if exists != nil {
  251. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  252. if saveVal != existVal {
  253. exists.Value = saveFloat
  254. updateData = append(updateData, exists)
  255. }
  256. continue
  257. }
  258. // 新增
  259. if insertExist[strDate] {
  260. continue
  261. }
  262. insertExist[strDate] = true
  263. timestamp := k.UnixNano() / 1e6
  264. insertData = append(insertData, &EdbDataThsHf{
  265. EdbInfoId: edbInfo.EdbInfoId,
  266. EdbCode: edbInfo.EdbCode,
  267. DataTime: k,
  268. Value: saveFloat,
  269. CreateTime: time.Now(),
  270. ModifyTime: time.Now(),
  271. DataTimestamp: timestamp,
  272. })
  273. }
  274. // 批量新增/更新
  275. if len(insertData) > 0 {
  276. if e = dataOb.CreateMulti(insertData); e != nil {
  277. err = fmt.Errorf("批量新增指标数据失败, %v", e)
  278. return
  279. }
  280. }
  281. if len(updateData) > 0 {
  282. if e = dataOb.MultiUpdateValue(updateData); e != nil {
  283. err = fmt.Errorf("批量更新指标数据失败, %v", e)
  284. return
  285. }
  286. }
  287. // 处理手工数据补充的配置
  288. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, obj.GetSource(), obj.GetSubSource(), searchExistMap, isFindConfigDateRealData)
  289. return
  290. }
  291. // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
  292. func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
  293. // PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
  294. timeData = make(map[time.Time]float64)
  295. if len(originData) == 0 || convertRule == nil {
  296. return
  297. }
  298. if !utils.InArrayByInt([]int{1, 2}, convertRule.ConvertType) {
  299. err = fmt.Errorf("取值类型有误, ConvertType: %d", convertRule.ConvertType)
  300. return
  301. }
  302. // 升序排序
  303. sort.Slice(originData, func(i, j int) bool {
  304. return originData[i].DataTimestamp < originData[j].DataTimestamp
  305. })
  306. // 将数据根据日期进行分组
  307. var sortDates []string
  308. groupDateData := make(map[string][]*BaseFromThsHfData)
  309. for _, v := range originData {
  310. d := v.DataTime.Format(utils.FormatDate)
  311. if !utils.InArrayByStr(sortDates, d) {
  312. sortDates = append(sortDates, d)
  313. }
  314. if groupDateData[d] == nil {
  315. groupDateData[d] = make([]*BaseFromThsHfData, 0)
  316. }
  317. groupDateData[d] = append(groupDateData[d], v)
  318. }
  319. // 取值方式-指定时间的值
  320. if convertRule.ConvertType == 1 {
  321. for k, v := range sortDates {
  322. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  323. if e != nil {
  324. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  325. continue
  326. }
  327. var timeTarget time.Time
  328. dateData := make([]*BaseFromThsHfData, 0)
  329. // 当日
  330. if convertRule.ConvertFixed.FixedDay == 1 {
  331. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", v, convertRule.ConvertFixed.FixedTime), time.Local)
  332. if e != nil {
  333. utils.FileLog.Info(fmt.Sprintf("当日timeTarget转换有误, %v", e))
  334. continue
  335. }
  336. timeTarget = tg
  337. dt := groupDateData[v]
  338. if dt == nil {
  339. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", v))
  340. continue
  341. }
  342. if len(dt) == 0 {
  343. continue
  344. }
  345. dateData = dt
  346. }
  347. // 前一日
  348. if convertRule.ConvertFixed.FixedDay == 2 {
  349. if k < 0 {
  350. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  351. continue
  352. }
  353. preDate := sortDates[k-1]
  354. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
  355. if e != nil {
  356. utils.FileLog.Info(fmt.Sprintf("前日timeTarget转换有误, %v", e))
  357. continue
  358. }
  359. timeTarget = tg
  360. dt := groupDateData[preDate]
  361. if dt == nil {
  362. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  363. continue
  364. }
  365. if len(dt) == 0 {
  366. continue
  367. }
  368. dateData = dt
  369. }
  370. // 重新获取数据序列中, 时间在目标时间点之后的
  371. newDateData := make([]*BaseFromThsHfData, 0)
  372. for kv, dv := range dateData {
  373. if dv.DataTime.Before(timeTarget) {
  374. continue
  375. }
  376. // 由于升序排列, 直接取之后所有的数据
  377. newDateData = append(newDateData, dateData[kv:]...)
  378. break
  379. }
  380. // 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值)
  381. if len(newDateData) == 0 {
  382. utils.FileLog.Info("%s当日无有效数据", v)
  383. continue
  384. }
  385. timeData[todayTime] = newDateData[0].Value
  386. }
  387. return
  388. }
  389. // 取值方式-区间计算值
  390. for k, v := range sortDates {
  391. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  392. if e != nil {
  393. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  394. continue
  395. }
  396. var thisDate, preDate string
  397. thisDate = v
  398. if k > 0 {
  399. preDate = sortDates[k-1]
  400. }
  401. var startTimeTarget, endTimeTarget time.Time
  402. // 起始时间-当日/前一日
  403. if convertRule.ConvertArea.StartDay == 1 {
  404. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertFixed.FixedTime), time.Local)
  405. if e != nil {
  406. utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e))
  407. continue
  408. }
  409. startTimeTarget = tg
  410. }
  411. if convertRule.ConvertArea.StartDay == 2 {
  412. if preDate == "" {
  413. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  414. continue
  415. }
  416. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
  417. if e != nil {
  418. utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e))
  419. continue
  420. }
  421. startTimeTarget = tg
  422. }
  423. // 截止时间-当日/前一日
  424. if convertRule.ConvertArea.EndDay == 1 {
  425. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertFixed.FixedTime), time.Local)
  426. if e != nil {
  427. utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e))
  428. continue
  429. }
  430. endTimeTarget = tg
  431. }
  432. if convertRule.ConvertArea.EndDay == 2 {
  433. if preDate == "" {
  434. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  435. continue
  436. }
  437. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
  438. if e != nil {
  439. utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e))
  440. continue
  441. }
  442. endTimeTarget = tg
  443. }
  444. if startTimeTarget.IsZero() || endTimeTarget.IsZero() {
  445. utils.FileLog.Info(fmt.Sprintf("起始截止时间有误, start: %v, end: %v", startTimeTarget, endTimeTarget))
  446. continue
  447. }
  448. // 合并前日当日数据
  449. dateData := make([]*BaseFromThsHfData, 0)
  450. if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
  451. // 起始截止均为当日
  452. dateData = groupDateData[thisDate]
  453. if dateData == nil {
  454. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  455. continue
  456. }
  457. if len(dateData) == 0 {
  458. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  459. continue
  460. }
  461. } else {
  462. if preDate == "" {
  463. continue
  464. }
  465. // 起始截止时间含前日
  466. preData := groupDateData[preDate]
  467. if preData == nil {
  468. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  469. continue
  470. }
  471. if len(preData) == 0 {
  472. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  473. continue
  474. }
  475. thisData := groupDateData[thisDate]
  476. if thisData == nil {
  477. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  478. continue
  479. }
  480. if len(thisData) == 0 {
  481. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  482. continue
  483. }
  484. dateData = append(dateData, preData...)
  485. dateData = append(dateData, thisData...)
  486. }
  487. // 重组时间区间内的数据
  488. newDateData := make([]*BaseFromThsHfData, 0)
  489. for _, dv := range dateData {
  490. if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
  491. continue
  492. }
  493. newDateData = append(newDateData, dv)
  494. }
  495. if len(newDateData) == 0 {
  496. utils.FileLog.Info(fmt.Sprintf("时间区间内无数据, start: %v, end: %v", startTimeTarget, endTimeTarget))
  497. continue
  498. }
  499. // 取出区间内的均值/最值
  500. var avgVal, minVal, maxVal, sumVal float64
  501. for _, nv := range newDateData {
  502. sumVal += nv.Value
  503. if nv.Value > maxVal {
  504. maxVal = nv.Value
  505. }
  506. if nv.Value < minVal {
  507. minVal = nv.Value
  508. }
  509. }
  510. avgVal = sumVal / float64(len(newDateData))
  511. switch convertRule.ConvertArea.CalculateType {
  512. case 1:
  513. timeData[todayTime] = avgVal
  514. case 2:
  515. timeData[todayTime] = maxVal
  516. case 3:
  517. timeData[todayTime] = minVal
  518. default:
  519. utils.FileLog.Info(fmt.Sprintf("计算方式有误, CalculateType: %d", convertRule.ConvertArea.CalculateType))
  520. }
  521. }
  522. return
  523. }