edb_ths_hf.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858
  1. package models
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/models/mgo"
  5. "eta/eta_index_lib/utils"
  6. "fmt"
  7. "github.com/beego/beego/v2/client/orm"
  8. "github.com/shopspring/decimal"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "reflect"
  11. "sort"
  12. "time"
  13. )
// EdbThsHf is the EDB data-source handler for THS (同花顺) high-frequency
// data. It is stateless: its methods only report the source identifiers and
// create/refresh indicators derived from a THS high-frequency base index.
type EdbThsHf struct{}
// GetSource returns the data-source ID of this handler
// (utils.DATA_SOURCE_THS).
func (obj EdbThsHf) GetSource() int {
	return utils.DATA_SOURCE_THS
}
// GetSubSource returns the sub-source ID of this handler
// (utils.DATA_SUB_SOURCE_HIGH_FREQUENCY).
func (obj EdbThsHf) GetSubSource() int {
	return utils.DATA_SUB_SOURCE_HIGH_FREQUENCY
}
// GetSourceName returns the display name of the data source
// (utils.DATA_SOURCE_NAME_THS).
func (obj EdbThsHf) GetSourceName() string {
	return utils.DATA_SOURCE_NAME_THS
}
// GetEdbType returns the indicator type assigned to indicators created by
// this handler (utils.DEFAULT_EDB_TYPE).
func (obj EdbThsHf) GetEdbType() int {
	return utils.DEFAULT_EDB_TYPE
}
// ThsHfAddBaseParams carries the parameters for adding a basic indicator
// (see EdbThsHf.Add).
type ThsHfAddBaseParams struct {
	EdbCode         string `description:"指标编码"`   // indicator code
	EdbName         string `description:"指标名称"`   // indicator name
	Unit            string `description:"单位"`     // unit
	Frequency       string `description:"频度"`     // frequency
	Sort            int    `description:"排序"`     // sort order
	ClassifyId      int    `description:"所属分类"`   // owning classification ID
	SysUserId       int    `description:"用户id"`   // creating user ID
	SysUserRealName string `description:"用户真实名称"` // creating user's real name
	UniqueCode      string `description:"唯一编码"`   // unique code
	ConvertRule     string `description:"转换规则"`   // conversion rule; Add stores it on the mapping row and the refresh decodes it as JSON
}
// ThsHfEditBaseParams carries the parameters for editing a basic indicator.
type ThsHfEditBaseParams struct {
	EdbCode         string   `description:"指标编码"`     // indicator code
	EdbName         string   `description:"指标名称"`     // indicator name
	EdbNameEn       string   `description:"指标名称(英文)"` // indicator name (English)
	Unit            string   `description:"单位"`       // unit
	UnitEn          string   `description:"单位(英文)"`   // unit (English)
	ClassifyId      int      `description:"所属分类"`     // owning classification ID
	SysUserId       int      `description:"用户id"`     // user ID
	SysUserRealName string   `description:"用户真实名称"`   // user's real name
	UniqueCode      string   `description:"编码"`       // code
	Lang            string   `description:"语言版本"`     // language version
	EdbInfo         *EdbInfo `description:"指标信息"`     // indicator record being edited
}
// ThsHfRefreshBaseParams groups the inputs of an indicator refresh: the
// indicator record plus a start and an end date.
type ThsHfRefreshBaseParams struct {
	EdbInfo   *EdbInfo // indicator to refresh
	StartDate string   // presumably a utils.FormatDate string — TODO confirm against callers
	EndDate   string   // presumably a utils.FormatDate string — TODO confirm against callers
}
  66. // Add
  67. // @Description: 添加指标
  68. func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
  69. o := orm.NewOrm()
  70. tx, e := o.Begin()
  71. if e != nil {
  72. err = fmt.Errorf("orm begin err: %v", e)
  73. return
  74. }
  75. defer func() {
  76. if err != nil {
  77. _ = tx.Rollback()
  78. utils.FileLog.Info(fmt.Sprintf("%s err: %v", reflect.TypeOf(obj).Name(), err))
  79. return
  80. }
  81. _ = tx.Commit()
  82. }()
  83. // 新增指标
  84. edbInfo = new(EdbInfo)
  85. edbInfo.Source = obj.GetSource()
  86. edbInfo.SubSource = obj.GetSubSource()
  87. edbInfo.SourceName = obj.GetSourceName()
  88. edbInfo.EdbType = obj.GetEdbType()
  89. edbInfo.EdbCode = params.EdbCode
  90. edbInfo.EdbName = params.EdbName
  91. edbInfo.EdbNameEn = params.EdbName
  92. edbInfo.EdbNameSource = params.EdbName
  93. edbInfo.Frequency = params.Frequency
  94. edbInfo.Unit = params.Unit
  95. edbInfo.UnitEn = params.Unit
  96. edbInfo.StartDate = baseIndex.StartDate.Format(utils.FormatDate) // 默认取源指标的时间, 刷新完成后更新
  97. edbInfo.EndDate = baseIndex.EndDate.Format(utils.FormatDate)
  98. edbInfo.ClassifyId = params.ClassifyId
  99. edbInfo.SysUserId = params.SysUserId
  100. edbInfo.SysUserRealName = params.SysUserRealName
  101. edbInfo.Sort = params.Sort
  102. edbInfo.TerminalCode = baseIndex.TerminalCode
  103. edbInfo.UniqueCode = params.UniqueCode
  104. edbInfo.CreateTime = time.Now()
  105. edbInfo.ModifyTime = time.Now()
  106. edbInfoId, e := tx.Insert(edbInfo)
  107. if e != nil {
  108. err = fmt.Errorf("insert edb err: %v", e)
  109. return
  110. }
  111. edbInfo.EdbInfoId = int(edbInfoId)
  112. // 新增指标关联
  113. edbMapping := new(BaseFromEdbMapping)
  114. edbMapping.BaseFromIndexId = baseIndex.BaseFromThsHfIndexId
  115. edbMapping.BaseIndexCode = baseIndex.IndexCode
  116. edbMapping.EdbInfoId = edbInfo.EdbInfoId
  117. edbMapping.EdbCode = edbInfo.EdbCode
  118. edbMapping.Source = obj.GetSource()
  119. edbMapping.SubSource = obj.GetSubSource()
  120. edbMapping.ConvertRule = params.ConvertRule
  121. edbMapping.CreateTime = time.Now().Local()
  122. edbMapping.ModifyTime = time.Now().Local()
  123. edbMappingId, e := tx.Insert(edbMapping)
  124. if e != nil {
  125. err = fmt.Errorf("insert base edb mapping err: %v", e)
  126. return
  127. }
  128. edbMapping.Id = int(edbMappingId)
  129. // 刷新数据
  130. err = obj.Refresh(edbInfo, edbMapping, "")
  131. return
  132. }
  133. func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  134. if utils.UseMongo {
  135. return obj.refreshByMongo(edbInfo, edbBaseMapping, startDate)
  136. }
  137. return obj.refreshByMysql(edbInfo, edbBaseMapping, startDate)
  138. }
  139. func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  140. if edbInfo == nil || edbBaseMapping == nil {
  141. err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
  142. return
  143. }
  144. // 真实数据的最大日期, 插入规则配置的日期
  145. var realDataMaxDate, edbDataInsertConfigDate time.Time
  146. var edbDataInsertConfig *EdbDataInsertConfig
  147. var isFindConfigDateRealData bool
  148. {
  149. conf, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  150. if e != nil && e.Error() != utils.ErrNoRow() {
  151. err = fmt.Errorf("GetEdbDataInsertConfigByEdbId err: %v", e)
  152. return
  153. }
  154. edbDataInsertConfig = conf
  155. if edbDataInsertConfig != nil {
  156. edbDataInsertConfigDate = edbDataInsertConfig.Date
  157. }
  158. }
  159. // 查询时间为开始时间-3d
  160. var queryDate string
  161. if startDate != "" {
  162. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  163. if e != nil {
  164. err = fmt.Errorf("刷新开始时间有误, %v", e)
  165. return
  166. }
  167. queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
  168. }
  169. // 源指标数据
  170. baseDataList := make([]*BaseFromThsHfData, 0)
  171. {
  172. ob := new(BaseFromThsHfData)
  173. cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
  174. pars := make([]interface{}, 0)
  175. pars = append(pars, edbBaseMapping.BaseIndexCode)
  176. if queryDate != "" {
  177. cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime)
  178. pars = append(pars, queryDate)
  179. }
  180. list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime))
  181. if e != nil {
  182. err = fmt.Errorf("获取数据源数据失败, %v", e)
  183. return
  184. }
  185. baseDataList = list
  186. }
  187. // 转换数据
  188. convertRule := new(ThsHfIndexConvert2EdbRule)
  189. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  190. err = fmt.Errorf("转换规则有误, %v", e)
  191. return
  192. }
  193. convertOriginData := make([]*ThsHfConvertOriginData, 0)
  194. for _, v := range baseDataList {
  195. convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
  196. DataTime: v.DataTime,
  197. Value: v.Value,
  198. })
  199. }
  200. convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
  201. if e != nil {
  202. err = fmt.Errorf("转换数据失败, %v", e)
  203. return
  204. }
  205. if len(convertData) == 0 {
  206. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  207. return
  208. }
  209. // 获取已有数据
  210. dataOb := new(EdbDataThsHf)
  211. dataExists := make(map[string]*EdbDataThsHf)
  212. searchExistMap := make(map[string]*EdbInfoSearchData)
  213. {
  214. cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId)
  215. pars := make([]interface{}, 0)
  216. pars = append(pars, edbInfo.EdbInfoId)
  217. if queryDate != "" {
  218. cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
  219. pars = append(pars, queryDate)
  220. }
  221. list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
  222. if e != nil {
  223. err = fmt.Errorf("获取指标数据失败, %v", e)
  224. return
  225. }
  226. for _, v := range list {
  227. dataExists[v.DataTime.Format(utils.FormatDate)] = v
  228. searchExistMap[v.DataTime.Format(utils.FormatDate)] = &EdbInfoSearchData{
  229. EdbDataId: v.EdbDataId,
  230. EdbInfoId: v.EdbInfoId,
  231. DataTime: v.DataTime.Format(utils.FormatDate),
  232. Value: v.Value,
  233. EdbCode: v.EdbCode,
  234. DataTimestamp: v.DataTimestamp,
  235. }
  236. }
  237. }
  238. // 比对数据
  239. insertExist := make(map[string]bool)
  240. insertData := make([]*EdbDataThsHf, 0)
  241. updateData := make([]*EdbDataThsHf, 0)
  242. for k, v := range convertData {
  243. strDate := k.Format(utils.FormatDate)
  244. // 手动插入数据的判断
  245. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  246. realDataMaxDate = k
  247. }
  248. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  249. isFindConfigDateRealData = true
  250. }
  251. // 入库值
  252. saveVal := decimal.NewFromFloat(v).Round(4).String()
  253. d, e := decimal.NewFromString(saveVal)
  254. if e != nil {
  255. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  256. continue
  257. }
  258. saveFloat, _ := d.Float64()
  259. // 更新
  260. exists := dataExists[strDate]
  261. if exists != nil {
  262. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  263. if saveVal != existVal {
  264. exists.Value = saveFloat
  265. updateData = append(updateData, exists)
  266. }
  267. continue
  268. }
  269. // 新增
  270. if insertExist[strDate] {
  271. continue
  272. }
  273. insertExist[strDate] = true
  274. timestamp := k.UnixNano() / 1e6
  275. insertData = append(insertData, &EdbDataThsHf{
  276. EdbInfoId: edbInfo.EdbInfoId,
  277. EdbCode: edbInfo.EdbCode,
  278. DataTime: k,
  279. Value: saveFloat,
  280. CreateTime: time.Now(),
  281. ModifyTime: time.Now(),
  282. DataTimestamp: timestamp,
  283. })
  284. }
  285. // 批量新增/更新
  286. if len(insertData) > 0 {
  287. if e = dataOb.CreateMulti(insertData); e != nil {
  288. err = fmt.Errorf("批量新增指标数据失败, %v", e)
  289. return
  290. }
  291. }
  292. if len(updateData) > 0 {
  293. if e = dataOb.MultiUpdateValue(updateData); e != nil {
  294. err = fmt.Errorf("批量更新指标数据失败, %v", e)
  295. return
  296. }
  297. }
  298. // 处理手工数据补充的配置
  299. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, obj.GetSource(), obj.GetSubSource(), searchExistMap, isFindConfigDateRealData)
  300. return
  301. }
  302. func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
  303. var realDataMaxDate, edbDataInsertConfigDate time.Time
  304. var edbDataInsertConfig *EdbDataInsertConfig
  305. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  306. {
  307. edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
  308. if err != nil && err.Error() != utils.ErrNoRow() {
  309. return
  310. }
  311. if edbDataInsertConfig != nil {
  312. edbDataInsertConfigDate = edbDataInsertConfig.Date
  313. }
  314. }
  315. // 查询时间为开始时间-3d
  316. var queryDate string
  317. if startDate != "" {
  318. st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  319. if e != nil {
  320. err = fmt.Errorf("刷新开始时间有误, %v", e)
  321. return
  322. }
  323. queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
  324. }
  325. // 获取源指标数据
  326. baseDataList, e := obj.getBaseIndexDataByMongo(edbInfo, queryDate)
  327. if e != nil {
  328. err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e)
  329. return
  330. }
  331. // 转换数据
  332. convertRule := new(ThsHfIndexConvert2EdbRule)
  333. if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
  334. err = fmt.Errorf("转换规则有误, %v", e)
  335. return
  336. }
  337. convertOriginData := make([]*ThsHfConvertOriginData, 0)
  338. for _, v := range baseDataList {
  339. convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
  340. DataTime: v.DataTime,
  341. Value: v.Value,
  342. })
  343. }
  344. convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
  345. if e != nil {
  346. err = fmt.Errorf("转换数据失败, %v", e)
  347. return
  348. }
  349. if len(convertData) == 0 {
  350. utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
  351. return
  352. }
  353. //获取指标所有数据
  354. existDataList := make([]*mgo.EdbDataThsHf, 0)
  355. mogDataObj := new(mgo.EdbDataThsHf)
  356. {
  357. // 构建查询条件
  358. queryConditions := bson.M{
  359. "edb_code": edbInfo.EdbCode,
  360. }
  361. if queryDate != `` {
  362. //获取已存在的所有数据
  363. startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
  364. if tmpErr != nil {
  365. err = tmpErr
  366. return
  367. }
  368. queryConditions["data_time"] = bson.M{"$gte": startDateTime}
  369. }
  370. existDataList, err = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
  371. if err != nil {
  372. fmt.Println(obj.GetSourceName() + ",refresh err;getEdbDataThsHfList Err:" + err.Error())
  373. return
  374. }
  375. }
  376. existDataMap := make(map[string]*mgo.EdbDataThsHf)
  377. removeDataTimeMap := make(map[string]bool) //需要移除的日期数据
  378. for _, v := range existDataList {
  379. tmpDate := v.DataTime.Format(utils.FormatDate)
  380. existDataMap[tmpDate] = v
  381. removeDataTimeMap[tmpDate] = true
  382. }
  383. // 待添加的数据集
  384. addDataList := make([]interface{}, 0)
  385. updateDataList := make([]mgo.EdbDataThsHf, 0)
  386. insertExist := make(map[string]bool)
  387. for k, v := range convertData {
  388. strDate := k.Format(utils.FormatDate)
  389. // 手动插入数据的判断
  390. if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
  391. realDataMaxDate = k
  392. }
  393. if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
  394. isFindConfigDateRealData = true
  395. }
  396. // 入库值
  397. saveVal := decimal.NewFromFloat(v).Round(4).String()
  398. d, e := decimal.NewFromString(saveVal)
  399. if e != nil {
  400. utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
  401. continue
  402. }
  403. saveFloat, _ := d.Float64()
  404. // 更新
  405. exists := existDataMap[strDate]
  406. if exists != nil {
  407. existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
  408. if saveVal != existVal {
  409. exists.Value = saveFloat
  410. updateDataList = append(updateDataList, *exists)
  411. }
  412. continue
  413. }
  414. // 新增
  415. if insertExist[strDate] {
  416. continue
  417. }
  418. insertExist[strDate] = true
  419. timestamp := k.UnixNano() / 1e6
  420. addDataList = append(addDataList, &EdbDataThsHf{
  421. EdbInfoId: edbInfo.EdbInfoId,
  422. EdbCode: edbInfo.EdbCode,
  423. DataTime: k,
  424. Value: saveFloat,
  425. CreateTime: time.Now(),
  426. ModifyTime: time.Now(),
  427. DataTimestamp: timestamp,
  428. })
  429. }
  430. // 入库
  431. {
  432. coll := mogDataObj.GetCollection()
  433. //删除已经不存在的指标数据(由于该指标当日的数据删除了)
  434. {
  435. removeDateList := make([]time.Time, 0)
  436. for dateTime := range removeDataTimeMap {
  437. //获取已存在的所有数据
  438. tmpDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  439. if tmpErr != nil {
  440. err = tmpErr
  441. return
  442. }
  443. removeDateList = append(removeDateList, tmpDateTime)
  444. }
  445. removeNum := len(removeDateList)
  446. if removeNum > 0 {
  447. err = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}})
  448. if err != nil {
  449. fmt.Println("mogDataObj.RemoveMany() Err:" + err.Error())
  450. return
  451. }
  452. }
  453. }
  454. // 插入新数据
  455. if len(addDataList) > 0 {
  456. err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
  457. if err != nil {
  458. fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error())
  459. return
  460. }
  461. }
  462. // 修改历史数据
  463. if len(updateDataList) > 0 {
  464. for _, v := range updateDataList {
  465. err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
  466. if err != nil {
  467. fmt.Println("mogDataObj.UpdateDataByColl:Err:" + err.Error())
  468. return
  469. }
  470. }
  471. }
  472. }
  473. // 处理手工数据补充的配置
  474. obj.HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
  475. return
  476. }
