123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490 |
- package ai_predict_model
- import (
- "encoding/json"
- "eta/eta_index_lib/models"
- aiPredictModel "eta/eta_index_lib/models/ai_predict_model"
- "eta/eta_index_lib/models/ai_predict_model/request"
- "eta/eta_index_lib/utils"
- "fmt"
- "strconv"
- "time"
- )
- // HandleTaskRecordFailByTaskRecord
- // @Description: 任务标记失败
- // @author: Roc
- // @datetime 2025-05-09 16:24:48
- // @param taskType string
- // @param indexTaskRecordInfo *models.IndexTaskRecord
- // @param indexConfigItem *ai_predict_model.AiPredictModelIndexConfig
- // @param indexItem *ai_predict_model.AiPredictModelIndex
- // @param errMsg string
- func HandleTaskRecordFailByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex, errMsg string) {
- var err error
- defer func() {
- if err != nil {
- utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordFailByTaskRecord err:%v`, err))
- }
- }()
- // 修改子任务状态
- indexTaskRecordInfo.Status = `处理失败`
- indexTaskRecordInfo.Remark = errMsg
- indexTaskRecordInfo.ModifyTime = time.Now()
- err = indexTaskRecordInfo.Update([]string{"status", "remark", "modify_time"})
- if err != nil {
- fmt.Println("修改子任务状态失败!")
- return
- }
- // 处理完成后标记任务状态
- defer func() {
- obj := models.IndexTaskRecord{}
- // 修改任务状态
- todoCount, tmpErr := obj.GetCountByCondition(fmt.Sprintf(` AND %s = ? AND %s = ? `, models.IndexTaskRecordColumns.IndexTaskID, models.IndexTaskRecordColumns.Status), []interface{}{indexTaskRecordInfo.IndexTaskID, `待处理`})
- if tmpErr != nil {
- err = fmt.Errorf("查找剩余任务数量失败, err: %s", tmpErr.Error())
- return
- }
- if todoCount <= 0 {
- indexTaskObj := models.IndexTask{}
- indexTaskInfo, tmpErr := indexTaskObj.GetByID(indexTaskRecordInfo.IndexTaskID)
- if tmpErr != nil {
- err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
- return
- }
- tmpUpdateCols := []string{`end_time`, "status", "update_time"}
- indexTaskInfo.EndTime = time.Now()
- indexTaskInfo.Status = `处理成功`
- indexTaskInfo.UpdateTime = time.Now()
- if indexTaskInfo.StartTime.IsZero() {
- indexTaskInfo.StartTime = time.Now()
- tmpUpdateCols = append(tmpUpdateCols, "start_time")
- }
- tmpErr = indexTaskInfo.Update(tmpUpdateCols)
- if tmpErr != nil {
- utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
- }
- }
- return
- }()
- // 修改模型状态
- switch taskType {
- case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN: // 训练模型
- // 修改模型状态信息
- if indexItem != nil {
- indexItem.TrainStatus = aiPredictModel.TrainStatusFailed
- indexItem.ModifyTime = time.Now()
- tmpErr := indexItem.Update([]string{"train_status", "modify_time"})
- if tmpErr != nil {
- utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
- }
- }
- // 修改模型配置状态信息
- if indexConfigItem != nil {
- indexConfigItem.TrainStatus = aiPredictModel.TrainStatusFailed
- indexConfigItem.Remark = errMsg
- indexConfigItem.ModifyTime = time.Now()
- tmpErr := indexConfigItem.Update([]string{"train_status", `remark`, "modify_time"})
- if tmpErr != nil {
- utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
- }
- }
- case utils.INDEX_TASK_TYPE_AI_MODEL_RUN: // 运行模型
- if indexItem != nil {
- indexItem.RunStatus = aiPredictModel.RunStatusFailed
- indexItem.ModifyTime = time.Now()
- tmpErr := indexItem.Update([]string{"run_status", "modify_time"})
- if tmpErr != nil {
- utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
- }
- }
- default:
- return
- }
- return
- }
- // HandleTaskRecordProcessingByTaskRecord
- // @Description: 任务标记处理中
- // @author: Roc
- // @datetime 2025-05-09 16:24:38
- // @param taskType string
- // @param indexTaskRecordInfo *models.IndexTaskRecord
- // @param indexConfigItem *ai_predict_model.AiPredictModelIndexConfig
- // @param indexItem *ai_predict_model.AiPredictModelIndex
- func HandleTaskRecordProcessingByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex) {
- var err error
- defer func() {
- if err != nil {
- utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordFailByTaskRecord err:%v`, err))
- }
- }()
- // 修改子任务状态
- indexTaskRecordInfo.Status = `处理中`
- indexTaskRecordInfo.ModifyTime = time.Now()
- err = indexTaskRecordInfo.Update([]string{"status", "modify_time"})
- if err != nil {
- fmt.Println("修改子任务状态失败!")
- return
- }
- // 处理完成后标记任务状态
- defer func() {
- obj := models.IndexTaskRecord{}
- // 修改任务状态
- todoCount, tmpErr := obj.GetCountByCondition(fmt.Sprintf(` AND %s = ? AND %s = ? `, models.IndexTaskRecordColumns.IndexTaskID, models.IndexTaskRecordColumns.Status), []interface{}{indexTaskRecordInfo.IndexTaskID, `待处理`})
- if tmpErr != nil {
- err = fmt.Errorf("查找剩余任务数量失败, err: %s", tmpErr.Error())
- return
- }
- if todoCount <= 0 {
- indexTaskObj := models.IndexTask{}
- indexTaskInfo, tmpErr := indexTaskObj.GetByID(indexTaskRecordInfo.IndexTaskID)
- if tmpErr != nil {
- err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
- return
- }
- tmpUpdateCols := []string{`end_time`, "status", "update_time"}
- indexTaskInfo.EndTime = time.Now()
- indexTaskInfo.Status = `处理中`
- indexTaskInfo.UpdateTime = time.Now()
- if indexTaskInfo.StartTime.IsZero() {
- indexTaskInfo.StartTime = time.Now()
- tmpUpdateCols = append(tmpUpdateCols, "start_time")
- }
- tmpErr = indexTaskInfo.Update(tmpUpdateCols)
- if tmpErr != nil {
- utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
- }
- }
- return
- }()
- // 修改模型状态
- switch taskType {
- case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN: // 训练模型
- // 修改模型状态信息
- if indexItem != nil {
- indexItem.TrainStatus = aiPredictModel.TrainStatusTraining
- indexItem.ModifyTime = time.Now()
- tmpErr := indexItem.Update([]string{"train_status", "modify_time"})
- if tmpErr != nil {
- utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
- }
- }
- // 修改模型配置状态信息
- if indexConfigItem != nil {
- indexConfigItem.TrainStatus = aiPredictModel.TrainStatusTraining
- indexConfigItem.ModifyTime = time.Now()
- tmpErr := indexConfigItem.Update([]string{"train_status", "modify_time"})
- if tmpErr != nil {
- utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
- }
- }
- case utils.INDEX_TASK_TYPE_AI_MODEL_RUN: // 运行模型
- // 修改模型状态信息
- if indexItem != nil {
- indexItem.RunStatus = aiPredictModel.RunStatusRunning
- indexItem.ModifyTime = time.Now()
- tmpErr := indexItem.Update([]string{"run_status", "modify_time"})
- if tmpErr != nil {
- utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
- }
- }
- default:
- return
- }
- return
- }
- // HandleTaskRecordSuccessByTaskRecord
- // @Description: 标记处理完成
- // @author: Roc
- // @datetime 2025-05-14 16:00:26
- // @param taskType string
- // @param indexTaskRecordInfo *models.IndexTaskRecord
- // @param aiPredictModelImportData request.AiPredictModelImportData
- func HandleTaskRecordSuccessByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, aiPredictModelImportData request.AiPredictModelImportData) {
- var err error
- defer func() {
- if err != nil {
- utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordFailByTaskRecord err:%v`, err))
- }
- }()
- // 修改子任务状态
- indexTaskRecordInfo.Status = `处理成功`
- //indexTaskRecordInfo.Remark = errMsg
- indexTaskRecordInfo.ModifyTime = time.Now()
- err = indexTaskRecordInfo.Update([]string{"status", "modify_time"})
- if err != nil {
- fmt.Println("修改子任务状态失败!")
- return
- }
- // 处理完成后标记任务状态
- defer func() {
- obj := models.IndexTaskRecord{}
- // 修改任务状态
- todoCount, tmpErr := obj.GetCountByCondition(fmt.Sprintf(` AND %s = ? AND %s = ? `, models.IndexTaskRecordColumns.IndexTaskID, models.IndexTaskRecordColumns.Status), []interface{}{indexTaskRecordInfo.IndexTaskID, `待处理`})
- if tmpErr != nil {
- err = fmt.Errorf("查找剩余任务数量失败, err: %s", tmpErr.Error())
- return
- }
- if todoCount <= 0 {
- indexTaskObj := models.IndexTask{}
- indexTaskInfo, tmpErr := indexTaskObj.GetByID(indexTaskRecordInfo.IndexTaskID)
- if tmpErr != nil {
- err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
- return
- }
- tmpUpdateCols := []string{`end_time`, "status", "update_time"}
- indexTaskInfo.EndTime = time.Now()
- indexTaskInfo.Status = `处理成功`
- indexTaskInfo.UpdateTime = time.Now()
- if indexTaskInfo.StartTime.IsZero() {
- indexTaskInfo.StartTime = time.Now()
- tmpUpdateCols = append(tmpUpdateCols, "start_time")
- }
- tmpErr = indexTaskInfo.Update(tmpUpdateCols)
- if tmpErr != nil {
- utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
- }
- }
- return
- }()
- indexOb := new(aiPredictModel.AiPredictModelIndex)
- // 修改模型状态
- switch taskType {
- case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN: // 训练模型
- // 训练模型
- indexConfigId, tmpErr := strconv.Atoi(indexTaskRecordInfo.Parameters) // 模型配置ID
- if tmpErr != nil {
- err = fmt.Errorf("模型配置ID转换错误, err: %s", tmpErr.Error())
- return
- }
- indexConfigObj := new(aiPredictModel.AiPredictModelIndexConfig)
- // 查找配置
- indexConfigItem, tmpErr := indexConfigObj.GetById(indexConfigId)
- if tmpErr != nil {
- err = fmt.Errorf("获取模型配置失败, err: %s", tmpErr.Error())
- return
- }
- // 查询标的情况
- indexItem, tmpErr := indexOb.GetItemById(indexConfigItem.AiPredictModelIndexId)
- if err != nil {
- err = fmt.Errorf("获取标的失败, err: %s", tmpErr.Error())
- return
- }
- handleTaskRecordSuccessByTrain(aiPredictModelImportData, indexConfigItem, indexItem)
- case utils.INDEX_TASK_TYPE_AI_MODEL_RUN: // 运行模型
- // 标的id转换
- indexId, tmpErr := strconv.Atoi(indexTaskRecordInfo.Parameters)
- if err != nil {
- err = fmt.Errorf("标的ID转换错误, err: %s", tmpErr.Error())
- return
- }
- // 查询标的情况
- indexItem, tmpErr := indexOb.GetItemById(indexId)
- if tmpErr != nil {
- err = fmt.Errorf("训练失败,查找标的失败, err: %s", tmpErr.Error())
- return
- }
- tmpErr = handleTaskRecordSuccessByRun(aiPredictModelImportData, indexItem)
- if tmpErr != nil {
- utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
- }
- default:
- return
- }
- return
- }
- // handleTaskRecordSuccessByTrain
- // @Description: 处理模型训练成功后的操作
- // @author: Roc
- // @datetime 2025-05-14 18:25:12
- // @param aiPredictModelImportData request.AiPredictModelImportData
- // @param indexConfigItem *aiPredictModel.AiPredictModelIndexConfig
- // @param indexItem *aiPredictModel.AiPredictModelIndex
- // @return err error
- func handleTaskRecordSuccessByTrain(aiPredictModelImportData request.AiPredictModelImportData, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex) (err error) {
- defer func() {
- if err != nil {
- utils.FileLog.Error(fmt.Sprintf(`handleTaskRecordSuccessByTrain err:%v`, err))
- }
- }()
- // 标的状态修改
- updateIndexCols := []string{"train_status", "modify_time"}
- indexItem.TrainStatus = aiPredictModel.TrainStatusSuccess
- indexItem.ModifyTime = time.Now()
- updateIndexConfigCols := []string{"train_status", `remark`, "modify_time", `train_mse`, `train_r2`, `test_mse`, `test_r2`}
- // 配置状态修改
- {
- // 训练参数
- trainData := aiPredictModelImportData.TrainData
- indexConfigItem.TrainStatus = aiPredictModel.TrainStatusSuccess
- indexConfigItem.Remark = `成功`
- indexConfigItem.TrainMse = fmt.Sprint(trainData.TrainMse)
- indexConfigItem.TrainR2 = fmt.Sprint(trainData.TrainR2)
- indexConfigItem.TestMse = fmt.Sprint(trainData.TestMse)
- indexConfigItem.TestR2 = fmt.Sprint(trainData.TestR2)
- indexConfigItem.ModifyTime = time.Now()
- }
- indexConfigOb := new(aiPredictModel.AiPredictModelIndexConfig)
- dataList := make([]*aiPredictModel.AiPredictModelIndexConfigTrainData, 0)
- for _, tmpData := range aiPredictModelImportData.Data {
- tmpDate, e := time.ParseInLocation(utils.FormatDate, tmpData.DataTime, time.Local)
- if e != nil {
- err = fmt.Errorf("数据日期解析失败, %v", e)
- return
- }
- dataList = append(dataList, &aiPredictModel.AiPredictModelIndexConfigTrainData{
- //AiPredictModelDataId: 0,
- AiPredictModelIndexConfigId: indexConfigItem.AiPredictModelIndexConfigId,
- AiPredictModelIndexId: indexItem.AiPredictModelIndexId,
- IndexCode: indexItem.IndexCode,
- DataTime: tmpDate,
- Value: tmpData.Value,
- PredictValue: tmpData.PredictValue,
- Direction: tmpData.Direction,
- DeviationRate: tmpData.DeviationRate,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- DataTimestamp: tmpData.DataTimestamp,
- Source: tmpData.Source,
- })
- }
- // 更新指标和数据
- err = indexConfigOb.UpdateIndexAndData(indexItem, indexConfigItem, dataList, updateIndexCols, updateIndexConfigCols)
- if err != nil {
- return
- }
- return
- }
- // handleTaskRecordSuccessByRun
- // @Description: 运行中的数据处理
- // @author: Roc
- // @datetime 2025-05-14 14:28:11
- // @param aiPredictModelImportData request.AiPredictModelImportData
- // @param indexItem *aiPredictModel.AiPredictModelIndex
- // @return err error
- func handleTaskRecordSuccessByRun(aiPredictModelImportData request.AiPredictModelImportData, indexItem *aiPredictModel.AiPredictModelIndex) (err error) {
- defer func() {
- defer func() {
- if err != nil {
- utils.FileLog.Error(fmt.Sprintf(`handleTaskRecordSuccessByRun err:%v`, err))
- }
- }()
- }()
- indexOb := new(aiPredictModel.AiPredictModelIndex)
- updateCols := []string{indexOb.Cols().RunStatus, indexOb.Cols().PredictValue, indexOb.Cols().DirectionAccuracy, indexOb.Cols().AbsoluteDeviation, indexOb.Cols().ExtraConfig, indexOb.Cols().ModifyTime}
- // 预测日期,理论上是需要改的,可是产品说不需要改,所以暂时不改
- updateCols = append(updateCols, indexOb.Cols().PredictDate)
- indexItem.RunStatus = aiPredictModel.RunStatusSuccess
- indexItem.PredictValue = aiPredictModelImportData.Index.PredictValue
- indexItem.DirectionAccuracy = aiPredictModelImportData.Index.DirectionAccuracy
- indexItem.AbsoluteDeviation = aiPredictModelImportData.Index.AbsoluteDeviation
- indexItem.ModifyTime = time.Now()
- predictDate, e := time.ParseInLocation(utils.FormatDate, aiPredictModelImportData.Index.PredictDate, time.Local)
- if e != nil {
- err = fmt.Errorf("预测日期解析失败, %v", e)
- return
- }
- indexItem.PredictDate = predictDate
- // 图例信息
- if indexItem.ExtraConfig != "" && aiPredictModelImportData.Index.ExtraConfig != "" {
- var oldConfig, newConfig aiPredictModel.AiPredictModelIndexExtraConfig
- if e := json.Unmarshal([]byte(indexItem.ExtraConfig), &oldConfig); e != nil {
- err = fmt.Errorf("标的原配置解析失败, Config: %s, Err: %v", indexItem.ExtraConfig, e)
- return
- }
- if e := json.Unmarshal([]byte(aiPredictModelImportData.Index.ExtraConfig), &newConfig); e != nil {
- err = fmt.Errorf("标的新配置解析失败, Config: %s, Err: %v", aiPredictModelImportData.Index.ExtraConfig, e)
- return
- }
- oldConfig.DailyChart.PredictLegendName = newConfig.DailyChart.PredictLegendName
- b, _ := json.Marshal(oldConfig)
- indexItem.ExtraConfig = string(b)
- }
- dataList := make([]*aiPredictModel.AiPredictModelData, 0)
- for _, tmpData := range aiPredictModelImportData.Data {
- tmpDate, e := time.ParseInLocation(utils.FormatDate, tmpData.DataTime, time.Local)
- if e != nil {
- err = fmt.Errorf("数据日期解析失败, %v", e)
- return
- }
- dataList = append(dataList, &aiPredictModel.AiPredictModelData{
- //AiPredictModelDataId: 0,
- AiPredictModelIndexId: indexItem.AiPredictModelIndexId,
- IndexCode: indexItem.IndexCode,
- DataTime: tmpDate,
- Value: tmpData.Value,
- PredictValue: tmpData.PredictValue,
- Direction: tmpData.Direction,
- DeviationRate: tmpData.DeviationRate,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- DataTimestamp: tmpData.DataTimestamp,
- Source: tmpData.Source,
- })
- }
- // 更新指标和数据
- err = indexOb.UpdateIndexAndData(indexItem, dataList, updateCols)
- if err != nil {
- return
- }
- return
- }
|