Roc 3 ngày trước cách đây
mục cha
commit
77521699ee
5 tập tin đã thay đổi với 54 bổ sung7 xóa
  1. 1 1
      models/rag/ai_task.go
  2. 28 6
      services/task.go
  3. 1 0
      utils/redis.go
  4. 12 0
      utils/redis/cluster_redis.go
  5. 12 0
      utils/redis/standalone_redis.go

+ 1 - 1
models/rag/ai_task.go

@@ -111,7 +111,7 @@ func (m *AiTask) GetListByCondition(field, condition string, pars []interface{},
 	if field == "" {
 		field = "*"
 	}
-	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by AiTask_id desc LIMIT ?,?`, field, m.TableName(), condition)
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by ai_task_id desc LIMIT ?,?`, field, m.TableName(), condition)
 	pars = append(pars, startSize, pageSize)
 	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
 

+ 28 - 6
services/task.go

@@ -796,7 +796,9 @@ func HandleAiArticleAbstractLlmOp() {
 	}
 }
 
-var aiTaskIdMap = map[int]bool{}
+var aiTaskHandleIdMap = map[int]bool{}
+
+// todo 任务开始时间
 
 // handleAiArticleAbstractLlmOp
 // @Description: 处理AI库的报告摘要生成(批量任务)
@@ -822,6 +824,27 @@ func handleAiArticleAbstractLlmOp(b []byte) {
 		return
 	}
 
+	// 如果没有处理过该任务,那么就标记该任务开始
+	if _, ok := aiTaskHandleIdMap[item.AiTaskID]; !ok {
+		aiTaskObj := rag.AiTask{}
+		aiTaskInfo, tmpErr := aiTaskObj.GetByID(item.AiTaskID)
+		if tmpErr != nil {
+			err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
+			return
+		}
+		// 如果任务是初始化,那么就标记开始
+		if aiTaskInfo.Status == `init` {
+			aiTaskInfo.StartTime = time.Now()
+			aiTaskInfo.Status = `processing`
+			aiTaskInfo.UpdateTime = time.Now()
+			tmpErr = aiTaskInfo.Update([]string{`start_time`, "status", "update_time"})
+			if tmpErr != nil {
+				utils.FileLog.Error("标记任务开始状态失败, err: %s", tmpErr.Error())
+			}
+		}
+
+	}
+
 	// 处理完成后标记任务状态
 	defer func() {
 		// 修改任务状态
@@ -837,12 +860,12 @@ func handleAiArticleAbstractLlmOp(b []byte) {
 				err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
 				return
 			}
+			aiTaskInfo.EndTime = time.Now()
 			aiTaskInfo.Status = `done`
 			aiTaskInfo.UpdateTime = time.Now()
-			tmpErr = aiTaskInfo.Update([]string{"status", "update_time"})
+			tmpErr = aiTaskInfo.Update([]string{`end_time`, "status", "update_time"})
 			if tmpErr != nil {
-				err = fmt.Errorf("标记任务状态失败, err: %s", tmpErr.Error())
-				return
+				utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
 			}
 		}
 
@@ -867,8 +890,7 @@ func handleAiArticleAbstractLlmOp(b []byte) {
 		item.ModifyTime = time.Now()
 		tmpErr := item.Update([]string{"status", "remark", "modify_time"})
 		if tmpErr != nil {
-			err = fmt.Errorf("标记任务记录状态失败, err: %s", tmpErr.Error())
-			return
+			utils.FileLog.Error("标记任务记录状态失败, err: %s", tmpErr.Error())
 		}
 	}()
 

+ 1 - 0
utils/redis.go

@@ -19,6 +19,7 @@ type RedisClient interface {
 	IsExist(key string) bool
 	LPush(key string, val interface{}) error
 	Brpop(key string, callback func([]byte))
+	LLen(key string) (int64, error)
 	GetRedisTTL(key string) time.Duration
 	Incrby(key string, num int) (interface{}, error)
 	Do(commandName string, args ...interface{}) (reply interface{}, err error)

+ 12 - 0
utils/redis/cluster_redis.go

@@ -249,6 +249,18 @@ func (rc *ClusterRedisClient) Brpop(key string, callback func([]byte)) {
 
 }
 
+// LLen
+// @Description: 获取list中剩余的数据数
+// @author: Roc
+// @receiver rc
+// @datetime 2025-04-25 10:58:25
+// @param key string
+// @return int64
+// @return error
+func (rc *ClusterRedisClient) LLen(key string) (int64, error) {
+	return rc.redisClient.LLen(context.TODO(), key).Result()
+}
+
 // GetRedisTTL
 // @Description: 获取key的过期时间
 // @receiver rc

+ 12 - 0
utils/redis/standalone_redis.go

@@ -237,6 +237,18 @@ func (rc *StandaloneRedisClient) Brpop(key string, callback func([]byte)) {
 
 }
 
+// LLen
+// @Description: 获取list中剩余的数据数
+// @author: Roc
+// @receiver rc
+// @datetime 2025-04-25 10:58:25
+// @param key string
+// @return int64
+// @return error
+func (rc *StandaloneRedisClient) LLen(key string) (int64, error) {
+	return rc.redisClient.LLen(context.TODO(), key).Result()
+}
+
 // GetRedisTTL
 // @Description: 获取key的过期时间
 // @receiver rc