base_from_wind_wsd.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package models
  2. import (
  3. "eta/eta_index_lib/utils"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "strconv"
  7. "strings"
  8. "time"
  9. )
  10. var windWsd = "wsd"
  11. func AddEdbDataFromWindWsd(stockCode string, item map[string]map[string]interface{}) (err error) {
  12. o := orm.NewOrm()
  13. var isAdd bool
  14. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  15. for wk, wv := range item {
  16. for vk, vv := range wv {
  17. var indexCode string
  18. wkInt, err := strconv.ParseInt(wk, 10, 64)
  19. if err != nil {
  20. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  21. return err
  22. }
  23. vk = strings.ToLower(vk)
  24. indexCode = windWsd + stockCode + vk
  25. wkInt = wkInt / 1000
  26. t := time.Unix(wkInt, 0)
  27. if t.After(time.Now()) {
  28. continue
  29. }
  30. dateTime := t.Format(utils.FormatDate)
  31. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  32. if err != nil {
  33. return err
  34. }
  35. timestamp := dataTime.UnixNano() / 1e6
  36. timeStr := fmt.Sprintf("%d", timestamp)
  37. //saveVal := utils.SubFloatToString(val, 20)
  38. if vv == nil {
  39. continue
  40. }
  41. var saveVal string
  42. switch vt := vv.(type) {
  43. case int:
  44. saveVal = strconv.Itoa(vt)
  45. case float64:
  46. saveVal = utils.SubFloatToFloatStr(vt, 20)
  47. case string:
  48. saveVal = vt
  49. }
  50. addSql += GetAddSql("0", indexCode, dateTime, timeStr, saveVal)
  51. isAdd = true
  52. }
  53. }
  54. if isAdd {
  55. addSql = strings.TrimRight(addSql, ",")
  56. _, err = o.Raw(addSql).Exec()
  57. if err != nil {
  58. fmt.Println("sql exec err:" + err.Error())
  59. return
  60. }
  61. }
  62. return
  63. }
  64. // RefreshEdbDataFromWind 刷新wind指标数据
  65. func RefreshEdbDataFromWindWsd(edbInfoId int, edbCode, startDate string, item *EdbDataFromWind) (err error) {
  66. o := orm.NewOrm()
  67. source := utils.DATA_SOURCE_WIND
  68. // 真实数据的最大日期 , 插入规则配置的日期
  69. var realDataMaxDate, edbDataInsertConfigDate time.Time
  70. var edbDataInsertConfig *EdbDataInsertConfig
  71. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  72. {
  73. edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
  74. if err != nil && err.Error() != utils.ErrNoRow() {
  75. return
  76. }
  77. if edbDataInsertConfig != nil {
  78. edbDataInsertConfigDate = edbDataInsertConfig.Date
  79. }
  80. }
  81. var condition string
  82. var pars []interface{}
  83. condition += " AND edb_info_id=? "
  84. pars = append(pars, edbInfoId)
  85. var startDateTime time.Time
  86. if startDate != "" {
  87. condition += " AND data_time>=? "
  88. pars = append(pars, startDate)
  89. startDateTime, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  90. }
  91. existList, err := GetEdbDataByCondition(source, condition, pars)
  92. existMap := make(map[string]*EdbInfoSearchData)
  93. for _, v := range existList {
  94. existMap[v.DataTime] = v
  95. }
  96. addSql := ` INSERT INTO edb_data_wind(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  97. var isAdd bool
  98. addMap := make(map[string]string)
  99. edbInfoIdStr := strconv.Itoa(edbInfoId)
  100. for k, v := range item.Dt {
  101. timeStr := fmt.Sprintf("%d", v)
  102. v = v / 1000
  103. t := time.Unix(v, 0)
  104. dateTime := t.Format(utils.FormatDate)
  105. t, _ = time.ParseInLocation(utils.FormatDate, dateTime, time.Local) // 这里的目的是为了处理成北京时间,所以多转一遍
  106. val := item.Close[k]
  107. saveVal := utils.SubFloatToString(val, 30)
  108. //如果传入的开始时间是空的,且当前数据日期早于传入的开始日期,那么需要判断下当前日期的数据是否存在
  109. if !startDateTime.IsZero() && t.Before(startDateTime) {
  110. tmpItem, tmpErr := GetEdbDataByDate(source, edbCode, dateTime)
  111. if tmpErr == nil && tmpItem != nil {
  112. existMap[tmpItem.DataTime] = tmpItem
  113. }
  114. }
  115. if findItem, ok := existMap[dateTime]; !ok {
  116. if _, existOk := addMap[dateTime]; !existOk {
  117. isAdd = true
  118. addSql += GetAddSql(edbInfoIdStr, edbCode, dateTime, timeStr, saveVal)
  119. addMap[dateTime] = saveVal
  120. }
  121. } else {
  122. if findItem != nil && utils.SubFloatToString(findItem.Value, 30) != saveVal {
  123. err = ModifyEdbDataById(source, findItem.EdbDataId, saveVal)
  124. if err != nil {
  125. return err
  126. }
  127. }
  128. }
  129. // 下面代码主要目的是处理掉手动插入的数据判断
  130. {
  131. if realDataMaxDate.IsZero() || t.After(realDataMaxDate) {
  132. realDataMaxDate = t
  133. }
  134. if edbDataInsertConfigDate.IsZero() || t.Equal(edbDataInsertConfigDate) {
  135. isFindConfigDateRealData = true
  136. }
  137. }
  138. }
  139. // 处理手工数据补充的配置
  140. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, existMap, isFindConfigDateRealData)
  141. if isAdd {
  142. addSql = strings.TrimRight(addSql, ",")
  143. _, err = o.Raw(addSql).Exec()
  144. if err != nil {
  145. fmt.Println("RefreshEdbDataFromWind add Err", err.Error())
  146. return
  147. }
  148. }
  149. return
  150. }