base_from_business.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. package services
  2. import (
  3. "errors"
  4. "eta/eta_index_lib/logic"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/models/mgo"
  7. "eta/eta_index_lib/services/alarm_msg"
  8. "eta/eta_index_lib/utils"
  9. "eta/eta_index_lib/utils/mgodb"
  10. "fmt"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "strings"
  13. "time"
  14. )
  15. // HandleBusinessIndex
  16. // @Description: 处理外部指标
  17. // @author: Roc
  18. // @datetime 2024-04-26 14:23:42
  19. // @param indexReq *models.AddBusinessIndexReq
  20. // @return err error
  21. func HandleBusinessIndex(indexReq *models.AddBusinessIndexReq) (resp models.BaseFromBusinessIndexResp, err error) {
  22. defer func() {
  23. if err != nil {
  24. // 添加刷新失败日志
  25. dataUpdateResult := 2
  26. dataUpdateFailedReason := "服务异常"
  27. edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexReq.IndexCode)
  28. if e == nil {
  29. //查询指标存在,才添加刷新日志
  30. _ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0)
  31. }
  32. }
  33. }()
  34. // 没有数据就返回
  35. if indexReq.DataList == nil || len(indexReq.DataList) <= 0 {
  36. return
  37. }
  38. // 兼容频度缺少度的字段
  39. if !strings.Contains(indexReq.Frequency, "度") {
  40. indexReq.Frequency = indexReq.Frequency + "度"
  41. }
  42. if !utils.VerifyFrequency(indexReq.Frequency) {
  43. err = errors.New("指标频度不合法:" + indexReq.Frequency)
  44. return
  45. }
  46. // 判断来源,如果来源不存在的话,则创建
  47. sourceObj := new(models.EdbBusinessSource)
  48. sourceItem, err := sourceObj.GetEdbBusinessSourceItem(indexReq.SourceName)
  49. if err != nil {
  50. if err.Error() == utils.ErrNoRow() {
  51. sourceItem = &models.EdbBusinessSource{
  52. EdbBusinessSourceId: 0,
  53. SourceName: indexReq.SourceName,
  54. CreateTime: time.Now(),
  55. }
  56. err = sourceItem.Add()
  57. } else {
  58. return
  59. }
  60. }
  61. // 指标
  62. indexObj := new(models.BaseFromBusinessIndex)
  63. if indexReq.IndexCode == `` {
  64. // 如果指标编码为空,那么自动生成
  65. currId, tmpErr := indexObj.GetMaxId()
  66. if tmpErr != nil {
  67. err = tmpErr
  68. return
  69. }
  70. indexReq.IndexCode = fmt.Sprintf("SELF%07d", currId+1)
  71. }
  72. //判断指标是否存在
  73. item, err := indexObj.GetIndexItem(indexReq.IndexCode)
  74. if err != nil {
  75. if err.Error() != utils.ErrNoRow() {
  76. return
  77. }
  78. // 添加指标
  79. item = &models.BaseFromBusinessIndex{
  80. BaseFromBusinessIndexId: 0,
  81. IndexCode: indexReq.IndexCode,
  82. IndexName: indexReq.IndexName,
  83. Unit: indexReq.Unit,
  84. Frequency: indexReq.Frequency,
  85. Source: int(sourceItem.EdbBusinessSourceId),
  86. SourceName: sourceItem.SourceName,
  87. //StartDate: time.Time{},
  88. //EndDate: time.Time{},
  89. Remark: indexReq.Remark,
  90. BaseModifyTime: time.Now(),
  91. DataUpdateTime: time.Now(),
  92. CreateTime: time.Now(),
  93. ModifyTime: time.Now(),
  94. }
  95. err = item.Add()
  96. if err != nil {
  97. fmt.Println("add err:" + err.Error())
  98. return
  99. }
  100. } else {
  101. updateCols := make([]string, 0)
  102. if item.IndexName != indexReq.IndexName {
  103. item.IndexName = indexReq.IndexName
  104. updateCols = append(updateCols, "IndexName")
  105. }
  106. if item.Unit != indexReq.Unit {
  107. item.Unit = indexReq.Unit
  108. updateCols = append(updateCols, "Unit")
  109. }
  110. if item.Frequency != indexReq.Frequency {
  111. item.Frequency = indexReq.Frequency
  112. updateCols = append(updateCols, "Frequency")
  113. }
  114. if item.Source != int(sourceItem.EdbBusinessSourceId) {
  115. item.Source = int(sourceItem.EdbBusinessSourceId)
  116. item.SourceName = sourceItem.SourceName
  117. updateCols = append(updateCols, "Source", "SourceName")
  118. }
  119. if len(updateCols) > 0 {
  120. item.BaseModifyTime = time.Now()
  121. item.ModifyTime = time.Now()
  122. updateCols = append(updateCols, "BaseModifyTime", "ModifyTime")
  123. err = item.Update(updateCols)
  124. if err != nil {
  125. fmt.Println("update index err:" + err.Error())
  126. return
  127. }
  128. }
  129. }
  130. // 数据处理
  131. mogDataObj := new(mgo.BaseFromBusinessData)
  132. //获取已存在的所有数据
  133. exitDataList, err := mogDataObj.GetAllDataList(bson.M{"index_code": indexReq.IndexCode}, []string{"data_time"})
  134. if err != nil {
  135. fmt.Println("GetIndexDataList Err:" + err.Error())
  136. return
  137. }
  138. // 已经存在的数据集
  139. exitDataMap := make(map[string]*mgo.BaseFromBusinessData)
  140. for _, v := range exitDataList {
  141. exitDataMap[v.DataTime.Format(utils.FormatDate)] = v
  142. }
  143. // 当前传入的最小日期
  144. var reqMinDate time.Time
  145. // 待添加的数据集
  146. addDataList := make([]interface{}, 0)
  147. updateDataList := make([]mgo.BaseFromBusinessData, 0)
  148. //var hasUpdate bool
  149. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  150. for _, data := range indexReq.DataList {
  151. dateTime, tmpErr := utils.DealExcelDate(data.Date)
  152. if tmpErr != nil {
  153. fmt.Println("time.ParseInLocation Err:" + tmpErr.Error())
  154. err = tmpErr
  155. return
  156. }
  157. // 调整最小日期
  158. if reqMinDate.IsZero() || reqMinDate.After(dateTime) {
  159. reqMinDate = dateTime
  160. }
  161. date := dateTime.Format(utils.FormatDate)
  162. findData, ok := exitDataMap[date]
  163. if !ok {
  164. addDataList = append(addDataList, mgo.BaseFromBusinessData{
  165. BaseFromBusinessIndexId: item.BaseFromBusinessIndexId,
  166. IndexCode: item.IndexCode,
  167. DataTime: dateTime,
  168. Value: data.Value,
  169. CreateTime: time.Now(),
  170. ModifyTime: time.Now(),
  171. //DataTimestamp: 0,
  172. })
  173. continue
  174. }
  175. // 值不匹配,修改数据
  176. if findData.Value != data.Value {
  177. findData.Value = data.Value
  178. updateDataList = append(updateDataList, *findData)
  179. }
  180. }
  181. // 指标数据是否新增或修改
  182. var isIndexUpdateOrAdd bool
  183. // 入库
  184. {
  185. coll := mogDataObj.GetCollection()
  186. if len(addDataList) > 0 {
  187. isIndexUpdateOrAdd = true
  188. err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
  189. if err != nil {
  190. fmt.Println("mogDataObj.HandleData() Err:" + err.Error())
  191. return
  192. }
  193. }
  194. if len(updateDataList) > 0 {
  195. isIndexUpdateOrAdd = true
  196. for _, v := range updateDataList {
  197. err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
  198. if err != nil {
  199. fmt.Println("UpdateDataByColl:Err:" + err.Error())
  200. return
  201. }
  202. }
  203. }
  204. }
  205. // 支持事务的话,下面操作
  206. //result, err := mogDataObj.HandleData(addDataList, updateDataList)
  207. //if err != nil {
  208. // fmt.Println("mogDataObj.HandleData() Err:" + err.Error())
  209. // return
  210. //}
  211. //fmt.Println("result", result)
  212. //修改最大最小日期
  213. indexMaxAndMinInfo, err := item.GetEdbInfoMaxAndMinInfo(indexReq.IndexCode)
  214. if err != nil {
  215. return
  216. }
  217. if err == nil && indexMaxAndMinInfo != nil {
  218. e := item.ModifyIndexMaxAndMinInfo(indexReq.IndexCode, indexMaxAndMinInfo, isIndexUpdateOrAdd)
  219. if e != nil {
  220. fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error())
  221. }
  222. }
  223. // 同步刷新指标库的指标
  224. go refreshEdbBusiness(item.IndexCode, reqMinDate)
  225. resp = models.BaseFromBusinessIndexResp{
  226. IndexCode: item.IndexCode,
  227. IndexName: item.IndexName,
  228. Unit: item.Unit,
  229. Frequency: item.Frequency,
  230. SourceName: item.SourceName,
  231. }
  232. return
  233. }
  234. // DelBusinessIndexResp
  235. // @Description: 删除外部指标的返回
  236. type DelBusinessIndexResp struct {
  237. IsDeleteEdbCodeList []string `description:"已经删除了的指标编码"`
  238. NoDeleteEdbCodeList []string `description:"未删除的指标编码"`
  239. }
  240. // DelBusinessIndex
  241. // @Description: 删除外部指标
  242. // @author: Roc
  243. // @datetime 2024-05-31 16:27:37
  244. // @param indexCodeList []string
  245. // @return err error
  246. // @return errMsg string
  247. func DelBusinessIndex(indexCodeList []string) (joinEdbCodeList, needDelEdbCodeList []string, err error, errMsg string) {
  248. defer func() {
  249. if err != nil {
  250. fmt.Println("DelBusinessIndex Err:" + err.Error())
  251. }
  252. }()
  253. errMsg = "删除失败"
  254. if len(indexCodeList) < 0 {
  255. errMsg = "指标编码不允许为空"
  256. err = errors.New(errMsg)
  257. return
  258. }
  259. // 指标
  260. indexObj := new(models.BaseFromBusinessIndex)
  261. //判断指标是否存在
  262. baseIndexList, err := indexObj.GetIndexItemList(indexCodeList)
  263. if err != nil {
  264. return
  265. }
  266. // 已加入到eta指标库的编码map
  267. joinEdbCodeList = make([]string, 0)
  268. joinEdbMap := make(map[string]string)
  269. // 未加入到eta指标库的编码
  270. needDelEdbCodeList = make([]string, 0)
  271. // 判断指标是否加入到指标库
  272. tmpEdbInfoList, err := models.GetEdbInfoByEdbCodeList(utils.DATA_SOURCE_BUSINESS, indexCodeList)
  273. if err != nil {
  274. return
  275. }
  276. for _, v := range tmpEdbInfoList {
  277. joinEdbCodeList = append(joinEdbCodeList, v.EdbCode)
  278. joinEdbMap[v.EdbCode] = v.EdbCode
  279. }
  280. for _, v := range baseIndexList {
  281. _, ok := joinEdbMap[v.IndexCode]
  282. if !ok {
  283. needDelEdbCodeList = append(needDelEdbCodeList, v.IndexCode)
  284. }
  285. }
  286. // 如果需要删除的指标,则直接返回
  287. if len(needDelEdbCodeList) <= 0 {
  288. return
  289. }
  290. // 删除指标
  291. err = indexObj.DelIndexItemList(needDelEdbCodeList)
  292. if err != nil {
  293. fmt.Println("删除自有指标失败, Err:" + err.Error())
  294. return
  295. }
  296. // 删除指标明细数据
  297. mogDataObj := new(mgo.BaseFromBusinessData)
  298. err = mogDataObj.RemoveMany(bson.M{"index_code": bson.M{"$in": needDelEdbCodeList}})
  299. if err != nil {
  300. fmt.Println("删除自有指标明细数据 Err:" + err.Error())
  301. return
  302. }
  303. return
  304. }
  305. // DelBusinessIndexData
  306. // @Description: 删除指标数据
  307. // @author: Roc
  308. // @datetime 2024-05-31 15:43:32
  309. // @param indexCode string
  310. // @param dateList []string
  311. // @return err error
  312. // @return errMsg string
  313. func DelBusinessIndexData(indexCode string, startDate, endDate string) (err error, errMsg string) {
  314. defer func() {
  315. if err != nil {
  316. fmt.Println("DelBusinessIndex Err:" + err.Error())
  317. }
  318. }()
  319. errMsg = "删除失败"
  320. if indexCode == `` {
  321. errMsg = "指标编码不允许为空"
  322. err = errors.New(errMsg)
  323. return
  324. }
  325. if startDate == `` && endDate == `` {
  326. errMsg = "开始日期和结束日期不允许同时为空"
  327. err = errors.New(errMsg)
  328. return
  329. }
  330. // 指标
  331. indexObj := new(models.BaseFromBusinessIndex)
  332. //判断指标是否存在
  333. item, err := indexObj.GetIndexItem(indexCode)
  334. if err != nil {
  335. if err.Error() != utils.ErrNoRow() {
  336. return
  337. }
  338. err = nil
  339. return
  340. }
  341. // 构建查询条件
  342. queryConditions := bson.M{
  343. "index_code": item.IndexCode,
  344. }
  345. // 当前传入的最小日期
  346. var reqMinDate time.Time
  347. var startDateTime, endDateTime time.Time
  348. if startDate != `` {
  349. //获取已存在的所有数据
  350. startDateTime, err = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  351. if err != nil {
  352. return
  353. }
  354. // 调整最小日期
  355. if reqMinDate.IsZero() || reqMinDate.After(startDateTime) {
  356. reqMinDate = startDateTime
  357. }
  358. }
  359. if endDate != `` {
  360. //获取已存在的所有数据
  361. endDateTime, err = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
  362. if err != nil {
  363. return
  364. }
  365. // 调整最小日期
  366. if reqMinDate.IsZero() || reqMinDate.After(endDateTime) {
  367. reqMinDate = endDateTime
  368. }
  369. }
  370. dateCondition, err := mgodb.BuildDateTimeCondition(startDateTime, endDateTime)
  371. if err != nil {
  372. return
  373. }
  374. if len(dateCondition) > 0 {
  375. queryConditions["data_time"] = dateCondition
  376. }
  377. // 删除数据源中的指标明细数据
  378. mogDataObj := new(mgo.BaseFromBusinessData)
  379. err = mogDataObj.RemoveMany(queryConditions)
  380. if err != nil {
  381. fmt.Println("删除自有指标明细数据 Err:" + err.Error())
  382. return
  383. }
  384. // 同步刷新指标库的指标
  385. go refreshEdbBusiness(item.IndexCode, reqMinDate)
  386. return
  387. }
  388. func refreshEdbBusiness(indexCode string, reqMinDate time.Time) {
  389. var indexErr error
  390. var errMsg string
  391. defer func() {
  392. if indexErr != nil {
  393. tips := fmt.Sprintf("自有数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s, 错误信息:%s", indexCode, indexErr.Error(), errMsg)
  394. alarm_msg.SendAlarmMsg(tips, 3)
  395. }
  396. }()
  397. edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BUSINESS, indexCode)
  398. if e != nil && e.Error() != utils.ErrNoRow() {
  399. indexErr = e
  400. return
  401. }
  402. if edbInfo != nil {
  403. startDate := ``
  404. if reqMinDate.IsZero() {
  405. startDate = edbInfo.EndDate
  406. } else {
  407. startDate = reqMinDate.Format(utils.FormatDate)
  408. }
  409. params := models.RefreshBaseParams{
  410. EdbInfo: edbInfo,
  411. StartDate: startDate,
  412. }
  413. obj := models.Business{}
  414. indexErr, errMsg = obj.Refresh(params)
  415. if indexErr != nil {
  416. return
  417. }
  418. // 更新指标最大最小值
  419. indexErr = obj.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
  420. if indexErr != nil {
  421. return
  422. }
  423. // 更新ES
  424. go logic.UpdateEs(edbInfo.EdbInfoId)
  425. }
  426. }