package ai_predict_model import ( "encoding/json" "errors" "eta/eta_index_lib/controllers" aiPredictModelLogic "eta/eta_index_lib/logic/ai_predict_model" "eta/eta_index_lib/models" "eta/eta_index_lib/models/ai_predict_model" "eta/eta_index_lib/models/ai_predict_model/request" "eta/eta_index_lib/models/ai_predict_model/response" "eta/eta_index_lib/utils" "fmt" "github.com/go-redis/redis/v8" "strconv" ) // AiPredictModelIndexController AI预测模型标的 type AiPredictModelIndexController struct { controllers.BaseAuthController } type IndexTaskRecordOp struct { IndexTaskRecordId int TaskType string } // OpToDo // @Title 获取待操作的标的 // @Description 获取待操作的标的 // @Success 200 {object} response.AiPredictModelIndexConfigResp // @router /op_todo [post] func (this *AiPredictModelIndexController) OpToDo() { br := new(models.BaseResponse).Init() defer func() { this.Data["json"] = br this.ServeJSON() }() resp := response.AiPredictModelIndexConfigResp{} val, err := utils.Rc.BrpopVal(utils.CACHE_INDEX_TASK) if err != nil { if errors.Is(err, redis.Nil) { br.Ret = 200 br.Success = true br.Msg = "获取成功" return } br.Msg = "获取失败" br.ErrMsg = `从redis中获取数据失败,Err:` + err.Error() return } indexTaskRecordOp := new(IndexTaskRecordOp) if err = json.Unmarshal([]byte(val), &indexTaskRecordOp); err != nil { fmt.Println("json unmarshal wrong!") return } indexTaskRecordObj := new(models.IndexTaskRecord) indexTaskRecordInfo, err := indexTaskRecordObj.GetByID(indexTaskRecordOp.IndexTaskRecordId) if err != nil { fmt.Println("get index task record info wrong!") br.Msg = "获取失败" return } if indexTaskRecordInfo.Status != `待处理` { fmt.Println("任务状态不是待处理!") br.Msg = "任务状态不是待处理" return } var indexConfigItem *ai_predict_model.AiPredictModelIndexConfig var indexItem *ai_predict_model.AiPredictModelIndex resp.IndexTaskRecordId = indexTaskRecordInfo.IndexTaskRecordID defer func() { // 获取完成任务后,需要更新任务状态 if resp.AiPredictModelIndexId <= 0 { // 如果获取失败了,那么就标记失败 go aiPredictModelLogic.HandleTaskRecordFailByTaskRecord(indexTaskRecordOp.TaskType, indexTaskRecordInfo, indexConfigItem, indexItem, br.Msg) } else { // 如果获取成功了,那么就标记进行中 go aiPredictModelLogic.HandleTaskRecordProcessingByTaskRecord(indexTaskRecordOp.TaskType, indexTaskRecordInfo, indexConfigItem, indexItem) } }() indexConfigObj := new(ai_predict_model.AiPredictModelIndexConfig) indexOb := new(ai_predict_model.AiPredictModelIndex) switch indexTaskRecordOp.TaskType { case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN: // 训练模型 indexConfigId, err := strconv.Atoi(indexTaskRecordInfo.Parameters) // 模型配置ID if err != nil { fmt.Println("模型配置ID转换错误!") br.Msg = "模型配置ID转换错误" br.ErrMsg = "模型配置ID转换错误,err:" + err.Error() return } // 查找配置 indexConfigItem, err = indexConfigObj.GetById(indexConfigId) if err != nil { br.Msg = "获取模型配置失败" br.ErrMsg = "获取模型配置失败,查找配置失败,Err:" + err.Error() if utils.IsErrNoRow(err) { br.Msg = "配置不存在" br.IsSendEmail = false } return } // 查询标的情况 indexItem, err = indexOb.GetItemById(indexConfigItem.AiPredictModelIndexId) if err != nil { br.Msg = "训练失败,查找标的失败" br.ErrMsg = fmt.Sprintf("训练失败,查找标的失败, %v", err) if utils.IsErrNoRow(err) { br.Msg = "标的不存在" br.IsSendEmail = false } return } case utils.INDEX_TASK_TYPE_AI_MODEL_RUN: // 运行模型 // 标的id转换 indexId, err := strconv.Atoi(indexTaskRecordInfo.Parameters) if err != nil { fmt.Println("标的ID转换错误!") br.Msg = "标的ID转换错误" br.ErrMsg = "标的ID转换错误,err:" + err.Error() return } // 查询标的情况 indexItem, err = indexOb.GetItemById(indexId) if err != nil { br.Msg = "训练失败,查找标的失败" br.ErrMsg = fmt.Sprintf("训练失败,查找标的失败, %v", err) if utils.IsErrNoRow(err) { br.Msg = "标的不存在" br.IsSendEmail = false } return } // 查找配置 indexConfigItem, err = indexConfigObj.GetById(indexItem.AiPredictModelIndexConfigId) if err != nil { br.Msg = "获取模型配置失败" br.ErrMsg = "获取模型配置失败,查找配置失败,Err:" + err.Error() if utils.IsErrNoRow(err) { br.Msg = "配置不存在" br.IsSendEmail = false } return } default: br.Msg = "异常的任务类型" br.ErrMsg = "异常的任务类型,Err:" + indexTaskRecordOp.TaskType return } if indexItem.ScriptPath == `` { br.Msg = `没有配置脚本路径` br.ErrMsg = `没有配置脚本路径` return } var configParams response.ConfigParams if e := json.Unmarshal([]byte(indexConfigItem.Params), &configParams); e != nil { br.Msg = "获取模型配置失败" br.ErrMsg = "获取模型配置失败,解析配置失败,Err:" + e.Error() return } resp.AiPredictModelIndexId = indexConfigItem.AiPredictModelIndexId resp.AiPredictModelIndexConfigId = indexConfigItem.AiPredictModelIndexConfigId resp.ConfigParams = configParams resp.ExecType = indexTaskRecordOp.TaskType resp.ScriptPath = indexItem.ScriptPath br.Data = resp br.Ret = 200 br.Success = true br.Msg = "获取成功" } // HandleTaskRecordFailByTaskRecord // @Title 任务操作失败 // @Description 任务操作失败 // @Success 200 {object} response.HandleTaskRecordFailReq // @router /handle/fail [post] func (this *AiPredictModelIndexController) HandleTaskRecordFailByTaskRecord() { br := new(models.BaseResponse).Init() defer func() { this.Data["json"] = br this.ServeJSON() }() var req request.HandleTaskRecordFailReq err := json.Unmarshal(this.Ctx.Input.RequestBody, &req) if err != nil { br.Msg = "参数解析异常!" br.ErrMsg = "参数解析失败,Err:" + err.Error() return } indexTaskRecordObj := new(models.IndexTaskRecord) indexTaskRecordInfo, err := indexTaskRecordObj.GetByID(req.IndexTaskRecordId) if err != nil { fmt.Println("get index task record info wrong!") br.Msg = "获取失败" return } if indexTaskRecordInfo.Status != `处理中` { fmt.Println("任务状态不是处理中!") br.Msg = "任务状态不是处理中" return } indexTaskObj := models.IndexTask{} indexTaskInfo, tmpErr := indexTaskObj.GetByID(indexTaskRecordInfo.IndexTaskID) if tmpErr != nil { err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error()) return } var indexConfigItem *ai_predict_model.AiPredictModelIndexConfig var indexItem *ai_predict_model.AiPredictModelIndex indexConfigObj := new(ai_predict_model.AiPredictModelIndexConfig) indexOb := new(ai_predict_model.AiPredictModelIndex) switch indexTaskInfo.TaskType { case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN: // 训练模型 indexConfigId, err := strconv.Atoi(indexTaskRecordInfo.Parameters) // 模型配置ID if err != nil { fmt.Println("模型配置ID转换错误!") br.Msg = "模型配置ID转换错误" br.ErrMsg = "模型配置ID转换错误,err:" + err.Error() return } // 查找配置 indexConfigItem, err = indexConfigObj.GetById(indexConfigId) if err != nil { br.Msg = "获取模型配置失败" br.ErrMsg = "获取模型配置失败,查找配置失败,Err:" + err.Error() if utils.IsErrNoRow(err) { br.Msg = "配置不存在" br.IsSendEmail = false } return } // 查询标的情况 indexItem, err = indexOb.GetItemById(indexConfigItem.AiPredictModelIndexId) if err != nil { br.Msg = "训练失败,查找标的失败" br.ErrMsg = fmt.Sprintf("训练失败,查找标的失败, %v", err) if utils.IsErrNoRow(err) { br.Msg = "标的不存在" br.IsSendEmail = false } return } case utils.INDEX_TASK_TYPE_AI_MODEL_RUN: // 运行模型 // 标的id转换 indexId, err := strconv.Atoi(indexTaskRecordInfo.Parameters) if err != nil { fmt.Println("标的ID转换错误!") br.Msg = "标的ID转换错误" br.ErrMsg = "标的ID转换错误,err:" + err.Error() return } // 查询标的情况 indexItem, err = indexOb.GetItemById(indexId) if err != nil { br.Msg = "训练失败,查找标的失败" br.ErrMsg = fmt.Sprintf("训练失败,查找标的失败, %v", err) if utils.IsErrNoRow(err) { br.Msg = "标的不存在" br.IsSendEmail = false } return } // 查找配置 indexConfigItem, err = indexConfigObj.GetById(indexItem.AiPredictModelIndexConfigId) if err != nil { br.Msg = "获取模型配置失败" br.ErrMsg = "获取模型配置失败,查找配置失败,Err:" + err.Error() if utils.IsErrNoRow(err) { br.Msg = "配置不存在" br.IsSendEmail = false } return } } // 标记处理任务失败 aiPredictModelLogic.HandleTaskRecordFailByTaskRecord(indexTaskInfo.TaskType, indexTaskRecordInfo, indexConfigItem, indexItem, req.FailMsg) br.Ret = 200 br.Success = true br.Msg = "处理成功" } // HandleTaskRecordSuccessByTaskRecord // @Title 任务操作成功 // @Description 任务操作成功 // @Success 200 {object} response.HandleTaskRecordFailReq // @router /handle/success [post] func (this *AiPredictModelIndexController) HandleTaskRecordSuccessByTaskRecord() { br := new(models.BaseResponse).Init() defer func() { this.Data["json"] = br this.ServeJSON() }() var req request.HandleTaskRecordSuccessReq err := json.Unmarshal(this.Ctx.Input.RequestBody, &req) if err != nil { br.Msg = "参数解析异常!" br.ErrMsg = "参数解析失败,Err:" + err.Error() return } indexTaskRecordObj := new(models.IndexTaskRecord) indexTaskRecordInfo, err := indexTaskRecordObj.GetByID(req.IndexTaskRecordId) if err != nil { fmt.Println("get index task record info wrong!") br.Msg = "获取失败" return } if indexTaskRecordInfo.Status != `处理中` { fmt.Println("任务状态不是处理中!") br.Msg = "任务状态不是处理中" return } indexTaskObj := models.IndexTask{} indexTaskInfo, tmpErr := indexTaskObj.GetByID(indexTaskRecordInfo.IndexTaskID) if tmpErr != nil { err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error()) return } // 标记处理任务成功 aiPredictModelLogic.HandleTaskRecordSuccessByTaskRecord(indexTaskInfo.TaskType, indexTaskRecordInfo, req.Data) br.Ret = 200 br.Success = true br.Msg = "处理成功" }