package data_manage import ( "errors" "eta/eta_api/models/mgo" "eta/eta_api/utils" "fmt" "github.com/beego/beego/v2/client/orm" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "strconv" "time" ) // EdbDataInsertConfig 指标数据插入配置表 type EdbDataInsertConfig struct { EdbInfoId int `orm:"column(edb_info_id);pk" description:"指标id"` Date time.Time `description:"插入的日期"` Value string `description:"插入的值"` RealDate time.Time `description:"实际数据的值日期"` ModifyTime time.Time `description:"数据更新时间"` CreateTime time.Time `description:"数据插入的时间"` } // GetEdbDataInsertConfigByEdbId 根据指标id 获取数据插入配置详情 func GetEdbDataInsertConfigByEdbId(edbInfoId int) (item *EdbDataInsertConfig, err error) { o := orm.NewOrmUsingDB("data") sql := ` SELECT * FROM edb_data_insert_config WHERE edb_info_id=? ` err = o.Raw(sql, edbInfoId).QueryRow(&item) return } // GetEdbDataInsertConfigByEdbIdList 根据指标id列表 获取数据插入配置详情 func GetEdbDataInsertConfigByEdbIdList(edbInfoIdList []int) (items []*EdbDataInsertConfig, err error) { num := len(edbInfoIdList) if num <= 0 { return } o := orm.NewOrmUsingDB("data") sql := ` SELECT * FROM edb_data_insert_config WHERE edb_info_id in (` + utils.GetOrmInReplace(num) + `) ` _, err = o.Raw(sql, edbInfoIdList).QueryRows(&items) return } // CreateEdbDataInsertConfigAndData 创建数据插入配置规则,及插入数据 func CreateEdbDataInsertConfigAndData(edbInfo *EdbInfo, date time.Time, value string) (err error, errMsg string, isSendEmail bool) { isSendEmail = true tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) if tableName == `` { err = errors.New("找不到该指标的数据表") return } dateStr := date.Format(utils.FormatDate) var saveValue string if value != "" { floatValue, e := strconv.ParseFloat(value, 64) if e != nil { err = e fmt.Println("转换失败:", e.Error()) return } saveValue = utils.SubFloatToString(floatValue, 30) } to, err := orm.NewOrmUsingDB("data").Begin() if err != nil { return } defer func() { if err != nil { _ = to.Rollback() } else { _ = to.Commit() } }() var oldConfigDate time.Time //之前配置的日期 // 添加/变更配置 { var item *EdbDataInsertConfig sql := ` SELECT * FROM edb_data_insert_config WHERE edb_info_id=? ` err = to.Raw(sql, edbInfo.EdbInfoId).QueryRow(&item) if err != nil && err.Error() != utils.ErrNoRow() { return } err = nil // 如果是没有配置,那么就需要添加配置 if item == nil { var currLatestDate time.Time currLatestDateStr := edbInfo.LatestDate // 实际日期 if currLatestDateStr != `` && currLatestDateStr != `0000-00-00` { currLatestDate, _ = time.ParseInLocation(utils.FormatDate, currLatestDateStr, time.Local) } if !currLatestDate.IsZero() && (currLatestDate.After(date) || currLatestDate.Equal(date)) { errMsg = `选择日期不能早于/等于数据最新日期` err = errors.New("选择日期不能早于/等于数据最新日期,数据最新日期:" + edbInfo.EndDate + ";填写日期:" + dateStr) isSendEmail = false return } realDate, _ := time.ParseInLocation(utils.FormatDate, edbInfo.LatestDate, time.Local) item = &EdbDataInsertConfig{ EdbInfoId: edbInfo.EdbInfoId, Date: date, Value: saveValue, RealDate: realDate, ModifyTime: time.Now(), CreateTime: time.Now(), } _, err = to.Insert(item) } else { if date.Equal(item.RealDate) || date.Before(item.RealDate) { errMsg = `选择日期不能早于/等于数据最新日期` err = errors.New("选择日期不能早于/等于数据最新日期,数据最新日期:" + edbInfo.EndDate + ";填写日期:" + dateStr) isSendEmail = false return } oldConfigDate = item.Date // 之前配置的日期 item.Date = date item.Value = saveValue item.ModifyTime = time.Now() _, err = to.Update(item, "Date", "Value", "ModifyTime") } if err != nil { return } } // 指标明细数据更新 if edbInfo.Source == utils.DATA_SOURCE_BUSINESS && utils.UseMongo { dateStr, err = updateInsertConfigValueByMongo(to, edbInfo, oldConfigDate, date, value) } else if edbInfo.Source == utils.DATA_SOURCE_THS && edbInfo.SubSource == utils.DATA_SUB_SOURCE_HIGH_FREQUENCY && utils.UseMongo { dateStr, err = updateThsHfInsertConfigValueByMongo(to, edbInfo, oldConfigDate, date, value) } else { dateStr, err = updateInsertConfigValueByMysql(to, edbInfo, oldConfigDate, date, value) } if err != nil { return } // 指标信息更新 edbInfo.EndDate = dateStr _, err = to.Update(edbInfo, "EndDate") return } // updateInsertConfigValueByMysql // @Description: 从mysql更新或插入配置值 // @author: Roc // @datetime 2024-05-09 11:05:41 // @param to orm.TxOrmer // @param edbInfo *EdbInfo // @param oldConfigDate // @param date time.Time // @param value string // @return dateStr string // @return err error func updateInsertConfigValueByMysql(to orm.TxOrmer, edbInfo *EdbInfo, oldConfigDate, date time.Time, value string) (dateStr string, err error) { tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) if tableName == `` { err = errors.New("找不到该指标的数据表") return } dateStr = date.Format(utils.FormatDate) timestamp := date.UnixNano() / 1e6 timeStr := fmt.Sprintf("%d", timestamp) var saveValue string if value != "" { floatValue, e := strconv.ParseFloat(value, 64) if e != nil { err = e fmt.Println("转换失败:", e.Error()) return } saveValue = utils.SubFloatToString(floatValue, 30) } var offsetDate string // 更改数据 { var edbDateData *EdbDataList if !oldConfigDate.IsZero() { sql := `SELECT edb_data_id,edb_info_id,data_time,value,data_timestamp FROM %s WHERE edb_info_id=? AND data_time = ?` sql = fmt.Sprintf(sql, tableName) err = to.Raw(sql, edbInfo.EdbInfoId, oldConfigDate.Format(utils.FormatDate)).QueryRow(&edbDateData) if err != nil && err.Error() != utils.ErrNoRow() { return } err = nil } // 如果是没有历史数据,那么就需要增加数据 if edbDateData == nil { addSql := ` INSERT INTO %s (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) VALUES ( %d, "%s", "%s", "%s", now(), now(), %s) ` addSql = fmt.Sprintf(addSql, tableName, edbInfo.EdbInfoId, edbInfo.EdbCode, dateStr, saveValue, timeStr) _, err = to.Raw(addSql).Exec() } else if value == "" { // 传空值时删除 sql := `SELECT data_time FROM %s WHERE edb_info_id=? ORDER BY data_time DESC LIMIT 1 OFFSET 1` sql = fmt.Sprintf(sql, tableName) err = to.Raw(sql, edbInfo.EdbInfoId).QueryRow(&offsetDate) if err != nil && err.Error() != utils.ErrNoRow() { return } dateStr = offsetDate deleteSql := `DELETE FROM %s WHERE edb_data_id = %d;` deleteSql = fmt.Sprintf(deleteSql, tableName, edbDateData.EdbDataId) _, err = to.Raw(deleteSql).Exec() } else { updateSql := `UPDATE %s SET data_time = "%s", value = "%s", modify_time= now(), data_timestamp= %s WHERE edb_data_id = %d;` updateSql = fmt.Sprintf(updateSql, tableName, dateStr, saveValue, timeStr, edbDateData.EdbDataId) _, err = to.Raw(updateSql).Exec() } if err != nil { return } } return } // updateInsertConfigValueByMongo // @Description: 从mongo更新或插入配置值 // @author: Roc // @datetime 2024-05-09 11:05:49 // @param to orm.TxOrmer // @param edbInfo *EdbInfo // @param oldConfigDate // @param newDate time.Time // @param value string // @return dateStr string // @return err error func updateInsertConfigValueByMongo(to orm.TxOrmer, edbInfo *EdbInfo, oldConfigDate, newDate time.Time, value string) (dateStr string, err error) { tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) if tableName == `` { err = errors.New("找不到该指标的数据表") return } dateStr = newDate.Format(utils.FormatDate) timestamp := newDate.UnixNano() / 1e6 var floatValue float64 if value != "" { floatValue, err = strconv.ParseFloat(value, 64) if err != nil { fmt.Println("转换失败:", err.Error()) return } } mogDataObj := mgo.EdbDataBusiness{} coll := mogDataObj.GetCollection() var edbDateData *mgo.EdbDataBusiness if !oldConfigDate.IsZero() { // 构建查询条件 queryConditions := bson.M{ "edb_info_id": edbInfo.EdbInfoId, "data_time": oldConfigDate, } edbDateData, err = mogDataObj.GetItem(queryConditions) //if tmpErr != nil && tmpErr == mongo.ErrNoDocuments { // err = tmpErr // return //} if err != nil && err != mongo.ErrNoDocuments { return } err = nil } // 如果是没有历史数据,那么就需要增加数据 if edbDateData == nil { addDataItem := mgo.EdbDataBusiness{ //ID: primitive.ObjectID{}, EdbInfoId: edbInfo.EdbInfoId, EdbCode: edbInfo.EdbCode, DataTime: newDate, Value: floatValue, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: timestamp, } err = mogDataObj.InsertDataByColl(coll, addDataItem) if err != nil { fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error()) return } return } // 数据清空 if value == "" { queryConditions := bson.M{ "edb_info_id": edbInfo.EdbInfoId, } // 获取最新的两条数据 tmpDataList, tmpErr := mogDataObj.GetLimitDataList(queryConditions, 2, []string{"-data_time"}) if tmpErr != nil { fmt.Println("mogDataObj.GetLimitDataList() Err:" + tmpErr.Error()) return } // 如果并没有两条数据,那么就返回 if len(tmpDataList) < 2 { return } // 实际应该是倒数第二条数据的日期 dateStr = tmpDataList[1].DataTime.Format(utils.FormatDate) // 删除插入的数据 err = mogDataObj.RemoveManyByColl(coll, bson.M{"_id": tmpDataList[0].ID}) return } // 更新配置的数据 updateData := bson.M{"$set": bson.M{ "value": floatValue, "modify_time": time.Now(), "data_time": newDate, "data_timestamp": timestamp, }} err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": edbDateData.ID}, updateData) return } func updateThsHfInsertConfigValueByMongo(to orm.TxOrmer, edbInfo *EdbInfo, oldConfigDate, newDate time.Time, value string) (dateStr string, err error) { tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) if tableName == `` { err = errors.New("找不到该指标的数据表") return } dateStr = newDate.Format(utils.FormatDate) timestamp := newDate.UnixNano() / 1e6 var floatValue float64 if value != "" { floatValue, err = strconv.ParseFloat(value, 64) if err != nil { fmt.Println("转换失败:", err.Error()) return } } mogDataObj := mgo.EdbDataThsHf{} coll := mogDataObj.GetCollection() var edbDateData *mgo.EdbDataThsHf if !oldConfigDate.IsZero() { // 构建查询条件 queryConditions := bson.M{ "edb_info_id": edbInfo.EdbInfoId, "data_time": oldConfigDate, } edbDateData, err = mogDataObj.GetItem(queryConditions) //if tmpErr != nil && tmpErr == mongo.ErrNoDocuments { // err = tmpErr // return //} if err != nil && err != mongo.ErrNoDocuments { return } err = nil } // 如果是没有历史数据,那么就需要增加数据 if edbDateData == nil { addDataItem := mgo.EdbDataThsHf{ //ID: primitive.ObjectID{}, EdbInfoId: edbInfo.EdbInfoId, EdbCode: edbInfo.EdbCode, DataTime: newDate, Value: floatValue, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: timestamp, } err = mogDataObj.InsertDataByColl(coll, addDataItem) if err != nil { fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error()) return } return } // 数据清空 if value == "" { queryConditions := bson.M{ "edb_info_id": edbInfo.EdbInfoId, } // 获取最新的两条数据 tmpDataList, tmpErr := mogDataObj.GetLimitDataList(queryConditions, 2, []string{"-data_time"}) if tmpErr != nil { fmt.Println("mogDataObj.GetLimitDataList() Err:" + tmpErr.Error()) return } // 如果并没有两条数据,那么就返回 if len(tmpDataList) < 2 { return } // 实际应该是倒数第二条数据的日期 dateStr = tmpDataList[1].DataTime.Format(utils.FormatDate) // 删除插入的数据 err = mogDataObj.RemoveManyByColl(coll, bson.M{"_id": tmpDataList[0].ID}) return } // 更新配置的数据 updateData := bson.M{"$set": bson.M{ "value": floatValue, "modify_time": time.Now(), "data_time": newDate, "data_timestamp": timestamp, }} err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": edbDateData.ID}, updateData) return }