// ThsHfConvertOriginData is one high-frequency source observation fed into
// ThsHfConvertData2DayByRule.
type ThsHfConvertOriginData struct {
	DataTime time.Time `description:"数据日期(至时分秒)"` // observation time, down to the second
	Value    float64   `description:"数据值"`       // observed value
}
  481. // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
  482. func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
  483. // PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
  484. timeData = make(map[time.Time]float64)
  485. if len(originData) == 0 || convertRule == nil {
  486. return
  487. }
  488. if !utils.InArrayByInt([]int{1, 2}, convertRule.ConvertType) {
  489. err = fmt.Errorf("取值类型有误, ConvertType: %d", convertRule.ConvertType)
  490. return
  491. }
  492. // 升序排序
  493. sort.Slice(originData, func(i, j int) bool {
  494. return originData[i].DataTime.Before(originData[j].DataTime)
  495. })
  496. // 将数据根据日期进行分组
  497. var sortDates []string
  498. groupDateData := make(map[string][]*ThsHfConvertOriginData)
  499. for _, v := range originData {
  500. d := v.DataTime.Format(utils.FormatDate)
  501. if !utils.InArrayByStr(sortDates, d) {
  502. sortDates = append(sortDates, d)
  503. }
  504. if groupDateData[d] == nil {
  505. groupDateData[d] = make([]*ThsHfConvertOriginData, 0)
  506. }
  507. groupDateData[d] = append(groupDateData[d], v)
  508. }
  509. // 取值方式-指定时间的值
  510. if convertRule.ConvertType == 1 {
  511. for k, v := range sortDates {
  512. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  513. if e != nil {
  514. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  515. continue
  516. }
  517. var timeTarget time.Time
  518. dateData := make([]*ThsHfConvertOriginData, 0)
  519. // 当日
  520. if convertRule.ConvertFixed.FixedDay == 1 {
  521. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", v, convertRule.ConvertFixed.FixedTime), time.Local)
  522. if e != nil {
  523. utils.FileLog.Info(fmt.Sprintf("当日timeTarget转换有误, %v", e))
  524. continue
  525. }
  526. timeTarget = tg
  527. dt := groupDateData[v]
  528. if dt == nil {
  529. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", v))
  530. continue
  531. }
  532. if len(dt) == 0 {
  533. continue
  534. }
  535. dateData = dt
  536. }
  537. // 前一日
  538. if convertRule.ConvertFixed.FixedDay == 2 {
  539. if k < 1 {
  540. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  541. continue
  542. }
  543. preDate := sortDates[k-1]
  544. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
  545. if e != nil {
  546. utils.FileLog.Info(fmt.Sprintf("前日timeTarget转换有误, %v", e))
  547. continue
  548. }
  549. timeTarget = tg
  550. dt := groupDateData[preDate]
  551. if dt == nil {
  552. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  553. continue
  554. }
  555. if len(dt) == 0 {
  556. continue
  557. }
  558. dateData = dt
  559. }
  560. if len(dateData) == 0 {
  561. utils.FileLog.Info("日期%s无数据序列", v)
  562. continue
  563. }
  564. // 重新获取数据序列中, 时间在目标时间点之后的
  565. newDateData := make([]*ThsHfConvertOriginData, 0)
  566. for kv, dv := range dateData {
  567. if dv.DataTime.Before(timeTarget) {
  568. continue
  569. }
  570. // 由于升序排列, 直接取之后所有的数据
  571. newDateData = append(newDateData, dateData[kv:]...)
  572. break
  573. }
  574. // 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值)
  575. if len(newDateData) == 0 {
  576. utils.FileLog.Info("日期%s无有效数据", v)
  577. continue
  578. }
  579. timeData[todayTime] = newDateData[0].Value
  580. }
  581. return
  582. }
  583. // 取值方式-区间计算值
  584. for k, v := range sortDates {
  585. todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
  586. if e != nil {
  587. utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
  588. continue
  589. }
  590. var thisDate, preDate string
  591. thisDate = v
  592. if k > 1 {
  593. preDate = sortDates[k-1]
  594. }
  595. var startTimeTarget, endTimeTarget time.Time
  596. // 起始时间-当日/前一日
  597. if convertRule.ConvertArea.StartDay == 1 {
  598. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.StartTime), time.Local)
  599. if e != nil {
  600. utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e))
  601. continue
  602. }
  603. startTimeTarget = tg
  604. }
  605. if convertRule.ConvertArea.StartDay == 2 {
  606. if preDate == "" {
  607. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  608. continue
  609. }
  610. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.StartTime), time.Local)
  611. if e != nil {
  612. utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e))
  613. continue
  614. }
  615. startTimeTarget = tg
  616. }
  617. // 截止时间-当日/前一日
  618. if convertRule.ConvertArea.EndDay == 1 {
  619. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.EndTime), time.Local)
  620. if e != nil {
  621. utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e))
  622. continue
  623. }
  624. endTimeTarget = tg
  625. }
  626. if convertRule.ConvertArea.EndDay == 2 {
  627. if preDate == "" {
  628. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
  629. continue
  630. }
  631. tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.EndTime), time.Local)
  632. if e != nil {
  633. utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e))
  634. continue
  635. }
  636. endTimeTarget = tg
  637. }
  638. if startTimeTarget.IsZero() || endTimeTarget.IsZero() {
  639. utils.FileLog.Info(fmt.Sprintf("起始截止时间有误, start: %v, end: %v", startTimeTarget, endTimeTarget))
  640. continue
  641. }
  642. // 合并前日当日数据
  643. dateData := make([]*ThsHfConvertOriginData, 0)
  644. if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
  645. // 起始截止均为当日
  646. dateData = groupDateData[thisDate]
  647. if dateData == nil {
  648. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  649. continue
  650. }
  651. if len(dateData) == 0 {
  652. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  653. continue
  654. }
  655. } else {
  656. if preDate == "" {
  657. continue
  658. }
  659. // 起始截止时间含前日
  660. preData := groupDateData[preDate]
  661. if preData == nil {
  662. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  663. continue
  664. }
  665. if len(preData) == 0 {
  666. utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
  667. continue
  668. }
  669. thisData := groupDateData[thisDate]
  670. if thisData == nil {
  671. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  672. continue
  673. }
  674. if len(thisData) == 0 {
  675. utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
  676. continue
  677. }
  678. dateData = append(dateData, preData...)
  679. dateData = append(dateData, thisData...)
  680. }
  681. if len(dateData) == 0 {
  682. utils.FileLog.Info("日期%s无数据序列", v)
  683. continue
  684. }
  685. // 重组时间区间内的数据
  686. newDateData := make([]*ThsHfConvertOriginData, 0)
  687. for _, dv := range dateData {
  688. if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
  689. continue
  690. }
  691. newDateData = append(newDateData, dv)
  692. }
  693. if len(newDateData) == 0 {
  694. utils.FileLog.Info(fmt.Sprintf("时间区间内无数据, start: %v, end: %v", startTimeTarget, endTimeTarget))
  695. continue
  696. }
  697. // 取出区间内的均值/最值
  698. var avgVal, minVal, maxVal, sumVal float64
  699. minVal, maxVal = newDateData[0].Value, newDateData[0].Value
  700. for _, nv := range newDateData {
  701. sumVal += nv.Value
  702. if nv.Value > maxVal {
  703. maxVal = nv.Value
  704. }
  705. if nv.Value < minVal {
  706. minVal = nv.Value
  707. }
  708. }
  709. avgVal = sumVal / float64(len(newDateData))
  710. switch convertRule.ConvertArea.CalculateType {
  711. case 1:
  712. timeData[todayTime] = avgVal
  713. case 2:
  714. timeData[todayTime] = maxVal
  715. case 3:
  716. timeData[todayTime] = minVal
  717. default:
  718. utils.FileLog.Info(fmt.Sprintf("计算方式有误, CalculateType: %d", convertRule.ConvertArea.CalculateType))
  719. }
  720. }
  721. return
  722. }
  723. func (obj EdbThsHf) getBaseIndexDataByMongo(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
  724. newDataList = make([]EdbInfoMgoData, 0)
  725. // 获取数据源的指标数据
  726. mogDataObj := new(mgo.BaseFromThsHfData)
  727. // 构建查询条件
  728. queryConditions := bson.M{
  729. "index_code": edbInfo.EdbCode,
  730. }
  731. if startDate != `` {
  732. //获取已存在的所有数据
  733. startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  734. if tmpErr != nil {
  735. err = tmpErr
  736. return
  737. }
  738. queryConditions["data_time"] = bson.M{"$gte": startDateTime}
  739. }
  740. baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
  741. if err != nil {
  742. fmt.Println("GetAllDataList Err:" + err.Error())
  743. return
  744. }
  745. for _, v := range baseDataList {
  746. newDataList = append(newDataList, EdbInfoMgoData{
  747. //EdbDataId: v.ID,
  748. DataTime: v.DataTime,
  749. Value: v.Value,
  750. EdbCode: v.IndexCode,
  751. })
  752. }
  753. return
  754. }
  755. func (obj EdbThsHf) HandleConfigInsertEdbDataByMongo(realDataMaxDate time.Time, edbDataInsertConfig *EdbDataInsertConfig, edbInfoId, source, subSource int, existMap map[string]*mgo.EdbDataThsHf, isFindConfigDateRealData bool) {
  756. if edbDataInsertConfig == nil {
  757. return
  758. }
  759. var err error
  760. defer func() {
  761. if err != nil {
  762. utils.FileLog.Info(fmt.Sprintf("ThsHf-HandleConfigInsertEdbDataByMongo, err: %v", err))
  763. }
  764. }()
  765. edbDataInsertConfigDate := edbDataInsertConfig.Date // 配置的日期
  766. // 如果存在真实数据的最大日期 && 存在配置插入数据的最大日期 && 真实数据的最大日期 晚于/等于 配置插入数据的最大日期
  767. if realDataMaxDate.After(edbDataInsertConfigDate) || realDataMaxDate.Equal(edbDataInsertConfigDate) {
  768. go DeleteEdbDataInsertConfigByEdbId(edbInfoId)
  769. mogDataObj := mgo.EdbDataThsHf{}
  770. coll := mogDataObj.GetCollection()
  771. edbDataInsertConfigDateStr := edbDataInsertConfigDate.Format(utils.FormatDate)
  772. // 如果没有找到找到配置日期的实际数据,那么就直接删除
  773. if item, ok := existMap[edbDataInsertConfigDateStr]; ok && !isFindConfigDateRealData {
  774. mogDataObj.RemoveManyByColl(coll, bson.M{"_id": item.ID})
  775. }
  776. } else {
  777. o := orm.NewOrm()
  778. edbDataInsertConfig.RealDate = realDataMaxDate
  779. _, err = o.Update(edbDataInsertConfig, "RealDate")
  780. }
  781. return
  782. }