package ai_predict_model

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	"eta/eta_index_lib/models"
	aiPredictModel "eta/eta_index_lib/models/ai_predict_model"
	"eta/eta_index_lib/models/ai_predict_model/request"
	"eta/eta_index_lib/utils"
)

// syncIndexTaskStatusByRecord
// @Description: Updates the parent task of a sub-task record once no sub-task of that
// task is left in the `待处理` (pending) state. When pending records remain it does nothing.
// A failure of the final task Update is only logged (not returned), matching the
// behaviour every caller relied on before this helper was extracted.
// NOTE(review): end_time is stamped for every taskStatus, including `处理中` — confirm intended.
// @param indexTaskRecordInfo *models.IndexTaskRecord sub-task record whose IndexTaskID identifies the parent task (must be non-nil)
// @param taskStatus string status written to the parent task
// @return error set when counting pending records or loading the parent task fails
func syncIndexTaskStatusByRecord(indexTaskRecordInfo *models.IndexTaskRecord, taskStatus string) error {
	recordOb := models.IndexTaskRecord{}
	condition := fmt.Sprintf(` AND %s = ? AND %s = ? `, models.IndexTaskRecordColumns.IndexTaskID, models.IndexTaskRecordColumns.Status)
	todoCount, err := recordOb.GetCountByCondition(condition, []interface{}{indexTaskRecordInfo.IndexTaskID, `待处理`})
	if err != nil {
		return fmt.Errorf("查找剩余任务数量失败, err: %w", err)
	}
	if todoCount > 0 {
		// Other sub-tasks are still pending; the parent task is not touched yet.
		return nil
	}

	taskOb := models.IndexTask{}
	indexTaskInfo, err := taskOb.GetByID(indexTaskRecordInfo.IndexTaskID)
	if err != nil {
		return fmt.Errorf("查找任务失败, err: %w", err)
	}

	updateCols := []string{`end_time`, "status", "update_time"}
	indexTaskInfo.EndTime = time.Now()
	indexTaskInfo.Status = taskStatus
	indexTaskInfo.UpdateTime = time.Now()
	if indexTaskInfo.StartTime.IsZero() {
		// The task never recorded a start; back-fill it so start/end are both set.
		indexTaskInfo.StartTime = time.Now()
		updateCols = append(updateCols, "start_time")
	}
	if updateErr := indexTaskInfo.Update(updateCols); updateErr != nil {
		utils.FileLog.Error("标记任务状态失败, err: %s", updateErr.Error())
	}
	return nil
}

