edb_data_pb.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package data_manage
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "github.com/rdlucklib/rdluck_tools/http"
  7. "hongze/hongze_chart_lib/utils"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  // EdbDataPb is the row model that AddEdbDataPb hands to the ORM for insertion.
  // Its fields line up with the columns named in the edb_data_pb INSERT built by
  // RefreshAllEdbDataByPb.
  type EdbDataPb struct {
  	// EdbDataId is the primary key (column edb_data_id).
  	EdbDataId int `orm:"column(edb_data_id);pk"`
  	// EdbInfoId is the id of the indicator this data point belongs to.
  	EdbInfoId int
  	// EdbCode is the indicator code the data point was fetched for.
  	EdbCode string
  	// DataTime is the observation date as a string; RefreshAllEdbDataByPb
  	// writes it using utils.FormatDate.
  	DataTime string
  	// Value is the observed value.
  	Value float64
  	// Status is a row status flag; RefreshAllEdbDataByPb writes 1.
  	// NOTE(review): the meaning of other values is not visible in this file.
  	Status int
  	// CreateTime is when the row was created.
  	CreateTime time.Time
  	// ModifyTime is when the row was last modified.
  	ModifyTime time.Time
  	// Ticker is the upstream ticker string delivered with the data point.
  	Ticker string
  	// Field is the upstream field name delivered with the data point
  	// (RefreshAllEdbDataByPb only stores "PX_LAST").
  	Field string
  	// DataTimestamp is the raw upstream timestamp; RefreshAllEdbDataByPb
  	// divides it by 1000 before calling time.Unix, so presumably milliseconds.
  	DataTimestamp int64
  }
  26. func AddEdbDataPb(items []*EdbDataPb) (err error) {
  27. o := orm.NewOrmUsingDB("data")
  28. _, err = o.InsertMulti(1, items)
  29. return
  30. }
  31. func AddEdbDataPbBySql(sqlStr string) (err error) {
  32. o := orm.NewOrmUsingDB("data")
  33. _, err = o.Raw(sqlStr).Exec()
  34. return
  35. }
  36. func ModifyEdbDataPb(edbInfoId int, dataTime string, value float64) (err error) {
  37. o := orm.NewOrmUsingDB("data")
  38. sql := ` UPDATE edb_data_pb SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  39. _, err = o.Raw(sql, value, edbInfoId, dataTime).Exec()
  40. return
  41. }
  42. func GetEdbDataPbMaxOrMinDate(edbCode string) (min_date, max_date string, err error) {
  43. o := orm.NewOrmUsingDB("data")
  44. sql := ` SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date FROM edb_data_pb WHERE edb_code=? `
  45. err = o.Raw(sql, edbCode).QueryRow(&min_date, &max_date)
  46. return
  47. }
  48. func GetEdbDataPbByCode(edbCode string, size int) (items []*EdbInfoSearchData, err error) {
  49. o := orm.NewOrmUsingDB("data")
  50. sql := ` SELECT * FROM edb_data_pb WHERE edb_code=? ORDER BY data_time DESC LIMIT ? `
  51. _, err = o.Raw(sql, edbCode, size).QueryRows(&items)
  52. return
  53. }
  54. func GetEdbDataPbByCodeAndDate(edbCode string, startDate string) (count int, err error) {
  55. o := orm.NewOrmUsingDB("data")
  56. sql := ` SELECT COUNT(1) AS count FROM edb_data_pb WHERE edb_code=? AND data_time>=? `
  57. err = o.Raw(sql, edbCode, startDate).QueryRow(&count)
  58. return
  59. }
  // EdbDataFromPb is the JSON document RefreshAllEdbDataByPb decodes from the
  // upstream service. It is column-oriented: each attribute is its own map, and
  // RefreshAllEdbDataByPb uses one key to look up the same observation in all
  // four maps.
  type EdbDataFromPb struct {
  	// Date maps an observation key to its timestamp. RefreshAllEdbDataByPb
  	// divides it by 1000 before time.Unix, so presumably milliseconds.
  	Date map[string]int64 `json:"date"`
  	// Ticker maps an observation key to its ticker string.
  	Ticker map[string]string `json:"ticker"`
  	// Field maps an observation key to its field name (e.g. "PX_LAST").
  	Field map[string]string `json:"field"`
  	// Value maps an observation key to its numeric value.
  	Value map[string]float64 `json:"value"`
  }
  66. //全部刷新
  67. func RefreshAllEdbDataByPb(edbInfoId, source int, edbCode, startDate, endDate string) (err error) {
  68. o := orm.NewOrmUsingDB("data")
  69. o.Begin()
  70. defer func() {
  71. if err != nil {
  72. o.Rollback()
  73. } else {
  74. o.Commit()
  75. }
  76. }()
  77. refreshEdbCode := edbCode
  78. edbCode = url.QueryEscape(edbCode)
  79. bpUrl := utils.Hz_Data_PB_Url + `edbInfo/pb?EdbCode=%s&StartDate=%s&EndDate=%s`
  80. bpUrl = fmt.Sprintf(bpUrl, edbCode, startDate, endDate)
  81. utils.FileLog.Info("thsUrl:%s", bpUrl)
  82. body, err := http.Get(bpUrl)
  83. fmt.Println("RefreshAllEdbDataByPb body:")
  84. fmt.Println(string(body))
  85. if err != nil {
  86. return
  87. }
  88. item := new(EdbDataFromPb)
  89. err = json.Unmarshal(body, &item)
  90. if err != nil {
  91. return
  92. }
  93. //获取指标所有数据
  94. dataList := make([]*EdbDataBase, 0)
  95. dataTableName := GetEdbDataTableName(source)
  96. sql := `SELECT * FROM %s WHERE edb_info_id=? `
  97. sql = fmt.Sprintf(sql, dataTableName)
  98. _, err = o.Raw(sql, edbInfoId).QueryRows(&dataList)
  99. if err != nil {
  100. return err
  101. }
  102. dataMap := make(map[string]string)
  103. for _, v := range dataList {
  104. dataMap[v.DataTime] = v.Value
  105. }
  106. edbInfoIdStr := strconv.Itoa(edbInfoId)
  107. if len(item.Date) > 0 {
  108. dateMap := item.Date
  109. var isAdd bool
  110. addSql := ` INSERT INTO edb_data_pb(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,ticker,field,data_timestamp) values `
  111. nowStr := time.Now().Format(utils.FormatDateTime)
  112. for k, v := range dateMap {
  113. timeStr := fmt.Sprintf("%d", v)
  114. v = v / 1000
  115. t := time.Unix(v, 0)
  116. dateTime := t.Format(utils.FormatDate)
  117. val := item.Value[k]
  118. field := item.Field[k]
  119. ticker := item.Ticker[k]
  120. saveValue := utils.SubFloatToString(val, 30)
  121. if field == "PX_LAST" {
  122. if existVal, ok := dataMap[dateTime]; !ok {
  123. addSql += "("
  124. addSql += edbInfoIdStr + "," + "'" + refreshEdbCode + "'" + "," + "'" + dateTime + "'" + "," + saveValue + "," + "'" + nowStr + "'" +
  125. "," + "'" + nowStr + "'" + "," + "1" + "," + "'" + ticker + "'" + "," + "'" + field + "'" + "," + "'" + timeStr + "'"
  126. addSql += "),"
  127. isAdd = true
  128. } else {
  129. if existVal != saveValue {
  130. sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  131. sql = fmt.Sprintf(sql, dataTableName)
  132. _, err = o.Raw(sql, saveValue, edbInfoId, dateTime).Exec()
  133. if err != nil {
  134. return err
  135. }
  136. }
  137. }
  138. }
  139. }
  140. if isAdd {
  141. addSql = strings.TrimRight(addSql, ",")
  142. _, err = o.Raw(addSql).Exec()
  143. if err != nil {
  144. fmt.Println("RefreshAllEdbDataByPb add Err", err.Error())
  145. return
  146. }
  147. }
  148. }
  149. return
  150. }