Browse Source

Merge remote-tracking branch 'origin/eta/2.6.4' into debug

# Conflicts:
#	utils/constants.go
Roc 1 week ago
parent
commit
0efdbc7892

+ 34 - 0
cache/index_task.go

@@ -0,0 +1,34 @@
+package cache
+
+import (
+	"eta/eta_api/utils"
+	"fmt"
+)
+
+type IndexTaskRecordOp struct {
+	IndexTaskRecordId int
+	TaskType          string
+}
+
+// AddIndexTaskRecordOpToCache
+// @Description: AI任务操作调度入队列
+// @author: Roc
+// @datetime 2025-04-24 09:41:11
+// @param aiTaskRecordId int
+// @return bool
+func AddIndexTaskRecordOpToCache(aiTaskRecordId int, taskType string) bool {
+	record := new(IndexTaskRecordOp)
+	record.IndexTaskRecordId = aiTaskRecordId
+	record.TaskType = taskType
+
+	if utils.Re == nil {
+		err := utils.Rc.LPush(utils.CACHE_INDEX_TASK, record)
+
+		utils.FileLog.Info(fmt.Sprintf("将指标库任务操作调度入队列 加入缓存 AddIndexTaskRecordOpToCache LPush: 记录id:%d", aiTaskRecordId))
+		if err != nil {
+			fmt.Println("AddIndexTaskRecordOpToCache LPush Err:" + err.Error())
+		}
+		return true
+	}
+	return false
+}

+ 178 - 136
controllers/data_manage/ai_predict_model/index.go

@@ -168,135 +168,6 @@ func (this *AiPredictModelIndexController) List() {
 	br.Msg = "获取成功"
 }
 
-// GetAll
-// @Title 获取标的全量数据
-// @Description 获取标的全量数据
-// @Param   ClassifyId   query   int   false   "分类id"
-// @Param   IndexId   query   int   false   "模型标的ID"
-// @Param   Keyword   query   string   false   "搜索关键词"
-// @Success 200 {object} data_manage.ChartListResp
-// @router /index/all [get]
-func (this *AiPredictModelIndexController) GetAll() {
-	br := new(models.BaseResponse).Init()
-	defer func() {
-		this.Data["json"] = br
-		this.ServeJSON()
-	}()
-	sysUser := this.SysUser
-	if sysUser == nil {
-		br.Msg = "请登录"
-		br.ErrMsg = "请登录,SysUser Is Empty"
-		br.Ret = 408
-		return
-	}
-	classifyId, _ := this.GetInt("ClassifyId")
-	indexId, _ := this.GetInt("IndexId")
-	keyword := this.GetString("KeyWord")
-	if keyword == "" {
-		keyword = this.GetString("Keyword")
-	}
-	keyword = strings.TrimSpace(keyword)
-	resp := new(aiPredictModel.AiPredictModelIndexPageListResp)
-
-	// 分类
-	classifyIdName := make(map[int]string)
-	{
-		classifyOb := new(aiPredictModel.AiPredictModelClassify)
-		list, e := classifyOb.GetItemsByCondition("", make([]interface{}, 0), []string{}, "")
-		if e != nil {
-			br.Msg = "获取失败"
-			br.ErrMsg = fmt.Sprintf("获取分类失败, %v", e)
-			return
-		}
-		for _, v := range list {
-			classifyIdName[v.AiPredictModelClassifyId] = v.ClassifyName
-		}
-	}
-
-	// 筛选条件
-	highlightMap := make(map[int]string)
-	indexOb := new(aiPredictModel.AiPredictModelIndex)
-	var cond string
-	var pars []interface{}
-	{
-		if indexId > 0 {
-			cond += fmt.Sprintf(" AND %s = ?", indexOb.Cols().PrimaryId)
-			pars = append(pars, indexId)
-		}
-		if classifyId > 0 {
-			cond += fmt.Sprintf(" AND %s = ?", indexOb.Cols().ClassifyId)
-			pars = append(pars, classifyId)
-		}
-
-		// 有关键词从es中搜索
-		if keyword != "" {
-			// 使用scroll API获取所有匹配的数据
-			scrollId, list, e := elastic.SearchDataSourceIndexWithScroll(utils.EsDataSourceIndexName, keyword, utils.DATA_SOURCE_AI_PREDICT_MODEL, 0, []int{}, []int{}, []string{}, 1000)
-			if e != nil {
-				br.Msg = "获取失败"
-				br.ErrMsg = fmt.Sprintf("ES-搜索AI预测模型列表失败, %v", e)
-				return
-			}
-
-			// 如果scrollId不为空,说明还有更多数据,继续获取
-			for scrollId != "" {
-				nextScrollId, nextList, e := elastic.ScrollDataSourceIndex(utils.EsDataSourceIndexName, scrollId)
-				if e != nil {
-					br.Msg = "获取失败"
-					br.ErrMsg = fmt.Sprintf("ES-获取更多数据失败, %v", e)
-					return
-				}
-				if len(nextList) > 0 {
-					list = append(list, nextList...)
-				}
-				scrollId = nextScrollId
-			}
-
-			if len(list) == 0 {
-				resp.List = make([]*aiPredictModel.AiPredictModelIndexItem, 0)
-				br.Ret = 200
-				br.Success = true
-				br.Msg = "获取成功"
-				br.Data = resp
-				return
-			}
-			var ids []int
-			for _, v := range list {
-				ids = append(ids, v.PrimaryId)
-				highlightMap[v.PrimaryId] = v.SearchText
-			}
-			cond += fmt.Sprintf(` AND %s IN (%s)`, indexOb.Cols().PrimaryId, utils.GetOrmInReplace(len(ids)))
-			pars = append(pars, ids)
-		}
-	}
-
-	// 获取列表
-	list, e := indexOb.GetItemsByCondition(cond, pars, []string{}, "")
-	if e != nil {
-		br.Msg = "获取失败"
-		br.ErrMsg = fmt.Sprintf("获取列表失败, %v", e)
-		return
-	}
-	pageList := make([]*aiPredictModel.AiPredictModelIndexItem, 0)
-	for _, v := range list {
-		t := v.Format2Item()
-		t.ClassifyName = classifyIdName[v.ClassifyId]
-		// 搜索高亮
-		t.SearchText = v.IndexName
-		s := highlightMap[v.AiPredictModelIndexId]
-		if s != "" {
-			t.SearchText = s
-		}
-		pageList = append(pageList, t)
-	}
-
-	resp.List = pageList
-	br.Data = resp
-	br.Ret = 200
-	br.Success = true
-	br.Msg = "获取成功"
-}
-
 // Import
 // @Title 导入标的和数据
 // @Description 导入标的和数据
