package services import ( "errors" "eta_gn/eta_index_lib/logic" "eta_gn/eta_index_lib/models" "eta_gn/eta_index_lib/models/mgo" "eta_gn/eta_index_lib/services/alarm_msg" "eta_gn/eta_index_lib/utils" "fmt" "github.com/qiniu/qmgo" "go.mongodb.org/mongo-driver/bson" "strings" "time" ) // HandleBusinessIndex // @Description: 处理外部指标 // @author: Roc // @datetime 2024-04-26 14:23:42 // @param indexReq *models.AddBusinessIndexReq // @return err error func HandleBusinessIndex(indexReq *models.AddBusinessIndexReq) (resp models.BaseFromBusinessIndexResp, err error) { defer func() { if err != nil { // 添加刷新失败日志 dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexReq.IndexCode) if e == nil { //查询指标存在,才添加刷新日志 _ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0) } } }() // 没有数据就返回 if indexReq.DataList == nil || len(indexReq.DataList) <= 0 { return } // 兼容频度缺少度的字段 if !strings.Contains(indexReq.Frequency, "度") { indexReq.Frequency = indexReq.Frequency + "度" } if !utils.VerifyFrequency(indexReq.Frequency) { err = errors.New("指标频度不合法:" + indexReq.Frequency) return } // 判断来源,如果来源不存在的话,则创建 sourceObj := new(models.EdbBusinessSource) sourceItem, err := sourceObj.GetEdbBusinessSourceItem(indexReq.SourceName) if err != nil { if err.Error() == utils.ErrNoRow() { sourceItem = &models.EdbBusinessSource{ EdbBusinessSourceId: 0, SourceName: indexReq.SourceName, CreateTime: time.Now(), } err = sourceItem.Add() } else { return } } // 指标 indexObj := new(models.BaseFromBusinessIndex) if indexReq.IndexCode == `` { // 如果指标编码为空,那么自动生成 currId, tmpErr := indexObj.GetMaxId() if tmpErr != nil { err = tmpErr return } indexReq.IndexCode = fmt.Sprintf("SELF%07d", currId+1) } //判断指标是否存在 item, err := indexObj.GetIndexItem(indexReq.IndexCode) if err != nil { if err.Error() != utils.ErrNoRow() { return } // 添加指标 item = &models.BaseFromBusinessIndex{ BaseFromBusinessIndexId: 0, IndexCode: indexReq.IndexCode, IndexName: indexReq.IndexName, Unit: indexReq.Unit, Frequency: indexReq.Frequency, Source: int(sourceItem.EdbBusinessSourceId), SourceName: sourceItem.SourceName, //StartDate: time.Time{}, //EndDate: time.Time{}, Remark: indexReq.Remark, BaseModifyTime: time.Now(), DataUpdateTime: time.Now(), CreateTime: time.Now(), ModifyTime: time.Now(), } err = item.Add() if err != nil { fmt.Println("add err:" + err.Error()) return } } else { updateCols := make([]string, 0) if item.IndexName != indexReq.IndexName { item.IndexName = indexReq.IndexName updateCols = append(updateCols, "IndexName") } if item.Unit != indexReq.Unit { item.Unit = indexReq.Unit updateCols = append(updateCols, "Unit") } if item.Frequency != indexReq.Frequency { item.Frequency = indexReq.Frequency updateCols = append(updateCols, "Frequency") } if item.Source != int(sourceItem.EdbBusinessSourceId) { item.Source = int(sourceItem.EdbBusinessSourceId) item.SourceName = sourceItem.SourceName updateCols = append(updateCols, "Source", "SourceName") } if len(updateCols) > 0 { item.BaseModifyTime = time.Now() item.ModifyTime = time.Now() updateCols = append(updateCols, "BaseModifyTime", "ModifyTime") err = item.Update(updateCols) if err != nil { fmt.Println("update index err:" + err.Error()) return } } } // 数据处理 // 当前传入的最小日期 var reqMinDate time.Time if utils.UseMongo { reqMinDate, err = handleBusinessDataByMongo(item, indexReq.DataList) } else { reqMinDate, err = handleBusinessDataByMysql(item, indexReq.DataList) } if err != nil { return } // 同步刷新指标库的指标 go refreshEdbBusiness(item.IndexCode, reqMinDate) resp = models.BaseFromBusinessIndexResp{ IndexCode: item.IndexCode, IndexName: item.IndexName, Unit: item.Unit, Frequency: item.Frequency, SourceName: item.SourceName, } return } // handleBusinessDataByMongo // @Description: 处理外部指标数据(mongo) // @author: Roc // @datetime 2024-07-01 15:30:41 // @param item *models.BaseFromBusinessIndex // @param reqDataList []*models.AddBusinessDataReq // @return reqMinDate time.Time 当前传入的最小日期 // @return err error func handleBusinessDataByMongo(item *models.BaseFromBusinessIndex, reqDataList []models.AddBusinessDataReq) (reqMinDate time.Time, err error) { mogDataObj := new(mgo.BaseFromBusinessData) //获取已存在的所有数据 exitDataList, err := mogDataObj.GetAllDataList(bson.M{"index_code": item.IndexCode}, []string{"data_time"}) if err != nil { fmt.Println("GetIndexDataList Err:" + err.Error()) return } // 已经存在的数据集 exitDataMap := make(map[string]*mgo.BaseFromBusinessData) for _, v := range exitDataList { exitDataMap[v.DataTime.Format(utils.FormatDate)] = v } // 待添加的数据集 addDataList := make([]interface{}, 0) updateDataList := make([]mgo.BaseFromBusinessData, 0) //var hasUpdate bool // 遍历excel数据,然后跟现有的数据做校验,不存在则入库 for _, data := range reqDataList { dateTime, tmpErr := utils.DealExcelDate(data.Date) if tmpErr != nil { fmt.Println("time.ParseInLocation Err:" + tmpErr.Error()) err = tmpErr return } // 调整最小日期 if reqMinDate.IsZero() || reqMinDate.After(dateTime) { reqMinDate = dateTime } date := dateTime.Format(utils.FormatDate) findData, ok := exitDataMap[date] if !ok { addDataList = append(addDataList, mgo.BaseFromBusinessData{ BaseFromBusinessIndexId: item.BaseFromBusinessIndexId, IndexCode: item.IndexCode, DataTime: dateTime, Value: data.Value, CreateTime: time.Now(), ModifyTime: time.Now(), //DataTimestamp: 0, }) continue } // 值不匹配,修改数据 if findData.Value != data.Value { findData.Value = data.Value updateDataList = append(updateDataList, *findData) } } // 指标数据是否新增或修改 var isIndexUpdateOrAdd bool // 入库 { coll := mogDataObj.GetCollection() if len(addDataList) > 0 { isIndexUpdateOrAdd = true err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList) if err != nil { fmt.Println("mogDataObj.HandleData() Err:" + err.Error()) return } } if len(updateDataList) > 0 { isIndexUpdateOrAdd = true for _, v := range updateDataList { err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}) if err != nil { fmt.Println("UpdateDataByColl:Err:" + err.Error()) return } } } } // 支持事务的话,下面操作 //result, err := mogDataObj.HandleData(addDataList, updateDataList) //if err != nil { // fmt.Println("mogDataObj.HandleData() Err:" + err.Error()) // return //} //fmt.Println("result", result) //修改最大最小日期 indexMaxAndMinInfo, err := item.GetEdbInfoMaxAndMinInfo(item.IndexCode) if err != nil { return } if err == nil && indexMaxAndMinInfo != nil { e := item.ModifyIndexMaxAndMinInfo(item.IndexCode, indexMaxAndMinInfo, isIndexUpdateOrAdd) if e != nil { fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error()) } } return } // handleBusinessDataByMysql // @Description: 处理外部指标数据(mysql) // @author: Roc // @datetime 2024-07-01 15:59:43 // @param item *models.BaseFromBusinessIndex // @param reqDataList []models.AddBusinessDataReq // @return reqMinDate time.Time // @return err error func handleBusinessDataByMysql(item *models.BaseFromBusinessIndex, reqDataList []models.AddBusinessDataReq) (reqMinDate time.Time, err error) { businessDataObj := new(models.BaseFromBusinessData) var condition []string var pars []interface{} condition = append(condition, "index_code = ?") pars = append(pars, item.IndexCode) //获取已存在的所有数据 exitDataList, err := businessDataObj.GetAllDataList(condition, pars, "data_time ASC") if err != nil { fmt.Println("GetIndexDataList Err:" + err.Error()) return } // 已经存在的数据集 exitDataMap := make(map[string]*models.BaseFromBusinessData) for _, v := range exitDataList { exitDataMap[v.DataTime.Format(utils.FormatDate)] = v } // 待添加的数据集 addDataList := make([]*models.BaseFromBusinessData, 0) updateDataList := make([]*models.BaseFromBusinessData, 0) //var hasUpdate bool // 遍历excel数据,然后跟现有的数据做校验,不存在则入库 for _, data := range reqDataList { dateTime, tmpErr := utils.DealExcelDate(data.Date) if tmpErr != nil { fmt.Println("time.ParseInLocation Err:" + tmpErr.Error()) err = tmpErr return } // 调整最小日期 if reqMinDate.IsZero() || reqMinDate.After(dateTime) { reqMinDate = dateTime } date := dateTime.Format(utils.FormatDate) findData, ok := exitDataMap[date] if !ok { addDataList = append(addDataList, &models.BaseFromBusinessData{ BaseFromBusinessIndexId: int(item.BaseFromBusinessIndexId), IndexCode: item.IndexCode, DataTime: dateTime, Value: data.Value, CreateTime: time.Now(), ModifyTime: time.Now(), //DataTimestamp: 0, }) continue } // 值不匹配,修改数据 if findData.Value != data.Value { findData.Value = data.Value findData.ModifyTime = time.Now() updateDataList = append(updateDataList, findData) } } // 指标数据是否新增或修改 var isIndexUpdateOrAdd bool // 入库 { if len(addDataList) > 0 { isIndexUpdateOrAdd = true } if len(updateDataList) > 0 { isIndexUpdateOrAdd = true } err = businessDataObj.HandleData(addDataList, updateDataList) if err != nil { fmt.Println("UpdateDataByColl:Err:" + err.Error()) return } } //修改最大最小日期 indexMaxAndMinInfo, err := item.GetEdbInfoMaxAndMinInfo(item.IndexCode) if err != nil { return } if err == nil && indexMaxAndMinInfo != nil { e := item.ModifyIndexMaxAndMinInfo(item.IndexCode, indexMaxAndMinInfo, isIndexUpdateOrAdd) if e != nil { fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error()) } } return } // DelBusinessIndexResp // @Description: 删除外部指标的返回 type DelBusinessIndexResp struct { IsDeleteEdbCodeList []string `description:"已经删除了的指标编码"` NoDeleteEdbCodeList []string `description:"未删除的指标编码"` } // DelBusinessIndex // @Description: 删除外部指标 // @author: Roc // @datetime 2024-05-31 16:27:37 // @param indexCodeList []string // @return err error // @return errMsg string func DelBusinessIndex(indexCodeList []string) (joinEdbCodeList, needDelEdbCodeList []string, err error, errMsg string) { defer func() { if err != nil { fmt.Println("DelBusinessIndex Err:" + err.Error()) } }() errMsg = "删除失败" if len(indexCodeList) < 0 { errMsg = "指标编码不允许为空" err = errors.New(errMsg) return } // 指标 indexObj := new(models.BaseFromBusinessIndex) //判断指标是否存在 baseIndexList, err := indexObj.GetIndexItemList(indexCodeList) if err != nil { return } // 已加入到eta指标库的编码map joinEdbCodeList = make([]string, 0) joinEdbMap := make(map[string]string) // 未加入到eta指标库的编码 needDelEdbCodeList = make([]string, 0) // 判断指标是否加入到指标库 tmpEdbInfoList, err := models.GetEdbInfoByEdbCodeList(utils.DATA_SOURCE_BUSINESS, indexCodeList) if err != nil { return } for _, v := range tmpEdbInfoList { joinEdbCodeList = append(joinEdbCodeList, v.EdbCode) joinEdbMap[v.EdbCode] = v.EdbCode } for _, v := range baseIndexList { _, ok := joinEdbMap[v.IndexCode] if !ok { needDelEdbCodeList = append(needDelEdbCodeList, v.IndexCode) } } // 如果需要删除的指标,则直接返回 if len(needDelEdbCodeList) <= 0 { return } // 删除指标 err = indexObj.DelIndexItemList(needDelEdbCodeList) if err != nil { fmt.Println("删除自有指标失败, Err:" + err.Error()) return } if utils.UseMongo { // 删除指标明细数据 mogDataObj := new(mgo.BaseFromBusinessData) err = mogDataObj.RemoveMany(bson.M{"index_code": bson.M{"$in": needDelEdbCodeList}}) } else { var condition []string var pars []interface{} delNum := len(needDelEdbCodeList) if delNum > 0 { condition = append(condition, "index_code in ("+utils.GetOrmInReplace(delNum)+")") pars = append(pars, needDelEdbCodeList) businessDataObj := models.BaseFromBusinessData{} err = businessDataObj.DelDataByCond(condition, pars) } } if err != nil { fmt.Println("删除自有指标明细数据 Err:" + err.Error()) return } return } // DelBusinessIndexData // @Description: 删除指标数据 // @author: Roc // @datetime 2024-05-31 15:43:32 // @param indexCode string // @param dateList []string // @return err error // @return errMsg string func DelBusinessIndexData(indexCode string, startDate, endDate string) (err error, errMsg string) { defer func() { if err != nil { fmt.Println("DelBusinessIndexData Err:" + err.Error()) } }() errMsg = "删除失败" if indexCode == `` { errMsg = "指标编码不允许为空" err = errors.New(errMsg) return } if startDate == `` && endDate == `` { errMsg = "开始日期和结束日期不允许同时为空" err = errors.New(errMsg) return } // 指标 indexObj := new(models.BaseFromBusinessIndex) //判断指标是否存在 item, err := indexObj.GetIndexItem(indexCode) if err != nil { if err.Error() != utils.ErrNoRow() { return } err = nil return } // 当前传入的最小日期 var reqMinDate time.Time var startDateTime, endDateTime time.Time if startDate != `` { //获取已存在的所有数据 startDateTime, err = time.ParseInLocation(utils.FormatDate, startDate, time.Local) if err != nil { return } // 调整最小日期 if reqMinDate.IsZero() || reqMinDate.After(startDateTime) { reqMinDate = startDateTime } } if endDate != `` { //获取已存在的所有数据 endDateTime, err = time.ParseInLocation(utils.FormatDate, endDate, time.Local) if err != nil { return } // 调整最小日期 if reqMinDate.IsZero() || reqMinDate.After(endDateTime) { reqMinDate = endDateTime } } // 删除具体的数据 var indexMaxAndMinInfo *models.EdbInfoMaxAndMinInfo if utils.UseMongo { indexMaxAndMinInfo, err = delBusinessIndexDataByMongo(item, startDateTime, endDateTime) } else { indexMaxAndMinInfo, err = delBusinessIndexDataByMysql(item, startDateTime, endDateTime) } if err != nil { return } // 修改指标的最早最晚日期 if indexMaxAndMinInfo != nil { e := item.ModifyIndexMaxAndMinInfo(item.IndexCode, indexMaxAndMinInfo, true) if e != nil { fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error()) } } // 同步刷新指标库的指标 go refreshEdbBusiness(item.IndexCode, reqMinDate) return } // delBusinessIndexDataByMongo // @Description: 删除指标数据(从mongo删除) // @author: Roc // @datetime 2024-07-01 18:00:07 // @param item *models.BaseFromBusinessIndex // @param startDateTime time.Time // @param endDateTime time.Time // @return err error func delBusinessIndexDataByMongo(item *models.BaseFromBusinessIndex, startDateTime, endDateTime time.Time) (indexMaxAndMinInfo *models.EdbInfoMaxAndMinInfo, err error) { defer func() { if err != nil { utils.FileLog.Error("delBusinessIndexDataByMongo 删除自有指标明细数据 Err:" + err.Error()) } }() // 构建查询条件 queryConditions := bson.M{ "index_code": item.IndexCode, } dateCondition, err := mgo.BuildDateTimeCondition(startDateTime, endDateTime) if err != nil { return } if len(dateCondition) > 0 { queryConditions["data_time"] = dateCondition } // 删除数据源中的指标明细数据 mogDataObj := new(mgo.BaseFromBusinessData) err = mogDataObj.RemoveMany(queryConditions) if err != nil { return } //修改最大最小日期 indexMaxAndMinInfo, err = item.GetEdbInfoMaxAndMinInfo(item.IndexCode) // 如果有错误,且错误信息是取不到文档,那么就不修改了 if err != nil && !errors.Is(err, qmgo.ErrNoSuchDocuments) { return } // 清空的目的是为了避免异常返回 err = nil return } // delBusinessIndexDataByMysql // @Description: 删除指标数据(从mysql删除) // @author: Roc // @datetime 2024-07-02 09:53:13 // @param item *models.BaseFromBusinessIndex // @param startDateTime time.Time // @param endDateTime time.Time // @return err error func delBusinessIndexDataByMysql(item *models.BaseFromBusinessIndex, startDateTime, endDateTime time.Time) (indexMaxAndMinInfo *models.EdbInfoMaxAndMinInfo, err error) { defer func() { if err != nil { utils.FileLog.Error("delBusinessIndexDataByMysql 删除自有指标明细数据 Err:" + err.Error()) } }() // 构建查询条件 var condition []string var pars []interface{} condition = append(condition, "index_code = ? ") pars = append(pars, item.IndexCode) if !startDateTime.IsZero() { condition = append(condition, " data_time >= ? ") pars = append(pars, startDateTime.Format(utils.FormatDate)) } if !endDateTime.IsZero() { condition = append(condition, " data_time <= ? ") pars = append(pars, endDateTime.Format(utils.FormatDate)) } // 删除数据源中的指标明细数据 businessDataObj := new(models.BaseFromBusinessData) err = businessDataObj.DelDataByCond(condition, pars) if err != nil { return } //修改最大最小日期 indexMaxAndMinInfo, err = item.GetEdbInfoMaxAndMinInfo(item.IndexCode) // 如果有错误,且错误信息是取不到文档,那么就不修改了 if err != nil && err.Error() != utils.ErrNoRow() { return } // 清空的目的是为了避免异常返回 err = nil return } func refreshEdbBusiness(indexCode string, reqMinDate time.Time) { var indexErr error var errMsg string defer func() { if indexErr != nil { tips := fmt.Sprintf("自有数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s, 错误信息:%s", indexCode, indexErr.Error(), errMsg) alarm_msg.SendAlarmMsg(tips, 3) } }() edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BUSINESS, indexCode) if e != nil && e.Error() != utils.ErrNoRow() { indexErr = e return } if edbInfo != nil { startDate := `` if reqMinDate.IsZero() { startDate = edbInfo.EndDate } else { startDate = reqMinDate.Format(utils.FormatDate) } params := models.RefreshBaseParams{ EdbInfo: edbInfo, StartDate: startDate, } obj := models.Business{} indexErr, errMsg = obj.Refresh(params) if indexErr != nil { return } // 更新指标最大最小值 indexErr = obj.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo) if indexErr != nil { return } // 更新ES go logic.UpdateEs(edbInfo.EdbInfoId) } }