base_from_yongyi.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package services
  2. import (
  3. "eta/eta_index_lib/logic"
  4. "eta/eta_index_lib/models"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "time"
  11. )
  12. // HandleYongyiIndex 处理涌益咨询的excel数据
  13. func HandleYongyiIndex(req *models.HandleYongyiExcelDataReq) (err error) {
  14. errMsgList := make([]string, 0)
  15. defer func() {
  16. if len(errMsgList) > 0 {
  17. msg := fmt.Sprint("数据源-涌益咨询数据处理失败,err:", strings.Join(errMsgList, "\n"))
  18. utils.FileLog.Info(msg)
  19. go alarm_msg.SendAlarmMsg(msg, 3)
  20. }
  21. }()
  22. // 查询所有的一级分类
  23. classifyObj := new(models.BaseFromYongyiClassify)
  24. classifyList, err := classifyObj.GetParentClassify()
  25. if err != nil {
  26. err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
  27. return
  28. }
  29. classifyMap := make(map[string]int, 0)
  30. for _, v := range classifyList {
  31. classifyMap[v.ClassifyName] = int(v.ClassifyId)
  32. }
  33. for _, v := range req.List {
  34. if v.IndexName == "" || v.IndexCode == "" {
  35. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码%s或者指标ID%s为空:", v.IndexCode, v.IndexName))
  36. continue
  37. }
  38. err = handleYongyiIndex(v, req.TerminalCode, classifyMap)
  39. if err != nil {
  40. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
  41. return
  42. }
  43. }
  44. return
  45. }
  46. func handleYongyiIndex(req *models.HandleYongyiExcelData, terminalCode string, classifyMap map[string]int) (err error) {
  47. indexName := req.IndexName
  48. indexCode := req.IndexCode
  49. excelDataMap := req.ExcelDataMap
  50. errMsgList := make([]string, 0)
  51. defer func() {
  52. if len(errMsgList) > 0 {
  53. msg := fmt.Sprint("数据源-涌益咨询数据处理失败,err:", strings.Join(errMsgList, "\n"))
  54. utils.FileLog.Info(msg)
  55. go alarm_msg.SendAlarmMsg(msg, 3)
  56. }
  57. }()
  58. indexObj := new(models.BaseFromYongyiIndex)
  59. dataObj := new(models.BaseFromYongyiData)
  60. classifyObj := new(models.BaseFromYongyiClassify)
  61. var indexId int64
  62. addDataList := make([]*models.BaseFromYongyiData, 0)
  63. exitDataMap := make(map[string]*models.BaseFromYongyiData)
  64. // 修改指标信息
  65. if indexName == "" {
  66. utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
  67. return
  68. }
  69. // 判断目录是否存在
  70. var classifyId int64
  71. now := time.Now()
  72. if req.ClassifyName != "" {
  73. classifyParentId := 0
  74. classifyParentId, _ = classifyMap[req.Frequency]
  75. classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
  76. if err != nil {
  77. if err.Error() == utils.ErrNoRow() {
  78. //新增分类
  79. classifyObj = &models.BaseFromYongyiClassify{
  80. ClassifyName: req.ClassifyName,
  81. ParentId: classifyParentId,
  82. SysUserId: 0,
  83. SysUserRealName: "",
  84. Level: 1,
  85. Sort: req.ClassifySort,
  86. ModifyTime: now,
  87. CreateTime: now,
  88. }
  89. if classifyParentId > 0 {
  90. classifyObj.Level = 2
  91. }
  92. classifyId, err = classifyObj.Add()
  93. if err != nil {
  94. err = fmt.Errorf("新增分类失败 Err:%s", err)
  95. return
  96. }
  97. classifyObj.ClassifyId = classifyId
  98. } else {
  99. return
  100. }
  101. } else {
  102. classifyId = classifyObj.ClassifyId
  103. classifyObj.ModifyTime = now
  104. classifyObj.Sort = req.ClassifySort
  105. classifyObj.ParentId = classifyParentId
  106. e := classifyObj.Update([]string{"ParentId", "Sort", "ModifyTime"})
  107. if e != nil {
  108. fmt.Println("classifyObj Update Err:" + e.Error())
  109. return
  110. }
  111. }
  112. }
  113. //判断指标是否存在
  114. var isAdd int
  115. item, err := indexObj.GetByIndexCode(indexCode)
  116. if err != nil {
  117. if err.Error() == utils.ErrNoRow() {
  118. isAdd = 1
  119. err = nil
  120. } else {
  121. isAdd = -1
  122. err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
  123. return
  124. }
  125. }
  126. if item != nil && item.YongyiIndexId > 0 {
  127. fmt.Println("item:", item)
  128. isAdd = 2
  129. } else {
  130. isAdd = 1
  131. }
  132. if isAdd == 1 {
  133. indexObj.IndexCode = indexCode
  134. indexObj.IndexName = indexName
  135. indexObj.Frequency = req.Frequency
  136. indexObj.ClassifyId = classifyId
  137. indexObj.Unit = req.Unit
  138. indexObj.Sort = req.Sort
  139. indexObj.ModifyTime = time.Now()
  140. indexObj.CreateTime = time.Now()
  141. indexObj.TerminalCode = terminalCode
  142. indexId, err = indexObj.Add()
  143. if err != nil {
  144. err = fmt.Errorf("数据源新增涌溢指标失败 Err:%s", err)
  145. return
  146. }
  147. indexObj.YongyiIndexId = indexId
  148. } else if isAdd == 2 {
  149. indexId = item.YongyiIndexId
  150. if item.TerminalCode == `` && terminalCode != `` {
  151. item.TerminalCode = terminalCode
  152. err = item.Update([]string{"TerminalCode"})
  153. if err != nil {
  154. err = fmt.Errorf("数据源更新涌溢指标失败 Err:%s", err)
  155. return
  156. }
  157. }
  158. indexObj.YongyiIndexId = item.YongyiIndexId
  159. indexObj.IndexName = indexName
  160. indexObj.Frequency = req.Frequency
  161. indexObj.ClassifyId = classifyId
  162. indexObj.Unit = req.Unit
  163. indexObj.Sort = req.Sort
  164. indexObj.ModifyTime = time.Now()
  165. //修改数据
  166. updateColsArr := make([]string, 0)
  167. updateColsArr = append(updateColsArr, "index_name")
  168. updateColsArr = append(updateColsArr, "classify_id")
  169. updateColsArr = append(updateColsArr, "unit")
  170. updateColsArr = append(updateColsArr, "frequency")
  171. updateColsArr = append(updateColsArr, "sort")
  172. updateColsArr = append(updateColsArr, "modify_time")
  173. e := indexObj.Update(updateColsArr)
  174. if e != nil {
  175. fmt.Println("Index Update Err:" + e.Error())
  176. return
  177. }
  178. }
  179. //获取已存在的所有数据
  180. var exitDataList []*models.BaseFromYongyiData
  181. exitDataList, err = dataObj.GetByIndexCode(indexCode)
  182. if err != nil {
  183. err = fmt.Errorf("数据源查询涌溢指标数据失败 Err:%s", err)
  184. return
  185. }
  186. fmt.Println("exitDataListLen:", len(exitDataList))
  187. for _, v := range exitDataList {
  188. dateStr := v.DataTime
  189. exitDataMap[dateStr] = v
  190. }
  191. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  192. for date, value := range excelDataMap {
  193. if findData, ok := exitDataMap[date]; !ok {
  194. _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
  195. if err != nil {
  196. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  197. return
  198. }
  199. if !strings.Contains(value, "#N/A") {
  200. var saveDataTime time.Time
  201. if strings.Contains(date, "00:00:00") {
  202. saveDataTime, err = time.Parse(utils.FormatDateTime, date)
  203. } else {
  204. saveDataTime, err = time.Parse(utils.FormatDate, date)
  205. }
  206. if err != nil {
  207. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  208. continue
  209. }
  210. timestamp := saveDataTime.UnixNano() / 1e6
  211. dataItem := new(models.BaseFromYongyiData)
  212. dataItem.YongyiIndexId = int(indexId)
  213. dataItem.IndexCode = indexCode
  214. dataItem.DataTime = date
  215. dataItem.Value = value
  216. dataItem.CreateTime = time.Now()
  217. dataItem.ModifyTime = time.Now()
  218. dataItem.DataTimestamp = timestamp
  219. addDataList = append(addDataList, dataItem)
  220. if len(addDataList) > 500 {
  221. err = dataObj.AddMulti(addDataList)
  222. if err != nil {
  223. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  224. return
  225. }
  226. addDataList = make([]*models.BaseFromYongyiData, 0)
  227. }
  228. }
  229. } else {
  230. if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  231. // 过滤0.50和0.5的比较
  232. oldV, _ := strconv.ParseFloat(findData.Value, 64)
  233. newV, _ := strconv.ParseFloat(value, 64)
  234. if oldV == newV {
  235. continue
  236. }
  237. dataObj.YongyiDataId = findData.YongyiDataId
  238. dataObj.Value = value
  239. dataObj.ModifyTime = time.Now()
  240. updateDataColsArr := make([]string, 0)
  241. updateDataColsArr = append(updateDataColsArr, "value")
  242. updateDataColsArr = append(updateDataColsArr, "modify_time")
  243. dataObj.Update(updateDataColsArr)
  244. }
  245. }
  246. }
  247. if len(addDataList) > 0 {
  248. err = dataObj.AddMulti(addDataList)
  249. if err != nil {
  250. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  251. return
  252. }
  253. }
  254. var dateItem *models.EdbInfoMaxAndMinInfo
  255. dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
  256. if err != nil {
  257. err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
  258. return
  259. }
  260. go func() {
  261. indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
  262. }()
  263. // 同步刷新ETA指标库的指标
  264. {
  265. // 获取指标详情
  266. baseObj := new(models.BaseFromYongyi)
  267. var edbInfo *models.EdbInfo
  268. edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
  269. if err != nil {
  270. if err.Error() != utils.ErrNoRow() {
  271. errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
  272. return
  273. } else {
  274. err = nil
  275. }
  276. }
  277. // 已经加入到指标库的话,那么就去更新ETA指标库吧
  278. if edbInfo != nil {
  279. go logic.RefreshBaseEdbInfo(edbInfo, ``)
  280. }
  281. }
  282. return
  283. }