package services import ( "errors" "eta/eta_index_lib/models" "eta/eta_index_lib/models/mgo" "eta/eta_index_lib/utils" "fmt" "go.mongodb.org/mongo-driver/bson" "strings" "time" ) // HandleBusinessIndex // @Description: 处理处理外部指标 // @author: Roc // @datetime 2024-04-26 14:23:42 // @param indexItem *models.AddBusinessIndexReq // @return err error func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) { defer func() { if err != nil { // 添加刷新失败日志 dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexItem.IndexCode) if e == nil { //查询指标存在,才添加刷新日志 _ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0) } } }() // 兼容频度缺少度的字段 if !strings.Contains(indexItem.Frequency, "度") { indexItem.Frequency = indexItem.Frequency + "度" } if !utils.VerifyFrequency(indexItem.Frequency) { err = errors.New("指标频度不合法:" + indexItem.Frequency) return } // 判断来源,如果来源不存在的话,则创建 sourceObj := new(models.EdbBusinessSource) sourceItem, err := sourceObj.GetEdbBusinessSourceItem(indexItem.SourceName) if err != nil { if err.Error() == utils.ErrNoRow() { sourceItem = &models.EdbBusinessSource{ EdbBusinessSourceId: 0, SourceName: indexItem.SourceName, CreateTime: time.Now(), } err = sourceItem.Add() } else { return } } // 指标 indexObj := new(models.BaseFromBusinessIndex) //判断指标是否存在 item, err := indexObj.GetIndexItem(indexItem.IndexCode) if err != nil { if err.Error() != utils.ErrNoRow() { return } // 添加指标 item = &models.BaseFromBusinessIndex{ BaseFromBusinessIndexId: 0, IndexCode: indexItem.IndexCode, IndexName: indexItem.IndexName, Unit: indexItem.Unit, Frequency: indexItem.Frequency, Source: int(sourceItem.EdbBusinessSourceId), SourceName: sourceItem.SourceName, //StartDate: time.Time{}, //EndDate: time.Time{}, Remark: indexItem.Remark, BaseModifyTime: time.Now(), DataUpdateTime: time.Now(), CreateTime: time.Now(), ModifyTime: time.Now(), } err = item.Add() if err != nil { fmt.Println("add err:" + err.Error()) return } } else { updateCols := make([]string, 0) if item.IndexName != indexItem.IndexName { item.IndexName = indexItem.IndexName updateCols = append(updateCols, "IndexName") } if item.Unit != indexItem.Unit { item.Unit = indexItem.Unit updateCols = append(updateCols, "Unit") } if item.Frequency != indexItem.Frequency { item.Frequency = indexItem.Frequency updateCols = append(updateCols, "Frequency") } if item.Source != int(sourceItem.EdbBusinessSourceId) { item.Source = int(sourceItem.EdbBusinessSourceId) item.SourceName = sourceItem.SourceName updateCols = append(updateCols, "Source", "SourceName") } if len(updateCols) > 0 { item.BaseModifyTime = time.Now() item.ModifyTime = time.Now() updateCols = append(updateCols, "BaseModifyTime", "ModifyTime") err = item.Update(updateCols) if err != nil { fmt.Println("update index err:" + err.Error()) return } } } // 数据处理 mogDataObj := new(mgo.BaseFromBusinessData) //获取已存在的所有数据 exitDataList, err := mogDataObj.GetAllDataList(bson.M{"index_code": indexItem.IndexCode}) if err != nil { fmt.Println("GetIndexDataList Err:" + err.Error()) return err } fmt.Println("exitDataListLen:", len(exitDataList)) // 已经存在的数据集 exitDataMap := make(map[string]mgo.BaseFromBusinessData) for _, v := range exitDataList { exitDataMap[v.DataTime] = v } // 待添加的数据集 addDataList := make([]mgo.BaseAddFromBusinessData, 0) updateDataList := make([]mgo.BaseFromBusinessData, 0) //var hasUpdate bool // 遍历excel数据,然后跟现有的数据做校验,不存在则入库 for _, data := range indexItem.DataList { dateTime, err := utils.DealExcelDate(data.Date) if err != nil { fmt.Println("time.ParseInLocation Err:" + err.Error()) return err } date := dateTime.Format(utils.FormatDate) findData, ok := exitDataMap[date] if !ok { addDataList = append(addDataList, mgo.BaseAddFromBusinessData{ BaseFromBusinessIndexId: item.BaseFromBusinessIndexId, IndexCode: item.IndexCode, DataTime: date, Value: data.Value, CreateTime: time.Now(), ModifyTime: time.Now(), //DataTimestamp: 0, }) continue } // 值不匹配,修改数据 if findData.Value != data.Value { findData.Value = data.Value updateDataList = append(updateDataList, findData) } } result, err := mogDataObj.HandleData(addDataList, updateDataList) if err != nil { fmt.Println("mogDataObj.HandleData() Err:" + err.Error()) return } fmt.Println("result", result) ////修改最大最小日期 //mysteelIndexMaxItem, err := dataObj.GetMysteelIndexInfoMaxAndMinInfo(indexItem.IndexCode) //if err == nil && mysteelIndexMaxItem != nil { // e := dataObj.ModifyMysteelIndexMaxAndMinInfo(indexItem.IndexCode, mysteelIndexMaxItem) // if e != nil { // fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error()) // } //} //// 同步刷新图库钢联的指标 ////go func() { //var indexErr error //var lErr error //defer func() { // if indexErr != nil { // tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error()) // alarm_msg.SendAlarmMsg(tips, 3) // } // // if lErr != nil { // tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error()) // alarm_msg.SendAlarmMsg(tips, 3) // } //}() // //edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode) //if e != nil && e.Error() != utils.ErrNoRow() { // indexErr = e // return //} // //if edbInfo != nil { // dataUpdateResult := 2 // dataUpdateFailedReason := "服务异常" // _, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``) // if logErr != nil { // lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0) // return // } //} return }