// HandleTaskRecordFailByTaskRecord
// @Description: Marks a sub-task record as failed (`处理失败`) with errMsg as remark, then
// marks the related model (and, for training, its config) as failed. Once no sub-task of
// the parent task is pending, the parent task is finalised.
// Errors are written to utils.FileLog; nothing is returned to the caller.
// @author: Roc
// @datetime 2025-05-09 16:24:48
// @param taskType string utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN or utils.INDEX_TASK_TYPE_AI_MODEL_RUN; other values only update the record/task
// @param indexTaskRecordInfo *models.IndexTaskRecord sub-task record to mark as failed
// @param indexConfigItem *ai_predict_model.AiPredictModelIndexConfig model config (may be nil; only used for training tasks)
// @param indexItem *ai_predict_model.AiPredictModelIndex model index (may be nil)
// @param errMsg string failure reason stored in the remark columns
func HandleTaskRecordFailByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex, errMsg string) {
	var err error
	// Registered first so it runs last and sees errors set by the later deferred sync.
	defer func() {
		if err != nil {
			utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordFailByTaskRecord err:%v`, err))
		}
	}()

	if indexTaskRecordInfo == nil {
		err = fmt.Errorf("子任务记录不能为空")
		return
	}

	// Update the sub-task record status.
	indexTaskRecordInfo.Status = `处理失败`
	indexTaskRecordInfo.Remark = errMsg
	indexTaskRecordInfo.ModifyTime = time.Now()
	err = indexTaskRecordInfo.Update([]string{"status", "remark", "modify_time"})
	if err != nil {
		fmt.Println("修改子任务状态失败!")
		return
	}

	// After the model status handling below, finalise the parent task if nothing is pending.
	// NOTE(review): the parent task is marked `处理成功` even though this record failed —
	// kept as in the original; confirm whether it means "finished" rather than "succeeded".
	defer func() {
		if syncErr := syncIndexTaskStatusByRecord(indexTaskRecordInfo, `处理成功`); syncErr != nil {
			err = syncErr
		}
	}()

	// Update the model status.
	switch taskType {
	case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN:
		// Training task: mark the model index as failed.
		if indexItem != nil {
			indexItem.TrainStatus = aiPredictModel.TrainStatusFailed
			indexItem.ModifyTime = time.Now()
			if tmpErr := indexItem.Update([]string{"train_status", "modify_time"}); tmpErr != nil {
				utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
			}
		}

		// Training task: mark the model config as failed and keep the reason.
		if indexConfigItem != nil {
			indexConfigItem.TrainStatus = aiPredictModel.TrainStatusFailed
			indexConfigItem.Remark = errMsg
			indexConfigItem.ModifyTime = time.Now()
			if tmpErr := indexConfigItem.Update([]string{"train_status", `remark`, "modify_time"}); tmpErr != nil {
				// Read the id from the config: indexItem may be nil in this branch.
				utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexConfigItem.AiPredictModelIndexId, tmpErr.Error())
			}
		}
	case utils.INDEX_TASK_TYPE_AI_MODEL_RUN:
		// Run task: mark the model index run as failed.
		if indexItem != nil {
			indexItem.RunStatus = aiPredictModel.RunStatusFailed
			indexItem.ModifyTime = time.Now()
			if tmpErr := indexItem.Update([]string{"run_status", "modify_time"}); tmpErr != nil {
				utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
			}
		}
	}
}

// HandleTaskRecordProcessingByTaskRecord
// @Description: Marks a sub-task record as in progress (`处理中`) and moves the related
// model (and, for training, its config) to the training/running status. Once no sub-task
// of the parent task is pending, the parent task is set to `处理中` as well.
// Errors are written to utils.FileLog; nothing is returned to the caller.
// @author: Roc
// @datetime 2025-05-09 16:24:38
// @param taskType string utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN or utils.INDEX_TASK_TYPE_AI_MODEL_RUN; other values only update the record/task
// @param indexTaskRecordInfo *models.IndexTaskRecord sub-task record to mark as processing
// @param indexConfigItem *ai_predict_model.AiPredictModelIndexConfig model config (may be nil; only used for training tasks)
// @param indexItem *ai_predict_model.AiPredictModelIndex model index (may be nil)
func HandleTaskRecordProcessingByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex) {
	var err error
	// Registered first so it runs last and sees errors set by the later deferred sync.
	defer func() {
		if err != nil {
			utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordProcessingByTaskRecord err:%v`, err))
		}
	}()

	if indexTaskRecordInfo == nil {
		err = fmt.Errorf("子任务记录不能为空")
		return
	}

	// Update the sub-task record status.
	indexTaskRecordInfo.Status = `处理中`
	indexTaskRecordInfo.ModifyTime = time.Now()
	err = indexTaskRecordInfo.Update([]string{"status", "modify_time"})
	if err != nil {
		fmt.Println("修改子任务状态失败!")
		return
	}

	// After the model status handling below, move the parent task to processing if nothing is pending.
	defer func() {
		if syncErr := syncIndexTaskStatusByRecord(indexTaskRecordInfo, `处理中`); syncErr != nil {
			err = syncErr
		}
	}()

	// Update the model status.
	switch taskType {
	case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN:
		// Training task: mark the model index as training.
		if indexItem != nil {
			indexItem.TrainStatus = aiPredictModel.TrainStatusTraining
			indexItem.ModifyTime = time.Now()
			if tmpErr := indexItem.Update([]string{"train_status", "modify_time"}); tmpErr != nil {
				utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
			}
		}

		// Training task: mark the model config as training.
		if indexConfigItem != nil {
			indexConfigItem.TrainStatus = aiPredictModel.TrainStatusTraining
			indexConfigItem.ModifyTime = time.Now()
			if tmpErr := indexConfigItem.Update([]string{"train_status", "modify_time"}); tmpErr != nil {
				// Read the id from the config: indexItem may be nil in this branch.
				utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexConfigItem.AiPredictModelIndexId, tmpErr.Error())
			}
		}
	case utils.INDEX_TASK_TYPE_AI_MODEL_RUN:
		// Run task: mark the model index as running.
		if indexItem != nil {
			indexItem.RunStatus = aiPredictModel.RunStatusRunning
			indexItem.ModifyTime = time.Now()
			if tmpErr := indexItem.Update([]string{"run_status", "modify_time"}); tmpErr != nil {
				utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
			}
		}
	}
}

