package data_manage import ( "encoding/json" "errors" "fmt" "github.com/beego/beego/v2/client/orm" "github.com/rdlucklib/rdluck_tools/http" "hongze/hz_crm_api/utils" "strconv" "strings" "time" ) type EdbDataThs struct { EdbDataId int `orm:"column(edb_data_id);pk"` EdbInfoId int EdbCode string DataTime string Value float64 Status int CreateTime time.Time ModifyTime time.Time DataTimestamp int64 } func AddEdbDataThsBySql(sqlStr string) (err error) { o := orm.NewOrmUsingDB("data") _, err = o.Raw(sqlStr).Exec() return } func ModifyEdbDataThs(edbInfoId int64, dataTime string, value float64) (err error) { o := orm.NewOrmUsingDB("data") sql := ` UPDATE edb_data_ths SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? ` _, err = o.Raw(sql, value, edbInfoId, dataTime).Exec() return } func GetEdbDataThsMaxOrMinDate(edbCode string) (min_date, max_date string, err error) { o := orm.NewOrmUsingDB("data") sql := ` SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date FROM edb_data_ths WHERE edb_code=? ` err = o.Raw(sql, edbCode).QueryRow(&min_date, &max_date) return } func GetEdbDataThsByCodeAndDate(edbCode string, startDate string) (count int, err error) { o := orm.NewOrmUsingDB("data") sql := ` SELECT COUNT(1) AS count FROM edb_data_ths WHERE edb_code=? AND data_time=? ` err = o.Raw(sql, edbCode, startDate).QueryRow(&count) return } func GetEdbDataThsByCode(edbCode string, size int) (items []*EdbInfoSearchData, err error) { o := orm.NewOrmUsingDB("data") sql := ` SELECT * FROM edb_data_ths WHERE edb_code=? ORDER BY data_time DESC LIMIT ? ` _, err = o.Raw(sql, edbCode, size).QueryRows(&items) return } type EdbDataFromThs struct { DataVol int64 `json:"dataVol"` Errmsg string `json:"errmsg"` Errorcode int64 `json:"errorcode"` Perf interface{} `json:"perf"` Tables []struct { ID []string `json:"id"` Time []string `json:"time"` Value []float64 `json:"value"` } `json:"tables"` } // 刷新所有数据 func RefreshAllEdbDataByThs(edbInfoId, source int, edbCode, startDate, endDate string) (err error) { o := orm.NewOrmUsingDB("data") to, err := o.Begin() if err != nil { return } defer func() { if err != nil { _ = to.Rollback() } else { _ = to.Commit() } }() thsUrl := utils.Hz_Data_Url + `edbInfo/ths?EdbCode=%s&StartDate=%s&EndDate=%s` thsUrl = fmt.Sprintf(thsUrl, edbCode, startDate, endDate) utils.FileLog.Info("thsUrl:%s", thsUrl) body, err := http.Get(thsUrl) fmt.Println("GetEdbDataByThs body:") fmt.Println(string(body)) if err != nil { return } item := new(EdbDataFromThs) err = json.Unmarshal(body, &item) if err != nil { return } if item.Errorcode != 0 { err = errors.New(string(body)) return } //获取指标所有数据 dataList := make([]*EdbDataBase, 0) dataTableName := GetEdbDataTableName(source) sql := `SELECT * FROM %s WHERE edb_info_id=? ` sql = fmt.Sprintf(sql, dataTableName) _, err = to.Raw(sql, edbInfoId).QueryRows(&dataList) if err != nil { return err } dataMap := make(map[string]string) for _, v := range dataList { dataMap[v.DataTime] = v.Value } edbInfoIdStr := strconv.Itoa(edbInfoId) if len(item.Tables) > 0 { table := item.Tables[0] dataLen := len(table.Time) addSql := ` INSERT INTO edb_data_ths(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,data_timestamp) values ` var isAdd bool for i := 0; i < dataLen; i++ { eDate := table.Time[i] sValue := table.Value[i] saveValue := utils.SubFloatToString(sValue, 30) if existVal, ok := dataMap[eDate]; !ok { dataTime, err := time.Parse(utils.FormatDate, eDate) if err != nil { } timestamp := dataTime.UnixNano() / 1e6 timeStr := fmt.Sprintf("%d", timestamp) addSql += GetAddSql(edbInfoIdStr, edbCode, eDate, timeStr, saveValue) isAdd = true } else { if existVal != saveValue { sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? ` sql = fmt.Sprintf(sql, dataTableName) _, err = to.Raw(sql, sValue, edbInfoId, eDate).Exec() if err != nil { return err } } } } if isAdd { addSql = strings.TrimRight(addSql, ",") _, err = to.Raw(addSql).Exec() if err != nil { fmt.Println("RefreshAllEdbDataByThs add Err", err.Error()) return } } } return }