base_from_usda_fas.go 8.7 KB


  1. package services
  2. import (
  3. "eta/eta_index_lib/logic"
  4. "eta/eta_index_lib/models"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "time"
  11. )
  12. // HandleUsdaFasIndex 处理美国农业部的excel数据
  13. func HandleUsdaFasIndex(req *models.HandleUsdaFasExcelDataReq) (err error) {
  14. errMsgList := make([]string, 0)
  15. defer func() {
  16. if len(errMsgList) > 0 {
  17. msg := fmt.Sprint("数据源-美国农业部数据处理失败,err:", strings.Join(errMsgList, "\n"))
  18. utils.FileLog.Info(msg)
  19. go alarm_msg.SendAlarmMsg(msg, 3)
  20. }
  21. }()
  22. // 查询所有的一级分类
  23. classifyObj := new(models.BaseFromUsdaFasClassify)
  24. classifyList, err := classifyObj.GetParentClassify()
  25. if err != nil {
  26. err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
  27. return
  28. }
  29. classifyMap := make(map[string]int, 0)
  30. for _, v := range classifyList {
  31. classifyMap[v.ClassifyName] = int(v.ClassifyId)
  32. }
  33. for _, v := range req.List {
  34. if v.IndexName == "" || v.IndexCode == "" {
  35. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码%s或者指标ID%s为空:", v.IndexCode, v.IndexName))
  36. continue
  37. }
  38. err = handleUsdaFasIndex(v, req.TerminalCode, classifyMap)
  39. if err != nil {
  40. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
  41. return
  42. }
  43. }
  44. return
  45. }
  46. func handleUsdaFasIndex(req *models.HandleUsdaFasExcelData, terminalCode string, classifyMap map[string]int) (err error) {
  47. indexName := req.IndexName
  48. indexCode := req.IndexCode
  49. excelDataMap := req.ExcelDataMap
  50. errMsgList := make([]string, 0)
  51. defer func() {
  52. if len(errMsgList) > 0 {
  53. msg := fmt.Sprint("数据源-美国农业部数据处理失败,err:", strings.Join(errMsgList, "\n"))
  54. utils.FileLog.Info(msg)
  55. go alarm_msg.SendAlarmMsg(msg, 3)
  56. }
  57. }()
  58. indexObj := new(models.BaseFromUsdaFasIndex)
  59. dataObj := new(models.BaseFromUsdaFasData)
  60. classifyObj := new(models.BaseFromUsdaFasClassify)
  61. var indexId int64
  62. addDataList := make([]*models.BaseFromUsdaFasData, 0)
  63. exitDataMap := make(map[string]*models.BaseFromUsdaFasData)
  64. // 修改指标信息
  65. if indexName == "" {
  66. utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
  67. return
  68. }
  69. // 判断目录是否存在
  70. var classifyId int64
  71. now := time.Now()
  72. if req.ClassifyName != "" {
  73. classifyParentId := 0
  74. level := 1
  75. classifyParentId, _ = classifyMap[req.ParentClassifyName]
  76. if classifyParentId > 0 {
  77. level = 2
  78. }
  79. classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
  80. if err != nil {
  81. if err.Error() == utils.ErrNoRow() {
  82. //新增分类
  83. classifyObj = &models.BaseFromUsdaFasClassify{
  84. ClassifyName: req.ClassifyName,
  85. ClassifyNameEn: req.ClassifyName,
  86. ParentId: classifyParentId,
  87. SysUserId: 0,
  88. SysUserRealName: "",
  89. Level: level,
  90. Sort: req.ClassifySort,
  91. ModifyTime: now,
  92. CreateTime: now,
  93. }
  94. classifyId, err = classifyObj.Add()
  95. if err != nil {
  96. err = fmt.Errorf("新增分类失败 Err:%s", err)
  97. return
  98. }
  99. classifyObj.ClassifyId = classifyId
  100. } else {
  101. return
  102. }
  103. } else {
  104. classifyId = classifyObj.ClassifyId
  105. classifyObj.ModifyTime = now
  106. classifyObj.Sort = req.ClassifySort
  107. classifyObj.ParentId = classifyParentId
  108. e := classifyObj.Update([]string{"ParentId", "Sort", "ModifyTime"})
  109. if e != nil {
  110. fmt.Println("classifyObj Update Err:" + e.Error())
  111. return
  112. }
  113. }
  114. }
  115. //判断指标是否存在
  116. var isAdd int
  117. item, err := indexObj.GetByIndexCode(indexCode)
  118. if err != nil {
  119. if err.Error() == utils.ErrNoRow() {
  120. isAdd = 1
  121. err = nil
  122. } else {
  123. isAdd = -1
  124. err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
  125. return
  126. }
  127. }
  128. if item != nil && item.BaseFromUsdaFasIndexId > 0 {
  129. fmt.Println("item:", item)
  130. isAdd = 2
  131. } else {
  132. isAdd = 1
  133. }
  134. if isAdd == 1 {
  135. indexObj.IndexCode = indexCode
  136. indexObj.IndexName = indexName
  137. indexObj.Frequency = req.Frequency
  138. indexObj.ClassifyId = classifyId
  139. indexObj.Unit = req.Unit
  140. indexObj.Sort = req.Sort
  141. indexObj.ModifyTime = time.Now()
  142. indexObj.CreateTime = time.Now()
  143. indexObj.TerminalCode = terminalCode
  144. indexId, err = indexObj.Add()
  145. if err != nil {
  146. err = fmt.Errorf("数据源新增美国农业部指标失败 Err:%s", err)
  147. return
  148. }
  149. indexObj.BaseFromUsdaFasIndexId = indexId
  150. } else if isAdd == 2 {
  151. indexId = item.BaseFromUsdaFasIndexId
  152. if item.TerminalCode == `` && terminalCode != `` {
  153. item.TerminalCode = terminalCode
  154. err = item.Update([]string{"TerminalCode"})
  155. if err != nil {
  156. err = fmt.Errorf("数据源更新美国农业部指标失败 Err:%s", err)
  157. return
  158. }
  159. }
  160. indexObj.BaseFromUsdaFasIndexId = item.BaseFromUsdaFasIndexId
  161. indexObj.IndexName = indexName
  162. indexObj.Frequency = req.Frequency
  163. indexObj.ClassifyId = classifyId
  164. indexObj.Unit = req.Unit
  165. indexObj.Sort = req.Sort
  166. indexObj.ModifyTime = time.Now()
  167. //修改数据
  168. updateColsArr := make([]string, 0)
  169. updateColsArr = append(updateColsArr, "index_name")
  170. updateColsArr = append(updateColsArr, "classify_id")
  171. updateColsArr = append(updateColsArr, "unit")
  172. updateColsArr = append(updateColsArr, "frequency")
  173. updateColsArr = append(updateColsArr, "sort")
  174. updateColsArr = append(updateColsArr, "modify_time")
  175. e := indexObj.Update(updateColsArr)
  176. if e != nil {
  177. fmt.Println("Index Update Err:" + e.Error())
  178. return
  179. }
  180. }
  181. //获取已存在的所有数据
  182. var exitDataList []*models.BaseFromUsdaFasData
  183. exitDataList, err = dataObj.GetByIndexCode(indexCode)
  184. if err != nil {
  185. err = fmt.Errorf("数据源查询美国农业部指标数据失败 Err:%s", err)
  186. return
  187. }
  188. fmt.Println("exitDataListLen:", len(exitDataList))
  189. for _, v := range exitDataList {
  190. dateStr := v.DataTime
  191. exitDataMap[dateStr] = v
  192. }
  193. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  194. for date, value := range excelDataMap {
  195. if findData, ok := exitDataMap[date]; !ok {
  196. _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
  197. if err != nil {
  198. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  199. return
  200. }
  201. //if !strings.Contains(value, "#N/A") {
  202. var saveDataTime time.Time
  203. if strings.Contains(date, "00:00:00") {
  204. saveDataTime, err = time.Parse(utils.FormatDateTime, date)
  205. } else {
  206. saveDataTime, err = time.Parse(utils.FormatDate, date)
  207. }
  208. if err != nil {
  209. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  210. continue
  211. }
  212. timestamp := saveDataTime.UnixNano() / 1e6
  213. dataItem := new(models.BaseFromUsdaFasData)
  214. dataItem.BaseFromUsdaFasIndexId = int(indexId)
  215. dataItem.IndexCode = indexCode
  216. dataItem.DataTime = date
  217. dataItem.Value = value
  218. dataItem.CreateTime = time.Now()
  219. dataItem.ModifyTime = time.Now()
  220. dataItem.DataTimestamp = timestamp
  221. addDataList = append(addDataList, dataItem)
  222. if len(addDataList) > 500 {
  223. err = dataObj.AddMulti(addDataList)
  224. if err != nil {
  225. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  226. return
  227. }
  228. addDataList = make([]*models.BaseFromUsdaFasData, 0)
  229. }
  230. //}
  231. } else {
  232. if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  233. // 过滤0.50和0.5的比较
  234. oldV, _ := strconv.ParseFloat(findData.Value, 64)
  235. newV, _ := strconv.ParseFloat(value, 64)
  236. if oldV == newV {
  237. continue
  238. }
  239. dataObj.BaseFromUsdaFasIndexId = findData.BaseFromUsdaFasIndexId
  240. dataObj.Value = value
  241. dataObj.ModifyTime = time.Now()
  242. updateDataColsArr := make([]string, 0)
  243. updateDataColsArr = append(updateDataColsArr, "value")
  244. updateDataColsArr = append(updateDataColsArr, "modify_time")
  245. dataObj.Update(updateDataColsArr)
  246. }
  247. }
  248. }
  249. if len(addDataList) > 0 {
  250. err = dataObj.AddMulti(addDataList)
  251. if err != nil {
  252. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  253. return
  254. }
  255. }
  256. var dateItem *models.EdbInfoMaxAndMinInfo
  257. dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
  258. if err != nil {
  259. err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
  260. return
  261. }
  262. go func() {
  263. indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
  264. }()
  265. // 同步刷新ETA指标库的指标
  266. {
  267. // 获取指标详情
  268. baseObj := new(models.BaseFromUsdaFas)
  269. var edbInfo *models.EdbInfo
  270. edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
  271. if err != nil {
  272. if err.Error() != utils.ErrNoRow() {
  273. errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
  274. return
  275. } else {
  276. err = nil
  277. }
  278. }
  279. // 已经加入到指标库的话,那么就去更新ETA指标库吧
  280. if edbInfo != nil {
  281. go logic.RefreshBaseEdbInfo(edbInfo, ``)
  282. }
  283. }
  284. return
  285. }