base_from_usda_fas.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package services
  2. import (
  3. "eta/eta_index_lib/logic"
  4. "eta/eta_index_lib/models"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "time"
  11. )
  12. // HandleUsdaFasIndex 处理美国农业部的excel数据
  13. func HandleUsdaFasIndex(req *models.HandleUsdaFasExcelDataReq) (err error) {
  14. errMsgList := make([]string, 0)
  15. defer func() {
  16. if len(errMsgList) > 0 {
  17. msg := fmt.Sprint("数据源-美国农业部数据处理失败,err:", strings.Join(errMsgList, "\n"))
  18. utils.FileLog.Info(msg)
  19. go alarm_msg.SendAlarmMsg(msg, 3)
  20. }
  21. }()
  22. // 查询所有的一级分类
  23. classifyObj := new(models.BaseFromUsdaFasClassify)
  24. classifyList, err := classifyObj.GetParentClassify()
  25. if err != nil {
  26. err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
  27. return
  28. }
  29. classifyMap := make(map[string]int, 0)
  30. for _, v := range classifyList {
  31. classifyMap[v.ClassifyName] = int(v.ClassifyId)
  32. }
  33. for _, v := range req.List {
  34. if v.IndexName == "" || v.IndexCode == "" {
  35. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码%s或者指标ID%s为空:", v.IndexCode, v.IndexName))
  36. continue
  37. }
  38. err = handleUsdaFasIndex(v, req.TerminalCode, classifyMap)
  39. if err != nil {
  40. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
  41. return
  42. }
  43. }
  44. return
  45. }
  46. func handleUsdaFasIndex(req *models.HandleUsdaFasExcelData, terminalCode string, classifyMap map[string]int) (err error) {
  47. indexName := req.IndexName
  48. indexCode := req.IndexCode
  49. excelDataMap := req.ExcelDataMap
  50. errMsgList := make([]string, 0)
  51. defer func() {
  52. if len(errMsgList) > 0 {
  53. msg := fmt.Sprint("数据源-美国农业部数据处理失败,err:", strings.Join(errMsgList, "\n"))
  54. utils.FileLog.Info(msg)
  55. go alarm_msg.SendAlarmMsg(msg, 3)
  56. }
  57. }()
  58. indexObj := new(models.BaseFromUsdaFasIndex)
  59. dataObj := new(models.BaseFromUsdaFasData)
  60. classifyObj := new(models.BaseFromUsdaFasClassify)
  61. var indexId int64
  62. addDataList := make([]*models.BaseFromUsdaFasData, 0)
  63. exitDataMap := make(map[string]*models.BaseFromUsdaFasData)
  64. // 修改指标信息
  65. if indexName == "" {
  66. utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
  67. return
  68. }
  69. // 判断目录是否存在
  70. var classifyId int64
  71. now := time.Now()
  72. if req.ClassifyName != "" {
  73. classifyParentId := 0
  74. level := 1
  75. classifyParentId, _ = classifyMap[req.ParentClassifyName]
  76. if classifyParentId > 0 {
  77. level = 2
  78. }
  79. classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
  80. if err != nil {
  81. if err.Error() == utils.ErrNoRow() {
  82. //新增分类
  83. classifyObj = &models.BaseFromUsdaFasClassify{
  84. ClassifyName: req.ClassifyName,
  85. ClassifyNameEn: req.ClassifyName,
  86. ParentId: classifyParentId,
  87. SysUserId: 0,
  88. SysUserRealName: "",
  89. Level: level,
  90. Sort: req.ClassifySort,
  91. ModifyTime: now,
  92. CreateTime: now,
  93. }
  94. classifyId, err = classifyObj.Add()
  95. if err != nil {
  96. err = fmt.Errorf("新增分类失败 Err:%s", err)
  97. return
  98. }
  99. classifyObj.ClassifyId = classifyId
  100. } else {
  101. return
  102. }
  103. } else {
  104. classifyId = classifyObj.ClassifyId
  105. classifyObj.ModifyTime = now
  106. //classifyObj.Sort = req.ClassifySort
  107. classifyObj.ParentId = classifyParentId
  108. //e := classifyObj.Update([]string{"ParentId", "Sort", "ModifyTime"})
  109. e := classifyObj.Update([]string{"ParentId", "ModifyTime"})
  110. if e != nil {
  111. fmt.Println("classifyObj Update Err:" + e.Error())
  112. return
  113. }
  114. }
  115. }
  116. //判断指标是否存在
  117. var isAdd int
  118. item, err := indexObj.GetByIndexCode(indexCode)
  119. if err != nil {
  120. if err.Error() == utils.ErrNoRow() {
  121. isAdd = 1
  122. err = nil
  123. } else {
  124. isAdd = -1
  125. err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
  126. return
  127. }
  128. }
  129. oldIndexObj := item
  130. if item != nil && item.BaseFromUsdaFasIndexId > 0 {
  131. fmt.Println("item:", item)
  132. isAdd = 2
  133. } else {
  134. isAdd = 1
  135. }
  136. if isAdd == 1 {
  137. indexObj.IndexCode = indexCode
  138. indexObj.IndexName = indexName
  139. indexObj.Frequency = req.Frequency
  140. indexObj.ClassifyId = classifyId
  141. indexObj.Country = req.Country
  142. indexObj.Commodity = req.Commodity
  143. indexObj.Unit = req.Unit
  144. indexObj.Sort = req.Sort
  145. indexObj.ModifyTime = time.Now()
  146. indexObj.CreateTime = time.Now()
  147. indexObj.TerminalCode = terminalCode
  148. indexId, err = indexObj.Add()
  149. if err != nil {
  150. err = fmt.Errorf("数据源新增美国农业部指标失败 Err:%s", err)
  151. return
  152. }
  153. indexObj.BaseFromUsdaFasIndexId = indexId
  154. } else if isAdd == 2 {
  155. indexId = item.BaseFromUsdaFasIndexId
  156. if item.TerminalCode == `` && terminalCode != `` {
  157. item.TerminalCode = terminalCode
  158. err = item.Update([]string{"TerminalCode"})
  159. if err != nil {
  160. err = fmt.Errorf("数据源更新美国农业部指标失败 Err:%s", err)
  161. return
  162. }
  163. }
  164. indexObj.BaseFromUsdaFasIndexId = item.BaseFromUsdaFasIndexId
  165. indexObj.IndexName = indexName
  166. indexObj.Frequency = req.Frequency
  167. indexObj.ClassifyId = classifyId
  168. indexObj.Country = req.Country
  169. indexObj.Commodity = req.Commodity
  170. indexObj.Unit = req.Unit
  171. indexObj.Sort = req.Sort
  172. indexObj.ModifyTime = time.Now()
  173. //修改数据
  174. updateColsArr := make([]string, 0)
  175. updateColsArr = append(updateColsArr, "index_name")
  176. updateColsArr = append(updateColsArr, "classify_id")
  177. updateColsArr = append(updateColsArr, "country")
  178. updateColsArr = append(updateColsArr, "commodity")
  179. updateColsArr = append(updateColsArr, "frequency")
  180. updateColsArr = append(updateColsArr, "sort")
  181. updateColsArr = append(updateColsArr, "modify_time")
  182. // 指标数据有变化才更新
  183. if oldIndexObj.IndexName != indexName || oldIndexObj.ClassifyId != classifyId || oldIndexObj.Country != req.Country || oldIndexObj.Commodity != req.Commodity || oldIndexObj.Frequency != req.Frequency || oldIndexObj.Sort != req.Sort {
  184. e := indexObj.Update(updateColsArr)
  185. if e != nil {
  186. fmt.Println("Index Update Err:" + e.Error())
  187. return
  188. }
  189. }
  190. }
  191. //获取已存在的所有数据
  192. var exitDataList []*models.BaseFromUsdaFasData
  193. exitDataList, err = dataObj.GetByIndexCode(indexCode)
  194. if err != nil {
  195. err = fmt.Errorf("数据源查询美国农业部指标数据失败 Err:%s", err)
  196. return
  197. }
  198. fmt.Println("exitDataListLen:", len(exitDataList))
  199. for _, v := range exitDataList {
  200. dateStr := v.DataTime
  201. exitDataMap[dateStr] = v
  202. }
  203. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  204. for date, value := range excelDataMap {
  205. if findData, ok := exitDataMap[date]; !ok {
  206. _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
  207. if err != nil {
  208. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  209. return
  210. }
  211. //if !strings.Contains(value, "#N/A") {
  212. var saveDataTime time.Time
  213. if strings.Contains(date, "00:00:00") {
  214. saveDataTime, err = time.Parse(utils.FormatDateTime, date)
  215. } else {
  216. saveDataTime, err = time.Parse(utils.FormatDate, date)
  217. }
  218. if err != nil {
  219. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  220. continue
  221. }
  222. timestamp := saveDataTime.UnixNano() / 1e6
  223. dataItem := new(models.BaseFromUsdaFasData)
  224. dataItem.BaseFromUsdaFasIndexId = int(indexId)
  225. dataItem.IndexCode = indexCode
  226. dataItem.DataTime = date
  227. dataItem.Value = value
  228. dataItem.CreateTime = time.Now()
  229. dataItem.ModifyTime = time.Now()
  230. dataItem.DataTimestamp = timestamp
  231. addDataList = append(addDataList, dataItem)
  232. if len(addDataList) > 500 {
  233. err = dataObj.AddMulti(addDataList)
  234. if err != nil {
  235. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  236. return
  237. }
  238. addDataList = make([]*models.BaseFromUsdaFasData, 0)
  239. }
  240. //}
  241. } else {
  242. if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  243. // 过滤0.50和0.5的比较
  244. oldV, _ := strconv.ParseFloat(findData.Value, 64)
  245. newV, _ := strconv.ParseFloat(value, 64)
  246. if oldV == newV {
  247. continue
  248. }
  249. dataObj.BaseFromUsdaFasIndexId = findData.BaseFromUsdaFasIndexId
  250. dataObj.Value = value
  251. dataObj.ModifyTime = time.Now()
  252. updateDataColsArr := make([]string, 0)
  253. updateDataColsArr = append(updateDataColsArr, "value")
  254. updateDataColsArr = append(updateDataColsArr, "modify_time")
  255. dataObj.Update(updateDataColsArr)
  256. }
  257. }
  258. }
  259. if len(addDataList) > 0 {
  260. err = dataObj.AddMulti(addDataList)
  261. if err != nil {
  262. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  263. return
  264. }
  265. }
  266. var dateItem *models.EdbInfoMaxAndMinInfo
  267. dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
  268. if err != nil {
  269. err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
  270. return
  271. }
  272. go func() {
  273. if isAdd == 2 && oldIndexObj != nil && oldIndexObj.BaseFromUsdaFasIndexId > 0 {
  274. // 判断指标数据是否有变更
  275. if oldIndexObj.StartDate != dateItem.MinDate || oldIndexObj.EndDate != dateItem.MaxDate || oldIndexObj.EndValue != dateItem.LatestValue {
  276. indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
  277. }
  278. } else {
  279. indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
  280. }
  281. }()
  282. // 同步刷新ETA指标库的指标
  283. {
  284. // 获取指标详情
  285. baseObj := new(models.BaseFromUsdaFas)
  286. var edbInfo *models.EdbInfo
  287. edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
  288. if err != nil {
  289. if !utils.IsErrNoRow(err) {
  290. errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
  291. return
  292. } else {
  293. err = nil
  294. }
  295. }
  296. // 已经加入到指标库的话,那么就去更新ETA指标库吧
  297. if edbInfo != nil {
  298. go logic.RefreshBaseEdbInfo(edbInfo, ``)
  299. }
  300. }
  301. return
  302. }