package eta_bridge import ( "encoding/json" "eta/eta_task/models/data_manage" "eta/eta_task/utils" "fmt" "strings" ) // handleData // @Description: 监听的数据处理 // @author: Roc // @datetime 2024-03-11 14:52:49 // @param edbUpdateLog *data_manage.EdbUpdateLog // @return pushIndexData *PushIndexParamDataReq // @return pushEdbClassify *PushEdbClassifyItemReq // @return pushIndexValue *PushIndexValueItemReq // @return err error func handleData(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) { switch edbUpdateLog.OpType { case "insert": return handleInsert(edbUpdateLog) case "update": return handleUpdate(edbUpdateLog) case "delete": return handleDelete(edbUpdateLog) } return } // handleInsert // @Description: 新增数据处理 // @author: Roc // @datetime 2024-03-11 14:53:03 // @param edbUpdateLog *data_manage.EdbUpdateLog // @return pushIndexData *PushIndexParamDataReq // @return pushEdbClassify *PushEdbClassifyItemReq // @return pushIndexValue *PushIndexValueItemReq // @return err error func handleInsert(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) { data := edbUpdateLog.NewData //指标信息 if edbUpdateLog.OpTableName == "edb_info" { var edbInfo *data_manage.EdbInfoItem err = json.Unmarshal([]byte(data), &edbInfo) if err != nil { return } // 预测指标不处理 if edbInfo.EdbInfoType == 1 { return } // 计算指标不处理 if edbInfo.EdbType == 2 { return } // 自有数据不处理 if edbInfo.Source == utils.DATA_SOURCE_BUSINESS { return } // 获取数据源中指标的基础信息 origInfo := getOrigInfo(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.EdbName) // 指标信息 pushIndexData = &PushIndexParamDataReq{ SourceIndexCode: edbInfo.EdbCode, IndexCode: getIndexCode(edbInfo.Source, edbInfo.EdbCode), IndexName: origInfo.EdbName, IndexShortName: edbInfo.EdbName, FrequenceName: edbInfo.Frequency, UnitName: edbInfo.Unit, AssetBeginDate: edbInfo.StartDate, AssetEndDate: edbInfo.EndDate, CreateUser: edbInfo.SysUserRealName, IndexCreateTime: edbInfo.CreateTime, UpdateUser: edbInfo.SysUserRealName, DetailUpdateTime: getMaxModifyTime(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbUpdateLog.CreateTime.Format(utils.FormatDateTime)), IndexUpdateTime: edbInfo.ModifyTime, OrginSource: edbInfo.SourceName, OrginSysSource: origInfo.SourceName, SysSource: "产研平台", SourceType: getSourceType(edbInfo.Source), Status: 1, } // 指标与分类的关系信息 pushEdbClassify = &PushEdbClassifyItemReq{ Id: fmt.Sprint(edbInfo.EdbInfoId), ClassifyId: edbInfo.ClassifyId, IndexCode: getIndexCode(edbInfo.Source, edbInfo.EdbCode), CreateTime: edbInfo.CreateTime, CreateUser: edbInfo.SysUserRealName, UpdateTime: edbInfo.ModifyTime, UpdateUser: edbInfo.SysUserRealName, } return } // 分类信息 //if edbUpdateLog.OpTableName == "edb_classify" { // var edbClassify *data_manage.EdbClassify // err = json.Unmarshal([]byte(data), &edbClassify) // if err != nil { // return // } // // // 指标信息 // pushClassify = PushClassifyItemReq{ // ClassifyId: int(edbClassify.ClassifyID), // ClassifyType: int(edbClassify.ClassifyType), // ClassifyName: edbClassify.ClassifyName, // ParentId: int(edbClassify.ParentID), // HasData: int(edbClassify.HasData), // CreateTime: edbClassify.CreateTime.Format(utils.FormatDateTime), // UpdateTime: edbClassify.ModifyTime.Format(utils.FormatDateTime), // SysUserId: int(edbClassify.SysUserID), // SysUserRealName: edbClassify.SysUserRealName, // Level: int(edbClassify.Level), // UniqueCode: edbClassify.UniqueCode, // SortColumn: int(edbClassify.Sort), // } // // return //} // 数据信息 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_") { // 计算指标不处理 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_calculate") { return } // 预测指标不处理 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_predict") { return } var edbData *data_manage.EdbData err = json.Unmarshal([]byte(data), &edbData) if err != nil { return } source, subSource := data_manage.GetSourceAndSubSourceTableName(edbUpdateLog.OpTableName, edbData.EdbCode) if source <= 0 { // 没有找到来源,那就过滤 return } // 数据信息 pushIndexValue = &PushIndexValueItemReq{ Id: getIndexDataId(source, subSource, edbData.EdbDataId), IndexCode: getIndexCode(source, edbData.EdbCode), Value: fmt.Sprint(edbData.Value), BusinessDate: edbData.DataTime, CreateTime: edbData.CreateTime, UpdateTime: edbData.ModifyTime, Status: "1", } return } return } // handleDelete // @Description: 删除数据处理 // @author: Roc // @datetime 2024-03-11 14:53:20 // @param edbUpdateLog *data_manage.EdbUpdateLog // @return pushIndexData *PushIndexParamDataReq // @return pushEdbClassify *PushEdbClassifyItemReq // @return pushIndexValue *PushIndexValueItemReq // @return err error func handleDelete(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) { data := edbUpdateLog.OldData //指标信息 if edbUpdateLog.OpTableName == "edb_info" { var edbInfo *data_manage.EdbInfoItem err = json.Unmarshal([]byte(data), &edbInfo) if err != nil { return } // 预测指标不处理 if edbInfo.EdbInfoType == 1 { return } // 计算指标不处理 if edbInfo.EdbType == 2 { return } // 自有数据不处理 if edbInfo.Source == utils.DATA_SOURCE_BUSINESS { return } // 获取数据源中指标的基础信息 origInfo := getOrigInfo(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.EdbName) // 指标信息 pushIndexData = &PushIndexParamDataReq{ SourceIndexCode: edbInfo.EdbCode, IndexCode: getIndexCode(edbInfo.Source, edbInfo.EdbCode), IndexName: origInfo.EdbName, IndexShortName: edbInfo.EdbName, FrequenceName: edbInfo.Frequency, UnitName: edbInfo.Unit, AssetBeginDate: edbInfo.StartDate, AssetEndDate: edbInfo.EndDate, CreateUser: edbInfo.SysUserRealName, IndexCreateTime: edbInfo.CreateTime, UpdateUser: edbInfo.SysUserRealName, DetailUpdateTime: edbUpdateLog.CreateTime.Format(utils.FormatDateTime), IndexUpdateTime: edbInfo.ModifyTime, OrginSource: edbInfo.SourceName, OrginSysSource: origInfo.SourceName, SysSource: "产研平台", SourceType: getSourceType(edbInfo.Source), Status: 0, } // 指标与分类的关系信息 pushEdbClassify = &PushEdbClassifyItemReq{ Id: fmt.Sprint(edbInfo.EdbInfoId), ClassifyId: edbInfo.ClassifyId, IndexCode: getIndexCode(edbInfo.Source, edbInfo.EdbCode), CreateTime: edbInfo.CreateTime, CreateUser: edbInfo.SysUserRealName, UpdateTime: edbInfo.ModifyTime, UpdateUser: edbInfo.SysUserRealName, } return } // 分类信息 //if edbUpdateLog.OpTableName == "edb_classify" { // var edbClassify *data_manage.EdbClassify // err = json.Unmarshal([]byte(data), &edbClassify) // if err != nil { // return // } // // // 指标信息 // pushClassify = PushClassifyItemReq{ // ClassifyId: int(edbClassify.ClassifyID), // ClassifyType: int(edbClassify.ClassifyType), // ClassifyName: edbClassify.ClassifyName, // ParentId: int(edbClassify.ParentID), // HasData: int(edbClassify.HasData), // CreateTime: edbClassify.CreateTime.Format(utils.FormatDateTime), // UpdateTime: edbClassify.ModifyTime.Format(utils.FormatDateTime), // SysUserId: int(edbClassify.SysUserID), // SysUserRealName: edbClassify.SysUserRealName, // Level: int(edbClassify.Level), // UniqueCode: edbClassify.UniqueCode, // SortColumn: int(edbClassify.Sort), // } // // return //} // 数据信息 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_") { // 计算指标不处理 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_calculate") { return } // 预测指标不处理 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_predict") { return } var edbData *data_manage.EdbData err = json.Unmarshal([]byte(data), &edbData) if err != nil { return } source, subSource := data_manage.GetSourceAndSubSourceTableName(edbUpdateLog.OpTableName, edbData.EdbCode) if source <= 0 { // 没有找到来源,那就过滤 return } // 数据信息 pushIndexValue = &PushIndexValueItemReq{ Id: getIndexDataId(source, subSource, edbData.EdbDataId), IndexCode: getIndexCode(source, edbData.EdbCode), Value: fmt.Sprint(edbData.Value), BusinessDate: edbData.DataTime, CreateTime: edbData.CreateTime, UpdateTime: edbData.ModifyTime, Status: "0", } return } return } // handleUpdate // @Description: 更新数据处理 // @author: Roc // @datetime 2024-03-11 14:53:35 // @param edbUpdateLog *data_manage.EdbUpdateLog // @return pushIndexData *PushIndexParamDataReq // @return pushEdbClassify *PushEdbClassifyItemReq // @return pushIndexValue *PushIndexValueItemReq // @return err error func handleUpdate(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) { oldData := edbUpdateLog.OldData newData := edbUpdateLog.NewData //指标信息 if edbUpdateLog.OpTableName == "edb_info" { var oldEdbInfo *data_manage.EdbInfoItem err = json.Unmarshal([]byte(oldData), &oldEdbInfo) if err != nil { return } // 预测指标不处理 if oldEdbInfo.EdbInfoType == 1 { return } // 计算指标不处理 if oldEdbInfo.EdbType == 2 { return } // 自有数据不处理 if oldEdbInfo.Source == utils.DATA_SOURCE_BUSINESS { return } var newEdbInfo *data_manage.EdbInfoItem err = json.Unmarshal([]byte(newData), &newEdbInfo) if err != nil { return } isUpdateEdbInfo := checkUpdateType(oldEdbInfo, newEdbInfo) // 指标信息 if isUpdateEdbInfo { // 获取数据源中指标的基础信息 origInfo := getOrigInfo(newEdbInfo.Source, newEdbInfo.SubSource, newEdbInfo.EdbCode, newEdbInfo.EdbName) pushIndexData = &PushIndexParamDataReq{ SourceIndexCode: newEdbInfo.EdbCode, IndexCode: getIndexCode(newEdbInfo.Source, newEdbInfo.EdbCode), IndexName: origInfo.EdbName, IndexShortName: newEdbInfo.EdbName, FrequenceName: newEdbInfo.Frequency, UnitName: newEdbInfo.Unit, AssetBeginDate: newEdbInfo.StartDate, AssetEndDate: newEdbInfo.EndDate, CreateUser: newEdbInfo.SysUserRealName, IndexCreateTime: newEdbInfo.CreateTime, UpdateUser: newEdbInfo.SysUserRealName, DetailUpdateTime: getMaxModifyTime(newEdbInfo.Source, newEdbInfo.SubSource, newEdbInfo.EdbCode, edbUpdateLog.CreateTime.Format(utils.FormatDateTime)), IndexUpdateTime: newEdbInfo.ModifyTime, OrginSource: newEdbInfo.SourceName, OrginSysSource: origInfo.SourceName, SysSource: "产研平台", SourceType: getSourceType(newEdbInfo.Source), Status: 1, } } // 指标与分类的关系信息 if oldEdbInfo.ClassifyId != newEdbInfo.ClassifyId { pushEdbClassify = &PushEdbClassifyItemReq{ Id: fmt.Sprint(newEdbInfo.EdbInfoId), ClassifyId: newEdbInfo.ClassifyId, IndexCode: getIndexCode(newEdbInfo.Source, newEdbInfo.EdbCode), CreateTime: newEdbInfo.CreateTime, CreateUser: newEdbInfo.SysUserRealName, UpdateTime: newEdbInfo.ModifyTime, UpdateUser: newEdbInfo.SysUserRealName, } } return } // 数据信息 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_") { // 计算指标不处理 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_calculate") { return } // 预测指标不处理 if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_predict") { return } var edbData *data_manage.EdbData err = json.Unmarshal([]byte(newData), &edbData) if err != nil { return } source, subSource := data_manage.GetSourceAndSubSourceTableName(edbUpdateLog.OpTableName, edbData.EdbCode) if source <= 0 { // 没有找到来源,那就过滤 return } // 数据信息 pushIndexValue = &PushIndexValueItemReq{ Id: getIndexDataId(source, subSource, edbData.EdbDataId), IndexCode: getIndexCode(source, edbData.EdbCode), Value: fmt.Sprint(edbData.Value), BusinessDate: edbData.DataTime, CreateTime: edbData.CreateTime, UpdateTime: edbData.ModifyTime, Status: "1", } return } // 上面都没有匹配到的话,那就是数据源的 return handleUpdateDataSource(edbUpdateLog) } // handleUpdateDataSource // @Description: 更新数据处理(数据源的基础数据) // @author: Roc // @datetime 2024-03-11 14:53:35 // @param edbUpdateLog *data_manage.EdbUpdateLog // @return pushIndexData *PushIndexParamDataReq // @return pushEdbClassify *PushEdbClassifyItemReq // @return pushIndexValue *PushIndexValueItemReq // @return err error func handleUpdateDataSource(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) { oldData := edbUpdateLog.OldData newData := edbUpdateLog.NewData isUpdateIndexInfo := false indexName := `` indexSourceName := `` source := 0 edbCode := `` // 钢联 if edbUpdateLog.OpTableName == "base_from_mysteel_chemical_index" { var oldEdbInfo *data_manage.BaseFromMysteelChemicalIndexItem err = json.Unmarshal([]byte(oldData), &oldEdbInfo) if err != nil { return } var newEdbInfo *data_manage.BaseFromMysteelChemicalIndexItem err = json.Unmarshal([]byte(newData), &newEdbInfo) if err != nil { return } if oldEdbInfo.IndexName != newEdbInfo.IndexName { isUpdateIndexInfo = true } else if oldEdbInfo.Source != newEdbInfo.Source { isUpdateIndexInfo = true } if isUpdateIndexInfo { indexName = newEdbInfo.IndexName indexSourceName = newEdbInfo.Source edbCode = newEdbInfo.IndexCode source = utils.DATA_SOURCE_MYSTEEL_CHEMICAL } } else if edbUpdateLog.OpTableName == "base_from_smm_index" { // 有色 var oldEdbInfo *data_manage.BaseFromSmmIndexItem err = json.Unmarshal([]byte(oldData), &oldEdbInfo) if err != nil { return } var newEdbInfo *data_manage.BaseFromSmmIndexItem err = json.Unmarshal([]byte(newData), &newEdbInfo) if err != nil { return } if oldEdbInfo.IndexName != newEdbInfo.IndexName { isUpdateIndexInfo = true } else if oldEdbInfo.Interface != newEdbInfo.Interface { isUpdateIndexInfo = true } if isUpdateIndexInfo { indexName = newEdbInfo.IndexName indexSourceName = newEdbInfo.Interface edbCode = newEdbInfo.IndexCode source = utils.DATA_SOURCE_YS } } // 数据源的指标信息变更了 if isUpdateIndexInfo { newEdbInfo, tmpErr := data_manage.GetEdbInfoItemByCodeAndSource(source, edbCode) if tmpErr != nil { if tmpErr.Error() != utils.ErrNoRow() { err = tmpErr return } return } pushIndexData = &PushIndexParamDataReq{ SourceIndexCode: newEdbInfo.EdbCode, IndexCode: getIndexCode(newEdbInfo.Source, newEdbInfo.EdbCode), IndexName: indexName, IndexShortName: newEdbInfo.EdbName, FrequenceName: newEdbInfo.Frequency, UnitName: newEdbInfo.Unit, AssetBeginDate: newEdbInfo.StartDate, AssetEndDate: newEdbInfo.EndDate, CreateUser: newEdbInfo.SysUserRealName, IndexCreateTime: newEdbInfo.CreateTime, UpdateUser: newEdbInfo.SysUserRealName, DetailUpdateTime: getMaxModifyTime(newEdbInfo.Source, newEdbInfo.SubSource, newEdbInfo.EdbCode, edbUpdateLog.CreateTime.Format(utils.FormatDateTime)), IndexUpdateTime: newEdbInfo.ModifyTime, OrginSource: newEdbInfo.SourceName, OrginSysSource: indexSourceName, SysSource: "产研平台", SourceType: getSourceType(newEdbInfo.Source), Status: 1, } } return } // getSourceType // @Description: 获取指标来源类型 // @author: Roc // @datetime 2024-03-01 13:40:03 // @param source int // @return string func getSourceType(source int) string { switch source { case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS, utils.DATA_SOURCE_BAIINFO, utils.DATA_SOURCE_SCI: //钢联,有色,百川盈孚,红桃3 return "RPA" case utils.DATA_SOURCE_MANUAL: return "手工" default: return "接口" } } // checkUpdateType // @Description: 检查指标更新状态 // @author: Roc // @datetime 2024-03-11 16:35:22 // @param oldEdbInfo *data_manage.EdbInfoItem // @param newEdbInfo *data_manage.EdbInfoItem // @return isUpdateEdbInfo bool func checkUpdateType(oldEdbInfo, newEdbInfo *data_manage.EdbInfoItem) (isUpdateEdbInfo bool) { // eta内部名称 if oldEdbInfo.EdbName != newEdbInfo.EdbName { isUpdateEdbInfo = true return } if oldEdbInfo.Frequency != newEdbInfo.Frequency { isUpdateEdbInfo = true return } if oldEdbInfo.Unit != newEdbInfo.Unit { isUpdateEdbInfo = true return } //if oldEdbInfo.StartDate != newEdbInfo.StartDate { // isUpdateEdbInfo = true // return //} //if oldEdbInfo.EndDate != newEdbInfo.EndDate { // isUpdateEdbInfo = true // return //} //if oldEdbInfo.SysUserId != newEdbInfo.SysUserId { // isUpdateEdbInfo = true // return //} //if oldEdbInfo.SysUserRealName != newEdbInfo.SysUserRealName { // isUpdateEdbInfo = true // return //} return } // OrigInfo // @Description: 数据源中的指标基础信息 type OrigInfo struct { EdbName string SourceName string } // getOrigInfo // @Description: 获取数据源中的指标基础信息 // @author: Roc // @datetime 2024-03-11 16:45:34 // @param source int // @param subSource int // @param edbCode string // @param edbName string // @return origInfo OrigInfo func getOrigInfo(source, subSource int, edbCode, edbName string) (origInfo OrigInfo) { switch source { case utils.DATA_SOURCE_THS, utils.DATA_SOURCE_WIND: // 同花顺、wind origInfo.SourceName = "经济数据库" if subSource == utils.DATA_SUB_SOURCE_DATE { origInfo.SourceName = "日期序列" } case utils.DATA_SOURCE_MYSTEEL_CHEMICAL: edbInfo, err := data_manage.GetBaseFromMysteelChemicalIndexItemByCode(edbCode) if err != nil { return } origInfo.EdbName = edbInfo.IndexName origInfo.SourceName = edbInfo.Source // 钢联化工 case utils.DATA_SOURCE_YS: // 有色 edbInfo, err := data_manage.GetBaseFromSmmIndexItemItemByCode(edbCode) if err != nil { return } origInfo.EdbName = edbInfo.IndexName origInfo.SourceName = edbInfo.Interface } // 如果原始名称为空,则使用ETA指标库的名称作为原始名称 if origInfo.EdbName == `` { origInfo.EdbName = edbName } return } // getMaxModifyTime // @Description: 获取eta指标明细中的指标最大修改时间 // @author: Roc // @datetime 2024-03-11 17:10:28 // @param source int // @param edbCode string // @param addUpdateTime string // @return modifyTime string func getMaxModifyTime(source, subSource int, edbCode, addUpdateTime string) (modifyTime string) { modifyTime, err := data_manage.GetEdbInfoMaxModifyTime(source, subSource, edbCode) if err != nil { modifyTime = addUpdateTime return } return } // getIndexCode // @Description: 指标编码生成 // @author: Roc // @datetime 2024-03-20 17:29:27 // @param source int // @param edbCode string // @return string func getIndexCode(source int, edbCode string) string { return fmt.Sprint(source, "_", edbCode) } // getIndexDataId // @Description: 获取数据id // @author: Roc // @datetime 2024-03-20 17:36:53 // @param source int // @param subSource int // @param edbDataId int // @return string func getIndexDataId(source, subSource int, edbDataId int32) string { return utils.MD5(fmt.Sprint(source, "_", subSource, "_", edbDataId)) }