ソースを参照

Merge branch 'rag/4.0' into debug

Roc 3 日 前
コミット
36cc177f33

+ 164 - 0
models/rag/ai_task.go

@@ -0,0 +1,164 @@
+package rag
+
+import (
+	"database/sql"
+	"eta/eta_task/global"
+	"eta/eta_task/utils"
+	"fmt"
+	"time"
+)
+
+// AiTask ai这边的任务表
+type AiTask struct {
+	AiTaskID                int       `gorm:"primaryKey;column:ai_task_id" description:"-"`
+	TaskName                string    `gorm:"column:task_name" description:"任务名称"`
+	TaskType                string    `gorm:"column:task_type" description:"任务类型"`
+	Status                  string    `gorm:"column:status" description:"任务状态"`
+	StartTime               time.Time `gorm:"column:start_time" description:"开始时间"`
+	EndTime                 time.Time `gorm:"column:end_time" description:"结束时间"`
+	CreateTime              time.Time `gorm:"column:create_time" description:"创建时间"`
+	UpdateTime              time.Time `gorm:"column:update_time" description:"更新时间"`
+	Parameters              string    `gorm:"column:parameters" description:"执行参数"`
+	Logs                    string    `gorm:"column:logs" description:"日志"`
+	Errormessage            string    `gorm:"column:ErrorMessage" description:"错误信息"`
+	Priority                int       `gorm:"column:priority" description:"优先级"`
+	RetryCount              int       `gorm:"column:retry_count" description:"重试次数"`
+	EstimatedCompletionTime time.Time `gorm:"column:estimated_completion_time" description:"预计完成时间"`
+	ActualCompletitonTime   time.Time `gorm:"column:actual_completiton_time" description:"实际完成时间"`
+	Remark                  string    `gorm:"column:remark" description:"备注"`
+	SysUserID               int       `gorm:"column:sys_user_id" description:"任务创建人id"`
+	SysUserRealName         string    `gorm:"column:sys_user_real_name" description:"任务创建人名称"`
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *AiTask) TableName() string {
+	return "ai_task"
+}
+
+// AiTaskColumns get sql column name.获取数据库列名
+var AiTaskColumns = struct {
+	AiTaskID                string
+	TaskName                string
+	TaskType                string
+	Status                  string
+	StartTime               string
+	EndTime                 string
+	CreateTime              string
+	UpdateTime              string
+	Parameters              string
+	Logs                    string
+	Errormessage            string
+	Priority                string
+	RetryCount              string
+	EstimatedCompletionTime string
+	ActualCompletitonTime   string
+	Remark                  string
+	SysUserID               string
+	SysUserRealName         string
+}{
+	AiTaskID:                "ai_task_id",
+	TaskName:                "task_name",
+	TaskType:                "task_type",
+	Status:                  "status",
+	StartTime:               "start_time",
+	EndTime:                 "end_time",
+	CreateTime:              "create_time",
+	UpdateTime:              "update_time",
+	Parameters:              "parameters",
+	Logs:                    "logs",
+	Errormessage:            "ErrorMessage",
+	Priority:                "priority",
+	RetryCount:              "retry_count",
+	EstimatedCompletionTime: "estimated_completion_time",
+	ActualCompletitonTime:   "actual_completiton_time",
+	Remark:                  "remark",
+	SysUserID:               "sys_user_id",
+	SysUserRealName:         "sys_user_real_name",
+}
+
+func (m *AiTask) Create() (err error) {
+	err = global.DbMap[utils.DbNameAI].Create(&m).Error
+
+	return
+}
+
+func (m *AiTask) Update(updateCols []string) (err error) {
+	err = global.DbMap[utils.DbNameAI].Select(updateCols).Updates(&m).Error
+
+	return
+}
+
+func (m *AiTask) Del() (err error) {
+	err = global.DbMap[utils.DbNameAI].Delete(&m).Error
+
+	return
+}
+
+func (m *AiTask) GetByID(id int) (item *AiTask, err error) {
+	err = global.DbMap[utils.DbNameAI].Where(fmt.Sprintf("%s = ?", AiTaskColumns.AiTaskID), id).First(&item).Error
+
+	return
+}
+
+func (m *AiTask) GetByCondition(condition string, pars []interface{}) (item *AiTask, err error) {
+	sqlStr := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).First(&item).Error
+
+	return
+}
+
+func (m *AiTask) GetListByCondition(field, condition string, pars []interface{}, startSize, pageSize int) (items []*AiTask, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by ai_task_id desc LIMIT ?,?`, field, m.TableName(), condition)
+	pars = append(pars, startSize, pageSize)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *AiTask) GetCountByCondition(condition string, pars []interface{}) (total int, err error) {
+	var intNull sql.NullInt64
+	sqlStr := fmt.Sprintf(`SELECT COUNT(1) total FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Scan(&intNull).Error
+	if err == nil && intNull.Valid {
+		total = int(intNull.Int64)
+	}
+
+	return
+}
+
+// AddAiTask
+// @Description: 添加Ai模块的任务
+// @author: Roc
+// @datetime 2025-04-16 16:55:36
+// @param aiTask *AiTask
+// @param aiRecordList []*AiTaskRecord
+// @return err error
+func AddAiTask(aiTask *AiTask, aiRecordList []*AiTaskRecord) (err error) {
+	to := global.DbMap[utils.DbNameAI].Begin()
+	defer func() {
+		if err != nil {
+			_ = to.Rollback()
+		} else {
+			_ = to.Commit()
+		}
+	}()
+
+	err = to.Create(aiTask).Error
+	if err != nil {
+		return
+	}
+
+	for _, aiTaskRecord := range aiRecordList {
+		aiTaskRecord.AiTaskID = aiTask.AiTaskID
+	}
+
+	err = to.CreateInBatches(aiRecordList, utils.MultiAddNum).Error
+	if err != nil {
+		return
+	}
+
+	return
+}

