package services import ( "errors" "eta/eta_index_lib/logic" "eta/eta_index_lib/models" "eta/eta_index_lib/models/mgo" "eta/eta_index_lib/services/alarm_msg" "eta/eta_index_lib/utils" "fmt" "go.mongodb.org/mongo-driver/bson" "strings" "time" ) // HandleBusinessIndex // @Description: 处理处理外部指标 // @author: Roc // @datetime 2024-04-26 14:23:42 // @param indexItem *models.AddBusinessIndexReq // @return err error func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) { defer func() { if err != nil { // 添加刷新失败日志 dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexItem.IndexCode) if e == nil { //查询指标存在,才添加刷新日志 _ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0) } } }() // 兼容频度缺少度的字段 if !strings.Contains(indexItem.Frequency, "度") { indexItem.Frequency = indexItem.Frequency + "度" } if !utils.VerifyFrequency(indexItem.Frequency) { err = errors.New("指标频度不合法:" + indexItem.Frequency) return } // 判断来源,如果来源不存在的话,则创建 sourceObj := new(models.EdbBusinessSource) sourceItem, err := sourceObj.GetEdbBusinessSourceItem(indexItem.SourceName) if err != nil { if err.Error() == utils.ErrNoRow() { sourceItem = &models.EdbBusinessSource{ EdbBusinessSourceId: 0, SourceName: indexItem.SourceName, CreateTime: time.Now(), } err = sourceItem.Add() } else { return } } // 指标 indexObj := new(models.BaseFromBusinessIndex) //判断指标是否存在 item, err := indexObj.GetIndexItem(indexItem.IndexCode) if err != nil { if err.Error() != utils.ErrNoRow() { return } // 添加指标 item = &models.BaseFromBusinessIndex{ BaseFromBusinessIndexId: 0, IndexCode: indexItem.IndexCode, IndexName: indexItem.IndexName, Unit: indexItem.Unit, Frequency: indexItem.Frequency, Source: int(sourceItem.EdbBusinessSourceId), SourceName: sourceItem.SourceName, //StartDate: time.Time{}, //EndDate: time.Time{}, Remark: indexItem.Remark, BaseModifyTime: time.Now(), DataUpdateTime: time.Now(), CreateTime: time.Now(), ModifyTime: time.Now(), } err = item.Add() if err != nil { fmt.Println("add err:" + err.Error()) return } } else { updateCols := make([]string, 0) if item.IndexName != indexItem.IndexName { item.IndexName = indexItem.IndexName updateCols = append(updateCols, "IndexName") } if item.Unit != indexItem.Unit { item.Unit = indexItem.Unit updateCols = append(updateCols, "Unit") } if item.Frequency != indexItem.Frequency { item.Frequency = indexItem.Frequency updateCols = append(updateCols, "Frequency") } if item.Source != int(sourceItem.EdbBusinessSourceId) { item.Source = int(sourceItem.EdbBusinessSourceId) item.SourceName = sourceItem.SourceName updateCols = append(updateCols, "Source", "SourceName") } if len(updateCols) > 0 { item.BaseModifyTime = time.Now() item.ModifyTime = time.Now() updateCols = append(updateCols, "BaseModifyTime", "ModifyTime") err = item.Update(updateCols) if err != nil { fmt.Println("update index err:" + err.Error()) return } } } // 数据处理 mogDataObj := new(mgo.BaseFromBusinessData) //获取已存在的所有数据 exitDataList, err := mogDataObj.GetAllDataList(bson.M{"index_code": indexItem.IndexCode}) if err != nil { fmt.Println("GetIndexDataList Err:" + err.Error()) return err } // 已经存在的数据集 exitDataMap := make(map[string]*mgo.BaseFromBusinessData) for _, v := range exitDataList { exitDataMap[v.DataTime.Format(utils.FormatDate)] = v } // 当前传入的最小日期 var reqMinDate time.Time // 待添加的数据集 addDataList := make([]mgo.BaseFromBusinessData, 0) updateDataList := make([]mgo.BaseFromBusinessData, 0) //var hasUpdate bool // 遍历excel数据,然后跟现有的数据做校验,不存在则入库 for _, data := range indexItem.DataList { dateTime, err := utils.DealExcelDate(data.Date) if err != nil { fmt.Println("time.ParseInLocation Err:" + err.Error()) return err } // 调整最小日期 if reqMinDate.IsZero() || reqMinDate.After(dateTime) { reqMinDate = dateTime } date := dateTime.Format(utils.FormatDate) findData, ok := exitDataMap[date] if !ok { addDataList = append(addDataList, mgo.BaseFromBusinessData{ BaseFromBusinessIndexId: item.BaseFromBusinessIndexId, IndexCode: item.IndexCode, DataTime: dateTime, Value: data.Value, CreateTime: time.Now(), ModifyTime: time.Now(), //DataTimestamp: 0, }) continue } // 值不匹配,修改数据 if findData.Value != data.Value { findData.Value = data.Value updateDataList = append(updateDataList, *findData) } } // 指标数据是否新增或修改 var isIndexUpdateOrAdd bool // 入库 { if len(addDataList) > 0 { isIndexUpdateOrAdd = true err = mogDataObj.BatchInsertData(addDataList) if err != nil { fmt.Println("mogDataObj.HandleData() Err:" + err.Error()) return } } if len(updateDataList) > 0 { isIndexUpdateOrAdd = true coll := mogDataObj.GetCollection() for _, v := range updateDataList { err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}) if err != nil { fmt.Println("UpdateDataByColl:Err:" + err.Error()) return } } } } // 支持事务的话,下面操作 //result, err := mogDataObj.HandleData(addDataList, updateDataList) //if err != nil { // fmt.Println("mogDataObj.HandleData() Err:" + err.Error()) // return //} //fmt.Println("result", result) //修改最大最小日期 indexMaxAndMinInfo, err := item.GetEdbInfoMaxAndMinInfo(indexItem.IndexCode) if err != nil { return } if err == nil && indexMaxAndMinInfo != nil { e := item.ModifyIndexMaxAndMinInfo(indexItem.IndexCode, indexMaxAndMinInfo, isIndexUpdateOrAdd) if e != nil { fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error()) } } // 同步刷新指标库的指标 go refreshEdbBusiness(item.IndexCode, reqMinDate) return } func refreshEdbBusiness(indexCode string, reqMinDate time.Time) { var indexErr error var errMsg string defer func() { if indexErr != nil { tips := fmt.Sprintf("外部数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s, 错误信息:%s", indexCode, indexErr.Error(), errMsg) alarm_msg.SendAlarmMsg(tips, 3) } }() edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BUSINESS, indexCode) if e != nil && e.Error() != utils.ErrNoRow() { indexErr = e return } if edbInfo != nil { startDate := `` if reqMinDate.IsZero() { startDate = edbInfo.EndDate } else { startDate = reqMinDate.Format(utils.FormatDate) } params := models.RefreshBaseParams{ EdbInfo: edbInfo, StartDate: startDate, } obj := models.Business{} indexErr, errMsg = obj.Refresh(params) if indexErr != nil { return } // 更新指标最大最小值 indexErr = obj.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo) if indexErr != nil { return } // 更新ES go logic.UpdateEs(edbInfo.EdbInfoId) } }