predict_edb_data_calculate_zjpj.go 15 KB


  1. package models
  2. import (
  3. "errors"
  4. "eta_gn/eta_index_lib/global"
  5. "eta_gn/eta_index_lib/utils"
  6. "fmt"
  7. "gorm.io/gorm"
  8. "strings"
  9. "time"
  10. )
  11. // EdbDataPredictCalculateZjpj 直接拼接数据结构体
  12. type EdbDataPredictCalculateZjpj struct {
  13. EdbDataId int `gorm:"primaryKey;autoIncrement;column:edb_data_id"`
  14. EdbInfoId int
  15. EdbCode string
  16. DataTime string
  17. Value float64
  18. Status int
  19. CreateTime time.Time
  20. ModifyTime time.Time
  21. DataTimestamp int64
  22. }
  23. // SavePredictCalculateZjpj 新增直接拼接数据
  24. func SavePredictCalculateZjpj(req *EdbInfoCalculateBatchSaveReq, firstEdbInfo, secondEdbInfo *EdbInfo, edbCode, uniqueCode string, sysUserId int, sysUserRealName string) (edbInfo *EdbInfo, latestDateStr string, latestValue float64, err error) {
  25. to := global.DEFAULT_DmSQL.Begin()
  26. defer func() {
  27. if err != nil {
  28. to.Rollback()
  29. } else {
  30. to.Commit()
  31. }
  32. }()
  33. var existItemA, existItemB *EdbInfoCalculateMapping
  34. if req.EdbInfoId <= 0 {
  35. edbInfo = &EdbInfo{
  36. EdbInfoType: 1,
  37. SourceName: "预测直接拼接",
  38. Source: utils.DATA_SOURCE_PREDICT_CALCULATE_ZJPJ,
  39. EdbCode: edbCode,
  40. EdbName: req.EdbName,
  41. EdbNameSource: req.EdbName,
  42. Frequency: req.Frequency,
  43. Unit: req.Unit,
  44. StartDate: firstEdbInfo.StartDate,
  45. EndDate: firstEdbInfo.EndDate,
  46. ClassifyId: req.ClassifyId,
  47. SysUserId: sysUserId,
  48. SysUserRealName: sysUserRealName,
  49. UniqueCode: uniqueCode,
  50. CreateTime: time.Now(),
  51. ModifyTime: time.Now(),
  52. CalculateFormula: req.Formula,
  53. EdbType: 2,
  54. }
  55. tmpErr := to.Create(edbInfo).Error
  56. if tmpErr != nil {
  57. err = tmpErr
  58. return
  59. }
  60. //关联关系
  61. //第一个指标
  62. {
  63. existItemA = &EdbInfoCalculateMapping{
  64. EdbInfoCalculateMappingId: 0,
  65. EdbInfoId: edbInfo.EdbInfoId,
  66. Source: edbInfo.Source,
  67. SourceName: edbInfo.SourceName,
  68. EdbCode: edbInfo.EdbCode,
  69. FromEdbInfoId: firstEdbInfo.EdbInfoId,
  70. FromEdbCode: firstEdbInfo.EdbCode,
  71. FromEdbName: firstEdbInfo.EdbName,
  72. FromSource: firstEdbInfo.Source,
  73. FromSourceName: firstEdbInfo.SourceName,
  74. FromTag: "A",
  75. Sort: 1,
  76. CreateTime: time.Now(),
  77. ModifyTime: time.Now(),
  78. }
  79. tmpErr := to.Create(existItemA).Error
  80. if tmpErr != nil {
  81. err = tmpErr
  82. return
  83. }
  84. }
  85. //第二个指标
  86. {
  87. existItemB = &EdbInfoCalculateMapping{
  88. EdbInfoCalculateMappingId: 0,
  89. EdbInfoId: edbInfo.EdbInfoId,
  90. Source: edbInfo.Source,
  91. SourceName: edbInfo.SourceName,
  92. EdbCode: edbInfo.EdbCode,
  93. FromEdbInfoId: secondEdbInfo.EdbInfoId,
  94. FromEdbCode: secondEdbInfo.EdbCode,
  95. FromEdbName: secondEdbInfo.EdbName,
  96. FromSource: secondEdbInfo.Source,
  97. FromSourceName: secondEdbInfo.SourceName,
  98. FromTag: "B",
  99. Sort: 1,
  100. CreateTime: time.Now(),
  101. ModifyTime: time.Now(),
  102. }
  103. tmpErr := to.Create(existItemB).Error
  104. if tmpErr != nil {
  105. err = tmpErr
  106. return
  107. }
  108. }
  109. } else {
  110. edbInfo, err = GetEdbInfoById(req.EdbInfoId)
  111. if err != nil {
  112. return
  113. }
  114. latestDateStr = edbInfo.LatestDate
  115. latestValue = edbInfo.LatestValue
  116. nowEdbInfo := *edbInfo // 现在的指标信息
  117. //修改指标信息
  118. edbInfo.EdbName = req.EdbName
  119. edbInfo.EdbNameSource = req.EdbName
  120. edbInfo.Frequency = req.Frequency
  121. edbInfo.Unit = req.Unit
  122. edbInfo.ClassifyId = req.ClassifyId
  123. edbInfo.CalculateFormula = req.Formula
  124. edbInfo.ModifyTime = time.Now()
  125. err = to.Model(edbInfo).Select([]string{"EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime"}).Updates(edbInfo).Error
  126. if err != nil {
  127. return
  128. }
  129. var existCondition string
  130. var existPars []interface{}
  131. existCondition += " AND edb_info_id=? "
  132. existPars = append(existPars, edbInfo.EdbInfoId)
  133. //查询出所有的关联指标
  134. var existList []*EdbInfoCalculateMapping
  135. existList, err = GetEdbInfoCalculateListByCondition(existCondition, existPars)
  136. if err != nil {
  137. err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error())
  138. return
  139. }
  140. for _, existItem := range existList {
  141. if existItem.FromTag == "A" {
  142. existItemA = existItem
  143. } else if existItem.FromTag == "B" {
  144. existItemB = existItem
  145. }
  146. }
  147. if existItemA == nil {
  148. err = errors.New("原拼接日期之前的指标不存在")
  149. return
  150. }
  151. if existItemB == nil {
  152. err = errors.New("原拼接日期之后的指标不存在")
  153. return
  154. }
  155. // 是否需要删除数据重新计算
  156. isNeedCalculateData := false
  157. // 如果截止日期变更,那么需要重新计算
  158. if req.Formula != nowEdbInfo.CalculateFormula {
  159. isNeedCalculateData = true
  160. }
  161. //第一个指标数据
  162. {
  163. // 如果指标变了,那么需要删除关系,并重新计算
  164. if existItemA.FromEdbInfoId != firstEdbInfo.EdbInfoId {
  165. //删除之前的A指标关联关系
  166. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?`
  167. err = to.Exec(sql, edbInfo.EdbInfoId, existItemA.FromEdbInfoId).Error
  168. if err != nil {
  169. err = fmt.Errorf("删除拼接日期之前的指标关联关系失败,Err:" + err.Error())
  170. return
  171. }
  172. }
  173. }
  174. //第二个指标数据
  175. {
  176. // 如果指标变了,那么需要删除关系,并重新计算
  177. if existItemB.FromEdbInfoId != secondEdbInfo.EdbInfoId {
  178. //删除之前的B指标关联关系
  179. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?`
  180. err = to.Exec(sql, edbInfo.EdbInfoId, existItemB.FromEdbInfoId).Error
  181. if err != nil {
  182. err = fmt.Errorf("删除拼接日期之后的指标关联关系失败,Err:" + err.Error())
  183. return
  184. }
  185. }
  186. }
  187. if existItemA.FromEdbInfoId != firstEdbInfo.EdbInfoId {
  188. //添加新的指标关系
  189. {
  190. existItemA = &EdbInfoCalculateMapping{
  191. EdbInfoCalculateMappingId: 0,
  192. EdbInfoId: edbInfo.EdbInfoId,
  193. Source: edbInfo.Source,
  194. SourceName: edbInfo.SourceName,
  195. EdbCode: edbInfo.EdbCode,
  196. FromEdbInfoId: firstEdbInfo.EdbInfoId,
  197. FromEdbCode: firstEdbInfo.EdbCode,
  198. FromEdbName: firstEdbInfo.EdbName,
  199. FromSource: firstEdbInfo.Source,
  200. FromSourceName: firstEdbInfo.SourceName,
  201. FromTag: "A",
  202. Sort: 1,
  203. CreateTime: time.Now(),
  204. ModifyTime: time.Now(),
  205. }
  206. tmpErr := to.Create(existItemA).Error
  207. if tmpErr != nil {
  208. err = tmpErr
  209. return
  210. }
  211. isNeedCalculateData = true
  212. }
  213. }
  214. if existItemB.FromEdbInfoId != secondEdbInfo.EdbInfoId {
  215. // 添加新的指标关联关系
  216. existItemB = &EdbInfoCalculateMapping{
  217. EdbInfoCalculateMappingId: 0,
  218. EdbInfoId: edbInfo.EdbInfoId,
  219. Source: edbInfo.Source,
  220. SourceName: edbInfo.SourceName,
  221. EdbCode: edbInfo.EdbCode,
  222. FromEdbInfoId: secondEdbInfo.EdbInfoId,
  223. FromEdbCode: secondEdbInfo.EdbCode,
  224. FromEdbName: secondEdbInfo.EdbName,
  225. FromSource: secondEdbInfo.Source,
  226. FromSourceName: secondEdbInfo.SourceName,
  227. FromTag: "B",
  228. Sort: 2,
  229. CreateTime: time.Now(),
  230. ModifyTime: time.Now(),
  231. }
  232. tmpErr := to.Create(existItemB).Error
  233. if tmpErr != nil {
  234. err = tmpErr
  235. return
  236. }
  237. isNeedCalculateData = true
  238. }
  239. // 如果需要重新计算,那么先删除所有的指标数据,然后再重新计算
  240. if isNeedCalculateData {
  241. // 删除之前所有的指标数据
  242. tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
  243. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? `, tableName)
  244. err = to.Exec(sql, edbInfo.EdbInfoId).Error
  245. if err != nil {
  246. err = fmt.Errorf("删除历史数据失败,Err:" + err.Error())
  247. return
  248. }
  249. } else {
  250. return
  251. }
  252. }
  253. //拼接数据
  254. latestDateStr, latestValue, err = refreshAllPredictCalculateZjpj(to, edbInfo, firstEdbInfo, secondEdbInfo)
  255. return
  256. }
  257. // RefreshAllPredictCalculateZjpj 刷新所有 直接拼接 数据
  258. func RefreshAllPredictCalculateZjpj(edbInfo *EdbInfo) (latestDateStr string, latestValue float64, err error) {
  259. to := global.DEFAULT_DmSQL.Begin()
  260. defer func() {
  261. if err != nil {
  262. to.Rollback()
  263. } else {
  264. to.Commit()
  265. }
  266. }()
  267. //查询关联指标信息
  268. var existCondition string
  269. var existPars []interface{}
  270. existCondition += " AND edb_info_id=? "
  271. existPars = append(existPars, edbInfo.EdbInfoId)
  272. existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars)
  273. if err != nil {
  274. err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error())
  275. return
  276. }
  277. var existItemA, existItemB *EdbInfoCalculateMapping
  278. for _, existItem := range existList {
  279. if existItem.FromTag == "A" {
  280. existItemA = existItem
  281. } else if existItem.FromTag == "B" {
  282. existItemB = existItem
  283. }
  284. }
  285. if existItemA == nil {
  286. err = errors.New("原拼接日期之前的指标不存在")
  287. return
  288. }
  289. if existItemB == nil {
  290. err = errors.New("原拼接日期之后的指标不存在")
  291. return
  292. }
  293. fromEdbInfo, err := GetEdbInfoById(existItemA.FromEdbInfoId)
  294. if err != nil {
  295. err = fmt.Errorf("GetEdbInfoById Err:" + err.Error())
  296. return
  297. }
  298. secondEdbInfo, err := GetEdbInfoById(existItemB.FromEdbInfoId)
  299. if err != nil {
  300. err = fmt.Errorf("GetEdbInfoById Err:" + err.Error())
  301. return
  302. }
  303. // 刷新数据
  304. latestDateStr, latestValue, err = refreshAllPredictCalculateZjpj(to, edbInfo, fromEdbInfo, secondEdbInfo)
  305. return
  306. }
  307. // refreshAllPredictCalculateZjpj 刷新所有 直接拼接 数据
  308. func refreshAllPredictCalculateZjpj(to *gorm.DB, edbInfo, firstEdbInfo, secondEdbInfo *EdbInfo) (latestDateStr string, latestValue float64, err error) {
  309. //查询当前指标现有的数据
  310. var dataList []*EdbDataPredictCalculateZjpj
  311. sql := ` SELECT * FROM edb_data_predict_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC `
  312. err = to.Raw(sql, edbInfo.EdbInfoId).Scan(&dataList).Error
  313. if err != nil {
  314. return
  315. }
  316. if edbInfo.CalculateFormula <= secondEdbInfo.LatestDate {
  317. latestDateStr = secondEdbInfo.LatestDate
  318. } else {
  319. if edbInfo.CalculateFormula >= firstEdbInfo.LatestDate {
  320. latestDateStr = firstEdbInfo.LatestDate
  321. } else {
  322. latestDateStr = edbInfo.CalculateFormula
  323. }
  324. }
  325. var dateArr []string
  326. dataMap := make(map[string]*EdbDataPredictCalculateZjpj)
  327. removeDataTimeMap := make(map[string]int) //需要移除的日期数据
  328. for _, v := range dataList {
  329. dateArr = append(dateArr, v.DataTime)
  330. dataMap[v.DataTime] = v
  331. removeDataTimeMap[v.DataTime] = 1
  332. }
  333. addDataList := make([]*EdbDataPredictCalculateZjpj, 0)
  334. //第一个指标
  335. {
  336. var firstDataList []*EdbInfoSearchData
  337. firstDataList, err = GetPredictEdbDataListAllByStartDate(firstEdbInfo, 0, "")
  338. if err != nil {
  339. return
  340. }
  341. for _, v := range firstDataList {
  342. if v.DataTime >= edbInfo.CalculateFormula {
  343. continue
  344. }
  345. //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素
  346. if _, ok := removeDataTimeMap[v.DataTime]; ok {
  347. delete(removeDataTimeMap, v.DataTime)
  348. }
  349. if latestDateStr == v.DataTime {
  350. latestValue = v.Value
  351. }
  352. //时间戳
  353. if edbData, ok := dataMap[v.DataTime]; ok {
  354. if edbData.Value != v.Value {
  355. //更新指标数据
  356. edbData.Value = v.Value
  357. _ = to.Model(edbData).Select([]string{"Value"}).Updates(edbData).Error
  358. }
  359. } else {
  360. //时间戳
  361. currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
  362. timestamp := currentDate.UnixNano() / 1e6
  363. edbDataZjpj := &EdbDataPredictCalculateZjpj{
  364. EdbInfoId: edbInfo.EdbInfoId,
  365. EdbCode: edbInfo.EdbCode,
  366. DataTime: v.DataTime,
  367. Value: v.Value,
  368. Status: 1,
  369. CreateTime: time.Now(),
  370. ModifyTime: time.Now(),
  371. DataTimestamp: timestamp,
  372. }
  373. addDataList = append(addDataList, edbDataZjpj)
  374. }
  375. }
  376. }
  377. //第二个指标
  378. {
  379. /*condition := ``
  380. pars := make([]interface{}, 0)
  381. condition += " AND data_time >= ? AND edb_info_id = ? "
  382. pars = append(pars, edbInfo.CalculateFormula, existItemB.FromEdbInfoId)
  383. //第二个指标的数据列表
  384. secondDataList, tmpErr := GetEdbDataListAllByTo(to, condition, pars, existItemB.FromSource, 0)
  385. if tmpErr != nil {
  386. return tmpErr
  387. }*/
  388. var secondDataList []*EdbInfoSearchData
  389. secondDataList, err = GetPredictEdbDataListAllByStartDate(secondEdbInfo, 0, edbInfo.CalculateFormula)
  390. if err != nil {
  391. return
  392. }
  393. for _, v := range secondDataList {
  394. //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素
  395. if _, ok := removeDataTimeMap[v.DataTime]; ok {
  396. delete(removeDataTimeMap, v.DataTime)
  397. }
  398. if latestDateStr == v.DataTime {
  399. latestValue = v.Value
  400. }
  401. if edbData, ok := dataMap[v.DataTime]; ok {
  402. if edbData.Value != v.Value {
  403. //更新指标数据
  404. edbData.Value = v.Value
  405. edbData.ModifyTime = time.Now()
  406. tmpErr := to.Model(edbData).Select([]string{"Value", "ModifyTime"}).Updates(edbData).Error
  407. if tmpErr != nil {
  408. fmt.Println("tmpErr:", tmpErr)
  409. err = tmpErr
  410. return
  411. }
  412. }
  413. } else {
  414. //时间戳
  415. currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
  416. timestamp := currentDate.UnixNano() / 1e6
  417. edbDataZjpj := &EdbDataPredictCalculateZjpj{
  418. EdbInfoId: edbInfo.EdbInfoId,
  419. EdbCode: edbInfo.EdbCode,
  420. DataTime: v.DataTime,
  421. Value: v.Value,
  422. Status: 1,
  423. CreateTime: time.Now(),
  424. ModifyTime: time.Now(),
  425. DataTimestamp: timestamp,
  426. }
  427. addDataList = append(addDataList, edbDataZjpj)
  428. }
  429. }
  430. }
  431. //删除已经不存在的累计同比拼接指标数据(由于同比值当日的数据删除了)
  432. {
  433. removeDateList := make([]string, 0)
  434. for dateTime := range removeDataTimeMap {
  435. removeDateList = append(removeDateList, dateTime)
  436. }
  437. if len(removeDateList) > 0 {
  438. removeDateStr := strings.Join(removeDateList, `","`)
  439. removeDateStr = `"` + removeDateStr + `"`
  440. //如果拼接指标变更了,那么需要删除所有的指标数据
  441. tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
  442. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
  443. err = to.Exec(sql, edbInfo.EdbInfoId).Error
  444. if err != nil {
  445. err = fmt.Errorf("删除不存在的直接拼接指标数据失败,Err:" + err.Error())
  446. return
  447. }
  448. }
  449. }
  450. //数据入库
  451. if len(addDataList) > 0 {
  452. tmpErr := to.CreateInBatches(addDataList, 500).Error
  453. if tmpErr != nil {
  454. err = tmpErr
  455. return
  456. }
  457. }
  458. return
  459. }