// HandleTaskRecordSuccessByTaskRecord
// @Description: Marks a sub-task record as successful (`处理成功`), then stores the
// imported training or run result for the model referenced by the record's Parameters
// (a config id for training tasks, an index id for run tasks). Once no sub-task of the
// parent task is pending, the parent task is marked `处理成功`.
// Errors are written to utils.FileLog; nothing is returned to the caller.
// @author: Roc
// @datetime 2025-05-14 16:00:26
// @param taskType string utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN or utils.INDEX_TASK_TYPE_AI_MODEL_RUN; other values only update the record/task
// @param indexTaskRecordInfo *models.IndexTaskRecord sub-task record to mark as successful
// @param aiPredictModelImportData request.AiPredictModelImportData result payload to persist
func HandleTaskRecordSuccessByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, aiPredictModelImportData request.AiPredictModelImportData) {
	var err error
	// Registered first so it runs last and sees errors set by the later deferred sync.
	defer func() {
		if err != nil {
			utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordSuccessByTaskRecord err:%v`, err))
		}
	}()

	if indexTaskRecordInfo == nil {
		err = fmt.Errorf("子任务记录不能为空")
		return
	}

	// Update the sub-task record status.
	indexTaskRecordInfo.Status = `处理成功`
	indexTaskRecordInfo.ModifyTime = time.Now()
	err = indexTaskRecordInfo.Update([]string{"status", "modify_time"})
	if err != nil {
		fmt.Println("修改子任务状态失败!")
		return
	}

	// After the result handling below, finalise the parent task if nothing is pending.
	defer func() {
		if syncErr := syncIndexTaskStatusByRecord(indexTaskRecordInfo, `处理成功`); syncErr != nil {
			err = syncErr
		}
	}()

	indexOb := new(aiPredictModel.AiPredictModelIndex)

	// Persist the result according to the task type.
	switch taskType {
	case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN:
		// Training task: Parameters holds the model config id.
		indexConfigId, tmpErr := strconv.Atoi(indexTaskRecordInfo.Parameters)
		if tmpErr != nil {
			err = fmt.Errorf("模型配置ID转换错误, err: %w", tmpErr)
			return
		}

		// Load the config.
		indexConfigObj := new(aiPredictModel.AiPredictModelIndexConfig)
		indexConfigItem, tmpErr := indexConfigObj.GetById(indexConfigId)
		if tmpErr != nil {
			err = fmt.Errorf("获取模型配置失败, err: %w", tmpErr)
			return
		}

		// Load the model index the config belongs to.
		indexItem, tmpErr := indexOb.GetItemById(indexConfigItem.AiPredictModelIndexId)
		if tmpErr != nil {
			err = fmt.Errorf("获取标的失败, err: %w", tmpErr)
			return
		}

		// The helper logs its own failure, so the returned error is intentionally not handled again.
		_ = handleTaskRecordSuccessByTrain(aiPredictModelImportData, indexConfigItem, indexItem)
	case utils.INDEX_TASK_TYPE_AI_MODEL_RUN:
		// Run task: Parameters holds the model index id.
		indexId, tmpErr := strconv.Atoi(indexTaskRecordInfo.Parameters)
		if tmpErr != nil {
			err = fmt.Errorf("标的ID转换错误, err: %w", tmpErr)
			return
		}

		// Load the model index.
		indexItem, tmpErr := indexOb.GetItemById(indexId)
		if tmpErr != nil {
			err = fmt.Errorf("训练失败,查找标的失败, err: %w", tmpErr)
			return
		}

		if tmpErr = handleTaskRecordSuccessByRun(aiPredictModelImportData, indexItem); tmpErr != nil {
			utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
		}
	}
}

// handleTaskRecordSuccessByTrain
// @Description: Persists a successful training result: sets the train status of the
// model index and its config to success, copies the train/test metrics onto the config,
// and stores the imported data rows as config train data via UpdateIndexAndData.
// Any error is also logged to utils.FileLog before returning.
// @author: Roc
// @datetime 2025-05-14 18:25:12
// @param aiPredictModelImportData request.AiPredictModelImportData imported training result
// @param indexConfigItem *aiPredictModel.AiPredictModelIndexConfig config that was trained (must be non-nil)
// @param indexItem *aiPredictModel.AiPredictModelIndex model index the config belongs to (must be non-nil)
// @return err error
func handleTaskRecordSuccessByTrain(aiPredictModelImportData request.AiPredictModelImportData, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex) (err error) {
	defer func() {
		if err != nil {
			utils.FileLog.Error(fmt.Sprintf(`handleTaskRecordSuccessByTrain err:%v`, err))
		}
	}()

	if indexConfigItem == nil || indexItem == nil {
		return fmt.Errorf("模型配置或标的信息为空")
	}

	// Model index status.
	updateIndexCols := []string{"train_status", "modify_time"}
	indexItem.TrainStatus = aiPredictModel.TrainStatusSuccess
	indexItem.ModifyTime = time.Now()

	// Config status and training metrics.
	updateIndexConfigCols := []string{"train_status", `remark`, "modify_time", `train_mse`, `train_r2`, `test_mse`, `test_r2`}
	trainData := aiPredictModelImportData.TrainData
	indexConfigItem.TrainStatus = aiPredictModel.TrainStatusSuccess
	indexConfigItem.Remark = `成功`
	indexConfigItem.TrainMse = fmt.Sprint(trainData.TrainMse)
	indexConfigItem.TrainR2 = fmt.Sprint(trainData.TrainR2)
	indexConfigItem.TestMse = fmt.Sprint(trainData.TestMse)
	indexConfigItem.TestR2 = fmt.Sprint(trainData.TestR2)
	indexConfigItem.ModifyTime = time.Now()

	// Convert the imported rows into config train data.
	dataList := make([]*aiPredictModel.AiPredictModelIndexConfigTrainData, 0, len(aiPredictModelImportData.Data))
	for _, tmpData := range aiPredictModelImportData.Data {
		tmpDate, e := time.ParseInLocation(utils.FormatDate, tmpData.DataTime, time.Local)
		if e != nil {
			return fmt.Errorf("数据日期解析失败, %w", e)
		}
		dataList = append(dataList, &aiPredictModel.AiPredictModelIndexConfigTrainData{
			AiPredictModelIndexConfigId: indexConfigItem.AiPredictModelIndexConfigId,
			AiPredictModelIndexId:       indexItem.AiPredictModelIndexId,
			IndexCode:                   indexItem.IndexCode,
			DataTime:                    tmpDate,
			Value:                       tmpData.Value,
			PredictValue:                tmpData.PredictValue,
			Direction:                   tmpData.Direction,
			DeviationRate:               tmpData.DeviationRate,
			CreateTime:                  time.Now(),
			ModifyTime:                  time.Now(),
			DataTimestamp:               tmpData.DataTimestamp,
			Source:                      tmpData.Source,
		})
	}

	// Save the index, the config and the data rows.
	indexConfigOb := new(aiPredictModel.AiPredictModelIndexConfig)
	return indexConfigOb.UpdateIndexAndData(indexItem, indexConfigItem, dataList, updateIndexCols, updateIndexConfigCols)
}

// handleTaskRecordSuccessByRun
// @Description: Persists a successful run result: sets the run status to success, copies
// the predicted value, accuracy, deviation and predict date onto the model index, carries
// the new predict legend name into the existing extra config, and stores the imported
// data rows via UpdateIndexAndData.
// Any error is also logged to utils.FileLog before returning.
// @author: Roc
// @datetime 2025-05-14 14:28:11
// @param aiPredictModelImportData request.AiPredictModelImportData imported run result
// @param indexItem *aiPredictModel.AiPredictModelIndex model index that was run (must be non-nil)
// @return err error
func handleTaskRecordSuccessByRun(aiPredictModelImportData request.AiPredictModelImportData, indexItem *aiPredictModel.AiPredictModelIndex) (err error) {
	defer func() {
		if err != nil {
			utils.FileLog.Error(fmt.Sprintf(`handleTaskRecordSuccessByRun err:%v`, err))
		}
	}()

	if indexItem == nil {
		return fmt.Errorf("标的信息为空")
	}

	indexOb := new(aiPredictModel.AiPredictModelIndex)
	cols := indexOb.Cols()
	// NOTE(review): the original comment said the predict date should not be updated for
	// now (per product), yet the column is included here and the value is set below —
	// behaviour kept as coded; confirm which one is intended.
	updateCols := []string{cols.RunStatus, cols.PredictValue, cols.DirectionAccuracy, cols.AbsoluteDeviation, cols.ExtraConfig, cols.ModifyTime, cols.PredictDate}

	importIndex := aiPredictModelImportData.Index
	indexItem.RunStatus = aiPredictModel.RunStatusSuccess
	indexItem.PredictValue = importIndex.PredictValue
	indexItem.DirectionAccuracy = importIndex.DirectionAccuracy
	indexItem.AbsoluteDeviation = importIndex.AbsoluteDeviation
	indexItem.ModifyTime = time.Now()

	predictDate, e := time.ParseInLocation(utils.FormatDate, importIndex.PredictDate, time.Local)
	if e != nil {
		return fmt.Errorf("预测日期解析失败, %w", e)
	}
	indexItem.PredictDate = predictDate

	// Chart legend: only the predict legend name is taken from the new config; every
	// other setting of the stored config is kept.
	if indexItem.ExtraConfig != "" && importIndex.ExtraConfig != "" {
		var oldConfig, newConfig aiPredictModel.AiPredictModelIndexExtraConfig
		if e := json.Unmarshal([]byte(indexItem.ExtraConfig), &oldConfig); e != nil {
			return fmt.Errorf("标的原配置解析失败, Config: %s, Err: %v", indexItem.ExtraConfig, e)
		}
		if e := json.Unmarshal([]byte(importIndex.ExtraConfig), &newConfig); e != nil {
			return fmt.Errorf("标的新配置解析失败, Config: %s, Err: %v", importIndex.ExtraConfig, e)
		}
		oldConfig.DailyChart.PredictLegendName = newConfig.DailyChart.PredictLegendName
		b, e := json.Marshal(oldConfig)
		if e != nil {
			return fmt.Errorf("标的配置序列化失败, Err: %v", e)
		}
		indexItem.ExtraConfig = string(b)
	}

	// Convert the imported rows into model data.
	dataList := make([]*aiPredictModel.AiPredictModelData, 0, len(aiPredictModelImportData.Data))
	for _, tmpData := range aiPredictModelImportData.Data {
		tmpDate, e := time.ParseInLocation(utils.FormatDate, tmpData.DataTime, time.Local)
		if e != nil {
			return fmt.Errorf("数据日期解析失败, %w", e)
		}
		dataList = append(dataList, &aiPredictModel.AiPredictModelData{
			AiPredictModelIndexId: indexItem.AiPredictModelIndexId,
			IndexCode:             indexItem.IndexCode,
			DataTime:              tmpDate,
			Value:                 tmpData.Value,
			PredictValue:          tmpData.PredictValue,
			Direction:             tmpData.Direction,
			DeviationRate:         tmpData.DeviationRate,
			CreateTime:            time.Now(),
			ModifyTime:            time.Now(),
			DataTimestamp:         tmpData.DataTimestamp,
			Source:                tmpData.Source,
		})
	}

	// Save the index and the data rows.
	return indexOb.UpdateIndexAndData(indexItem, dataList, updateCols)
}