base_from_usda_fas.go 9.0 KB


  1. package services
  2. import (
  3. "eta/eta_index_lib/logic"
  4. "eta/eta_index_lib/models"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "time"
  11. )
  12. // HandleUsdaFasIndex 处理美国农业部的excel数据
  13. func HandleUsdaFasIndex(req *models.HandleUsdaFasExcelDataReq) (err error) {
  14. errMsgList := make([]string, 0)
  15. defer func() {
  16. if len(errMsgList) > 0 {
  17. msg := fmt.Sprint("数据源-美国农业部数据处理失败,err:", strings.Join(errMsgList, "\n"))
  18. utils.FileLog.Info(msg)
  19. go alarm_msg.SendAlarmMsg(msg, 3)
  20. }
  21. }()
  22. // 查询所有的一级分类
  23. classifyObj := new(models.BaseFromUsdaFasClassify)
  24. classifyList, err := classifyObj.GetParentClassify()
  25. if err != nil {
  26. err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
  27. return
  28. }
  29. classifyMap := make(map[string]int, 0)
  30. for _, v := range classifyList {
  31. classifyMap[v.ClassifyName] = int(v.ClassifyId)
  32. }
  33. for _, v := range req.List {
  34. if v.IndexName == "" || v.IndexCode == "" {
  35. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码%s或者指标ID%s为空:", v.IndexCode, v.IndexName))
  36. continue
  37. }
  38. err = handleUsdaFasIndex(v, req.TerminalCode, classifyMap)
  39. if err != nil {
  40. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
  41. return
  42. }
  43. }
  44. return
  45. }
  46. func handleUsdaFasIndex(req *models.HandleUsdaFasExcelData, terminalCode string, classifyMap map[string]int) (err error) {
  47. indexName := req.IndexName
  48. indexCode := req.IndexCode
  49. excelDataMap := req.ExcelDataMap
  50. errMsgList := make([]string, 0)
  51. defer func() {
  52. if len(errMsgList) > 0 {
  53. msg := fmt.Sprint("数据源-美国农业部数据处理失败,err:", strings.Join(errMsgList, "\n"))
  54. utils.FileLog.Info(msg)
  55. go alarm_msg.SendAlarmMsg(msg, 3)
  56. }
  57. }()
  58. indexObj := new(models.BaseFromUsdaFasIndex)
  59. dataObj := new(models.BaseFromUsdaFasData)
  60. classifyObj := new(models.BaseFromUsdaFasClassify)
  61. var indexId int64
  62. addDataList := make([]*models.BaseFromUsdaFasData, 0)
  63. exitDataMap := make(map[string]*models.BaseFromUsdaFasData)
  64. // 修改指标信息
  65. if indexName == "" {
  66. utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
  67. return
  68. }
  69. // 判断目录是否存在
  70. var classifyId int64
  71. now := time.Now()
  72. if req.ClassifyName != "" {
  73. classifyParentId := 0
  74. level := 1
  75. classifyParentId, _ = classifyMap[req.ParentClassifyName]
  76. if classifyParentId > 0 {
  77. level = 2
  78. }
  79. classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
  80. if err != nil {
  81. if err.Error() == utils.ErrNoRow() {
  82. //新增分类
  83. classifyObj = &models.BaseFromUsdaFasClassify{
  84. ClassifyName: req.ClassifyName,
  85. ClassifyNameEn: req.ClassifyName,
  86. ParentId: classifyParentId,
  87. SysUserId: 0,
  88. SysUserRealName: "",
  89. Level: level,
  90. Sort: req.ClassifySort,
  91. ModifyTime: now,
  92. CreateTime: now,
  93. }
  94. classifyId, err = classifyObj.Add()
  95. if err != nil {
  96. err = fmt.Errorf("新增分类失败 Err:%s", err)
  97. return
  98. }
  99. classifyObj.ClassifyId = classifyId
  100. } else {
  101. return
  102. }
  103. } else {
  104. classifyId = classifyObj.ClassifyId
  105. classifyObj.ModifyTime = now
  106. //classifyObj.Sort = req.ClassifySort
  107. classifyObj.ParentId = classifyParentId
  108. //e := classifyObj.Update([]string{"ParentId", "Sort", "ModifyTime"})
  109. e := classifyObj.Update([]string{"ParentId", "ModifyTime"})
  110. if e != nil {
  111. fmt.Println("classifyObj Update Err:" + e.Error())
  112. return
  113. }
  114. }
  115. }
  116. //判断指标是否存在
  117. var isAdd int
  118. item, err := indexObj.GetByIndexCode(indexCode)
  119. if err != nil {
  120. if err.Error() == utils.ErrNoRow() {
  121. isAdd = 1
  122. err = nil
  123. } else {
  124. isAdd = -1
  125. err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
  126. return
  127. }
  128. }
  129. if item != nil && item.BaseFromUsdaFasIndexId > 0 {
  130. fmt.Println("item:", item)
  131. isAdd = 2
  132. } else {
  133. isAdd = 1
  134. }
  135. if isAdd == 1 {
  136. indexObj.IndexCode = indexCode
  137. indexObj.IndexName = indexName
  138. indexObj.Frequency = req.Frequency
  139. indexObj.ClassifyId = classifyId
  140. indexObj.Country = req.Country
  141. indexObj.Commodity = req.Commodity
  142. indexObj.Unit = req.Unit
  143. indexObj.Sort = req.Sort
  144. indexObj.ModifyTime = time.Now()
  145. indexObj.CreateTime = time.Now()
  146. indexObj.TerminalCode = terminalCode
  147. indexId, err = indexObj.Add()
  148. if err != nil {
  149. err = fmt.Errorf("数据源新增美国农业部指标失败 Err:%s", err)
  150. return
  151. }
  152. indexObj.BaseFromUsdaFasIndexId = indexId
  153. } else if isAdd == 2 {
  154. indexId = item.BaseFromUsdaFasIndexId
  155. if item.TerminalCode == `` && terminalCode != `` {
  156. item.TerminalCode = terminalCode
  157. err = item.Update([]string{"TerminalCode"})
  158. if err != nil {
  159. err = fmt.Errorf("数据源更新美国农业部指标失败 Err:%s", err)
  160. return
  161. }
  162. }
  163. indexObj.BaseFromUsdaFasIndexId = item.BaseFromUsdaFasIndexId
  164. indexObj.IndexName = indexName
  165. indexObj.Frequency = req.Frequency
  166. indexObj.ClassifyId = classifyId
  167. indexObj.Country = req.Country
  168. indexObj.Commodity = req.Commodity
  169. indexObj.Unit = req.Unit
  170. indexObj.Sort = req.Sort
  171. indexObj.ModifyTime = time.Now()
  172. //修改数据
  173. updateColsArr := make([]string, 0)
  174. updateColsArr = append(updateColsArr, "index_name")
  175. updateColsArr = append(updateColsArr, "classify_id")
  176. updateColsArr = append(updateColsArr, "country")
  177. updateColsArr = append(updateColsArr, "commodity")
  178. updateColsArr = append(updateColsArr, "frequency")
  179. updateColsArr = append(updateColsArr, "sort")
  180. updateColsArr = append(updateColsArr, "modify_time")
  181. e := indexObj.Update(updateColsArr)
  182. if e != nil {
  183. fmt.Println("Index Update Err:" + e.Error())
  184. return
  185. }
  186. }
  187. //获取已存在的所有数据
  188. var exitDataList []*models.BaseFromUsdaFasData
  189. exitDataList, err = dataObj.GetByIndexCode(indexCode)
  190. if err != nil {
  191. err = fmt.Errorf("数据源查询美国农业部指标数据失败 Err:%s", err)
  192. return
  193. }
  194. fmt.Println("exitDataListLen:", len(exitDataList))
  195. for _, v := range exitDataList {
  196. dateStr := v.DataTime
  197. exitDataMap[dateStr] = v
  198. }
  199. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  200. for date, value := range excelDataMap {
  201. if findData, ok := exitDataMap[date]; !ok {
  202. _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
  203. if err != nil {
  204. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  205. return
  206. }
  207. //if !strings.Contains(value, "#N/A") {
  208. var saveDataTime time.Time
  209. if strings.Contains(date, "00:00:00") {
  210. saveDataTime, err = time.Parse(utils.FormatDateTime, date)
  211. } else {
  212. saveDataTime, err = time.Parse(utils.FormatDate, date)
  213. }
  214. if err != nil {
  215. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  216. continue
  217. }
  218. timestamp := saveDataTime.UnixNano() / 1e6
  219. dataItem := new(models.BaseFromUsdaFasData)
  220. dataItem.BaseFromUsdaFasIndexId = int(indexId)
  221. dataItem.IndexCode = indexCode
  222. dataItem.DataTime = date
  223. dataItem.Value = value
  224. dataItem.CreateTime = time.Now()
  225. dataItem.ModifyTime = time.Now()
  226. dataItem.DataTimestamp = timestamp
  227. addDataList = append(addDataList, dataItem)
  228. if len(addDataList) > 500 {
  229. err = dataObj.AddMulti(addDataList)
  230. if err != nil {
  231. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  232. return
  233. }
  234. addDataList = make([]*models.BaseFromUsdaFasData, 0)
  235. }
  236. //}
  237. } else {
  238. if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  239. // 过滤0.50和0.5的比较
  240. oldV, _ := strconv.ParseFloat(findData.Value, 64)
  241. newV, _ := strconv.ParseFloat(value, 64)
  242. if oldV == newV {
  243. continue
  244. }
  245. dataObj.BaseFromUsdaFasIndexId = findData.BaseFromUsdaFasIndexId
  246. dataObj.Value = value
  247. dataObj.ModifyTime = time.Now()
  248. updateDataColsArr := make([]string, 0)
  249. updateDataColsArr = append(updateDataColsArr, "value")
  250. updateDataColsArr = append(updateDataColsArr, "modify_time")
  251. dataObj.Update(updateDataColsArr)
  252. }
  253. }
  254. }
  255. if len(addDataList) > 0 {
  256. err = dataObj.AddMulti(addDataList)
  257. if err != nil {
  258. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  259. return
  260. }
  261. }
  262. var dateItem *models.EdbInfoMaxAndMinInfo
  263. dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
  264. if err != nil {
  265. err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
  266. return
  267. }
  268. go func() {
  269. indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
  270. }()
  271. // 同步刷新ETA指标库的指标
  272. {
  273. // 获取指标详情
  274. baseObj := new(models.BaseFromUsdaFas)
  275. var edbInfo *models.EdbInfo
  276. edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
  277. if err != nil {
  278. if err.Error() != utils.ErrNoRow() {
  279. errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
  280. return
  281. } else {
  282. err = nil
  283. }
  284. }
  285. // 已经加入到指标库的话,那么就去更新ETA指标库吧
  286. if edbInfo != nil {
  287. go logic.RefreshBaseEdbInfo(edbInfo, ``)
  288. }
  289. }
  290. return
  291. }