edb_data_calculate_zjpj.go 16 KB


  1. package models
  2. import (
  3. "eta/eta_index_lib/utils"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "strings"
  7. "time"
  8. )
  9. // EdbDataCalculateZjpj 直接拼接数据结构体
  10. type EdbDataCalculateZjpj struct {
  11. EdbDataId int `orm:"column(edb_data_id);pk"`
  12. EdbInfoId int
  13. EdbCode string
  14. DataTime string
  15. Value float64
  16. Status int
  17. CreateTime time.Time
  18. ModifyTime time.Time
  19. DataTimestamp int64
  20. }
  21. // AddCalculateZjpj 新增直接拼接数据
  22. func AddCalculateZjpj(req *EdbInfoCalculateBatchSaveReq, firstEdbInfo, secondEdbInfo *EdbInfo, edbCode, uniqueCode string, sysUserId int, sysUserRealName string) (edbInfo *EdbInfo, err error) {
  23. o := orm.NewOrm()
  24. to, err := o.Begin()
  25. if err != nil {
  26. return
  27. }
  28. defer func() {
  29. if err != nil {
  30. fmt.Println("AddCalculateZjpj,Err:" + err.Error())
  31. _ = to.Rollback()
  32. } else {
  33. _ = to.Commit()
  34. }
  35. }()
  36. if req.EdbInfoId <= 0 {
  37. edbInfo = &EdbInfo{
  38. SourceName: "直接拼接",
  39. Source: utils.DATA_SOURCE_CALCULATE_ZJPJ,
  40. EdbCode: edbCode,
  41. EdbName: req.EdbName,
  42. EdbNameSource: req.EdbName,
  43. Frequency: req.Frequency,
  44. Unit: req.Unit,
  45. StartDate: firstEdbInfo.StartDate,
  46. EndDate: firstEdbInfo.EndDate,
  47. ClassifyId: req.ClassifyId,
  48. SysUserId: sysUserId,
  49. SysUserRealName: sysUserRealName,
  50. UniqueCode: uniqueCode,
  51. CreateTime: time.Now(),
  52. ModifyTime: time.Now(),
  53. CalculateFormula: req.Formula,
  54. EdbNameEn: req.EdbName,
  55. UnitEn: req.Unit,
  56. EdbType: 2,
  57. }
  58. newEdbInfoId, tmpErr := to.Insert(edbInfo)
  59. if tmpErr != nil {
  60. err = tmpErr
  61. return
  62. }
  63. edbInfo.EdbInfoId = int(newEdbInfoId)
  64. } else {
  65. edbInfo, err = GetEdbInfoById(req.EdbInfoId)
  66. if err != nil {
  67. return
  68. }
  69. //查询
  70. tmpEdbInfo, tmpErr := GetEdbInfoById(edbInfo.EdbInfoId)
  71. if tmpErr != nil {
  72. err = tmpErr
  73. return
  74. }
  75. tmpEdbInfo.EdbName = req.EdbName
  76. tmpEdbInfo.ClassifyId = req.ClassifyId
  77. tmpEdbInfo.Frequency = req.Frequency
  78. tmpEdbInfo.Unit = req.Unit
  79. tmpEdbInfo.CalculateFormula = req.Formula
  80. edbInfo = tmpEdbInfo
  81. //删除指标数据
  82. dataTableName := GetEdbDataTableName(utils.DATA_SOURCE_CALCULATE_ZJPJ, utils.DATA_SUB_SOURCE_EDB)
  83. fmt.Println("dataTableName:" + dataTableName)
  84. deleteSql := ` DELETE FROM %s WHERE edb_info_id=? `
  85. deleteSql = fmt.Sprintf(deleteSql, dataTableName)
  86. _, err = to.Raw(deleteSql, req.EdbInfoId).Exec()
  87. if err != nil {
  88. return
  89. }
  90. //删除指标关系
  91. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id=? `
  92. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  93. }
  94. //关联关系
  95. var existItemA, existItemB *EdbInfoCalculateMapping
  96. //第一个指标
  97. {
  98. existItemA = &EdbInfoCalculateMapping{
  99. EdbInfoCalculateMappingId: 0,
  100. EdbInfoId: edbInfo.EdbInfoId,
  101. Source: edbInfo.Source,
  102. SourceName: edbInfo.SourceName,
  103. EdbCode: edbInfo.EdbCode,
  104. FromEdbInfoId: firstEdbInfo.EdbInfoId,
  105. FromEdbCode: firstEdbInfo.EdbCode,
  106. FromEdbName: firstEdbInfo.EdbName,
  107. FromSource: firstEdbInfo.Source,
  108. FromSourceName: firstEdbInfo.SourceName,
  109. FromTag: "A",
  110. Sort: 1,
  111. CreateTime: time.Now(),
  112. ModifyTime: time.Now(),
  113. FromSubSource: firstEdbInfo.SubSource,
  114. }
  115. insertId, tmpErr := to.Insert(existItemA)
  116. if tmpErr != nil {
  117. err = tmpErr
  118. return
  119. }
  120. existItemA.EdbInfoCalculateMappingId = int(insertId)
  121. }
  122. //第二个指标
  123. {
  124. existItemB = &EdbInfoCalculateMapping{
  125. EdbInfoCalculateMappingId: 0,
  126. EdbInfoId: edbInfo.EdbInfoId,
  127. Source: edbInfo.Source,
  128. SourceName: edbInfo.SourceName,
  129. EdbCode: edbInfo.EdbCode,
  130. FromEdbInfoId: secondEdbInfo.EdbInfoId,
  131. FromEdbCode: secondEdbInfo.EdbCode,
  132. FromEdbName: secondEdbInfo.EdbName,
  133. FromSource: secondEdbInfo.Source,
  134. FromSourceName: secondEdbInfo.SourceName,
  135. FromTag: "B",
  136. Sort: 1,
  137. CreateTime: time.Now(),
  138. ModifyTime: time.Now(),
  139. FromSubSource: secondEdbInfo.SubSource,
  140. }
  141. insertId, tmpErr := to.Insert(existItemB)
  142. if tmpErr != nil {
  143. err = tmpErr
  144. return
  145. }
  146. existItemB.EdbInfoCalculateMappingId = int(insertId)
  147. }
  148. //拼接数据
  149. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  150. return
  151. }
  152. // EditCalculateZjpj 编辑直接拼接数据
  153. func EditCalculateZjpj(req *EdbInfoCalculateBatchEditReq, edbInfo, firstEdbInfo, secondEdbInfo *EdbInfo) (err error) {
  154. o := orm.NewOrm()
  155. to, err := o.Begin()
  156. if err != nil {
  157. return
  158. }
  159. defer func() {
  160. if err != nil {
  161. fmt.Println("EditCalculateZjpj,Err:" + err.Error())
  162. _ = to.Rollback()
  163. } else {
  164. _ = to.Commit()
  165. }
  166. }()
  167. nowEdbInfo := *edbInfo // 现在的指标信息
  168. //修改指标信息
  169. edbInfo.EdbName = req.EdbName
  170. edbInfo.EdbNameSource = req.EdbName
  171. edbInfo.Frequency = req.Frequency
  172. edbInfo.Unit = req.Unit
  173. edbInfo.ClassifyId = req.ClassifyId
  174. edbInfo.CalculateFormula = req.Formula
  175. edbInfo.ModifyTime = time.Now()
  176. _, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime")
  177. if err != nil {
  178. return
  179. }
  180. var existCondition string
  181. var existPars []interface{}
  182. existCondition += " AND edb_info_id=? "
  183. existPars = append(existPars, edbInfo.EdbInfoId)
  184. //查询出所有的关联指标
  185. existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars)
  186. if err != nil {
  187. err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error())
  188. return
  189. }
  190. var existItemA, existItemB *EdbInfoCalculateMapping
  191. for _, existItem := range existList {
  192. if existItem.FromTag == "A" {
  193. existItemA = existItem
  194. } else if existItem.FromTag == "B" {
  195. existItemB = existItem
  196. }
  197. }
  198. // 是否需要删除数据重新计算
  199. isNeedCalculateData := false
  200. // 如果截止日期变更,那么需要重新计算
  201. if req.Formula != nowEdbInfo.CalculateFormula {
  202. isNeedCalculateData = true
  203. }
  204. //第一个指标数据
  205. {
  206. // 如果指标变了,那么需要删除关系,并重新计算
  207. if existItemA.FromEdbInfoId != firstEdbInfo.EdbInfoId {
  208. //删除之前的A指标关联关系
  209. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?`
  210. _, err = to.Raw(sql, edbInfo.EdbInfoId, existItemA.FromEdbInfoId).Exec()
  211. if err != nil {
  212. err = fmt.Errorf("删除拼接日期之前的指标关联关系失败,Err:" + err.Error())
  213. return
  214. }
  215. //添加新的指标关系
  216. {
  217. existItemA = &EdbInfoCalculateMapping{
  218. EdbInfoCalculateMappingId: 0,
  219. EdbInfoId: edbInfo.EdbInfoId,
  220. Source: edbInfo.Source,
  221. SourceName: edbInfo.SourceName,
  222. EdbCode: edbInfo.EdbCode,
  223. FromEdbInfoId: firstEdbInfo.EdbInfoId,
  224. FromEdbCode: firstEdbInfo.EdbCode,
  225. FromEdbName: firstEdbInfo.EdbName,
  226. FromSource: firstEdbInfo.Source,
  227. FromSourceName: firstEdbInfo.SourceName,
  228. FromTag: "A",
  229. Sort: 1,
  230. CreateTime: time.Now(),
  231. ModifyTime: time.Now(),
  232. FromSubSource: firstEdbInfo.SubSource,
  233. }
  234. insertId, tmpErr := to.Insert(existItemA)
  235. if tmpErr != nil {
  236. err = tmpErr
  237. return
  238. }
  239. existItemA.EdbInfoCalculateMappingId = int(insertId)
  240. isNeedCalculateData = true
  241. }
  242. }
  243. }
  244. //第二个指标数据
  245. {
  246. // 如果指标变了,那么需要删除关系,并重新计算
  247. if existItemB.FromEdbInfoId != secondEdbInfo.EdbInfoId {
  248. //删除之前的B指标关联关系
  249. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?`
  250. _, err = to.Raw(sql, edbInfo.EdbInfoId, existItemB.FromEdbInfoId).Exec()
  251. if err != nil {
  252. err = fmt.Errorf("删除拼接日期之后的指标关联关系失败,Err:" + err.Error())
  253. return
  254. }
  255. // 添加新的指标关联关系
  256. existItemB = &EdbInfoCalculateMapping{
  257. EdbInfoCalculateMappingId: 0,
  258. EdbInfoId: edbInfo.EdbInfoId,
  259. Source: edbInfo.Source,
  260. SourceName: edbInfo.SourceName,
  261. EdbCode: edbInfo.EdbCode,
  262. FromEdbInfoId: secondEdbInfo.EdbInfoId,
  263. FromEdbCode: secondEdbInfo.EdbCode,
  264. FromEdbName: secondEdbInfo.EdbName,
  265. FromSource: secondEdbInfo.Source,
  266. FromSourceName: secondEdbInfo.SourceName,
  267. FromTag: "B",
  268. Sort: 2,
  269. CreateTime: time.Now(),
  270. ModifyTime: time.Now(),
  271. FromSubSource: secondEdbInfo.SubSource,
  272. }
  273. insertId, tmpErr := to.Insert(existItemB)
  274. if tmpErr != nil {
  275. err = tmpErr
  276. return
  277. }
  278. existItemB.EdbInfoCalculateMappingId = int(insertId)
  279. isNeedCalculateData = true
  280. }
  281. }
  282. // 如果需要重新计算,那么先删除所有的指标数据,然后再重新计算
  283. if isNeedCalculateData {
  284. // 删除之前所有的指标数据
  285. tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
  286. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? `, tableName)
  287. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  288. if err != nil {
  289. err = fmt.Errorf("删除历史数据失败,Err:" + err.Error())
  290. return
  291. }
  292. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  293. }
  294. return
  295. }
  296. // GetAllEdbDataCalculateZjpjByEdbInfoId 根据指标id获取全部的数据
  297. func GetAllEdbDataCalculateZjpjByEdbInfoId(edbInfoId int) (items []*EdbDataCalculateZjpj, err error) {
  298. o := orm.NewOrm()
  299. sql := ` SELECT * FROM edb_data_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC `
  300. _, err = o.Raw(sql, edbInfoId).QueryRows(&items)
  301. return
  302. }
  303. // RefreshAllCalculateZjpj 刷新所有 直接拼接 数据
  304. func RefreshAllCalculateZjpj(edbInfo *EdbInfo) (err error) {
  305. o := orm.NewOrm()
  306. to, err := o.Begin()
  307. defer func() {
  308. if err != nil {
  309. fmt.Println("RefreshAllCalculateZjpj,Err:" + err.Error())
  310. _ = to.Rollback()
  311. } else {
  312. _ = to.Commit()
  313. }
  314. }()
  315. //查询关联指标信息
  316. var existCondition string
  317. var existPars []interface{}
  318. existCondition += " AND edb_info_id=? "
  319. existPars = append(existPars, edbInfo.EdbInfoId)
  320. existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars)
  321. if err != nil {
  322. err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error())
  323. return
  324. }
  325. var existItemA, existItemB *EdbInfoCalculateMapping
  326. for _, existItem := range existList {
  327. if existItem.FromTag == "A" {
  328. existItemA = existItem
  329. } else if existItem.FromTag == "B" {
  330. existItemB = existItem
  331. }
  332. }
  333. // 刷新数据
  334. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  335. return
  336. }
  337. // refreshAllCalculateZjpj 刷新所有 直接拼接 数据
  338. func refreshAllCalculateZjpj(to orm.TxOrmer, edbInfo *EdbInfo, existItemA, existItemB *EdbInfoCalculateMapping) (err error) {
  339. //查询当前指标现有的数据
  340. var condition string
  341. var pars []interface{}
  342. condition += " AND edb_info_id=? "
  343. pars = append(pars, edbInfo.EdbInfoId)
  344. var dataList []*EdbDataCalculateZjpj
  345. sql := ` SELECT * FROM edb_data_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC `
  346. _, err = to.Raw(sql, edbInfo.EdbInfoId).QueryRows(&dataList)
  347. if err != nil {
  348. return err
  349. }
  350. var dateArr []string
  351. dataMap := make(map[string]*EdbDataCalculateZjpj)
  352. removeDataTimeMap := make(map[string]int) //需要移除的日期数据
  353. for _, v := range dataList {
  354. dateArr = append(dateArr, v.DataTime)
  355. dataMap[v.DataTime] = v
  356. removeDataTimeMap[v.DataTime] = 1
  357. }
  358. addDataList := make([]*EdbDataCalculateZjpj, 0)
  359. //第一个指标
  360. {
  361. var condition string
  362. var pars []interface{}
  363. condition += " AND data_time < ? AND edb_info_id=? "
  364. pars = append(pars, edbInfo.CalculateFormula, existItemA.FromEdbInfoId)
  365. //第一个指标的数据列表
  366. firstDataList, tmpErr := GetEdbDataListAllByTo(to, condition, pars, existItemA.FromSource, existItemA.FromSubSource, 0)
  367. if tmpErr != nil {
  368. return tmpErr
  369. }
  370. for _, v := range firstDataList {
  371. //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素
  372. if _, ok := removeDataTimeMap[v.DataTime]; ok {
  373. delete(removeDataTimeMap, v.DataTime)
  374. }
  375. //时间戳
  376. if edbData, ok := dataMap[v.DataTime]; ok {
  377. if edbData.Value != v.Value {
  378. //更新指标数据
  379. edbData.Value = v.Value
  380. _, _ = to.Update(edbData, "Value")
  381. }
  382. } else {
  383. //时间戳
  384. currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
  385. timestamp := currentDate.UnixNano() / 1e6
  386. edbDataZjpj := &EdbDataCalculateZjpj{
  387. EdbInfoId: edbInfo.EdbInfoId,
  388. EdbCode: edbInfo.EdbCode,
  389. DataTime: v.DataTime,
  390. Value: v.Value,
  391. Status: 1,
  392. CreateTime: time.Now(),
  393. ModifyTime: time.Now(),
  394. DataTimestamp: timestamp,
  395. }
  396. addDataList = append(addDataList, edbDataZjpj)
  397. }
  398. }
  399. }
  400. //第二个指标
  401. {
  402. condition = ``
  403. pars = make([]interface{}, 0)
  404. condition += " AND data_time >= ? AND edb_info_id = ? "
  405. pars = append(pars, edbInfo.CalculateFormula, existItemB.FromEdbInfoId)
  406. //第二个指标的数据列表
  407. secondDataList, tmpErr := GetEdbDataListAllByTo(to, condition, pars, existItemB.FromSource, existItemB.FromSubSource, 0)
  408. if tmpErr != nil {
  409. return tmpErr
  410. }
  411. for _, v := range secondDataList {
  412. //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素
  413. if _, ok := removeDataTimeMap[v.DataTime]; ok {
  414. delete(removeDataTimeMap, v.DataTime)
  415. }
  416. if edbData, ok := dataMap[v.DataTime]; ok {
  417. if edbData.Value != v.Value {
  418. //更新指标数据
  419. edbData.Value = v.Value
  420. edbData.ModifyTime = time.Now()
  421. _, tmpErr := to.Update(edbData, "Value", "ModifyTime")
  422. if tmpErr != nil {
  423. fmt.Println("tmpErr:", tmpErr)
  424. }
  425. }
  426. } else {
  427. //时间戳
  428. currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
  429. timestamp := currentDate.UnixNano() / 1e6
  430. edbDataZjpj := &EdbDataCalculateZjpj{
  431. EdbInfoId: edbInfo.EdbInfoId,
  432. EdbCode: edbInfo.EdbCode,
  433. DataTime: v.DataTime,
  434. Value: v.Value,
  435. Status: 1,
  436. CreateTime: time.Now(),
  437. ModifyTime: time.Now(),
  438. DataTimestamp: timestamp,
  439. }
  440. addDataList = append(addDataList, edbDataZjpj)
  441. }
  442. }
  443. }
  444. //删除已经不存在的累计同比拼接指标数据(由于同比值当日的数据删除了)
  445. {
  446. removeDateList := make([]string, 0)
  447. for dateTime := range removeDataTimeMap {
  448. removeDateList = append(removeDateList, dateTime)
  449. }
  450. if len(removeDateList) > 0 {
  451. removeDateStr := strings.Join(removeDateList, `","`)
  452. removeDateStr = `"` + removeDateStr + `"`
  453. //如果拼接指标变更了,那么需要删除所有的指标数据
  454. tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
  455. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
  456. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  457. if err != nil {
  458. err = fmt.Errorf("删除不存在的直接拼接指标数据失败,Err:" + err.Error())
  459. return
  460. }
  461. }
  462. }
  463. //数据入库
  464. if len(addDataList) > 0 {
  465. tmpAddDataList := make([]*EdbDataCalculateZjpj, 0)
  466. i := 0
  467. for _, v := range addDataList {
  468. tmpAddDataList = append(tmpAddDataList, v)
  469. i++
  470. if i >= 500 {
  471. _, err = to.InsertMulti(len(tmpAddDataList), tmpAddDataList)
  472. if err != nil {
  473. return
  474. }
  475. i = 0
  476. tmpAddDataList = make([]*EdbDataCalculateZjpj, 0)
  477. }
  478. }
  479. if len(tmpAddDataList) > 0 {
  480. _, err = to.InsertMulti(len(tmpAddDataList), tmpAddDataList)
  481. if err != nil {
  482. return
  483. }
  484. }
  485. }
  486. return
  487. }