package services import ( "encoding/json" "eta/eta_api/cache" "eta/eta_api/models/rag" "eta/eta_api/models/system" "eta/eta_api/utils" "fmt" "time" ) // AddGenerateAbstractTask // @Description: 添加全部报告(微信文章/ETA报告)生成摘要任务 // @author: Roc // @datetime 2025-04-16 17:02:18 // @param question *rag.Question // @param sysUser *system.Admin func AddGenerateAbstractTask(question *rag.Question, sysUser *system.Admin) { // 找出所有公众号文章Id wechatArticleIdList, err := getAllWechatArticleIdList() if err != nil { return } // 找出所有Eta报告 ragEtaReportIdList, err := getAllEtaReportIdList() if err != nil { return } taskName := fmt.Sprintf("自动生成摘要%s-%s", time.Now().Format(utils.FormatShortDateTimeUnSpace), question.QuestionTitle) aiTask := &rag.AiTask{ AiTaskID: 0, TaskName: taskName, TaskType: utils.AI_TASK_TYPE_GENERATE_ABSTRACT, Status: "init", //StartTime: time.Time{}, //EndTime: time.Time{}, CreateTime: time.Now(), UpdateTime: time.Now(), Parameters: fmt.Sprint(question.QuestionId), Logs: "", Errormessage: "", Priority: 0, RetryCount: 0, //EstimatedCompletionTime: time.Time{}, //ActualCompletitonTime: time.Time{}, Remark: "", SysUserID: sysUser.AdminId, SysUserRealName: sysUser.RealName, } taskRecordList := make([]*rag.AiTaskRecord, 0) // 微信文章 for _, wechatArticleId := range wechatArticleIdList { param := rag.QuestionGenerateAbstractParam{ QuestionId: question.QuestionId, ArticleType: `wechat_article`, ArticleId: wechatArticleId, } paramByte, tmpErr := json.Marshal(param) if tmpErr != nil { return } taskRecord := &rag.AiTaskRecord{ AiTaskRecordID: 0, AiTaskID: 0, Parameters: string(paramByte), Status: "待处理", Remark: "", ModifyTime: time.Now(), CreateTime: time.Now(), } taskRecordList = append(taskRecordList, taskRecord) } // eta报告 for _, ragEtaReportId := range ragEtaReportIdList { param := rag.QuestionGenerateAbstractParam{ QuestionId: question.QuestionId, ArticleType: `rag_eta_report`, ArticleId: ragEtaReportId, } paramByte, tmpErr := json.Marshal(param) if tmpErr != nil { return } taskRecord := &rag.AiTaskRecord{ AiTaskRecordID: 0, AiTaskID: 0, Parameters: string(paramByte), Status: "待处理", Remark: "", ModifyTime: time.Now(), CreateTime: time.Now(), } taskRecordList = append(taskRecordList, taskRecord) } // 创建AI模块的任务,用于后面的任务调度去生成摘要 err = rag.AddAiTask(aiTask, taskRecordList) if err != nil { return } // 添加到缓存队列中 go addTaskToCache(aiTask.AiTaskID) return } func addTaskToCache(aiTaskId int) { var err error defer func() { if err != nil { utils.FileLog.Error("addTaskToCache error: %v", err) } }() obj := rag.AiTaskRecord{} list, err := obj.GetAllListByCondition("*", ` AND ai_task_id = ? `, []interface{}{aiTaskId}) if err != nil { return } for _, item := range list { cache.AddAiTaskRecordOpToCache(item.AiTaskRecordID) } } // getAllWechatArticleIdList // @Description: 获取所有的微信文章Id列表 // @author: Roc // @datetime 2025-04-16 17:18:31 // @return wechatArticleIdList []int // @return err error func getAllWechatArticleIdList() (wechatArticleIdList []int, err error) { wechatArticleIdList = make([]int, 0) pageSize := 10000 currentIndex := 1 // 注意,默认是10000条,如果超过10000条,需要分页查询 // 避免死循环 for { tmpWechatArticleIdList, tmpErr := getWechatArticleIdList(currentIndex, pageSize) if tmpErr != nil { return } wechatArticleIdList = append(wechatArticleIdList, tmpWechatArticleIdList...) if len(tmpWechatArticleIdList) < pageSize { return } currentIndex++ // 超过100次,那么也退出,避免死循环 if currentIndex > 100 { return } } } // getWechatArticleIdList // @Description: 分页获取微信文章Id列表 // @author: Roc // @datetime 2025-04-16 17:18:44 // @param currentIndex int // @param pageSize int // @return wechatArticleIdList []int // @return err error func getWechatArticleIdList(currentIndex, pageSize int) (wechatArticleIdList []int, err error) { wechatArticleIdList = make([]int, 0) var condition string var pars []interface{} var startSize int if pageSize <= 0 { pageSize = utils.PageSize20 } if currentIndex <= 0 { currentIndex = 1 } startSize = utils.StartIndex(currentIndex, pageSize) condition += fmt.Sprintf(` AND %s = ? `, rag.WechatArticleColumns.IsDeleted) pars = append(pars, 0) obj := new(rag.WechatArticle) list, err := obj.GetListByCondition(` wechat_article_id `, condition, pars, startSize, pageSize) if err != nil { return } for _, item := range list { wechatArticleIdList = append(wechatArticleIdList, item.WechatArticleId) } return } // getAllEtaReportIdList // @Description: 获取所有的eta报告Id列表 // @author: Roc // @datetime 2025-04-16 17:19:29 // @return ragEtaReportIdList []int // @return err error func getAllEtaReportIdList() (ragEtaReportIdList []int, err error) { ragEtaReportIdList = make([]int, 0) pageSize := 10000 currentIndex := 1 // 注意,默认是10000条,如果超过10000条,需要分页查询 // 避免死循环 for { tmpRagEtaReportIdList, tmpErr := getEtaReportIdList(currentIndex, pageSize) if tmpErr != nil { return } ragEtaReportIdList = append(ragEtaReportIdList, tmpRagEtaReportIdList...) if len(tmpRagEtaReportIdList) < pageSize { return } currentIndex++ // 超过100次,那么也退出,避免死循环 if currentIndex > 100 { return } } } // getEtaReportIdList // @Description: 分页获取eta报告Id列表 // @author: Roc // @datetime 2025-04-16 17:19:14 // @param currentIndex int // @param pageSize int // @return ragEtaReportIdList []int // @return err error func getEtaReportIdList(currentIndex, pageSize int) (ragEtaReportIdList []int, err error) { ragEtaReportIdList = make([]int, 0) var condition string var pars []interface{} var startSize int if pageSize <= 0 { pageSize = utils.PageSize20 } if currentIndex <= 0 { currentIndex = 1 } startSize = utils.StartIndex(currentIndex, pageSize) condition += fmt.Sprintf(` AND %s = ? AND %s = ? `, rag.RagEtaReportColumns.IsDeleted, rag.RagEtaReportColumns.IsPublished) pars = append(pars, 0, 1) obj := new(rag.RagEtaReport) list, err := obj.GetListByCondition(` rag_eta_report_id `, condition, pars, startSize, pageSize) if err != nil { return } for _, item := range list { ragEtaReportIdList = append(ragEtaReportIdList, item.RagEtaReportId) } return } // CheckOpQuestionAuth // @Description: 校验是否有权限操作提示词 // @author: Roc // @datetime 2025-04-16 17:33:01 // @return auth bool // @return err error func CheckOpQuestionAuth() (auth bool, err error) { total, err := getNotFinishGenerateAbstractTaskNum() if err != nil { return } // 存在未完成的任务,则无权限 if total > 0 { return } auth = true return } // getNotFinishGenerateAbstractTaskNum // @Description: 获取未完成的生成摘要任务的数量 // @author: Roc // @datetime 2025-04-16 17:31:12 // @return total int // @return err error func getNotFinishGenerateAbstractTaskNum() (total int, err error) { obj := rag.AiTask{} var condition string var pars []interface{} condition += fmt.Sprintf(` AND %s NOT IN (?) AND %s = ? `, rag.AiTaskColumns.Status, rag.AiTaskColumns.TaskType) pars = append(pars, []string{`done`, `failed`}, utils.AI_TASK_TYPE_GENERATE_ABSTRACT) total, err = obj.GetCountByCondition(condition, pars) if err != nil { return } return } // GetNotFinishGenerateAbstractTaskNumByQuestionId // @Description: 根据提示词ID获取未完成的生成摘要任务的数量 // @author: Roc // @datetime 2025-04-16 17:31:12 // @return total int // @return err error func GetNotFinishGenerateAbstractTaskNumByQuestionId(questionId int) (total int, err error) { obj := rag.AiTask{} var condition string var pars []interface{} condition += fmt.Sprintf(` AND %s NOT IN (?) AND %s = ? AND %s = ? `, rag.AiTaskColumns.Status, rag.AiTaskColumns.TaskType, rag.AiTaskColumns.Parameters) pars = append(pars, []string{`done`, `failed`}, utils.AI_TASK_TYPE_GENERATE_ABSTRACT, fmt.Sprint(questionId)) total, err = obj.GetCountByCondition(condition, pars) if err != nil { return } return }