edb_data_calculate_zjpj.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. package models
  2. import (
  3. "eta/eta_index_lib/utils"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "strings"
  7. "time"
  8. )
  9. // EdbDataCalculateZjpj 直接拼接数据结构体
  10. type EdbDataCalculateZjpj struct {
  11. EdbDataId int `orm:"column(edb_data_id);pk"`
  12. EdbInfoId int
  13. EdbCode string
  14. DataTime string
  15. Value float64
  16. Status int
  17. CreateTime time.Time
  18. ModifyTime time.Time
  19. DataTimestamp int64
  20. }
  21. // AddCalculateZjpj 新增直接拼接数据
  22. func AddCalculateZjpj(req *EdbInfoCalculateBatchSaveReq, firstEdbInfo, secondEdbInfo *EdbInfo, edbCode, uniqueCode string, sysUserId int, sysUserRealName string) (edbInfo *EdbInfo, err error) {
  23. o := orm.NewOrm()
  24. to, err := o.Begin()
  25. if err != nil {
  26. return
  27. }
  28. defer func() {
  29. if err != nil {
  30. fmt.Println("AddCalculateZjpj,Err:" + err.Error())
  31. _ = to.Rollback()
  32. } else {
  33. _ = to.Commit()
  34. }
  35. }()
  36. if req.EdbInfoId <= 0 {
  37. edbInfo = &EdbInfo{
  38. SourceName: "直接拼接",
  39. Source: utils.DATA_SOURCE_CALCULATE_ZJPJ,
  40. EdbCode: edbCode,
  41. EdbName: req.EdbName,
  42. EdbNameSource: req.EdbName,
  43. Frequency: req.Frequency,
  44. Unit: req.Unit,
  45. StartDate: firstEdbInfo.StartDate,
  46. EndDate: firstEdbInfo.EndDate,
  47. ClassifyId: req.ClassifyId,
  48. SysUserId: sysUserId,
  49. SysUserRealName: sysUserRealName,
  50. UniqueCode: uniqueCode,
  51. CreateTime: time.Now(),
  52. ModifyTime: time.Now(),
  53. CalculateFormula: req.Formula,
  54. EdbNameEn: req.EdbName,
  55. UnitEn: req.Unit,
  56. EdbType: 2,
  57. }
  58. newEdbInfoId, tmpErr := to.Insert(edbInfo)
  59. if tmpErr != nil {
  60. err = tmpErr
  61. return
  62. }
  63. edbInfo.EdbInfoId = int(newEdbInfoId)
  64. } else {
  65. edbInfo, err = GetEdbInfoById(req.EdbInfoId)
  66. if err != nil {
  67. return
  68. }
  69. //查询
  70. tmpEdbInfo, tmpErr := GetEdbInfoById(edbInfo.EdbInfoId)
  71. if tmpErr != nil {
  72. err = tmpErr
  73. return
  74. }
  75. tmpEdbInfo.EdbName = req.EdbName
  76. tmpEdbInfo.ClassifyId = req.ClassifyId
  77. tmpEdbInfo.Frequency = req.Frequency
  78. tmpEdbInfo.Unit = req.Unit
  79. tmpEdbInfo.CalculateFormula = req.Formula
  80. edbInfo = tmpEdbInfo
  81. //删除指标数据
  82. dataTableName := GetEdbDataTableName(utils.DATA_SOURCE_CALCULATE_ZJPJ, utils.DATA_SUB_SOURCE_EDB)
  83. fmt.Println("dataTableName:" + dataTableName)
  84. deleteSql := ` DELETE FROM %s WHERE edb_info_id=? `
  85. deleteSql = fmt.Sprintf(deleteSql, dataTableName)
  86. _, err = to.Raw(deleteSql, req.EdbInfoId).Exec()
  87. if err != nil {
  88. return
  89. }
  90. //删除指标关系
  91. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id=? `
  92. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  93. }
  94. //关联关系
  95. var existItemA, existItemB *EdbInfoCalculateMapping
  96. //第一个指标
  97. {
  98. existItemA = &EdbInfoCalculateMapping{
  99. EdbInfoCalculateMappingId: 0,
  100. EdbInfoId: edbInfo.EdbInfoId,
  101. Source: edbInfo.Source,
  102. SourceName: edbInfo.SourceName,
  103. EdbCode: edbInfo.EdbCode,
  104. FromEdbInfoId: firstEdbInfo.EdbInfoId,
  105. FromEdbCode: firstEdbInfo.EdbCode,
  106. FromEdbName: firstEdbInfo.EdbName,
  107. FromSource: firstEdbInfo.Source,
  108. FromSourceName: firstEdbInfo.SourceName,
  109. FromTag: "A",
  110. Sort: 1,
  111. CreateTime: time.Now(),
  112. ModifyTime: time.Now(),
  113. FromSubSource: firstEdbInfo.SubSource,
  114. }
  115. insertId, tmpErr := to.Insert(existItemA)
  116. if tmpErr != nil {
  117. err = tmpErr
  118. return
  119. }
  120. existItemA.EdbInfoCalculateMappingId = int(insertId)
  121. }
  122. //第二个指标
  123. {
  124. existItemB = &EdbInfoCalculateMapping{
  125. EdbInfoCalculateMappingId: 0,
  126. EdbInfoId: edbInfo.EdbInfoId,
  127. Source: edbInfo.Source,
  128. SourceName: edbInfo.SourceName,
  129. EdbCode: edbInfo.EdbCode,
  130. FromEdbInfoId: secondEdbInfo.EdbInfoId,
  131. FromEdbCode: secondEdbInfo.EdbCode,
  132. FromEdbName: secondEdbInfo.EdbName,
  133. FromSource: secondEdbInfo.Source,
  134. FromSourceName: secondEdbInfo.SourceName,
  135. FromTag: "B",
  136. Sort: 1,
  137. CreateTime: time.Now(),
  138. ModifyTime: time.Now(),
  139. FromSubSource: secondEdbInfo.SubSource,
  140. }
  141. insertId, tmpErr := to.Insert(existItemB)
  142. if tmpErr != nil {
  143. err = tmpErr
  144. return
  145. }
  146. existItemB.EdbInfoCalculateMappingId = int(insertId)
  147. }
  148. //拼接数据
  149. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  150. return
  151. }
  152. // EditCalculateZjpj 编辑直接拼接数据
  153. func EditCalculateZjpj(req *EdbInfoCalculateBatchEditReq, edbInfo, firstEdbInfo, secondEdbInfo *EdbInfo) (err error) {
  154. o := orm.NewOrm()
  155. to, err := o.Begin()
  156. if err != nil {
  157. return
  158. }
  159. defer func() {
  160. if err != nil {
  161. fmt.Println("EditCalculateZjpj,Err:" + err.Error())
  162. _ = to.Rollback()
  163. } else {
  164. _ = to.Commit()
  165. }
  166. }()
  167. nowEdbInfo := *edbInfo // 现在的指标信息
  168. //修改指标信息
  169. edbInfo.EdbName = req.EdbName
  170. edbInfo.EdbNameSource = req.EdbName
  171. edbInfo.Frequency = req.Frequency
  172. edbInfo.Unit = req.Unit
  173. edbInfo.ClassifyId = req.ClassifyId
  174. edbInfo.CalculateFormula = req.Formula
  175. edbInfo.EdbNameEn = req.EdbNameEn
  176. edbInfo.UnitEn = req.UnitEn
  177. edbInfo.ModifyTime = time.Now()
  178. _, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime", "EdbNameEn", "UnitEn")
  179. if err != nil {
  180. return
  181. }
  182. var existCondition string
  183. var existPars []interface{}
  184. existCondition += " AND edb_info_id=? "
  185. existPars = append(existPars, edbInfo.EdbInfoId)
  186. //查询出所有的关联指标
  187. existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars)
  188. if err != nil {
  189. err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error())
  190. return
  191. }
  192. var existItemA, existItemB *EdbInfoCalculateMapping
  193. for _, existItem := range existList {
  194. if existItem.FromTag == "A" {
  195. existItemA = existItem
  196. } else if existItem.FromTag == "B" {
  197. existItemB = existItem
  198. }
  199. }
  200. // 是否需要删除数据重新计算
  201. isNeedCalculateData := false
  202. // 如果截止日期变更,那么需要重新计算
  203. if req.Formula != nowEdbInfo.CalculateFormula {
  204. isNeedCalculateData = true
  205. }
  206. //第一个指标数据
  207. {
  208. // 如果指标变了,那么需要删除关系,并重新计算
  209. if existItemA.FromEdbInfoId != firstEdbInfo.EdbInfoId {
  210. //删除之前的A指标关联关系
  211. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?`
  212. _, err = to.Raw(sql, edbInfo.EdbInfoId, existItemA.FromEdbInfoId).Exec()
  213. if err != nil {
  214. err = fmt.Errorf("删除拼接日期之前的指标关联关系失败,Err:" + err.Error())
  215. return
  216. }
  217. //添加新的指标关系
  218. {
  219. existItemA = &EdbInfoCalculateMapping{
  220. EdbInfoCalculateMappingId: 0,
  221. EdbInfoId: edbInfo.EdbInfoId,
  222. Source: edbInfo.Source,
  223. SourceName: edbInfo.SourceName,
  224. EdbCode: edbInfo.EdbCode,
  225. FromEdbInfoId: firstEdbInfo.EdbInfoId,
  226. FromEdbCode: firstEdbInfo.EdbCode,
  227. FromEdbName: firstEdbInfo.EdbName,
  228. FromSource: firstEdbInfo.Source,
  229. FromSourceName: firstEdbInfo.SourceName,
  230. FromTag: "A",
  231. Sort: 1,
  232. CreateTime: time.Now(),
  233. ModifyTime: time.Now(),
  234. FromSubSource: firstEdbInfo.SubSource,
  235. }
  236. insertId, tmpErr := to.Insert(existItemA)
  237. if tmpErr != nil {
  238. err = tmpErr
  239. return
  240. }
  241. existItemA.EdbInfoCalculateMappingId = int(insertId)
  242. isNeedCalculateData = true
  243. }
  244. }
  245. }
  246. //第二个指标数据
  247. {
  248. // 如果指标变了,那么需要删除关系,并重新计算
  249. if existItemB.FromEdbInfoId != secondEdbInfo.EdbInfoId {
  250. //删除之前的B指标关联关系
  251. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?`
  252. _, err = to.Raw(sql, edbInfo.EdbInfoId, existItemB.FromEdbInfoId).Exec()
  253. if err != nil {
  254. err = fmt.Errorf("删除拼接日期之后的指标关联关系失败,Err:" + err.Error())
  255. return
  256. }
  257. // 添加新的指标关联关系
  258. existItemB = &EdbInfoCalculateMapping{
  259. EdbInfoCalculateMappingId: 0,
  260. EdbInfoId: edbInfo.EdbInfoId,
  261. Source: edbInfo.Source,
  262. SourceName: edbInfo.SourceName,
  263. EdbCode: edbInfo.EdbCode,
  264. FromEdbInfoId: secondEdbInfo.EdbInfoId,
  265. FromEdbCode: secondEdbInfo.EdbCode,
  266. FromEdbName: secondEdbInfo.EdbName,
  267. FromSource: secondEdbInfo.Source,
  268. FromSourceName: secondEdbInfo.SourceName,
  269. FromTag: "B",
  270. Sort: 2,
  271. CreateTime: time.Now(),
  272. ModifyTime: time.Now(),
  273. FromSubSource: secondEdbInfo.SubSource,
  274. }
  275. insertId, tmpErr := to.Insert(existItemB)
  276. if tmpErr != nil {
  277. err = tmpErr
  278. return
  279. }
  280. existItemB.EdbInfoCalculateMappingId = int(insertId)
  281. isNeedCalculateData = true
  282. }
  283. }
  284. // 如果需要重新计算,那么先删除所有的指标数据,然后再重新计算
  285. if isNeedCalculateData {
  286. // 删除之前所有的指标数据
  287. tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
  288. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? `, tableName)
  289. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  290. if err != nil {
  291. err = fmt.Errorf("删除历史数据失败,Err:" + err.Error())
  292. return
  293. }
  294. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  295. }
  296. return
  297. }
  298. // GetAllEdbDataCalculateZjpjByEdbInfoId 根据指标id获取全部的数据
  299. func GetAllEdbDataCalculateZjpjByEdbInfoId(edbInfoId int) (items []*EdbDataCalculateZjpj, err error) {
  300. o := orm.NewOrm()
  301. sql := ` SELECT * FROM edb_data_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC `
  302. _, err = o.Raw(sql, edbInfoId).QueryRows(&items)
  303. return
  304. }
  305. // RefreshAllCalculateZjpj 刷新所有 直接拼接 数据
  306. func RefreshAllCalculateZjpj(edbInfo *EdbInfo) (err error) {
  307. o := orm.NewOrm()
  308. to, err := o.Begin()
  309. defer func() {
  310. if err != nil {
  311. fmt.Println("RefreshAllCalculateZjpj,Err:" + err.Error())
  312. _ = to.Rollback()
  313. } else {
  314. _ = to.Commit()
  315. }
  316. }()
  317. //查询关联指标信息
  318. var existCondition string
  319. var existPars []interface{}
  320. existCondition += " AND edb_info_id=? "
  321. existPars = append(existPars, edbInfo.EdbInfoId)
  322. existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars)
  323. if err != nil {
  324. err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error())
  325. return
  326. }
  327. var existItemA, existItemB *EdbInfoCalculateMapping
  328. for _, existItem := range existList {
  329. if existItem.FromTag == "A" {
  330. existItemA = existItem
  331. } else if existItem.FromTag == "B" {
  332. existItemB = existItem
  333. }
  334. }
  335. // 刷新数据
  336. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  337. return
  338. }
  339. // refreshAllCalculateZjpj 刷新所有 直接拼接 数据
  340. func refreshAllCalculateZjpj(to orm.TxOrmer, edbInfo *EdbInfo, existItemA, existItemB *EdbInfoCalculateMapping) (err error) {
  341. //查询当前指标现有的数据
  342. var condition string
  343. var pars []interface{}
  344. condition += " AND edb_info_id=? "
  345. pars = append(pars, edbInfo.EdbInfoId)
  346. var dataList []*EdbDataCalculateZjpj
  347. sql := ` SELECT * FROM edb_data_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC `
  348. _, err = to.Raw(sql, edbInfo.EdbInfoId).QueryRows(&dataList)
  349. if err != nil {
  350. return err
  351. }
  352. var dateArr []string
  353. dataMap := make(map[string]*EdbDataCalculateZjpj)
  354. removeDataTimeMap := make(map[string]int) //需要移除的日期数据
  355. for _, v := range dataList {
  356. dateArr = append(dateArr, v.DataTime)
  357. dataMap[v.DataTime] = v
  358. removeDataTimeMap[v.DataTime] = 1
  359. }
  360. addDataList := make([]*EdbDataCalculateZjpj, 0)
  361. //第一个指标
  362. {
  363. //第一个指标的数据列表
  364. firstDataList, tmpErr := GetEdbDataListAllByTo(to, existItemA.FromSource, existItemA.FromSubSource, FindEdbDataListAllCond{
  365. EdbInfoId: existItemA.FromEdbInfoId,
  366. EndDataTime: edbInfo.CalculateFormula,
  367. EndDataTimeCond: "<",
  368. }, 0)
  369. if tmpErr != nil {
  370. return tmpErr
  371. }
  372. for _, v := range firstDataList {
  373. //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素
  374. if _, ok := removeDataTimeMap[v.DataTime]; ok {
  375. delete(removeDataTimeMap, v.DataTime)
  376. }
  377. //时间戳
  378. if edbData, ok := dataMap[v.DataTime]; ok {
  379. if edbData.Value != v.Value {
  380. //更新指标数据
  381. edbData.Value = v.Value
  382. _, _ = to.Update(edbData, "Value")
  383. }
  384. } else {
  385. //时间戳
  386. currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
  387. timestamp := currentDate.UnixNano() / 1e6
  388. edbDataZjpj := &EdbDataCalculateZjpj{
  389. EdbInfoId: edbInfo.EdbInfoId,
  390. EdbCode: edbInfo.EdbCode,
  391. DataTime: v.DataTime,
  392. Value: v.Value,
  393. Status: 1,
  394. CreateTime: time.Now(),
  395. ModifyTime: time.Now(),
  396. DataTimestamp: timestamp,
  397. }
  398. addDataList = append(addDataList, edbDataZjpj)
  399. }
  400. }
  401. }
  402. //第二个指标
  403. {
  404. //第二个指标的数据列表
  405. secondDataList, tmpErr := GetEdbDataListAllByTo(to, existItemB.FromSource, existItemB.FromSubSource, FindEdbDataListAllCond{
  406. EdbInfoId: existItemB.FromEdbInfoId,
  407. StartDataTime: edbInfo.CalculateFormula,
  408. StartDataTimeCond: ">=",
  409. }, 0)
  410. if tmpErr != nil {
  411. return tmpErr
  412. }
  413. for _, v := range secondDataList {
  414. //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素
  415. if _, ok := removeDataTimeMap[v.DataTime]; ok {
  416. delete(removeDataTimeMap, v.DataTime)
  417. }
  418. if edbData, ok := dataMap[v.DataTime]; ok {
  419. if edbData.Value != v.Value {
  420. //更新指标数据
  421. edbData.Value = v.Value
  422. edbData.ModifyTime = time.Now()
  423. _, tmpErr := to.Update(edbData, "Value", "ModifyTime")
  424. if tmpErr != nil {
  425. fmt.Println("tmpErr:", tmpErr)
  426. }
  427. }
  428. } else {
  429. //时间戳
  430. currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
  431. timestamp := currentDate.UnixNano() / 1e6
  432. edbDataZjpj := &EdbDataCalculateZjpj{
  433. EdbInfoId: edbInfo.EdbInfoId,
  434. EdbCode: edbInfo.EdbCode,
  435. DataTime: v.DataTime,
  436. Value: v.Value,
  437. Status: 1,
  438. CreateTime: time.Now(),
  439. ModifyTime: time.Now(),
  440. DataTimestamp: timestamp,
  441. }
  442. addDataList = append(addDataList, edbDataZjpj)
  443. }
  444. }
  445. }
  446. //删除已经不存在的累计同比拼接指标数据(由于同比值当日的数据删除了)
  447. {
  448. removeDateList := make([]string, 0)
  449. for dateTime := range removeDataTimeMap {
  450. removeDateList = append(removeDateList, dateTime)
  451. }
  452. if len(removeDateList) > 0 {
  453. removeDateStr := strings.Join(removeDateList, `","`)
  454. removeDateStr = `"` + removeDateStr + `"`
  455. //如果拼接指标变更了,那么需要删除所有的指标数据
  456. tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
  457. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
  458. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  459. if err != nil {
  460. err = fmt.Errorf("删除不存在的直接拼接指标数据失败,Err:" + err.Error())
  461. return
  462. }
  463. }
  464. }
  465. //数据入库
  466. if len(addDataList) > 0 {
  467. tmpAddDataList := make([]*EdbDataCalculateZjpj, 0)
  468. i := 0
  469. for _, v := range addDataList {
  470. tmpAddDataList = append(tmpAddDataList, v)
  471. i++
  472. if i >= 500 {
  473. _, err = to.InsertMulti(len(tmpAddDataList), tmpAddDataList)
  474. if err != nil {
  475. return
  476. }
  477. i = 0
  478. tmpAddDataList = make([]*EdbDataCalculateZjpj, 0)
  479. }
  480. }
  481. if len(tmpAddDataList) > 0 {
  482. _, err = to.InsertMulti(len(tmpAddDataList), tmpAddDataList)
  483. if err != nil {
  484. return
  485. }
  486. }
  487. }
  488. return
  489. }