base_from_wind_wsd.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package models
  2. import (
  3. "errors"
  4. "eta/eta_index_lib/global"
  5. "eta/eta_index_lib/utils"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "time"
  10. )
  11. var windWsd = "wsd"
  12. func AddEdbDataFromWindWsd(stockCode string, item map[string]map[string]interface{}, days, period, priceAdj string) (err error) {
  13. //o := orm.NewOrm()
  14. var isAdd bool
  15. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  16. addSql = utils.ReplaceDriverKeywords("", addSql)
  17. // 指标编码后缀,默认值则无需拼接
  18. suffix := utils.GetWindWsdIndexCodeSuffix(period, days, priceAdj)
  19. indexCodeMap := make(map[string]string)
  20. for wk, wv := range item {
  21. for vk, vv := range wv {
  22. if vv == nil {
  23. continue
  24. }
  25. var indexCode string
  26. wkInt, err := strconv.ParseInt(wk, 10, 64)
  27. if err != nil {
  28. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  29. return err
  30. }
  31. if vk == "OUTMESSAGE" {
  32. utils.FileLog.Info("OUTMESSAGE:" + vv.(string))
  33. err = errors.New("OUTMESSAGE:" + vv.(string))
  34. return err
  35. }
  36. vk = strings.ToLower(vk)
  37. indexCode = windWsd + stockCode + vk + suffix
  38. indexCodeMap[indexCode] = indexCode
  39. wkInt = wkInt / 1000
  40. t := time.Unix(wkInt, 0)
  41. if t.After(time.Now()) {
  42. continue
  43. }
  44. dateTime := t.Format(utils.FormatDate)
  45. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  46. if err != nil {
  47. return err
  48. }
  49. timestamp := dataTime.UnixNano() / 1e6
  50. timeStr := fmt.Sprintf("%d", timestamp)
  51. //saveVal := utils.SubFloatToString(val, 20)
  52. var saveVal string
  53. switch vt := vv.(type) {
  54. case int:
  55. saveVal = strconv.Itoa(vt)
  56. case float64:
  57. saveVal = utils.SubFloatToFloatStr(vt, 20)
  58. case string:
  59. saveVal = vt
  60. }
  61. addSql += GetAddSql("0", indexCode, dateTime, timeStr, saveVal)
  62. isAdd = true
  63. }
  64. }
  65. if isAdd {
  66. for _, v := range indexCodeMap {
  67. var count int
  68. sql := ` SELECT COUNT(1) FROM edb_data_wind_wsd WHERE edb_code=? `
  69. //err = o.Raw(sql, v).QueryRow(&count)
  70. err = global.DEFAULT_DB.Raw(sql, v).Scan(&count).Error
  71. if err != nil {
  72. return err
  73. }
  74. if count > 0 {
  75. sql = ` DELETE FROM edb_data_wind_wsd WHERE edb_code=? `
  76. //_, err = o.Raw(sql, v).Exec()
  77. err = global.DEFAULT_DB.Exec(sql, v).Error
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. }
  83. addSql = strings.TrimRight(addSql, ",")
  84. //_, err = o.Raw(addSql).Exec()
  85. err = global.DEFAULT_DB.Exec(addSql).Error
  86. if err != nil {
  87. fmt.Println("sql exec err:" + err.Error())
  88. return
  89. }
  90. }
  91. return
  92. }
  93. // RefreshEdbDataFromWind 刷新wind指标数据
  94. func RefreshEdbDataFromWindWsd(edbInfoId int, edbCode, startDate string, item map[string]map[string]interface{}) (err error) {
  95. //o := orm.NewOrm()
  96. source := utils.DATA_SOURCE_WIND
  97. subSource := utils.DATA_SUB_SOURCE_DATE
  98. // 真实数据的最大日期 , 插入规则配置的日期
  99. var realDataMaxDate, edbDataInsertConfigDate time.Time
  100. var edbDataInsertConfig *EdbDataInsertConfig
  101. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  102. {
  103. edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
  104. if err != nil && !utils.IsErrNoRow(err) {
  105. return
  106. }
  107. if edbDataInsertConfig != nil {
  108. edbDataInsertConfigDate = edbDataInsertConfig.Date
  109. }
  110. }
  111. var condition string
  112. var pars []interface{}
  113. condition += " AND edb_info_id=? "
  114. pars = append(pars, edbInfoId)
  115. var startDateTime time.Time
  116. if startDate != "" {
  117. condition += " AND data_time>=? "
  118. pars = append(pars, startDate)
  119. startDateTime, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  120. }
  121. existList, err := GetEdbDataByCondition(source, subSource, condition, pars)
  122. existMap := make(map[string]*EdbInfoSearchData)
  123. for _, v := range existList {
  124. existMap[v.DataTime] = v
  125. }
  126. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  127. addSql = utils.ReplaceDriverKeywords("", addSql)
  128. var isAdd bool
  129. addMap := make(map[string]string)
  130. edbInfoIdStr := strconv.Itoa(edbInfoId)
  131. for wk, wv := range item {
  132. for vk, vv := range wv {
  133. if vv == nil {
  134. continue
  135. }
  136. wkInt, err := strconv.ParseInt(wk, 10, 64)
  137. if err != nil {
  138. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  139. return err
  140. }
  141. vk = strings.ToLower(vk)
  142. wkInt = wkInt / 1000
  143. t := time.Unix(wkInt, 0)
  144. if t.After(time.Now()) {
  145. continue
  146. }
  147. dateTime := t.Format(utils.FormatDate)
  148. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  149. if err != nil {
  150. return err
  151. }
  152. timestamp := dataTime.UnixNano() / 1e6
  153. timeStr := fmt.Sprintf("%d", timestamp)
  154. var saveVal string
  155. switch vt := vv.(type) {
  156. case int:
  157. saveVal = strconv.Itoa(vt)
  158. case float64:
  159. saveVal = utils.SubFloatToFloatStr(vt, 20)
  160. case string:
  161. saveVal = vt
  162. }
  163. //如果传入的开始时间是空的,且当前数据日期早于传入的开始日期,那么需要判断下当前日期的数据是否存在
  164. if !startDateTime.IsZero() && t.Before(startDateTime) {
  165. tmpItem, tmpErr := GetEdbDataByDate(source, subSource, edbCode, dateTime)
  166. if tmpErr == nil && tmpItem != nil {
  167. existMap[tmpItem.DataTime] = tmpItem
  168. }
  169. }
  170. if findItem, ok := existMap[dateTime]; !ok {
  171. if _, existOk := addMap[dateTime]; !existOk {
  172. isAdd = true
  173. addSql += GetAddSql(edbInfoIdStr, edbCode, dateTime, timeStr, saveVal)
  174. addMap[dateTime] = saveVal
  175. }
  176. } else {
  177. if findItem != nil && utils.SubFloatToString(findItem.Value, 30) != saveVal {
  178. err = ModifyEdbDataById(source, subSource, findItem.EdbDataId, saveVal)
  179. if err != nil {
  180. return err
  181. }
  182. }
  183. }
  184. // 下面代码主要目的是处理掉手动插入的数据判断
  185. {
  186. if realDataMaxDate.IsZero() || t.After(realDataMaxDate) {
  187. realDataMaxDate = t
  188. }
  189. if edbDataInsertConfigDate.IsZero() || t.Equal(edbDataInsertConfigDate) {
  190. isFindConfigDateRealData = true
  191. }
  192. }
  193. }
  194. }
  195. // 处理手工数据补充的配置
  196. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, subSource, existMap, isFindConfigDateRealData)
  197. if isAdd {
  198. addSql = strings.TrimRight(addSql, ",")
  199. //_, err = o.Raw(addSql).Exec()
  200. err = global.DEFAULT_DB.Exec(addSql).Error
  201. if err != nil {
  202. fmt.Println("RefreshEdbDataFromWind add Err", err.Error())
  203. return
  204. }
  205. }
  206. return
  207. }