base_from_wind_wsd.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package models
  2. import (
  3. "errors"
  4. "eta/eta_index_lib/utils"
  5. "fmt"
  6. "github.com/beego/beego/v2/client/orm"
  7. "strconv"
  8. "strings"
  9. "time"
  10. )
  11. var windWsd = "wsd"
  12. func AddEdbDataFromWindWsd(stockCode string, item map[string]map[string]interface{}) (err error) {
  13. o := orm.NewOrm()
  14. var isAdd bool
  15. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  16. indexCodeMap := make(map[string]string)
  17. for wk, wv := range item {
  18. for vk, vv := range wv {
  19. if vv == nil {
  20. continue
  21. }
  22. var indexCode string
  23. wkInt, err := strconv.ParseInt(wk, 10, 64)
  24. if err != nil {
  25. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  26. return err
  27. }
  28. if vk == "OUTMESSAGE" {
  29. utils.FileLog.Info("OUTMESSAGE:" + vv.(string))
  30. err = errors.New("OUTMESSAGE:" + vv.(string))
  31. return err
  32. }
  33. vk = strings.ToLower(vk)
  34. indexCode = windWsd + stockCode + vk
  35. indexCodeMap[indexCode] = indexCode
  36. wkInt = wkInt / 1000
  37. t := time.Unix(wkInt, 0)
  38. if t.After(time.Now()) {
  39. continue
  40. }
  41. dateTime := t.Format(utils.FormatDate)
  42. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  43. if err != nil {
  44. return err
  45. }
  46. timestamp := dataTime.UnixNano() / 1e6
  47. timeStr := fmt.Sprintf("%d", timestamp)
  48. //saveVal := utils.SubFloatToString(val, 20)
  49. var saveVal string
  50. switch vt := vv.(type) {
  51. case int:
  52. saveVal = strconv.Itoa(vt)
  53. case float64:
  54. saveVal = utils.SubFloatToFloatStr(vt, 20)
  55. case string:
  56. saveVal = vt
  57. }
  58. addSql += GetAddSql("0", indexCode, dateTime, timeStr, saveVal)
  59. isAdd = true
  60. }
  61. }
  62. if isAdd {
  63. for _, v := range indexCodeMap {
  64. var count int
  65. sql := ` SELECT COUNT(1) FROM edb_data_wind_wsd WHERE edb_code=? `
  66. err = o.Raw(sql, v).QueryRow(&count)
  67. if err != nil {
  68. return err
  69. }
  70. if count > 0 {
  71. sql = ` DELETE FROM edb_data_wind_wsd WHERE edb_code=? `
  72. _, err = o.Raw(sql, v).Exec()
  73. if err != nil {
  74. return err
  75. }
  76. }
  77. }
  78. addSql = strings.TrimRight(addSql, ",")
  79. _, err = o.Raw(addSql).Exec()
  80. if err != nil {
  81. fmt.Println("sql exec err:" + err.Error())
  82. return
  83. }
  84. }
  85. return
  86. }
  87. // RefreshEdbDataFromWind 刷新wind指标数据
  88. func RefreshEdbDataFromWindWsd(edbInfoId int, edbCode, startDate string, item map[string]map[string]interface{}) (err error) {
  89. o := orm.NewOrm()
  90. source := utils.DATA_SOURCE_WIND
  91. subSource := utils.DATA_SUB_SOURCE_DATE
  92. // 真实数据的最大日期 , 插入规则配置的日期
  93. var realDataMaxDate, edbDataInsertConfigDate time.Time
  94. var edbDataInsertConfig *EdbDataInsertConfig
  95. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  96. {
  97. edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
  98. if err != nil && err.Error() != utils.ErrNoRow() {
  99. return
  100. }
  101. if edbDataInsertConfig != nil {
  102. edbDataInsertConfigDate = edbDataInsertConfig.Date
  103. }
  104. }
  105. var condition string
  106. var pars []interface{}
  107. condition += " AND edb_info_id=? "
  108. pars = append(pars, edbInfoId)
  109. var startDateTime time.Time
  110. if startDate != "" {
  111. condition += " AND data_time>=? "
  112. pars = append(pars, startDate)
  113. startDateTime, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  114. }
  115. existList, err := GetEdbDataByCondition(source, subSource, condition, pars)
  116. existMap := make(map[string]*EdbInfoSearchData)
  117. for _, v := range existList {
  118. existMap[v.DataTime] = v
  119. }
  120. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  121. var isAdd bool
  122. addMap := make(map[string]string)
  123. edbInfoIdStr := strconv.Itoa(edbInfoId)
  124. for wk, wv := range item {
  125. for vk, vv := range wv {
  126. if vv == nil {
  127. continue
  128. }
  129. wkInt, err := strconv.ParseInt(wk, 10, 64)
  130. if err != nil {
  131. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  132. return err
  133. }
  134. vk = strings.ToLower(vk)
  135. wkInt = wkInt / 1000
  136. t := time.Unix(wkInt, 0)
  137. if t.After(time.Now()) {
  138. continue
  139. }
  140. dateTime := t.Format(utils.FormatDate)
  141. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  142. if err != nil {
  143. return err
  144. }
  145. timestamp := dataTime.UnixNano() / 1e6
  146. timeStr := fmt.Sprintf("%d", timestamp)
  147. var saveVal string
  148. switch vt := vv.(type) {
  149. case int:
  150. saveVal = strconv.Itoa(vt)
  151. case float64:
  152. saveVal = utils.SubFloatToFloatStr(vt, 20)
  153. case string:
  154. saveVal = vt
  155. }
  156. //如果传入的开始时间是空的,且当前数据日期早于传入的开始日期,那么需要判断下当前日期的数据是否存在
  157. if !startDateTime.IsZero() && t.Before(startDateTime) {
  158. tmpItem, tmpErr := GetEdbDataByDate(source, subSource, edbCode, dateTime)
  159. if tmpErr == nil && tmpItem != nil {
  160. existMap[tmpItem.DataTime] = tmpItem
  161. }
  162. }
  163. if findItem, ok := existMap[dateTime]; !ok {
  164. if _, existOk := addMap[dateTime]; !existOk {
  165. isAdd = true
  166. addSql += GetAddSql(edbInfoIdStr, edbCode, dateTime, timeStr, saveVal)
  167. addMap[dateTime] = saveVal
  168. }
  169. } else {
  170. if findItem != nil && utils.SubFloatToString(findItem.Value, 30) != saveVal {
  171. err = ModifyEdbDataById(source, subSource, findItem.EdbDataId, saveVal)
  172. if err != nil {
  173. return err
  174. }
  175. }
  176. }
  177. // 下面代码主要目的是处理掉手动插入的数据判断
  178. {
  179. if realDataMaxDate.IsZero() || t.After(realDataMaxDate) {
  180. realDataMaxDate = t
  181. }
  182. if edbDataInsertConfigDate.IsZero() || t.Equal(edbDataInsertConfigDate) {
  183. isFindConfigDateRealData = true
  184. }
  185. }
  186. }
  187. }
  188. // 处理手工数据补充的配置
  189. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, subSource, existMap, isFindConfigDateRealData)
  190. if isAdd {
  191. addSql = strings.TrimRight(addSql, ",")
  192. _, err = o.Raw(addSql).Exec()
  193. if err != nil {
  194. fmt.Println("RefreshEdbDataFromWind add Err", err.Error())
  195. return
  196. }
  197. }
  198. return
  199. }