base_from_wind_wsd.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package models
  2. import (
  3. "errors"
  4. "eta/eta_index_lib/global"
  5. "eta/eta_index_lib/utils"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "time"
  10. )
  11. var windWsd = "wsd"
  12. func AddEdbDataFromWindWsd(stockCode string, item map[string]map[string]interface{}) (err error) {
  13. //o := orm.NewOrm()
  14. var isAdd bool
  15. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  16. addSql = utils.ReplaceDriverKeywords("", addSql)
  17. indexCodeMap := make(map[string]string)
  18. for wk, wv := range item {
  19. for vk, vv := range wv {
  20. if vv == nil {
  21. continue
  22. }
  23. var indexCode string
  24. wkInt, err := strconv.ParseInt(wk, 10, 64)
  25. if err != nil {
  26. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  27. return err
  28. }
  29. if vk == "OUTMESSAGE" {
  30. utils.FileLog.Info("OUTMESSAGE:" + vv.(string))
  31. err = errors.New("OUTMESSAGE:" + vv.(string))
  32. return err
  33. }
  34. vk = strings.ToLower(vk)
  35. indexCode = windWsd + stockCode + vk
  36. indexCodeMap[indexCode] = indexCode
  37. wkInt = wkInt / 1000
  38. t := time.Unix(wkInt, 0)
  39. if t.After(time.Now()) {
  40. continue
  41. }
  42. dateTime := t.Format(utils.FormatDate)
  43. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  44. if err != nil {
  45. return err
  46. }
  47. timestamp := dataTime.UnixNano() / 1e6
  48. timeStr := fmt.Sprintf("%d", timestamp)
  49. //saveVal := utils.SubFloatToString(val, 20)
  50. var saveVal string
  51. switch vt := vv.(type) {
  52. case int:
  53. saveVal = strconv.Itoa(vt)
  54. case float64:
  55. saveVal = utils.SubFloatToFloatStr(vt, 20)
  56. case string:
  57. saveVal = vt
  58. }
  59. addSql += GetAddSql("0", indexCode, dateTime, timeStr, saveVal)
  60. isAdd = true
  61. }
  62. }
  63. if isAdd {
  64. for _, v := range indexCodeMap {
  65. var count int
  66. sql := ` SELECT COUNT(1) FROM edb_data_wind_wsd WHERE edb_code=? `
  67. //err = o.Raw(sql, v).QueryRow(&count)
  68. err = global.DEFAULT_DB.Raw(sql, v).Scan(&count).Error
  69. if err != nil {
  70. return err
  71. }
  72. if count > 0 {
  73. sql = ` DELETE FROM edb_data_wind_wsd WHERE edb_code=? `
  74. //_, err = o.Raw(sql, v).Exec()
  75. err = global.DEFAULT_DB.Exec(sql, v).Error
  76. if err != nil {
  77. return err
  78. }
  79. }
  80. }
  81. addSql = strings.TrimRight(addSql, ",")
  82. //_, err = o.Raw(addSql).Exec()
  83. err = global.DEFAULT_DB.Exec(addSql).Error
  84. if err != nil {
  85. fmt.Println("sql exec err:" + err.Error())
  86. return
  87. }
  88. }
  89. return
  90. }
  91. // RefreshEdbDataFromWind 刷新wind指标数据
  92. func RefreshEdbDataFromWindWsd(edbInfoId int, edbCode, startDate string, item map[string]map[string]interface{}) (err error) {
  93. //o := orm.NewOrm()
  94. source := utils.DATA_SOURCE_WIND
  95. subSource := utils.DATA_SUB_SOURCE_DATE
  96. // 真实数据的最大日期 , 插入规则配置的日期
  97. var realDataMaxDate, edbDataInsertConfigDate time.Time
  98. var edbDataInsertConfig *EdbDataInsertConfig
  99. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  100. {
  101. edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
  102. if err != nil && !utils.IsErrNoRow(err) {
  103. return
  104. }
  105. if edbDataInsertConfig != nil {
  106. edbDataInsertConfigDate = edbDataInsertConfig.Date
  107. }
  108. }
  109. var condition string
  110. var pars []interface{}
  111. condition += " AND edb_info_id=? "
  112. pars = append(pars, edbInfoId)
  113. var startDateTime time.Time
  114. if startDate != "" {
  115. condition += " AND data_time>=? "
  116. pars = append(pars, startDate)
  117. startDateTime, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  118. }
  119. existList, err := GetEdbDataByCondition(source, subSource, condition, pars)
  120. existMap := make(map[string]*EdbInfoSearchData)
  121. for _, v := range existList {
  122. existMap[v.DataTime] = v
  123. }
  124. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  125. addSql = utils.ReplaceDriverKeywords("", addSql)
  126. var isAdd bool
  127. addMap := make(map[string]string)
  128. edbInfoIdStr := strconv.Itoa(edbInfoId)
  129. for wk, wv := range item {
  130. for vk, vv := range wv {
  131. if vv == nil {
  132. continue
  133. }
  134. wkInt, err := strconv.ParseInt(wk, 10, 64)
  135. if err != nil {
  136. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  137. return err
  138. }
  139. vk = strings.ToLower(vk)
  140. wkInt = wkInt / 1000
  141. t := time.Unix(wkInt, 0)
  142. if t.After(time.Now()) {
  143. continue
  144. }
  145. dateTime := t.Format(utils.FormatDate)
  146. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  147. if err != nil {
  148. return err
  149. }
  150. timestamp := dataTime.UnixNano() / 1e6
  151. timeStr := fmt.Sprintf("%d", timestamp)
  152. var saveVal string
  153. switch vt := vv.(type) {
  154. case int:
  155. saveVal = strconv.Itoa(vt)
  156. case float64:
  157. saveVal = utils.SubFloatToFloatStr(vt, 20)
  158. case string:
  159. saveVal = vt
  160. }
  161. //如果传入的开始时间是空的,且当前数据日期早于传入的开始日期,那么需要判断下当前日期的数据是否存在
  162. if !startDateTime.IsZero() && t.Before(startDateTime) {
  163. tmpItem, tmpErr := GetEdbDataByDate(source, subSource, edbCode, dateTime)
  164. if tmpErr == nil && tmpItem != nil {
  165. existMap[tmpItem.DataTime] = tmpItem
  166. }
  167. }
  168. if findItem, ok := existMap[dateTime]; !ok {
  169. if _, existOk := addMap[dateTime]; !existOk {
  170. isAdd = true
  171. addSql += GetAddSql(edbInfoIdStr, edbCode, dateTime, timeStr, saveVal)
  172. addMap[dateTime] = saveVal
  173. }
  174. } else {
  175. if findItem != nil && utils.SubFloatToString(findItem.Value, 30) != saveVal {
  176. err = ModifyEdbDataById(source, subSource, findItem.EdbDataId, saveVal)
  177. if err != nil {
  178. return err
  179. }
  180. }
  181. }
  182. // 下面代码主要目的是处理掉手动插入的数据判断
  183. {
  184. if realDataMaxDate.IsZero() || t.After(realDataMaxDate) {
  185. realDataMaxDate = t
  186. }
  187. if edbDataInsertConfigDate.IsZero() || t.Equal(edbDataInsertConfigDate) {
  188. isFindConfigDateRealData = true
  189. }
  190. }
  191. }
  192. }
  193. // 处理手工数据补充的配置
  194. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, subSource, existMap, isFindConfigDateRealData)
  195. if isAdd {
  196. addSql = strings.TrimRight(addSql, ",")
  197. //_, err = o.Raw(addSql).Exec()
  198. err = global.DEFAULT_DB.Exec(addSql).Error
  199. if err != nil {
  200. fmt.Println("RefreshEdbDataFromWind add Err", err.Error())
  201. return
  202. }
  203. }
  204. return
  205. }