+ 115 - 0
models/rag/ai_task_record.go

@@ -0,0 +1,115 @@
+package rag
+
+import (
+	"database/sql"
+	"eta/eta_task/global"
+	"eta/eta_task/utils"
+	"fmt"
+	"time"
+)
+
+// AiTaskRecord AI任务的子记录
+type AiTaskRecord struct {
+	AiTaskRecordID int       `gorm:"primaryKey;column:ai_task_record_id" json:"-"` // 任务记录id
+	AiTaskID       int       `gorm:"column:ai_task_id" json:"aiTaskId"`            // 任务id
+	Parameters     string    `gorm:"column:parameters" json:"parameters"`          // 子任务参数
+	Status         string    `gorm:"column:status" json:"status"`                  // 状态
+	Remark         string    `gorm:"column:remark" json:"remark"`                  // 备注
+	ModifyTime     time.Time `gorm:"column:modify_time" json:"modifyTime"`         // 最后一次修改时间
+	CreateTime     time.Time `gorm:"column:create_time" json:"createTime"`         // 任务创建时间
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *AiTaskRecord) TableName() string {
+	return "ai_task_record"
+}
+
+// AiTaskRecordColumns get sql column name.获取数据库列名
+var AiTaskRecordColumns = struct {
+	AiTaskRecordID string
+	AiTaskID       string
+	Parameters     string
+	Status         string
+	Remark         string
+	ModifyTime     string
+	CreateTime     string
+}{
+	AiTaskRecordID: "ai_task_record_id",
+	AiTaskID:       "ai_task_id",
+	Parameters:     "parameters",
+	Status:         "status",
+	Remark:         "remark",
+	ModifyTime:     "modify_time",
+	CreateTime:     "create_time",
+}
+
+func (m *AiTaskRecord) Create() (err error) {
+	err = global.DbMap[utils.DbNameAI].Create(&m).Error
+
+	return
+}
+
+func (m *AiTaskRecord) Update(updateCols []string) (err error) {
+	err = global.DbMap[utils.DbNameAI].Select(updateCols).Updates(&m).Error
+
+	return
+}
+
+func (m *AiTaskRecord) Del() (err error) {
+	err = global.DbMap[utils.DbNameAI].Delete(&m).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetByID(id int) (item *AiTaskRecord, err error) {
+	err = global.DbMap[utils.DbNameAI].Where(fmt.Sprintf("%s = ?", AiTaskRecordColumns.AiTaskRecordID), id).First(&item).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetByCondition(condition string, pars []interface{}) (item *AiTaskRecord, err error) {
+	sqlStr := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).First(&item).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetAllListByCondition(field, condition string, pars []interface{}) (items []*AiTaskRecord, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by ai_task_record_id desc `, field, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetListByCondition(field, condition string, pars []interface{}, startSize, pageSize int) (items []*AiTaskRecord, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by ai_task_record_id desc LIMIT ?,?`, field, m.TableName(), condition)
+	pars = append(pars, startSize, pageSize)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetCountByCondition(condition string, pars []interface{}) (total int, err error) {
+	var intNull sql.NullInt64
+	sqlStr := fmt.Sprintf(`SELECT COUNT(1) total FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Scan(&intNull).Error
+	if err == nil && intNull.Valid {
+		total = int(intNull.Int64)
+	}
+
+	return
+}
+
+// QuestionGenerateAbstractParam
+// @Description:
+type QuestionGenerateAbstractParam struct {
+	QuestionId  int    `json:"questionId"`
+	ArticleType string `json:"articleType"`
+	ArticleId   int    `json:"articleId"`
+}

+ 6 - 0
services/task.go

@@ -31,6 +31,12 @@ func Task() {
 	// 定时汇总数据源终端指标更新情况
 	setEdbSourceStatTask := task.NewTask("setEdbSourceStatTask", "0 20 19,23 * * *", data_stat.SetEdbSourceStatTask)
 	task.AddTask("数据源统计表", setEdbSourceStatTask)
+	
+	// 定时监测AI任务调度情况
+	if utils.BusinessCode == utils.BusinessCodeRelease || utils.BusinessCode == utils.BusinessCodeDebug {
+		checkAiTask := task.NewTask("checkAiTask", "0 */5 * * * *", CheckAiTask)
+		task.AddTask("定时定时监测AI任务调度情况", checkAiTask)
+	}
 
 	// 定时禁用钢联化工和wind指标的刷新状态
 	disableEdbRefresh := task.NewTask("disableEdbRefresh", "0 0 10 * * *", DisableEdbRefresh)

+ 75 - 0
services/wechat_platform.go

@@ -56,3 +56,78 @@ func RefreshWechatPlatform(cont context.Context) (err error) {
 
 	return
 }
+
+// AiTaskRecordOp
+// @Description: AI模块任务操作记录
+type AiTaskRecordOp struct {
+	AiTaskRecordId int
+}
+
+// AddAiTaskRecordOpToCache
+// @Description: AI任务操作调度入队列
+// @author: Roc
+// @datetime 2025-04-24 09:41:11
+// @param aiTaskRecordId int
+// @return bool
+func AddAiTaskRecordOpToCache(aiTaskRecordId int) bool {
+	// 如果不在发布和调试模式,那么就不加入缓存
+	if !utils.InArrayByStr([]string{utils.BusinessCodeRelease, utils.BusinessCodeDebug}, utils.BusinessCode) {
+		return true
+	}
+	record := new(AiTaskRecordOp)
+	record.AiTaskRecordId = aiTaskRecordId
+	if utils.Re == nil {
+		err := utils.Rc.LPush(utils.CACHE_AI_ARTICLE_ABSTRACT_LLM_TASK, record)
+
+		utils.FileLog.Info(fmt.Sprintf("将AI任务操作调度入队列 加入缓存 AddAiTaskRecordOpToCache LPush: 记录id:%d", aiTaskRecordId))
+		if err != nil {
+			fmt.Println("AddAiTaskRecordOpToCache LPush Err:" + err.Error())
+		}
+		return true
+	}
+	return false
+}
+
+func CheckAiTask(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("CheckAiTask err:", err)
+		}
+	}()
+	count, err := utils.Rc.LLen(utils.CACHE_AI_ARTICLE_ABSTRACT_LLM_TASK)
+	if err != nil {
+		fmt.Println("CheckAiTask err:", err)
+	}
+	if count > 0 {
+		return
+	}
+
+	// 队列没数据了,那么需要重新检查下任务状态,并将状态为待处理且未完成的任务加入到队列中
+	aiTaskObj := rag.AiTask{}
+	taskList, err := aiTaskObj.GetListByCondition(``, ` AND status = ? `, []interface{}{`processing`}, 0, 10)
+	if err != nil {
+		fmt.Println("CheckAiTask err:", err)
+		return
+	}
+
+	if len(taskList) <= 0 {
+		return
+	}
+
+	aiTaskRecordObj := rag.AiTaskRecord{}
+	for _, v := range taskList {
+		// 查找具体记录
+		recordList, tmpErr := aiTaskRecordObj.GetAllListByCondition(``, ` AND ai_task_id = ? AND status = ? `, []interface{}{v.AiTaskID, `待处理`})
+		if tmpErr != nil {
+			fmt.Println(v.AiTaskID, "获取待处理记录失败; err:", tmpErr)
+			continue
+		}
+
+		// 将具体记录加入到队列中
+		for _, record := range recordList {
+			AddAiTaskRecordOpToCache(record.AiTaskRecordID)
+		}
+	}
+
+	return
+}

+ 2 - 1
utils/constants.go

@@ -181,7 +181,8 @@ const (
 const CACHE_EDB_UPDATE_LOG_ID = "eta:edb_update_log:id"
 
 const (
-	CACHE_WECHAT_PLATFORM_ARTICLE = "wechat_platform:article:op:" //微信文章处理
+	CACHE_WECHAT_PLATFORM_ARTICLE      = "wechat_platform:article:op:"          //微信文章处理
+	CACHE_AI_ARTICLE_ABSTRACT_LLM_TASK = "eta:ai:article:abstract:llm:task:op:" //微信文章/eta报告的摘要重新生成处理(任务调度)
 )
 
 // 指标引用对象

+ 1 - 0
utils/redis.go

@@ -19,6 +19,7 @@ type RedisClient interface {
 	IsExist(key string) bool
 	LPush(key string, val interface{}) error
 	Brpop(key string, callback func([]byte))
+	LLen(key string) (int64, error)
 	GetRedisTTL(key string) time.Duration
 	Incrby(key string, num int) (interface{}, error)
 	Do(commandName string, args ...interface{}) (reply interface{}, err error)

+ 12 - 0
utils/redis/cluster_redis.go

@@ -254,6 +254,18 @@ func (rc *ClusterRedisClient) Brpop(key string, callback func([]byte)) {
 
 }
 
+// LLen
+// @Description: 获取list中剩余的数据数
+// @author: Roc
+// @receiver rc
+// @datetime 2025-04-25 10:58:25
+// @param key string
+// @return int64
+// @return error
+func (rc *ClusterRedisClient) LLen(key string) (int64, error) {
+	return rc.redisClient.LLen(context.TODO(), key).Result()
+}
+
 // GetRedisTTL
 // @Description: 获取key的过期时间
 // @receiver rc

+ 12 - 0
utils/redis/standalone_redis.go

@@ -246,6 +246,18 @@ func (rc *StandaloneRedisClient) Brpop(key string, callback func([]byte)) {
 
 }
 
+// LLen
+// @Description: 获取list中剩余的数据数
+// @author: Roc
+// @receiver rc
+// @datetime 2025-04-25 10:58:25
+// @param key string
+// @return int64
+// @return error
+func (rc *StandaloneRedisClient) LLen(key string) (int64, error) {
+	return rc.redisClient.LLen(context.TODO(), key).Result()
+}
+
 // GetRedisTTL
 // @Description: 获取key的过期时间
 // @receiver rc