package eta_bridge import ( "encoding/json" "eta/eta_data_init/models" "eta/eta_data_init/utils" "fmt" "github.com/rdlucklib/rdluck_tools/uuid" "strings" "sync" ) // 用户同步的锁 var lockSyncUser sync.Mutex // SyncUser // @Description: 定时同步ETA指标信息变更数据至第三方 // @author: Roc // @datetime 2024-02-28 14:00:45 // @param cont context.Context // @return err error func SyncUser() (err error) { lockSyncUser.Lock() defer func() { if err != nil { tips := "SyncUser-定时将第三方的用户数据同步到ETA失败, ErrMsg:\n" + err.Error() utils.FileLog.Info(tips) } lockSyncUser.Unlock() }() uri := "/xy/user/sync" _, err, _ = HttpEtaBridgeGet(uri) if err != nil { return } return } // PushBaseParamReq // @Description: 业务报文 type PushBaseParamReq struct { SerialID string `json:"serialID" description:"流水号"` TableCode string `json:"tableCode" description:"数据表编码"` Total int `json:"total" description:"本次落表数据总数"` IsEmailWarn int `json:"isEmailWarn" description:"是否发送预警邮件,(1-是 0-否)"` Data interface{} `json:"data" description:"报文体"` } // PushIndexParamDataReq // @Description: 指标数据结构 type PushIndexParamDataReq struct { SourceIndexCode string `json:"source_index_code" description:"上游来源指标ID"` IndexCode string `json:"index_code" description:"数仓加工的指标ID,来源+来源ID,使用下划线连接,MST000ID00013242"` IndexName string `json:"index_name" description:"外部来源的指标名称"` IndexShortName string `json:"index_short_name" description:"系统内的指标名称/简称"` FrequenceName string `json:"frequence_name" description:"指标频度,如:日度、周度、月度"` UnitName string `json:"unit_name" description:"指标单位,如:元/吨、千克、立方米"` //CountryName string `json:"country_name" description:""` //ProvinceName string `json:"province_name" description:""` //AreaName string `json:"area_name" description:""` //CityName string `json:"city_name" description:""` //CountyName string `json:"county_name" description:""` //RegionName string `json:"region_name" description:""` //CompanyName string `json:"company_name" description:""` //BreedName string `json:"breed_name" description:""` //MaterialName string `json:"material_name" description:""` //SpecName string `json:"spec_name" description:""` //MarketName string `json:"market_name" description:""` //DerivativeType string `json:"derivative_type" description:""` //ContractName string `json:"contract_name" description:""` //AuthKindName string `json:"auth_kind_name" description:""` //CustomSmallClassName string `json:"custom_small_class_name" description:""` AssetBeginDate string `json:"asset_begin_date" description:"业务字段,指标明细数据的业务日期开始时间;格式yyyy-mm-dd"` AssetEndDate string `json:"asset_end_date" description:"业务字段,指标明细数据的业务日期结束时间;格式yyyy-mm-dd"` CreateUser string `json:"create_user" description:"创建人姓名"` IndexCreateTime string `json:"index_create_time" description:"指标基础信息创建时间戳;格式yyyy-mm-dd hh:mi:ss"` UpdateUser string `json:"update_user" description:"更新人姓名"` DetailUpdateTime string `json:"detail_update_time" description:"指标明细信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"` IndexUpdateTime string `json:"index_update_time" description:"指标基础信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"` //DutyDept string `json:"duty_dept" description:""` //BusinessDept string `json:"business_dept" description:""` OrginSource string `json:"orgin_source" description:"外部数据原始来源,如国家统计局、钢联等"` OrginSysSource string `json:"orgin_sys_source" description:"外部来源系统,即数据供应商,如钢联、wind、同花顺"` SysSource string `json:"sys_source" description:"内部来源系统,如产研平台、市价平台"` SourceType string `json:"source_type" description:"数据接入方式,手工、接口、RPA"` //EtlTime string `json:"etl_time" description:""` Status int `json:"status" description:"逻辑删除:0-失效,1-有效"` } // PushIndexValueItemReq // @Description: 指标日期值数据结构 type PushIndexValueItemReq struct { Id string `json:"id"` IndexCode string `json:"index_code" description:"指标代码"` Value string `json:"value" description:"数值"` BusinessDate string `json:"business_date" description:"业务日期(数据日期)"` CreateTime string `json:"create_time" description:"数据进入ETA的时间"` UpdateTime string `json:"update_time" description:"eta库中修改数据的时间"` Status string `json:"status" description:"逻辑删除使用,0-禁用,1-启用"` } // PushClassifyItemReq // @Description: 指标分类数据结构 type PushClassifyItemReq struct { ClassifyId int `json:"classify_id" description:"自增id"` ClassifyType int `json:"classify_type" description:"分类类型,0:普通指标分类,1:预测指标分类"` ClassifyName string `json:"classify_name" description:"分类名称"` ParentId int `json:"parent_id" description:"父级id"` HasData int `json:"has_data" description:"是否存在指标数据,1:有,2:无"` CreateTime string `json:"create_time" description:"创建时间"` UpdateTime string `json:"update_time" description:"修改时间"` SysUserId int `json:"sys_user_id" description:"创建人id"` SysUserRealName string `json:"sys_user_real_name" description:"创建人姓名"` Level int `json:"level" description:"层级"` UniqueCode string `json:"unique_code" description:"唯一编码"` SortColumn int `json:"sort_column" description:"排序字段,越小越靠前,默认值:10"` } // PushEdbClassifyItemReq // @Description: 指标与目录的关系请求结构 type PushEdbClassifyItemReq struct { Id string `json:"id" description:"唯一主键"` ClassifyId int `json:"classify_id" description:"目录分类ID"` IndexCode string `json:"index_code" description:"指标ID"` CreateTime string `json:"create_time" description:"创建时间"` CreateUser string `json:"create_user" description:"创建人"` UpdateTime string `json:"update_time" description:"修改时间"` UpdateUser string `json:"update_user" description:"修改人"` } // 同步指标分类锁 var lockSyncClassify sync.Mutex // SyncClassifyList // @Description: 同步分类 // @author: Roc // @datetime 2024-03-14 10:15:53 // @param num int 如果小于等于0,那么是同步所有的,如果大于0,那么是同步num条数据 // @return err error func SyncClassifyList(num int) (err error) { lockSyncClassify.Lock() defer func() { if err != nil { tips := "SyncIndexList-定时同步ETA指标分类变更数据至第三方失败, ErrMsg:\n" + err.Error() utils.FileLog.Info(tips) } lockSyncClassify.Unlock() }() // 需要同步的数据下标 syncIndex := num - 1 var condition string var pars []interface{} // 普通指标分类 condition = " AND classify_type = ? " pars = append(pars, 0) list, err := models.GetAllEdbClassifyListByCondition(condition, pars) if err != nil { fmt.Println(err) return } dataLimitList := make([][]PushClassifyItemReq, 0) dataList := make([]PushClassifyItemReq, 0) for k, v := range list { dataList = append(dataList, PushClassifyItemReq{ ClassifyId: int(v.ClassifyID), ClassifyType: int(v.ClassifyType), ClassifyName: v.ClassifyName, ParentId: int(v.ParentID), HasData: int(v.HasData), CreateTime: v.CreateTime.Format(utils.FormatDateTime), UpdateTime: v.ModifyTime.Format(utils.FormatDateTime), SysUserId: int(v.SysUserID), SysUserRealName: v.SysUserRealName, Level: int(v.Level), UniqueCode: v.UniqueCode, SortColumn: int(v.Sort), }) // 如果指定数量了,且当前下标等于指定的下标,那么就跳出循环 if num > 0 && syncIndex == k { break } if len(dataList) >= 100 { dataLimitList = append(dataLimitList, dataList) dataList = make([]PushClassifyItemReq, 0) } } lenData := len(dataList) if lenData > 0 { dataLimitList = append(dataLimitList, dataList) } if len(dataLimitList) < 0 { fmt.Println("无分类数据推送") return } for k, tmpDataList := range dataLimitList { req := PushBaseParamReq{ SerialID: uuid.NewUUID().Hex32(), TableCode: "", Total: len(tmpDataList), IsEmailWarn: 0, Data: tmpDataList, } uri := "/xy/index/pushClassify" _, e, _ := HttpEtaBridgePost(uri, req) if e != nil { err = fmt.Errorf("第%d组分类数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()) fmt.Println(err) continue } } return } // 同步指标锁 var lockSyncIndex sync.Mutex // SyncIndexList // @Description: 批量初始化指标 // @author: Roc // @datetime 2024-03-07 19:20:19 // @param currLogIdStr int64 // @param logMaxId int64 // @return lastId int64 func SyncIndexList(num int) (err error) { lockSyncIndex.Lock() defer func() { if err != nil { tips := "SyncIndexList-定时同步ETA指标分类变更数据至第三方失败, ErrMsg:\n" + err.Error() utils.FileLog.Info(tips) } lockSyncIndex.Unlock() }() // 需要同步的数据下标 syncIndex := num - 1 // 初始化数据表的关系 models.InitEdbSourceVar() // 查询当次需要同步的数据 var condition string var pars []interface{} condition += " AND edb_info_type = ? AND edb_type = ? " pars = append(pars, 0, 1) // 获取指标数 list, err := models.GetAllEdbInfoListByCondition(condition, pars) if err != nil { fmt.Println(err) return } pushIndexParamDataReqList := make([]*PushIndexParamDataReq, 0) pushEdbClassifyItemReqList := make([]*PushEdbClassifyItemReq, 0) for k, edbInfo := range list { // 获取数据源中指标的基础信息 origInfo := getOrigInfo(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.EdbName) // 指标信息 pushIndexData := &PushIndexParamDataReq{ SourceIndexCode: edbInfo.EdbCode, IndexCode: getIndexCode(edbInfo.Source, edbInfo.EdbCode), IndexName: origInfo.EdbName, IndexShortName: edbInfo.EdbName, FrequenceName: edbInfo.Frequency, UnitName: edbInfo.Unit, AssetBeginDate: edbInfo.StartDate, AssetEndDate: edbInfo.EndDate, CreateUser: edbInfo.SysUserRealName, IndexCreateTime: edbInfo.CreateTime, UpdateUser: edbInfo.SysUserRealName, //DetailUpdateTime: getMaxModifyTime(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.ModifyTime), DetailUpdateTime: edbInfo.ModifyTime, IndexUpdateTime: edbInfo.ModifyTime, OrginSource: edbInfo.SourceName, OrginSysSource: origInfo.SourceName, SysSource: "产研平台", SourceType: getSourceType(edbInfo.Source), Status: 1, } // 指标与分类的关系信息 pushEdbClassify := &PushEdbClassifyItemReq{ Id: fmt.Sprint(edbInfo.EdbInfoId), ClassifyId: edbInfo.ClassifyId, IndexCode: getIndexCode(edbInfo.Source, edbInfo.EdbCode), CreateTime: edbInfo.CreateTime, CreateUser: edbInfo.SysUserRealName, UpdateTime: edbInfo.ModifyTime, UpdateUser: edbInfo.SysUserRealName, } pushIndexParamDataReqList = append(pushIndexParamDataReqList, pushIndexData) pushEdbClassifyItemReqList = append(pushEdbClassifyItemReqList, pushEdbClassify) //pushIndexValueList, tmpErr := getPushIndexValueItemReqList(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbInfoId) //if tmpErr != nil { // fmt.Printf("%s渠道的%s指标数据获取失败\n", edbInfo.SourceName, edbInfo.EdbName) // continue //} //pushIndex([]*PushIndexParamDataReq{pushIndexData}) //pushIndexClassify([]*PushEdbClassifyItemReq{pushEdbClassify}) //pushIndexValue(pushIndexValueList) // 如果指定数量了,且当前下标等于指定的下标,那么就跳出循环 if num > 0 && syncIndex == k { break } } pushIndex(pushIndexParamDataReqList) pushIndexClassify(pushEdbClassifyItemReqList) return } const pushBatchSize = 100 // pushIndex // @Description: 指标信息数据推送 // @author: Roc // @datetime 2024-03-07 16:35:02 // @param allPushList []*PushIndexParamDataReq func pushIndex(allPushList []*PushIndexParamDataReq) { lenDataList := len(allPushList) if lenDataList <= 0 { return } uri := utils.SyncIndexPath + "/pushIndexData" errDataList := make([]*PushIndexParamDataReq, 0) errList := make([]string, 0) defer func() { if len(errList) > 0 { dataByte, err := json.Marshal(errDataList) if err != nil { dataByte = []byte("序列化指标信息数据失败" + err.Error()) } utils.FileLog.Info("pushIndex errList:%s;推送失败的指标信息数据列表:%s", strings.Join(errList, "\n"), string(dataByte)) } }() dataLimitList := make([][]*PushIndexParamDataReq, 0) for i := 0; i < lenDataList; i += pushBatchSize { endIndex := min(i+pushBatchSize, lenDataList) tempSlice := allPushList[i:endIndex] dataLimitList = append(dataLimitList, tempSlice) } for k, dataList := range dataLimitList { req := PushBaseParamReq{ SerialID: uuid.NewUUID().Hex32(), TableCode: "", Total: len(dataList), IsEmailWarn: 0, Data: dataList, } _, e, _ := HttpEtaBridgePost(uri, req) if e != nil { errList = append(errList, fmt.Sprintf("第%d组指标信息数据推送失败,postRefreshEdbData err: %s", k+1, e.Error())) errDataList = append(errDataList, dataList...) continue } } } // pushIndexValue // @Description: 指标明细数据推送 // @author: Roc // @datetime 2024-03-07 16:32:47 // @param allPushList []*PushIndexValueItemReq func pushIndexValue(allPushList []*PushIndexValueItemReq) { lenDataList := len(allPushList) if lenDataList <= 0 { return } uri := utils.SyncIndexPath + "/pushIndexValue" errDataList := make([]*PushIndexValueItemReq, 0) errList := make([]string, 0) defer func() { if len(errList) > 0 { dataByte, err := json.Marshal(errDataList) if err != nil { dataByte = []byte("序列化指标明细数据失败" + err.Error()) } utils.FileLog.Info("pushIndexValue errList:%s;推送失败的指标明细数据列表:%s", strings.Join(errList, "\n"), string(dataByte)) } }() dataLimitList := make([][]*PushIndexValueItemReq, 0) for i := 0; i < lenDataList; i += pushBatchSize { endIndex := min(i+pushBatchSize, lenDataList) tempSlice := allPushList[i:endIndex] dataLimitList = append(dataLimitList, tempSlice) } for k, dataList := range dataLimitList { req := PushBaseParamReq{ SerialID: uuid.NewUUID().Hex32(), TableCode: "", Total: len(dataList), IsEmailWarn: 0, Data: dataList, } _, e, _ := HttpEtaBridgePost(uri, req) if e != nil { errList = append(errList, fmt.Sprintf("第%d组指标明细数据推送失败,postRefreshEdbData err: %s", k+1, e.Error())) errDataList = append(errDataList, dataList...) continue } } return } // pushIndexClassify // @Description: 指标与分类的关系推送 // @author: Roc // @datetime 2024-03-07 16:32:47 // @param allPushList []*PushIndexValueItemReq func pushIndexClassify(allPushList []*PushEdbClassifyItemReq) { lenDataList := len(allPushList) if lenDataList <= 0 { return } uri := utils.SyncIndexPath + "/pushEdbClassify" errDataList := make([]*PushEdbClassifyItemReq, 0) errList := make([]string, 0) defer func() { if len(errList) > 0 { dataByte, err := json.Marshal(errDataList) if err != nil { dataByte = []byte("序列化指标明细数据失败" + err.Error()) } utils.FileLog.Info("pushIndexClassify errList:%s;推送失败的指标所属分类明细数据列表:%s", strings.Join(errList, "\n"), string(dataByte)) } }() dataLimitList := make([][]*PushEdbClassifyItemReq, 0) for i := 0; i < lenDataList; i += pushBatchSize { endIndex := min(i+pushBatchSize, lenDataList) tempSlice := allPushList[i:endIndex] dataLimitList = append(dataLimitList, tempSlice) } for k, dataList := range dataLimitList { req := PushBaseParamReq{ SerialID: uuid.NewUUID().Hex32(), TableCode: "", Total: len(dataList), IsEmailWarn: 0, Data: dataList, } _, e, _ := HttpEtaBridgePost(uri, req) if e != nil { errList = append(errList, fmt.Sprintf("第%d组指标所属分类数据推送失败,postRefreshEdbData err: %s", k+1, e.Error())) errDataList = append(errDataList, dataList...) continue } } return } // getIndexCode // @Description: 指标编码生成 // @author: Roc // @datetime 2024-03-20 17:29:27 // @param source int // @param edbCode string // @return string func getIndexCode(source int, edbCode string) string { return fmt.Sprint(source, "_", edbCode) } // 辅助函数:返回a和b中的较小值 func min(a, b int) int { if a < b { return a } return b }