index.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. package ai_predict_model
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/models"
  5. aiPredictModel "eta/eta_index_lib/models/ai_predict_model"
  6. "eta/eta_index_lib/models/ai_predict_model/request"
  7. "eta/eta_index_lib/utils"
  8. "fmt"
  9. "strconv"
  10. "time"
  11. )
  12. // HandleTaskRecordFailByTaskRecord
  13. // @Description: 任务标记失败
  14. // @author: Roc
  15. // @datetime 2025-05-09 16:24:48
  16. // @param taskType string
  17. // @param indexTaskRecordInfo *models.IndexTaskRecord
  18. // @param indexConfigItem *ai_predict_model.AiPredictModelIndexConfig
  19. // @param indexItem *ai_predict_model.AiPredictModelIndex
  20. // @param errMsg string
  21. func HandleTaskRecordFailByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex, errMsg string) {
  22. var err error
  23. defer func() {
  24. if err != nil {
  25. utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordFailByTaskRecord err:%v`, err))
  26. }
  27. }()
  28. // 修改子任务状态
  29. indexTaskRecordInfo.Status = `处理失败`
  30. indexTaskRecordInfo.Remark = errMsg
  31. indexTaskRecordInfo.ModifyTime = time.Now()
  32. err = indexTaskRecordInfo.Update([]string{"status", "remark", "modify_time"})
  33. if err != nil {
  34. fmt.Println("修改子任务状态失败!")
  35. return
  36. }
  37. // 处理完成后标记任务状态
  38. defer func() {
  39. obj := models.IndexTaskRecord{}
  40. // 修改任务状态
  41. todoCount, tmpErr := obj.GetCountByCondition(fmt.Sprintf(` AND %s = ? AND %s = ? `, models.IndexTaskRecordColumns.IndexTaskID, models.IndexTaskRecordColumns.Status), []interface{}{indexTaskRecordInfo.IndexTaskID, `待处理`})
  42. if tmpErr != nil {
  43. err = fmt.Errorf("查找剩余任务数量失败, err: %s", tmpErr.Error())
  44. return
  45. }
  46. if todoCount <= 0 {
  47. indexTaskObj := models.IndexTask{}
  48. indexTaskInfo, tmpErr := indexTaskObj.GetByID(indexTaskRecordInfo.IndexTaskID)
  49. if tmpErr != nil {
  50. err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
  51. return
  52. }
  53. tmpUpdateCols := []string{`end_time`, "status", "update_time"}
  54. indexTaskInfo.EndTime = time.Now()
  55. indexTaskInfo.Status = `处理成功`
  56. indexTaskInfo.UpdateTime = time.Now()
  57. if indexTaskInfo.StartTime.IsZero() {
  58. indexTaskInfo.StartTime = time.Now()
  59. tmpUpdateCols = append(tmpUpdateCols, "start_time")
  60. }
  61. tmpErr = indexTaskInfo.Update(tmpUpdateCols)
  62. if tmpErr != nil {
  63. utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
  64. }
  65. }
  66. return
  67. }()
  68. // 修改模型状态
  69. switch taskType {
  70. case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN: // 训练模型
  71. // 修改模型状态信息
  72. if indexItem != nil {
  73. indexItem.TrainStatus = aiPredictModel.TrainStatusFailed
  74. indexItem.ModifyTime = time.Now()
  75. tmpErr := indexItem.Update([]string{"train_status", "modify_time"})
  76. if tmpErr != nil {
  77. utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
  78. }
  79. }
  80. // 修改模型配置状态信息
  81. if indexConfigItem != nil {
  82. indexConfigItem.TrainStatus = aiPredictModel.TrainStatusFailed
  83. indexConfigItem.Remark = errMsg
  84. indexConfigItem.ModifyTime = time.Now()
  85. tmpErr := indexConfigItem.Update([]string{"train_status", `remark`, "modify_time"})
  86. if tmpErr != nil {
  87. utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
  88. }
  89. }
  90. case utils.INDEX_TASK_TYPE_AI_MODEL_RUN: // 运行模型
  91. if indexItem != nil {
  92. indexItem.RunStatus = aiPredictModel.RunStatusFailed
  93. indexItem.ModifyTime = time.Now()
  94. tmpErr := indexItem.Update([]string{"run_status", "modify_time"})
  95. if tmpErr != nil {
  96. utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
  97. }
  98. }
  99. default:
  100. return
  101. }
  102. return
  103. }
  104. // HandleTaskRecordProcessingByTaskRecord
  105. // @Description: 任务标记处理中
  106. // @author: Roc
  107. // @datetime 2025-05-09 16:24:38
  108. // @param taskType string
  109. // @param indexTaskRecordInfo *models.IndexTaskRecord
  110. // @param indexConfigItem *ai_predict_model.AiPredictModelIndexConfig
  111. // @param indexItem *ai_predict_model.AiPredictModelIndex
  112. func HandleTaskRecordProcessingByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex) {
  113. var err error
  114. defer func() {
  115. if err != nil {
  116. utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordFailByTaskRecord err:%v`, err))
  117. }
  118. }()
  119. // 修改子任务状态
  120. indexTaskRecordInfo.Status = `处理中`
  121. indexTaskRecordInfo.ModifyTime = time.Now()
  122. err = indexTaskRecordInfo.Update([]string{"status", "modify_time"})
  123. if err != nil {
  124. fmt.Println("修改子任务状态失败!")
  125. return
  126. }
  127. // 处理完成后标记任务状态
  128. defer func() {
  129. obj := models.IndexTaskRecord{}
  130. // 修改任务状态
  131. todoCount, tmpErr := obj.GetCountByCondition(fmt.Sprintf(` AND %s = ? AND %s = ? `, models.IndexTaskRecordColumns.IndexTaskID, models.IndexTaskRecordColumns.Status), []interface{}{indexTaskRecordInfo.IndexTaskID, `待处理`})
  132. if tmpErr != nil {
  133. err = fmt.Errorf("查找剩余任务数量失败, err: %s", tmpErr.Error())
  134. return
  135. }
  136. if todoCount <= 0 {
  137. indexTaskObj := models.IndexTask{}
  138. indexTaskInfo, tmpErr := indexTaskObj.GetByID(indexTaskRecordInfo.IndexTaskID)
  139. if tmpErr != nil {
  140. err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
  141. return
  142. }
  143. tmpUpdateCols := []string{`end_time`, "status", "update_time"}
  144. indexTaskInfo.EndTime = time.Now()
  145. indexTaskInfo.Status = `处理中`
  146. indexTaskInfo.UpdateTime = time.Now()
  147. if indexTaskInfo.StartTime.IsZero() {
  148. indexTaskInfo.StartTime = time.Now()
  149. tmpUpdateCols = append(tmpUpdateCols, "start_time")
  150. }
  151. tmpErr = indexTaskInfo.Update(tmpUpdateCols)
  152. if tmpErr != nil {
  153. utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
  154. }
  155. }
  156. return
  157. }()
  158. // 修改模型状态
  159. switch taskType {
  160. case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN: // 训练模型
  161. // 修改模型状态信息
  162. if indexItem != nil {
  163. indexItem.TrainStatus = aiPredictModel.TrainStatusTraining
  164. indexItem.ModifyTime = time.Now()
  165. tmpErr := indexItem.Update([]string{"train_status", "modify_time"})
  166. if tmpErr != nil {
  167. utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
  168. }
  169. }
  170. // 修改模型配置状态信息
  171. if indexConfigItem != nil {
  172. indexConfigItem.TrainStatus = aiPredictModel.TrainStatusTraining
  173. indexConfigItem.ModifyTime = time.Now()
  174. tmpErr := indexConfigItem.Update([]string{"train_status", "modify_time"})
  175. if tmpErr != nil {
  176. utils.FileLog.Error("%d,修改模型训练状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
  177. }
  178. }
  179. case utils.INDEX_TASK_TYPE_AI_MODEL_RUN: // 运行模型
  180. // 修改模型状态信息
  181. if indexItem != nil {
  182. indexItem.RunStatus = aiPredictModel.RunStatusRunning
  183. indexItem.ModifyTime = time.Now()
  184. tmpErr := indexItem.Update([]string{"run_status", "modify_time"})
  185. if tmpErr != nil {
  186. utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
  187. }
  188. }
  189. default:
  190. return
  191. }
  192. return
  193. }
  194. // HandleTaskRecordSuccessByTaskRecord
  195. // @Description: 标记处理完成
  196. // @author: Roc
  197. // @datetime 2025-05-14 16:00:26
  198. // @param taskType string
  199. // @param indexTaskRecordInfo *models.IndexTaskRecord
  200. // @param aiPredictModelImportData request.AiPredictModelImportData
  201. func HandleTaskRecordSuccessByTaskRecord(taskType string, indexTaskRecordInfo *models.IndexTaskRecord, aiPredictModelImportData request.AiPredictModelImportData) {
  202. var err error
  203. defer func() {
  204. if err != nil {
  205. utils.FileLog.Error(fmt.Sprintf(`HandleTaskRecordFailByTaskRecord err:%v`, err))
  206. }
  207. }()
  208. // 修改子任务状态
  209. indexTaskRecordInfo.Status = `处理成功`
  210. //indexTaskRecordInfo.Remark = errMsg
  211. indexTaskRecordInfo.ModifyTime = time.Now()
  212. err = indexTaskRecordInfo.Update([]string{"status", "modify_time"})
  213. if err != nil {
  214. fmt.Println("修改子任务状态失败!")
  215. return
  216. }
  217. // 处理完成后标记任务状态
  218. defer func() {
  219. obj := models.IndexTaskRecord{}
  220. // 修改任务状态
  221. todoCount, tmpErr := obj.GetCountByCondition(fmt.Sprintf(` AND %s = ? AND %s = ? `, models.IndexTaskRecordColumns.IndexTaskID, models.IndexTaskRecordColumns.Status), []interface{}{indexTaskRecordInfo.IndexTaskID, `待处理`})
  222. if tmpErr != nil {
  223. err = fmt.Errorf("查找剩余任务数量失败, err: %s", tmpErr.Error())
  224. return
  225. }
  226. if todoCount <= 0 {
  227. indexTaskObj := models.IndexTask{}
  228. indexTaskInfo, tmpErr := indexTaskObj.GetByID(indexTaskRecordInfo.IndexTaskID)
  229. if tmpErr != nil {
  230. err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
  231. return
  232. }
  233. tmpUpdateCols := []string{`end_time`, "status", "update_time"}
  234. indexTaskInfo.EndTime = time.Now()
  235. indexTaskInfo.Status = `处理成功`
  236. indexTaskInfo.UpdateTime = time.Now()
  237. if indexTaskInfo.StartTime.IsZero() {
  238. indexTaskInfo.StartTime = time.Now()
  239. tmpUpdateCols = append(tmpUpdateCols, "start_time")
  240. }
  241. tmpErr = indexTaskInfo.Update(tmpUpdateCols)
  242. if tmpErr != nil {
  243. utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
  244. }
  245. }
  246. return
  247. }()
  248. indexOb := new(aiPredictModel.AiPredictModelIndex)
  249. // 修改模型状态
  250. switch taskType {
  251. case utils.INDEX_TASK_TYPE_AI_MODEL_TRAIN: // 训练模型
  252. // 训练模型
  253. indexConfigId, tmpErr := strconv.Atoi(indexTaskRecordInfo.Parameters) // 模型配置ID
  254. if tmpErr != nil {
  255. err = fmt.Errorf("模型配置ID转换错误, err: %s", tmpErr.Error())
  256. return
  257. }
  258. indexConfigObj := new(aiPredictModel.AiPredictModelIndexConfig)
  259. // 查找配置
  260. indexConfigItem, tmpErr := indexConfigObj.GetById(indexConfigId)
  261. if tmpErr != nil {
  262. err = fmt.Errorf("获取模型配置失败, err: %s", tmpErr.Error())
  263. return
  264. }
  265. // 查询标的情况
  266. indexItem, tmpErr := indexOb.GetItemById(indexConfigItem.AiPredictModelIndexId)
  267. if err != nil {
  268. err = fmt.Errorf("获取标的失败, err: %s", tmpErr.Error())
  269. return
  270. }
  271. handleTaskRecordSuccessByTrain(aiPredictModelImportData, indexConfigItem, indexItem)
  272. case utils.INDEX_TASK_TYPE_AI_MODEL_RUN: // 运行模型
  273. // 标的id转换
  274. indexId, tmpErr := strconv.Atoi(indexTaskRecordInfo.Parameters)
  275. if err != nil {
  276. err = fmt.Errorf("标的ID转换错误, err: %s", tmpErr.Error())
  277. return
  278. }
  279. // 查询标的情况
  280. indexItem, tmpErr := indexOb.GetItemById(indexId)
  281. if tmpErr != nil {
  282. err = fmt.Errorf("训练失败,查找标的失败, err: %s", tmpErr.Error())
  283. return
  284. }
  285. tmpErr = handleTaskRecordSuccessByRun(aiPredictModelImportData, indexItem)
  286. if tmpErr != nil {
  287. utils.FileLog.Error("%d,修改模型运行状态失败, err: %s", indexItem.AiPredictModelIndexId, tmpErr.Error())
  288. }
  289. default:
  290. return
  291. }
  292. return
  293. }
  294. // handleTaskRecordSuccessByTrain
  295. // @Description: 处理模型训练成功后的操作
  296. // @author: Roc
  297. // @datetime 2025-05-14 18:25:12
  298. // @param aiPredictModelImportData request.AiPredictModelImportData
  299. // @param indexConfigItem *aiPredictModel.AiPredictModelIndexConfig
  300. // @param indexItem *aiPredictModel.AiPredictModelIndex
  301. // @return err error
  302. func handleTaskRecordSuccessByTrain(aiPredictModelImportData request.AiPredictModelImportData, indexConfigItem *aiPredictModel.AiPredictModelIndexConfig, indexItem *aiPredictModel.AiPredictModelIndex) (err error) {
  303. defer func() {
  304. if err != nil {
  305. utils.FileLog.Error(fmt.Sprintf(`handleTaskRecordSuccessByTrain err:%v`, err))
  306. }
  307. }()
  308. // 标的状态修改
  309. updateIndexCols := []string{"train_status", "modify_time"}
  310. indexItem.TrainStatus = aiPredictModel.TrainStatusSuccess
  311. indexItem.ModifyTime = time.Now()
  312. updateIndexConfigCols := []string{"train_status", `remark`, "modify_time", `train_mse`, `train_r2`, `test_mse`, `test_r2`}
  313. // 配置状态修改
  314. {
  315. // 训练参数
  316. trainData := aiPredictModelImportData.TrainData
  317. indexConfigItem.TrainStatus = aiPredictModel.TrainStatusSuccess
  318. indexConfigItem.Remark = `成功`
  319. indexConfigItem.TrainMse = fmt.Sprint(trainData.TrainMse)
  320. indexConfigItem.TrainR2 = fmt.Sprint(trainData.TrainR2)
  321. indexConfigItem.TestMse = fmt.Sprint(trainData.TestMse)
  322. indexConfigItem.TestR2 = fmt.Sprint(trainData.TestR2)
  323. indexConfigItem.ModifyTime = time.Now()
  324. }
  325. indexConfigOb := new(aiPredictModel.AiPredictModelIndexConfig)
  326. dataList := make([]*aiPredictModel.AiPredictModelIndexConfigTrainData, 0)
  327. for _, tmpData := range aiPredictModelImportData.Data {
  328. tmpDate, e := time.ParseInLocation(utils.FormatDate, tmpData.DataTime, time.Local)
  329. if e != nil {
  330. err = fmt.Errorf("数据日期解析失败, %v", e)
  331. return
  332. }
  333. timestamp := tmpDate.UnixNano() / 1e6
  334. dataList = append(dataList, &aiPredictModel.AiPredictModelIndexConfigTrainData{
  335. //AiPredictModelDataId: 0,
  336. AiPredictModelIndexConfigId: indexConfigItem.AiPredictModelIndexConfigId,
  337. AiPredictModelIndexId: indexItem.AiPredictModelIndexId,
  338. IndexCode: indexItem.IndexCode,
  339. DataTime: tmpDate,
  340. Value: tmpData.Value,
  341. PredictValue: tmpData.PredictValue,
  342. Direction: tmpData.Direction,
  343. DeviationRate: tmpData.DeviationRate,
  344. CreateTime: time.Now(),
  345. ModifyTime: time.Now(),
  346. DataTimestamp: timestamp,
  347. Source: tmpData.Source,
  348. })
  349. }
  350. // 更新指标和数据
  351. err = indexConfigOb.UpdateIndexAndData(indexItem, indexConfigItem, dataList, updateIndexCols, updateIndexConfigCols)
  352. if err != nil {
  353. return
  354. }
  355. return
  356. }
  357. // handleTaskRecordSuccessByRun
  358. // @Description: 运行中的数据处理
  359. // @author: Roc
  360. // @datetime 2025-05-14 14:28:11
  361. // @param aiPredictModelImportData request.AiPredictModelImportData
  362. // @param indexItem *aiPredictModel.AiPredictModelIndex
  363. // @return err error
  364. func handleTaskRecordSuccessByRun(aiPredictModelImportData request.AiPredictModelImportData, indexItem *aiPredictModel.AiPredictModelIndex) (err error) {
  365. defer func() {
  366. defer func() {
  367. if err != nil {
  368. utils.FileLog.Error(fmt.Sprintf(`handleTaskRecordSuccessByRun err:%v`, err))
  369. }
  370. }()
  371. }()
  372. indexOb := new(aiPredictModel.AiPredictModelIndex)
  373. updateCols := []string{indexOb.Cols().RunStatus, indexOb.Cols().PredictValue, indexOb.Cols().DirectionAccuracy, indexOb.Cols().AbsoluteDeviation, indexOb.Cols().ExtraConfig, indexOb.Cols().ModifyTime}
  374. // 预测日期,理论上是需要改的,可是产品说不需要改,所以暂时不改
  375. updateCols = append(updateCols, indexOb.Cols().PredictDate)
  376. indexItem.RunStatus = aiPredictModel.RunStatusSuccess
  377. indexItem.PredictValue = aiPredictModelImportData.Index.PredictValue
  378. indexItem.DirectionAccuracy = aiPredictModelImportData.Index.DirectionAccuracy
  379. indexItem.AbsoluteDeviation = aiPredictModelImportData.Index.AbsoluteDeviation
  380. indexItem.ModifyTime = time.Now()
  381. predictDate, e := time.ParseInLocation(utils.FormatDate, aiPredictModelImportData.Index.PredictDate, time.Local)
  382. if e != nil {
  383. err = fmt.Errorf("预测日期解析失败, %v", e)
  384. return
  385. }
  386. indexItem.PredictDate = predictDate
  387. // 图例信息
  388. if indexItem.ExtraConfig != "" && aiPredictModelImportData.Index.ExtraConfig != "" {
  389. var oldConfig, newConfig aiPredictModel.AiPredictModelIndexExtraConfig
  390. if e := json.Unmarshal([]byte(indexItem.ExtraConfig), &oldConfig); e != nil {
  391. err = fmt.Errorf("标的原配置解析失败, Config: %s, Err: %v", indexItem.ExtraConfig, e)
  392. return
  393. }
  394. if e := json.Unmarshal([]byte(aiPredictModelImportData.Index.ExtraConfig), &newConfig); e != nil {
  395. err = fmt.Errorf("标的新配置解析失败, Config: %s, Err: %v", aiPredictModelImportData.Index.ExtraConfig, e)
  396. return
  397. }
  398. oldConfig.DailyChart.PredictLegendName = newConfig.DailyChart.PredictLegendName
  399. b, _ := json.Marshal(oldConfig)
  400. indexItem.ExtraConfig = string(b)
  401. }
  402. dataList := make([]*aiPredictModel.AiPredictModelData, 0)
  403. for _, tmpData := range aiPredictModelImportData.Data {
  404. tmpDate, e := time.ParseInLocation(utils.FormatDate, tmpData.DataTime, time.Local)
  405. if e != nil {
  406. err = fmt.Errorf("数据日期解析失败, %v", e)
  407. return
  408. }
  409. timestamp := tmpDate.UnixNano() / 1e6
  410. dataList = append(dataList, &aiPredictModel.AiPredictModelData{
  411. //AiPredictModelDataId: 0,
  412. AiPredictModelIndexId: indexItem.AiPredictModelIndexId,
  413. IndexCode: indexItem.IndexCode,
  414. DataTime: tmpDate,
  415. Value: tmpData.Value,
  416. PredictValue: tmpData.PredictValue,
  417. Direction: tmpData.Direction,
  418. DeviationRate: tmpData.DeviationRate,
  419. CreateTime: time.Now(),
  420. ModifyTime: time.Now(),
  421. DataTimestamp: timestamp,
  422. Source: tmpData.Source,
  423. })
  424. }
  425. // 更新指标和数据
  426. err = indexOb.UpdateIndexAndData(indexItem, dataList, updateCols)
  427. if err != nil {
  428. return
  429. }
  430. return
  431. }