package data_manage import ( "encoding/json" "fmt" "github.com/beego/beego/v2/client/orm" "github.com/rdlucklib/rdluck_tools/http" "hongze/hz_crm_api/utils" "strconv" "strings" "time" ) type EdbDataWind struct { EdbDataId int `orm:"column(edb_data_id);pk"` EdbInfoId int EdbCode string DataTime string Value float64 Status int CreateTime time.Time ModifyTime time.Time DataTimestamp int64 } func AddEdbDataWind(items []*EdbDataWind) (err error) { o := orm.NewOrmUsingDB("data") _, err = o.InsertMulti(1, items) return } func AddEdbDataWindBySql(sqlStr string) (err error) { o := orm.NewOrmUsingDB("data") _, err = o.Raw(sqlStr).Exec() return } func DeleteEdbDataWind(edbCode string) (err error) { o := orm.NewOrmUsingDB("data") sql := `DELETE FROM edb_data_wind WHERE edb_code=? ` _, err = o.Raw(sql, edbCode).Exec() return } func ModifyEdbDataWind(edbInfoId int64, dataTime string, value float64) (err error) { o := orm.NewOrmUsingDB("data") sql := ` UPDATE edb_data_wind SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? ` _, err = o.Raw(sql, value, edbInfoId, dataTime).Exec() return } func GetEdbDataWindByCode(edbCode string, size int) (items []*EdbInfoSearchData, err error) { o := orm.NewOrmUsingDB("data") sql := ` SELECT * FROM edb_data_wind WHERE edb_code=? ORDER BY data_time DESC LIMIT ? ` _, err = o.Raw(sql, edbCode, size).QueryRows(&items) return } func GetEdbDataWindMaxOrMinDate(edbCode string) (min_date, max_date string, err error) { o := orm.NewOrmUsingDB("data") sql := ` SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date FROM edb_data_wind WHERE edb_code=? ` err = o.Raw(sql, edbCode).QueryRow(&min_date, &max_date) return } func GetEdbDataWindByCodeAndDate(edbCode string, startDate string) (count int, err error) { o := orm.NewOrmUsingDB("data") sql := ` SELECT COUNT(1) AS count FROM edb_data_wind WHERE edb_code=? AND data_time=? ` err = o.Raw(sql, edbCode, startDate).QueryRow(&count) return } type EdbDataFromWind struct { Close map[string]float64 `json:"CLOSE"` Dt map[string]int64 `json:"DT"` ErrMsg string } func RefreshAllEdbDataByWind(edbInfoId, source int, edbCode, startDate, endDate string) (err error) { o := orm.NewOrmUsingDB("data") to, err := o.Begin() if err != nil { return } defer func() { if err != nil { _ = to.Rollback() } else { _ = to.Commit() } }() fmt.Println("wind start:", time.Now()) thsUrl := utils.Hz_Data_Url + `edbInfo/wind?EdbCode=%s&StartDate=%s&EndDate=%s` thsUrl = fmt.Sprintf(thsUrl, edbCode, startDate, endDate) utils.FileLog.Info("thsUrl:%s", thsUrl) body, err := http.Get(thsUrl) utils.FileLog.Info("wind %s", string(body)) fmt.Println("wind end:", time.Now()) if err != nil { return } item := new(EdbDataFromWind) err = json.Unmarshal(body, &item) if err != nil { return } //获取指标所有数据 dataList := make([]*EdbDataBase, 0) dataTableName := GetEdbDataTableName(source) sql := `SELECT * FROM %s WHERE edb_info_id=? ` sql = fmt.Sprintf(sql, dataTableName) _, err = to.Raw(sql, edbInfoId).QueryRows(&dataList) if err != nil { return err } dataMap := make(map[string]string) for _, v := range dataList { dataMap[v.DataTime] = v.Value } addSql := ` INSERT INTO edb_data_wind(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,data_timestamp) values ` var isAdd bool edbInfoIdStr := strconv.Itoa(edbInfoId) existDataMap := make(map[string]string) fmt.Println("start for:", time.Now()) for k, v := range item.Dt { timeStr := fmt.Sprintf("%d", v) v = v / 1000 t := time.Unix(v, 0) dateTime := t.Format(utils.FormatDate) val := item.Close[k] saveVal := utils.SubFloatToString(val, 30) if existVal, ok := dataMap[dateTime]; !ok { if _, existOk := existDataMap[dateTime]; !existOk { isAdd = true addSql += GetAddSql(edbInfoIdStr, edbCode, dateTime, timeStr, saveVal) } existDataMap[dateTime] = dateTime } else { if existVal != saveVal { sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? ` sql = fmt.Sprintf(sql, dataTableName) _, err = to.Raw(sql, saveVal, edbInfoId, dateTime).Exec() if err != nil { return err } } } } fmt.Println("end for:", time.Now()) if isAdd { addSql = strings.TrimRight(addSql, ",") _, err = to.Raw(addSql).Exec() if err != nil { fmt.Println("RefreshAllEdbDataByWind add Err", err.Error()) return } } return }