base_from_gpr_risk.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package services
  2. import (
  3. "eta/eta_index_lib/logic"
  4. "eta/eta_index_lib/models"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "time"
  11. )
  12. // HandleGprRiskIndex 处理GPR地缘风险指数的excel数据
  13. func HandleGprRiskIndex(req *models.HandleGprRiskExcelDataReq) (err error) {
  14. errMsgList := make([]string, 0)
  15. defer func() {
  16. if len(errMsgList) > 0 {
  17. msg := fmt.Sprint("数据源-GPR地缘风险指数数据处理失败,err:", strings.Join(errMsgList, "\n"))
  18. utils.FileLog.Info(msg)
  19. go alarm_msg.SendAlarmMsg(msg, 3)
  20. }
  21. }()
  22. // 查询所有的一级分类
  23. classifyObj := new(models.BaseFromGprRiskClassify)
  24. classifyList, err := classifyObj.GetParentClassify()
  25. if err != nil {
  26. err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
  27. return
  28. }
  29. classifyMap := make(map[string]int, 0)
  30. for _, v := range classifyList {
  31. classifyMap[v.ClassifyName] = int(v.ClassifyId)
  32. }
  33. for _, v := range req.List {
  34. if v.IndexName == "" || v.IndexCode == "" {
  35. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码%s或者指标ID%s为空:", v.IndexCode, v.IndexName))
  36. continue
  37. }
  38. err = handleGprRiskIndex(v, req.TerminalCode, classifyMap)
  39. if err != nil {
  40. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
  41. return
  42. }
  43. }
  44. return
  45. }
  46. func handleGprRiskIndex(req *models.HandleGprRiskExcelData, terminalCode string, classifyMap map[string]int) (err error) {
  47. indexName := req.IndexName
  48. indexCode := req.IndexCode
  49. excelDataMap := req.ExcelDataMap
  50. errMsgList := make([]string, 0)
  51. defer func() {
  52. if len(errMsgList) > 0 {
  53. msg := fmt.Sprint("数据源-GPR地缘风险指数数据处理失败,err:", strings.Join(errMsgList, "\n"))
  54. utils.FileLog.Info(msg)
  55. go alarm_msg.SendAlarmMsg(msg, 3)
  56. }
  57. }()
  58. indexObj := new(models.BaseFromGprRiskIndex)
  59. dataObj := new(models.BaseFromGprRiskData)
  60. classifyObj := new(models.BaseFromGprRiskClassify)
  61. var indexId int64
  62. addDataList := make([]*models.BaseFromGprRiskData, 0)
  63. exitDataMap := make(map[string]*models.BaseFromGprRiskData)
  64. // 修改指标信息
  65. if indexName == "" {
  66. utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
  67. return
  68. }
  69. // 判断目录是否存在
  70. var classifyId int64
  71. now := time.Now()
  72. if req.ClassifyName != "" {
  73. classifyParentId := 0
  74. level := 1
  75. classifyParentId, _ = classifyMap[req.ParentClassifyName]
  76. if classifyParentId > 0 {
  77. level = 2
  78. }
  79. classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
  80. if err != nil {
  81. if err.Error() == utils.ErrNoRow() {
  82. //新增分类
  83. classifyObj = &models.BaseFromGprRiskClassify{
  84. ClassifyName: req.ClassifyName,
  85. ClassifyNameEn: req.ClassifyName,
  86. ParentId: classifyParentId,
  87. SysUserId: 0,
  88. SysUserRealName: "",
  89. Level: level,
  90. Sort: req.ClassifySort,
  91. ModifyTime: now,
  92. CreateTime: now,
  93. }
  94. classifyId, err = classifyObj.Add()
  95. if err != nil {
  96. err = fmt.Errorf("新增分类失败 Err:%s", err)
  97. return
  98. }
  99. classifyObj.ClassifyId = classifyId
  100. } else {
  101. return
  102. }
  103. } else {
  104. classifyId = classifyObj.ClassifyId
  105. classifyObj.ModifyTime = now
  106. //classifyObj.Sort = req.ClassifySort
  107. classifyObj.ParentId = classifyParentId
  108. //e := classifyObj.Update([]string{"ParentId", "Sort", "ModifyTime"})
  109. e := classifyObj.Update([]string{"ParentId", "ModifyTime"})
  110. if e != nil {
  111. fmt.Println("classifyObj Update Err:" + e.Error())
  112. return
  113. }
  114. }
  115. }
  116. //判断指标是否存在
  117. var isAdd int
  118. item, err := indexObj.GetByIndexCode(indexCode)
  119. if err != nil {
  120. if err.Error() == utils.ErrNoRow() {
  121. isAdd = 1
  122. err = nil
  123. } else {
  124. isAdd = -1
  125. err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
  126. return
  127. }
  128. }
  129. if item != nil && item.BaseFromGprRiskIndexId > 0 {
  130. fmt.Println("item:", item)
  131. isAdd = 2
  132. } else {
  133. isAdd = 1
  134. }
  135. if isAdd == 1 {
  136. indexObj.IndexCode = indexCode
  137. indexObj.IndexName = indexName
  138. indexObj.Frequency = req.Frequency
  139. indexObj.ClassifyId = classifyId
  140. indexObj.Unit = req.Unit
  141. indexObj.Sort = req.Sort
  142. indexObj.ModifyTime = time.Now()
  143. indexObj.CreateTime = time.Now()
  144. indexObj.TerminalCode = terminalCode
  145. indexId, err = indexObj.Add()
  146. if err != nil {
  147. err = fmt.Errorf("数据源新增GPR地缘风险指数指标失败 Err:%s", err)
  148. return
  149. }
  150. indexObj.BaseFromGprRiskIndexId = indexId
  151. } else if isAdd == 2 {
  152. indexId = item.BaseFromGprRiskIndexId
  153. if item.TerminalCode == `` && terminalCode != `` {
  154. item.TerminalCode = terminalCode
  155. err = item.Update([]string{"TerminalCode"})
  156. if err != nil {
  157. err = fmt.Errorf("数据源更新GPR地缘风险指数指标失败 Err:%s", err)
  158. return
  159. }
  160. }
  161. indexObj.BaseFromGprRiskIndexId = item.BaseFromGprRiskIndexId
  162. indexObj.IndexName = indexName
  163. indexObj.Frequency = req.Frequency
  164. indexObj.ClassifyId = classifyId
  165. indexObj.Unit = req.Unit
  166. indexObj.Sort = req.Sort
  167. indexObj.ModifyTime = time.Now()
  168. //修改数据
  169. updateColsArr := make([]string, 0)
  170. updateColsArr = append(updateColsArr, "index_name")
  171. updateColsArr = append(updateColsArr, "classify_id")
  172. updateColsArr = append(updateColsArr, "frequency")
  173. updateColsArr = append(updateColsArr, "sort")
  174. updateColsArr = append(updateColsArr, "unit")
  175. updateColsArr = append(updateColsArr, "modify_time")
  176. e := indexObj.Update(updateColsArr)
  177. if e != nil {
  178. fmt.Println("Index Update Err:" + e.Error())
  179. return
  180. }
  181. }
  182. //获取已存在的所有数据
  183. var exitDataList []*models.BaseFromGprRiskData
  184. exitDataList, err = dataObj.GetByIndexCode(indexCode)
  185. if err != nil {
  186. err = fmt.Errorf("数据源查询GPR地缘风险指数指标数据失败 Err:%s", err)
  187. return
  188. }
  189. fmt.Println("exitDataListLen:", len(exitDataList))
  190. for _, v := range exitDataList {
  191. dateStr := v.DataTime
  192. exitDataMap[dateStr] = v
  193. }
  194. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  195. for date, value := range excelDataMap {
  196. if findData, ok := exitDataMap[date]; !ok {
  197. _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
  198. if err != nil {
  199. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  200. return
  201. }
  202. //if !strings.Contains(value, "#N/A") {
  203. var saveDataTime time.Time
  204. if strings.Contains(date, "00:00:00") {
  205. saveDataTime, err = time.Parse(utils.FormatDateTime, date)
  206. } else {
  207. saveDataTime, err = time.Parse(utils.FormatDate, date)
  208. }
  209. if err != nil {
  210. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  211. continue
  212. }
  213. timestamp := saveDataTime.UnixNano() / 1e6
  214. dataItem := new(models.BaseFromGprRiskData)
  215. dataItem.BaseFromGprRiskIndexId = int(indexId)
  216. dataItem.IndexCode = indexCode
  217. dataItem.DataTime = date
  218. dataItem.Value = value
  219. dataItem.CreateTime = time.Now()
  220. dataItem.ModifyTime = time.Now()
  221. dataItem.DataTimestamp = timestamp
  222. addDataList = append(addDataList, dataItem)
  223. if len(addDataList) > 500 {
  224. err = dataObj.AddMulti(addDataList)
  225. if err != nil {
  226. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  227. return
  228. }
  229. addDataList = make([]*models.BaseFromGprRiskData, 0)
  230. }
  231. //}
  232. } else {
  233. if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  234. // 过滤0.50和0.5的比较
  235. oldV, _ := strconv.ParseFloat(findData.Value, 64)
  236. newV, _ := strconv.ParseFloat(value, 64)
  237. if oldV == newV {
  238. continue
  239. }
  240. dataObj.BaseFromGprRiskIndexId = findData.BaseFromGprRiskIndexId
  241. dataObj.Value = value
  242. dataObj.ModifyTime = time.Now()
  243. updateDataColsArr := make([]string, 0)
  244. updateDataColsArr = append(updateDataColsArr, "value")
  245. updateDataColsArr = append(updateDataColsArr, "modify_time")
  246. dataObj.Update(updateDataColsArr)
  247. }
  248. }
  249. }
  250. if len(addDataList) > 0 {
  251. err = dataObj.AddMulti(addDataList)
  252. if err != nil {
  253. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  254. return
  255. }
  256. }
  257. var dateItem *models.EdbInfoMaxAndMinInfo
  258. dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
  259. if err != nil {
  260. err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
  261. return
  262. }
  263. go func() {
  264. indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
  265. }()
  266. // 同步刷新ETA指标库的指标
  267. {
  268. // 获取指标详情
  269. baseObj := new(models.BaseFromGprRisk)
  270. var edbInfo *models.EdbInfo
  271. edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
  272. if err != nil {
  273. if err.Error() != utils.ErrNoRow() {
  274. errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
  275. return
  276. } else {
  277. err = nil
  278. }
  279. }
  280. // 已经加入到指标库的话,那么就去更新ETA指标库吧
  281. if edbInfo != nil {
  282. go logic.RefreshBaseEdbInfo(edbInfo, ``)
  283. }
  284. }
  285. return
  286. }