edb_ths_hf.go 27 KB


  1. package models
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/global"
  5. "eta/eta_index_lib/models/mgo"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "github.com/shopspring/decimal"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "reflect"
  11. "sort"
  12. "time"
  13. )
  14. // EdbThsHf 自有数据
  15. type EdbThsHf struct{}
  16. // GetSource 获取来源编码id
  17. func (obj EdbThsHf) GetSource() int {
  18. return utils.DATA_SOURCE_THS
  19. }
  20. // GetSubSource 获取子来源编码id
  21. func (obj EdbThsHf) GetSubSource() int {
  22. return utils.DATA_SUB_SOURCE_HIGH_FREQUENCY
  23. }
  24. // GetSourceName 获取来源名称
  25. func (obj EdbThsHf) GetSourceName() string {
  26. return utils.DATA_SOURCE_NAME_THS
  27. }
  28. // GetEdbType 获取指标类型
  29. func (obj EdbThsHf) GetEdbType() int {
  30. return utils.DEFAULT_EDB_TYPE
  31. }
  32. // ThsHfAddBaseParams
  33. // @Description: 基础指标的添加参数
  34. type ThsHfAddBaseParams struct {
  35. EdbCode string `description:"指标编码"`
  36. EdbName string `description:"指标名称"`
  37. Unit string `description:"单位"`
  38. Frequency string `description:"频度"`
  39. Sort int `description:"排序"`
  40. ClassifyId int `description:"所属分类"`
  41. SysUserId int `description:"用户id"`
  42. SysUserRealName string `description:"用户真实名称"`
  43. UniqueCode string `description:"唯一编码"`
  44. ConvertRule string `description:"转换规则"`
  45. }
  46. // ThsHfEditBaseParams
  47. // @Description: 基础指标的修改参数
  48. type ThsHfEditBaseParams struct {
  49. EdbCode string `description:"指标编码"`
  50. EdbName string `description:"指标名称"`
  51. EdbNameEn string `description:"指标名称(英文)"`
  52. Unit string `description:"单位"`
  53. UnitEn string `description:"单位(英文)"`
  54. ClassifyId int `description:"所属分类"`
  55. SysUserId int `description:"用户id"`
  56. SysUserRealName string `description:"用户真实名称"`
  57. UniqueCode string `description:"编码"`
  58. Lang string `description:"语言版本"`
  59. EdbInfo *EdbInfo `description:"指标信息"`
  60. }
  61. type ThsHfRefreshBaseParams struct {
  62. EdbInfo *EdbInfo
  63. StartDate string
  64. EndDate string
  65. }
  66. // Add
  67. // @Description: 添加指标
  68. func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
  69. tx := global.DEFAULT_DB.Begin()
  70. defer func() {
  71. if err != nil {
  72. _ = tx.Rollback()
  73. utils.FileLog.Info(fmt.Sprintf("%s err: %v", reflect.TypeOf(obj).Name(), err))
  74. return
  75. }
  76. _ = tx.Commit()
  77. }()
  78. // 新增指标
  79. edbInfo = new(EdbInfo)
  80. edbInfo.Source = obj.GetSource()
  81. edbInfo.SubSource = obj.GetSubSource()
  82. edbInfo.SourceName = obj.GetSourceName()
  83. edbInfo.EdbType = obj.GetEdbType()
  84. edbInfo.EdbCode = params.EdbCode
  85. edbInfo.EdbName = params.EdbName
  86. edbInfo.EdbNameEn = params.EdbName
  87. edbInfo.EdbNameSource = params.EdbName
  88. edbInfo.Frequency = params.Frequency
  89. edbInfo.Unit = params.Unit
  90. edbInfo.UnitEn = params.Unit
  91. edbInfo.StartDate = baseIndex.StartDate.Format(utils.FormatDate) // 默认取源指标的时间, 刷新完成后更新
  92. edbInfo.EndDate = baseIndex.EndDate.Format(utils.FormatDate)
  93. edbInfo.ClassifyId = params.ClassifyId
  94. edbInfo.SysUserId = params.SysUserId
  95. edbInfo.SysUserRealName = params.SysUserRealName
  96. edbInfo.Sort = params.Sort
  97. edbInfo.TerminalCode = baseIndex.TerminalCode
  98. edbInfo.UniqueCode = params.UniqueCode
  99. edbInfo.CreateTime = time.Now()
  100. edbInfo.ModifyTime = time.Now()
  101. e := tx.Create(edbInfo).Error
  102. if e != nil {
  103. err = fmt.Errorf("insert edb err: %v", e)
  104. return
  105. }
  106. // 新增指标关联
  107. edbMapping := new(BaseFromEdbMapping)
  108. edbMapping.BaseFromIndexId = baseIndex.BaseFromThsHfIndexId
  109. edbMapping.BaseIndexCode = baseIndex.IndexCode
  110. edbMapping.EdbInfoId = edbInfo.EdbInfoId
  111. edbMapping.EdbCode = edbInfo.EdbCode
  112. edbMapping.Source = obj.GetSource()
  113. edbMapping.SubSource = obj.GetSubSource()
  114. edbMapping.ConvertRule = params.ConvertRule
  115. edbMapping.CreateTime = time.Now().Local()
  116. edbMapping.ModifyTime = time.Now().Local()
  117. e = tx.Create(edbMapping).Error
  118. if e != nil {
  119. err = fmt.Errorf("insert base edb mapping err: %v", e)
  120. return
  121. }
  122. // 刷新数据
  123. err = obj.Refresh(edbInfo, edbMapping, "")
  124. return
  125. }
  126. func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  127. if utils.UseMongo {
  128. return obj.refreshByMongo(edbInfo, edbBaseMapping, startDate)
  129. }
  130. return obj.refreshByMysql(edbInfo, edbBaseMapping, startDate)
  131. }
  132. func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  133. if edbInfo == nil || edbBaseMapping == nil {
  134. err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
  135. return
  136. }
  137. // 真实数据的最大日期, 插入规则配置的日期
  138. var realDataMaxDate, edbDataInsertConfigDate time.Time
  139. var edbDataInsertConfig *EdbDataInsertConfig
  140. var isFindConfigDateRealData bool
  141. {
  142. conf, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  143. if e != nil && e.Error() != utils.ErrNoRow() {
  144. err = fmt.Errorf("GetEdbDataInsertConfigByEdbId err: %v", e)
  145. return
  146. }
  147. edbDataInsertConfig = conf
  148. if edbDataInsertConfig != nil {
  149. edbDataInsertConfigDate = edbDataInsertConfig.Date
  150. }
  151. }
  152. // 查询时间为开始时间-3d
  153. var queryDate string
  154. if startDate != "" {
  155. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  156. if e != nil {
  157. err = fmt.Errorf("刷新开始时间有误, %v", e)
  158. return
  159. }
  160. queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
  161. }
  162. // 源指标数据
  163. baseDataList := make([]*BaseFromThsHfData, 0)
  164. {
  165. ob := new(BaseFromThsHfData)
  166. cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
  167. pars := make([]interface{}, 0)
  168. pars = append(pars, edbBaseMapping.BaseIndexCode)
  169. if queryDate != "" {
  170. cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime)
  171. pars = append(pars, queryDate)
  172. }
  173. list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime))
  174. if e != nil {
  175. err = fmt.Errorf("获取数据源数据失败, %v", e)
  176. return
  177. }
  178. baseDataList = list
  179. }
  180. // 转换数据
  181. convertRule := new(ThsHfIndexConvert2EdbRule)
  182. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  183. err = fmt.Errorf("转换规则有误, %v", e)
  184. return
  185. }
  186. convertOriginData := make([]*ThsHfConvertOriginData, 0)
  187. for _, v := range baseDataList {
  188. convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
  189. DataTime: v.DataTime,
  190. Value: v.Value,
  191. })
  192. }
  193. convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
  194. if e != nil {
  195. err = fmt.Errorf("转换数据失败, %v", e)
  196. return
  197. }
  198. if len(convertData) == 0 {
  199. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  200. return
  201. }
  202. // 获取已有数据
  203. dataOb := new(EdbDataThsHf)
  204. dataExists := make(map[string]*EdbDataThsHf)
  205. searchExistMap := make(map[string]*EdbInfoSearchData)
  206. {
  207. cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId)
  208. pars := make([]interface{}, 0)
  209. pars = append(pars, edbInfo.EdbInfoId)
  210. if queryDate != "" {
  211. cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
  212. pars = append(pars, queryDate)
  213. }
  214. list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
  215. if e != nil {
  216. err = fmt.Errorf("获取指标数据失败, %v", e)
  217. return
  218. }
  219. for _, v := range list {
  220. dataExists[v.DataTime.Format(utils.FormatDate)] = v
  221. searchExistMap[v.DataTime.Format(utils.FormatDate)] = &EdbInfoSearchData{
  222. EdbDataId: v.EdbDataId,
  223. EdbInfoId: v.EdbInfoId,
  224. DataTime: v.DataTime.Format(utils.FormatDate),
  225. Value: v.Value,
  226. EdbCode: v.EdbCode,
  227. DataTimestamp: v.DataTimestamp,
  228. }
  229. }
  230. }
  231. // 比对数据
  232. insertExist := make(map[string]bool)
  233. insertData := make([]*EdbDataThsHf, 0)
  234. updateData := make([]*EdbDataThsHf, 0)
  235. for k, v := range convertData {
  236. strDate := k.Format(utils.FormatDate)
  237. // 手动插入数据的判断
  238. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  239. realDataMaxDate = k
  240. }
  241. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  242. isFindConfigDateRealData = true
  243. }
  244. // 入库值
  245. saveVal := decimal.NewFromFloat(v).Round(4).String()
  246. d, e := decimal.NewFromString(saveVal)
  247. if e != nil {
  248. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  249. continue
  250. }
  251. saveFloat, _ := d.Float64()
  252. // 更新
  253. exists := dataExists[strDate]
  254. if exists != nil {
  255. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  256. if saveVal != existVal {
  257. exists.Value = saveFloat
  258. updateData = append(updateData, exists)
  259. }
  260. continue
  261. }
  262. // 新增
  263. if insertExist[strDate] {
  264. continue
  265. }
  266. insertExist[strDate] = true
  267. timestamp := k.UnixNano() / 1e6
  268. insertData = append(insertData, &EdbDataThsHf{
  269. EdbInfoId: edbInfo.EdbInfoId,
  270. EdbCode: edbInfo.EdbCode,
  271. DataTime: k,
  272. Value: saveFloat,
  273. CreateTime: time.Now(),
  274. ModifyTime: time.Now(),
  275. DataTimestamp: timestamp,
  276. })
  277. }
  278. // 批量新增/更新
  279. if len(insertData) > 0 {
  280. if e = dataOb.CreateMulti(insertData); e != nil {
  281. err = fmt.Errorf("批量新增指标数据失败, %v", e)
  282. return
  283. }
  284. }
  285. if len(updateData) > 0 {
  286. if e = dataOb.MultiUpdateValue(updateData); e != nil {
  287. err = fmt.Errorf("批量更新指标数据失败, %v", e)
  288. return
  289. }
  290. }
  291. // 处理手工数据补充的配置
  292. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, obj.GetSource(), obj.GetSubSource(), searchExistMap, isFindConfigDateRealData)
  293. return
  294. }
  295. func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  296. defer func() {
  297. if err != nil {
  298. utils.FileLog.Info(fmt.Sprintf("refreshByMongo, err: %v", err))
  299. }
  300. }()
  301. var realDataMaxDate, edbDataInsertConfigDate time.Time
  302. var edbDataInsertConfig *EdbDataInsertConfig
  303. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  304. {
  305. insertConfig, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  306. if e != nil && e.Error() != utils.ErrNoRow() {
  307. err = fmt.Errorf("GetEdbDataInsertConfigByEdbId, err: %v", e)
  308. return
  309. }
  310. edbDataInsertConfig = insertConfig
  311. if edbDataInsertConfig != nil {
  312. edbDataInsertConfigDate = edbDataInsertConfig.Date
  313. }
  314. }
  315. // 查询时间为开始时间-3d
  316. var queryDate string
  317. if startDate != "" {
  318. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  319. if e != nil {
  320. err = fmt.Errorf("刷新开始时间有误, %v", e)
  321. return
  322. }
  323. queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
  324. }
  325. // 获取源指标数据
  326. baseDataList, e := obj.getBaseIndexDataByMongo(edbBaseMapping.BaseIndexCode, queryDate)
  327. if e != nil {
  328. err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e)
  329. return
  330. }
  331. // 转换数据
  332. convertRule := new(ThsHfIndexConvert2EdbRule)
  333. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  334. err = fmt.Errorf("转换规则有误, %v", e)
  335. return
  336. }
  337. convertOriginData := make([]*ThsHfConvertOriginData, 0)
  338. for _, v := range baseDataList {
  339. convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
  340. DataTime: v.DataTime,
  341. Value: v.Value,
  342. })
  343. }
  344. convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
  345. if e != nil {
  346. err = fmt.Errorf("转换数据失败, %v", e)
  347. return
  348. }
  349. if len(convertData) == 0 {
  350. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  351. return
  352. }
  353. //获取指标所有数据
  354. existDataList := make([]*mgo.EdbDataThsHf, 0)
  355. mogDataObj := new(mgo.EdbDataThsHf)
  356. {
  357. // 构建查询条件
  358. queryConditions := bson.M{
  359. "edb_code": edbInfo.EdbCode,
  360. }
  361. if queryDate != `` {
  362. //获取已存在的所有数据
  363. startDateTime, e := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
  364. if e != nil {
  365. err = fmt.Errorf("startDateTime parse err: %v", e)
  366. return
  367. }
  368. queryConditions["data_time"] = bson.M{"$gte": startDateTime}
  369. }
  370. existDataList, e = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
  371. if e != nil {
  372. err = fmt.Errorf("GetAllDataList, err: %v", e)
  373. return
  374. }
  375. }
  376. existDataMap := make(map[string]*mgo.EdbDataThsHf)
  377. removeDataTimeMap := make(map[string]bool) //需要移除的日期数据
  378. for _, v := range existDataList {
  379. tmpDate := v.DataTime.Format(utils.FormatDate)
  380. existDataMap[tmpDate] = v
  381. removeDataTimeMap[tmpDate] = true
  382. }
  383. // 待添加的数据集
  384. addDataList := make([]interface{}, 0)
  385. updateDataList := make([]mgo.EdbDataThsHf, 0)
  386. insertExist := make(map[string]bool)
  387. for k, v := range convertData {
  388. strDate := k.Format(utils.FormatDate)
  389. // 手动插入数据的判断
  390. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  391. realDataMaxDate = k
  392. }
  393. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  394. isFindConfigDateRealData = true
  395. }
  396. // 入库值
  397. saveVal := decimal.NewFromFloat(v).Round(4).String()
  398. d, e := decimal.NewFromString(saveVal)
  399. if e != nil {
  400. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  401. continue
  402. }
  403. saveFloat, _ := d.Float64()
  404. // 更新
  405. exists := existDataMap[strDate]
  406. if exists != nil {
  407. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  408. if saveVal != existVal {
  409. exists.Value = saveFloat
  410. updateDataList = append(updateDataList, *exists)
  411. }
  412. continue
  413. }
  414. // 新增
  415. if insertExist[strDate] {
  416. continue
  417. }
  418. insertExist[strDate] = true
  419. timestamp := k.UnixNano() / 1e6
  420. addDataList = append(addDataList, &mgo.EdbDataThsHf{
  421. EdbInfoId: edbInfo.EdbInfoId,
  422. EdbCode: edbInfo.EdbCode,
  423. DataTime: k,
  424. Value: saveFloat,
  425. CreateTime: time.Now(),
  426. ModifyTime: time.Now(),
  427. DataTimestamp: timestamp,
  428. })
  429. }
  430. // 入库
  431. {
  432. coll := mogDataObj.GetCollection()
  433. //删除已经不存在的指标数据(由于该指标当日的数据删除了)
  434. {
  435. removeDateList := make([]time.Time, 0)
  436. for dateTime := range removeDataTimeMap {
  437. //获取已存在的所有数据
  438. tmpDateTime, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  439. if e != nil {
  440. err = fmt.Errorf("tmpDateTime parse err: %v", e)
  441. return
  442. }
  443. removeDateList = append(removeDateList, tmpDateTime)
  444. }
  445. removeNum := len(removeDateList)
  446. if removeNum > 0 {
  447. if e = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}}); e != nil {
  448. err = fmt.Errorf("RemoveManyByColl, err: %v", e)
  449. return
  450. }
  451. }
  452. }
  453. // 插入新数据
  454. if len(addDataList) > 0 {
  455. if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
  456. err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
  457. return
  458. }
  459. }
  460. // 修改历史数据
  461. if len(updateDataList) > 0 {
  462. for _, v := range updateDataList {
  463. if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
  464. err = fmt.Errorf("UpdateDataByColl, err: %v", e)
  465. return
  466. }
  467. }
  468. }
  469. }
  470. // 处理手工数据补充的配置
  471. obj.HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
  472. return
  473. }
  474. type ThsHfConvertOriginData struct {
  475. DataTime time.Time `description:"数据日期(至时分秒)"`
  476. Value float64 `description:"数据值"`
  477. }
  478. // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
  479. func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
  480. // PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
  481. timeData = make(map[time.Time]float64)
  482. if len(originData) == 0 || convertRule == nil {
  483. return
  484. }
  485. if !utils.InArrayByInt([]int{1, 2}, convertRule.ConvertType) {
  486. err = fmt.Errorf("取值类型有误, ConvertType: %d", convertRule.ConvertType)
  487. return
  488. }
  489. // 升序排序
  490. sort.Slice(originData, func(i, j int) bool {
  491. return originData[i].DataTime.Before(originData[j].DataTime)
  492. })
  493. // 将数据根据日期进行分组
  494. var sortDates []string
  495. groupDateData := make(map[string][]*ThsHfConvertOriginData)
  496. for _, v := range originData {
  497. d := v.DataTime.Format(utils.FormatDate)
  498. if !utils.InArrayByStr(sortDates, d) {
  499. sortDates = append(sortDates, d)
  500. }
  501. if groupDateData[d] == nil {
  502. groupDateData[d] = make([]*ThsHfConvertOriginData, 0)
  503. }
  504. groupDateData[d] = append(groupDateData[d], v)
  505. }
  506. // 取值方式-指定时间的值
  507. if convertRule.ConvertType == 1 {
  508. for k, v := range sortDates {
  509. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  510. if e != nil {
  511. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  512. continue
  513. }
  514. var timeTarget time.Time
  515. dateData := make([]*ThsHfConvertOriginData, 0)
  516. // 当日
  517. if convertRule.ConvertFixed.FixedDay == 1 {
  518. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", v, convertRule.ConvertFixed.FixedTime), time.Local)
  519. if e != nil {
  520. utils.FileLog.Info(fmt.Sprintf("当日timeTarget转换有误, %v", e))
  521. continue
  522. }
  523. timeTarget = tg
  524. dt := groupDateData[v]
  525. if dt == nil {
  526. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", v))
  527. continue
  528. }
  529. if len(dt) == 0 {
  530. continue
  531. }
  532. dateData = dt
  533. }
  534. // 前一日
  535. if convertRule.ConvertFixed.FixedDay == 2 {
  536. if k < 1 {
  537. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  538. continue
  539. }
  540. preDate := sortDates[k-1]
  541. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
  542. if e != nil {
  543. utils.FileLog.Info(fmt.Sprintf("前日timeTarget转换有误, %v", e))
  544. continue
  545. }
  546. timeTarget = tg
  547. dt := groupDateData[preDate]
  548. if dt == nil {
  549. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  550. continue
  551. }
  552. if len(dt) == 0 {
  553. continue
  554. }
  555. dateData = dt
  556. }
  557. if len(dateData) == 0 {
  558. utils.FileLog.Info("日期%s无数据序列", v)
  559. continue
  560. }
  561. // 重新获取数据序列中, 时间在目标时间点之后的
  562. newDateData := make([]*ThsHfConvertOriginData, 0)
  563. for kv, dv := range dateData {
  564. if dv.DataTime.Before(timeTarget) {
  565. continue
  566. }
  567. // 由于升序排列, 直接取之后所有的数据
  568. newDateData = append(newDateData, dateData[kv:]...)
  569. break
  570. }
  571. // 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值)
  572. if len(newDateData) == 0 {
  573. utils.FileLog.Info("日期%s无有效数据", v)
  574. continue
  575. }
  576. timeData[todayTime] = newDateData[0].Value
  577. }
  578. return
  579. }
  580. // 取值方式-区间计算值
  581. for k, v := range sortDates {
  582. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  583. if e != nil {
  584. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  585. continue
  586. }
  587. var thisDate, preDate string
  588. thisDate = v
  589. if k > 1 {
  590. preDate = sortDates[k-1]
  591. }
  592. var startTimeTarget, endTimeTarget time.Time
  593. // 起始时间-当日/前一日
  594. if convertRule.ConvertArea.StartDay == 1 {
  595. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.StartTime), time.Local)
  596. if e != nil {
  597. utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e))
  598. continue
  599. }
  600. startTimeTarget = tg
  601. }
  602. if convertRule.ConvertArea.StartDay == 2 {
  603. if preDate == "" {
  604. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  605. continue
  606. }
  607. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.StartTime), time.Local)
  608. if e != nil {
  609. utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e))
  610. continue
  611. }
  612. startTimeTarget = tg
  613. }
  614. // 截止时间-当日/前一日
  615. if convertRule.ConvertArea.EndDay == 1 {
  616. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.EndTime), time.Local)
  617. if e != nil {
  618. utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e))
  619. continue
  620. }
  621. endTimeTarget = tg
  622. }
  623. if convertRule.ConvertArea.EndDay == 2 {
  624. if preDate == "" {
  625. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  626. continue
  627. }
  628. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.EndTime), time.Local)
  629. if e != nil {
  630. utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e))
  631. continue
  632. }
  633. endTimeTarget = tg
  634. }
  635. if startTimeTarget.IsZero() || endTimeTarget.IsZero() {
  636. utils.FileLog.Info(fmt.Sprintf("起始截止时间有误, start: %v, end: %v", startTimeTarget, endTimeTarget))
  637. continue
  638. }
  639. // 合并前日当日数据
  640. dateData := make([]*ThsHfConvertOriginData, 0)
  641. if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
  642. // 起始截止均为当日
  643. dateData = groupDateData[thisDate]
  644. if dateData == nil {
  645. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  646. continue
  647. }
  648. if len(dateData) == 0 {
  649. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  650. continue
  651. }
  652. } else {
  653. if preDate == "" {
  654. continue
  655. }
  656. // 起始截止时间含前日
  657. preData := groupDateData[preDate]
  658. if preData == nil {
  659. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  660. continue
  661. }
  662. if len(preData) == 0 {
  663. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  664. continue
  665. }
  666. thisData := groupDateData[thisDate]
  667. if thisData == nil {
  668. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  669. continue
  670. }
  671. if len(thisData) == 0 {
  672. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  673. continue
  674. }
  675. dateData = append(dateData, preData...)
  676. dateData = append(dateData, thisData...)
  677. }
  678. if len(dateData) == 0 {
  679. utils.FileLog.Info("日期%s无数据序列", v)
  680. continue
  681. }
  682. // 重组时间区间内的数据
  683. newDateData := make([]*ThsHfConvertOriginData, 0)
  684. for _, dv := range dateData {
  685. if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
  686. continue
  687. }
  688. newDateData = append(newDateData, dv)
  689. }
  690. if len(newDateData) == 0 {
  691. utils.FileLog.Info(fmt.Sprintf("时间区间内无数据, start: %v, end: %v", startTimeTarget, endTimeTarget))
  692. continue
  693. }
  694. // 取出区间内的均值/最值
  695. var avgVal, minVal, maxVal, sumVal float64
  696. minVal, maxVal = newDateData[0].Value, newDateData[0].Value
  697. for _, nv := range newDateData {
  698. sumVal += nv.Value
  699. if nv.Value > maxVal {
  700. maxVal = nv.Value
  701. }
  702. if nv.Value < minVal {
  703. minVal = nv.Value
  704. }
  705. }
  706. avgVal = sumVal / float64(len(newDateData))
  707. switch convertRule.ConvertArea.CalculateType {
  708. case 1:
  709. timeData[todayTime] = avgVal
  710. case 2:
  711. timeData[todayTime] = maxVal
  712. case 3:
  713. timeData[todayTime] = minVal
  714. default:
  715. utils.FileLog.Info(fmt.Sprintf("计算方式有误, CalculateType: %d", convertRule.ConvertArea.CalculateType))
  716. }
  717. }
  718. return
  719. }
  720. func (obj EdbThsHf) getBaseIndexDataByMongo(baseIndexCode, startDate string) (newDataList []EdbInfoMgoData, err error) {
  721. newDataList = make([]EdbInfoMgoData, 0)
  722. // 获取数据源的指标数据
  723. mogDataObj := new(mgo.BaseFromThsHfData)
  724. // 构建查询条件
  725. queryConditions := bson.M{
  726. "index_code": baseIndexCode,
  727. }
  728. if startDate != `` {
  729. //获取已存在的所有数据
  730. startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  731. if tmpErr != nil {
  732. err = tmpErr
  733. return
  734. }
  735. queryConditions["data_time"] = bson.M{"$gte": startDateTime}
  736. }
  737. baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
  738. if err != nil {
  739. fmt.Println("GetAllDataList Err:" + err.Error())
  740. return
  741. }
  742. for _, v := range baseDataList {
  743. newDataList = append(newDataList, EdbInfoMgoData{
  744. //EdbDataId: v.ID,
  745. DataTime: v.DataTime,
  746. Value: v.Value,
  747. EdbCode: v.IndexCode,
  748. })
  749. }
  750. return
  751. }
  752. func (obj EdbThsHf) HandleConfigInsertEdbDataByMongo(realDataMaxDate time.Time, edbDataInsertConfig *EdbDataInsertConfig, edbInfoId, source, subSource int, existMap map[string]*mgo.EdbDataThsHf, isFindConfigDateRealData bool) {
  753. if edbDataInsertConfig == nil {
  754. return
  755. }
  756. var err error
  757. defer func() {
  758. if err != nil {
  759. utils.FileLog.Info(fmt.Sprintf("ThsHf-HandleConfigInsertEdbDataByMongo, err: %v", err))
  760. }
  761. }()
  762. edbDataInsertConfigDate := edbDataInsertConfig.Date // 配置的日期
  763. // 如果存在真实数据的最大日期 && 存在配置插入数据的最大日期 && 真实数据的最大日期 晚于/等于 配置插入数据的最大日期
  764. if realDataMaxDate.After(edbDataInsertConfigDate) || realDataMaxDate.Equal(edbDataInsertConfigDate) {
  765. go DeleteEdbDataInsertConfigByEdbId(edbInfoId)
  766. mogDataObj := mgo.EdbDataThsHf{}
  767. coll := mogDataObj.GetCollection()
  768. edbDataInsertConfigDateStr := edbDataInsertConfigDate.Format(utils.FormatDate)
  769. // 如果没有找到找到配置日期的实际数据,那么就直接删除
  770. if item, ok := existMap[edbDataInsertConfigDateStr]; ok && !isFindConfigDateRealData {
  771. mogDataObj.RemoveManyByColl(coll, bson.M{"_id": item.ID})
  772. }
  773. } else {
  774. edbDataInsertConfig.RealDate = realDataMaxDate
  775. err = global.DEFAULT_DB.Model(edbDataInsertConfig).Select([]string{"RealDate"}).Updates(edbDataInsertConfig).Error
  776. }
  777. return
  778. }
  779. func (obj EdbThsHf) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
  780. if utils.UseMongo {
  781. edbInfoMaxAndMinInfo, tmpErr := obj.getEdbInfoMaxAndMinInfoByMongo(edbInfo.EdbCode)
  782. // 如果正常获取到了,那就去修改指标的最大最小值
  783. if tmpErr == nil && edbInfoMaxAndMinInfo != nil {
  784. err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)
  785. } else {
  786. // 清空的目的是为了避免异常返回
  787. err = nil
  788. }
  789. } else {
  790. err, _ = UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
  791. }
  792. return
  793. }
  794. func (obj EdbThsHf) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
  795. mogDataObj := new(mgo.EdbDataThsHf)
  796. pipeline := []bson.M{
  797. {"$match": bson.M{"edb_code": edbCode}},
  798. {"$group": bson.M{
  799. "_id": nil,
  800. "min_date": bson.M{"$min": "$data_time"},
  801. "max_date": bson.M{"$max": "$data_time"},
  802. "min_value": bson.M{"$min": "$value"},
  803. "max_value": bson.M{"$max": "$value"},
  804. }},
  805. {"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
  806. }
  807. result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
  808. if err != nil {
  809. fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error())
  810. return
  811. }
  812. if !result.MaxDate.IsZero() {
  813. whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
  814. selectParam := bson.D{{"value", 1}, {"_id", 0}}
  815. latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
  816. if tmpErr != nil {
  817. err = tmpErr
  818. return
  819. }
  820. result.LatestValue = latestValue.Value
  821. result.EndValue = latestValue.Value
  822. }
  823. item = &EdbInfoMaxAndMinInfo{
  824. MinDate: result.MinDate.Format(utils.FormatDate),
  825. MaxDate: result.MaxDate.Format(utils.FormatDate),
  826. MinValue: result.MinValue,
  827. MaxValue: result.MaxValue,
  828. LatestValue: result.LatestValue,
  829. LatestDate: result.LatestDate.Format(utils.FormatDate),
  830. EndValue: result.EndValue,
  831. }
  832. return
  833. }