edb_data_pb.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package data_manage
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "hongze/hongze_chart_lib/utils"
  6. "net/url"
  7. "github.com/rdlucklib/rdluck_tools/http"
  8. "github.com/rdlucklib/rdluck_tools/orm"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  13. type EdbDataPb struct {
  14. EdbDataId int `orm:"column(edb_data_id);pk"`
  15. EdbInfoId int
  16. EdbCode string
  17. DataTime string
  18. Value float64
  19. Status int
  20. CreateTime time.Time
  21. ModifyTime time.Time
  22. Ticker string
  23. Field string
  24. DataTimestamp int64
  25. }
  26. func AddEdbDataPb(items []*EdbDataPb) (err error) {
  27. o := orm.NewOrm()
  28. o.Using("data")
  29. _, err = o.InsertMulti(1, items)
  30. return
  31. }
  32. func AddEdbDataPbBySql(sqlStr string) (err error) {
  33. o := orm.NewOrm()
  34. o.Using("data")
  35. _, err = o.Raw(sqlStr).Exec()
  36. return
  37. }
  38. func ModifyEdbDataPb(edbInfoId int, dataTime string, value float64) (err error) {
  39. o := orm.NewOrm()
  40. o.Using("data")
  41. sql := ` UPDATE edb_data_pb SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  42. _, err = o.Raw(sql, value, edbInfoId, dataTime).Exec()
  43. return
  44. }
  45. func GetEdbDataPbMaxOrMinDate(edbCode string) (min_date, max_date string, err error) {
  46. o := orm.NewOrm()
  47. o.Using("data")
  48. sql := ` SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date FROM edb_data_pb WHERE edb_code=? `
  49. err = o.Raw(sql, edbCode).QueryRow(&min_date, &max_date)
  50. return
  51. }
  52. func GetEdbDataPbByCode(edbCode string, size int) (items []*EdbInfoSearchData, err error) {
  53. o := orm.NewOrm()
  54. o.Using("data")
  55. sql := ` SELECT * FROM edb_data_pb WHERE edb_code=? ORDER BY data_time DESC LIMIT ? `
  56. _, err = o.Raw(sql, edbCode, size).QueryRows(&items)
  57. return
  58. }
  59. func GetEdbDataPbByCodeAndDate(edbCode string, startDate string) (count int, err error) {
  60. o := orm.NewOrm()
  61. o.Using("data")
  62. sql := ` SELECT COUNT(1) AS count FROM edb_data_pb WHERE edb_code=? AND data_time>=? `
  63. err = o.Raw(sql, edbCode, startDate).QueryRow(&count)
  64. return
  65. }
  66. type EdbDataFromPb struct {
  67. Date map[string]int64 `json:"date"`
  68. Ticker map[string]string `json:"ticker"`
  69. Field map[string]string `json:"field"`
  70. Value map[string]float64 `json:"value"`
  71. }
  72. //全部刷新
  73. func RefreshAllEdbDataByPb(edbInfoId, source int, edbCode, startDate, endDate string) (err error) {
  74. o := orm.NewOrm()
  75. o.Using("data")
  76. o.Begin()
  77. defer func() {
  78. if err != nil {
  79. o.Rollback()
  80. } else {
  81. o.Commit()
  82. }
  83. }()
  84. refreshEdbCode := edbCode
  85. edbCode = url.QueryEscape(edbCode)
  86. bpUrl := utils.Hz_Data_PB_Url + `edbInfo/pb?EdbCode=%s&StartDate=%s&EndDate=%s`
  87. bpUrl = fmt.Sprintf(bpUrl, edbCode, startDate, endDate)
  88. utils.FileLog.Info("thsUrl:%s", bpUrl)
  89. body, err := http.Get(bpUrl)
  90. fmt.Println("RefreshAllEdbDataByPb body:")
  91. fmt.Println(string(body))
  92. if err != nil {
  93. return
  94. }
  95. item := new(EdbDataFromPb)
  96. err = json.Unmarshal(body, &item)
  97. if err != nil {
  98. return
  99. }
  100. //获取指标所有数据
  101. dataList := make([]*EdbDataBase, 0)
  102. dataTableName := GetEdbDataTableName(source)
  103. sql := `SELECT * FROM %s WHERE edb_info_id=? `
  104. sql = fmt.Sprintf(sql, dataTableName)
  105. _, err = o.Raw(sql, edbInfoId).QueryRows(&dataList)
  106. if err != nil {
  107. return err
  108. }
  109. dataMap := make(map[string]string)
  110. for _, v := range dataList {
  111. dataMap[v.DataTime] = v.Value
  112. }
  113. edbInfoIdStr := strconv.Itoa(edbInfoId)
  114. if len(item.Date) > 0 {
  115. dateMap := item.Date
  116. var isAdd bool
  117. addSql := ` INSERT INTO edb_data_pb(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,ticker,field,data_timestamp) values `
  118. nowStr := time.Now().Format(utils.FormatDateTime)
  119. for k, v := range dateMap {
  120. timeStr := fmt.Sprintf("%d", v)
  121. v = v / 1000
  122. t := time.Unix(v, 0)
  123. dateTime := t.Format(utils.FormatDate)
  124. val := item.Value[k]
  125. field := item.Field[k]
  126. ticker := item.Ticker[k]
  127. saveValue := utils.SubFloatToString(val, 30)
  128. if field == "PX_LAST" {
  129. if existVal, ok := dataMap[dateTime]; !ok {
  130. addSql += "("
  131. addSql += edbInfoIdStr + "," + "'" + refreshEdbCode + "'" + "," + "'" + dateTime + "'" + "," + saveValue + "," + "'" + nowStr + "'" +
  132. "," + "'" + nowStr + "'" + "," + "1" + "," + "'" + ticker + "'" + "," + "'" + field + "'" + "," + "'" + timeStr + "'"
  133. addSql += "),"
  134. isAdd = true
  135. } else {
  136. if existVal != saveValue {
  137. sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  138. sql = fmt.Sprintf(sql, dataTableName)
  139. _, err = o.Raw(sql, saveValue, edbInfoId, dateTime).Exec()
  140. if err != nil {
  141. return err
  142. }
  143. }
  144. }
  145. }
  146. }
  147. if isAdd {
  148. addSql = strings.TrimRight(addSql, ",")
  149. _, err = o.Raw(addSql).Exec()
  150. if err != nil {
  151. fmt.Println("RefreshAllEdbDataByPb add Err", err.Error())
  152. return
  153. }
  154. }
  155. }
  156. return
  157. }