edb_ths_hf.go 27 KB


  1. package models
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/models/mgo"
  5. "eta/eta_index_lib/utils"
  6. "fmt"
  7. "github.com/beego/beego/v2/client/orm"
  8. "github.com/shopspring/decimal"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "reflect"
  11. "sort"
  12. "time"
  13. )
// EdbThsHf groups the EDB operations for indicators whose source is THS and
// whose sub-source is high-frequency data (see GetSource and GetSubSource).
type EdbThsHf struct{}
  16. // GetSource 获取来源编码id
  17. func (obj EdbThsHf) GetSource() int {
  18. return utils.DATA_SOURCE_THS
  19. }
  20. // GetSubSource 获取子来源编码id
  21. func (obj EdbThsHf) GetSubSource() int {
  22. return utils.DATA_SUB_SOURCE_HIGH_FREQUENCY
  23. }
  24. // GetSourceName 获取来源名称
  25. func (obj EdbThsHf) GetSourceName() string {
  26. return utils.DATA_SOURCE_NAME_THS
  27. }
  28. // GetEdbType 获取指标类型
  29. func (obj EdbThsHf) GetEdbType() int {
  30. return utils.DEFAULT_EDB_TYPE
  31. }
// ThsHfAddBaseParams carries the caller-supplied attributes that EdbThsHf.Add
// copies onto the new indicator and its base-index mapping.
type ThsHfAddBaseParams struct {
	EdbCode         string `description:"指标编码"`   // indicator code
	EdbName         string `description:"指标名称"`   // indicator name; Add also uses it for the English and source names
	Unit            string `description:"单位"`     // unit; Add also uses it for the English unit
	Frequency       string `description:"频度"`     // frequency
	Sort            int    `description:"排序"`     // sort order
	ClassifyId      int    `description:"所属分类"`   // id of the classification the indicator belongs to
	SysUserId       int    `description:"用户id"`   // id of the creating user
	SysUserRealName string `description:"用户真实名称"` // real name of the creating user
	UniqueCode      string `description:"唯一编码"`   // unique code of the indicator
	ConvertRule     string `description:"转换规则"`   // conversion rule; stored on the mapping and JSON-decoded into ThsHfIndexConvert2EdbRule on refresh
}
// ThsHfEditBaseParams carries the attributes used when editing a base
// indicator. It is not referenced by the code visible in this part of the
// file; the field notes below are translations of the description tags.
type ThsHfEditBaseParams struct {
	EdbCode         string   `description:"指标编码"`     // indicator code
	EdbName         string   `description:"指标名称"`     // indicator name
	EdbNameEn       string   `description:"指标名称(英文)"` // indicator name (English)
	Unit            string   `description:"单位"`       // unit
	UnitEn          string   `description:"单位(英文)"`   // unit (English)
	ClassifyId      int      `description:"所属分类"`     // id of the classification the indicator belongs to
	SysUserId       int      `description:"用户id"`     // user id
	SysUserRealName string   `description:"用户真实名称"`   // real name of the user
	UniqueCode      string   `description:"编码"`       // code
	Lang            string   `description:"语言版本"`     // language version
	EdbInfo         *EdbInfo `description:"指标信息"`     // indicator record
}
// ThsHfRefreshBaseParams bundles an indicator with a start/end date pair.
// It is not referenced by the code visible in this part of the file.
// NOTE(review): the dates are presumably in utils.FormatDate layout, like the
// startDate accepted by Refresh — confirm against the callers.
type ThsHfRefreshBaseParams struct {
	EdbInfo   *EdbInfo // indicator record
	StartDate string   // start date
	EndDate   string   // end date
}
  66. // Add
  67. // @Description: 添加指标
  68. func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
  69. o := orm.NewOrm()
  70. tx, e := o.Begin()
  71. if e != nil {
  72. err = fmt.Errorf("orm begin err: %v", e)
  73. return
  74. }
  75. defer func() {
  76. if err != nil {
  77. _ = tx.Rollback()
  78. utils.FileLog.Info(fmt.Sprintf("%s err: %v", reflect.TypeOf(obj).Name(), err))
  79. return
  80. }
  81. _ = tx.Commit()
  82. }()
  83. // 新增指标
  84. edbInfo = new(EdbInfo)
  85. edbInfo.Source = obj.GetSource()
  86. edbInfo.SubSource = obj.GetSubSource()
  87. edbInfo.SourceName = obj.GetSourceName()
  88. edbInfo.EdbType = obj.GetEdbType()
  89. edbInfo.EdbCode = params.EdbCode
  90. edbInfo.EdbName = params.EdbName
  91. edbInfo.EdbNameEn = params.EdbName
  92. edbInfo.EdbNameSource = params.EdbName
  93. edbInfo.Frequency = params.Frequency
  94. edbInfo.Unit = params.Unit
  95. edbInfo.UnitEn = params.Unit
  96. edbInfo.StartDate = baseIndex.StartDate.Format(utils.FormatDate) // 默认取源指标的时间, 刷新完成后更新
  97. edbInfo.EndDate = baseIndex.EndDate.Format(utils.FormatDate)
  98. edbInfo.ClassifyId = params.ClassifyId
  99. edbInfo.SysUserId = params.SysUserId
  100. edbInfo.SysUserRealName = params.SysUserRealName
  101. edbInfo.Sort = params.Sort
  102. edbInfo.TerminalCode = baseIndex.TerminalCode
  103. edbInfo.UniqueCode = params.UniqueCode
  104. edbInfo.CreateTime = time.Now()
  105. edbInfo.ModifyTime = time.Now()
  106. edbInfoId, e := tx.Insert(edbInfo)
  107. if e != nil {
  108. err = fmt.Errorf("insert edb err: %v", e)
  109. return
  110. }
  111. edbInfo.EdbInfoId = int(edbInfoId)
  112. // 新增指标关联
  113. edbMapping := new(BaseFromEdbMapping)
  114. edbMapping.BaseFromIndexId = baseIndex.BaseFromThsHfIndexId
  115. edbMapping.BaseIndexCode = baseIndex.IndexCode
  116. edbMapping.EdbInfoId = edbInfo.EdbInfoId
  117. edbMapping.EdbCode = edbInfo.EdbCode
  118. edbMapping.Source = obj.GetSource()
  119. edbMapping.SubSource = obj.GetSubSource()
  120. edbMapping.ConvertRule = params.ConvertRule
  121. edbMapping.CreateTime = time.Now().Local()
  122. edbMapping.ModifyTime = time.Now().Local()
  123. edbMappingId, e := tx.Insert(edbMapping)
  124. if e != nil {
  125. err = fmt.Errorf("insert base edb mapping err: %v", e)
  126. return
  127. }
  128. edbMapping.Id = int(edbMappingId)
  129. // 刷新数据
  130. err = obj.Refresh(edbInfo, edbMapping, "")
  131. return
  132. }
  133. func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  134. if utils.UseMongo {
  135. return obj.refreshByMongo(edbInfo, edbBaseMapping, startDate)
  136. }
  137. return obj.refreshByMysql(edbInfo, edbBaseMapping, startDate)
  138. }
  139. func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  140. if edbInfo == nil || edbBaseMapping == nil {
  141. err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
  142. return
  143. }
  144. // 真实数据的最大日期, 插入规则配置的日期
  145. var realDataMaxDate, edbDataInsertConfigDate time.Time
  146. var edbDataInsertConfig *EdbDataInsertConfig
  147. var isFindConfigDateRealData bool
  148. {
  149. conf, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  150. if e != nil && e.Error() != utils.ErrNoRow() {
  151. err = fmt.Errorf("GetEdbDataInsertConfigByEdbId err: %v", e)
  152. return
  153. }
  154. edbDataInsertConfig = conf
  155. if edbDataInsertConfig != nil {
  156. edbDataInsertConfigDate = edbDataInsertConfig.Date
  157. }
  158. }
  159. // 查询时间为开始时间-3d
  160. var queryDate string
  161. if startDate != "" {
  162. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  163. if e != nil {
  164. err = fmt.Errorf("刷新开始时间有误, %v", e)
  165. return
  166. }
  167. queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
  168. }
  169. // 源指标数据
  170. baseDataList := make([]*BaseFromThsHfData, 0)
  171. {
  172. ob := new(BaseFromThsHfData)
  173. cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
  174. pars := make([]interface{}, 0)
  175. pars = append(pars, edbBaseMapping.BaseIndexCode)
  176. if queryDate != "" {
  177. cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime)
  178. pars = append(pars, queryDate)
  179. }
  180. list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime))
  181. if e != nil {
  182. err = fmt.Errorf("获取数据源数据失败, %v", e)
  183. return
  184. }
  185. baseDataList = list
  186. }
  187. // 转换数据
  188. convertRule := new(ThsHfIndexConvert2EdbRule)
  189. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  190. err = fmt.Errorf("转换规则有误, %v", e)
  191. return
  192. }
  193. convertOriginData := make([]*ThsHfConvertOriginData, 0)
  194. for _, v := range baseDataList {
  195. convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
  196. DataTime: v.DataTime,
  197. Value: v.Value,
  198. })
  199. }
  200. convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
  201. if e != nil {
  202. err = fmt.Errorf("转换数据失败, %v", e)
  203. return
  204. }
  205. if len(convertData) == 0 {
  206. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  207. return
  208. }
  209. // 获取已有数据
  210. dataOb := new(EdbDataThsHf)
  211. dataExists := make(map[string]*EdbDataThsHf)
  212. searchExistMap := make(map[string]*EdbInfoSearchData)
  213. {
  214. cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId)
  215. pars := make([]interface{}, 0)
  216. pars = append(pars, edbInfo.EdbInfoId)
  217. if queryDate != "" {
  218. cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
  219. pars = append(pars, queryDate)
  220. }
  221. list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
  222. if e != nil {
  223. err = fmt.Errorf("获取指标数据失败, %v", e)
  224. return
  225. }
  226. for _, v := range list {
  227. dataExists[v.DataTime.Format(utils.FormatDate)] = v
  228. searchExistMap[v.DataTime.Format(utils.FormatDate)] = &EdbInfoSearchData{
  229. EdbDataId: v.EdbDataId,
  230. EdbInfoId: v.EdbInfoId,
  231. DataTime: v.DataTime.Format(utils.FormatDate),
  232. Value: v.Value,
  233. EdbCode: v.EdbCode,
  234. DataTimestamp: v.DataTimestamp,
  235. }
  236. }
  237. }
  238. // 比对数据
  239. insertExist := make(map[string]bool)
  240. insertData := make([]*EdbDataThsHf, 0)
  241. updateData := make([]*EdbDataThsHf, 0)
  242. for k, v := range convertData {
  243. strDate := k.Format(utils.FormatDate)
  244. // 手动插入数据的判断
  245. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  246. realDataMaxDate = k
  247. }
  248. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  249. isFindConfigDateRealData = true
  250. }
  251. // 入库值
  252. saveVal := decimal.NewFromFloat(v).Round(4).String()
  253. d, e := decimal.NewFromString(saveVal)
  254. if e != nil {
  255. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  256. continue
  257. }
  258. saveFloat, _ := d.Float64()
  259. // 更新
  260. exists := dataExists[strDate]
  261. if exists != nil {
  262. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  263. if saveVal != existVal {
  264. exists.Value = saveFloat
  265. updateData = append(updateData, exists)
  266. }
  267. continue
  268. }
  269. // 新增
  270. if insertExist[strDate] {
  271. continue
  272. }
  273. insertExist[strDate] = true
  274. timestamp := k.UnixNano() / 1e6
  275. insertData = append(insertData, &EdbDataThsHf{
  276. EdbInfoId: edbInfo.EdbInfoId,
  277. EdbCode: edbInfo.EdbCode,
  278. DataTime: k,
  279. Value: saveFloat,
  280. CreateTime: time.Now(),
  281. ModifyTime: time.Now(),
  282. DataTimestamp: timestamp,
  283. })
  284. }
  285. // 批量新增/更新
  286. if len(insertData) > 0 {
  287. if e = dataOb.CreateMulti(insertData); e != nil {
  288. err = fmt.Errorf("批量新增指标数据失败, %v", e)
  289. return
  290. }
  291. }
  292. if len(updateData) > 0 {
  293. if e = dataOb.MultiUpdateValue(updateData); e != nil {
  294. err = fmt.Errorf("批量更新指标数据失败, %v", e)
  295. return
  296. }
  297. }
  298. // 处理手工数据补充的配置
  299. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, obj.GetSource(), obj.GetSubSource(), searchExistMap, isFindConfigDateRealData)
  300. return
  301. }
  302. func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  303. defer func() {
  304. if err != nil {
  305. utils.FileLog.Info(fmt.Sprintf("refreshByMongo, err: %v", err))
  306. }
  307. }()
  308. var realDataMaxDate, edbDataInsertConfigDate time.Time
  309. var edbDataInsertConfig *EdbDataInsertConfig
  310. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  311. {
  312. insertConfig, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  313. if e != nil && e.Error() != utils.ErrNoRow() {
  314. err = fmt.Errorf("GetEdbDataInsertConfigByEdbId, err: %v", e)
  315. return
  316. }
  317. edbDataInsertConfig = insertConfig
  318. if edbDataInsertConfig != nil {
  319. edbDataInsertConfigDate = edbDataInsertConfig.Date
  320. }
  321. }
  322. // 查询时间为开始时间-3d
  323. var queryDate string
  324. if startDate != "" {
  325. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  326. if e != nil {
  327. err = fmt.Errorf("刷新开始时间有误, %v", e)
  328. return
  329. }
  330. queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
  331. }
  332. // 获取源指标数据
  333. baseDataList, e := obj.getBaseIndexDataByMongo(edbBaseMapping.BaseIndexCode, queryDate)
  334. if e != nil {
  335. err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e)
  336. return
  337. }
  338. // 转换数据
  339. convertRule := new(ThsHfIndexConvert2EdbRule)
  340. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  341. err = fmt.Errorf("转换规则有误, %v", e)
  342. return
  343. }
  344. convertOriginData := make([]*ThsHfConvertOriginData, 0)
  345. for _, v := range baseDataList {
  346. convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
  347. DataTime: v.DataTime,
  348. Value: v.Value,
  349. })
  350. }
  351. convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
  352. if e != nil {
  353. err = fmt.Errorf("转换数据失败, %v", e)
  354. return
  355. }
  356. if len(convertData) == 0 {
  357. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  358. return
  359. }
  360. //获取指标所有数据
  361. existDataList := make([]*mgo.EdbDataThsHf, 0)
  362. mogDataObj := new(mgo.EdbDataThsHf)
  363. {
  364. // 构建查询条件
  365. queryConditions := bson.M{
  366. "edb_code": edbInfo.EdbCode,
  367. }
  368. if queryDate != `` {
  369. //获取已存在的所有数据
  370. startDateTime, e := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
  371. if e != nil {
  372. err = fmt.Errorf("startDateTime parse err: %v", e)
  373. return
  374. }
  375. queryConditions["data_time"] = bson.M{"$gte": startDateTime}
  376. }
  377. existDataList, e = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
  378. if e != nil {
  379. err = fmt.Errorf("GetAllDataList, err: %v", e)
  380. return
  381. }
  382. }
  383. existDataMap := make(map[string]*mgo.EdbDataThsHf)
  384. removeDataTimeMap := make(map[string]bool) //需要移除的日期数据
  385. for _, v := range existDataList {
  386. tmpDate := v.DataTime.Format(utils.FormatDate)
  387. existDataMap[tmpDate] = v
  388. removeDataTimeMap[tmpDate] = true
  389. }
  390. // 待添加的数据集
  391. addDataList := make([]interface{}, 0)
  392. updateDataList := make([]mgo.EdbDataThsHf, 0)
  393. insertExist := make(map[string]bool)
  394. for k, v := range convertData {
  395. strDate := k.Format(utils.FormatDate)
  396. // 手动插入数据的判断
  397. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  398. realDataMaxDate = k
  399. }
  400. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  401. isFindConfigDateRealData = true
  402. }
  403. // 入库值
  404. saveVal := decimal.NewFromFloat(v).Round(4).String()
  405. d, e := decimal.NewFromString(saveVal)
  406. if e != nil {
  407. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  408. continue
  409. }
  410. saveFloat, _ := d.Float64()
  411. // 更新
  412. exists := existDataMap[strDate]
  413. if exists != nil {
  414. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  415. if saveVal != existVal {
  416. exists.Value = saveFloat
  417. updateDataList = append(updateDataList, *exists)
  418. }
  419. continue
  420. }
  421. // 新增
  422. if insertExist[strDate] {
  423. continue
  424. }
  425. insertExist[strDate] = true
  426. timestamp := k.UnixNano() / 1e6
  427. addDataList = append(addDataList, &mgo.EdbDataThsHf{
  428. EdbInfoId: edbInfo.EdbInfoId,
  429. EdbCode: edbInfo.EdbCode,
  430. DataTime: k,
  431. Value: saveFloat,
  432. CreateTime: time.Now(),
  433. ModifyTime: time.Now(),
  434. DataTimestamp: timestamp,
  435. })
  436. }
  437. // 入库
  438. {
  439. coll := mogDataObj.GetCollection()
  440. //删除已经不存在的指标数据(由于该指标当日的数据删除了)
  441. {
  442. removeDateList := make([]time.Time, 0)
  443. for dateTime := range removeDataTimeMap {
  444. //获取已存在的所有数据
  445. tmpDateTime, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  446. if e != nil {
  447. err = fmt.Errorf("tmpDateTime parse err: %v", e)
  448. return
  449. }
  450. removeDateList = append(removeDateList, tmpDateTime)
  451. }
  452. removeNum := len(removeDateList)
  453. if removeNum > 0 {
  454. if e = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}}); e != nil {
  455. err = fmt.Errorf("RemoveManyByColl, err: %v", e)
  456. return
  457. }
  458. }
  459. }
  460. // 插入新数据
  461. if len(addDataList) > 0 {
  462. if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
  463. err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
  464. return
  465. }
  466. }
  467. // 修改历史数据
  468. if len(updateDataList) > 0 {
  469. for _, v := range updateDataList {
  470. if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
  471. err = fmt.Errorf("UpdateDataByColl, err: %v", e)
  472. return
  473. }
  474. }
  475. }
  476. }
  477. // 处理手工数据补充的配置
  478. obj.HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
  479. return
  480. }
