|
@@ -5,9 +5,11 @@ import (
|
|
|
"encoding/json"
|
|
|
"eta/eta_task/models/data_manage"
|
|
|
"eta/eta_task/services/alarm_msg"
|
|
|
+ "eta/eta_task/services/eta_hub"
|
|
|
"eta/eta_task/utils"
|
|
|
"fmt"
|
|
|
"github.com/rdlucklib/rdluck_tools/uuid"
|
|
|
+ "net/url"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"time"
|
|
@@ -519,3 +521,182 @@ func min(a, b int) int {
|
|
|
}
|
|
|
return b
|
|
|
}
|
|
|
+
|
|
|
+// 同步crm指标信息锁
|
|
|
+var lockGetCrmIndex sync.Mutex
|
|
|
+
|
|
|
+// SyncXyCrmIndex
|
|
|
+// @Description: 定时同步CRM指标信息
|
|
|
+// @author: Roc
|
|
|
+// @datetime 2024-5-22 10:46:08
|
|
|
+// @param cont context.Context
|
|
|
+// @return err error
|
|
|
+func SyncXyCrmIndex(cont context.Context) (err error) {
|
|
|
+ lockGetCrmIndex.Lock()
|
|
|
+ errMsgList := make([]string, 0)
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ tips := "SyncXyCrmIndex-定时同步CRM数据到ETA平台失败, ErrMsg:\n" + err.Error()
|
|
|
+ utils.FileLog.Info(tips)
|
|
|
+ go alarm_msg.SendAlarmMsg(tips, 3)
|
|
|
+ }
|
|
|
+ if len(errMsgList) > 0 {
|
|
|
+ tips := "SyncXyCrmIndex-定时同步CRM数据到ETA平台失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
|
|
|
+ utils.FileLog.Info(tips)
|
|
|
+ go alarm_msg.SendAlarmMsg(tips, 3)
|
|
|
+ }
|
|
|
+ lockGetCrmIndex.Unlock()
|
|
|
+ fmt.Println("end SyncXyCrmIndex")
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 未配置资产包合数据分区,那么就不执行
|
|
|
+ if utils.SyncCrmAssetPkgCd == `` || utils.SyncCrmDataSourceType == `` {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ var lastUpdateTimeStr string // 上一次更新的时间
|
|
|
+ nowTimeStr := time.Now().Format(utils.FormatDateTimeUnSpaceV2) // 这次更新的时间
|
|
|
+
|
|
|
+ key := data_manage.CrmIndexLastUpdateTime
|
|
|
+ sysInteractionLog, err := data_manage.GetBusinessSysInteractionLogByKey(key)
|
|
|
+ if err != nil {
|
|
|
+ if err.Error() != utils.ErrNoRow() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //lastUpdateTime := time.Now().Format("2006-01-02 15:04:05")
|
|
|
+ } else {
|
|
|
+ if sysInteractionLog.InteractionVal != `` {
|
|
|
+ lastUpdateTimeStr = sysInteractionLog.InteractionVal
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ syncCrmAssetPkgCd := utils.SyncCrmAssetPkgCd
|
|
|
+ syncCrmAssetPkgCdList := strings.Split(syncCrmAssetPkgCd, ",")
|
|
|
+
|
|
|
+ for _, assetPkgCd := range syncCrmAssetPkgCdList {
|
|
|
+ err, errMsgList = syncCrmIndex(assetPkgCd, 1, utils.SyncCrmIndexNum, lastUpdateTimeStr)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 修改最后的更新时间
|
|
|
+ modifyCrmIndexLastUpdateTime(nowTimeStr)
|
|
|
+
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// syncCrmIndex
|
|
|
+// @Description: 开始同步CRM指标信息
|
|
|
+// @author: Roc
|
|
|
+// @datetime 2024-05-17 15:55:11
|
|
|
+// @param assetPkgCd string
|
|
|
+// @param currIndex int
|
|
|
+// @param pageSize int
|
|
|
+// @param lastUpdateTimeStr string
|
|
|
+// @return err error
|
|
|
+// @return errMsgList []string
|
|
|
+func syncCrmIndex(assetPkgCd string, currIndex, pageSize int, baseLastUpdateTimeStr string) (err error, errMsgList []string) {
|
|
|
+ errMsgList = make([]string, 0)
|
|
|
+
|
|
|
+ lastUpdateTimeStr := baseLastUpdateTimeStr
|
|
|
+ if lastUpdateTimeStr != `` {
|
|
|
+ lastUpdateTimeStr = url.QueryEscape(lastUpdateTimeStr)
|
|
|
+ }
|
|
|
+ uri := fmt.Sprintf("%s/getCrmData?index_pkg_code=%s&data_source_type=%s¤t_index=%d&page_size=%d&detail_last_update_start_time=%s", utils.SyncCrmIndexPath, assetPkgCd, utils.SyncCrmDataSourceType, currIndex, pageSize, lastUpdateTimeStr)
|
|
|
+ bResult, err, _ := HttpEtaBridgeGet(uri)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ result := new(EtaBridgeDataRespAndBusinessData)
|
|
|
+ err = json.Unmarshal(bResult, &result)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("result unmarshal err: %s\nresult: %s", err.Error(), string(bResult))
|
|
|
+ utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult))
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ //totalPage := result.Data.Paging.Pages
|
|
|
+
|
|
|
+ for _, v := range result.Data.List {
|
|
|
+ tmpErr := pushCrmDataToHub(v)
|
|
|
+ if tmpErr != nil {
|
|
|
+ errMsgList = append(errMsgList, tmpErr.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果还有下一页,那么就继续请求下一页
|
|
|
+ if currIndex < result.Data.Paging.Pages {
|
|
|
+ _, tmpErrMsgList := syncCrmIndex(assetPkgCd, currIndex+1, utils.SyncCrmIndexNum, baseLastUpdateTimeStr)
|
|
|
+ errMsgList = append(errMsgList, tmpErrMsgList...)
|
|
|
+ }
|
|
|
+
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// pushCrmDataToHub
|
|
|
+// @Description: 调用hub服务,将数据推送到eta
|
|
|
+// @author: Roc
|
|
|
+// @datetime 2024-05-17 15:55:24
|
|
|
+// @param data interface{}
|
|
|
+// @return err error
|
|
|
+func pushCrmDataToHub(data interface{}) (err error) {
|
|
|
+ uri := `/edb/push`
|
|
|
+ _, err, _ = eta_hub.HttpEtaHubPost(uri, data)
|
|
|
+
|
|
|
+ //result := new(EtaBridgeDataRespAndBusinessData)
|
|
|
+ //err = json.Unmarshal(bResult, &result)
|
|
|
+ //if err != nil {
|
|
|
+ // err = fmt.Errorf("result unmarshal err: %s\nresult: %s", err.Error(), string(bResult))
|
|
|
+ // utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult))
|
|
|
+ // return
|
|
|
+ //}
|
|
|
+
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// modifyCrmIndexLastUpdateTime
|
|
|
+// @Description: 修改crm指标的最近拉取的时间
|
|
|
+// @author: Roc
|
|
|
+// @datetime 2024-05-17 11:32:32
|
|
|
+// @param fileName string
|
|
|
+// @param position uint32
|
|
|
+// @return err error
|
|
|
+func modifyCrmIndexLastUpdateTime(lastUpdateTime string) {
|
|
|
+ var err error
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("修改binlog文件名称和位置异常,lastUpdateTime", lastUpdateTime, ",err:", err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // fileName 变更
|
|
|
+ key := data_manage.CrmIndexLastUpdateTime
|
|
|
+ fileNameLog, err := data_manage.GetBusinessSysInteractionLogByKey(key)
|
|
|
+ if err != nil {
|
|
|
+ if err.Error() != utils.ErrNoRow() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ err = nil
|
|
|
+ fileNameLog = &data_manage.BusinessSysInteractionLog{
|
|
|
+ //ID: 0,
|
|
|
+ InteractionKey: key,
|
|
|
+ InteractionVal: lastUpdateTime,
|
|
|
+ Remark: "crm拉取数据的最近更新时间",
|
|
|
+ ModifyTime: time.Now(),
|
|
|
+ CreateTime: time.Now(),
|
|
|
+ }
|
|
|
+ err = fileNameLog.Create()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ fileNameLog.InteractionVal = lastUpdateTime
|
|
|
+ fileNameLog.ModifyTime = time.Now()
|
|
|
+ err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return
|
|
|
+}
|