base_from_business.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712
  1. package services
  2. import (
  3. "errors"
  4. "eta_gn/eta_index_lib/logic"
  5. "eta_gn/eta_index_lib/models"
  6. "eta_gn/eta_index_lib/models/mgo"
  7. "eta_gn/eta_index_lib/services/alarm_msg"
  8. "eta_gn/eta_index_lib/utils"
  9. "fmt"
  10. "github.com/qiniu/qmgo"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "strings"
  13. "time"
  14. )
  15. // HandleBusinessIndex
  16. // @Description: 处理外部指标
  17. // @author: Roc
  18. // @datetime 2024-04-26 14:23:42
  19. // @param indexReq *models.AddBusinessIndexReq
  20. // @return err error
  21. func HandleBusinessIndex(indexReq *models.AddBusinessIndexReq) (resp models.BaseFromBusinessIndexResp, err error) {
  22. defer func() {
  23. if err != nil {
  24. // 添加刷新失败日志
  25. dataUpdateResult := 2
  26. dataUpdateFailedReason := "服务异常"
  27. edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexReq.IndexCode)
  28. if e == nil {
  29. //查询指标存在,才添加刷新日志
  30. _ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0)
  31. }
  32. }
  33. }()
  34. // 没有数据就返回
  35. if indexReq.DataList == nil || len(indexReq.DataList) <= 0 {
  36. return
  37. }
  38. // 兼容频度缺少度的字段
  39. if !strings.Contains(indexReq.Frequency, "度") {
  40. indexReq.Frequency = indexReq.Frequency + "度"
  41. }
  42. if !utils.VerifyFrequency(indexReq.Frequency) {
  43. err = errors.New("指标频度不合法:" + indexReq.Frequency)
  44. return
  45. }
  46. // 判断来源,如果来源不存在的话,则创建
  47. sourceObj := new(models.EdbBusinessSource)
  48. sourceItem, err := sourceObj.GetEdbBusinessSourceItem(indexReq.SourceName)
  49. if err != nil {
  50. if err.Error() == utils.ErrNoRow() {
  51. sourceItem = &models.EdbBusinessSource{
  52. EdbBusinessSourceId: 0,
  53. SourceName: indexReq.SourceName,
  54. CreateTime: time.Now(),
  55. }
  56. err = sourceItem.Add()
  57. } else {
  58. return
  59. }
  60. }
  61. // 指标
  62. indexObj := new(models.BaseFromBusinessIndex)
  63. if indexReq.IndexCode == `` {
  64. // 如果指标编码为空,那么自动生成
  65. currId, tmpErr := indexObj.GetMaxId()
  66. if tmpErr != nil {
  67. err = tmpErr
  68. return
  69. }
  70. indexReq.IndexCode = fmt.Sprintf("SELF%07d", currId+1)
  71. }
  72. //判断指标是否存在
  73. item, err := indexObj.GetIndexItem(indexReq.IndexCode)
  74. if err != nil {
  75. if err.Error() != utils.ErrNoRow() {
  76. return
  77. }
  78. // 添加指标
  79. item = &models.BaseFromBusinessIndex{
  80. BaseFromBusinessIndexId: 0,
  81. IndexCode: indexReq.IndexCode,
  82. IndexName: indexReq.IndexName,
  83. Unit: indexReq.Unit,
  84. Frequency: indexReq.Frequency,
  85. Source: int(sourceItem.EdbBusinessSourceId),
  86. SourceName: sourceItem.SourceName,
  87. //StartDate: time.Time{},
  88. //EndDate: time.Time{},
  89. Remark: indexReq.Remark,
  90. BaseModifyTime: time.Now(),
  91. DataUpdateTime: time.Now(),
  92. CreateTime: time.Now(),
  93. ModifyTime: time.Now(),
  94. }
  95. err = item.Add()
  96. if err != nil {
  97. fmt.Println("add err:" + err.Error())
  98. return
  99. }
  100. } else {
  101. updateCols := make([]string, 0)
  102. if item.IndexName != indexReq.IndexName {
  103. item.IndexName = indexReq.IndexName
  104. updateCols = append(updateCols, "IndexName")
  105. }
  106. if item.Unit != indexReq.Unit {
  107. item.Unit = indexReq.Unit
  108. updateCols = append(updateCols, "Unit")
  109. }
  110. if item.Frequency != indexReq.Frequency {
  111. item.Frequency = indexReq.Frequency
  112. updateCols = append(updateCols, "Frequency")
  113. }
  114. if item.Source != int(sourceItem.EdbBusinessSourceId) {
  115. item.Source = int(sourceItem.EdbBusinessSourceId)
  116. item.SourceName = sourceItem.SourceName
  117. updateCols = append(updateCols, "Source", "SourceName")
  118. }
  119. if len(updateCols) > 0 {
  120. item.BaseModifyTime = time.Now()
  121. item.ModifyTime = time.Now()
  122. updateCols = append(updateCols, "BaseModifyTime", "ModifyTime")
  123. err = item.Update(updateCols)
  124. if err != nil {
  125. fmt.Println("update index err:" + err.Error())
  126. return
  127. }
  128. }
  129. }
  130. // 数据处理
  131. // 当前传入的最小日期
  132. var reqMinDate time.Time
  133. if utils.UseMongo {
  134. reqMinDate, err = handleBusinessDataByMongo(item, indexReq.DataList)
  135. } else {
  136. reqMinDate, err = handleBusinessDataByMysql(item, indexReq.DataList)
  137. }
  138. if err != nil {
  139. return
  140. }
  141. // 同步刷新指标库的指标
  142. go refreshEdbBusiness(item.IndexCode, reqMinDate)
  143. resp = models.BaseFromBusinessIndexResp{
  144. IndexCode: item.IndexCode,
  145. IndexName: item.IndexName,
  146. Unit: item.Unit,
  147. Frequency: item.Frequency,
  148. SourceName: item.SourceName,
  149. }
  150. return
  151. }
  152. // handleBusinessDataByMongo
  153. // @Description: 处理外部指标数据(mongo)
  154. // @author: Roc
  155. // @datetime 2024-07-01 15:30:41
  156. // @param item *models.BaseFromBusinessIndex
  157. // @param reqDataList []*models.AddBusinessDataReq
  158. // @return reqMinDate time.Time 当前传入的最小日期
  159. // @return err error
  160. func handleBusinessDataByMongo(item *models.BaseFromBusinessIndex, reqDataList []models.AddBusinessDataReq) (reqMinDate time.Time, err error) {
  161. mogDataObj := new(mgo.BaseFromBusinessData)
  162. //获取已存在的所有数据
  163. exitDataList, err := mogDataObj.GetAllDataList(bson.M{"index_code": item.IndexCode}, []string{"data_time"})
  164. if err != nil {
  165. fmt.Println("GetIndexDataList Err:" + err.Error())
  166. return
  167. }
  168. // 已经存在的数据集
  169. exitDataMap := make(map[string]*mgo.BaseFromBusinessData)
  170. for _, v := range exitDataList {
  171. exitDataMap[v.DataTime.Format(utils.FormatDate)] = v
  172. }
  173. // 待添加的数据集
  174. addDataList := make([]interface{}, 0)
  175. updateDataList := make([]mgo.BaseFromBusinessData, 0)
  176. //var hasUpdate bool
  177. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  178. for _, data := range reqDataList {
  179. dateTime, tmpErr := utils.DealExcelDate(data.Date)
  180. if tmpErr != nil {
  181. fmt.Println("time.ParseInLocation Err:" + tmpErr.Error())
  182. err = tmpErr
  183. return
  184. }
  185. // 调整最小日期
  186. if reqMinDate.IsZero() || reqMinDate.After(dateTime) {
  187. reqMinDate = dateTime
  188. }
  189. date := dateTime.Format(utils.FormatDate)
  190. findData, ok := exitDataMap[date]
  191. if !ok {
  192. addDataList = append(addDataList, mgo.BaseFromBusinessData{
  193. BaseFromBusinessIndexId: item.BaseFromBusinessIndexId,
  194. IndexCode: item.IndexCode,
  195. DataTime: dateTime,
  196. Value: data.Value,
  197. CreateTime: time.Now(),
  198. ModifyTime: time.Now(),
  199. //DataTimestamp: 0,
  200. })
  201. continue
  202. }
  203. // 值不匹配,修改数据
  204. if findData.Value != data.Value {
  205. findData.Value = data.Value
  206. updateDataList = append(updateDataList, *findData)
  207. }
  208. }
  209. // 指标数据是否新增或修改
  210. var isIndexUpdateOrAdd bool
  211. // 入库
  212. {
  213. coll := mogDataObj.GetCollection()
  214. if len(addDataList) > 0 {
  215. isIndexUpdateOrAdd = true
  216. err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
  217. if err != nil {
  218. fmt.Println("mogDataObj.HandleData() Err:" + err.Error())
  219. return
  220. }
  221. }
  222. if len(updateDataList) > 0 {
  223. isIndexUpdateOrAdd = true
  224. for _, v := range updateDataList {
  225. err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
  226. if err != nil {
  227. fmt.Println("UpdateDataByColl:Err:" + err.Error())
  228. return
  229. }
  230. }
  231. }
  232. }
  233. // 支持事务的话,下面操作
  234. //result, err := mogDataObj.HandleData(addDataList, updateDataList)
  235. //if err != nil {
  236. // fmt.Println("mogDataObj.HandleData() Err:" + err.Error())
  237. // return
  238. //}
  239. //fmt.Println("result", result)
  240. //修改最大最小日期
  241. indexMaxAndMinInfo, err := item.GetEdbInfoMaxAndMinInfo(item.IndexCode)
  242. if err != nil {
  243. return
  244. }
  245. if err == nil && indexMaxAndMinInfo != nil {
  246. e := item.ModifyIndexMaxAndMinInfo(item.IndexCode, indexMaxAndMinInfo, isIndexUpdateOrAdd)
  247. if e != nil {
  248. fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error())
  249. }
  250. }
  251. return
  252. }
  253. // handleBusinessDataByMysql
  254. // @Description: 处理外部指标数据(mysql)
  255. // @author: Roc
  256. // @datetime 2024-07-01 15:59:43
  257. // @param item *models.BaseFromBusinessIndex
  258. // @param reqDataList []models.AddBusinessDataReq
  259. // @return reqMinDate time.Time
  260. // @return err error
  261. func handleBusinessDataByMysql(item *models.BaseFromBusinessIndex, reqDataList []models.AddBusinessDataReq) (reqMinDate time.Time, err error) {
  262. businessDataObj := new(models.BaseFromBusinessData)
  263. var condition []string
  264. var pars []interface{}
  265. condition = append(condition, "index_code = ?")
  266. pars = append(pars, item.IndexCode)
  267. //获取已存在的所有数据
  268. exitDataList, err := businessDataObj.GetAllDataList(condition, pars, "data_time ASC")
  269. if err != nil {
  270. fmt.Println("GetIndexDataList Err:" + err.Error())
  271. return
  272. }
  273. // 已经存在的数据集
  274. exitDataMap := make(map[string]*models.BaseFromBusinessData)
  275. for _, v := range exitDataList {
  276. exitDataMap[v.DataTime.Format(utils.FormatDate)] = v
  277. }
  278. // 待添加的数据集
  279. addDataList := make([]*models.BaseFromBusinessData, 0)
  280. updateDataList := make([]*models.BaseFromBusinessData, 0)
  281. //var hasUpdate bool
  282. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  283. for _, data := range reqDataList {
  284. dateTime, tmpErr := utils.DealExcelDate(data.Date)
  285. if tmpErr != nil {
  286. fmt.Println("time.ParseInLocation Err:" + tmpErr.Error())
  287. err = tmpErr
  288. return
  289. }
  290. // 调整最小日期
  291. if reqMinDate.IsZero() || reqMinDate.After(dateTime) {
  292. reqMinDate = dateTime
  293. }
  294. date := dateTime.Format(utils.FormatDate)
  295. findData, ok := exitDataMap[date]
  296. if !ok {
  297. addDataList = append(addDataList, &models.BaseFromBusinessData{
  298. BaseFromBusinessIndexId: int(item.BaseFromBusinessIndexId),
  299. IndexCode: item.IndexCode,
  300. DataTime: dateTime,
  301. Value: data.Value,
  302. CreateTime: time.Now(),
  303. ModifyTime: time.Now(),
  304. //DataTimestamp: 0,
  305. })
  306. continue
  307. }
  308. // 值不匹配,修改数据
  309. if findData.Value != data.Value {
  310. findData.Value = data.Value
  311. findData.ModifyTime = time.Now()
  312. updateDataList = append(updateDataList, findData)
  313. }
  314. }
  315. // 指标数据是否新增或修改
  316. var isIndexUpdateOrAdd bool
  317. // 入库
  318. {
  319. if len(addDataList) > 0 {
  320. isIndexUpdateOrAdd = true
  321. }
  322. if len(updateDataList) > 0 {
  323. isIndexUpdateOrAdd = true
  324. }
  325. err = businessDataObj.HandleData(addDataList, updateDataList)
  326. if err != nil {
  327. fmt.Println("UpdateDataByColl:Err:" + err.Error())
  328. return
  329. }
  330. }
  331. //修改最大最小日期
  332. indexMaxAndMinInfo, err := item.GetEdbInfoMaxAndMinInfo(item.IndexCode)
  333. if err != nil {
  334. return
  335. }
  336. if err == nil && indexMaxAndMinInfo != nil {
  337. e := item.ModifyIndexMaxAndMinInfo(item.IndexCode, indexMaxAndMinInfo, isIndexUpdateOrAdd)
  338. if e != nil {
  339. fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error())
  340. }
  341. }
  342. return
  343. }
  344. // DelBusinessIndexResp
  345. // @Description: 删除外部指标的返回
  346. type DelBusinessIndexResp struct {
  347. IsDeleteEdbCodeList []string `description:"已经删除了的指标编码"`
  348. NoDeleteEdbCodeList []string `description:"未删除的指标编码"`
  349. }
  350. // DelBusinessIndex
  351. // @Description: 删除外部指标
  352. // @author: Roc
  353. // @datetime 2024-05-31 16:27:37
  354. // @param indexCodeList []string
  355. // @return err error
  356. // @return errMsg string
  357. func DelBusinessIndex(indexCodeList []string) (joinEdbCodeList, needDelEdbCodeList []string, err error, errMsg string) {
  358. defer func() {
  359. if err != nil {
  360. fmt.Println("DelBusinessIndex Err:" + err.Error())
  361. }
  362. }()
  363. errMsg = "删除失败"
  364. if len(indexCodeList) < 0 {
  365. errMsg = "指标编码不允许为空"
  366. err = errors.New(errMsg)
  367. return
  368. }
  369. // 指标
  370. indexObj := new(models.BaseFromBusinessIndex)
  371. //判断指标是否存在
  372. baseIndexList, err := indexObj.GetIndexItemList(indexCodeList)
  373. if err != nil {
  374. return
  375. }
  376. // 已加入到eta指标库的编码map
  377. joinEdbCodeList = make([]string, 0)
  378. joinEdbMap := make(map[string]string)
  379. // 未加入到eta指标库的编码
  380. needDelEdbCodeList = make([]string, 0)
  381. // 判断指标是否加入到指标库
  382. tmpEdbInfoList, err := models.GetEdbInfoByEdbCodeList(utils.DATA_SOURCE_BUSINESS, indexCodeList)
  383. if err != nil {
  384. return
  385. }
  386. for _, v := range tmpEdbInfoList {
  387. joinEdbCodeList = append(joinEdbCodeList, v.EdbCode)
  388. joinEdbMap[v.EdbCode] = v.EdbCode
  389. }
  390. for _, v := range baseIndexList {
  391. _, ok := joinEdbMap[v.IndexCode]
  392. if !ok {
  393. needDelEdbCodeList = append(needDelEdbCodeList, v.IndexCode)
  394. }
  395. }
  396. // 如果需要删除的指标,则直接返回
  397. if len(needDelEdbCodeList) <= 0 {
  398. return
  399. }
  400. // 删除指标
  401. err = indexObj.DelIndexItemList(needDelEdbCodeList)
  402. if err != nil {
  403. fmt.Println("删除自有指标失败, Err:" + err.Error())
  404. return
  405. }
  406. if utils.UseMongo {
  407. // 删除指标明细数据
  408. mogDataObj := new(mgo.BaseFromBusinessData)
  409. err = mogDataObj.RemoveMany(bson.M{"index_code": bson.M{"$in": needDelEdbCodeList}})
  410. } else {
  411. var condition []string
  412. var pars []interface{}
  413. delNum := len(needDelEdbCodeList)
  414. if delNum > 0 {
  415. condition = append(condition, "index_code in ("+utils.GetOrmInReplace(delNum)+")")
  416. pars = append(pars, needDelEdbCodeList)
  417. businessDataObj := models.BaseFromBusinessData{}
  418. err = businessDataObj.DelDataByCond(condition, pars)
  419. }
  420. }
  421. if err != nil {
  422. fmt.Println("删除自有指标明细数据 Err:" + err.Error())
  423. return
  424. }
  425. return
  426. }
  427. // DelBusinessIndexData
  428. // @Description: 删除指标数据
  429. // @author: Roc
  430. // @datetime 2024-05-31 15:43:32
  431. // @param indexCode string
  432. // @param dateList []string
  433. // @return err error
  434. // @return errMsg string
  435. func DelBusinessIndexData(indexCode string, startDate, endDate string) (err error, errMsg string) {
  436. defer func() {
  437. if err != nil {
  438. fmt.Println("DelBusinessIndexData Err:" + err.Error())
  439. }
  440. }()
  441. errMsg = "删除失败"
  442. if indexCode == `` {
  443. errMsg = "指标编码不允许为空"
  444. err = errors.New(errMsg)
  445. return
  446. }
  447. if startDate == `` && endDate == `` {
  448. errMsg = "开始日期和结束日期不允许同时为空"
  449. err = errors.New(errMsg)
  450. return
  451. }
  452. // 指标
  453. indexObj := new(models.BaseFromBusinessIndex)
  454. //判断指标是否存在
  455. item, err := indexObj.GetIndexItem(indexCode)
  456. if err != nil {
  457. if err.Error() != utils.ErrNoRow() {
  458. return
  459. }
  460. err = nil
  461. return
  462. }
  463. // 当前传入的最小日期
  464. var reqMinDate time.Time
  465. var startDateTime, endDateTime time.Time
  466. if startDate != `` {
  467. //获取已存在的所有数据
  468. startDateTime, err = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  469. if err != nil {
  470. return
  471. }
  472. // 调整最小日期
  473. if reqMinDate.IsZero() || reqMinDate.After(startDateTime) {
  474. reqMinDate = startDateTime
  475. }
  476. }
  477. if endDate != `` {
  478. //获取已存在的所有数据
  479. endDateTime, err = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
  480. if err != nil {
  481. return
  482. }
  483. // 调整最小日期
  484. if reqMinDate.IsZero() || reqMinDate.After(endDateTime) {
  485. reqMinDate = endDateTime
  486. }
  487. }
  488. // 删除具体的数据
  489. var indexMaxAndMinInfo *models.EdbInfoMaxAndMinInfo
  490. if utils.UseMongo {
  491. indexMaxAndMinInfo, err = delBusinessIndexDataByMongo(item, startDateTime, endDateTime)
  492. } else {
  493. indexMaxAndMinInfo, err = delBusinessIndexDataByMysql(item, startDateTime, endDateTime)
  494. }
  495. if err != nil {
  496. return
  497. }
  498. // 修改指标的最早最晚日期
  499. if indexMaxAndMinInfo != nil {
  500. e := item.ModifyIndexMaxAndMinInfo(item.IndexCode, indexMaxAndMinInfo, true)
  501. if e != nil {
  502. fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error())
  503. }
  504. }
  505. // 同步刷新指标库的指标
  506. go refreshEdbBusiness(item.IndexCode, reqMinDate)
  507. return
  508. }
  509. // delBusinessIndexDataByMongo
  510. // @Description: 删除指标数据(从mongo删除)
  511. // @author: Roc
  512. // @datetime 2024-07-01 18:00:07
  513. // @param item *models.BaseFromBusinessIndex
  514. // @param startDateTime time.Time
  515. // @param endDateTime time.Time
  516. // @return err error
  517. func delBusinessIndexDataByMongo(item *models.BaseFromBusinessIndex, startDateTime, endDateTime time.Time) (indexMaxAndMinInfo *models.EdbInfoMaxAndMinInfo, err error) {
  518. defer func() {
  519. if err != nil {
  520. utils.FileLog.Error("delBusinessIndexDataByMongo 删除自有指标明细数据 Err:" + err.Error())
  521. }
  522. }()
  523. // 构建查询条件
  524. queryConditions := bson.M{
  525. "index_code": item.IndexCode,
  526. }
  527. dateCondition, err := mgo.BuildDateTimeCondition(startDateTime, endDateTime)
  528. if err != nil {
  529. return
  530. }
  531. if len(dateCondition) > 0 {
  532. queryConditions["data_time"] = dateCondition
  533. }
  534. // 删除数据源中的指标明细数据
  535. mogDataObj := new(mgo.BaseFromBusinessData)
  536. err = mogDataObj.RemoveMany(queryConditions)
  537. if err != nil {
  538. return
  539. }
  540. //修改最大最小日期
  541. indexMaxAndMinInfo, err = item.GetEdbInfoMaxAndMinInfo(item.IndexCode)
  542. // 如果有错误,且错误信息是取不到文档,那么就不修改了
  543. if err != nil && !errors.Is(err, qmgo.ErrNoSuchDocuments) {
  544. return
  545. }
  546. // 清空的目的是为了避免异常返回
  547. err = nil
  548. return
  549. }
  550. // delBusinessIndexDataByMysql
  551. // @Description: 删除指标数据(从mysql删除)
  552. // @author: Roc
  553. // @datetime 2024-07-02 09:53:13
  554. // @param item *models.BaseFromBusinessIndex
  555. // @param startDateTime time.Time
  556. // @param endDateTime time.Time
  557. // @return err error
  558. func delBusinessIndexDataByMysql(item *models.BaseFromBusinessIndex, startDateTime, endDateTime time.Time) (indexMaxAndMinInfo *models.EdbInfoMaxAndMinInfo, err error) {
  559. defer func() {
  560. if err != nil {
  561. utils.FileLog.Error("delBusinessIndexDataByMysql 删除自有指标明细数据 Err:" + err.Error())
  562. }
  563. }()
  564. // 构建查询条件
  565. var condition []string
  566. var pars []interface{}
  567. condition = append(condition, "index_code = ? ")
  568. pars = append(pars, item.IndexCode)
  569. if !startDateTime.IsZero() {
  570. condition = append(condition, " data_time >= ? ")
  571. pars = append(pars, startDateTime.Format(utils.FormatDate))
  572. }
  573. if !endDateTime.IsZero() {
  574. condition = append(condition, " data_time <= ? ")
  575. pars = append(pars, endDateTime.Format(utils.FormatDate))
  576. }
  577. // 删除数据源中的指标明细数据
  578. businessDataObj := new(models.BaseFromBusinessData)
  579. err = businessDataObj.DelDataByCond(condition, pars)
  580. if err != nil {
  581. return
  582. }
  583. //修改最大最小日期
  584. indexMaxAndMinInfo, err = item.GetEdbInfoMaxAndMinInfo(item.IndexCode)
  585. // 如果有错误,且错误信息是取不到文档,那么就不修改了
  586. if err != nil && err.Error() != utils.ErrNoRow() {
  587. return
  588. }
  589. // 清空的目的是为了避免异常返回
  590. err = nil
  591. return
  592. }
  593. func refreshEdbBusiness(indexCode string, reqMinDate time.Time) {
  594. var indexErr error
  595. var errMsg string
  596. defer func() {
  597. if indexErr != nil {
  598. tips := fmt.Sprintf("自有数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s, 错误信息:%s", indexCode, indexErr.Error(), errMsg)
  599. alarm_msg.SendAlarmMsg(tips, 3)
  600. }
  601. }()
  602. edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BUSINESS, indexCode)
  603. if e != nil && e.Error() != utils.ErrNoRow() {
  604. indexErr = e
  605. return
  606. }
  607. if edbInfo != nil {
  608. startDate := ``
  609. if reqMinDate.IsZero() {
  610. startDate = edbInfo.EndDate
  611. } else {
  612. startDate = reqMinDate.Format(utils.FormatDate)
  613. }
  614. params := models.RefreshBaseParams{
  615. EdbInfo: edbInfo,
  616. StartDate: startDate,
  617. }
  618. obj := models.Business{}
  619. indexErr, errMsg = obj.Refresh(params)
  620. if indexErr != nil {
  621. return
  622. }
  623. // 更新指标最大最小值
  624. indexErr = obj.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
  625. if indexErr != nil {
  626. return
  627. }
  628. // 更新ES
  629. go logic.UpdateEs(edbInfo.EdbInfoId)
  630. }
  631. }