edb_data_fubao.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package data_manage
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/beego/beego/v2/client/orm"
  7. "github.com/rdlucklib/rdluck_tools/http"
  8. "hongze/hz_crm_api/utils"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  // EdbDataFubao is the row model for Fubao indicator data.
  type EdbDataFubao struct {
  	// EdbDataId is the primary key, mapped to the edb_data_id column.
  	EdbDataId int `orm:"column(edb_data_id);pk"`
  	// EdbInfoId and EdbCode identify the indicator the data point belongs to.
  	EdbInfoId int
  	EdbCode   string
  	// DataTime is the date of the data point, kept as a string.
  	DataTime string
  	// Value is the numeric value of the data point.
  	Value float64
  	// Status is a row status flag. NOTE(review): meaning of the values is not
  	// visible in this file; confirm against the schema.
  	Status int
  	// CreateTime and ModifyTime are the row's audit timestamps.
  	CreateTime time.Time
  	ModifyTime time.Time
  	// DataTimestamp is a numeric timestamp for the data point.
  	// NOTE(review): presumably DataTime as Unix milliseconds; confirm.
  	DataTimestamp int64
  }
  25. func AddEdbDataFubaoBySql(sqlStr string) (err error) {
  26. o := orm.NewOrmUsingDB("data")
  27. _, err = o.Raw(sqlStr).Exec()
  28. return
  29. }
  30. func ModifyEdbDataFubao(edbInfoId int64, dataTime string, value float64) (err error) {
  31. o := orm.NewOrmUsingDB("data")
  32. sql := ` UPDATE edb_data_ths SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  33. _, err = o.Raw(sql, value, edbInfoId, dataTime).Exec()
  34. return
  35. }
  36. func GetEdbDataFubaoMaxOrMinDate(edbCode string) (min_date, max_date string, err error) {
  37. o := orm.NewOrmUsingDB("data")
  38. sql := ` SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date FROM edb_data_ths WHERE edb_code=? `
  39. err = o.Raw(sql, edbCode).QueryRow(&min_date, &max_date)
  40. return
  41. }
  42. func GetEdbDataFubaoByCodeAndDate(edbCode string, startDate string) (count int, err error) {
  43. o := orm.NewOrmUsingDB("data")
  44. sql := ` SELECT COUNT(1) AS count FROM edb_data_ths WHERE edb_code=? AND data_time=? `
  45. err = o.Raw(sql, edbCode, startDate).QueryRow(&count)
  46. return
  47. }
  48. func GetEdbDataFubaoByCode(edbCode string, size int) (items []*EdbInfoSearchData, err error) {
  49. o := orm.NewOrmUsingDB("data")
  50. sql := ` SELECT * FROM edb_data_ths WHERE edb_code=? ORDER BY data_time DESC LIMIT ? `
  51. _, err = o.Raw(sql, edbCode, size).QueryRows(&items)
  52. return
  53. }
  // EdbDataFromFubao describes a JSON response envelope with a data volume,
  // an error message and code, a free-form "perf" value, and a list of tables.
  type EdbDataFromFubao struct {
  	// DataVol is decoded from the "dataVol" key.
  	DataVol int64 `json:"dataVol"`
  	// Errmsg and Errorcode carry the error message and numeric error code.
  	Errmsg    string `json:"errmsg"`
  	Errorcode int64  `json:"errorcode"`
  	// Perf is decoded as-is from the "perf" key; its shape is not fixed here.
  	Perf interface{} `json:"perf"`
  	// Tables holds the returned series as three separate slices.
  	// NOTE(review): presumably Time[i] and Value[i] belong together; confirm.
  	Tables []struct {
  		ID    []string  `json:"id"`
  		Time  []string  `json:"time"`
  		Value []float64 `json:"value"`
  	} `json:"tables"`
  }
  65. // 刷新所有数据
  66. func RefreshAllEdbDataByFubao(edbInfoId, source int, edbCode, startDate, endDate string) (err error) {
  67. o := orm.NewOrmUsingDB("data")
  68. to, err := o.Begin()
  69. if err != nil {
  70. return
  71. }
  72. defer func() {
  73. if err != nil {
  74. _ = to.Rollback()
  75. } else {
  76. _ = to.Commit()
  77. }
  78. }()
  79. thsUrl := utils.Hz_Data_Url + `edbInfo/ths?EdbCode=%s&StartDate=%s&EndDate=%s`
  80. thsUrl = fmt.Sprintf(thsUrl, edbCode, startDate, endDate)
  81. utils.FileLog.Info("thsUrl:%s", thsUrl)
  82. body, err := http.Get(thsUrl)
  83. fmt.Println("GetEdbDataByThs body:")
  84. fmt.Println(string(body))
  85. if err != nil {
  86. return
  87. }
  88. item := new(EdbDataFromThs)
  89. err = json.Unmarshal(body, &item)
  90. if err != nil {
  91. return
  92. }
  93. if item.Errorcode != 0 {
  94. err = errors.New(string(body))
  95. return
  96. }
  97. //获取指标所有数据
  98. dataList := make([]*EdbDataBase, 0)
  99. dataTableName := GetEdbDataTableName(source)
  100. sql := `SELECT * FROM %s WHERE edb_info_id=? `
  101. sql = fmt.Sprintf(sql, dataTableName)
  102. _, err = to.Raw(sql, edbInfoId).QueryRows(&dataList)
  103. if err != nil {
  104. return err
  105. }
  106. dataMap := make(map[string]string)
  107. for _, v := range dataList {
  108. dataMap[v.DataTime] = v.Value
  109. }
  110. edbInfoIdStr := strconv.Itoa(edbInfoId)
  111. if len(item.Tables) > 0 {
  112. table := item.Tables[0]
  113. dataLen := len(table.Time)
  114. addSql := ` INSERT INTO edb_data_ths(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,data_timestamp) values `
  115. var isAdd bool
  116. for i := 0; i < dataLen; i++ {
  117. eDate := table.Time[i]
  118. sValue := table.Value[i]
  119. saveValue := utils.SubFloatToString(sValue, 30)
  120. if existVal, ok := dataMap[eDate]; !ok {
  121. dataTime, err := time.Parse(utils.FormatDate, eDate)
  122. if err != nil {
  123. }
  124. timestamp := dataTime.UnixNano() / 1e6
  125. timeStr := fmt.Sprintf("%d", timestamp)
  126. addSql += GetAddSql(edbInfoIdStr, edbCode, eDate, timeStr, saveValue)
  127. isAdd = true
  128. } else {
  129. if existVal != saveValue {
  130. sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  131. sql = fmt.Sprintf(sql, dataTableName)
  132. _, err = to.Raw(sql, sValue, edbInfoId, eDate).Exec()
  133. if err != nil {
  134. return err
  135. }
  136. }
  137. }
  138. }
  139. if isAdd {
  140. addSql = strings.TrimRight(addSql, ",")
  141. _, err = to.Raw(addSql).Exec()
  142. if err != nil {
  143. fmt.Println("RefreshAllEdbDataByThs add Err", err.Error())
  144. return
  145. }
  146. }
  147. }
  148. return
  149. }