|
@@ -7,10 +7,13 @@ import (
|
|
"eta/eta_task/services/alarm_msg"
|
|
"eta/eta_task/services/alarm_msg"
|
|
"eta/eta_task/utils"
|
|
"eta/eta_task/utils"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "github.com/rdlucklib/rdluck_tools/uuid"
|
|
"strings"
|
|
"strings"
|
|
"sync"
|
|
"sync"
|
|
|
|
+ "time"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+// 用户同步的锁
|
|
var lockSyncUser sync.Mutex
|
|
var lockSyncUser sync.Mutex
|
|
|
|
|
|
// SyncUser
|
|
// SyncUser
|
|
@@ -53,11 +56,11 @@ type PushBaseParamReq struct {
|
|
// @Description: 指标数据结构
|
|
// @Description: 指标数据结构
|
|
type PushIndexParamDataReq struct {
|
|
type PushIndexParamDataReq struct {
|
|
SourceIndexCode string `json:"source_index_code" description:"上游来源指标ID"`
|
|
SourceIndexCode string `json:"source_index_code" description:"上游来源指标ID"`
|
|
- IndexCode string `json:"index_code" description:""`
|
|
|
|
- IndexName string `json:"index_name" description:""`
|
|
|
|
- IndexShortName string `json:"index_short_name" description:""`
|
|
|
|
- FrequenceName string `json:"frequence_name" description:""`
|
|
|
|
- UnitName string `json:"unit_name" description:""`
|
|
|
|
|
|
+ IndexCode string `json:"index_code" description:"数仓加工的指标ID,来源+来源ID,使用下划线连接,MST000ID00013242"`
|
|
|
|
+ IndexName string `json:"index_name" description:"外部来源的指标名称"`
|
|
|
|
+ IndexShortName string `json:"index_short_name" description:"系统内的指标名称/简称"`
|
|
|
|
+ FrequenceName string `json:"frequence_name" description:"指标频度,如:日度、周度、月度"`
|
|
|
|
+ UnitName string `json:"unit_name" description:"指标单位,如:元/吨、千克、立方米"`
|
|
//CountryName string `json:"country_name" description:""`
|
|
//CountryName string `json:"country_name" description:""`
|
|
//ProvinceName string `json:"province_name" description:""`
|
|
//ProvinceName string `json:"province_name" description:""`
|
|
//AreaName string `json:"area_name" description:""`
|
|
//AreaName string `json:"area_name" description:""`
|
|
@@ -73,99 +76,21 @@ type PushIndexParamDataReq struct {
|
|
//ContractName string `json:"contract_name" description:""`
|
|
//ContractName string `json:"contract_name" description:""`
|
|
//AuthKindName string `json:"auth_kind_name" description:""`
|
|
//AuthKindName string `json:"auth_kind_name" description:""`
|
|
//CustomSmallClassName string `json:"custom_small_class_name" description:""`
|
|
//CustomSmallClassName string `json:"custom_small_class_name" description:""`
|
|
- AssetBeginDate string `json:"asset_begin_date" description:""`
|
|
|
|
- AssetEndDate string `json:"asset_end_date" description:""`
|
|
|
|
- CreateUser string `json:"create_user" description:""`
|
|
|
|
- IndexCreateTime string `json:"index_create_time" description:""`
|
|
|
|
- UpdateUser string `json:"update_user" description:""`
|
|
|
|
- DetailUpdateTime string `json:"detail_update_time" description:""`
|
|
|
|
- IndexUpdateTime string `json:"index_update_time" description:""`
|
|
|
|
|
|
+ AssetBeginDate string `json:"asset_begin_date" description:"业务字段,指标明细数据的业务日期开始时间;格式yyyy-mm-dd"`
|
|
|
|
+ AssetEndDate string `json:"asset_end_date" description:"业务字段,指标明细数据的业务日期结束时间;格式yyyy-mm-dd"`
|
|
|
|
+ CreateUser string `json:"create_user" description:"创建人姓名"`
|
|
|
|
+ IndexCreateTime string `json:"index_create_time" description:"指标基础信息创建时间戳;格式yyyy-mm-dd hh:mi:ss"`
|
|
|
|
+ UpdateUser string `json:"update_user" description:"更新人姓名"`
|
|
|
|
+ DetailUpdateTime string `json:"detail_update_time" description:"指标明细信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"`
|
|
|
|
+ IndexUpdateTime string `json:"index_update_time" description:"指标基础信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"`
|
|
//DutyDept string `json:"duty_dept" description:""`
|
|
//DutyDept string `json:"duty_dept" description:""`
|
|
//BusinessDept string `json:"business_dept" description:""`
|
|
//BusinessDept string `json:"business_dept" description:""`
|
|
- OrginSource string `json:"orgin_source" description:""`
|
|
|
|
- OrginSysSource string `json:"orgin_sys_source" description:""`
|
|
|
|
- SysSource string `json:"sys_source" description:""`
|
|
|
|
- SourceType string `json:"source_type" description:""`
|
|
|
|
|
|
+ OrginSource string `json:"orgin_source" description:"外部数据原始来源,如国家统计局、钢联等"`
|
|
|
|
+ OrginSysSource string `json:"orgin_sys_source" description:"外部来源系统,即数据供应商,如钢联、wind、同花顺"`
|
|
|
|
+ SysSource string `json:"sys_source" description:"内部来源系统,如产研平台、市价平台"`
|
|
|
|
+ SourceType string `json:"source_type" description:"数据接入方式,手工、接口、RPA"`
|
|
//EtlTime string `json:"etl_time" description:""`
|
|
//EtlTime string `json:"etl_time" description:""`
|
|
- Status int `json:"status" description:""`
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// SyncIndexList
|
|
|
|
-// @Description: 定时同步ETA指标信息变更数据至第三方
|
|
|
|
-// @author: Roc
|
|
|
|
-// @datetime 2024-02-28 14:00:45
|
|
|
|
-// @param cont context.Context
|
|
|
|
-// @return err error
|
|
|
|
-func SyncIndexList(cont context.Context) (err error) {
|
|
|
|
- defer func() {
|
|
|
|
- if err != nil {
|
|
|
|
- tips := "SyncIndexList-定时同步ETA指标信息变更数据至第三方失败, ErrMsg:\n" + err.Error()
|
|
|
|
- utils.FileLog.Info(tips)
|
|
|
|
- go alarm_msg.SendAlarmMsg(tips, 3)
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
- var condition string
|
|
|
|
- var pars []interface{}
|
|
|
|
- condition += " AND update_type in (?,?) "
|
|
|
|
- pars = append(pars, 1, 2)
|
|
|
|
-
|
|
|
|
- list, err := data_manage.GetEdbInfoUpdateLogByCondition(condition, pars)
|
|
|
|
- if err != nil {
|
|
|
|
- fmt.Println(err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- dataList := make([]PushIndexParamDataReq, 0)
|
|
|
|
- for _, v := range list {
|
|
|
|
- dataList = append(dataList, PushIndexParamDataReq{
|
|
|
|
- SourceIndexCode: v.EdbCode,
|
|
|
|
- IndexCode: fmt.Sprint(v.Source, "_", v.EdbCode),
|
|
|
|
- IndexName: v.EdbName,
|
|
|
|
- IndexShortName: v.EdbName, //todo
|
|
|
|
- FrequenceName: v.Frequency,
|
|
|
|
- UnitName: v.Unit,
|
|
|
|
- AssetBeginDate: v.StartDate,
|
|
|
|
- AssetEndDate: v.EndDate,
|
|
|
|
- CreateUser: v.SysUserRealName,
|
|
|
|
- IndexCreateTime: v.CreateTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- UpdateUser: v.UpdateSysUserRealName,
|
|
|
|
- DetailUpdateTime: v.CreateTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- IndexUpdateTime: v.CreateTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- OrginSource: v.SourceName, // todo
|
|
|
|
- OrginSysSource: v.SourceName,
|
|
|
|
- SysSource: "产研平台", //todo
|
|
|
|
- SourceType: "RPA", //TODO
|
|
|
|
- Status: 1,
|
|
|
|
- })
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- lenData := len(dataList)
|
|
|
|
- if lenData <= 0 {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- req := PushBaseParamReq{
|
|
|
|
- SerialID: utils.GetRandString(32), //todo
|
|
|
|
- TableCode: "",
|
|
|
|
- Total: lenData,
|
|
|
|
- IsEmailWarn: 0,
|
|
|
|
- Data: dataList,
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- uri := "/xy/index/pushIndexData"
|
|
|
|
- _, e, errMsg := HttpEtaBridgePost(uri, req)
|
|
|
|
- if e != nil {
|
|
|
|
- err = fmt.Errorf("postRefreshEdbData err: %s", e.Error())
|
|
|
|
- fmt.Println(err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- fmt.Println(errMsg)
|
|
|
|
- //if res != nil && res.Ret != 200 {
|
|
|
|
- // err = fmt.Errorf("postRefreshEdbData fail")
|
|
|
|
- // return
|
|
|
|
- //}
|
|
|
|
- return
|
|
|
|
|
|
+ Status int `json:"status" description:"逻辑删除:0-失效,1-有效"`
|
|
}
|
|
}
|
|
|
|
|
|
// PushIndexValueItemReq
|
|
// PushIndexValueItemReq
|
|
@@ -180,73 +105,6 @@ type PushIndexValueItemReq struct {
|
|
Status string `json:"status" description:"逻辑删除使用,0-禁用,1-启用"`
|
|
Status string `json:"status" description:"逻辑删除使用,0-禁用,1-启用"`
|
|
}
|
|
}
|
|
|
|
|
|
-// SyncIndexValueList
|
|
|
|
-// @Description: 定时同步ETA指标日期值的变更数据至第三方
|
|
|
|
-// @author: Roc
|
|
|
|
-// @datetime 2024-02-28 14:00:45
|
|
|
|
-// @param cont context.Context
|
|
|
|
-// @return err error
|
|
|
|
-func SyncIndexValueList(cont context.Context) (err error) {
|
|
|
|
- defer func() {
|
|
|
|
- if err != nil {
|
|
|
|
- tips := "SyncIndexList-定时同步ETA指标信息变更数据至第三方失败, ErrMsg:\n" + err.Error()
|
|
|
|
- utils.FileLog.Info(tips)
|
|
|
|
- go alarm_msg.SendAlarmMsg(tips, 3)
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
- var condition string
|
|
|
|
- var pars []interface{}
|
|
|
|
- condition += " AND update_type = ? "
|
|
|
|
- pars = append(pars, 0)
|
|
|
|
-
|
|
|
|
- list, err := data_manage.GetEdbInfoUpdateLogByCondition(condition, pars)
|
|
|
|
- if err != nil {
|
|
|
|
- fmt.Println(err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- dataList := make([]PushIndexValueItemReq, 0)
|
|
|
|
- for _, v := range list {
|
|
|
|
- dataList = append(dataList, PushIndexValueItemReq{
|
|
|
|
- Id: utils.MD5(fmt.Sprint(v.Source, "_", v.SourceName, "_", v.Id)),
|
|
|
|
- IndexCode: fmt.Sprint(v.Source, "_", v.EdbCode),
|
|
|
|
- Value: fmt.Sprint(v.LatestValue),
|
|
|
|
- BusinessDate: v.LatestDate,
|
|
|
|
- CreateTime: v.EdbModifyTime,
|
|
|
|
- UpdateTime: v.CreateTime.Format(utils.FormatDateTime), //todo,
|
|
|
|
- Status: "1",
|
|
|
|
- })
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- lenData := len(dataList)
|
|
|
|
- if lenData <= 0 {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- req := PushBaseParamReq{
|
|
|
|
- SerialID: utils.GetRandString(32), //todo
|
|
|
|
- TableCode: "",
|
|
|
|
- Total: lenData,
|
|
|
|
- IsEmailWarn: 0,
|
|
|
|
- Data: dataList,
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- uri := "/xy/index/pushIndexValue"
|
|
|
|
- _, e, errMsg := HttpEtaBridgePost(uri, req)
|
|
|
|
- if e != nil {
|
|
|
|
- err = fmt.Errorf("postRefreshEdbData err: %s", e.Error())
|
|
|
|
- fmt.Println(err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- fmt.Println(errMsg)
|
|
|
|
- //if res != nil && res.Ret != 200 {
|
|
|
|
- // err = fmt.Errorf("postRefreshEdbData fail")
|
|
|
|
- // return
|
|
|
|
- //}
|
|
|
|
- return
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
// PushClassifyItemReq
|
|
// PushClassifyItemReq
|
|
// @Description: 指标分类数据结构
|
|
// @Description: 指标分类数据结构
|
|
type PushClassifyItemReq struct {
|
|
type PushClassifyItemReq struct {
|
|
@@ -264,6 +122,21 @@ type PushClassifyItemReq struct {
|
|
SortColumn int `json:"sort_column" description:"排序字段,越小越靠前,默认值:10"`
|
|
SortColumn int `json:"sort_column" description:"排序字段,越小越靠前,默认值:10"`
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// PushEdbClassifyItemReq
|
|
|
|
+// @Description: 指标与目录的关系请求结构
|
|
|
|
+type PushEdbClassifyItemReq struct {
|
|
|
|
+ Id string `json:"id" description:"唯一主键"`
|
|
|
|
+ ClassifyId int `json:"classify_id" description:"目录分类ID"`
|
|
|
|
+ IndexCode string `json:"index_code" description:"指标ID"`
|
|
|
|
+ CreateTime string `json:"create_time" description:"创建时间"`
|
|
|
|
+ CreateUser string `json:"create_user" description:"创建人"`
|
|
|
|
+ UpdateTime string `json:"update_time" description:"修改时间"`
|
|
|
|
+ UpdateUser string `json:"update_user" description:"修改人"`
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// 同步指标分类锁
|
|
|
|
+var lockSyncClassify sync.Mutex
|
|
|
|
+
|
|
// SyncClassifyList
|
|
// SyncClassifyList
|
|
// @Description: 定时同步ETA分类数据至第三方
|
|
// @Description: 定时同步ETA分类数据至第三方
|
|
// @author: Roc
|
|
// @author: Roc
|
|
@@ -271,17 +144,23 @@ type PushClassifyItemReq struct {
|
|
// @param cont context.Context
|
|
// @param cont context.Context
|
|
// @return err error
|
|
// @return err error
|
|
func SyncClassifyList(cont context.Context) (err error) {
|
|
func SyncClassifyList(cont context.Context) (err error) {
|
|
|
|
+ lockSyncClassify.Lock()
|
|
defer func() {
|
|
defer func() {
|
|
if err != nil {
|
|
if err != nil {
|
|
- tips := "SyncIndexList-定时同步ETA指标信息变更数据至第三方失败, ErrMsg:\n" + err.Error()
|
|
|
|
|
|
+ tips := "SyncIndexList-定时同步ETA指标分类变更数据至第三方失败, ErrMsg:\n" + err.Error()
|
|
utils.FileLog.Info(tips)
|
|
utils.FileLog.Info(tips)
|
|
go alarm_msg.SendAlarmMsg(tips, 3)
|
|
go alarm_msg.SendAlarmMsg(tips, 3)
|
|
}
|
|
}
|
|
|
|
+ lockSyncClassify.Unlock()
|
|
}()
|
|
}()
|
|
|
|
|
|
var condition string
|
|
var condition string
|
|
var pars []interface{}
|
|
var pars []interface{}
|
|
|
|
|
|
|
|
+ // 普通指标分类
|
|
|
|
+ condition = " AND classify_type = ? "
|
|
|
|
+ pars = append(pars, 0)
|
|
|
|
+
|
|
list, err := data_manage.GetAllEdbClassifyListByCondition(condition, pars)
|
|
list, err := data_manage.GetAllEdbClassifyListByCondition(condition, pars)
|
|
if err != nil {
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
fmt.Println(err)
|
|
@@ -324,7 +203,7 @@ func SyncClassifyList(cont context.Context) (err error) {
|
|
|
|
|
|
for k, tmpDataList := range dataLimitList {
|
|
for k, tmpDataList := range dataLimitList {
|
|
req := PushBaseParamReq{
|
|
req := PushBaseParamReq{
|
|
- SerialID: utils.GetRandString(32), //todo
|
|
|
|
|
|
+ SerialID: uuid.NewUUID().Hex32(),
|
|
TableCode: "",
|
|
TableCode: "",
|
|
Total: len(tmpDataList),
|
|
Total: len(tmpDataList),
|
|
IsEmailWarn: 0,
|
|
IsEmailWarn: 0,
|
|
@@ -343,107 +222,77 @@ func SyncClassifyList(cont context.Context) (err error) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-// PushEdbClassifyItemReq
|
|
|
|
-// @Description: 指标与目录的关系请求结构
|
|
|
|
-type PushEdbClassifyItemReq struct {
|
|
|
|
- Id string `json:"id" description:"唯一主键"`
|
|
|
|
- ClassifyId int `json:"classify_id" description:"目录分类ID"`
|
|
|
|
- IndexCode string `json:"index_code" description:"指标ID"`
|
|
|
|
- CreateTime string `json:"create_time" description:"创建时间"`
|
|
|
|
- CreateUser string `json:"create_user" description:"创建人"`
|
|
|
|
- UpdateTime string `json:"update_time" description:"修改时间"`
|
|
|
|
- UpdateUser string `json:"update_user" description:"修改人"`
|
|
|
|
-}
|
|
|
|
|
|
+// 同步指标信息锁
|
|
|
|
+var lockSyncIndex sync.Mutex
|
|
|
|
|
|
-// SyncEdbClassifyList
|
|
|
|
-// @Description: 定时同步ETA指标与分类的关系至第三方
|
|
|
|
|
|
+// SyncIndex
|
|
|
|
+// @Description: 定时同步指标信息
|
|
// @author: Roc
|
|
// @author: Roc
|
|
-// @datetime 2024-02-28 14:00:45
|
|
|
|
|
|
+// @datetime 2024-03-07 17:39:34
|
|
// @param cont context.Context
|
|
// @param cont context.Context
|
|
// @return err error
|
|
// @return err error
|
|
-func SyncEdbClassifyList(cont context.Context) (err error) {
|
|
|
|
|
|
+func SyncIndex(cont context.Context) (err error) {
|
|
|
|
+ lockSyncIndex.Lock()
|
|
defer func() {
|
|
defer func() {
|
|
if err != nil {
|
|
if err != nil {
|
|
- tips := "SyncIndexList-定时同步ETA指标信息变更数据至第三方失败, ErrMsg:\n" + err.Error()
|
|
|
|
|
|
+ tips := "SyncIndex-定时同步ETA指标信息变更数据至第三方失败, ErrMsg:\n" + err.Error()
|
|
utils.FileLog.Info(tips)
|
|
utils.FileLog.Info(tips)
|
|
go alarm_msg.SendAlarmMsg(tips, 3)
|
|
go alarm_msg.SendAlarmMsg(tips, 3)
|
|
}
|
|
}
|
|
|
|
+ lockSyncIndex.Unlock()
|
|
}()
|
|
}()
|
|
|
|
|
|
- var condition string
|
|
|
|
- var pars []interface{}
|
|
|
|
-
|
|
|
|
- list, err := data_manage.GetAllEdbInfoClassifyListByCondition(condition, pars)
|
|
|
|
|
|
+ // 获取当前最大ID
|
|
|
|
+ logMaxId, err := data_manage.GetEdbUpdateLogMaxId()
|
|
if err != nil {
|
|
if err != nil {
|
|
- fmt.Println(err)
|
|
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
- dataLimitList := make([][]PushEdbClassifyItemReq, 0)
|
|
|
|
-
|
|
|
|
- dataList := make([]PushEdbClassifyItemReq, 0)
|
|
|
|
- for _, v := range list {
|
|
|
|
- dataList = append(dataList, PushEdbClassifyItemReq{
|
|
|
|
- Id: fmt.Sprint(v.EdbInfoId),
|
|
|
|
- ClassifyId: v.ClassifyId,
|
|
|
|
- IndexCode: fmt.Sprint(v.Source, "_", v.EdbCode),
|
|
|
|
- CreateTime: v.CreateTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- CreateUser: v.SysUserRealName,
|
|
|
|
- UpdateTime: v.ModifyTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- UpdateUser: v.SysUserRealName,
|
|
|
|
- })
|
|
|
|
- if len(dataList) >= 100 {
|
|
|
|
- dataLimitList = append(dataLimitList, dataList)
|
|
|
|
- dataList = make([]PushEdbClassifyItemReq, 0)
|
|
|
|
|
|
+ var currLogId int64
|
|
|
|
+ // 当前已经操作的最大ID
|
|
|
|
+ currLogId, err = utils.Rc.GetInt64(utils.CACHE_EDB_UPDATE_LOG_ID)
|
|
|
|
+ if err != nil {
|
|
|
|
+ // 如果不是没找到key,那么说明是redis报错
|
|
|
|
+ if err.Error() != utils.RedisNoKeyErr {
|
|
|
|
+ return
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
- lenData := len(dataList)
|
|
|
|
- if lenData > 0 {
|
|
|
|
- dataLimitList = append(dataLimitList, dataList)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if len(dataLimitList) < 0 {
|
|
|
|
- fmt.Println("无分类数据推送")
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- for k, tmpDataList := range dataLimitList {
|
|
|
|
- req := PushBaseParamReq{
|
|
|
|
- SerialID: utils.GetRandString(32), //todo
|
|
|
|
- TableCode: "",
|
|
|
|
- Total: len(tmpDataList),
|
|
|
|
- IsEmailWarn: 0,
|
|
|
|
- Data: tmpDataList,
|
|
|
|
|
|
+ err = nil
|
|
|
|
+ // 查找当前已经处理了的日志最大ID
|
|
|
|
+ currLogId, err = data_manage.GetEdbUpdateLogMaxHandleId()
|
|
|
|
+ if err != nil {
|
|
|
|
+ if err.Error() != utils.ErrNoRow() {
|
|
|
|
+ utils.FileLog.Error("查找当前已经处理了的日志最大ID失败:" + err.Error())
|
|
|
|
+ } else {
|
|
|
|
+ err = nil
|
|
|
|
+ }
|
|
|
|
+ currLogId = 0
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- uri := "/xy/index/pushEdbClassify"
|
|
|
|
- _, e, _ := HttpEtaBridgePost(uri, req)
|
|
|
|
- if e != nil {
|
|
|
|
- err = fmt.Errorf("第%d组分类数据推送失败,postRefreshEdbData err: %s", k+1, e.Error())
|
|
|
|
- fmt.Println(err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
|
|
+ // 遍历获取下一页的数据
|
|
|
|
+ for currId := currLogId; currId < logMaxId; {
|
|
|
|
+ currId = handlePush(currId, logMaxId)
|
|
}
|
|
}
|
|
|
|
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-func SyncIndex(cont context.Context) (err error) {
|
|
|
|
- defer func() {
|
|
|
|
- if err != nil {
|
|
|
|
- tips := "SyncIndex-定时同步ETA指标信息变更数据至第三方失败, ErrMsg:\n" + err.Error()
|
|
|
|
- utils.FileLog.Info(tips)
|
|
|
|
- go alarm_msg.SendAlarmMsg(tips, 3)
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
|
|
+// handlePush
|
|
|
|
+// @Description: 推送处理
|
|
|
|
+// @author: Roc
|
|
|
|
+// @datetime 2024-03-07 19:20:19
|
|
|
|
+// @param currLogIdStr int64
|
|
|
|
+// @param logMaxId int64
|
|
|
|
+// @return lastId int64
|
|
|
|
+func handlePush(currLogIdStr, logMaxId int64) (lastId int64) {
|
|
|
|
+ lastId = currLogIdStr
|
|
|
|
+
|
|
|
|
+ // 查询当次需要同步的数据
|
|
var condition string
|
|
var condition string
|
|
var pars []interface{}
|
|
var pars []interface{}
|
|
- //condition += " AND update_type in (?,?) "
|
|
|
|
- //pars = append(pars, 1, 2)
|
|
|
|
- condition += " AND id > ?"
|
|
|
|
- pars = append(pars, 50)
|
|
|
|
|
|
+
|
|
|
|
+ condition += " AND id > ? AND id < ?"
|
|
|
|
+ pars = append(pars, currLogIdStr, logMaxId)
|
|
|
|
|
|
list, err := data_manage.GetEdbUpdateLogByCondition(condition, pars)
|
|
list, err := data_manage.GetEdbUpdateLogByCondition(condition, pars)
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -451,15 +300,16 @@ func SyncIndex(cont context.Context) (err error) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
- //pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq
|
|
|
|
-
|
|
|
|
pushIndexList := make([]*PushIndexParamDataReq, 0)
|
|
pushIndexList := make([]*PushIndexParamDataReq, 0)
|
|
pushEdbClassifyList := make([]*PushEdbClassifyItemReq, 0)
|
|
pushEdbClassifyList := make([]*PushEdbClassifyItemReq, 0)
|
|
pushIndexValueList := make([]*PushIndexValueItemReq, 0)
|
|
pushIndexValueList := make([]*PushIndexValueItemReq, 0)
|
|
|
|
|
|
|
|
+ idList := make([]int64, 0)
|
|
for _, v := range list {
|
|
for _, v := range list {
|
|
- pushIndexData, pushEdbClassify, pushIndexValue, err := handleData(v)
|
|
|
|
- if err != nil {
|
|
|
|
|
|
+ idList = append(idList, v.Id)
|
|
|
|
+ pushIndexData, pushEdbClassifyData, pushIndexValueData, tmpErr := handleData(v)
|
|
|
|
+ if tmpErr != nil {
|
|
|
|
+ err = tmpErr
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
|
|
@@ -467,445 +317,193 @@ func SyncIndex(cont context.Context) (err error) {
|
|
pushIndexList = append(pushIndexList, pushIndexData)
|
|
pushIndexList = append(pushIndexList, pushIndexData)
|
|
}
|
|
}
|
|
|
|
|
|
- if pushEdbClassify != nil {
|
|
|
|
- pushEdbClassifyList = append(pushEdbClassifyList, pushEdbClassify)
|
|
|
|
|
|
+ if pushEdbClassifyData != nil {
|
|
|
|
+ pushEdbClassifyList = append(pushEdbClassifyList, pushEdbClassifyData)
|
|
}
|
|
}
|
|
|
|
|
|
- if pushIndexValue != nil {
|
|
|
|
- pushIndexValueList = append(pushIndexValueList, pushIndexValue)
|
|
|
|
|
|
+ if pushIndexValueData != nil {
|
|
|
|
+ pushIndexValueList = append(pushIndexValueList, pushIndexValueData)
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ lastId = v.Id
|
|
}
|
|
}
|
|
|
|
|
|
- return
|
|
|
|
-}
|
|
|
|
|
|
+ pushIndex(pushIndexList)
|
|
|
|
+ pushIndexClassify(pushEdbClassifyList)
|
|
|
|
+ pushIndexValue(pushIndexValueList)
|
|
|
|
|
|
-func handleData(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) {
|
|
|
|
- switch edbUpdateLog.OpType {
|
|
|
|
- case "insert":
|
|
|
|
- return handleInsert(edbUpdateLog)
|
|
|
|
- case "update":
|
|
|
|
- return handleUpdate(edbUpdateLog)
|
|
|
|
- case "delete":
|
|
|
|
- return handleDelete(edbUpdateLog)
|
|
|
|
|
|
+ // 标记处理
|
|
|
|
+ err = data_manage.HandleUpdateLogByIds(idList, time.Now().Format(utils.FormatDateTime))
|
|
|
|
+ if err != nil {
|
|
|
|
+ utils.FileLog.Error("批量处理指标更新记录失败:" + err.Error())
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ utils.Rc.Put(utils.CACHE_EDB_UPDATE_LOG_ID, lastId, 31*24*time.Hour)
|
|
|
|
+
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-func handleInsert(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) {
|
|
|
|
- data := edbUpdateLog.NewData
|
|
|
|
|
|
+const pushBatchSize = 100
|
|
|
|
|
|
- //指标信息
|
|
|
|
- if edbUpdateLog.OpTableName == "edb_info" {
|
|
|
|
- var edbInfo *data_manage.EdbInfo
|
|
|
|
- err = json.Unmarshal([]byte(data), &edbInfo)
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+// pushIndex
|
|
|
|
+// @Description: 指标信息数据推送
|
|
|
|
+// @author: Roc
|
|
|
|
+// @datetime 2024-03-07 16:35:02
|
|
|
|
+// @param allPushList []*PushIndexParamDataReq
|
|
|
|
+func pushIndex(allPushList []*PushIndexParamDataReq) {
|
|
|
|
+ lenDataList := len(allPushList)
|
|
|
|
+ if lenDataList <= 0 {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ uri := utils.SyncIndexPath + "/pushIndexData"
|
|
|
|
|
|
- // 指标信息
|
|
|
|
- pushIndexData = &PushIndexParamDataReq{
|
|
|
|
- SourceIndexCode: edbInfo.EdbCode,
|
|
|
|
- IndexCode: fmt.Sprint(edbInfo.Source, "_", edbInfo.EdbCode),
|
|
|
|
- IndexName: edbInfo.EdbName,
|
|
|
|
- IndexShortName: edbInfo.EdbName, //todo
|
|
|
|
- FrequenceName: edbInfo.Frequency,
|
|
|
|
- UnitName: edbInfo.Unit,
|
|
|
|
- AssetBeginDate: edbInfo.StartDate,
|
|
|
|
- AssetEndDate: edbInfo.EndDate,
|
|
|
|
- CreateUser: edbInfo.SysUserRealName,
|
|
|
|
- IndexCreateTime: edbInfo.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateUser: edbInfo.SysUserRealName, // todo
|
|
|
|
- DetailUpdateTime: edbInfo.CreateTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- IndexUpdateTime: edbInfo.ModifyTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- OrginSource: edbInfo.SourceName, // todo
|
|
|
|
- OrginSysSource: edbInfo.SourceName,
|
|
|
|
- SysSource: "产研平台",
|
|
|
|
- SourceType: getSourceType(edbInfo.Source),
|
|
|
|
- Status: 1,
|
|
|
|
|
|
+ errDataList := make([]*PushIndexParamDataReq, 0)
|
|
|
|
+ errList := make([]string, 0)
|
|
|
|
+ defer func() {
|
|
|
|
+ if len(errList) > 0 {
|
|
|
|
+ dataByte, err := json.Marshal(errDataList)
|
|
|
|
+ if err != nil {
|
|
|
|
+ dataByte = []byte("序列化指标信息数据失败" + err.Error())
|
|
|
|
+ }
|
|
|
|
+ utils.FileLog.Info("pushIndex errList:%s;推送失败的指标信息数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
|
|
}
|
|
}
|
|
|
|
+ }()
|
|
|
|
|
|
- // 指标与分类的关系信息
|
|
|
|
- pushEdbClassify = &PushEdbClassifyItemReq{
|
|
|
|
- Id: fmt.Sprint(edbInfo.EdbInfoId),
|
|
|
|
- ClassifyId: edbInfo.ClassifyId,
|
|
|
|
- IndexCode: fmt.Sprint(edbInfo.Source, "_", edbInfo.EdbCode),
|
|
|
|
- CreateTime: edbInfo.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- CreateUser: edbInfo.SysUserRealName,
|
|
|
|
- UpdateTime: edbInfo.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateUser: edbInfo.SysUserRealName,
|
|
|
|
- }
|
|
|
|
|
|
+ dataLimitList := make([][]*PushIndexParamDataReq, 0)
|
|
|
|
|
|
- return
|
|
|
|
|
|
+ for i := 0; i < lenDataList; i += pushBatchSize {
|
|
|
|
+ endIndex := min(i+pushBatchSize, lenDataList)
|
|
|
|
+ tempSlice := allPushList[i:endIndex]
|
|
|
|
+ dataLimitList = append(dataLimitList, tempSlice)
|
|
}
|
|
}
|
|
|
|
|
|
- // 分类信息
|
|
|
|
- //if edbUpdateLog.OpTableName == "edb_classify" {
|
|
|
|
- // var edbClassify *data_manage.EdbClassify
|
|
|
|
- // err = json.Unmarshal([]byte(data), &edbClassify)
|
|
|
|
- // if err != nil {
|
|
|
|
- // return
|
|
|
|
- // }
|
|
|
|
- //
|
|
|
|
- // // 指标信息
|
|
|
|
- // pushClassify = PushClassifyItemReq{
|
|
|
|
- // ClassifyId: int(edbClassify.ClassifyID),
|
|
|
|
- // ClassifyType: int(edbClassify.ClassifyType),
|
|
|
|
- // ClassifyName: edbClassify.ClassifyName,
|
|
|
|
- // ParentId: int(edbClassify.ParentID),
|
|
|
|
- // HasData: int(edbClassify.HasData),
|
|
|
|
- // CreateTime: edbClassify.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- // UpdateTime: edbClassify.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- // SysUserId: int(edbClassify.SysUserID),
|
|
|
|
- // SysUserRealName: edbClassify.SysUserRealName,
|
|
|
|
- // Level: int(edbClassify.Level),
|
|
|
|
- // UniqueCode: edbClassify.UniqueCode,
|
|
|
|
- // SortColumn: int(edbClassify.Sort),
|
|
|
|
- // }
|
|
|
|
- //
|
|
|
|
- // return
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- // 数据信息
|
|
|
|
- if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_") {
|
|
|
|
- var edbData *data_manage.EdbData
|
|
|
|
- err = json.Unmarshal([]byte(data), &edbData)
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
|
|
+ for k, dataList := range dataLimitList {
|
|
|
|
+ req := PushBaseParamReq{
|
|
|
|
+ SerialID: uuid.NewUUID().Hex32(),
|
|
|
|
+ TableCode: "",
|
|
|
|
+ Total: len(dataList),
|
|
|
|
+ IsEmailWarn: 0,
|
|
|
|
+ Data: dataList,
|
|
}
|
|
}
|
|
|
|
|
|
- edbSource, ok := data_manage.EdbTableNameSourceMap[edbUpdateLog.OpTableName]
|
|
|
|
- if !ok {
|
|
|
|
- // 没有找到来源,那就过滤
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- // 数据信息
|
|
|
|
- pushIndexValue = &PushIndexValueItemReq{
|
|
|
|
- Id: utils.MD5(fmt.Sprint(edbSource.EdbSourceId, "_", edbSource.SourceName, "_", edbData.EdbDataId)),
|
|
|
|
- IndexCode: fmt.Sprint(edbSource, "_", edbData.EdbCode),
|
|
|
|
- Value: fmt.Sprint(edbData.Value),
|
|
|
|
- BusinessDate: edbData.DataTime.Format(utils.FormatDate),
|
|
|
|
- CreateTime: edbData.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateTime: edbData.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- Status: "1",
|
|
|
|
|
|
+ _, e, _ := HttpEtaBridgePost(uri, req)
|
|
|
|
+ if e != nil {
|
|
|
|
+ errList = append(errList, fmt.Sprintf("第%d组指标信息数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
|
|
|
|
+ errDataList = append(errDataList, dataList...)
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
-
|
|
|
|
- return
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- return
|
|
|
|
}
|
|
}
|
|
|
|
|
|
-func handleDelete(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) {
|
|
|
|
- data := edbUpdateLog.OldData
|
|
|
|
|
|
+// pushIndexValue
|
|
|
|
+// @Description: 指标明细数据推送
|
|
|
|
+// @author: Roc
|
|
|
|
+// @datetime 2024-03-07 16:32:47
|
|
|
|
+// @param allPushList []*PushIndexValueItemReq
|
|
|
|
+func pushIndexValue(allPushList []*PushIndexValueItemReq) {
|
|
|
|
+ lenDataList := len(allPushList)
|
|
|
|
+ if lenDataList <= 0 {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ uri := utils.SyncIndexPath + "/pushIndexValue"
|
|
|
|
|
|
- //指标信息
|
|
|
|
- if edbUpdateLog.OpTableName == "edb_info" {
|
|
|
|
- var edbInfo *data_manage.EdbInfo
|
|
|
|
- err = json.Unmarshal([]byte(data), &edbInfo)
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
|
|
+ errDataList := make([]*PushIndexValueItemReq, 0)
|
|
|
|
+ errList := make([]string, 0)
|
|
|
|
+ defer func() {
|
|
|
|
+ if len(errList) > 0 {
|
|
|
|
+ dataByte, err := json.Marshal(errDataList)
|
|
|
|
+ if err != nil {
|
|
|
|
+ dataByte = []byte("序列化指标明细数据失败" + err.Error())
|
|
|
|
+ }
|
|
|
|
+ utils.FileLog.Info("pushIndexValue errList:%s;推送失败的指标信息数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
|
|
}
|
|
}
|
|
|
|
+ }()
|
|
|
|
|
|
- // 指标信息
|
|
|
|
- pushIndexData = &PushIndexParamDataReq{
|
|
|
|
- SourceIndexCode: edbInfo.EdbCode,
|
|
|
|
- IndexCode: fmt.Sprint(edbInfo.Source, "_", edbInfo.EdbCode),
|
|
|
|
- IndexName: edbInfo.EdbName,
|
|
|
|
- IndexShortName: edbInfo.EdbName, //todo
|
|
|
|
- FrequenceName: edbInfo.Frequency,
|
|
|
|
- UnitName: edbInfo.Unit,
|
|
|
|
- AssetBeginDate: edbInfo.StartDate,
|
|
|
|
- AssetEndDate: edbInfo.EndDate,
|
|
|
|
- CreateUser: edbInfo.SysUserRealName,
|
|
|
|
- IndexCreateTime: edbInfo.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateUser: edbInfo.SysUserRealName, // todo
|
|
|
|
- DetailUpdateTime: edbInfo.CreateTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- IndexUpdateTime: edbInfo.ModifyTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- OrginSource: edbInfo.SourceName, // todo
|
|
|
|
- OrginSysSource: edbInfo.SourceName,
|
|
|
|
- SysSource: "产研平台",
|
|
|
|
- SourceType: getSourceType(edbInfo.Source),
|
|
|
|
- Status: 0,
|
|
|
|
- }
|
|
|
|
|
|
+ dataLimitList := make([][]*PushIndexValueItemReq, 0)
|
|
|
|
|
|
- // 指标与分类的关系信息
|
|
|
|
- pushEdbClassify = &PushEdbClassifyItemReq{
|
|
|
|
- Id: fmt.Sprint(edbInfo.EdbInfoId),
|
|
|
|
- ClassifyId: edbInfo.ClassifyId,
|
|
|
|
- IndexCode: fmt.Sprint(edbInfo.Source, "_", edbInfo.EdbCode),
|
|
|
|
- CreateTime: edbInfo.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- CreateUser: edbInfo.SysUserRealName,
|
|
|
|
- UpdateTime: edbInfo.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateUser: edbInfo.SysUserRealName,
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return
|
|
|
|
|
|
+ for i := 0; i < lenDataList; i += pushBatchSize {
|
|
|
|
+ endIndex := min(i+pushBatchSize, lenDataList)
|
|
|
|
+ tempSlice := allPushList[i:endIndex]
|
|
|
|
+ dataLimitList = append(dataLimitList, tempSlice)
|
|
}
|
|
}
|
|
|
|
|
|
- // 分类信息
|
|
|
|
- //if edbUpdateLog.OpTableName == "edb_classify" {
|
|
|
|
- // var edbClassify *data_manage.EdbClassify
|
|
|
|
- // err = json.Unmarshal([]byte(data), &edbClassify)
|
|
|
|
- // if err != nil {
|
|
|
|
- // return
|
|
|
|
- // }
|
|
|
|
- //
|
|
|
|
- // // 指标信息
|
|
|
|
- // pushClassify = PushClassifyItemReq{
|
|
|
|
- // ClassifyId: int(edbClassify.ClassifyID),
|
|
|
|
- // ClassifyType: int(edbClassify.ClassifyType),
|
|
|
|
- // ClassifyName: edbClassify.ClassifyName,
|
|
|
|
- // ParentId: int(edbClassify.ParentID),
|
|
|
|
- // HasData: int(edbClassify.HasData),
|
|
|
|
- // CreateTime: edbClassify.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- // UpdateTime: edbClassify.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- // SysUserId: int(edbClassify.SysUserID),
|
|
|
|
- // SysUserRealName: edbClassify.SysUserRealName,
|
|
|
|
- // Level: int(edbClassify.Level),
|
|
|
|
- // UniqueCode: edbClassify.UniqueCode,
|
|
|
|
- // SortColumn: int(edbClassify.Sort),
|
|
|
|
- // }
|
|
|
|
- //
|
|
|
|
- // return
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- // 数据信息
|
|
|
|
- if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_") {
|
|
|
|
- var edbData *data_manage.EdbData
|
|
|
|
- err = json.Unmarshal([]byte(data), &edbData)
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
|
|
+ for k, dataList := range dataLimitList {
|
|
|
|
+ req := PushBaseParamReq{
|
|
|
|
+ SerialID: uuid.NewUUID().Hex32(),
|
|
|
|
+ TableCode: "",
|
|
|
|
+ Total: len(dataList),
|
|
|
|
+ IsEmailWarn: 0,
|
|
|
|
+ Data: dataList,
|
|
}
|
|
}
|
|
|
|
|
|
- edbSource, ok := data_manage.EdbTableNameSourceMap[edbUpdateLog.OpTableName]
|
|
|
|
- if !ok {
|
|
|
|
- // 没有找到来源,那就过滤
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- // 数据信息
|
|
|
|
- pushIndexValue = &PushIndexValueItemReq{
|
|
|
|
- Id: utils.MD5(fmt.Sprint(edbSource.EdbSourceId, "_", edbSource.SourceName, "_", edbData.EdbDataId)),
|
|
|
|
- IndexCode: fmt.Sprint(edbSource, "_", edbData.EdbCode),
|
|
|
|
- Value: fmt.Sprint(edbData.Value),
|
|
|
|
- BusinessDate: edbData.DataTime.Format(utils.FormatDate),
|
|
|
|
- CreateTime: edbData.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateTime: edbData.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- Status: "0",
|
|
|
|
|
|
+ _, e, _ := HttpEtaBridgePost(uri, req)
|
|
|
|
+ if e != nil {
|
|
|
|
+ errList = append(errList, fmt.Sprintf("第%d组指标明细数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
|
|
|
|
+ errDataList = append(errDataList, dataList...)
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
-
|
|
|
|
- return
|
|
|
|
}
|
|
}
|
|
|
|
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-func handleUpdate(edbUpdateLog *data_manage.EdbUpdateLog) (pushIndexData *PushIndexParamDataReq, pushEdbClassify *PushEdbClassifyItemReq, pushIndexValue *PushIndexValueItemReq, err error) {
|
|
|
|
- oldData := edbUpdateLog.OldData
|
|
|
|
- newData := edbUpdateLog.NewData
|
|
|
|
-
|
|
|
|
- //指标信息
|
|
|
|
- if edbUpdateLog.OpTableName == "edb_info" {
|
|
|
|
- var oldEdbInfo *data_manage.EdbInfo
|
|
|
|
- err = json.Unmarshal([]byte(oldData), &oldEdbInfo)
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- var newEdbInfo *data_manage.EdbInfo
|
|
|
|
- err = json.Unmarshal([]byte(newData), &newEdbInfo)
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+// pushIndexClassify
|
|
|
|
+// @Description: 指标与分类的关系推送
|
|
|
|
+// @author: Roc
|
|
|
|
+// @datetime 2024-03-07 16:32:47
|
|
|
|
+// @param allPushList []*PushIndexValueItemReq
|
|
|
|
+func pushIndexClassify(allPushList []*PushEdbClassifyItemReq) {
|
|
|
|
+ lenDataList := len(allPushList)
|
|
|
|
+ if lenDataList <= 0 {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ uri := utils.SyncIndexPath + "/pushEdbClassify"
|
|
|
|
|
|
- isUpdateEdbInfo := checkUpdateType(oldEdbInfo, newEdbInfo)
|
|
|
|
- // 指标信息
|
|
|
|
- if isUpdateEdbInfo {
|
|
|
|
- pushIndexData = &PushIndexParamDataReq{
|
|
|
|
- SourceIndexCode: newEdbInfo.EdbCode,
|
|
|
|
- IndexCode: fmt.Sprint(newEdbInfo.Source, "_", newEdbInfo.EdbCode),
|
|
|
|
- IndexName: newEdbInfo.EdbName,
|
|
|
|
- IndexShortName: newEdbInfo.EdbName, //todo
|
|
|
|
- FrequenceName: newEdbInfo.Frequency,
|
|
|
|
- UnitName: newEdbInfo.Unit,
|
|
|
|
- AssetBeginDate: newEdbInfo.StartDate,
|
|
|
|
- AssetEndDate: newEdbInfo.EndDate,
|
|
|
|
- CreateUser: newEdbInfo.SysUserRealName,
|
|
|
|
- IndexCreateTime: newEdbInfo.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateUser: newEdbInfo.SysUserRealName, // todo
|
|
|
|
- DetailUpdateTime: newEdbInfo.CreateTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- IndexUpdateTime: newEdbInfo.ModifyTime.Format(utils.FormatDateTime), //todo
|
|
|
|
- OrginSource: newEdbInfo.SourceName, // todo
|
|
|
|
- OrginSysSource: newEdbInfo.SourceName,
|
|
|
|
- SysSource: "产研平台",
|
|
|
|
- SourceType: getSourceType(newEdbInfo.Source),
|
|
|
|
- Status: 1,
|
|
|
|
|
|
+ errDataList := make([]*PushEdbClassifyItemReq, 0)
|
|
|
|
+ errList := make([]string, 0)
|
|
|
|
+ defer func() {
|
|
|
|
+ if len(errList) > 0 {
|
|
|
|
+ dataByte, err := json.Marshal(errDataList)
|
|
|
|
+ if err != nil {
|
|
|
|
+ dataByte = []byte("序列化指标明细数据失败" + err.Error())
|
|
}
|
|
}
|
|
|
|
+ utils.FileLog.Info("pushIndexClassify errList:%s;推送失败的指标所属分类明细数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
|
|
}
|
|
}
|
|
|
|
+ }()
|
|
|
|
|
|
- // 指标与分类的关系信息
|
|
|
|
- if oldEdbInfo.ClassifyId != newEdbInfo.ClassifyId {
|
|
|
|
- pushEdbClassify = &PushEdbClassifyItemReq{
|
|
|
|
- Id: fmt.Sprint(newEdbInfo.EdbInfoId),
|
|
|
|
- ClassifyId: newEdbInfo.ClassifyId,
|
|
|
|
- IndexCode: fmt.Sprint(newEdbInfo.Source, "_", newEdbInfo.EdbCode),
|
|
|
|
- CreateTime: newEdbInfo.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- CreateUser: newEdbInfo.SysUserRealName,
|
|
|
|
- UpdateTime: newEdbInfo.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateUser: newEdbInfo.SysUserRealName,
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ dataLimitList := make([][]*PushEdbClassifyItemReq, 0)
|
|
|
|
|
|
- return
|
|
|
|
|
|
+ for i := 0; i < lenDataList; i += pushBatchSize {
|
|
|
|
+ endIndex := min(i+pushBatchSize, lenDataList)
|
|
|
|
+ tempSlice := allPushList[i:endIndex]
|
|
|
|
+ dataLimitList = append(dataLimitList, tempSlice)
|
|
}
|
|
}
|
|
|
|
|
|
- // 分类信息
|
|
|
|
- //if edbUpdateLog.OpTableName == "edb_classify" {
|
|
|
|
- // var edbClassify *data_manage.EdbClassify
|
|
|
|
- // err = json.Unmarshal([]byte(newData), &edbClassify)
|
|
|
|
- // if err != nil {
|
|
|
|
- // return
|
|
|
|
- // }
|
|
|
|
- //
|
|
|
|
- // // 指标信息
|
|
|
|
- // pushClassify = PushClassifyItemReq{
|
|
|
|
- // ClassifyId: int(edbClassify.ClassifyID),
|
|
|
|
- // ClassifyType: int(edbClassify.ClassifyType),
|
|
|
|
- // ClassifyName: edbClassify.ClassifyName,
|
|
|
|
- // ParentId: int(edbClassify.ParentID),
|
|
|
|
- // HasData: int(edbClassify.HasData),
|
|
|
|
- // CreateTime: edbClassify.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- // UpdateTime: edbClassify.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- // SysUserId: int(edbClassify.SysUserID),
|
|
|
|
- // SysUserRealName: edbClassify.SysUserRealName,
|
|
|
|
- // Level: int(edbClassify.Level),
|
|
|
|
- // UniqueCode: edbClassify.UniqueCode,
|
|
|
|
- // SortColumn: int(edbClassify.Sort),
|
|
|
|
- // }
|
|
|
|
- //
|
|
|
|
- // return
|
|
|
|
- //}
|
|
|
|
-
|
|
|
|
- // 数据信息
|
|
|
|
- if strings.HasPrefix(edbUpdateLog.OpTableName, "edb_data_") {
|
|
|
|
- var edbData *data_manage.EdbData
|
|
|
|
- err = json.Unmarshal([]byte(newData), &edbData)
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
|
|
+ for k, dataList := range dataLimitList {
|
|
|
|
+ req := PushBaseParamReq{
|
|
|
|
+ SerialID: uuid.NewUUID().Hex32(),
|
|
|
|
+ TableCode: "",
|
|
|
|
+ Total: len(dataList),
|
|
|
|
+ IsEmailWarn: 0,
|
|
|
|
+ Data: dataList,
|
|
}
|
|
}
|
|
|
|
|
|
- edbSource, ok := data_manage.EdbTableNameSourceMap[edbUpdateLog.OpTableName]
|
|
|
|
- if !ok {
|
|
|
|
- // 没有找到来源,那就过滤
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- // 数据信息
|
|
|
|
- pushIndexValue = &PushIndexValueItemReq{
|
|
|
|
- Id: utils.MD5(fmt.Sprint(edbSource.EdbSourceId, "_", edbSource.SourceName, "_", edbData.EdbDataId)),
|
|
|
|
- IndexCode: fmt.Sprint(edbSource, "_", edbData.EdbCode),
|
|
|
|
- Value: fmt.Sprint(edbData.Value),
|
|
|
|
- BusinessDate: edbData.DataTime.Format(utils.FormatDate),
|
|
|
|
- CreateTime: edbData.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
- UpdateTime: edbData.ModifyTime.Format(utils.FormatDateTime),
|
|
|
|
- Status: "1",
|
|
|
|
|
|
+ _, e, _ := HttpEtaBridgePost(uri, req)
|
|
|
|
+ if e != nil {
|
|
|
|
+ errList = append(errList, fmt.Sprintf("第%d组指标所属分类数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
|
|
|
|
+ errDataList = append(errDataList, dataList...)
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
-
|
|
|
|
- return
|
|
|
|
}
|
|
}
|
|
|
|
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-//PushIndexParamDataReq{
|
|
|
|
-//SourceIndexCode: edbInfo.EdbCode,
|
|
|
|
-//IndexCode: fmt.Sprint(edbInfo.Source, "_", edbInfo.EdbCode),
|
|
|
|
-//IndexName: edbInfo.EdbName,
|
|
|
|
-//IndexShortName: edbInfo.EdbName, //todo
|
|
|
|
-//FrequenceName: edbInfo.Frequency,
|
|
|
|
-//UnitName: edbInfo.Unit,
|
|
|
|
-//AssetBeginDate: edbInfo.StartDate,
|
|
|
|
-//AssetEndDate: edbInfo.EndDate,
|
|
|
|
-//CreateUser: edbInfo.SysUserRealName,
|
|
|
|
-//IndexCreateTime: edbInfo.CreateTime.Format(utils.FormatDateTime),
|
|
|
|
-//UpdateUser: edbInfo.SysUserRealName, // todo
|
|
|
|
-//DetailUpdateTime: edbInfo.CreateTime.Format(utils.FormatDateTime), //todo
|
|
|
|
-//IndexUpdateTime: edbInfo.ModifyTime.Format(utils.FormatDateTime), //todo
|
|
|
|
-//OrginSource: edbInfo.SourceName, // todo
|
|
|
|
-//OrginSysSource: edbInfo.SourceName,
|
|
|
|
-//SysSource: "产研平台",
|
|
|
|
-//SourceType: getSourceType(edbInfo.Source),
|
|
|
|
-//Status: 1,
|
|
|
|
-//}
|
|
|
|
-
|
|
|
|
-// getSourceType
|
|
|
|
-// @Description: 获取指标来源类型
|
|
|
|
-// @author: Roc
|
|
|
|
-// @datetime 2024-03-01 13:40:03
|
|
|
|
-// @param source int
|
|
|
|
-// @return string
|
|
|
|
-func getSourceType(source int) string {
|
|
|
|
- switch source {
|
|
|
|
- case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS, utils.DATA_SOURCE_BAIINFO, utils.DATA_SOURCE_SCI: //钢联,有色,百川盈孚,红桃3
|
|
|
|
- return "RPA"
|
|
|
|
- case utils.DATA_SOURCE_MANUAL:
|
|
|
|
- return "手工"
|
|
|
|
- default:
|
|
|
|
- return "接口"
|
|
|
|
|
|
+// 辅助函数:返回a和b中的较小值
|
|
|
|
+func min(a, b int) int {
|
|
|
|
+ if a < b {
|
|
|
|
+ return a
|
|
}
|
|
}
|
|
|
|
+ return b
|
|
}
|
|
}
|
|
-
|
|
|
|
-func checkUpdateType(oldEdbInfo, newEdbInfo *data_manage.EdbInfo) (isUpdateEdbInfo bool) {
|
|
|
|
- //todo 外部来源名称需要处理;更新人姓名,指标更新时间
|
|
|
|
-
|
|
|
|
- // eta内部名称
|
|
|
|
- if oldEdbInfo.EdbName != newEdbInfo.EdbName {
|
|
|
|
- isUpdateEdbInfo = true
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if oldEdbInfo.Frequency != newEdbInfo.Frequency {
|
|
|
|
- isUpdateEdbInfo = true
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if oldEdbInfo.Unit != newEdbInfo.Unit {
|
|
|
|
- isUpdateEdbInfo = true
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if oldEdbInfo.StartDate != newEdbInfo.StartDate {
|
|
|
|
- isUpdateEdbInfo = true
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if oldEdbInfo.EndDate != newEdbInfo.EndDate {
|
|
|
|
- isUpdateEdbInfo = true
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if oldEdbInfo.SysUserId != newEdbInfo.SysUserId {
|
|
|
|
- isUpdateEdbInfo = true
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if oldEdbInfo.SysUserRealName != newEdbInfo.SysUserRealName {
|
|
|
|
- isUpdateEdbInfo = true
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-//func pushIndex(pushIndexList []*PushIndexParamDataReq) {
|
|
|
|
-// if len(pushIndexList) <= 0 {
|
|
|
|
-// return
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// uri := "/xy/index/pushIndexData"
|
|
|
|
-// req := PushBaseParamReq{
|
|
|
|
-// SerialID: utils.GetRandString(32), //todo
|
|
|
|
-// TableCode: "",
|
|
|
|
-// Total: lenData,
|
|
|
|
-// IsEmailWarn: 0,
|
|
|
|
-// Data: dataList,
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// var pushIndexList []*PushIndexParamDataReq
|
|
|
|
-// dataLimitList := make([][]PushBaseParamReq, 0)
|
|
|
|
-//
|
|
|
|
-// _, e, errMsg := HttpEtaBridgePost(uri, req)
|
|
|
|
-//}
|
|
|