edb_data_pb.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package data_manage
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "github.com/rdlucklib/rdluck_tools/http"
  7. "hongze/hz_crm_api/utils"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  13. type EdbDataPb struct {
  14. EdbDataId int `orm:"column(edb_data_id);pk"`
  15. EdbInfoId int
  16. EdbCode string
  17. DataTime string
  18. Value float64
  19. Status int
  20. CreateTime time.Time
  21. ModifyTime time.Time
  22. Ticker string
  23. Field string
  24. DataTimestamp int64
  25. }
  26. func AddEdbDataPb(items []*EdbDataPb) (err error) {
  27. o := orm.NewOrmUsingDB("data")
  28. _, err = o.InsertMulti(1, items)
  29. return
  30. }
  31. func AddEdbDataPbBySql(sqlStr string) (err error) {
  32. o := orm.NewOrmUsingDB("data")
  33. _, err = o.Raw(sqlStr).Exec()
  34. return
  35. }
  36. func ModifyEdbDataPb(edbInfoId int, dataTime string, value float64) (err error) {
  37. o := orm.NewOrmUsingDB("data")
  38. sql := ` UPDATE edb_data_pb SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  39. _, err = o.Raw(sql, value, edbInfoId, dataTime).Exec()
  40. return
  41. }
  42. func GetEdbDataPbMaxOrMinDate(edbCode string) (min_date, max_date string, err error) {
  43. o := orm.NewOrmUsingDB("data")
  44. sql := ` SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date FROM edb_data_pb WHERE edb_code=? `
  45. err = o.Raw(sql, edbCode).QueryRow(&min_date, &max_date)
  46. return
  47. }
  48. func GetEdbDataPbByCode(edbCode string, size int) (items []*EdbInfoSearchData, err error) {
  49. o := orm.NewOrmUsingDB("data")
  50. sql := ` SELECT * FROM edb_data_pb WHERE edb_code=? ORDER BY data_time DESC LIMIT ? `
  51. _, err = o.Raw(sql, edbCode, size).QueryRows(&items)
  52. return
  53. }
  54. func GetEdbDataPbByCodeAndDate(edbCode string, startDate string) (count int, err error) {
  55. o := orm.NewOrmUsingDB("data")
  56. sql := ` SELECT COUNT(1) AS count FROM edb_data_pb WHERE edb_code=? AND data_time>=? `
  57. err = o.Raw(sql, edbCode, startDate).QueryRow(&count)
  58. return
  59. }
  60. type EdbDataFromPb struct {
  61. Date map[string]int64 `json:"date"`
  62. Ticker map[string]string `json:"ticker"`
  63. Field map[string]string `json:"field"`
  64. Value map[string]float64 `json:"value"`
  65. }
  66. // 全部刷新
  67. func RefreshAllEdbDataByPb(edbInfoId, source int, edbCode, startDate, endDate string) (err error) {
  68. o := orm.NewOrmUsingDB("data")
  69. to, err := o.Begin()
  70. if err != nil {
  71. return
  72. }
  73. defer func() {
  74. if err != nil {
  75. _ = to.Rollback()
  76. } else {
  77. _ = to.Commit()
  78. }
  79. }()
  80. refreshEdbCode := edbCode
  81. edbCode = url.QueryEscape(edbCode)
  82. bpUrl := utils.Hz_Data_PB_Url + `edbInfo/pb?EdbCode=%s&StartDate=%s&EndDate=%s`
  83. bpUrl = fmt.Sprintf(bpUrl, edbCode, startDate, endDate)
  84. utils.FileLog.Info("thsUrl:%s", bpUrl)
  85. body, err := http.Get(bpUrl)
  86. fmt.Println("RefreshAllEdbDataByPb body:")
  87. fmt.Println(string(body))
  88. if err != nil {
  89. return
  90. }
  91. item := new(EdbDataFromPb)
  92. err = json.Unmarshal(body, &item)
  93. if err != nil {
  94. return
  95. }
  96. //获取指标所有数据
  97. dataList := make([]*EdbDataBase, 0)
  98. dataTableName := GetEdbDataTableName(source)
  99. sql := `SELECT * FROM %s WHERE edb_info_id=? `
  100. sql = fmt.Sprintf(sql, dataTableName)
  101. _, err = to.Raw(sql, edbInfoId).QueryRows(&dataList)
  102. if err != nil {
  103. return err
  104. }
  105. dataMap := make(map[string]string)
  106. for _, v := range dataList {
  107. dataMap[v.DataTime] = v.Value
  108. }
  109. edbInfoIdStr := strconv.Itoa(edbInfoId)
  110. if len(item.Date) > 0 {
  111. dateMap := item.Date
  112. var isAdd bool
  113. addSql := ` INSERT INTO edb_data_pb(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,ticker,field,data_timestamp) values `
  114. nowStr := time.Now().Format(utils.FormatDateTime)
  115. for k, v := range dateMap {
  116. timeStr := fmt.Sprintf("%d", v)
  117. v = v / 1000
  118. t := time.Unix(v, 0)
  119. dateTime := t.Format(utils.FormatDate)
  120. val := item.Value[k]
  121. field := item.Field[k]
  122. ticker := item.Ticker[k]
  123. saveValue := utils.SubFloatToString(val, 30)
  124. if field == "PX_LAST" {
  125. if existVal, ok := dataMap[dateTime]; !ok {
  126. addSql += "("
  127. addSql += edbInfoIdStr + "," + "'" + refreshEdbCode + "'" + "," + "'" + dateTime + "'" + "," + saveValue + "," + "'" + nowStr + "'" +
  128. "," + "'" + nowStr + "'" + "," + "1" + "," + "'" + ticker + "'" + "," + "'" + field + "'" + "," + "'" + timeStr + "'"
  129. addSql += "),"
  130. isAdd = true
  131. } else {
  132. if existVal != saveValue {
  133. sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  134. sql = fmt.Sprintf(sql, dataTableName)
  135. _, err = to.Raw(sql, saveValue, edbInfoId, dateTime).Exec()
  136. if err != nil {
  137. return err
  138. }
  139. }
  140. }
  141. }
  142. }
  143. if isAdd {
  144. addSql = strings.TrimRight(addSql, ",")
  145. _, err = to.Raw(addSql).Exec()
  146. if err != nil {
  147. fmt.Println("RefreshAllEdbDataByPb add Err", err.Error())
  148. return
  149. }
  150. }
  151. }
  152. return
  153. }