base_from_wind_wsd.go 5.7 KB


  1. package models
  2. import (
  3. "eta/eta_index_lib/utils"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "strconv"
  7. "strings"
  8. "time"
  9. )
  // windWsd is the fixed prefix of every index code that AddEdbDataFromWindWsd
  // builds (prefix + stock code + lower-cased field name).
  var windWsd = "wsd"
  11. func AddEdbDataFromWindWsd(stockCode string, item map[string]map[string]interface{}) (err error) {
  12. o := orm.NewOrm()
  13. var isAdd bool
  14. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  15. indexCodeMap := make(map[string]string)
  16. for wk, wv := range item {
  17. for vk, vv := range wv {
  18. if vv == nil {
  19. continue
  20. }
  21. var indexCode string
  22. wkInt, err := strconv.ParseInt(wk, 10, 64)
  23. if err != nil {
  24. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  25. return err
  26. }
  27. vk = strings.ToLower(vk)
  28. indexCode = windWsd + stockCode + vk
  29. indexCodeMap[indexCode] = indexCode
  30. wkInt = wkInt / 1000
  31. t := time.Unix(wkInt, 0)
  32. if t.After(time.Now()) {
  33. continue
  34. }
  35. dateTime := t.Format(utils.FormatDate)
  36. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  37. if err != nil {
  38. return err
  39. }
  40. timestamp := dataTime.UnixNano() / 1e6
  41. timeStr := fmt.Sprintf("%d", timestamp)
  42. //saveVal := utils.SubFloatToString(val, 20)
  43. var saveVal string
  44. switch vt := vv.(type) {
  45. case int:
  46. saveVal = strconv.Itoa(vt)
  47. case float64:
  48. saveVal = utils.SubFloatToFloatStr(vt, 20)
  49. case string:
  50. saveVal = vt
  51. }
  52. addSql += GetAddSql("0", indexCode, dateTime, timeStr, saveVal)
  53. isAdd = true
  54. }
  55. }
  56. if isAdd {
  57. for _, v := range indexCodeMap {
  58. var count int
  59. sql := ` SELECT COUNT(1) FROM edb_data_wind_wsd WHERE edb_code=? `
  60. err = o.Raw(sql, v).QueryRow(&count)
  61. if err != nil {
  62. return err
  63. }
  64. if count > 0 {
  65. sql = ` DELETE FROM edb_data_wind_wsd WHERE edb_code=? `
  66. _, err = o.Raw(sql, v).Exec()
  67. if err != nil {
  68. return err
  69. }
  70. }
  71. }
  72. addSql = strings.TrimRight(addSql, ",")
  73. _, err = o.Raw(addSql).Exec()
  74. if err != nil {
  75. fmt.Println("sql exec err:" + err.Error())
  76. return
  77. }
  78. }
  79. return
  80. }
  81. // RefreshEdbDataFromWind 刷新wind指标数据
  82. func RefreshEdbDataFromWindWsd(edbInfoId int, edbCode, startDate string, item map[string]map[string]interface{}) (err error) {
  83. o := orm.NewOrm()
  84. source := utils.DATA_SOURCE_WIND
  85. subSource := utils.DATA_SUB_SOURCE_DATE
  86. // 真实数据的最大日期 , 插入规则配置的日期
  87. var realDataMaxDate, edbDataInsertConfigDate time.Time
  88. var edbDataInsertConfig *EdbDataInsertConfig
  89. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  90. {
  91. edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
  92. if err != nil && err.Error() != utils.ErrNoRow() {
  93. return
  94. }
  95. if edbDataInsertConfig != nil {
  96. edbDataInsertConfigDate = edbDataInsertConfig.Date
  97. }
  98. }
  99. var condition string
  100. var pars []interface{}
  101. condition += " AND edb_info_id=? "
  102. pars = append(pars, edbInfoId)
  103. var startDateTime time.Time
  104. if startDate != "" {
  105. condition += " AND data_time>=? "
  106. pars = append(pars, startDate)
  107. startDateTime, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  108. }
  109. existList, err := GetEdbDataByCondition(source, subSource, condition, pars)
  110. existMap := make(map[string]*EdbInfoSearchData)
  111. for _, v := range existList {
  112. existMap[v.DataTime] = v
  113. }
  114. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  115. var isAdd bool
  116. addMap := make(map[string]string)
  117. edbInfoIdStr := strconv.Itoa(edbInfoId)
  118. for wk, wv := range item {
  119. for vk, vv := range wv {
  120. if vv == nil {
  121. continue
  122. }
  123. wkInt, err := strconv.ParseInt(wk, 10, 64)
  124. if err != nil {
  125. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  126. return err
  127. }
  128. vk = strings.ToLower(vk)
  129. wkInt = wkInt / 1000
  130. t := time.Unix(wkInt, 0)
  131. if t.After(time.Now()) {
  132. continue
  133. }
  134. dateTime := t.Format(utils.FormatDate)
  135. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  136. if err != nil {
  137. return err
  138. }
  139. timestamp := dataTime.UnixNano() / 1e6
  140. timeStr := fmt.Sprintf("%d", timestamp)
  141. var saveVal string
  142. switch vt := vv.(type) {
  143. case int:
  144. saveVal = strconv.Itoa(vt)
  145. case float64:
  146. saveVal = utils.SubFloatToFloatStr(vt, 20)
  147. case string:
  148. saveVal = vt
  149. }
  150. //如果传入的开始时间是空的,且当前数据日期早于传入的开始日期,那么需要判断下当前日期的数据是否存在
  151. if !startDateTime.IsZero() && t.Before(startDateTime) {
  152. tmpItem, tmpErr := GetEdbDataByDate(source, subSource, edbCode, dateTime)
  153. if tmpErr == nil && tmpItem != nil {
  154. existMap[tmpItem.DataTime] = tmpItem
  155. }
  156. }
  157. if findItem, ok := existMap[dateTime]; !ok {
  158. if _, existOk := addMap[dateTime]; !existOk {
  159. isAdd = true
  160. addSql += GetAddSql(edbInfoIdStr, edbCode, dateTime, timeStr, saveVal)
  161. addMap[dateTime] = saveVal
  162. }
  163. } else {
  164. if findItem != nil && utils.SubFloatToString(findItem.Value, 30) != saveVal {
  165. err = ModifyEdbDataById(source, subSource, findItem.EdbDataId, saveVal)
  166. if err != nil {
  167. return err
  168. }
  169. }
  170. }
  171. // 下面代码主要目的是处理掉手动插入的数据判断
  172. {
  173. if realDataMaxDate.IsZero() || t.After(realDataMaxDate) {
  174. realDataMaxDate = t
  175. }
  176. if edbDataInsertConfigDate.IsZero() || t.Equal(edbDataInsertConfigDate) {
  177. isFindConfigDateRealData = true
  178. }
  179. }
  180. }
  181. }
  182. // 处理手工数据补充的配置
  183. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, subSource, existMap, isFindConfigDateRealData)
  184. if isAdd {
  185. addSql = strings.TrimRight(addSql, ",")
  186. _, err = o.Raw(addSql).Exec()
  187. if err != nil {
  188. fmt.Println("RefreshEdbDataFromWind add Err", err.Error())
  189. return
  190. }
  191. }
  192. return
  193. }