package eta_bridge

import (
	"context"
	"encoding/json"
	"eta/eta_task/models/data_manage"
	"eta/eta_task/services/alarm_msg"
	"eta/eta_task/services/eta_hub"
	"eta/eta_task/utils"
	"fmt"
	"github.com/rdlucklib/rdluck_tools/uuid"
	"net/url"
	"strings"
	"sync"
	"time"
)

// lockSyncUser serializes runs of SyncUser.
var lockSyncUser sync.Mutex

// SyncUser
// @Description: Scheduled task that asks the bridge service to pull third-party user data into ETA.
// On failure the error is logged and an alarm message is sent asynchronously.
// @author: Roc
// @datetime 2024-02-28 14:00:45
// @param cont context.Context (currently unused)
// @return err error
func SyncUser(cont context.Context) (err error) {
	lockSyncUser.Lock()
	defer func() {
		if err != nil {
			tips := "SyncUser-定时将第三方的用户数据同步到ETA失败, ErrMsg:\n" + err.Error()
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
		lockSyncUser.Unlock()
	}()

	uri := "/xy/user/pull"
	_, err, _ = HttpEtaBridgeGet(uri)

	return
}

// PushBaseParamReq
// @Description: Envelope wrapping every batch pushed to the bridge service.
type PushBaseParamReq struct {
	SerialID    string      `json:"serialID" description:"流水号"`
	TableCode   string      `json:"tableCode" description:"数据表编码"`
	Total       int         `json:"total" description:"本次落表数据总数"`
	IsEmailWarn int         `json:"isEmailWarn" description:"是否发送预警邮件,(1-是 0-否)"`
	Data        interface{} `json:"data" description:"报文体"`
}

// PushIndexParamDataReq
// @Description: Indicator (index) base information payload.
type PushIndexParamDataReq struct {
	SourceIndexCode string `json:"source_index_code" description:"上游来源指标ID"`
	IndexCode       string `json:"index_code" description:"数仓加工的指标ID,来源+来源ID,使用下划线连接,MST000ID00013242"`
	IndexName       string `json:"index_name" description:"外部来源的指标名称"`
	IndexShortName  string `json:"index_short_name" description:"系统内的指标名称/简称"`
	FrequenceName   string `json:"frequence_name" description:"指标频度,如:日度、周度、月度"`
	UnitName        string `json:"unit_name" description:"指标单位,如:元/吨、千克、立方米"`
	//CountryName          string `json:"country_name" description:""`
	//ProvinceName         string `json:"province_name" description:""`
	//AreaName             string `json:"area_name" description:""`
	//CityName             string `json:"city_name" description:""`
	//CountyName           string `json:"county_name" description:""`
	//RegionName           string `json:"region_name" description:""`
	//CompanyName          string `json:"company_name" description:""`
	//BreedName            string `json:"breed_name" description:""`
	//MaterialName         string `json:"material_name" description:""`
	//SpecName             string `json:"spec_name" description:""`
	//MarketName           string `json:"market_name" description:""`
	//DerivativeType       string `json:"derivative_type" description:""`
	//ContractName         string `json:"contract_name" description:""`
	//AuthKindName         string `json:"auth_kind_name" description:""`
	//CustomSmallClassName string `json:"custom_small_class_name" description:""`
	AssetBeginDate   string `json:"asset_begin_date" description:"业务字段,指标明细数据的业务日期开始时间;格式yyyy-mm-dd"`
	AssetEndDate     string `json:"asset_end_date" description:"业务字段,指标明细数据的业务日期结束时间;格式yyyy-mm-dd"`
	CreateUser       string `json:"create_user" description:"创建人姓名"`
	IndexCreateTime  string `json:"index_create_time" description:"指标基础信息创建时间戳;格式yyyy-mm-dd hh:mi:ss"`
	UpdateUser       string `json:"update_user" description:"更新人姓名"`
	DetailUpdateTime string `json:"detail_update_time" description:"指标明细信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"`
	IndexUpdateTime  string `json:"index_update_time" description:"指标基础信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"`
	//DutyDept     string `json:"duty_dept" description:""`
	//BusinessDept string `json:"business_dept" description:""`
	OrginSource    string `json:"orgin_source" description:"外部数据原始来源,如国家统计局、钢联等"`
	OrginSysSource string `json:"orgin_sys_source" description:"外部来源系统,即数据供应商,如钢联、wind、同花顺"`
	SysSource      string `json:"sys_source" description:"内部来源系统,如产研平台、市价平台"`
	SourceType     string `json:"source_type" description:"数据接入方式,手工、接口、RPA"`
	//EtlTime string `json:"etl_time" description:""`
	Status int `json:"status" description:"逻辑删除:0-失效,1-有效"`
}

// PushIndexValueItemReq
// @Description: One dated value of an indicator.
type PushIndexValueItemReq struct {
	Id           string `json:"id"`
	IndexCode    string `json:"index_code" description:"指标代码"`
	Value        string `json:"value" description:"数值"`
	BusinessDate string `json:"business_date" description:"业务日期(数据日期)"`
	CreateTime   string `json:"create_time" description:"数据进入ETA的时间"`
	UpdateTime   string `json:"update_time" description:"eta库中修改数据的时间"`
	Status       string `json:"status" description:"逻辑删除使用,0-禁用,1-启用"`
}

// PushClassifyItemReq
// @Description: Indicator classification (directory) payload.
type PushClassifyItemReq struct {
	ClassifyId      int    `json:"classify_id" description:"自增id"`
	ClassifyType    int    `json:"classify_type" description:"分类类型,0:普通指标分类,1:预测指标分类"`
	ClassifyName    string `json:"classify_name" description:"分类名称"`
	ParentId        int    `json:"parent_id" description:"父级id"`
	HasData         int    `json:"has_data" description:"是否存在指标数据,1:有,2:无"`
	CreateTime      string `json:"create_time" description:"创建时间"`
	UpdateTime      string `json:"update_time" description:"修改时间"`
	SysUserId       int    `json:"sys_user_id" description:"创建人id"`
	SysUserRealName string `json:"sys_user_real_name" description:"创建人姓名"`
	Level           int    `json:"level" description:"层级"`
	UniqueCode      string `json:"unique_code" description:"唯一编码"`
	SortColumn      int    `json:"sort_column" description:"排序字段,越小越靠前,默认值:10"`
}

// PushEdbClassifyItemReq
// @Description: Relation between an indicator and its classification.
type PushEdbClassifyItemReq struct {
	Id         string `json:"id" description:"唯一主键"`
	ClassifyId int    `json:"classify_id" description:"目录分类ID"`
	IndexCode  string `json:"index_code" description:"指标ID"`
	CreateTime string `json:"create_time" description:"创建时间"`
	CreateUser string `json:"create_user" description:"创建人"`
	UpdateTime string `json:"update_time" description:"修改时间"`
	UpdateUser string `json:"update_user" description:"修改人"`
}

// lockSyncClassify serializes runs of SyncClassifyList.
var lockSyncClassify sync.Mutex

// SyncClassifyList
// @Description: Scheduled task that pushes all ordinary (classify_type = 0) indicator
// classifications to the third party in batches of pushBatchSize. A failed batch is
// logged together with its data; it does not abort the remaining batches and is not
// reflected in the returned error.
// @author: Roc
// @datetime 2024-02-28 14:00:45
// @param cont context.Context (currently unused)
// @return err error set only when loading the classification list fails
func SyncClassifyList(cont context.Context) (err error) {
	lockSyncClassify.Lock()
	defer func() {
		if err != nil {
			tips := "SyncClassifyList-定时同步ETA指标分类变更数据至第三方失败, ErrMsg:\n" + err.Error()
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
		lockSyncClassify.Unlock()
	}()

	// Only ordinary indicator classifications are pushed.
	condition := " AND classify_type = ? "
	pars := []interface{}{0}

	list, err := data_manage.GetAllEdbClassifyListByCondition(condition, pars)
	if err != nil {
		utils.FileLog.Error("获取分类列表数据失败:" + err.Error())
		return
	}
	if len(list) == 0 {
		// Nothing to push.
		return
	}

	dataList := make([]PushClassifyItemReq, 0, len(list))
	for _, v := range list {
		dataList = append(dataList, PushClassifyItemReq{
			ClassifyId:      int(v.ClassifyID),
			ClassifyType:    int(v.ClassifyType),
			ClassifyName:    v.ClassifyName,
			ParentId:        int(v.ParentID),
			HasData:         int(v.HasData),
			CreateTime:      v.CreateTime.Format(utils.FormatDateTime),
			UpdateTime:      v.ModifyTime.Format(utils.FormatDateTime),
			SysUserId:       int(v.SysUserID),
			SysUserRealName: v.SysUserRealName,
			Level:           int(v.Level),
			UniqueCode:      v.UniqueCode,
			SortColumn:      int(v.Sort),
		})
	}

	errDataList := make([]PushClassifyItemReq, 0)
	errList := make([]string, 0)
	defer func() {
		if len(errList) > 0 {
			dataByte, err := json.Marshal(errDataList)
			if err != nil {
				dataByte = []byte("序列化分类列表数据失败" + err.Error())
			}
			utils.FileLog.Info("pushIndexClassify errList:%s;推送失败的分类列表数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
		}
	}()

	uri := "/xy/index/pushClassify"
	total := len(dataList)
	for start, group := 0, 1; start < total; start, group = start+pushBatchSize, group+1 {
		batch := dataList[start:min(start+pushBatchSize, total)]
		req := PushBaseParamReq{
			SerialID:    uuid.NewUUID().Hex32(),
			TableCode:   "",
			Total:       len(batch),
			IsEmailWarn: 0,
			Data:        batch,
		}
		if _, e, _ := HttpEtaBridgePost(uri, req); e != nil {
			errList = append(errList, fmt.Sprintf("第%d组分类列表数据推送失败,postRefreshEdbData err: %s", group, e.Error()))
			// Record the batch that actually failed.
			errDataList = append(errDataList, batch...)
		}
	}

	return
}

// lockSyncIndex serializes runs of SyncIndex.
var lockSyncIndex sync.Mutex

// SyncIndex
// @Description: Scheduled task that pushes indicator changes recorded in the update log.
// The start cursor is read from the redis cache; when the key is missing it falls back
// to the largest already-handled log id in the database (0 when there is none).
// @author: Roc
// @datetime 2024-03-07 17:39:34
// @param cont context.Context (currently unused)
// @return err error
func SyncIndex(cont context.Context) (err error) {
	lockSyncIndex.Lock()
	defer func() {
		if err != nil {
			tips := "SyncIndex-定时同步ETA指标信息变更数据至第三方失败, ErrMsg:\n" + err.Error()
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
		lockSyncIndex.Unlock()
	}()

	// Upper bound of this run: the current largest log id.
	logMaxId, err := data_manage.GetEdbUpdateLogMaxId()
	if err != nil {
		return
	}

	// Largest log id that has already been processed.
	var currLogId int64
	currLogId, err = utils.Rc.GetInt64(utils.CACHE_EDB_UPDATE_LOG_ID)
	if err != nil {
		// Anything other than "key not found" is a genuine redis failure.
		if err.Error() != utils.RedisNoKeyErr {
			return
		}
		err = nil

		// Cache miss: look up the largest handled log id in the database.
		currLogId, err = data_manage.GetEdbUpdateLogMaxHandleId()
		if err != nil {
			if err.Error() != utils.ErrNoRow() {
				// Do not restart from id 0 on a database failure; that would re-push everything.
				utils.FileLog.Error("查找当前已经处理了的日志最大ID失败:" + err.Error())
				return
			}
			// No handled record yet: start from the beginning.
			err = nil
			currLogId = 0
		}
	}

	// Process page by page until the upper bound is reached.
	for currId := currLogId; currId < logMaxId; {
		nextId := handlePush(currId, logMaxId)
		if nextId <= currId {
			// handlePush made no progress (query failed or returned nothing);
			// stop instead of spinning forever. The next scheduled run retries.
			utils.FileLog.Info(fmt.Sprintf("SyncIndex stopped without progress, currId:%d, logMaxId:%d", currId, logMaxId))
			break
		}
		currId = nextId
	}

	return
}

// handlePush
// @Description: Loads one page of update-log rows with id in (currLogIdStr, logMaxId],
// pushes the derived payloads, marks the rows as handled and stores the new cursor in
// the redis cache.
// @author: Roc
// @datetime 2024-03-07 19:20:19
// @param currLogIdStr int64 cursor: last log id already processed
// @param logMaxId int64 inclusive upper bound of this run
// @return lastId int64 id of the last row read; equals currLogIdStr when nothing was read
func handlePush(currLogIdStr, logMaxId int64) (lastId int64) {
	lastId = currLogIdStr

	// Load the rows to sync in this round.
	condition := " AND id > ? AND id <= ?"
	pars := []interface{}{currLogIdStr, logMaxId}
	list, err := data_manage.GetEdbUpdateLogByCondition(condition, pars)
	if err != nil {
		utils.FileLog.Error("获取变更日志失败:" + err.Error())
		return
	}
	if len(list) == 0 {
		// Nothing to push or mark; leave the cursor untouched.
		return
	}

	pushIndexList := make([]*PushIndexParamDataReq, 0)
	pushEdbClassifyList := make([]*PushEdbClassifyItemReq, 0)
	pushIndexValueList := make([]*PushIndexValueItemReq, 0)
	idList := make([]int64, 0, len(list))
	for _, v := range list {
		lastId = v.Id
		idList = append(idList, v.Id)

		pushIndexData, pushEdbClassifyData, pushIndexValueData, tmpErr := handleData(v)
		if tmpErr != nil {
			// The row is still marked as handled below; only the failure is logged.
			utils.FileLog.Error("获取待处理的数据失败:" + tmpErr.Error())
			continue
		}
		if pushIndexData != nil {
			pushIndexList = append(pushIndexList, pushIndexData)
		}
		if pushEdbClassifyData != nil {
			pushEdbClassifyList = append(pushEdbClassifyList, pushEdbClassifyData)
		}
		if pushIndexValueData != nil {
			pushIndexValueList = append(pushIndexValueList, pushIndexValueData)
		}
	}

	pushIndex(pushIndexList)
	pushIndexClassify(pushEdbClassifyList)
	pushIndexValue(pushIndexValueList)

	// Mark the rows as handled.
	err = data_manage.HandleUpdateLogByIds(idList, time.Now().Format(utils.FormatDateTime))
	if err != nil {
		utils.FileLog.Error("批量处理指标更新记录失败:" + err.Error())
	}
	utils.Rc.Put(utils.CACHE_EDB_UPDATE_LOG_ID, lastId, 31*24*time.Hour)

	return
}

// pushBatchSize is the maximum number of items sent in one push request.
const pushBatchSize = 100

// pushIndex
// @Description: Pushes indicator base information in batches of pushBatchSize.
// Failed batches are collected and logged once at the end.
// @author: Roc
// @datetime 2024-03-07 16:35:02
// @param allPushList []*PushIndexParamDataReq
func pushIndex(allPushList []*PushIndexParamDataReq) {
	total := len(allPushList)
	if total <= 0 {
		return
	}

	uri := utils.SyncIndexPath + "/pushIndexData"
	errDataList := make([]*PushIndexParamDataReq, 0)
	errList := make([]string, 0)
	defer func() {
		if len(errList) > 0 {
			dataByte, err := json.Marshal(errDataList)
			if err != nil {
				dataByte = []byte("序列化指标信息数据失败" + err.Error())
			}
			utils.FileLog.Info("pushIndex errList:%s;推送失败的指标信息数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
		}
	}()

	for start, group := 0, 1; start < total; start, group = start+pushBatchSize, group+1 {
		batch := allPushList[start:min(start+pushBatchSize, total)]
		req := PushBaseParamReq{
			SerialID:    uuid.NewUUID().Hex32(),
			TableCode:   "",
			Total:       len(batch),
			IsEmailWarn: 0,
			Data:        batch,
		}
		if _, e, _ := HttpEtaBridgePost(uri, req); e != nil {
			errList = append(errList, fmt.Sprintf("第%d组指标信息数据推送失败,postRefreshEdbData err: %s", group, e.Error()))
			errDataList = append(errDataList, batch...)
		}
	}
}

// pushIndexValue
// @Description: Pushes indicator detail values in batches of pushBatchSize.
// Failed batches are collected and logged once at the end.
// @author: Roc
// @datetime 2024-03-07 16:32:47
// @param allPushList []*PushIndexValueItemReq
func pushIndexValue(allPushList []*PushIndexValueItemReq) {
	total := len(allPushList)
	if total <= 0 {
		return
	}

	uri := utils.SyncIndexPath + "/pushIndexValue"
	errDataList := make([]*PushIndexValueItemReq, 0)
	errList := make([]string, 0)
	defer func() {
		if len(errList) > 0 {
			dataByte, err := json.Marshal(errDataList)
			if err != nil {
				dataByte = []byte("序列化指标明细数据失败" + err.Error())
			}
			utils.FileLog.Info("pushIndexValue errList:%s;推送失败的指标信息数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
		}
	}()

	for start, group := 0, 1; start < total; start, group = start+pushBatchSize, group+1 {
		batch := allPushList[start:min(start+pushBatchSize, total)]
		req := PushBaseParamReq{
			SerialID:    uuid.NewUUID().Hex32(),
			TableCode:   "",
			Total:       len(batch),
			IsEmailWarn: 0,
			Data:        batch,
		}
		if _, e, _ := HttpEtaBridgePost(uri, req); e != nil {
			errList = append(errList, fmt.Sprintf("第%d组指标明细数据推送失败,postRefreshEdbData err: %s", group, e.Error()))
			errDataList = append(errDataList, batch...)
		}
	}
}

// pushIndexClassify
// @Description: Pushes indicator-to-classification relations in batches of pushBatchSize.
// Failed batches are collected and logged once at the end.
// @author: Roc
// @datetime 2024-03-07 16:32:47
// @param allPushList []*PushEdbClassifyItemReq
func pushIndexClassify(allPushList []*PushEdbClassifyItemReq) {
	total := len(allPushList)
	if total <= 0 {
		return
	}

	uri := utils.SyncIndexPath + "/pushEdbClassify"
	errDataList := make([]*PushEdbClassifyItemReq, 0)
	errList := make([]string, 0)
	defer func() {
		if len(errList) > 0 {
			dataByte, err := json.Marshal(errDataList)
			if err != nil {
				dataByte = []byte("序列化指标明细数据失败" + err.Error())
			}
			utils.FileLog.Info("pushIndexClassify errList:%s;推送失败的指标所属分类明细数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
		}
	}()

	for start, group := 0, 1; start < total; start, group = start+pushBatchSize, group+1 {
		batch := allPushList[start:min(start+pushBatchSize, total)]
		req := PushBaseParamReq{
			SerialID:    uuid.NewUUID().Hex32(),
			TableCode:   "",
			Total:       len(batch),
			IsEmailWarn: 0,
			Data:        batch,
		}
		if _, e, _ := HttpEtaBridgePost(uri, req); e != nil {
			errList = append(errList, fmt.Sprintf("第%d组指标所属分类数据推送失败,postRefreshEdbData err: %s", group, e.Error()))
			errDataList = append(errDataList, batch...)
		}
	}
}

// min returns the smaller of a and b.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// lockGetCrmIndex serializes runs of SyncXyCrmIndex.
var lockGetCrmIndex sync.Mutex

// SyncXyCrmIndex
// @Description: Scheduled task that pulls CRM indicator data through the bridge service
// for every configured asset package and forwards it to the hub. Request-level failures
// of all packages are combined into err; per-item push failures are collected and
// reported through a separate alarm.
// @author: Roc
// @datetime 2024-5-22 10:46:08
// @param cont context.Context (currently unused)
// @return err error
func SyncXyCrmIndex(cont context.Context) (err error) {
	lockGetCrmIndex.Lock()
	errMsgList := make([]string, 0)
	defer func() {
		if err != nil {
			tips := "SyncXyCrmIndex-定时同步CRM数据到ETA平台失败, ErrMsg:\n" + err.Error()
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
		if len(errMsgList) > 0 {
			tips := "SyncXyCrmIndex-定时同步CRM数据到ETA平台失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
			utils.FileLog.Info(tips)
			go alarm_msg.SendAlarmMsg(tips, 3)
		}
		lockGetCrmIndex.Unlock()
		fmt.Println("end SyncXyCrmIndex")
	}()

	// Do nothing when the asset package or the data source type is not configured.
	if utils.SyncCrmAssetPkgCd == `` || utils.SyncCrmDataSourceType == `` {
		return
	}

	var lastUpdateTimeStr string                                   // time of the previous pull
	nowTimeStr := time.Now().Format(utils.FormatDateTimeUnSpaceV2) // time of this pull

	sysInteractionLog, err := data_manage.GetBusinessSysInteractionLogByKey(data_manage.CrmIndexLastUpdateTime)
	if err != nil {
		if err.Error() != utils.ErrNoRow() {
			return
		}
		// No previous pull recorded: pull without a start time.
		err = nil
	} else if sysInteractionLog.InteractionVal != `` {
		lastUpdateTimeStr = sysInteractionLog.InteractionVal
	}

	// Sync every package and keep the failures of all of them, not only the last one.
	reqErrList := make([]string, 0)
	for _, assetPkgCd := range strings.Split(utils.SyncCrmAssetPkgCd, ",") {
		assetPkgCd = strings.TrimSpace(assetPkgCd)
		if assetPkgCd == `` {
			continue
		}
		tmpErr, tmpErrMsgList := syncCrmIndex(assetPkgCd, 1, utils.SyncCrmIndexNum, lastUpdateTimeStr)
		if tmpErr != nil {
			reqErrList = append(reqErrList, fmt.Sprintf("assetPkgCd:%s, err:%s", assetPkgCd, tmpErr.Error()))
		}
		errMsgList = append(errMsgList, tmpErrMsgList...)
	}
	if len(reqErrList) > 0 {
		err = fmt.Errorf("%s", strings.Join(reqErrList, "\n"))
	}

	// Record the time of this pull.
	// NOTE(review): the timestamp advances even when a package failed, so data changed
	// before nowTimeStr is not re-pulled by the next run — confirm this is intended.
	modifyCrmIndexLastUpdateTime(nowTimeStr)

	return
}

// syncCrmIndex
// @Description: Pulls CRM indicator data of one asset package page by page, starting at
// currIndex, and forwards every returned item to the hub. It stops at the first page
// whose request or decoding fails.
// @author: Roc
// @datetime 2024-05-17 15:55:11
// @param assetPkgCd string asset package code
// @param currIndex int first page to request
// @param pageSize int page size
// @param baseLastUpdateTimeStr string start of the detail-update time window; may be empty
// @return err error request or decode failure of a page
// @return errMsgList []string failures of individual items pushed to the hub
func syncCrmIndex(assetPkgCd string, currIndex, pageSize int, baseLastUpdateTimeStr string) (err error, errMsgList []string) {
	errMsgList = make([]string, 0)

	lastUpdateTimeStr := baseLastUpdateTimeStr
	if lastUpdateTimeStr != `` {
		lastUpdateTimeStr = url.QueryEscape(lastUpdateTimeStr)
	}

	for page := currIndex; ; page++ {
		uri := fmt.Sprintf("%s/getCrmData?index_pkg_code=%s&data_source_type=%s&current_index=%d&page_size=%d&detail_last_update_start_time=%s",
			utils.SyncCrmIndexPath, assetPkgCd, utils.SyncCrmDataSourceType, page, pageSize, lastUpdateTimeStr)
		bResult, tmpErr, _ := HttpEtaBridgeGet(uri)
		if tmpErr != nil {
			err = tmpErr
			return
		}

		result := new(EtaBridgeDataRespAndBusinessData)
		if tmpErr = json.Unmarshal(bResult, result); tmpErr != nil {
			err = fmt.Errorf("result unmarshal err: %s\nresult: %s", tmpErr.Error(), string(bResult))
			utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult))
			return
		}

		for _, v := range result.Data.List {
			if pushErr := pushCrmDataToHub(v); pushErr != nil {
				errMsgList = append(errMsgList, pushErr.Error())
			}
		}

		// Stop once the last page has been processed.
		if page >= result.Data.Paging.Pages {
			return
		}
	}
}

// pushCrmDataToHub
// @Description: Posts one data item to the hub service endpoint /edb/push.
// @author: Roc
// @datetime 2024-05-17 15:55:24
// @param data interface{}
// @return err error
func pushCrmDataToHub(data interface{}) (err error) {
	uri := `/edb/push`
	_, err, _ = eta_hub.HttpEtaHubPost(uri, data)

	return
}

// modifyCrmIndexLastUpdateTime
// @Description: Stores the time of the latest CRM pull: creates the interaction-log
// record when it does not exist yet, otherwise updates its value and modify time.
// Failures are only logged.
// @author: Roc
// @datetime 2024-05-17 11:32:32
// @param lastUpdateTime string
func modifyCrmIndexLastUpdateTime(lastUpdateTime string) {
	var err error
	defer func() {
		if err != nil {
			utils.FileLog.Error("修改binlog文件名称和位置异常,lastUpdateTime", lastUpdateTime, ",err:", err)
		}
	}()

	key := data_manage.CrmIndexLastUpdateTime
	fileNameLog, err := data_manage.GetBusinessSysInteractionLogByKey(key)
	if err != nil {
		if err.Error() != utils.ErrNoRow() {
			return
		}
		// No record yet: create it.
		fileNameLog = &data_manage.BusinessSysInteractionLog{
			//ID:             0,
			InteractionKey: key,
			InteractionVal: lastUpdateTime,
			Remark:         "crm拉取数据的最近更新时间",
			ModifyTime:     time.Now(),
			CreateTime:     time.Now(),
		}
		err = fileNameLog.Create()
		return
	}

	fileNameLog.InteractionVal = lastUpdateTime
	fileNameLog.ModifyTime = time.Now()
	err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
}