|
@@ -15,12 +15,38 @@ import (
|
|
|
var lockSyncDataNode sync.Mutex
|
|
|
|
|
|
func SyncDataNode(cont context.Context) (err error) {
|
|
|
+ ctx, cancel := context.WithTimeout(cont, 5*time.Minute)
|
|
|
+ defer cancel()
|
|
|
+ if err = lockWithTimeout(ctx); err != nil {
|
|
|
+ fmt.Println("获取锁失败, err", err)
|
|
|
+ utils.FileLog.Info("SyncDataNode-获取锁失败, err:%s", err.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ defer lockSyncDataNode.Unlock()
|
|
|
+ syncDataNodeExecute()
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func lockWithTimeout(ctx context.Context) error {
|
|
|
+ done := make(chan struct{})
|
|
|
+ go func() {
|
|
|
+ lockSyncDataNode.Lock()
|
|
|
+ close(done)
|
|
|
+ }()
|
|
|
+ select {
|
|
|
+ case <-done:
|
|
|
+ return nil
|
|
|
+ case <-ctx.Done():
|
|
|
+ return fmt.Errorf("lockWithTimeout: %v", ctx.Err())
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func syncDataNodeExecute() (err error) {
|
|
|
fmt.Println("准备同步数据节点")
|
|
|
- lockSyncDataNode.Lock()
|
|
|
utils.FileLog.Info("准备同步数据节点")
|
|
|
errMsgList := make([]string, 0)
|
|
|
defer func() {
|
|
|
- lockSyncDataNode.Unlock()
|
|
|
fmt.Println("同步数据节点结束")
|
|
|
if err != nil {
|
|
|
tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + err.Error()
|
|
@@ -50,7 +76,7 @@ func SyncDataNode(cont context.Context) (err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/save", param)
|
|
|
+ bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/acquire", param)
|
|
|
if err != nil {
|
|
|
errMsgList = append(errMsgList, errMsg)
|
|
|
errMsgList = append(errMsgList, err.Error())
|
|
@@ -72,7 +98,6 @@ func SyncDataNode(cont context.Context) (err error) {
|
|
|
if err != nil {
|
|
|
errMsgList = append(errMsgList, err.Error())
|
|
|
}
|
|
|
-
|
|
|
return
|
|
|
}
|
|
|
|