base_from_wind_wsd.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package models
  2. import (
  3. "errors"
  4. "eta/eta_index_lib/global"
  5. "eta/eta_index_lib/utils"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "time"
  10. )
  11. var windWsd = "wsd"
  12. func AddEdbDataFromWindWsd(stockCode string, item map[string]map[string]interface{}) (err error) {
  13. //o := orm.NewOrm()
  14. var isAdd bool
  15. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  16. indexCodeMap := make(map[string]string)
  17. for wk, wv := range item {
  18. for vk, vv := range wv {
  19. if vv == nil {
  20. continue
  21. }
  22. var indexCode string
  23. wkInt, err := strconv.ParseInt(wk, 10, 64)
  24. if err != nil {
  25. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  26. return err
  27. }
  28. if vk == "OUTMESSAGE" {
  29. utils.FileLog.Info("OUTMESSAGE:" + vv.(string))
  30. err = errors.New("OUTMESSAGE:" + vv.(string))
  31. return err
  32. }
  33. vk = strings.ToLower(vk)
  34. indexCode = windWsd + stockCode + vk
  35. indexCodeMap[indexCode] = indexCode
  36. wkInt = wkInt / 1000
  37. t := time.Unix(wkInt, 0)
  38. if t.After(time.Now()) {
  39. continue
  40. }
  41. dateTime := t.Format(utils.FormatDate)
  42. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  43. if err != nil {
  44. return err
  45. }
  46. timestamp := dataTime.UnixNano() / 1e6
  47. timeStr := fmt.Sprintf("%d", timestamp)
  48. //saveVal := utils.SubFloatToString(val, 20)
  49. var saveVal string
  50. switch vt := vv.(type) {
  51. case int:
  52. saveVal = strconv.Itoa(vt)
  53. case float64:
  54. saveVal = utils.SubFloatToFloatStr(vt, 20)
  55. case string:
  56. saveVal = vt
  57. }
  58. addSql += GetAddSql("0", indexCode, dateTime, timeStr, saveVal)
  59. isAdd = true
  60. }
  61. }
  62. if isAdd {
  63. for _, v := range indexCodeMap {
  64. var count int
  65. sql := ` SELECT COUNT(1) FROM edb_data_wind_wsd WHERE edb_code=? `
  66. //err = o.Raw(sql, v).QueryRow(&count)
  67. err = global.DEFAULT_DB.Raw(sql, v).Scan(&count).Error
  68. if err != nil {
  69. return err
  70. }
  71. if count > 0 {
  72. sql = ` DELETE FROM edb_data_wind_wsd WHERE edb_code=? `
  73. //_, err = o.Raw(sql, v).Exec()
  74. err = global.DEFAULT_DB.Exec(sql, v).Error
  75. if err != nil {
  76. return err
  77. }
  78. }
  79. }
  80. addSql = strings.TrimRight(addSql, ",")
  81. //_, err = o.Raw(addSql).Exec()
  82. err = global.DEFAULT_DB.Exec(addSql).Error
  83. if err != nil {
  84. fmt.Println("sql exec err:" + err.Error())
  85. return
  86. }
  87. }
  88. return
  89. }
  90. // RefreshEdbDataFromWind 刷新wind指标数据
  91. func RefreshEdbDataFromWindWsd(edbInfoId int, edbCode, startDate string, item map[string]map[string]interface{}) (err error) {
  92. //o := orm.NewOrm()
  93. source := utils.DATA_SOURCE_WIND
  94. subSource := utils.DATA_SUB_SOURCE_DATE
  95. // 真实数据的最大日期 , 插入规则配置的日期
  96. var realDataMaxDate, edbDataInsertConfigDate time.Time
  97. var edbDataInsertConfig *EdbDataInsertConfig
  98. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  99. {
  100. edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
  101. if err != nil && !utils.IsErrNoRow(err) {
  102. return
  103. }
  104. if edbDataInsertConfig != nil {
  105. edbDataInsertConfigDate = edbDataInsertConfig.Date
  106. }
  107. }
  108. var condition string
  109. var pars []interface{}
  110. condition += " AND edb_info_id=? "
  111. pars = append(pars, edbInfoId)
  112. var startDateTime time.Time
  113. if startDate != "" {
  114. condition += " AND data_time>=? "
  115. pars = append(pars, startDate)
  116. startDateTime, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  117. }
  118. existList, err := GetEdbDataByCondition(source, subSource, condition, pars)
  119. existMap := make(map[string]*EdbInfoSearchData)
  120. for _, v := range existList {
  121. existMap[v.DataTime] = v
  122. }
  123. addSql := ` INSERT INTO edb_data_wind_wsd(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  124. var isAdd bool
  125. addMap := make(map[string]string)
  126. edbInfoIdStr := strconv.Itoa(edbInfoId)
  127. for wk, wv := range item {
  128. for vk, vv := range wv {
  129. if vv == nil {
  130. continue
  131. }
  132. wkInt, err := strconv.ParseInt(wk, 10, 64)
  133. if err != nil {
  134. fmt.Println("ParseInt Err:" + err.Error() + ";wk:" + wk)
  135. return err
  136. }
  137. vk = strings.ToLower(vk)
  138. wkInt = wkInt / 1000
  139. t := time.Unix(wkInt, 0)
  140. if t.After(time.Now()) {
  141. continue
  142. }
  143. dateTime := t.Format(utils.FormatDate)
  144. dataTime, err := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
  145. if err != nil {
  146. return err
  147. }
  148. timestamp := dataTime.UnixNano() / 1e6
  149. timeStr := fmt.Sprintf("%d", timestamp)
  150. var saveVal string
  151. switch vt := vv.(type) {
  152. case int:
  153. saveVal = strconv.Itoa(vt)
  154. case float64:
  155. saveVal = utils.SubFloatToFloatStr(vt, 20)
  156. case string:
  157. saveVal = vt
  158. }
  159. //如果传入的开始时间是空的,且当前数据日期早于传入的开始日期,那么需要判断下当前日期的数据是否存在
  160. if !startDateTime.IsZero() && t.Before(startDateTime) {
  161. tmpItem, tmpErr := GetEdbDataByDate(source, subSource, edbCode, dateTime)
  162. if tmpErr == nil && tmpItem != nil {
  163. existMap[tmpItem.DataTime] = tmpItem
  164. }
  165. }
  166. if findItem, ok := existMap[dateTime]; !ok {
  167. if _, existOk := addMap[dateTime]; !existOk {
  168. isAdd = true
  169. addSql += GetAddSql(edbInfoIdStr, edbCode, dateTime, timeStr, saveVal)
  170. addMap[dateTime] = saveVal
  171. }
  172. } else {
  173. if findItem != nil && utils.SubFloatToString(findItem.Value, 30) != saveVal {
  174. err = ModifyEdbDataById(source, subSource, findItem.EdbDataId, saveVal)
  175. if err != nil {
  176. return err
  177. }
  178. }
  179. }
  180. // 下面代码主要目的是处理掉手动插入的数据判断
  181. {
  182. if realDataMaxDate.IsZero() || t.After(realDataMaxDate) {
  183. realDataMaxDate = t
  184. }
  185. if edbDataInsertConfigDate.IsZero() || t.Equal(edbDataInsertConfigDate) {
  186. isFindConfigDateRealData = true
  187. }
  188. }
  189. }
  190. }
  191. // 处理手工数据补充的配置
  192. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, subSource, existMap, isFindConfigDateRealData)
  193. if isAdd {
  194. addSql = strings.TrimRight(addSql, ",")
  195. //_, err = o.Raw(addSql).Exec()
  196. err = global.DEFAULT_DB.Exec(addSql).Error
  197. if err != nil {
  198. fmt.Println("RefreshEdbDataFromWind add Err", err.Error())
  199. return
  200. }
  201. }
  202. return
  203. }