@@ -1377,13 +1248,10 @@ func (this *AiPredictModelIndexController) GetCurrentRunningAiPredictModelIndexC
 // @Description 获取当前正在运行中的模型数量
 // @Success 200 Ret=200 保存成功
 // @Success 200 {object} response.CurrentRunningCountResp
-// @router /index/run [get]
+// @router /index/run [post]
 func (this *AiPredictModelIndexController) Run() {
 	br := new(models.BaseResponse).Init()
 	defer func() {
-		if br.ErrMsg == "" {
-			br.IsSendEmail = false
-		}
 		this.Data["json"] = br
 		this.ServeJSON()
 	}()
@@ -1395,6 +1263,17 @@ func (this *AiPredictModelIndexController) Run() {
 		return
 	}
 
+	var req request.AiPredictModelIndexRunReq
+	if e := json.Unmarshal(this.Ctx.Input.RequestBody, &req); e != nil {
+		br.Msg = "参数解析异常"
+		br.ErrMsg = fmt.Sprintf("参数解析异常, %v", e)
+		return
+	}
+
+	classifyId := req.ClassifyId
+	//indexId, _ := this.GetInt("IndexId")
+	keyword := strings.TrimSpace(req.Keyword)
+
 	// 查找当前标的是否存在待训练/训练中的模型
 	count, err := services.GetCurrentRunningAiPredictModelIndexCount()
 	if err != nil {
@@ -1402,13 +1281,176 @@ func (this *AiPredictModelIndexController) Run() {
 		br.ErrMsg = "训练失败,查找待训练的模型失败,Err:" + err.Error()
 		return
 	}
+	if count > 0 {
+		br.Msg = "当前有模型正在训练/运行中,请勿重复训练"
+		br.ErrMsg = "当前有模型正在训练/运行中,请勿重复训练"
+		br.IsSendEmail = false
+		return
+	}
 
-	resp := response.CurrentRunningCountResp{
-		Total: count,
+	// 分类
+	classifyIdName := make(map[int]string)
+	{
+		classifyOb := new(aiPredictModel.AiPredictModelClassify)
+		list, e := classifyOb.GetItemsByCondition("", make([]interface{}, 0), []string{}, "")
+		if e != nil {
+			br.Msg = "获取失败"
+			br.ErrMsg = fmt.Sprintf("获取分类失败, %v", e)
+			return
+		}
+		for _, v := range list {
+			classifyIdName[v.AiPredictModelClassifyId] = v.ClassifyName
+		}
 	}
 
-	br.Data = resp
+	// 筛选条件
+	highlightMap := make(map[int]string)
+	indexOb := new(aiPredictModel.AiPredictModelIndex)
+	var cond string
+	var pars []interface{}
+
+	cond += fmt.Sprintf(` AND %s NOT IN (?)  AND %s NOT IN (?) `, indexOb.Cols().TrainStatus, indexOb.Cols().RunStatus)
+	pars = append(pars, []string{aiPredictModel.TrainStatusWaiting, aiPredictModel.TrainStatusTraining}, []string{aiPredictModel.RunStatusWaiting, aiPredictModel.RunStatusRunning})
+
+	if req.SelectAll {
+		// 如果列表全选
+		if classifyId > 0 {
+			cond += fmt.Sprintf(" AND %s = ?", indexOb.Cols().ClassifyId)
+			pars = append(pars, classifyId)
+		}
+
+		// 有关键词从es中搜索
+		if keyword != "" {
+			// 使用scroll API获取所有匹配的数据
+			list, e := getAllSearchDataSource(keyword, utils.DATA_SOURCE_AI_PREDICT_MODEL, 0)
+			if e != nil {
+				br.Msg = "获取失败"
+				br.ErrMsg = fmt.Sprintf("获取失败,Err:%v", e)
+				return
+			}
+
+			if len(list) == 0 {
+				br.Msg = "没有找到可以运行的标的"
+				br.IsSendEmail = false
+				return
+			}
+			var ids []int
+			for _, v := range list {
+				ids = append(ids, v.PrimaryId)
+				highlightMap[v.PrimaryId] = v.SearchText
+			}
+			cond += fmt.Sprintf(` AND %s IN (%s)`, indexOb.Cols().PrimaryId, utils.GetOrmInReplace(len(ids)))
+			pars = append(pars, ids)
+		}
+
+		// 不勾选的标的
+		if len(req.NotIndexIdList) > 0 {
+			var ids []int
+			for _, v := range req.NotIndexIdList {
+				ids = append(ids, v)
+			}
+			cond += fmt.Sprintf(` AND %s NOT IN (%s)`, indexOb.Cols().PrimaryId, utils.GetOrmInReplace(len(ids)))
+			pars = append(pars, ids)
+		}
+	} else {
+		// 如果不是列表全选
+		if len(req.IndexIdList) <= 0 {
+			br.Msg = `请选择标的`
+			br.IsSendEmail = false
+			return
+		}
+		var ids []int
+		for _, v := range req.IndexIdList {
+			ids = append(ids, v)
+		}
+		cond += fmt.Sprintf(` AND %s IN (%s)`, indexOb.Cols().PrimaryId, utils.GetOrmInReplace(len(ids)))
+		pars = append(pars, ids)
+	}
+
+	// 获取列表
+	list, e := indexOb.GetItemsByCondition(cond, pars, []string{`ai_predict_model_index_id`}, "")
+	if e != nil {
+		br.Msg = "获取失败"
+		br.ErrMsg = fmt.Sprintf("获取列表失败, %v", e)
+		return
+	}
+
+	indexIdList := make([]int, 0)
+	for _, v := range list {
+		indexIdList = append(indexIdList, v.AiPredictModelIndexId)
+	}
+
+	if len(indexIdList) <= 0 {
+		br.Msg = "没有找到可以运行的标的"
+		br.IsSendEmail = false
+		return
+	}
+
+	err = indexOb.UpdateRunStatusByIdList(indexIdList)
+	if err != nil {
+		br.Msg = "运行失败"
+		br.ErrMsg = fmt.Sprintf("运行失败, %v", e)
+		return
+	}
+
+	// 加入模型运行任务中
+	go services.AddAiModelRunTask(indexIdList, this.SysUser)
+
+	br.Data = indexIdList
 	br.Ret = 200
 	br.Success = true
 	br.Msg = "获取成功"
 }
+
+// getAllSearchDataSource
+// @Description: 根据条件获取ES中的所有数据
+// @author: Roc
+// @datetime 2025-05-08 11:15:27
+// @param keyword string
+// @param source int
+// @param subSource int
+// @return list []*dataSourceModel.SearchDataSourceItem
+// @return err error
+func getAllSearchDataSource(keyword string, source, subSource int) (list []*dataSourceModel.SearchDataSourceItem, err error) {
+	dataLimit := 1000 // 每页获取的数据量
+	var scrollId string
+	// 使用scroll API获取所有匹配的数据
+	scrollId, list, e := elastic.SearchDataSourceIndexWithScroll(utils.EsDataSourceIndexName, keyword, source, subSource, []int{}, []int{}, []string{}, dataLimit)
+	if e != nil {
+		err = fmt.Errorf("ES-搜索列表失败, %v", e)
+		return
+	}
+	// 最终清理用的 scrollId 放到一个指针里,保证 defer 拿到最新值
+	finalScrollId := &scrollId
+
+	defer func() {
+		// 清除滚动查询的缓存
+		if *finalScrollId != "" {
+			elastic.ClearScrollDataSourceIndex(*finalScrollId)
+		}
+	}()
+
+	// 如果scrollId不为空,说明还有更多数据,继续获取
+	for scrollId != "" {
+		nextScrollId, nextList, e := elastic.ScrollDataSourceIndex(scrollId)
+		if e != nil {
+			err = fmt.Errorf("ES-获取更多数据失败, %v", e)
+			return
+		}
+
+		// 如果没有更多数据,则退出循环
+		if len(nextList) <= 0 {
+			break
+		}
+
+		list = append(list, nextList...)
+
+		if nextScrollId != `` {
+			scrollId = nextScrollId
+			*finalScrollId = scrollId // 更新 finalScrollId 的指向内容
+		}
+
+	}
+
+	return
+}

+ 36 - 28
controllers/data_manage/ai_predict_model/index_config.go

@@ -4,7 +4,7 @@ import (
 	"encoding/json"
 	"eta/eta_api/controllers"
 	"eta/eta_api/models"
-	data_manage "eta/eta_api/models/ai_predict_model"
+	aiPredictModel "eta/eta_api/models/ai_predict_model"
 	"eta/eta_api/models/ai_predict_model/request"
 	"eta/eta_api/models/ai_predict_model/response"
 	"eta/eta_api/services"
@@ -25,7 +25,7 @@ type AiPredictModelIndexConfigController struct {
 // @Param   PageSize   query   int  true       "每页数据条数"
 // @Param   CurrentIndex   query   int  true       "当前页页码,从1开始"
 // @Param   IndexId   query   int  true       "标的id"
-// @Success 200 {object} []*data_manage.AiPredictModelIndexConfigView
+// @Success 200 {object} []*response.AiPredictModelIndexConfigListResp
 // @router /index_config/list [get]
 func (c *AiPredictModelIndexConfigController) List() {
 	br := new(models.BaseResponse).Init()
@@ -60,15 +60,15 @@ func (c *AiPredictModelIndexConfigController) List() {
 	startSize = utils.StartIndex(currentIndex, pageSize)
 
 	var total int
-	viewList := make([]data_manage.AiPredictModelIndexConfigView, 0)
+	viewList := make([]aiPredictModel.AiPredictModelIndexConfigView, 0)
 
 	var condition string
 	var pars []interface{}
 
-	condition += fmt.Sprintf(` AND %s = ? `, data_manage.AiPredictModelIndexConfigColumns.AiPredictModelIndexId)
+	condition += fmt.Sprintf(` AND %s = ? `, aiPredictModel.AiPredictModelIndexConfigColumns.AiPredictModelIndexId)
 	pars = append(pars, indexId)
 
-	obj := new(data_manage.AiPredictModelIndexConfig)
+	obj := new(aiPredictModel.AiPredictModelIndexConfig)
 	tmpTotal, list, err := obj.GetPageListByCondition(condition, pars, startSize, pageSize)
 	if err != nil {
 		br.Msg = "获取失败"
@@ -120,7 +120,7 @@ func (c *AiPredictModelIndexConfigController) CurrVersion() {
 	}
 
 	// 查询标的情况
-	indexOb := new(data_manage.AiPredictModelIndex)
+	indexOb := new(aiPredictModel.AiPredictModelIndex)
 	indexItem, e := indexOb.GetItemById(indexId)
 	if e != nil {
 		if utils.IsErrNoRow(e) {
@@ -138,7 +138,7 @@ func (c *AiPredictModelIndexConfigController) CurrVersion() {
 		return
 	}
 
-	obj := new(data_manage.AiPredictModelIndexConfig)
+	obj := new(aiPredictModel.AiPredictModelIndexConfig)
 	configItem, err := obj.GetById(indexItem.AiPredictModelIndexConfigId)
 	if err != nil {
 		br.Msg = "获取失败"
@@ -176,7 +176,7 @@ func (c *AiPredictModelIndexConfigController) SetCurr() {
 	var req request.DelConfigReq
 	err := json.Unmarshal(c.Ctx.Input.RequestBody, &req)
 	// 查找配置
-	obj := new(data_manage.AiPredictModelIndexConfig)
+	obj := new(aiPredictModel.AiPredictModelIndexConfig)
 	configItem, err := obj.GetById(req.AiPredictModelIndexConfigId)
 	if err != nil {
 		br.Msg = "修改失败"
@@ -189,7 +189,7 @@ func (c *AiPredictModelIndexConfigController) SetCurr() {
 	}
 
 	// 查询标的情况
-	indexOb := new(data_manage.AiPredictModelIndex)
+	indexOb := new(aiPredictModel.AiPredictModelIndex)
 	indexItem, e := indexOb.GetItemById(configItem.AiPredictModelIndexId)
 	if e != nil {
 		br.Msg = "操作失败"
@@ -237,7 +237,7 @@ func (c *AiPredictModelIndexConfigController) Del() {
 	}
 
 	// 查找配置
-	obj := new(data_manage.AiPredictModelIndexConfig)
+	obj := new(aiPredictModel.AiPredictModelIndexConfig)
 	item, err := obj.GetById(req.AiPredictModelIndexConfigId)
 	if err != nil {
 		br.Msg = "修改失败"
@@ -252,7 +252,7 @@ func (c *AiPredictModelIndexConfigController) Del() {
 	// 查找是否被标的引用为默认模型
 	{
 		// 查询标的情况
-		indexOb := new(data_manage.AiPredictModelIndex)
+		indexOb := new(aiPredictModel.AiPredictModelIndex)
 		count, e := indexOb.GetCountByCondition(fmt.Sprintf(` AND %s = ? `, indexOb.Cols().AiPredictModelIndexConfigId), []interface{}{item.AiPredictModelIndexConfigId})
 		if e != nil {
 			br.Msg = "删除失败"
@@ -267,7 +267,7 @@ func (c *AiPredictModelIndexConfigController) Del() {
 		}
 	}
 
-	if !utils.InArrayByStr([]string{data_manage.TrainStatusSuccess, data_manage.TrainStatusFailed}, item.TrainStatus) {
+	if !utils.InArrayByStr([]string{aiPredictModel.TrainStatusSuccess, aiPredictModel.TrainStatusFailed}, item.TrainStatus) {
 		br.Msg = "删除失败,该版本配置正在训练中"
 		br.IsSendEmail = false
 		return
@@ -291,7 +291,7 @@ func (c *AiPredictModelIndexConfigController) Del() {
 // @Title 获取当前版本的图表信息
 // @Description 获取当前版本的图表信息
 // @Param   AiPredictModelIndexConfigId   query   int   true   "标的配置ID"
-// @Success 200 {object} []*data_manage.AiPredictModelIndexConfigView
+// @Success 200 {object} []*response.AiPredictModelDetailResp
 // @router /index_config/chart/detail [get]
 func (c *AiPredictModelIndexConfigController) ChartDetail() {
 	br := new(models.BaseResponse).Init()
@@ -318,7 +318,7 @@ func (c *AiPredictModelIndexConfigController) ChartDetail() {
 	// TODO 后面加上数据缓存
 
 	// 查找配置
-	obj := new(data_manage.AiPredictModelIndexConfig)
+	obj := new(aiPredictModel.AiPredictModelIndexConfig)
 	configItem, err := obj.GetById(indexConfigId)
 	if err != nil {
 		br.Msg = "修改失败"
@@ -332,7 +332,7 @@ func (c *AiPredictModelIndexConfigController) ChartDetail() {
 
 	// 查找是否被标的引用为默认模型
 	// 查询标的情况
-	indexOb := new(data_manage.AiPredictModelIndex)
+	indexOb := new(aiPredictModel.AiPredictModelIndex)
 	indexItem, e := indexOb.GetItemByConfigId(configItem.AiPredictModelIndexConfigId)
 	if e != nil {
 		br.Msg = "获取失败"
@@ -341,13 +341,13 @@ func (c *AiPredictModelIndexConfigController) ChartDetail() {
 	}
 
 	// 获取标的数据
-	dailyData := make([]*data_manage.AiPredictModelIndexConfigTrainData, 0)
+	dailyData := make([]*aiPredictModel.AiPredictModelIndexConfigTrainData, 0)
 	{
-		dataOb := new(data_manage.AiPredictModelIndexConfigTrainData)
-		dataCond := fmt.Sprintf(` AND %s = ?`, data_manage.AiPredictModelIndexConfigTrainDataColumns.AiPredictModelIndexConfigId)
+		dataOb := new(aiPredictModel.AiPredictModelIndexConfigTrainData)
+		dataCond := fmt.Sprintf(` AND %s = ?`, aiPredictModel.AiPredictModelIndexConfigTrainDataColumns.AiPredictModelIndexConfigId)
 		dataPars := make([]interface{}, 0)
 		dataPars = append(dataPars, configItem.AiPredictModelIndexConfigId)
-		list, e := dataOb.GetAllListByCondition(dataCond, dataPars, []string{}, fmt.Sprintf("%s DESC", data_manage.AiPredictModelIndexConfigTrainDataColumns.DataTime))
+		list, e := dataOb.GetAllListByCondition(dataCond, dataPars, []string{}, fmt.Sprintf("%s DESC", aiPredictModel.AiPredictModelIndexConfigTrainDataColumns.DataTime))
 		if e != nil {
 			br.Msg = "获取失败"
 			br.ErrMsg = fmt.Sprintf("获取标的数据失败, %v", e)
@@ -356,7 +356,7 @@ func (c *AiPredictModelIndexConfigController) ChartDetail() {
 
 		for _, v := range list {
 			// 日度数据
-			if v.Source == data_manage.ModelDataSourceDaily {
+			if v.Source == aiPredictModel.ModelDataSourceDaily {
 				dailyData = append(dailyData, v)
 				continue
 			}
@@ -365,7 +365,7 @@ func (c *AiPredictModelIndexConfigController) ChartDetail() {
 
 	// 日度图表
 	if len(dailyData) > 0 {
-		dailyChartDetail, e := services.GetAiPredictConfigChartDetailByData(indexItem.IndexName, configItem, dailyData, data_manage.ModelDataSourceDaily)
+		dailyChartDetail, e := services.GetAiPredictConfigChartDetailByData(indexItem.IndexName, configItem, dailyData, aiPredictModel.ModelDataSourceDaily)
 		if e != nil {
 			br.Msg = "获取失败"
 			br.ErrMsg = fmt.Sprintf("获取日度图表失败, %v", e)
@@ -413,7 +413,7 @@ func (c *AiPredictModelIndexConfigController) Train() {
 	}
 
 	// 查询标的情况
-	indexOb := new(data_manage.AiPredictModelIndex)
+	indexOb := new(aiPredictModel.AiPredictModelIndex)
 	indexItem, err := indexOb.GetItemById(req.IndexId)
 	if err != nil {
 		br.Msg = "训练失败,查找标的失败"
@@ -430,7 +430,7 @@ func (c *AiPredictModelIndexConfigController) Train() {
 		return
 	}
 
-	obj := new(data_manage.AiPredictModelIndexConfig)
+	obj := new(aiPredictModel.AiPredictModelIndexConfig)
 
 	// 查找当前标的是否存在待训练/训练中的模型
 	count, err := services.GetCurrentRunningAiPredictModelIndexCount()
@@ -445,6 +445,8 @@ func (c *AiPredictModelIndexConfigController) Train() {
 		return
 	}
 
+	var indexConfig *aiPredictModel.AiPredictModelIndexConfig
+
 	if req.AiPredictModelIndexConfigId > 0 {
 		// 查找配置
 		item, err := obj.GetById(req.AiPredictModelIndexConfigId)
@@ -464,7 +466,7 @@ func (c *AiPredictModelIndexConfigController) Train() {
 			return
 		}
 
-		if item.TrainStatus != data_manage.TrainStatusFailed {
+		if item.TrainStatus != aiPredictModel.TrainStatusFailed {
 			br.Msg = "该模型训练状态异常,不允许重新训练"
 			br.ErrMsg = "该模型训练状态异常,不允许重新训练,当前状态:" + item.TrainStatus
 			br.IsSendEmail = false
@@ -479,12 +481,14 @@ func (c *AiPredictModelIndexConfigController) Train() {
 			return
 		}
 
+		indexConfig = item
+
 	} else {
 		// 新增训练模型
-		item := &data_manage.AiPredictModelIndexConfig{
+		item := &aiPredictModel.AiPredictModelIndexConfig{
 			AiPredictModelIndexConfigId: 0,
 			AiPredictModelIndexId:       indexItem.AiPredictModelIndexId,
-			TrainStatus:                 data_manage.TrainStatusWaiting,
+			TrainStatus:                 aiPredictModel.TrainStatusWaiting,
 			Params:                      string(paramsStrByte),
 			TrainMse:                    "",
 			TrainR2:                     "",
@@ -503,9 +507,11 @@ func (c *AiPredictModelIndexConfigController) Train() {
 			br.ErrMsg = "训练失败,Err:" + err.Error()
 			return
 		}
+
+		indexConfig = item
 	}
 
-	indexItem.TrainStatus = data_manage.TrainStatusWaiting
+	indexItem.TrainStatus = aiPredictModel.TrainStatusWaiting
 	indexItem.ModifyTime = time.Now()
 	err = indexItem.Update([]string{"train_status", "modify_time"})
 	if err != nil {
@@ -513,7 +519,9 @@ func (c *AiPredictModelIndexConfigController) Train() {
 		br.ErrMsg = "训练失败,Err:" + err.Error()
 		return
 	}
-	// TODO 加入训练任务中
+
+	// 加入模型训练任务中
+	go services.AddAiModelTrainTask(indexItem, indexConfig, c.SysUser)
 
 	br.Ret = 200
 	br.Success = true

+ 21 - 4
models/ai_predict_model/ai_predict_model_index.go

@@ -13,10 +13,10 @@ import (
 
 // 训练状态
 const (
-	RunStatusWaiting  = "待运行"
-	RunStatusTraining = "运行中"
-	RunStatusSuccess  = "运行成功"
-	RunStatusFailed   = "运行失败"
+	RunStatusWaiting = "待运行"
+	RunStatusRunning = "运行中"
+	RunStatusSuccess = "运行成功"
+	RunStatusFailed  = "运行失败"
 )
 
 // AiPredictModelIndex AI预测模型标的
@@ -510,3 +510,20 @@ func (m *AiPredictModelIndex) GetSortMax() (sort int, err error) {
 	}
 	return
 }
+
+// UpdateRunStatusByIdList
+// @Description: 通过标的ID列表更新运行状态
+// @author: Roc
+// @receiver m
+// @datetime 2025-05-08 14:44:15
+// @param indexIdList []int
+// @return err error
+func (m *AiPredictModelIndex) UpdateRunStatusByIdList(indexIdList []int) (err error) {
+	if len(indexIdList) <= 0 {
+		return
+	}
+	sql := ` UPDATE ai_predict_model_index SET run_status = ? WHERE ai_predict_model_index_id in (?)`
+	err = global.DbMap[utils.DbNameIndex].Exec(sql, RunStatusWaiting, indexIdList).Error
+
+	return
+}

+ 9 - 0
models/ai_predict_model/request/index.go

@@ -4,3 +4,12 @@ type AiPredictModelIndexSaveScriptPathReq struct {
 	IndexId    int    `description:"指标ID"`
 	ScriptPath string `description:"脚本的路径"`
 }
+
+// AiPredictModelIndexRunReq AI预测模型运行请求参数
+type AiPredictModelIndexRunReq struct {
+	ClassifyId     int    `description:"分类ID"`
+	Keyword        string `description:"关键词-指标ID/指标名称"`
+	IndexIdList    []int  `description:"选中的模型ID列表,SelectAll-false时,会用到这个字段"`
+	NotIndexIdList []int  `description:"排除的模型ID列表,SelectAll-true时,会用到这个字段"`
+	SelectAll      bool   `description:"列表全选"`
+}

+ 155 - 0
models/data_manage/index_task.go

@@ -0,0 +1,155 @@
+package data_manage
+
+import (
+	"database/sql"
+	"eta/eta_api/global"
+	"eta/eta_api/utils"
+	"fmt"
+	"time"
+)
+
+// IndexTask index这边的任务表
+type IndexTask struct {
+	IndexTaskID     int       `gorm:"primaryKey;column:index_task_id" description:"-"`
+	TaskName        string    `gorm:"column:task_name" description:"任务名称"`
+	TaskType        string    `gorm:"column:task_type" description:"任务类型"`
+	Status          string    `gorm:"column:status" description:"任务状态,枚举值:待处理,处理成功,处理失败,暂停处理"`
+	StartTime       time.Time `gorm:"column:start_time" description:"开始时间"`
+	EndTime         time.Time `gorm:"column:end_time" description:"结束时间"`
+	CreateTime      time.Time `gorm:"column:create_time" description:"创建时间"`
+	UpdateTime      time.Time `gorm:"column:update_time" description:"更新时间"`
+	Logs            string    `gorm:"column:logs" description:"日志"`
+	Errormessage    string    `gorm:"column:ErrorMessage" description:"错误信息"`
+	Priority        int       `gorm:"column:priority" description:"优先级"`
+	RetryCount      int       `gorm:"column:retry_count" description:"重试次数"`
+	Remark          string    `gorm:"column:remark" description:"备注"`
+	SysUserID       int       `gorm:"column:sys_user_id" description:"任务创建人id"`
+	SysUserRealName string    `gorm:"column:sys_user_real_name" description:"任务创建人名称"`
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *IndexTask) TableName() string {
+	return "index_task"
+}
+
+// IndexTaskColumns get sql column name.获取数据库列名
+var IndexTaskColumns = struct {
+	IndexTaskID     string
+	TaskName        string
+	TaskType        string
+	Status          string
+	StartTime       string
+	EndTime         string
+	CreateTime      string
+	UpdateTime      string
+	Logs            string
+	Errormessage    string
+	Priority        string
+	RetryCount      string
+	Remark          string
+	SysUserID       string
+	SysUserRealName string
+}{
+	IndexTaskID:     "index_task_id",
+	TaskName:        "task_name",
+	TaskType:        "task_type",
+	Status:          "status",
+	StartTime:       "start_time",
+	EndTime:         "end_time",
+	CreateTime:      "create_time",
+	UpdateTime:      "update_time",
+	Logs:            "logs",
+	Errormessage:    "ErrorMessage",
+	Priority:        "priority",
+	RetryCount:      "retry_count",
+	Remark:          "remark",
+	SysUserID:       "sys_user_id",
+	SysUserRealName: "sys_user_real_name",
+}
+
+func (m *IndexTask) Create() (err error) {
+	err = global.DbMap[utils.DbNameAI].Create(&m).Error
+
+	return
+}
+
+func (m *IndexTask) Update(updateCols []string) (err error) {
+	err = global.DbMap[utils.DbNameAI].Select(updateCols).Updates(&m).Error
+
+	return
+}
+
+func (m *IndexTask) Del() (err error) {
+	err = global.DbMap[utils.DbNameAI].Delete(&m).Error
+
+	return
+}
+
+func (m *IndexTask) GetByID(id int) (item *IndexTask, err error) {
+	err = global.DbMap[utils.DbNameAI].Where(fmt.Sprintf("%s = ?", IndexTaskColumns.IndexTaskID), id).First(&item).Error
+
+	return
+}
+
+func (m *IndexTask) GetByCondition(condition string, pars []interface{}) (item *IndexTask, err error) {
+	sqlStr := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).First(&item).Error
+
+	return
+}
+
+func (m *IndexTask) GetListByCondition(field, condition string, pars []interface{}, startSize, pageSize int) (items []*IndexTask, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by index_task_id desc LIMIT ?,?`, field, m.TableName(), condition)
+	pars = append(pars, startSize, pageSize)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *IndexTask) GetCountByCondition(condition string, pars []interface{}) (total int, err error) {
+	var intNull sql.NullInt64
+	sqlStr := fmt.Sprintf(`SELECT COUNT(1) total FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Scan(&intNull).Error
+	if err == nil && intNull.Valid {
+		total = int(intNull.Int64)
+	}
+
+	return
+}
+
+// AddIndexTask
+// @Description: 添加Index模块的任务
+// @author: Roc
+// @datetime 2025-04-16 16:55:36
+// @param indexTask *IndexTask
+// @param indexRecordList []*IndexTaskRecord
+// @return err error
+func AddIndexTask(indexTask *IndexTask, indexRecordList []*IndexTaskRecord) (err error) {
+	to := global.DbMap[utils.DbNameAI].Begin()
+	defer func() {
+		if err != nil {
+			_ = to.Rollback()
+		} else {
+			_ = to.Commit()
+		}
+	}()
+
+	err = to.Create(indexTask).Error
+	if err != nil {
+		return
+	}
+
+	for _, indexTaskRecord := range indexRecordList {
+		indexTaskRecord.IndexTaskID = indexTask.IndexTaskID
+	}
+
+	err = to.CreateInBatches(indexRecordList, utils.MultiAddNum).Error
+	if err != nil {
+		return
+	}
+
+	return
+}

+ 115 - 0
models/data_manage/index_task_record.go

@@ -0,0 +1,115 @@
+package data_manage
+
+import (
+	"database/sql"
+	"eta/eta_api/global"
+	"eta/eta_api/utils"
+	"fmt"
+	"time"
+)
+
+// IndexTaskRecord AI任务的子记录
+type IndexTaskRecord struct {
+	IndexTaskRecordID int       `gorm:"primaryKey;column:index_task_record_id" json:"-"` // 任务记录id
+	IndexTaskID       int       `gorm:"column:index_task_id" json:"indexTaskId"`         // 任务id
+	Parameters        string    `gorm:"column:parameters" json:"parameters"`             // 子任务参数
+	Status            string    `gorm:"column:status" json:"status"`                     // 状态
+	Remark            string    `gorm:"column:remark" json:"remark"`                     // 备注
+	ModifyTime        time.Time `gorm:"column:modify_time" json:"modifyTime"`            // 最后一次修改时间
+	CreateTime        time.Time `gorm:"column:create_time" json:"createTime"`            // 任务创建时间
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *IndexTaskRecord) TableName() string {
+	return "index_task_record"
+}
+
+// IndexTaskRecordColumns get sql column name.获取数据库列名
+var IndexTaskRecordColumns = struct {
+	IndexTaskRecordID string
+	IndexTaskID       string
+	Parameters        string
+	Status            string
+	Remark            string
+	ModifyTime        string
+	CreateTime        string
+}{
+	IndexTaskRecordID: "index_task_record_id",
+	IndexTaskID:       "index_task_id",
+	Parameters:        "parameters",
+	Status:            "status",
+	Remark:            "remark",
+	ModifyTime:        "modify_time",
+	CreateTime:        "create_time",
+}
+
+func (m *IndexTaskRecord) Create() (err error) {
+	err = global.DbMap[utils.DbNameAI].Create(&m).Error
+
+	return
+}
+
+func (m *IndexTaskRecord) Update(updateCols []string) (err error) {
+	err = global.DbMap[utils.DbNameAI].Select(updateCols).Updates(&m).Error
+
+	return
+}
+
+func (m *IndexTaskRecord) Del() (err error) {
+	err = global.DbMap[utils.DbNameAI].Delete(&m).Error
+
+	return
+}
+
+func (m *IndexTaskRecord) GetByID(id int) (item *IndexTaskRecord, err error) {
+	err = global.DbMap[utils.DbNameAI].Where(fmt.Sprintf("%s = ?", IndexTaskRecordColumns.IndexTaskRecordID), id).First(&item).Error
+
+	return
+}
+
+func (m *IndexTaskRecord) GetByCondition(condition string, pars []interface{}) (item *IndexTaskRecord, err error) {
+	sqlStr := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).First(&item).Error
+
+	return
+}
+
+func (m *IndexTaskRecord) GetAllListByCondition(field, condition string, pars []interface{}) (items []*IndexTaskRecord, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by index_task_record_id desc `, field, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *IndexTaskRecord) GetListByCondition(field, condition string, pars []interface{}, startSize, pageSize int) (items []*IndexTaskRecord, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by index_task_record_id desc LIMIT ?,?`, field, m.TableName(), condition)
+	pars = append(pars, startSize, pageSize)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *IndexTaskRecord) GetCountByCondition(condition string, pars []interface{}) (total int, err error) {
+	var intNull sql.NullInt64
+	sqlStr := fmt.Sprintf(`SELECT COUNT(1) total FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Scan(&intNull).Error
+	if err == nil && intNull.Valid {
+		total = int(intNull.Int64)
+	}
+
+	return
+}
+
+// QuestionGenerateAbstractParam
+// @Description:
+type QuestionGenerateAbstractParam struct {
+	QuestionId  int    `json:"questionId"`
+	ArticleType string `json:"articleType"`
+	ArticleId   int    `json:"articleId"`
+}

+ 9 - 0
routers/commentsRouter.go

@@ -466,6 +466,15 @@ func init() {
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_api/controllers/data_manage/ai_predict_model:AiPredictModelIndexController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/data_manage/ai_predict_model:AiPredictModelIndexController"],
+        beego.ControllerComments{
+            Method: "Run",
+            Router: `/index/run`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
     beego.GlobalControllerRouter["eta/eta_api/controllers/data_manage/ai_predict_model:AiPredictModelIndexController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/data_manage/ai_predict_model:AiPredictModelIndexController"],
         beego.ControllerComments{
             Method: "GetCurrentRunningAiPredictModelIndexCount",

+ 1 - 1
services/ai_predict_model_index.go

@@ -651,6 +651,6 @@ func GetAiPredictConfigChartDetailByData(indexName string, indexConfigItem *aiPr
 // @return err error
 func GetCurrentRunningAiPredictModelIndexCount() (total int, err error) {
 	obj := new(aiPredictModel.AiPredictModelIndex)
-	total, err = obj.GetCountByCondition(" AND (train_status in (?) OR run_status in (?) ) ", []interface{}{[]string{aiPredictModel.TrainStatusTraining, aiPredictModel.TrainStatusWaiting}, []string{aiPredictModel.RunStatusTraining, aiPredictModel.RunStatusWaiting}})
+	total, err = obj.GetCountByCondition(" AND (train_status in (?) OR run_status in (?) ) ", []interface{}{[]string{aiPredictModel.TrainStatusTraining, aiPredictModel.TrainStatusWaiting}, []string{aiPredictModel.RunStatusRunning, aiPredictModel.RunStatusWaiting}})
 	return
 }

+ 153 - 0
services/ai_predict_model_index_config.go

@@ -0,0 +1,153 @@
+package services
+
+import (
+	"eta/eta_api/cache"
+	predictModel "eta/eta_api/models/ai_predict_model"
+	"eta/eta_api/models/data_manage"
+	"eta/eta_api/models/system"
+	"eta/eta_api/utils"
+	"fmt"
+	"time"
+)
+
+// AddAiModelTrainTask
+// @Description: 添加模型训练任务
+// @author: Roc
+// @datetime 2025-05-08 14:28:13
+// @param aiIndex *predictModel.AiPredictModelIndex
+// @param indexConfig *predictModel.AiPredictModelIndexConfig
+// @param sysUser *system.Admin
+func AddAiModelTrainTask(aiIndex *predictModel.AiPredictModelIndex, indexConfig *predictModel.AiPredictModelIndexConfig, sysUser *system.Admin) {
+	var err error
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error(fmt.Sprintf("AddAiModelTrainTask error: %s", err.Error()))
+		}
+	}()
+	taskName := fmt.Sprintf("《%s》模型训练-%s", aiIndex.IndexName, time.Now().Format(utils.FormatShortDateTimeUnSpace))
+
+	indexTask := &data_manage.IndexTask{
+		IndexTaskID: 0,
+		TaskName:    taskName,
+		TaskType:    utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN,
+		Status:      "待处理",
+		//StartTime:               time.Time{},
+		//EndTime:                 time.Time{},
+		CreateTime:      time.Now(),
+		UpdateTime:      time.Now(),
+		Logs:            "",
+		Errormessage:    "",
+		Priority:        0,
+		RetryCount:      0,
+		Remark:          "",
+		SysUserID:       sysUser.AdminId,
+		SysUserRealName: sysUser.RealName,
+	}
+
+	taskRecordList := make([]*data_manage.IndexTaskRecord, 0)
+
+	taskRecord := &data_manage.IndexTaskRecord{
+		IndexTaskRecordID: 0,
+		IndexTaskID:       0,
+		Parameters:        fmt.Sprint(indexConfig.AiPredictModelIndexConfigId),
+		Status:            "待处理",
+		Remark:            "",
+		ModifyTime:        time.Now(),
+		CreateTime:        time.Now(),
+	}
+	taskRecordList = append(taskRecordList, taskRecord)
+
+	// 创建AI模块的任务,用于后面的任务调度去生成摘要
+	err = data_manage.AddIndexTask(indexTask, taskRecordList)
+	if err != nil {
+		return
+	}
+
+	// 添加到缓存队列中
+	go addIndexTaskToCache(indexTask.IndexTaskID, indexTask.TaskType)
+
+	return
+}
+
+// AddAiModelRunTask
+// @Description: 添加模型运行任务
+// @author: Roc
+// @datetime 2025-05-08 14:33:38
+// @param aiModelIndexIdList []int 标的模型ID列表
+// @param sysUser *system.Admin
+func AddAiModelRunTask(aiModelIndexIdList []int, sysUser *system.Admin) {
+	var err error
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error(fmt.Sprintf("AddAiModelTrainTask error: %s", err.Error()))
+		}
+	}()
+	taskName := fmt.Sprintf("模型运行-%s", time.Now().Format(utils.FormatShortDateTimeUnSpace))
+
+	indexTask := &data_manage.IndexTask{
+		IndexTaskID: 0,
+		TaskName:    taskName,
+		TaskType:    utils.INDEX_TASK_TYPE_AI_MODEL_RUN,
+		Status:      "待处理",
+		//StartTime:               time.Time{},
+		//EndTime:                 time.Time{},
+		CreateTime:      time.Now(),
+		UpdateTime:      time.Now(),
+		Logs:            "",
+		Errormessage:    "",
+		Priority:        0,
+		RetryCount:      0,
+		Remark:          "",
+		SysUserID:       sysUser.AdminId,
+		SysUserRealName: sysUser.RealName,
+	}
+
+	taskRecordList := make([]*data_manage.IndexTaskRecord, 0)
+
+	for _, aiModelIndexId := range aiModelIndexIdList {
+		taskRecord := &data_manage.IndexTaskRecord{
+			IndexTaskRecordID: 0,
+			IndexTaskID:       0,
+			Parameters:        fmt.Sprint(aiModelIndexId),
+			Status:            "待处理",
+			Remark:            "",
+			ModifyTime:        time.Now(),
+			CreateTime:        time.Now(),
+		}
+		taskRecordList = append(taskRecordList, taskRecord)
+	}
+
+	// 创建AI模块的任务,用于后面的任务调度去生成摘要
+	err = data_manage.AddIndexTask(indexTask, taskRecordList)
+	if err != nil {
+		return
+	}
+
+	// 添加到缓存队列中
+	go addIndexTaskToCache(indexTask.IndexTaskID, indexTask.TaskType)
+
+	return
+}
+
+// addIndexTaskToCache
+// @Description: 根据任务ID将待处理的任务丢入到list中
+// @author: Roc
+// @datetime 2025-05-08 14:30:05
+// @param indexTaskId int
+// @param taskType string
+func addIndexTaskToCache(indexTaskId int, taskType string) {
+	var err error
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("addTaskToCache error: %v", err)
+		}
+	}()
+	obj := data_manage.IndexTaskRecord{}
+	list, err := obj.GetAllListByCondition(" index_task_record_id ", ` AND index_task_id = ? AND status = ? `, []interface{}{indexTaskId, `待处理`})
+	if err != nil {
+		return
+	}
+	for _, item := range list {
+		cache.AddIndexTaskRecordOpToCache(item.IndexTaskRecordID, taskType)
+	}
+}

+ 33 - 1
services/elastic/elastic.go

@@ -10,6 +10,7 @@ import (
 	dataSourceModel "eta/eta_api/models/data_source"
 	"eta/eta_api/utils"
 	"fmt"
+	"io"
 	"strconv"
 	"strings"
 
@@ -2354,7 +2355,7 @@ func SearchDataSourceIndexWithScroll(indexName, keyword string, source, subSourc
 }
 
 // ScrollDataSourceIndex 使用scroll API获取下一页数据
-func ScrollDataSourceIndex(indexName, scrollId string) (nextScrollId string, list []*dataSourceModel.SearchDataSourceItem, err error) {
+func ScrollDataSourceIndex(scrollId string) (nextScrollId string, list []*dataSourceModel.SearchDataSourceItem, err error) {
 	list = make([]*dataSourceModel.SearchDataSourceItem, 0)
 	defer func() {
 		if err != nil {
@@ -2368,6 +2369,10 @@ func ScrollDataSourceIndex(indexName, scrollId string) (nextScrollId string, lis
 	request := client.Scroll().ScrollId(scrollId).Scroll("5m")
 	searchResp, e := request.Do(context.Background())
 	if e != nil {
+		if errors.Is(e, io.EOF) {
+			// 结束了,没有更多的数据了
+			return
+		}
 		err = fmt.Errorf("scroll do err: %v", e)
 		return
 	}
@@ -2379,6 +2384,8 @@ func ScrollDataSourceIndex(indexName, scrollId string) (nextScrollId string, lis
 		return
 	}
 
+	//理论上 scrollId 在整个 Scroll 生命周期中是固定不变的。
+	//不过为了代码可读性和未来可能的扩展性,仍保留 nextScrollId = searchResp.ScrollId 这一行代码,这样即使逻辑迁移或改动,也不会出错。
 	nextScrollId = searchResp.ScrollId
 	searchMap := make(map[string]string)
 	for _, v := range searchResp.Hits.Hits {
@@ -2406,3 +2413,28 @@ func ScrollDataSourceIndex(indexName, scrollId string) (nextScrollId string, lis
 	}
 	return
 }
+
+// ClearScrollDataSourceIndex
+// @Description: 清除scroll查询
+// @author: Roc
+// @datetime 2025-05-08 10:17:01
+// @param scrollId string
+// @return nextScrollId string
+// @return list []*dataSourceModel.SearchDataSourceItem
+// @return err error
+func ClearScrollDataSourceIndex(scrollId string) (nextScrollId string, list []*dataSourceModel.SearchDataSourceItem, err error) {
+	list = make([]*dataSourceModel.SearchDataSourceItem, 0)
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("ScrollDataSourceIndex err: %v", err)
+			utils.FileLog.Info(tips)
+		}
+	}()
+	client := utils.EsClient
+
+	// 使用scroll API获取下一页
+	request := client.ClearScroll().ScrollId(scrollId)
+	_, err = request.Do(context.Background())
+
+	return
+}

+ 6 - 0
utils/constants.go

@@ -279,6 +279,7 @@ const (
 	CACHE_REPORT_SHARE_AUTH                 = "eta:report:auth:share:"                //报告短链与报告图表授权映射key
 	CACHE_REPORT_AUTH                       = "eta:report:auth:"                      //报告图表数据授权
 	CACHE_EDB_INSPECTION_MESSAGE            = "eta:edb:inspection:message:"          //巡检消息队列
+	CACHE_INDEX_TASK                        = "eta:index:task:op:"                    // 指标库的任务调度缓存
 )
 
 // 模板消息推送类型
@@ -618,3 +619,8 @@ const (
 	AI_ARTICLE_SOURCE_WECHAT     = 0 // AI文章来源(微信公众号)
 	AI_ARTICLE_SOURCE_ETA_REPORT = 1 // AI文章来源(ETA报告)
 )
+
+const (
+	INDEX_TASK_TYPE_AI_MODEL_TRAIN = `ai_predict_model_train`
+	INDEX_TASK_TYPE_AI_MODEL_RUN   = `ai_predict_model_run`
+)