edb_data_calculate_zjpj.go 16 KB


  1. package models
  2. import (
  3. "eta/eta_index_lib/utils"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "strings"
  7. "time"
  8. )
  9. // EdbDataCalculateZjpj 直接拼接数据结构体
  10. type EdbDataCalculateZjpj struct {
  11. EdbDataId int `orm:"column(edb_data_id);pk"`
  12. EdbInfoId int
  13. EdbCode string
  14. DataTime string
  15. Value float64
  16. Status int
  17. CreateTime time.Time
  18. ModifyTime time.Time
  19. DataTimestamp int64
  20. }
  21. // AddCalculateZjpj 新增直接拼接数据
  22. func AddCalculateZjpj(req *EdbInfoCalculateBatchSaveReq, firstEdbInfo, secondEdbInfo *EdbInfo, edbCode, uniqueCode string, sysUserId int, sysUserRealName string) (edbInfo *EdbInfo, err error) {
  23. o := orm.NewOrm()
  24. to, err := o.Begin()
  25. if err != nil {
  26. return
  27. }
  28. defer func() {
  29. if err != nil {
  30. fmt.Println("AddCalculateZjpj,Err:" + err.Error())
  31. _ = to.Rollback()
  32. } else {
  33. _ = to.Commit()
  34. }
  35. }()
  36. if req.EdbInfoId <= 0 {
  37. edbInfo = &EdbInfo{
  38. SourceName: "直接拼接",
  39. Source: utils.DATA_SOURCE_CALCULATE_ZJPJ,
  40. EdbCode: edbCode,
  41. EdbName: req.EdbName,
  42. EdbNameSource: req.EdbName,
  43. Frequency: req.Frequency,
  44. Unit: req.Unit,
  45. StartDate: firstEdbInfo.StartDate,
  46. EndDate: firstEdbInfo.EndDate,
  47. ClassifyId: req.ClassifyId,
  48. SysUserId: sysUserId,
  49. SysUserRealName: sysUserRealName,
  50. UniqueCode: uniqueCode,
  51. CreateTime: time.Now(),
  52. ModifyTime: time.Now(),
  53. CalculateFormula: req.Formula,
  54. EdbType: 2,
  55. }
  56. newEdbInfoId, tmpErr := to.Insert(edbInfo)
  57. if tmpErr != nil {
  58. err = tmpErr
  59. return
  60. }
  61. edbInfo.EdbInfoId = int(newEdbInfoId)
  62. } else {
  63. edbInfo, err = GetEdbInfoById(req.EdbInfoId)
  64. if err != nil {
  65. return
  66. }
  67. //查询
  68. tmpEdbInfo, tmpErr := GetEdbInfoById(edbInfo.EdbInfoId)
  69. if tmpErr != nil {
  70. err = tmpErr
  71. return
  72. }
  73. tmpEdbInfo.EdbName = req.EdbName
  74. tmpEdbInfo.ClassifyId = req.ClassifyId
  75. tmpEdbInfo.Frequency = req.Frequency
  76. tmpEdbInfo.Unit = req.Unit
  77. tmpEdbInfo.CalculateFormula = req.Formula
  78. edbInfo = tmpEdbInfo
  79. //删除指标数据
  80. dataTableName := GetEdbDataTableName(utils.DATA_SOURCE_CALCULATE_ZJPJ, utils.DATA_SUB_SOURCE_EDB)
  81. fmt.Println("dataTableName:" + dataTableName)
  82. deleteSql := ` DELETE FROM %s WHERE edb_info_id=? `
  83. deleteSql = fmt.Sprintf(deleteSql, dataTableName)
  84. _, err = to.Raw(deleteSql, req.EdbInfoId).Exec()
  85. if err != nil {
  86. return
  87. }
  88. //删除指标关系
  89. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id=? `
  90. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  91. }
  92. //关联关系
  93. var existItemA, existItemB *EdbInfoCalculateMapping
  94. //第一个指标
  95. {
  96. existItemA = &EdbInfoCalculateMapping{
  97. EdbInfoCalculateMappingId: 0,
  98. EdbInfoId: edbInfo.EdbInfoId,
  99. Source: edbInfo.Source,
  100. SourceName: edbInfo.SourceName,
  101. EdbCode: edbInfo.EdbCode,
  102. FromEdbInfoId: firstEdbInfo.EdbInfoId,
  103. FromEdbCode: firstEdbInfo.EdbCode,
  104. FromEdbName: firstEdbInfo.EdbName,
  105. FromSource: firstEdbInfo.Source,
  106. FromSourceName: firstEdbInfo.SourceName,
  107. FromTag: "A",
  108. Sort: 1,
  109. CreateTime: time.Now(),
  110. ModifyTime: time.Now(),
  111. FromSubSource: firstEdbInfo.SubSource,
  112. }
  113. insertId, tmpErr := to.Insert(existItemA)
  114. if tmpErr != nil {
  115. err = tmpErr
  116. return
  117. }
  118. existItemA.EdbInfoCalculateMappingId = int(insertId)
  119. }
  120. //第二个指标
  121. {
  122. existItemB = &EdbInfoCalculateMapping{
  123. EdbInfoCalculateMappingId: 0,
  124. EdbInfoId: edbInfo.EdbInfoId,
  125. Source: edbInfo.Source,
  126. SourceName: edbInfo.SourceName,
  127. EdbCode: edbInfo.EdbCode,
  128. FromEdbInfoId: secondEdbInfo.EdbInfoId,
  129. FromEdbCode: secondEdbInfo.EdbCode,
  130. FromEdbName: secondEdbInfo.EdbName,
  131. FromSource: secondEdbInfo.Source,
  132. FromSourceName: secondEdbInfo.SourceName,
  133. FromTag: "B",
  134. Sort: 1,
  135. CreateTime: time.Now(),
  136. ModifyTime: time.Now(),
  137. FromSubSource: secondEdbInfo.SubSource,
  138. }
  139. insertId, tmpErr := to.Insert(existItemB)
  140. if tmpErr != nil {
  141. err = tmpErr
  142. return
  143. }
  144. existItemB.EdbInfoCalculateMappingId = int(insertId)
  145. }
  146. //拼接数据
  147. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  148. return
  149. }
  150. // EditCalculateZjpj 编辑直接拼接数据
  151. func EditCalculateZjpj(req *EdbInfoCalculateBatchEditReq, edbInfo, firstEdbInfo, secondEdbInfo *EdbInfo) (err error) {
  152. o := orm.NewOrm()
  153. to, err := o.Begin()
  154. if err != nil {
  155. return
  156. }
  157. defer func() {
  158. if err != nil {
  159. fmt.Println("EditCalculateZjpj,Err:" + err.Error())
  160. _ = to.Rollback()
  161. } else {
  162. _ = to.Commit()
  163. }
  164. }()
  165. nowEdbInfo := *edbInfo // 现在的指标信息
  166. //修改指标信息
  167. edbInfo.EdbName = req.EdbName
  168. edbInfo.EdbNameSource = req.EdbName
  169. edbInfo.Frequency = req.Frequency
  170. edbInfo.Unit = req.Unit
  171. edbInfo.ClassifyId = req.ClassifyId
  172. edbInfo.CalculateFormula = req.Formula
  173. edbInfo.ModifyTime = time.Now()
  174. _, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime")
  175. if err != nil {
  176. return
  177. }
  178. var existCondition string
  179. var existPars []interface{}
  180. existCondition += " AND edb_info_id=? "
  181. existPars = append(existPars, edbInfo.EdbInfoId)
  182. //查询出所有的关联指标
  183. existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars)
  184. if err != nil {
  185. err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error())
  186. return
  187. }
  188. var existItemA, existItemB *EdbInfoCalculateMapping
  189. for _, existItem := range existList {
  190. if existItem.FromTag == "A" {
  191. existItemA = existItem
  192. } else if existItem.FromTag == "B" {
  193. existItemB = existItem
  194. }
  195. }
  196. // 是否需要删除数据重新计算
  197. isNeedCalculateData := false
  198. // 如果截止日期变更,那么需要重新计算
  199. if req.Formula != nowEdbInfo.CalculateFormula {
  200. isNeedCalculateData = true
  201. }
  202. //第一个指标数据
  203. {
  204. // 如果指标变了,那么需要删除关系,并重新计算
  205. if existItemA.FromEdbInfoId != firstEdbInfo.EdbInfoId {
  206. //删除之前的A指标关联关系
  207. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?`
  208. _, err = to.Raw(sql, edbInfo.EdbInfoId, existItemA.FromEdbInfoId).Exec()
  209. if err != nil {
  210. err = fmt.Errorf("删除拼接日期之前的指标关联关系失败,Err:" + err.Error())
  211. return
  212. }
  213. //添加新的指标关系
  214. {
  215. existItemA = &EdbInfoCalculateMapping{
  216. EdbInfoCalculateMappingId: 0,
  217. EdbInfoId: edbInfo.EdbInfoId,
  218. Source: edbInfo.Source,
  219. SourceName: edbInfo.SourceName,
  220. EdbCode: edbInfo.EdbCode,
  221. FromEdbInfoId: firstEdbInfo.EdbInfoId,
  222. FromEdbCode: firstEdbInfo.EdbCode,
  223. FromEdbName: firstEdbInfo.EdbName,
  224. FromSource: firstEdbInfo.Source,
  225. FromSourceName: firstEdbInfo.SourceName,
  226. FromTag: "A",
  227. Sort: 1,
  228. CreateTime: time.Now(),
  229. ModifyTime: time.Now(),
  230. FromSubSource: firstEdbInfo.SubSource,
  231. }
  232. insertId, tmpErr := to.Insert(existItemA)
  233. if tmpErr != nil {
  234. err = tmpErr
  235. return
  236. }
  237. existItemA.EdbInfoCalculateMappingId = int(insertId)
  238. isNeedCalculateData = true
  239. }
  240. }
  241. }
  242. //第二个指标数据
  243. {
  244. // 如果指标变了,那么需要删除关系,并重新计算
  245. if existItemB.FromEdbInfoId != secondEdbInfo.EdbInfoId {
  246. //删除之前的B指标关联关系
  247. sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?`
  248. _, err = to.Raw(sql, edbInfo.EdbInfoId, existItemB.FromEdbInfoId).Exec()
  249. if err != nil {
  250. err = fmt.Errorf("删除拼接日期之后的指标关联关系失败,Err:" + err.Error())
  251. return
  252. }
  253. // 添加新的指标关联关系
  254. existItemB = &EdbInfoCalculateMapping{
  255. EdbInfoCalculateMappingId: 0,
  256. EdbInfoId: edbInfo.EdbInfoId,
  257. Source: edbInfo.Source,
  258. SourceName: edbInfo.SourceName,
  259. EdbCode: edbInfo.EdbCode,
  260. FromEdbInfoId: secondEdbInfo.EdbInfoId,
  261. FromEdbCode: secondEdbInfo.EdbCode,
  262. FromEdbName: secondEdbInfo.EdbName,
  263. FromSource: secondEdbInfo.Source,
  264. FromSourceName: secondEdbInfo.SourceName,
  265. FromTag: "B",
  266. Sort: 2,
  267. CreateTime: time.Now(),
  268. ModifyTime: time.Now(),
  269. FromSubSource: secondEdbInfo.SubSource,
  270. }
  271. insertId, tmpErr := to.Insert(existItemB)
  272. if tmpErr != nil {
  273. err = tmpErr
  274. return
  275. }
  276. existItemB.EdbInfoCalculateMappingId = int(insertId)
  277. isNeedCalculateData = true
  278. }
  279. }
  280. // 如果需要重新计算,那么先删除所有的指标数据,然后再重新计算
  281. if isNeedCalculateData {
  282. // 删除之前所有的指标数据
  283. tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
  284. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? `, tableName)
  285. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  286. if err != nil {
  287. err = fmt.Errorf("删除历史数据失败,Err:" + err.Error())
  288. return
  289. }
  290. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  291. }
  292. return
  293. }
  294. // GetAllEdbDataCalculateZjpjByEdbInfoId 根据指标id获取全部的数据
  295. func GetAllEdbDataCalculateZjpjByEdbInfoId(edbInfoId int) (items []*EdbDataCalculateZjpj, err error) {
  296. o := orm.NewOrm()
  297. sql := ` SELECT * FROM edb_data_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC `
  298. _, err = o.Raw(sql, edbInfoId).QueryRows(&items)
  299. return
  300. }
  301. // RefreshAllCalculateZjpj 刷新所有 直接拼接 数据
  302. func RefreshAllCalculateZjpj(edbInfo *EdbInfo) (err error) {
  303. o := orm.NewOrm()
  304. to, err := o.Begin()
  305. defer func() {
  306. if err != nil {
  307. fmt.Println("RefreshAllCalculateZjpj,Err:" + err.Error())
  308. _ = to.Rollback()
  309. } else {
  310. _ = to.Commit()
  311. }
  312. }()
  313. //查询关联指标信息
  314. var existCondition string
  315. var existPars []interface{}
  316. existCondition += " AND edb_info_id=? "
  317. existPars = append(existPars, edbInfo.EdbInfoId)
  318. existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars)
  319. if err != nil {
  320. err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error())
  321. return
  322. }
  323. var existItemA, existItemB *EdbInfoCalculateMapping
  324. for _, existItem := range existList {
  325. if existItem.FromTag == "A" {
  326. existItemA = existItem
  327. } else if existItem.FromTag == "B" {
  328. existItemB = existItem
  329. }
  330. }
  331. // 刷新数据
  332. err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB)
  333. return
  334. }
  335. // refreshAllCalculateZjpj 刷新所有 直接拼接 数据
  336. func refreshAllCalculateZjpj(to orm.TxOrmer, edbInfo *EdbInfo, existItemA, existItemB *EdbInfoCalculateMapping) (err error) {
  337. //查询当前指标现有的数据
  338. var condition string
  339. var pars []interface{}
  340. condition += " AND edb_info_id=? "
  341. pars = append(pars, edbInfo.EdbInfoId)
  342. var dataList []*EdbDataCalculateZjpj
  343. sql := ` SELECT * FROM edb_data_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC `
  344. _, err = to.Raw(sql, edbInfo.EdbInfoId).QueryRows(&dataList)
  345. if err != nil {
  346. return err
  347. }
  348. var dateArr []string
  349. dataMap := make(map[string]*EdbDataCalculateZjpj)
  350. removeDataTimeMap := make(map[string]int) //需要移除的日期数据
  351. for _, v := range dataList {
  352. dateArr = append(dateArr, v.DataTime)
  353. dataMap[v.DataTime] = v
  354. removeDataTimeMap[v.DataTime] = 1
  355. }
  356. addDataList := make([]*EdbDataCalculateZjpj, 0)
  357. //第一个指标
  358. {
  359. var condition string
  360. var pars []interface{}
  361. condition += " AND data_time < ? AND edb_info_id=? "
  362. pars = append(pars, edbInfo.CalculateFormula, existItemA.FromEdbInfoId)
  363. //第一个指标的数据列表
  364. firstDataList, tmpErr := GetEdbDataListAllByTo(to, condition, pars, existItemA.FromSource, existItemA.FromSubSource, 0)
  365. if tmpErr != nil {
  366. return tmpErr
  367. }
  368. for _, v := range firstDataList {
  369. //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素
  370. if _, ok := removeDataTimeMap[v.DataTime]; ok {
  371. delete(removeDataTimeMap, v.DataTime)
  372. }
  373. //时间戳
  374. if edbData, ok := dataMap[v.DataTime]; ok {
  375. if edbData.Value != v.Value {
  376. //更新指标数据
  377. edbData.Value = v.Value
  378. _, _ = to.Update(edbData, "Value")
  379. }
  380. } else {
  381. //时间戳
  382. currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
  383. timestamp := currentDate.UnixNano() / 1e6
  384. edbDataZjpj := &EdbDataCalculateZjpj{
  385. EdbInfoId: edbInfo.EdbInfoId,
  386. EdbCode: edbInfo.EdbCode,
  387. DataTime: v.DataTime,
  388. Value: v.Value,
  389. Status: 1,
  390. CreateTime: time.Now(),
  391. ModifyTime: time.Now(),
  392. DataTimestamp: timestamp,
  393. }
  394. addDataList = append(addDataList, edbDataZjpj)
  395. }
  396. }
  397. }
  398. //第二个指标
  399. {
  400. condition = ``
  401. pars = make([]interface{}, 0)
  402. condition += " AND data_time >= ? AND edb_info_id = ? "
  403. pars = append(pars, edbInfo.CalculateFormula, existItemB.FromEdbInfoId)
  404. //第二个指标的数据列表
  405. secondDataList, tmpErr := GetEdbDataListAllByTo(to, condition, pars, existItemB.FromSource, existItemB.FromSubSource, 0)
  406. if tmpErr != nil {
  407. return tmpErr
  408. }
  409. for _, v := range secondDataList {
  410. //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素
  411. if _, ok := removeDataTimeMap[v.DataTime]; ok {
  412. delete(removeDataTimeMap, v.DataTime)
  413. }
  414. if edbData, ok := dataMap[v.DataTime]; ok {
  415. if edbData.Value != v.Value {
  416. //更新指标数据
  417. edbData.Value = v.Value
  418. edbData.ModifyTime = time.Now()
  419. _, tmpErr := to.Update(edbData, "Value", "ModifyTime")
  420. if tmpErr != nil {
  421. fmt.Println("tmpErr:", tmpErr)
  422. }
  423. }
  424. } else {
  425. //时间戳
  426. currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
  427. timestamp := currentDate.UnixNano() / 1e6
  428. edbDataZjpj := &EdbDataCalculateZjpj{
  429. EdbInfoId: edbInfo.EdbInfoId,
  430. EdbCode: edbInfo.EdbCode,
  431. DataTime: v.DataTime,
  432. Value: v.Value,
  433. Status: 1,
  434. CreateTime: time.Now(),
  435. ModifyTime: time.Now(),
  436. DataTimestamp: timestamp,
  437. }
  438. addDataList = append(addDataList, edbDataZjpj)
  439. }
  440. }
  441. }
  442. //删除已经不存在的累计同比拼接指标数据(由于同比值当日的数据删除了)
  443. {
  444. removeDateList := make([]string, 0)
  445. for dateTime := range removeDataTimeMap {
  446. removeDateList = append(removeDateList, dateTime)
  447. }
  448. if len(removeDateList) > 0 {
  449. removeDateStr := strings.Join(removeDateList, `","`)
  450. removeDateStr = `"` + removeDateStr + `"`
  451. //如果拼接指标变更了,那么需要删除所有的指标数据
  452. tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
  453. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
  454. _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
  455. if err != nil {
  456. err = fmt.Errorf("删除不存在的直接拼接指标数据失败,Err:" + err.Error())
  457. return
  458. }
  459. }
  460. }
  461. //数据入库
  462. if len(addDataList) > 0 {
  463. _, tmpErr := to.InsertMulti(len(addDataList), addDataList)
  464. if tmpErr != nil {
  465. err = tmpErr
  466. return
  467. }
  468. }
  469. return
  470. }