task.go 27 KB


  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/cache"
  5. "eta/eta_api/models"
  6. "eta/eta_api/models/rag"
  7. "eta/eta_api/services/binlog"
  8. "eta/eta_api/services/data"
  9. edbmonitor "eta/eta_api/services/edb_monitor"
  10. "eta/eta_api/services/llm"
  11. "eta/eta_api/utils"
  12. "fmt"
  13. "strings"
  14. "time"
  15. )
  16. func Task() {
  17. fmt.Println("task start")
  18. {
  19. // 修复客户试用数据
  20. //FixCompanyUpdateData()
  21. //FixCompanyTryDay()
  22. //FixCompanyTryDay()
  23. //StaticCompanyTryDay()
  24. //return
  25. }
  26. //FixPermissionStatus()
  27. //GetCompanyInfo()
  28. //ReportCount()
  29. //windSourceUrl:=`http://47.100.166.55:7002/edbInfo/wind/?EdbCode=M0001427&StartDate=2020-11-01&EndDate=2021-03-01`
  30. //data.AddAllArticle()
  31. //data.GetSmmIndex()
  32. //data.GetSmmIndexData()
  33. go AutoInsertLogToDB()
  34. //手工数据表格导入后的指标库刷新
  35. go ImportManualDataRefresh()
  36. // 加入上海钢联指标数据之后的刷新
  37. go MysteelChemicalDataAdd()
  38. //修复用户关注标识
  39. //GetWxUsersSubscribe()
  40. go AutoInsertAdminOperateRecordToDB()
  41. // 指标刷新
  42. go data.HandleEdbRefreshQueue()
  43. // 进行指标替换操作
  44. go DealReplaceEdbCache()
  45. // TODO:监听这里需要找下达梦的方案,主流程先跑起来再处理
  46. // 监听binlog
  47. if utils.MYSQL_DATA_BINLOG_URL != "" && utils.DbDriverName == `mysql` {
  48. go binlog.ListenMysql()
  49. go edbmonitor.HandleEdbMonitorEdbInfo()
  50. // 监听数据源binlog写入es
  51. go binlog.HandleDataSourceChange2Es()
  52. }
  53. go StartSessionManager()
  54. go llm.SaveAllChatRecordsToDB()
  55. // 定时任务进行微信文章操作
  56. // 定时任务进行微信公众号添加、文章刷新操作(暂不处理了,改成其他服务处理)
  57. //go HandleWechatArticleOp()
  58. // 定时任务进行微信文章LLM操作
  59. go HandleWechatArticleLLmOp()
  60. // 队列任务将eta报告同步到知识库操作
  61. go HandleEtaReportUpdateOp()
  62. // 定时任务进行eta报告进行LLM操作
  63. go HandleEtaReportKnowledgeLLmOp()
  64. // 定时任务进行进行AI报告/文章的摘要任务LLM操作
  65. go HandleAiArticleAbstractLlmOp()
  66. // 权益报告监听入库
  67. go AutoInsertRaiReport()
  68. go llm.DealHistoryArticleDafTags()
  69. // 指标预警信息发送
  70. //go edbmonitor.AutoCheckMonitorMessageList()
  71. // 巡检信息发送
  72. go AutoCheckInspectionMessageList()
  73. // TODO:数据修复
  74. //FixNewEs()
  75. fmt.Println("task end")
  76. }
  77. //// 每日发布晨报
  78. //func AutoPublishDayReport() {
  79. // defer func() {
  80. // if err := recover(); err != nil {
  81. // fmt.Println("[AutoPublishDayReport]", err)
  82. // }
  83. // }()
  84. //
  85. // // 每日8:42发布晨报
  86. // ticker := time.Tick(50 * time.Second)
  87. // for range ticker {
  88. // nowTime := time.Now()
  89. // clock := nowTime.Format("1504")
  90. // if clock == "0842" {
  91. // if err := PublishTodayDayReport(); err != nil {
  92. // go alarm_msg.SendAlarmMsg(fmt.Sprint("每日晨报自动发送 AutoPublishDayReport ERR:", err), 3)
  93. // //utils.SendEmail(utils.APPNAME+" "+utils.RunMode+" 失败提醒", fmt.Sprint("AutoPublishDayReport ERR:", err), utils.EmailSendToUsers)
  94. // }
  95. // }
  96. // }
  97. //}
  98. // ImportManualDataRefresh 导入手工数据后的刷新
  99. func ImportManualDataRefresh() {
  100. defer func() {
  101. if err := recover(); err != nil {
  102. fmt.Println("[ImportManualDataRefresh]", err)
  103. }
  104. }()
  105. for {
  106. utils.Rc.Brpop(utils.CACHE_IMPORT_MANUAL_DATA, func(b []byte) {
  107. edbCode := string(b)
  108. edbCode = strings.TrimPrefix(edbCode, `"`)
  109. edbCode = strings.TrimSuffix(edbCode, `"`)
  110. data.RefreshManualData(edbCode)
  111. })
  112. }
  113. }
  114. // MysteelChemicalDataAdd 加入上海钢联指标数据之后的刷新
  115. func MysteelChemicalDataAdd() {
  116. defer func() {
  117. if err := recover(); err != nil {
  118. fmt.Println("[ImportManualDataRefresh]", err)
  119. }
  120. }()
  121. for {
  122. utils.Rc.Brpop(utils.CACHE_MYSTEEL_CHEMICAL_ADD_DATA, func(b []byte) {
  123. edbCode := string(b)
  124. edbCode = strings.TrimPrefix(edbCode, `"`)
  125. edbCode = strings.TrimSuffix(edbCode, `"`)
  126. data.RefreshMysteelChemicalData(edbCode)
  127. })
  128. }
  129. }
  130. //func init() {
  131. // fmt.Println("start task init")
  132. // UpdateEnglishEmailLogErrMsg()
  133. // fmt.Println("end task init")
  134. //}
  135. //
  136. //// UpdateEnglishEmailLogErrMsg 更新英文邮件日志的ErrMsg(研报后台4.2上线后执行, 仅一次)
  137. //func UpdateEnglishEmailLogErrMsg() {
  138. // var cond string
  139. // var pars []interface{}
  140. // list, e := models.GetEnglishReportEmailLogList(cond, pars)
  141. // if e != nil {
  142. // fmt.Println("获取日志列表失败")
  143. // return
  144. // }
  145. // for _, v := range list {
  146. // if v.SendStatus != 0 || v.Source != 1 || v.Result == "" || v.ErrMsg != "" {
  147. // continue
  148. // }
  149. // // 取出错误信息
  150. // fmt.Printf("正在更新%d\n", v.Id)
  151. // r := new(AliyunEmailResult)
  152. // if e = json.Unmarshal([]byte(v.Result), &r); e != nil {
  153. // fmt.Println("JSON解析报错了1" + e.Error())
  154. // continue
  155. // }
  156. // rd := new(AliyunEmailResultData)
  157. // res := strings.Replace(r.Data, `\`, ``, -1)
  158. // if e = json.Unmarshal([]byte(res), &rd); e != nil {
  159. // fmt.Println("JSON解析报错了2" + e.Error())
  160. // continue
  161. // }
  162. // v.ErrMsg = rd.Message
  163. // if e = v.Update([]string{"ErrMsg"}); e != nil {
  164. // fmt.Println("更新失败了" + e.Error())
  165. // continue
  166. // }
  167. // }
  168. // fmt.Println("更新成功")
  169. //}
  170. //func FixCompanyUpdateData1() {
  171. // list, err := company.GetCompanyProductUpdateLogList()
  172. // if err != nil {
  173. // fmt.Println("获取客户变更数据失败:", err)
  174. // return
  175. // }
  176. //
  177. // nowTime := time.Now()
  178. // for _, v := range list {
  179. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  180. // //if tmpErr != nil {
  181. // // fmt.Println(v.Id, "找数据,", tmpErr)
  182. // // continue
  183. // //}
  184. //
  185. // //permissionList := make([]*company.CompanyReportPermission, 0)
  186. // //switch v.Source {
  187. // //case "add", "receive", "thaw", "delay", "apply_receive":
  188. // //
  189. // //}
  190. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  191. // if err != nil {
  192. // fmt.Println("err:", err)
  193. // continue
  194. // }
  195. //
  196. // startDate := v.CreateTime
  197. // endDate := v.CreateTime.AddDate(0, 2, 0)
  198. // isStop := 1
  199. // realEndDate := endDate
  200. // if realEndDate.After(nowTime) {
  201. // realEndDate = nowTime
  202. // isStop = 0
  203. // }
  204. // for _, permission := range permissionList {
  205. // permission.StartDate = startDate.Format(utils.FormatDate)
  206. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  207. // }
  208. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  209. // Id: 0,
  210. // CompanyId: v.CompanyId,
  211. // ProductId: v.ProductId,
  212. // SellerId: v.SellerId,
  213. // SellerName: v.SellerName,
  214. // Source: v.Source,
  215. // StartDate: startDate,
  216. // EndDate: endDate,
  217. // RealEndDate: realEndDate,
  218. // IsStop: isStop,
  219. // CreateTime: v.CreateTime,
  220. // }
  221. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  222. // }
  223. // //fmt.Println("结束")
  224. //}
  225. //func FixCompanyUpdateData2() {
  226. // list, err := company.GetTryOutCompanyOperationRecordList()
  227. // if err != nil {
  228. // fmt.Println("获取客户变更数据失败:", err)
  229. // return
  230. // }
  231. //
  232. // nowTime := time.Now()
  233. // for _, v := range list {
  234. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  235. // //if tmpErr != nil {
  236. // // fmt.Println(v.Id, "找数据,", tmpErr)
  237. // // continue
  238. // //}
  239. //
  240. // //permissionList := make([]*company.CompanyReportPermission, 0)
  241. // //switch v.Source {
  242. // //case "add", "receive", "thaw", "delay", "apply_receive":
  243. // //
  244. // //}
  245. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  246. // if err != nil {
  247. // fmt.Println("err:", err)
  248. // continue
  249. // }
  250. //
  251. // startDate := v.CreateTime
  252. // endDate := v.CreateTime.AddDate(0, 2, 0)
  253. // isStop := 1
  254. // realEndDate := endDate
  255. // if realEndDate.After(nowTime) {
  256. // realEndDate = nowTime
  257. // isStop = 0
  258. // }
  259. // for _, permission := range permissionList {
  260. // permission.StartDate = startDate.Format(utils.FormatDate)
  261. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  262. // }
  263. // sellerName := ``
  264. // {
  265. // sysUser, _ := system.GetSysAdminById(v.SellerId)
  266. // if sysUser != nil {
  267. // sellerName = sysUser.RealName
  268. // }
  269. // }
  270. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  271. // Id: 0,
  272. // CompanyId: v.CompanyId,
  273. // ProductId: v.ProductId,
  274. // SellerId: v.SellerId,
  275. // SellerName: sellerName,
  276. // Source: "formal_to_try_out",
  277. // StartDate: startDate,
  278. // EndDate: endDate,
  279. // RealEndDate: realEndDate,
  280. // IsStop: isStop,
  281. // CreateTime: v.CreateTime,
  282. // }
  283. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  284. // }
  285. // fmt.Println("结束")
  286. //}
  287. // FixCompanyTryDay 修复试用天数
  288. //func FixCompanyTryDay() {
  289. // list, err := company.GetCompanyProductTryOutUpdateGroup()
  290. // if err != nil {
  291. // fmt.Println("获取客户变更数据失败:", err)
  292. // return
  293. // }
  294. //
  295. // lenList := len(list)
  296. // for k, v := range list {
  297. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  298. // companyProduct, tmpErr := company.GetCompanyProductByCompanyIdAndProductId(v.CompanyId, v.ProductId)
  299. // if tmpErr != nil {
  300. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";找不到对应的客户,Err:", tmpErr)
  301. // continue
  302. // }
  303. //
  304. // logList, err := company.GetCompanyProductTryOutUpdateList(v.CompanyId, v.ProductId)
  305. // if err != nil {
  306. // fmt.Println("查找客户日志失败,err:", err)
  307. // continue
  308. // }
  309. //
  310. // //lenLog := len(logList)
  311. // var day int //实际试用天数
  312. // var endDate time.Time
  313. // for _, log := range logList {
  314. // startDate := log.StartDate
  315. // if endDate.IsZero() {
  316. // endDate = log.RealEndDate
  317. // day = utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  318. // } else {
  319. // if log.RealEndDate.After(endDate) {
  320. // if endDate.After(startDate) {
  321. // startDate = endDate
  322. // }
  323. // if startDate.Equal(log.EndDate) {
  324. // day += utils.GetTimeSubDay(startDate, log.RealEndDate)
  325. // } else {
  326. // day += utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  327. // }
  328. // endDate = log.RealEndDate
  329. // }
  330. // }
  331. // }
  332. // companyProduct.TryOutDayTotal = day
  333. // companyProduct.Update([]string{"TryOutDayTotal"})
  334. // }
  335. // fmt.Println("结束")
  336. //}
  337. // StaticCompanyTryDay 定时任务每天更新试用天数
  338. //func StaticCompanyTryDay() {
  339. // list, err := company.GetCompanyProductTryOutUpdateNoStopGroup()
  340. // if err != nil {
  341. // fmt.Println("获取客户变更数据失败:", err)
  342. // return
  343. // }
  344. //
  345. // lenList := len(list)
  346. // for k, v := range list {
  347. // isAdd := false //是否要增加一天,默认不加
  348. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  349. // permissionList, tmpErr := company.GetCompanyReportPermissionByStatus(v.CompanyId, v.ProductId, "试用")
  350. // if tmpErr == nil {
  351. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";寻找对应的试用品种失败,Err:", tmpErr)
  352. // continue
  353. // }
  354. // currPermissionIdList := make([]int, 0) //当前试用的品种
  355. // for _, permission := range permissionList {
  356. // currPermissionIdList = append(currPermissionIdList, permission.ChartPermissionId)
  357. // }
  358. //
  359. // //获取所有未停止的记录列表
  360. // logList, err := company.GetCompanyProductTryOutUpdateNoStopListByEndDate(v.CompanyId, v.ProductId)
  361. // if err != nil {
  362. // fmt.Println("查找客户日志失败,err:", err)
  363. // continue
  364. // }
  365. //
  366. // logPermissionIdList := make([]int, 0) //当前日志中的试用的品种
  367. // for _, log := range logList {
  368. // //获取所有未停止的品种试用记录列表
  369. // logPermissionList, tmpErr := company.GetCompanyProductTryOutPermissionUpdateNoStopListByEndDate(log.Id)
  370. // if tmpErr != nil {
  371. // fmt.Println("查找客户品种变更日志失败,err:", tmpErr)
  372. // continue
  373. // }
  374. // lenLogPermissionList := len(logPermissionList) //当前日志存在试用的品种数量
  375. // stopPermission := 0 //当前日志需要停止的品种数量
  376. //
  377. // currTime := time.Now() //当前时间
  378. // for _, logPermission := range logPermissionList {
  379. // if utils.InArrayByInt(logPermissionIdList, logPermission.ChartPermissionId) {
  380. // // 如果已经被其他记录使用了,那么就将当前记录给标记停止
  381. // logPermission.IsStop = 1
  382. //
  383. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  384. // if currTime.After(logPermission.EndDate) {
  385. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  386. // } else {
  387. // logPermission.RealEndDate = time.Now()
  388. // }
  389. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  390. // stopPermission++
  391. // continue
  392. // } else if !utils.InArrayByInt(currPermissionIdList, logPermission.ChartPermissionId) {
  393. // // 如果该品种不在当前客户的品种里面,那么也要将当前记录给标记停止
  394. // logPermission.IsStop = 1
  395. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  396. // if currTime.After(logPermission.EndDate) {
  397. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  398. // } else {
  399. // logPermission.RealEndDate = time.Now()
  400. // }
  401. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  402. // stopPermission++
  403. // continue
  404. // }
  405. //
  406. // // 剩下的说明还处于试用状态,需要添加1天试用期,且需要把该品种加入到当前日志中的试用的品种列表
  407. // isAdd = true
  408. // logPermissionIdList = append(logPermissionIdList, logPermission.ChartPermissionId)
  409. // }
  410. //
  411. // //如果当前日志存在试用的品种数量 == 当前日志需要停止的品种数量
  412. // // 那么当前日志也是处于停用状态
  413. // if lenLogPermissionList == stopPermission {
  414. // log.IsStop = 1
  415. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  416. // if currTime.After(log.EndDate) {
  417. // log.RealEndDate = currTime.AddDate(0, 0, -1)
  418. // } else {
  419. // log.RealEndDate = time.Now()
  420. // }
  421. // log.Update([]string{"IsStop", "RealEndDate"})
  422. // }
  423. //
  424. // }
  425. //
  426. // // 如果需要添加,那么将该客户品种添加1天
  427. // if isAdd {
  428. //
  429. // }
  430. // // 更新客户产品的试用天数
  431. // tmpErr = company.AddCompanyProductTryOutDayTotal(v.CompanyId, v.ProductId)
  432. // if tmpErr != nil {
  433. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";更新客户产品的试用天数,Err:", tmpErr)
  434. // continue
  435. // }
  436. // }
  437. // fmt.Println("结束")
  438. //}
  439. //func Task2() {
  440. // fmt.Println("task start")
  441. //
  442. // //_, _ = maycur.GetPublicOfferingSaleLeader()
  443. // //_ = maycur.TestSyncCompanyProfile()
  444. // //_ = maycur.SyncCompanyProfile()
  445. // //_ = maycur.ImportExcelEmployeeId()
  446. // //_ = maycur.ApiTest()
  447. // //_ = maycur.ApiTest2()
  448. // fmt.Println("task end")
  449. //}
  450. // FixEnCompanyPermission 英文权限上线时修复英文客户拥有所有权限(一次性)
  451. func FixEnCompanyPermission() {
  452. var err error
  453. defer func() {
  454. if err != nil {
  455. fmt.Println("FixEnCompanyPermission Err: ", err.Error())
  456. }
  457. }()
  458. // 获取正式客户
  459. companies := make([]*models.EnglishCompany, 0)
  460. {
  461. cond := ` AND status = ?`
  462. pars := make([]interface{}, 0)
  463. pars = append(pars, 1)
  464. list, e := models.GetEnglishCompanyList(cond, pars, "")
  465. if e != nil {
  466. err = fmt.Errorf("GetEnglishCompanyList err: %s", e.Error())
  467. return
  468. }
  469. companies = list
  470. }
  471. // 获取所有权限
  472. permissions := make([]*models.EnPermission, 0)
  473. {
  474. cond := ` AND parent_id > ?`
  475. pars := make([]interface{}, 0)
  476. pars = append(pars, 0)
  477. ob := new(models.EnPermission)
  478. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  479. if e != nil {
  480. err = fmt.Errorf("GetPermissionItemsByCondition err: %s", e.Error())
  481. return
  482. }
  483. permissions = list
  484. }
  485. for _, c := range companies {
  486. ps := make([]*models.EnCompanyPermission, 0)
  487. for _, p := range permissions {
  488. ps = append(ps, &models.EnCompanyPermission{
  489. EnCompanyId: c.CompanyId,
  490. EnPermissionId: p.EnPermissionId,
  491. CreateTime: time.Now().Local(),
  492. })
  493. }
  494. if e := models.ClearAndCreateEnCompanyPermissions(c.CompanyId, ps); e != nil {
  495. err = fmt.Errorf("ClearAndCreateEnCompanyPermissions err: %s", e.Error())
  496. return
  497. }
  498. }
  499. fmt.Println("修复完成")
  500. }
  501. // ModifyEsEnglishReport 批量修改es里的英文研报信息和线上路演信息
  502. func ModifyEsEnglishReport() {
  503. fmt.Println("开始")
  504. err := ModifyAllEsEnglishReportVideo()
  505. if err != nil {
  506. err = fmt.Errorf("重置es中的英文研报信息失败:ModifyAllEnglishReportInEs err: %s", err.Error())
  507. return
  508. }
  509. fmt.Println("结束")
  510. }
  511. //func InsertBloombergIndex() {
  512. // fmt.Println("开始写入")
  513. //
  514. // start := 100000
  515. // now := time.Now()
  516. // for i := 1; i <= 100; i++ {
  517. // fmt.Printf("写入第%d个\n", i)
  518. //
  519. // start += 1
  520. // index := new(data_manage.BaseFromBloombergIndex)
  521. // index.IndexCode = fmt.Sprintf("BLID%d", start)
  522. // index.IndexName = fmt.Sprintf("模拟Bloomberg-%s", index.IndexCode)
  523. // index.Unit = "无"
  524. // index.Source = utils.DATA_SOURCE_BLOOMBERG
  525. // index.Frequency = "日度"
  526. // index.StartDate = now.AddDate(0, 0, -i)
  527. // index.EndDate = now
  528. // index.CreateTime = time.Now().Local()
  529. // index.ModifyTime = time.Now().Local()
  530. // if e := index.Create(); e != nil {
  531. // fmt.Printf("新增指标失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  532. // return
  533. // }
  534. //
  535. // insertData := make([]*data_manage.BaseFromBloombergData, 0)
  536. // for ii := 0; ii <= 50; ii++ {
  537. // indexData := new(data_manage.BaseFromBloombergData)
  538. // indexData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
  539. // indexData.IndexCode = index.IndexCode
  540. // indexData.DataTime = now.AddDate(0, 0, -ii)
  541. // va := GenerateRandomFloat64InRange()
  542. // va += float64(ii)
  543. // indexData.Value = va
  544. // indexData.CreateTime = time.Now().Local()
  545. // indexData.ModifyTime = time.Now().Local()
  546. // indexData.DataTimestamp = int(indexData.DataTime.UnixNano() / 1e6)
  547. // insertData = append(insertData, indexData)
  548. // }
  549. // ob := new(data_manage.BaseFromBloombergData)
  550. // if e := ob.CreateMulti(insertData); e != nil {
  551. // fmt.Printf("新增指标数据失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  552. // return
  553. // }
  554. // }
  555. //
  556. // fmt.Println("结束写入")
  557. //}
  558. //
  559. //func GenerateRandomFloat64InRange() float64 {
  560. // var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) // 设置随机数种子
  561. //
  562. // return rnd.Float64()*11000 - 1000
  563. //}
  564. // HandleWechatArticleOp
  565. // @Description: 处理ETA报告加入到知识库
  566. func HandleWechatArticleOp() {
  567. defer func() {
  568. if err := recover(); err != nil {
  569. fmt.Println("[HandleWechatArticleOp]", err)
  570. }
  571. }()
  572. obj := rag.WechatPlatform{}
  573. for {
  574. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE, func(b []byte) {
  575. wechatArticleOp := new(cache.WechatPlatformOp)
  576. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  577. fmt.Println("json unmarshal wrong!")
  578. return
  579. }
  580. item, tmpErr := obj.GetById(wechatArticleOp.WechatPlatformId)
  581. if tmpErr != nil {
  582. // 找不到就处理失败
  583. return
  584. }
  585. switch wechatArticleOp.Source {
  586. case `add`:
  587. AddWechatPlatform(item)
  588. case `refresh`:
  589. BeachAddWechatArticle(item, 2)
  590. }
  591. })
  592. }
  593. }
  594. // HandleWechatArticleLLmOp
  595. // @Description: 处理微信文章加入知识库
  596. func HandleWechatArticleLLmOp() {
  597. defer func() {
  598. if err := recover(); err != nil {
  599. fmt.Println("[HandleWechatArticleLLmOp]", err)
  600. }
  601. }()
  602. for {
  603. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE_KNOWLEDGE, handleWechatArticleLLmOp)
  604. }
  605. }
  606. // handleWechatArticleLLmOp
  607. // @Description: 处理微信文章加入知识库
  608. // @author: Roc
  609. // @datetime 2025-04-24 13:33:08
  610. // @param b []byte
  611. func handleWechatArticleLLmOp(b []byte) {
  612. var err error
  613. defer func() {
  614. if err != nil {
  615. utils.FileLog.Error("[handleWechatArticleLLmOp] params:%s;err:%s", string(b), err.Error())
  616. }
  617. }()
  618. obj := rag.WechatArticle{}
  619. wechatArticleOp := new(cache.WechatArticleOp)
  620. if err = json.Unmarshal(b, &wechatArticleOp); err != nil {
  621. fmt.Println("json unmarshal wrong!")
  622. return
  623. }
  624. item, err := obj.GetById(wechatArticleOp.WechatArticleId)
  625. if err != nil {
  626. // 找不到就处理失败
  627. return
  628. }
  629. // 文章加入到知识库
  630. ArticleToKnowledge(item)
  631. // 生成摘要
  632. if wechatArticleOp.QuestionId <= 0 {
  633. // 全部摘要生成
  634. GenerateWechatArticleAbstract(item, false)
  635. } else {
  636. questionObj := rag.Question{}
  637. questionInfo, tmpErr := questionObj.GetByID(wechatArticleOp.QuestionId)
  638. if tmpErr != nil {
  639. err = tmpErr
  640. return
  641. }
  642. // 指定指定摘要生成
  643. err = GenerateWechatArticleAbstractByQuestion(item, questionInfo, false)
  644. }
  645. }
  646. // HandleEtaReportUpdateOp
  647. // @Description: 处理eta报告加入知识库操作
  648. func HandleEtaReportUpdateOp() {
  649. defer func() {
  650. if err := recover(); err != nil {
  651. fmt.Println("[HandleEtaReportUpdateOp]", err)
  652. }
  653. }()
  654. for {
  655. utils.Rc.Brpop(utils.CACHE_ETA_REPORT_KNOWLEDGE, func(b []byte) {
  656. ragEtaReportOpOp := new(cache.RagEtaReportOp)
  657. if err := json.Unmarshal(b, &ragEtaReportOpOp); err != nil {
  658. fmt.Println("json unmarshal wrong!")
  659. return
  660. }
  661. switch ragEtaReportOpOp.Source {
  662. case `publish`:
  663. ReportAddOrModifyKnowledgeByReportId(ragEtaReportOpOp.ReportId)
  664. case `un_publish`:
  665. ReportUnPublishedKnowledgeByReportId(ragEtaReportOpOp.ReportId)
  666. }
  667. })
  668. }
  669. }
  670. // HandleEtaReportKnowledgeLLmOp
  671. // @Description: 处理微信文章加入知识库
  672. func HandleEtaReportKnowledgeLLmOp() {
  673. defer func() {
  674. if err := recover(); err != nil {
  675. fmt.Println("[HandleEtaReportKnowledgeLLmOp]", err)
  676. }
  677. }()
  678. for {
  679. utils.Rc.Brpop(utils.CACHE_ETA_REPORT_KNOWLEDGE_LLM, handleEtaReportKnowledgeLLmOp)
  680. }
  681. }
  682. // handleEtaReportKnowledgeLLmOp
  683. // @Description: 处理微信文章加入知识库操作
  684. // @author: Roc
  685. // @datetime 2025-04-24 14:04:10
  686. // @param b []byte
  687. func handleEtaReportKnowledgeLLmOp(b []byte) {
  688. var err error
  689. defer func() {
  690. if err != nil {
  691. utils.FileLog.Error("[handleEtaReportKnowledgeLLmOp] params:%s;err:%s", string(b), err.Error())
  692. }
  693. }()
  694. obj := rag.RagEtaReport{}
  695. wechatArticleOp := new(cache.RagEtaReportLlmOp)
  696. if err = json.Unmarshal(b, &wechatArticleOp); err != nil {
  697. fmt.Println("json unmarshal wrong!")
  698. return
  699. }
  700. item, err := obj.GetById(wechatArticleOp.RagEtaReportId)
  701. if err != nil {
  702. // 找不到就处理失败
  703. return
  704. }
  705. // 已经删除的就不做操作了
  706. if item.IsDeleted == 1 {
  707. return
  708. }
  709. // 未发布的就不操作了
  710. if item.IsPublished != 1 {
  711. return
  712. }
  713. // 文章加入到知识库
  714. //ArticleToKnowledge(item)
  715. // 生成摘要
  716. if wechatArticleOp.QuestionId <= 0 {
  717. // 全部提示词摘要生成
  718. GenerateRagEtaReportAbstract(item, wechatArticleOp.ForceGenerate)
  719. } else {
  720. questionObj := rag.Question{}
  721. questionInfo, tmpErr := questionObj.GetByID(wechatArticleOp.QuestionId)
  722. if tmpErr != nil {
  723. err = tmpErr
  724. return
  725. }
  726. // 全部提示词摘要生成
  727. err = GenerateRagEtaReportAbstractByQuestion(item, questionInfo, wechatArticleOp.ForceGenerate)
  728. }
  729. }
  730. // HandleAiArticleAbstractLlmOp
  731. // @Description: 处理AI库的报告摘要生成(批量任务)
  732. // @author: Roc
  733. // @datetime 2025-04-24 10:25:51
  734. func HandleAiArticleAbstractLlmOp() {
  735. defer func() {
  736. if err := recover(); err != nil {
  737. fmt.Println("[HandleAiArticleAbstractLlmOp]", err)
  738. }
  739. }()
  740. for {
  741. utils.Rc.Brpop(utils.CACHE_AI_ARTICLE_ABSTRACT_LLM_TASK, handleAiArticleAbstractLlmOp)
  742. }
  743. }
  744. var aiTaskHandleIdMap = map[int]bool{}
  745. // todo 任务开始时间
  746. // handleAiArticleAbstractLlmOp
  747. // @Description: 处理AI库的报告摘要生成(批量任务)
  748. // @author: Roc
  749. // @datetime 2025-04-24 11:26:05
  750. // @param b []byte
  751. func handleAiArticleAbstractLlmOp(b []byte) {
  752. var err error
  753. defer func() {
  754. if err != nil {
  755. utils.FileLog.Error("[handleAiArticleAbstractLlmOp] params:%s;err:%s", string(b), err.Error())
  756. }
  757. }()
  758. obj := rag.AiTaskRecord{}
  759. aiTaskRecordOp := new(cache.AiTaskRecordOp)
  760. if err = json.Unmarshal(b, &aiTaskRecordOp); err != nil {
  761. fmt.Println("json unmarshal wrong!")
  762. return
  763. }
  764. item, err := obj.GetByID(aiTaskRecordOp.AiTaskRecordId)
  765. if err != nil {
  766. err = fmt.Errorf("查找任务记录状态失败, err: %s", err.Error())
  767. return
  768. }
  769. // 如果没有处理过该任务,那么就标记该任务开始
  770. if _, ok := aiTaskHandleIdMap[item.AiTaskID]; !ok {
  771. aiTaskObj := rag.AiTask{}
  772. aiTaskInfo, tmpErr := aiTaskObj.GetByID(item.AiTaskID)
  773. if tmpErr != nil {
  774. err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
  775. return
  776. }
  777. // 如果任务是初始化,那么就标记开始
  778. if aiTaskInfo.Status == `init` {
  779. aiTaskInfo.StartTime = time.Now()
  780. aiTaskInfo.Status = `processing`
  781. aiTaskInfo.UpdateTime = time.Now()
  782. tmpErr = aiTaskInfo.Update([]string{`start_time`, "status", "update_time"})
  783. if tmpErr != nil {
  784. utils.FileLog.Error("标记任务开始状态失败, err: %s", tmpErr.Error())
  785. }
  786. }
  787. }
  788. // 处理完成后标记任务状态
  789. defer func() {
  790. // 修改任务状态
  791. todoCount, tmpErr := obj.GetCountByCondition(fmt.Sprintf(` AND %s = ? AND %s = ? `, rag.AiTaskColumns.AiTaskID, rag.AiTaskColumns.Status), []interface{}{item.AiTaskID, `待处理`})
  792. if tmpErr != nil {
  793. err = fmt.Errorf("查找剩余任务数量失败, err: %s", tmpErr.Error())
  794. return
  795. }
  796. if todoCount <= 0 {
  797. aiTaskObj := rag.AiTask{}
  798. aiTaskInfo, tmpErr := aiTaskObj.GetByID(item.AiTaskID)
  799. if tmpErr != nil {
  800. err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
  801. return
  802. }
  803. aiTaskInfo.EndTime = time.Now()
  804. aiTaskInfo.Status = `done`
  805. aiTaskInfo.UpdateTime = time.Now()
  806. tmpErr = aiTaskInfo.Update([]string{`end_time`, "status", "update_time"})
  807. if tmpErr != nil {
  808. utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
  809. }
  810. }
  811. return
  812. }()
  813. // 不是待处理就不处理
  814. if item.Status != `待处理` {
  815. return
  816. }
  817. // 处理完成后标记记录状态
  818. defer func() {
  819. status := `处理成功`
  820. remark := ``
  821. if err != nil {
  822. status = `处理失败`
  823. remark = err.Error()
  824. }
  825. item.Status = status
  826. item.Remark = remark
  827. item.ModifyTime = time.Now()
  828. tmpErr := item.Update([]string{"status", "remark", "modify_time"})
  829. if tmpErr != nil {
  830. utils.FileLog.Error("标记任务记录状态失败, err: %s", tmpErr.Error())
  831. }
  832. }()
  833. var params rag.QuestionGenerateAbstractParam
  834. if err = json.Unmarshal([]byte(item.Parameters), &params); err != nil {
  835. fmt.Println("json unmarshal wrong!")
  836. return
  837. }
  838. // 查找提示词
  839. questionObj := rag.Question{}
  840. questionInfo, tmpErr := questionObj.GetByID(params.QuestionId)
  841. if tmpErr != nil {
  842. // 找不到就处理失败
  843. err = fmt.Errorf("查找提示词失败, err: %s", err.Error())
  844. return
  845. }
  846. switch params.ArticleType {
  847. case `wechat_article`:
  848. articleObj := rag.WechatArticle{}
  849. articleInfo, tmpErr := articleObj.GetById(params.ArticleId)
  850. if tmpErr != nil {
  851. err = tmpErr
  852. // 找不到就处理失败
  853. return
  854. }
  855. // 生成摘要
  856. err = GenerateWechatArticleAbstractByQuestion(articleInfo, questionInfo, true)
  857. case `rag_eta_report`:
  858. articleObj := rag.RagEtaReport{}
  859. articleInfo, tmpErr := articleObj.GetById(params.ArticleId)
  860. if tmpErr != nil {
  861. err = tmpErr
  862. // 找不到就处理失败
  863. return
  864. }
  865. // 已经删除的就不做操作了
  866. if articleInfo.IsDeleted == 1 {
  867. return
  868. }
  869. // 未发布的就不操作了
  870. if articleInfo.IsPublished != 1 {
  871. return
  872. }
  873. // 生成摘要
  874. err = GenerateRagEtaReportAbstractByQuestion(articleInfo, questionInfo, true)
  875. }
  876. }