edb_data_wind.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package data_manage
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "hongze/hongze_chart_lib/utils"
  6. "github.com/rdlucklib/rdluck_tools/http"
  7. "github.com/rdlucklib/rdluck_tools/orm"
  8. "strconv"
  9. "strings"
  10. "time"
  11. )
  12. type EdbDataWind struct {
  13. EdbDataId int `orm:"column(edb_data_id);pk"`
  14. EdbInfoId int
  15. EdbCode string
  16. DataTime string
  17. Value float64
  18. Status int
  19. CreateTime time.Time
  20. ModifyTime time.Time
  21. DataTimestamp int64
  22. }
  23. func AddEdbDataWind(items []*EdbDataWind) (err error) {
  24. o := orm.NewOrm()
  25. o.Using("data")
  26. _, err = o.InsertMulti(1, items)
  27. return
  28. }
  29. func AddEdbDataWindBySql(sqlStr string) (err error) {
  30. o := orm.NewOrm()
  31. o.Using("data")
  32. _, err = o.Raw(sqlStr).Exec()
  33. return
  34. }
  35. func DeleteEdbDataWind(edbCode string) (err error) {
  36. o := orm.NewOrm()
  37. o.Using("data")
  38. sql := `DELETE FROM edb_data_wind WHERE edb_code=? `
  39. _, err = o.Raw(sql, edbCode).Exec()
  40. return
  41. }
  42. func ModifyEdbDataWind(edbInfoId int64, dataTime string, value float64) (err error) {
  43. o := orm.NewOrm()
  44. o.Using("data")
  45. sql := ` UPDATE edb_data_wind SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  46. _, err = o.Raw(sql, value, edbInfoId, dataTime).Exec()
  47. return
  48. }
  49. func GetEdbDataWindByCode(edbCode string, size int) (items []*EdbInfoSearchData, err error) {
  50. o := orm.NewOrm()
  51. o.Using("data")
  52. sql := ` SELECT * FROM edb_data_wind WHERE edb_code=? ORDER BY data_time DESC LIMIT ? `
  53. _, err = o.Raw(sql, edbCode, size).QueryRows(&items)
  54. return
  55. }
  56. func GetEdbDataWindMaxOrMinDate(edbCode string) (min_date, max_date string, err error) {
  57. o := orm.NewOrm()
  58. o.Using("data")
  59. sql := ` SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date FROM edb_data_wind WHERE edb_code=? `
  60. err = o.Raw(sql, edbCode).QueryRow(&min_date, &max_date)
  61. return
  62. }
  63. func GetEdbDataWindByCodeAndDate(edbCode string, startDate string) (count int, err error) {
  64. o := orm.NewOrm()
  65. o.Using("data")
  66. sql := ` SELECT COUNT(1) AS count FROM edb_data_wind WHERE edb_code=? AND data_time=? `
  67. err = o.Raw(sql, edbCode, startDate).QueryRow(&count)
  68. return
  69. }
  70. type EdbDataFromWind struct {
  71. Close map[string]float64 `json:"CLOSE"`
  72. Dt map[string]int64 `json:"DT"`
  73. ErrMsg string
  74. }
  75. func RefreshAllEdbDataByWind(edbInfoId, source int, edbCode, startDate, endDate string) (err error) {
  76. o := orm.NewOrm()
  77. o.Using("data")
  78. o.Begin()
  79. defer func() {
  80. if err != nil {
  81. o.Rollback()
  82. } else {
  83. o.Commit()
  84. }
  85. }()
  86. fmt.Println("wind start:", time.Now())
  87. thsUrl := utils.Hz_Data_Url + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s`
  88. thsUrl = fmt.Sprintf(thsUrl, edbCode, startDate, endDate)
  89. utils.FileLog.Info("thsUrl:%s", thsUrl)
  90. body, err := http.Get(thsUrl)
  91. utils.FileLog.Info("wind %s", string(body))
  92. fmt.Println("wind end:", time.Now())
  93. if err != nil {
  94. return
  95. }
  96. item := new(EdbDataFromWind)
  97. err = json.Unmarshal(body, &item)
  98. if err != nil {
  99. return
  100. }
  101. //获取指标所有数据
  102. dataList := make([]*EdbDataBase, 0)
  103. dataTableName := GetEdbDataTableName(source)
  104. sql := `SELECT * FROM %s WHERE edb_info_id=? `
  105. sql = fmt.Sprintf(sql, dataTableName)
  106. _, err = o.Raw(sql, edbInfoId).QueryRows(&dataList)
  107. if err != nil {
  108. return err
  109. }
  110. dataMap := make(map[string]string)
  111. for _, v := range dataList {
  112. dataMap[v.DataTime] = v.Value
  113. }
  114. addSql := ` INSERT INTO edb_data_wind(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,data_timestamp) values `
  115. var isAdd bool
  116. edbInfoIdStr := strconv.Itoa(edbInfoId)
  117. existDataMap := make(map[string]string)
  118. fmt.Println("start for:", time.Now())
  119. for k, v := range item.Dt {
  120. timeStr := fmt.Sprintf("%d", v)
  121. v = v / 1000
  122. t := time.Unix(v, 0)
  123. dateTime := t.Format(utils.FormatDate)
  124. val := item.Close[k]
  125. saveVal := utils.SubFloatToString(val, 30)
  126. if existVal, ok := dataMap[dateTime]; !ok {
  127. if _, existOk := existDataMap[dateTime]; !existOk {
  128. isAdd = true
  129. addSql += GetAddSql(edbInfoIdStr, edbCode, dateTime, timeStr, saveVal)
  130. }
  131. existDataMap[dateTime] = dateTime
  132. } else {
  133. if existVal != saveVal {
  134. sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  135. sql = fmt.Sprintf(sql, dataTableName)
  136. _, err = o.Raw(sql, saveVal, edbInfoId, dateTime).Exec()
  137. if err != nil {
  138. return err
  139. }
  140. }
  141. }
  142. }
  143. fmt.Println("end for:", time.Now())
  144. if isAdd {
  145. addSql = strings.TrimRight(addSql, ",")
  146. _, err = o.Raw(addSql).Exec()
  147. if err != nil {
  148. fmt.Println("RefreshAllEdbDataByWind add Err", err.Error())
  149. return
  150. }
  151. }
  152. return
  153. }