// ThsHfConvertOriginData is one high-frequency data point fed into
// ThsHfConvertData2DayByRule.
type ThsHfConvertOriginData struct {
	DataTime time.Time `description:"数据日期(至时分秒)"` // timestamp of the point (down to the second)
	Value    float64   `description:"数据值"`       // value of the point
}
  485. // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
  486. func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
  487. // PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
  488. timeData = make(map[time.Time]float64)
  489. if len(originData) == 0 || convertRule == nil {
  490. return
  491. }
  492. if !utils.InArrayByInt([]int{1, 2}, convertRule.ConvertType) {
  493. err = fmt.Errorf("取值类型有误, ConvertType: %d", convertRule.ConvertType)
  494. return
  495. }
  496. // 升序排序
  497. sort.Slice(originData, func(i, j int) bool {
  498. return originData[i].DataTime.Before(originData[j].DataTime)
  499. })
  500. // 将数据根据日期进行分组
  501. var sortDates []string
  502. groupDateData := make(map[string][]*ThsHfConvertOriginData)
  503. for _, v := range originData {
  504. d := v.DataTime.Format(utils.FormatDate)
  505. if !utils.InArrayByStr(sortDates, d) {
  506. sortDates = append(sortDates, d)
  507. }
  508. if groupDateData[d] == nil {
  509. groupDateData[d] = make([]*ThsHfConvertOriginData, 0)
  510. }
  511. groupDateData[d] = append(groupDateData[d], v)
  512. }
  513. // 取值方式-指定时间的值
  514. if convertRule.ConvertType == 1 {
  515. for k, v := range sortDates {
  516. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  517. if e != nil {
  518. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  519. continue
  520. }
  521. var timeTarget time.Time
  522. dateData := make([]*ThsHfConvertOriginData, 0)
  523. // 当日
  524. if convertRule.ConvertFixed.FixedDay == 1 {
  525. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", v, convertRule.ConvertFixed.FixedTime), time.Local)
  526. if e != nil {
  527. utils.FileLog.Info(fmt.Sprintf("当日timeTarget转换有误, %v", e))
  528. continue
  529. }
  530. timeTarget = tg
  531. dt := groupDateData[v]
  532. if dt == nil {
  533. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", v))
  534. continue
  535. }
  536. if len(dt) == 0 {
  537. continue
  538. }
  539. dateData = dt
  540. }
  541. // 前一日
  542. if convertRule.ConvertFixed.FixedDay == 2 {
  543. if k < 1 {
  544. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  545. continue
  546. }
  547. preDate := sortDates[k-1]
  548. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
  549. if e != nil {
  550. utils.FileLog.Info(fmt.Sprintf("前日timeTarget转换有误, %v", e))
  551. continue
  552. }
  553. timeTarget = tg
  554. dt := groupDateData[preDate]
  555. if dt == nil {
  556. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  557. continue
  558. }
  559. if len(dt) == 0 {
  560. continue
  561. }
  562. dateData = dt
  563. }
  564. if len(dateData) == 0 {
  565. utils.FileLog.Info("日期%s无数据序列", v)
  566. continue
  567. }
  568. // 重新获取数据序列中, 时间在目标时间点之后的
  569. newDateData := make([]*ThsHfConvertOriginData, 0)
  570. for kv, dv := range dateData {
  571. if dv.DataTime.Before(timeTarget) {
  572. continue
  573. }
  574. // 由于升序排列, 直接取之后所有的数据
  575. newDateData = append(newDateData, dateData[kv:]...)
  576. break
  577. }
  578. // 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值)
  579. if len(newDateData) == 0 {
  580. utils.FileLog.Info("日期%s无有效数据", v)
  581. continue
  582. }
  583. timeData[todayTime] = newDateData[0].Value
  584. }
  585. return
  586. }
  587. // 取值方式-区间计算值
  588. for k, v := range sortDates {
  589. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  590. if e != nil {
  591. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  592. continue
  593. }
  594. var thisDate, preDate string
  595. thisDate = v
  596. if k > 1 {
  597. preDate = sortDates[k-1]
  598. }
  599. var startTimeTarget, endTimeTarget time.Time
  600. // 起始时间-当日/前一日
  601. if convertRule.ConvertArea.StartDay == 1 {
  602. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.StartTime), time.Local)
  603. if e != nil {
  604. utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e))
  605. continue
  606. }
  607. startTimeTarget = tg
  608. }
  609. if convertRule.ConvertArea.StartDay == 2 {
  610. if preDate == "" {
  611. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  612. continue
  613. }
  614. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.StartTime), time.Local)
  615. if e != nil {
  616. utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e))
  617. continue
  618. }
  619. startTimeTarget = tg
  620. }
  621. // 截止时间-当日/前一日
  622. if convertRule.ConvertArea.EndDay == 1 {
  623. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.EndTime), time.Local)
  624. if e != nil {
  625. utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e))
  626. continue
  627. }
  628. endTimeTarget = tg
  629. }
  630. if convertRule.ConvertArea.EndDay == 2 {
  631. if preDate == "" {
  632. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  633. continue
  634. }
  635. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.EndTime), time.Local)
  636. if e != nil {
  637. utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e))
  638. continue
  639. }
  640. endTimeTarget = tg
  641. }
  642. if startTimeTarget.IsZero() || endTimeTarget.IsZero() {
  643. utils.FileLog.Info(fmt.Sprintf("起始截止时间有误, start: %v, end: %v", startTimeTarget, endTimeTarget))
  644. continue
  645. }
  646. // 合并前日当日数据
  647. dateData := make([]*ThsHfConvertOriginData, 0)
  648. if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
  649. // 起始截止均为当日
  650. dateData = groupDateData[thisDate]
  651. if dateData == nil {
  652. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  653. continue
  654. }
  655. if len(dateData) == 0 {
  656. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  657. continue
  658. }
  659. } else {
  660. if preDate == "" {
  661. continue
  662. }
  663. // 起始截止时间含前日
  664. preData := groupDateData[preDate]
  665. if preData == nil {
  666. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  667. continue
  668. }
  669. if len(preData) == 0 {
  670. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  671. continue
  672. }
  673. thisData := groupDateData[thisDate]
  674. if thisData == nil {
  675. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  676. continue
  677. }
  678. if len(thisData) == 0 {
  679. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  680. continue
  681. }
  682. dateData = append(dateData, preData...)
  683. dateData = append(dateData, thisData...)
  684. }
  685. if len(dateData) == 0 {
  686. utils.FileLog.Info("日期%s无数据序列", v)
  687. continue
  688. }
  689. // 重组时间区间内的数据
  690. newDateData := make([]*ThsHfConvertOriginData, 0)
  691. for _, dv := range dateData {
  692. if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
  693. continue
  694. }
  695. newDateData = append(newDateData, dv)
  696. }
  697. if len(newDateData) == 0 {
  698. utils.FileLog.Info(fmt.Sprintf("时间区间内无数据, start: %v, end: %v", startTimeTarget, endTimeTarget))
  699. continue
  700. }
  701. // 取出区间内的均值/最值
  702. var avgVal, minVal, maxVal, sumVal float64
  703. minVal, maxVal = newDateData[0].Value, newDateData[0].Value
  704. for _, nv := range newDateData {
  705. sumVal += nv.Value
  706. if nv.Value > maxVal {
  707. maxVal = nv.Value
  708. }
  709. if nv.Value < minVal {
  710. minVal = nv.Value
  711. }
  712. }
  713. avgVal = sumVal / float64(len(newDateData))
  714. switch convertRule.ConvertArea.CalculateType {
  715. case 1:
  716. timeData[todayTime] = avgVal
  717. case 2:
  718. timeData[todayTime] = maxVal
  719. case 3:
  720. timeData[todayTime] = minVal
  721. default:
  722. utils.FileLog.Info(fmt.Sprintf("计算方式有误, CalculateType: %d", convertRule.ConvertArea.CalculateType))
  723. }
  724. }
  725. return
  726. }
  727. func (obj EdbThsHf) getBaseIndexDataByMongo(baseIndexCode, startDate string) (newDataList []EdbInfoMgoData, err error) {
  728. newDataList = make([]EdbInfoMgoData, 0)
  729. // 获取数据源的指标数据
  730. mogDataObj := new(mgo.BaseFromThsHfData)
  731. // 构建查询条件
  732. queryConditions := bson.M{
  733. "index_code": baseIndexCode,
  734. }
  735. if startDate != `` {
  736. //获取已存在的所有数据
  737. startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  738. if tmpErr != nil {
  739. err = tmpErr
  740. return
  741. }
  742. queryConditions["data_time"] = bson.M{"$gte": startDateTime}
  743. }
  744. baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
  745. if err != nil {
  746. fmt.Println("GetAllDataList Err:" + err.Error())
  747. return
  748. }
  749. for _, v := range baseDataList {
  750. newDataList = append(newDataList, EdbInfoMgoData{
  751. //EdbDataId: v.ID,
  752. DataTime: v.DataTime,
  753. Value: v.Value,
  754. EdbCode: v.IndexCode,
  755. })
  756. }
  757. return
  758. }
  759. func (obj EdbThsHf) HandleConfigInsertEdbDataByMongo(realDataMaxDate time.Time, edbDataInsertConfig *EdbDataInsertConfig, edbInfoId, source, subSource int, existMap map[string]*mgo.EdbDataThsHf, isFindConfigDateRealData bool) {
  760. if edbDataInsertConfig == nil {
  761. return
  762. }
  763. var err error
  764. defer func() {
  765. if err != nil {
  766. utils.FileLog.Info(fmt.Sprintf("ThsHf-HandleConfigInsertEdbDataByMongo, err: %v", err))
  767. }
  768. }()
  769. edbDataInsertConfigDate := edbDataInsertConfig.Date // 配置的日期
  770. // 如果存在真实数据的最大日期 && 存在配置插入数据的最大日期 && 真实数据的最大日期 晚于/等于 配置插入数据的最大日期
  771. if realDataMaxDate.After(edbDataInsertConfigDate) || realDataMaxDate.Equal(edbDataInsertConfigDate) {
  772. go DeleteEdbDataInsertConfigByEdbId(edbInfoId)
  773. mogDataObj := mgo.EdbDataThsHf{}
  774. coll := mogDataObj.GetCollection()
  775. edbDataInsertConfigDateStr := edbDataInsertConfigDate.Format(utils.FormatDate)
  776. // 如果没有找到找到配置日期的实际数据,那么就直接删除
  777. if item, ok := existMap[edbDataInsertConfigDateStr]; ok && !isFindConfigDateRealData {
  778. mogDataObj.RemoveManyByColl(coll, bson.M{"_id": item.ID})
  779. }
  780. } else {
  781. o := orm.NewOrm()
  782. edbDataInsertConfig.RealDate = realDataMaxDate
  783. _, err = o.Update(edbDataInsertConfig, "RealDate")
  784. }
  785. return
  786. }
  787. func (obj EdbThsHf) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
  788. if utils.UseMongo {
  789. edbInfoMaxAndMinInfo, tmpErr := obj.getEdbInfoMaxAndMinInfoByMongo(edbInfo.EdbCode)
  790. // 如果正常获取到了,那就去修改指标的最大最小值
  791. if tmpErr == nil && edbInfoMaxAndMinInfo != nil {
  792. err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)
  793. } else {
  794. // 清空的目的是为了避免异常返回
  795. err = nil
  796. }
  797. } else {
  798. err, _ = UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
  799. }
  800. return
  801. }
  802. func (obj EdbThsHf) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
  803. mogDataObj := new(mgo.EdbDataThsHf)
  804. pipeline := []bson.M{
  805. {"$match": bson.M{"edb_code": edbCode}},
  806. {"$group": bson.M{
  807. "_id": nil,
  808. "min_date": bson.M{"$min": "$data_time"},
  809. "max_date": bson.M{"$max": "$data_time"},
  810. "min_value": bson.M{"$min": "$value"},
  811. "max_value": bson.M{"$max": "$value"},
  812. }},
  813. {"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
  814. }
  815. result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
  816. if err != nil {
  817. fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error())
  818. return
  819. }
  820. if !result.MaxDate.IsZero() {
  821. whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
  822. selectParam := bson.D{{"value", 1}, {"_id", 0}}
  823. latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
  824. if tmpErr != nil {
  825. err = tmpErr
  826. return
  827. }
  828. result.LatestValue = latestValue.Value
  829. result.EndValue = latestValue.Value
  830. }
  831. item = &EdbInfoMaxAndMinInfo{
  832. MinDate: result.MinDate.Format(utils.FormatDate),
  833. MaxDate: result.MaxDate.Format(utils.FormatDate),
  834. MinValue: result.MinValue,
  835. MaxValue: result.MaxValue,
  836. LatestValue: result.LatestValue,
  837. LatestDate: result.LatestDate.Format(utils.FormatDate),
  838. EndValue: result.EndValue,
  839. }
  840. return
  841. }