edb_ths_hf.go 27 KB


  1. package models
  2. import (
  3. "encoding/json"
  4. "eta_gn/eta_index_lib/global"
  5. "eta_gn/eta_index_lib/models/mgo"
  6. "eta_gn/eta_index_lib/utils"
  7. "fmt"
  8. "github.com/shopspring/decimal"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "sort"
  11. "time"
  12. )
  // EdbThsHf groups the operations for EDB indicators whose data comes from the
  // THS high-frequency source (see GetSource / GetSubSource). It carries no state.
  type EdbThsHf struct{}
  15. // GetSource 获取来源编码id
  16. func (obj EdbThsHf) GetSource() int {
  17. return utils.DATA_SOURCE_THS
  18. }
  19. // GetSubSource 获取子来源编码id
  20. func (obj EdbThsHf) GetSubSource() int {
  21. return utils.DATA_SUB_SOURCE_HIGH_FREQUENCY
  22. }
  23. // GetSourceName 获取来源名称
  24. func (obj EdbThsHf) GetSourceName() string {
  25. return utils.DATA_SOURCE_NAME_THS
  26. }
  27. // GetEdbType 获取指标类型
  28. func (obj EdbThsHf) GetEdbType() int {
  29. return utils.DEFAULT_EDB_TYPE
  30. }
  // ThsHfAddBaseParams
  // @Description: Parameters for adding a base indicator; consumed by EdbThsHf.Add.
  type ThsHfAddBaseParams struct {
  	EdbCode         string `description:"指标编码"`   // indicator code
  	EdbName         string `description:"指标名称"`   // indicator name; Add also uses it for the English and source names
  	Unit            string `description:"单位"`     // unit; Add also uses it for the English unit
  	Frequency       string `description:"频度"`     // frequency
  	Sort            int    `description:"排序"`     // sort order
  	ClassifyId      int    `description:"所属分类"`   // owning classification id
  	SysUserId       int    `description:"用户id"`   // id of the creating user
  	SysUserRealName string `description:"用户真实名称"` // real name of the creating user
  	UniqueCode      string `description:"唯一编码"`   // unique code
  	ConvertRule     string `description:"转换规则"`   // conversion rule; stored on the mapping and later JSON-decoded during refresh
  }
  // ThsHfEditBaseParams
  // @Description: Parameters for editing a base indicator.
  type ThsHfEditBaseParams struct {
  	EdbCode         string   `description:"指标编码"`     // indicator code
  	EdbName         string   `description:"指标名称"`     // indicator name
  	EdbNameEn       string   `description:"指标名称(英文)"` // indicator name (English)
  	Unit            string   `description:"单位"`       // unit
  	UnitEn          string   `description:"单位(英文)"`   // unit (English)
  	ClassifyId      int      `description:"所属分类"`     // owning classification id
  	SysUserId       int      `description:"用户id"`     // user id
  	SysUserRealName string   `description:"用户真实名称"`   // user real name
  	UniqueCode      string   `description:"编码"`       // code
  	Lang            string   `description:"语言版本"`     // language version
  	EdbInfo         *EdbInfo `description:"指标信息"`     // indicator record being edited
  }
  // ThsHfRefreshBaseParams carries the inputs for refreshing a base indicator.
  type ThsHfRefreshBaseParams struct {
  	EdbInfo   *EdbInfo // indicator to refresh
  	StartDate string   // start of the refresh range (format not visible here — presumably utils.FormatDate; confirm with callers)
  	EndDate   string   // end of the refresh range
  }
  65. // Add
  66. // @Description: 添加指标
  67. func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
  68. to := global.DEFAULT_DmSQL.Begin()
  69. defer func() {
  70. if err != nil {
  71. to.Rollback()
  72. } else {
  73. to.Commit()
  74. }
  75. }()
  76. // 新增指标
  77. edbInfo = new(EdbInfo)
  78. edbInfo.Source = obj.GetSource()
  79. edbInfo.SubSource = obj.GetSubSource()
  80. edbInfo.SourceName = obj.GetSourceName()
  81. edbInfo.EdbType = obj.GetEdbType()
  82. edbInfo.EdbCode = params.EdbCode
  83. edbInfo.EdbName = params.EdbName
  84. edbInfo.EdbNameEn = params.EdbName
  85. edbInfo.EdbNameSource = params.EdbName
  86. edbInfo.Frequency = params.Frequency
  87. edbInfo.Unit = params.Unit
  88. edbInfo.UnitEn = params.Unit
  89. edbInfo.StartDate = baseIndex.StartDate.Format(utils.FormatDate) // 默认取源指标的时间, 刷新完成后更新
  90. edbInfo.EndDate = baseIndex.EndDate.Format(utils.FormatDate)
  91. edbInfo.ClassifyId = params.ClassifyId
  92. edbInfo.SysUserId = params.SysUserId
  93. edbInfo.SysUserRealName = params.SysUserRealName
  94. edbInfo.Sort = params.Sort
  95. edbInfo.TerminalCode = baseIndex.TerminalCode
  96. edbInfo.UniqueCode = params.UniqueCode
  97. edbInfo.CreateTime = time.Now()
  98. edbInfo.ModifyTime = time.Now()
  99. e := to.Create(edbInfo).Error
  100. if e != nil {
  101. err = fmt.Errorf("insert edb err: %v", e)
  102. return
  103. }
  104. // 新增指标关联
  105. edbMapping := new(BaseFromEdbMapping)
  106. edbMapping.BaseFromIndexId = baseIndex.BaseFromThsHfIndexId
  107. edbMapping.BaseIndexCode = baseIndex.IndexCode
  108. edbMapping.EdbInfoId = edbInfo.EdbInfoId
  109. edbMapping.EdbCode = edbInfo.EdbCode
  110. edbMapping.Source = obj.GetSource()
  111. edbMapping.SubSource = obj.GetSubSource()
  112. edbMapping.ConvertRule = params.ConvertRule
  113. edbMapping.CreateTime = time.Now().Local()
  114. edbMapping.ModifyTime = time.Now().Local()
  115. e = to.Create(edbMapping).Error
  116. if e != nil {
  117. err = fmt.Errorf("insert base edb mapping err: %v", e)
  118. return
  119. }
  120. // 刷新数据
  121. err = obj.Refresh(edbInfo, edbMapping, "")
  122. return
  123. }
  124. func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  125. if utils.UseMongo {
  126. return obj.refreshByMongo(edbInfo, edbBaseMapping, startDate)
  127. }
  128. return obj.refreshByMysql(edbInfo, edbBaseMapping, startDate)
  129. }
  130. func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  131. if edbInfo == nil || edbBaseMapping == nil {
  132. err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
  133. return
  134. }
  135. // 真实数据的最大日期, 插入规则配置的日期
  136. var realDataMaxDate, edbDataInsertConfigDate time.Time
  137. var edbDataInsertConfig *EdbDataInsertConfig
  138. var isFindConfigDateRealData bool
  139. {
  140. conf, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  141. if e != nil && e.Error() != utils.ErrNoRow() {
  142. err = fmt.Errorf("GetEdbDataInsertConfigByEdbId err: %v", e)
  143. return
  144. }
  145. edbDataInsertConfig = conf
  146. if edbDataInsertConfig != nil {
  147. edbDataInsertConfigDate = edbDataInsertConfig.Date
  148. }
  149. }
  150. // 查询时间为开始时间-3d
  151. var queryDate string
  152. if startDate != "" {
  153. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  154. if e != nil {
  155. err = fmt.Errorf("刷新开始时间有误, %v", e)
  156. return
  157. }
  158. queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
  159. }
  160. // 源指标数据
  161. baseDataList := make([]*BaseFromThsHfData, 0)
  162. {
  163. ob := new(BaseFromThsHfData)
  164. cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
  165. pars := make([]interface{}, 0)
  166. pars = append(pars, edbBaseMapping.BaseIndexCode)
  167. if queryDate != "" {
  168. cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime)
  169. pars = append(pars, queryDate)
  170. }
  171. list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime))
  172. if e != nil {
  173. err = fmt.Errorf("获取数据源数据失败, %v", e)
  174. return
  175. }
  176. baseDataList = list
  177. }
  178. // 转换数据
  179. convertRule := new(ThsHfIndexConvert2EdbRule)
  180. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  181. err = fmt.Errorf("转换规则有误, %v", e)
  182. return
  183. }
  184. convertOriginData := make([]*ThsHfConvertOriginData, 0)
  185. for _, v := range baseDataList {
  186. convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
  187. DataTime: v.DataTime,
  188. Value: v.Value,
  189. })
  190. }
  191. convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
  192. if e != nil {
  193. err = fmt.Errorf("转换数据失败, %v", e)
  194. return
  195. }
  196. if len(convertData) == 0 {
  197. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  198. return
  199. }
  200. // 获取已有数据
  201. dataOb := new(EdbDataThsHf)
  202. dataExists := make(map[string]*EdbDataThsHf)
  203. searchExistMap := make(map[string]*EdbInfoSearchData)
  204. {
  205. cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId)
  206. pars := make([]interface{}, 0)
  207. pars = append(pars, edbInfo.EdbInfoId)
  208. if queryDate != "" {
  209. cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
  210. pars = append(pars, queryDate)
  211. }
  212. list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
  213. if e != nil {
  214. err = fmt.Errorf("获取指标数据失败, %v", e)
  215. return
  216. }
  217. for _, v := range list {
  218. dataExists[v.DataTime.Format(utils.FormatDate)] = v
  219. searchExistMap[v.DataTime.Format(utils.FormatDate)] = &EdbInfoSearchData{
  220. EdbDataId: v.EdbDataId,
  221. EdbInfoId: v.EdbInfoId,
  222. DataTime: v.DataTime.Format(utils.FormatDate),
  223. Value: v.Value,
  224. EdbCode: v.EdbCode,
  225. DataTimestamp: v.DataTimestamp,
  226. }
  227. }
  228. }
  229. // 比对数据
  230. insertExist := make(map[string]bool)
  231. insertData := make([]*EdbDataThsHf, 0)
  232. updateData := make([]*EdbDataThsHf, 0)
  233. for k, v := range convertData {
  234. strDate := k.Format(utils.FormatDate)
  235. // 手动插入数据的判断
  236. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  237. realDataMaxDate = k
  238. }
  239. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  240. isFindConfigDateRealData = true
  241. }
  242. // 入库值
  243. saveVal := decimal.NewFromFloat(v).Round(4).String()
  244. d, e := decimal.NewFromString(saveVal)
  245. if e != nil {
  246. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  247. continue
  248. }
  249. saveFloat, _ := d.Float64()
  250. // 更新
  251. exists := dataExists[strDate]
  252. if exists != nil {
  253. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  254. if saveVal != existVal {
  255. exists.Value = saveFloat
  256. updateData = append(updateData, exists)
  257. }
  258. continue
  259. }
  260. // 新增
  261. if insertExist[strDate] {
  262. continue
  263. }
  264. insertExist[strDate] = true
  265. timestamp := k.UnixNano() / 1e6
  266. insertData = append(insertData, &EdbDataThsHf{
  267. EdbInfoId: edbInfo.EdbInfoId,
  268. EdbCode: edbInfo.EdbCode,
  269. DataTime: k,
  270. Value: saveFloat,
  271. CreateTime: time.Now(),
  272. ModifyTime: time.Now(),
  273. DataTimestamp: timestamp,
  274. })
  275. }
  276. // 批量新增/更新
  277. if len(insertData) > 0 {
  278. if e = dataOb.CreateMulti(insertData); e != nil {
  279. err = fmt.Errorf("批量新增指标数据失败, %v", e)
  280. return
  281. }
  282. }
  283. if len(updateData) > 0 {
  284. if e = dataOb.MultiUpdateValue(updateData); e != nil {
  285. err = fmt.Errorf("批量更新指标数据失败, %v", e)
  286. return
  287. }
  288. }
  289. // 处理手工数据补充的配置
  290. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, obj.GetSource(), obj.GetSubSource(), searchExistMap, isFindConfigDateRealData)
  291. return
  292. }
  293. func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  294. defer func() {
  295. if err != nil {
  296. utils.FileLog.Info(fmt.Sprintf("refreshByMongo, err: %v", err))
  297. }
  298. }()
  299. var realDataMaxDate, edbDataInsertConfigDate time.Time
  300. var edbDataInsertConfig *EdbDataInsertConfig
  301. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  302. {
  303. insertConfig, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  304. if e != nil && e.Error() != utils.ErrNoRow() {
  305. err = fmt.Errorf("GetEdbDataInsertConfigByEdbId, err: %v", e)
  306. return
  307. }
  308. edbDataInsertConfig = insertConfig
  309. if edbDataInsertConfig != nil {
  310. edbDataInsertConfigDate = edbDataInsertConfig.Date
  311. }
  312. }
  313. // 查询时间为开始时间-3d
  314. var queryDate string
  315. if startDate != "" {
  316. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  317. if e != nil {
  318. err = fmt.Errorf("刷新开始时间有误, %v", e)
  319. return
  320. }
  321. queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
  322. }
  323. // 获取源指标数据
  324. baseDataList, e := obj.getBaseIndexDataByMongo(edbBaseMapping.BaseIndexCode, queryDate)
  325. if e != nil {
  326. err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e)
  327. return
  328. }
  329. // 转换数据
  330. convertRule := new(ThsHfIndexConvert2EdbRule)
  331. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  332. err = fmt.Errorf("转换规则有误, %v", e)
  333. return
  334. }
  335. convertOriginData := make([]*ThsHfConvertOriginData, 0)
  336. for _, v := range baseDataList {
  337. convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
  338. DataTime: v.DataTime,
  339. Value: v.Value,
  340. })
  341. }
  342. convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
  343. if e != nil {
  344. err = fmt.Errorf("转换数据失败, %v", e)
  345. return
  346. }
  347. if len(convertData) == 0 {
  348. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  349. return
  350. }
  351. //获取指标所有数据
  352. existDataList := make([]*mgo.EdbDataThsHf, 0)
  353. mogDataObj := new(mgo.EdbDataThsHf)
  354. {
  355. // 构建查询条件
  356. queryConditions := bson.M{
  357. "edb_code": edbInfo.EdbCode,
  358. }
  359. if queryDate != `` {
  360. //获取已存在的所有数据
  361. startDateTime, e := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
  362. if e != nil {
  363. err = fmt.Errorf("startDateTime parse err: %v", e)
  364. return
  365. }
  366. queryConditions["data_time"] = bson.M{"$gte": startDateTime}
  367. }
  368. existDataList, e = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
  369. if e != nil {
  370. err = fmt.Errorf("GetAllDataList, err: %v", e)
  371. return
  372. }
  373. }
  374. existDataMap := make(map[string]*mgo.EdbDataThsHf)
  375. removeDataTimeMap := make(map[string]bool) //需要移除的日期数据
  376. for _, v := range existDataList {
  377. tmpDate := v.DataTime.Format(utils.FormatDate)
  378. existDataMap[tmpDate] = v
  379. removeDataTimeMap[tmpDate] = true
  380. }
  381. // 待添加的数据集
  382. addDataList := make([]interface{}, 0)
  383. updateDataList := make([]mgo.EdbDataThsHf, 0)
  384. insertExist := make(map[string]bool)
  385. for k, v := range convertData {
  386. strDate := k.Format(utils.FormatDate)
  387. // 手动插入数据的判断
  388. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  389. realDataMaxDate = k
  390. }
  391. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  392. isFindConfigDateRealData = true
  393. }
  394. // 入库值
  395. saveVal := decimal.NewFromFloat(v).Round(4).String()
  396. d, e := decimal.NewFromString(saveVal)
  397. if e != nil {
  398. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  399. continue
  400. }
  401. saveFloat, _ := d.Float64()
  402. // 更新
  403. exists := existDataMap[strDate]
  404. if exists != nil {
  405. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  406. if saveVal != existVal {
  407. exists.Value = saveFloat
  408. updateDataList = append(updateDataList, *exists)
  409. }
  410. continue
  411. }
  412. // 新增
  413. if insertExist[strDate] {
  414. continue
  415. }
  416. insertExist[strDate] = true
  417. timestamp := k.UnixNano() / 1e6
  418. addDataList = append(addDataList, &mgo.EdbDataThsHf{
  419. EdbInfoId: edbInfo.EdbInfoId,
  420. EdbCode: edbInfo.EdbCode,
  421. DataTime: k,
  422. Value: saveFloat,
  423. CreateTime: time.Now(),
  424. ModifyTime: time.Now(),
  425. DataTimestamp: timestamp,
  426. })
  427. }
  428. // 入库
  429. {
  430. coll := mogDataObj.GetCollection()
  431. //删除已经不存在的指标数据(由于该指标当日的数据删除了)
  432. {
  433. removeDateList := make([]time.Time, 0)
  434. for dateTime := range removeDataTimeMap {
  435. //获取已存在的所有数据
  436. tmpDateTime, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  437. if e != nil {
  438. err = fmt.Errorf("tmpDateTime parse err: %v", e)
  439. return
  440. }
  441. removeDateList = append(removeDateList, tmpDateTime)
  442. }
  443. removeNum := len(removeDateList)
  444. if removeNum > 0 {
  445. if e = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}}); e != nil {
  446. err = fmt.Errorf("RemoveManyByColl, err: %v", e)
  447. return
  448. }
  449. }
  450. }
  451. // 插入新数据
  452. if len(addDataList) > 0 {
  453. if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
  454. err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
  455. return
  456. }
  457. }
  458. // 修改历史数据
  459. if len(updateDataList) > 0 {
  460. for _, v := range updateDataList {
  461. if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
  462. err = fmt.Errorf("UpdateDataByColl, err: %v", e)
  463. return
  464. }
  465. }
  466. }
  467. }
  468. // 处理手工数据补充的配置
  469. obj.HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
  470. return
  471. }
  // ThsHfConvertOriginData is one high-frequency sample fed into
  // ThsHfConvertData2DayByRule.
  type ThsHfConvertOriginData struct {
  	DataTime time.Time `description:"数据日期(至时分秒)"` // sample time, down to the second
  	Value    float64   `description:"数据值"`        // sample value
  }
  476. // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
  477. func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
  478. // PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
  479. timeData = make(map[time.Time]float64)
  480. if len(originData) == 0 || convertRule == nil {
  481. return
  482. }
  483. if !utils.InArrayByInt([]int{1, 2}, convertRule.ConvertType) {
  484. err = fmt.Errorf("取值类型有误, ConvertType: %d", convertRule.ConvertType)
  485. return
  486. }
  487. // 升序排序
  488. sort.Slice(originData, func(i, j int) bool {
  489. return originData[i].DataTime.Before(originData[j].DataTime)
  490. })
  491. // 将数据根据日期进行分组
  492. var sortDates []string
  493. groupDateData := make(map[string][]*ThsHfConvertOriginData)
  494. for _, v := range originData {
  495. d := v.DataTime.Format(utils.FormatDate)
  496. if !utils.InArrayByStr(sortDates, d) {
  497. sortDates = append(sortDates, d)
  498. }
  499. if groupDateData[d] == nil {
  500. groupDateData[d] = make([]*ThsHfConvertOriginData, 0)
  501. }
  502. groupDateData[d] = append(groupDateData[d], v)
  503. }
  504. // 取值方式-指定时间的值
  505. if convertRule.ConvertType == 1 {
  506. for k, v := range sortDates {
  507. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  508. if e != nil {
  509. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  510. continue
  511. }
  512. var timeTarget time.Time
  513. dateData := make([]*ThsHfConvertOriginData, 0)
  514. // 当日
  515. if convertRule.ConvertFixed.FixedDay == 1 {
  516. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", v, convertRule.ConvertFixed.FixedTime), time.Local)
  517. if e != nil {
  518. utils.FileLog.Info(fmt.Sprintf("当日timeTarget转换有误, %v", e))
  519. continue
  520. }
  521. timeTarget = tg
  522. dt := groupDateData[v]
  523. if dt == nil {
  524. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", v))
  525. continue
  526. }
  527. if len(dt) == 0 {
  528. continue
  529. }
  530. dateData = dt
  531. }
  532. // 前一日
  533. if convertRule.ConvertFixed.FixedDay == 2 {
  534. if k < 1 {
  535. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  536. continue
  537. }
  538. preDate := sortDates[k-1]
  539. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
  540. if e != nil {
  541. utils.FileLog.Info(fmt.Sprintf("前日timeTarget转换有误, %v", e))
  542. continue
  543. }
  544. timeTarget = tg
  545. dt := groupDateData[preDate]
  546. if dt == nil {
  547. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  548. continue
  549. }
  550. if len(dt) == 0 {
  551. continue
  552. }
  553. dateData = dt
  554. }
  555. if len(dateData) == 0 {
  556. utils.FileLog.Info("日期%s无数据序列", v)
  557. continue
  558. }
  559. // 重新获取数据序列中, 时间在目标时间点之后的
  560. newDateData := make([]*ThsHfConvertOriginData, 0)
  561. for kv, dv := range dateData {
  562. if dv.DataTime.Before(timeTarget) {
  563. continue
  564. }
  565. // 由于升序排列, 直接取之后所有的数据
  566. newDateData = append(newDateData, dateData[kv:]...)
  567. break
  568. }
  569. // 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值)
  570. if len(newDateData) == 0 {
  571. utils.FileLog.Info("日期%s无有效数据", v)
  572. continue
  573. }
  574. timeData[todayTime] = newDateData[0].Value
  575. }
  576. return
  577. }
  578. // 取值方式-区间计算值
  579. for k, v := range sortDates {
  580. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  581. if e != nil {
  582. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  583. continue
  584. }
  585. var thisDate, preDate string
  586. thisDate = v
  587. if k > 1 {
  588. preDate = sortDates[k-1]
  589. }
  590. var startTimeTarget, endTimeTarget time.Time
  591. // 起始时间-当日/前一日
  592. if convertRule.ConvertArea.StartDay == 1 {
  593. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.StartTime), time.Local)
  594. if e != nil {
  595. utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e))
  596. continue
  597. }
  598. startTimeTarget = tg
  599. }
  600. if convertRule.ConvertArea.StartDay == 2 {
  601. if preDate == "" {
  602. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  603. continue
  604. }
  605. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.StartTime), time.Local)
  606. if e != nil {
  607. utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e))
  608. continue
  609. }
  610. startTimeTarget = tg
  611. }
  612. // 截止时间-当日/前一日
  613. if convertRule.ConvertArea.EndDay == 1 {
  614. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.EndTime), time.Local)
  615. if e != nil {
  616. utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e))
  617. continue
  618. }
  619. endTimeTarget = tg
  620. }
  621. if convertRule.ConvertArea.EndDay == 2 {
  622. if preDate == "" {
  623. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  624. continue
  625. }
  626. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.EndTime), time.Local)
  627. if e != nil {
  628. utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e))
  629. continue
  630. }
  631. endTimeTarget = tg
  632. }
  633. if startTimeTarget.IsZero() || endTimeTarget.IsZero() {
  634. utils.FileLog.Info(fmt.Sprintf("起始截止时间有误, start: %v, end: %v", startTimeTarget, endTimeTarget))
  635. continue
  636. }
  637. // 合并前日当日数据
  638. dateData := make([]*ThsHfConvertOriginData, 0)
  639. if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
  640. // 起始截止均为当日
  641. dateData = groupDateData[thisDate]
  642. if dateData == nil {
  643. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  644. continue
  645. }
  646. if len(dateData) == 0 {
  647. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  648. continue
  649. }
  650. } else {
  651. if preDate == "" {
  652. continue
  653. }
  654. // 起始截止时间含前日
  655. preData := groupDateData[preDate]
  656. if preData == nil {
  657. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  658. continue
  659. }
  660. if len(preData) == 0 {
  661. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  662. continue
  663. }
  664. thisData := groupDateData[thisDate]
  665. if thisData == nil {
  666. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  667. continue
  668. }
  669. if len(thisData) == 0 {
  670. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  671. continue
  672. }
  673. dateData = append(dateData, preData...)
  674. dateData = append(dateData, thisData...)
  675. }
  676. if len(dateData) == 0 {
  677. utils.FileLog.Info("日期%s无数据序列", v)
  678. continue
  679. }
  680. // 重组时间区间内的数据
  681. newDateData := make([]*ThsHfConvertOriginData, 0)
  682. for _, dv := range dateData {
  683. if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
  684. continue
  685. }
  686. newDateData = append(newDateData, dv)
  687. }
  688. if len(newDateData) == 0 {
  689. utils.FileLog.Info(fmt.Sprintf("时间区间内无数据, start: %v, end: %v", startTimeTarget, endTimeTarget))
  690. continue
  691. }
  692. // 取出区间内的均值/最值
  693. var avgVal, minVal, maxVal, sumVal float64
  694. minVal, maxVal = newDateData[0].Value, newDateData[0].Value
  695. for _, nv := range newDateData {
  696. sumVal += nv.Value
  697. if nv.Value > maxVal {
  698. maxVal = nv.Value
  699. }
  700. if nv.Value < minVal {
  701. minVal = nv.Value
  702. }
  703. }
  704. avgVal = sumVal / float64(len(newDateData))
  705. switch convertRule.ConvertArea.CalculateType {
  706. case 1:
  707. timeData[todayTime] = avgVal
  708. case 2:
  709. timeData[todayTime] = maxVal
  710. case 3:
  711. timeData[todayTime] = minVal
  712. default:
  713. utils.FileLog.Info(fmt.Sprintf("计算方式有误, CalculateType: %d", convertRule.ConvertArea.CalculateType))
  714. }
  715. }
  716. return
  717. }
  718. func (obj EdbThsHf) getBaseIndexDataByMongo(baseIndexCode, startDate string) (newDataList []EdbInfoMgoData, err error) {
  719. newDataList = make([]EdbInfoMgoData, 0)
  720. // 获取数据源的指标数据
  721. mogDataObj := new(mgo.BaseFromThsHfData)
  722. // 构建查询条件
  723. queryConditions := bson.M{
  724. "index_code": baseIndexCode,
  725. }
  726. if startDate != `` {
  727. //获取已存在的所有数据
  728. startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  729. if tmpErr != nil {
  730. err = tmpErr
  731. return
  732. }
  733. queryConditions["data_time"] = bson.M{"$gte": startDateTime}
  734. }
  735. baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
  736. if err != nil {
  737. fmt.Println("GetAllDataList Err:" + err.Error())
  738. return
  739. }
  740. for _, v := range baseDataList {
  741. newDataList = append(newDataList, EdbInfoMgoData{
  742. //EdbDataId: v.ID,
  743. DataTime: v.DataTime,
  744. Value: v.Value,
  745. EdbCode: v.IndexCode,
  746. })
  747. }
  748. return
  749. }
  750. func (obj EdbThsHf) HandleConfigInsertEdbDataByMongo(realDataMaxDate time.Time, edbDataInsertConfig *EdbDataInsertConfig, edbInfoId, source, subSource int, existMap map[string]*mgo.EdbDataThsHf, isFindConfigDateRealData bool) {
  751. if edbDataInsertConfig == nil {
  752. return
  753. }
  754. var err error
  755. defer func() {
  756. if err != nil {
  757. utils.FileLog.Info(fmt.Sprintf("ThsHf-HandleConfigInsertEdbDataByMongo, err: %v", err))
  758. }
  759. }()
  760. edbDataInsertConfigDate := edbDataInsertConfig.Date // 配置的日期
  761. // 如果存在真实数据的最大日期 && 存在配置插入数据的最大日期 && 真实数据的最大日期 晚于/等于 配置插入数据的最大日期
  762. if realDataMaxDate.After(edbDataInsertConfigDate) || realDataMaxDate.Equal(edbDataInsertConfigDate) {
  763. go DeleteEdbDataInsertConfigByEdbId(edbInfoId)
  764. mogDataObj := mgo.EdbDataThsHf{}
  765. coll := mogDataObj.GetCollection()
  766. edbDataInsertConfigDateStr := edbDataInsertConfigDate.Format(utils.FormatDate)
  767. // 如果没有找到找到配置日期的实际数据,那么就直接删除
  768. if item, ok := existMap[edbDataInsertConfigDateStr]; ok && !isFindConfigDateRealData {
  769. mogDataObj.RemoveManyByColl(coll, bson.M{"_id": item.ID})
  770. }
  771. } else {
  772. edbDataInsertConfig.RealDate = realDataMaxDate
  773. //_, err = o.Update(edbDataInsertConfig, "RealDate")
  774. err = global.DEFAULT_DmSQL.Model(edbDataInsertConfig).Select([]string{"RealDate"}).Updates(edbDataInsertConfig).Error
  775. }
  776. return
  777. }
  778. func (obj EdbThsHf) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
  779. if utils.UseMongo {
  780. edbInfoMaxAndMinInfo, tmpErr := obj.getEdbInfoMaxAndMinInfoByMongo(edbInfo.EdbCode)
  781. // 如果正常获取到了,那就去修改指标的最大最小值
  782. if tmpErr == nil && edbInfoMaxAndMinInfo != nil {
  783. err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)
  784. } else {
  785. // 清空的目的是为了避免异常返回
  786. err = nil
  787. }
  788. } else {
  789. err, _ = UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
  790. }
  791. return
  792. }
  793. func (obj EdbThsHf) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
  794. mogDataObj := new(mgo.EdbDataThsHf)
  795. pipeline := []bson.M{
  796. {"$match": bson.M{"edb_code": edbCode}},
  797. {"$group": bson.M{
  798. "_id": nil,
  799. "min_date": bson.M{"$min": "$data_time"},
  800. "max_date": bson.M{"$max": "$data_time"},
  801. "min_value": bson.M{"$min": "$value"},
  802. "max_value": bson.M{"$max": "$value"},
  803. }},
  804. {"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
  805. }
  806. result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
  807. if err != nil {
  808. fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error())
  809. return
  810. }
  811. if !result.MaxDate.IsZero() {
  812. whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
  813. selectParam := bson.D{{"value", 1}, {"_id", 0}}
  814. latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
  815. if tmpErr != nil {
  816. err = tmpErr
  817. return
  818. }
  819. result.LatestValue = latestValue.Value
  820. result.EndValue = latestValue.Value
  821. }
  822. item = &EdbInfoMaxAndMinInfo{
  823. MinDate: result.MinDate.Format(utils.FormatDate),
  824. MaxDate: result.MaxDate.Format(utils.FormatDate),
  825. MinValue: result.MinValue,
  826. MaxValue: result.MaxValue,
  827. LatestValue: result.LatestValue,
  828. LatestDate: result.LatestDate.Format(utils.FormatDate),
  829. EndValue: result.EndValue,
  830. }
  831. return
  832. }