base_from_sci.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package services
  2. import (
  3. "eta/eta_index_lib/logic"
  4. "eta/eta_index_lib/models"
  5. "eta/eta_index_lib/services/alarm_msg"
  6. "eta/eta_index_lib/utils"
  7. "fmt"
  8. "github.com/shopspring/decimal"
  9. "strings"
  10. "time"
  11. )
  12. // HandleSciIndex 处理卓创(红桃3)的Excel数据
  13. func HandleSciIndex(indexNameList, thirdIndexIdList, frequencyList, unitList []string, dataMap map[string]map[string]string, filePath, terminalCode string) {
  14. // 卓创(红桃3)指标id列表
  15. sciIndexModel := new(models.BaseFromSciIndex)
  16. list, err := sciIndexModel.GetAllIndex()
  17. if err != nil {
  18. fmt.Println("获取指标失败:", err)
  19. return
  20. }
  21. allIndexMap := make(map[string]*models.BaseFromSciIndex)
  22. for _, v := range list {
  23. allIndexMap[v.IndexCode] = v
  24. }
  25. // 需要入库的指标下标
  26. needAddIndexKeyList := make([]int, 0)
  27. needAddIndexMap := make(map[string]int, 0)
  28. for key, v := range thirdIndexIdList {
  29. if sciIndexInfo, ok := allIndexMap[v]; !ok {
  30. if _, ok2 := needAddIndexMap[v]; !ok2 {
  31. needAddIndexKeyList = append(needAddIndexKeyList, key)
  32. needAddIndexMap[v] = key
  33. }
  34. } else {
  35. updateColList := make([]string, 0)
  36. if sciIndexInfo.FilePath != filePath {
  37. sciIndexInfo.FilePath = filePath
  38. updateColList = append(updateColList, "FilePath")
  39. }
  40. if sciIndexInfo.TerminalCode != terminalCode {
  41. sciIndexInfo.TerminalCode = terminalCode
  42. updateColList = append(updateColList, "TerminalCode")
  43. }
  44. // 更新指标源信息
  45. if len(updateColList) > 0 {
  46. sciIndexInfo.Update(updateColList)
  47. }
  48. }
  49. }
  50. //fmt.Println(needAddIndexKeyList)
  51. //return
  52. addSciIndexList := make([]*models.BaseFromSciIndex, 0)
  53. // 新的指标入库
  54. for _, key := range needAddIndexKeyList {
  55. tmpSciIndex := &models.BaseFromSciIndex{
  56. //BaseFromSciIndexId: 0,
  57. //ClassifyId: 0,
  58. IndexCode: thirdIndexIdList[key],
  59. IndexName: indexNameList[key],
  60. Frequency: frequencyList[key],
  61. Unit: unitList[key],
  62. //StartDate: time.Time{},
  63. //EndDate: time.Time{},
  64. FilePath: filePath,
  65. TerminalCode: terminalCode,
  66. CreateTime: time.Now(),
  67. ModifyTime: time.Now(),
  68. }
  69. addSciIndexList = append(addSciIndexList, tmpSciIndex)
  70. }
  71. //fmt.Println(addSciIndexList)
  72. if len(addSciIndexList) > 0 {
  73. err = sciIndexModel.BatchAdd(addSciIndexList)
  74. if err != nil {
  75. fmt.Println("批量添加指标失败:", err)
  76. return
  77. }
  78. fmt.Println("添加成功")
  79. }
  80. // 红桃3实际数据处理
  81. HandleSciData(dataMap)
  82. }
  83. // HandleData 红桃3实际数据处理
  84. func HandleSciData(dataMap map[string]map[string]string) {
  85. errMsgList := make([]string, 0)
  86. defer func() {
  87. if len(errMsgList) > 0 {
  88. go alarm_msg.SendAlarmMsg(fmt.Sprint("红桃3实际数据处理失败,err:", strings.Join(errMsgList, "\n")), 3)
  89. }
  90. }()
  91. // 获取所有的指标
  92. sciIndexModel := new(models.BaseFromSciIndex)
  93. list, err := sciIndexModel.GetAllIndex()
  94. if err != nil {
  95. fmt.Println("获取指标失败:", err)
  96. return
  97. }
  98. allIndexMap := make(map[string]*models.BaseFromSciIndex)
  99. for _, v := range list {
  100. allIndexMap[v.IndexCode] = v
  101. }
  102. sciIndexDataModel := new(models.BaseFromSciData)
  103. for indexCode, data := range dataMap {
  104. indexInfo, ok := allIndexMap[indexCode]
  105. if !ok {
  106. fmt.Println("找不到该指标:", indexCode)
  107. continue
  108. }
  109. indexDataList, err := sciIndexDataModel.GetIndexDataList(indexCode)
  110. if err != nil {
  111. errMsgList = append(errMsgList, fmt.Sprint("查找卓创基础指标失败,指标编码:", indexCode, ";错误原因:", err.Error()))
  112. continue
  113. }
  114. indexDataExistMap := make(map[string]*models.BaseFromSciData)
  115. for _, indexData := range indexDataList {
  116. indexDataExistMap[indexData.DataTime] = indexData
  117. }
  118. addSciDataList := make([]*models.BaseFromSciData, 0)
  119. var tmpStartDate, tmpEndDate time.Time
  120. var latestValue string
  121. for currDate, currVal := range data {
  122. currDataTime, tmpErr := time.ParseInLocation(utils.FormatDate, currDate, time.Local)
  123. if tmpErr != nil {
  124. errMsgList = append(errMsgList, fmt.Sprint("时间格式化失败,指标编码:", currDate, ";错误原因:", tmpErr.Error()))
  125. continue
  126. }
  127. // 如果开始日期为空、或者当前日期早于开始日期,那么给开始日期赋值为当前日期
  128. if tmpStartDate.IsZero() || currDataTime.Before(tmpStartDate) {
  129. tmpStartDate = currDataTime
  130. }
  131. // 如果结束日期为空、或者当前日期晚于结束日期,那么给结束日期赋值为当前日期,且赋值最新值
  132. if tmpEndDate.IsZero() || currDataTime.After(tmpEndDate) {
  133. tmpEndDate = currDataTime
  134. latestValue = currVal
  135. }
  136. timestamp := currDataTime.UnixNano() / 1e6
  137. sciData, ok := indexDataExistMap[currDate]
  138. //判断是否存在数据,如果不存在,那么插入数据,存在的话那么修改数据
  139. if !ok {
  140. tmpBaseFromSciData := &models.BaseFromSciData{
  141. //SciDataId: 0,
  142. BaseFromSciIndexId: indexInfo.BaseFromSciIndexId,
  143. IndexCode: indexInfo.IndexCode,
  144. DataTime: currDate,
  145. Value: currVal,
  146. DataTimestamp: timestamp,
  147. CreateTime: time.Now(),
  148. ModifyTime: time.Now(),
  149. }
  150. indexDataExistMap[currDate] = tmpBaseFromSciData
  151. addSciDataList = append(addSciDataList, tmpBaseFromSciData)
  152. } else {
  153. // 更新数据
  154. existValue := sciData.Value
  155. if existValue != currVal {
  156. //fmt.Println("existValue:", existValue, ";====;currVal:", currVal)
  157. sciData.Value = currVal
  158. sciData.ModifyTime = time.Now()
  159. // 如果是已经入库了数据,那么就更新,否则只是单纯更改内存数据,而不去更新数据库
  160. if sciData.SciDataId > 0 {
  161. tmpErr = sciData.Update([]string{"Value", "ModifyTime"})
  162. if tmpErr != nil {
  163. errMsgList = append(errMsgList, fmt.Sprint("指标数据更新失败,指标编码:", indexCode, ",时间:", currDate, ";错误原因:", tmpErr.Error()))
  164. continue
  165. }
  166. }
  167. }
  168. }
  169. }
  170. if len(addSciDataList) > 0 {
  171. err = sciIndexDataModel.BatchAdd(addSciDataList)
  172. if err != nil {
  173. errMsgList = append(errMsgList, fmt.Sprint("批量添加数据失败,指标编码:", indexCode, err.Error()))
  174. continue
  175. }
  176. }
  177. updateCols := []string{"ModifyTime"}
  178. indexInfo.ModifyTime = time.Now()
  179. if !tmpStartDate.IsZero() {
  180. indexInfo.StartDate = tmpStartDate
  181. updateCols = append(updateCols, "StartDate")
  182. }
  183. if !tmpEndDate.IsZero() {
  184. indexInfo.EndDate = tmpEndDate
  185. updateCols = append(updateCols, "EndDate")
  186. }
  187. latestValueDeci, tmpErr := decimal.NewFromString(latestValue)
  188. if tmpErr == nil {
  189. indexInfo.LatestValue, _ = latestValueDeci.Float64()
  190. updateCols = append(updateCols, "LatestValue")
  191. }
  192. indexInfo.Update(updateCols)
  193. // 同步刷新ETA图库红桃3的指标
  194. {
  195. // 获取指标详情
  196. edbInfo, err := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_SCI, indexInfo.IndexCode)
  197. if err != nil && err.Error() != utils.ErrNoRow() {
  198. errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
  199. continue
  200. }
  201. // 已经加入到指标库的话,那么就去更新ETA指标库吧
  202. if edbInfo != nil {
  203. go logic.RefreshBaseEdbInfo(edbInfo, ``)
  204. }
  205. }
  206. }
  207. }