task.go 19 KB


  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/models"
  5. "eta/eta_api/models/rag"
  6. "eta/eta_api/services/binlog"
  7. "eta/eta_api/services/data"
  8. edbmonitor "eta/eta_api/services/edb_monitor"
  9. "eta/eta_api/services/llm"
  10. "eta/eta_api/utils"
  11. "fmt"
  12. "strings"
  13. "time"
  14. )
  15. func Task() {
  16. fmt.Println("task start")
  17. {
  18. // 修复客户试用数据
  19. //FixCompanyUpdateData()
  20. //FixCompanyTryDay()
  21. //FixCompanyTryDay()
  22. //StaticCompanyTryDay()
  23. //return
  24. }
  25. //FixPermissionStatus()
  26. //GetCompanyInfo()
  27. //ReportCount()
  28. //windSourceUrl:=`http://47.100.166.55:7002/edbInfo/wind/?EdbCode=M0001427&StartDate=2020-11-01&EndDate=2021-03-01`
  29. //data.AddAllArticle()
  30. //data.GetSmmIndex()
  31. //data.GetSmmIndexData()
  32. go AutoInsertLogToDB()
  33. //手工数据表格导入后的指标库刷新
  34. go ImportManualDataRefresh()
  35. // 加入上海钢联指标数据之后的刷新
  36. go MysteelChemicalDataAdd()
  37. //修复用户关注标识
  38. //GetWxUsersSubscribe()
  39. go AutoInsertAdminOperateRecordToDB()
  40. // 指标刷新
  41. go data.HandleEdbRefreshQueue()
  42. // 进行指标替换操作
  43. go DealReplaceEdbCache()
  44. // TODO:监听这里需要找下达梦的方案,主流程先跑起来再处理
  45. // 监听binlog
  46. if utils.MYSQL_DATA_BINLOG_URL != "" && utils.DbDriverName == `mysql` {
  47. go binlog.ListenMysql()
  48. go edbmonitor.HandleEdbMonitorEdbInfo()
  49. // 监听数据源binlog写入es
  50. go binlog.HandleDataSourceChange2Es()
  51. }
  52. go StartSessionManager()
  53. go llm.SaveAllChatRecordsToDB()
  54. // 定时任务进行微信文章操作
  55. go HandleWechatArticleOp()
  56. // 定时任务进行微信文章LLM操作
  57. go HandleWechatArticleLLmOp()
  58. // TODO:数据修复
  59. //FixNewEs()
  60. fmt.Println("task end")
  61. }
  62. //// 每日发布晨报
  63. //func AutoPublishDayReport() {
  64. // defer func() {
  65. // if err := recover(); err != nil {
  66. // fmt.Println("[AutoPublishDayReport]", err)
  67. // }
  68. // }()
  69. //
  70. // // 每日8:42发布晨报
  71. // ticker := time.Tick(50 * time.Second)
  72. // for range ticker {
  73. // nowTime := time.Now()
  74. // clock := nowTime.Format("1504")
  75. // if clock == "0842" {
  76. // if err := PublishTodayDayReport(); err != nil {
  77. // go alarm_msg.SendAlarmMsg(fmt.Sprint("每日晨报自动发送 AutoPublishDayReport ERR:", err), 3)
  78. // //utils.SendEmail(utils.APPNAME+" "+utils.RunMode+" 失败提醒", fmt.Sprint("AutoPublishDayReport ERR:", err), utils.EmailSendToUsers)
  79. // }
  80. // }
  81. // }
  82. //}
  83. // ImportManualDataRefresh 导入手工数据后的刷新
  84. func ImportManualDataRefresh() {
  85. defer func() {
  86. if err := recover(); err != nil {
  87. fmt.Println("[ImportManualDataRefresh]", err)
  88. }
  89. }()
  90. for {
  91. utils.Rc.Brpop(utils.CACHE_IMPORT_MANUAL_DATA, func(b []byte) {
  92. edbCode := string(b)
  93. edbCode = strings.TrimPrefix(edbCode, `"`)
  94. edbCode = strings.TrimSuffix(edbCode, `"`)
  95. data.RefreshManualData(edbCode)
  96. })
  97. }
  98. }
  99. // MysteelChemicalDataAdd 加入上海钢联指标数据之后的刷新
  100. func MysteelChemicalDataAdd() {
  101. defer func() {
  102. if err := recover(); err != nil {
  103. fmt.Println("[ImportManualDataRefresh]", err)
  104. }
  105. }()
  106. for {
  107. utils.Rc.Brpop(utils.CACHE_MYSTEEL_CHEMICAL_ADD_DATA, func(b []byte) {
  108. edbCode := string(b)
  109. edbCode = strings.TrimPrefix(edbCode, `"`)
  110. edbCode = strings.TrimSuffix(edbCode, `"`)
  111. data.RefreshMysteelChemicalData(edbCode)
  112. })
  113. }
  114. }
  115. //func init() {
  116. // fmt.Println("start task init")
  117. // UpdateEnglishEmailLogErrMsg()
  118. // fmt.Println("end task init")
  119. //}
  120. //
  121. //// UpdateEnglishEmailLogErrMsg 更新英文邮件日志的ErrMsg(研报后台4.2上线后执行, 仅一次)
  122. //func UpdateEnglishEmailLogErrMsg() {
  123. // var cond string
  124. // var pars []interface{}
  125. // list, e := models.GetEnglishReportEmailLogList(cond, pars)
  126. // if e != nil {
  127. // fmt.Println("获取日志列表失败")
  128. // return
  129. // }
  130. // for _, v := range list {
  131. // if v.SendStatus != 0 || v.Source != 1 || v.Result == "" || v.ErrMsg != "" {
  132. // continue
  133. // }
  134. // // 取出错误信息
  135. // fmt.Printf("正在更新%d\n", v.Id)
  136. // r := new(AliyunEmailResult)
  137. // if e = json.Unmarshal([]byte(v.Result), &r); e != nil {
  138. // fmt.Println("JSON解析报错了1" + e.Error())
  139. // continue
  140. // }
  141. // rd := new(AliyunEmailResultData)
  142. // res := strings.Replace(r.Data, `\`, ``, -1)
  143. // if e = json.Unmarshal([]byte(res), &rd); e != nil {
  144. // fmt.Println("JSON解析报错了2" + e.Error())
  145. // continue
  146. // }
  147. // v.ErrMsg = rd.Message
  148. // if e = v.Update([]string{"ErrMsg"}); e != nil {
  149. // fmt.Println("更新失败了" + e.Error())
  150. // continue
  151. // }
  152. // }
  153. // fmt.Println("更新成功")
  154. //}
  155. //func FixCompanyUpdateData1() {
  156. // list, err := company.GetCompanyProductUpdateLogList()
  157. // if err != nil {
  158. // fmt.Println("获取客户变更数据失败:", err)
  159. // return
  160. // }
  161. //
  162. // nowTime := time.Now()
  163. // for _, v := range list {
  164. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  165. // //if tmpErr != nil {
  166. // // fmt.Println(v.Id, "找数据,", tmpErr)
  167. // // continue
  168. // //}
  169. //
  170. // //permissionList := make([]*company.CompanyReportPermission, 0)
  171. // //switch v.Source {
  172. // //case "add", "receive", "thaw", "delay", "apply_receive":
  173. // //
  174. // //}
  175. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  176. // if err != nil {
  177. // fmt.Println("err:", err)
  178. // continue
  179. // }
  180. //
  181. // startDate := v.CreateTime
  182. // endDate := v.CreateTime.AddDate(0, 2, 0)
  183. // isStop := 1
  184. // realEndDate := endDate
  185. // if realEndDate.After(nowTime) {
  186. // realEndDate = nowTime
  187. // isStop = 0
  188. // }
  189. // for _, permission := range permissionList {
  190. // permission.StartDate = startDate.Format(utils.FormatDate)
  191. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  192. // }
  193. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  194. // Id: 0,
  195. // CompanyId: v.CompanyId,
  196. // ProductId: v.ProductId,
  197. // SellerId: v.SellerId,
  198. // SellerName: v.SellerName,
  199. // Source: v.Source,
  200. // StartDate: startDate,
  201. // EndDate: endDate,
  202. // RealEndDate: realEndDate,
  203. // IsStop: isStop,
  204. // CreateTime: v.CreateTime,
  205. // }
  206. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  207. // }
  208. // //fmt.Println("结束")
  209. //}
  210. //func FixCompanyUpdateData2() {
  211. // list, err := company.GetTryOutCompanyOperationRecordList()
  212. // if err != nil {
  213. // fmt.Println("获取客户变更数据失败:", err)
  214. // return
  215. // }
  216. //
  217. // nowTime := time.Now()
  218. // for _, v := range list {
  219. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  220. // //if tmpErr != nil {
  221. // // fmt.Println(v.Id, "找数据,", tmpErr)
  222. // // continue
  223. // //}
  224. //
  225. // //permissionList := make([]*company.CompanyReportPermission, 0)
  226. // //switch v.Source {
  227. // //case "add", "receive", "thaw", "delay", "apply_receive":
  228. // //
  229. // //}
  230. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  231. // if err != nil {
  232. // fmt.Println("err:", err)
  233. // continue
  234. // }
  235. //
  236. // startDate := v.CreateTime
  237. // endDate := v.CreateTime.AddDate(0, 2, 0)
  238. // isStop := 1
  239. // realEndDate := endDate
  240. // if realEndDate.After(nowTime) {
  241. // realEndDate = nowTime
  242. // isStop = 0
  243. // }
  244. // for _, permission := range permissionList {
  245. // permission.StartDate = startDate.Format(utils.FormatDate)
  246. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  247. // }
  248. // sellerName := ``
  249. // {
  250. // sysUser, _ := system.GetSysAdminById(v.SellerId)
  251. // if sysUser != nil {
  252. // sellerName = sysUser.RealName
  253. // }
  254. // }
  255. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  256. // Id: 0,
  257. // CompanyId: v.CompanyId,
  258. // ProductId: v.ProductId,
  259. // SellerId: v.SellerId,
  260. // SellerName: sellerName,
  261. // Source: "formal_to_try_out",
  262. // StartDate: startDate,
  263. // EndDate: endDate,
  264. // RealEndDate: realEndDate,
  265. // IsStop: isStop,
  266. // CreateTime: v.CreateTime,
  267. // }
  268. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  269. // }
  270. // fmt.Println("结束")
  271. //}
  272. // FixCompanyTryDay 修复试用天数
  273. //func FixCompanyTryDay() {
  274. // list, err := company.GetCompanyProductTryOutUpdateGroup()
  275. // if err != nil {
  276. // fmt.Println("获取客户变更数据失败:", err)
  277. // return
  278. // }
  279. //
  280. // lenList := len(list)
  281. // for k, v := range list {
  282. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  283. // companyProduct, tmpErr := company.GetCompanyProductByCompanyIdAndProductId(v.CompanyId, v.ProductId)
  284. // if tmpErr != nil {
  285. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";找不到对应的客户,Err:", tmpErr)
  286. // continue
  287. // }
  288. //
  289. // logList, err := company.GetCompanyProductTryOutUpdateList(v.CompanyId, v.ProductId)
  290. // if err != nil {
  291. // fmt.Println("查找客户日志失败,err:", err)
  292. // continue
  293. // }
  294. //
  295. // //lenLog := len(logList)
  296. // var day int //实际试用天数
  297. // var endDate time.Time
  298. // for _, log := range logList {
  299. // startDate := log.StartDate
  300. // if endDate.IsZero() {
  301. // endDate = log.RealEndDate
  302. // day = utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  303. // } else {
  304. // if log.RealEndDate.After(endDate) {
  305. // if endDate.After(startDate) {
  306. // startDate = endDate
  307. // }
  308. // if startDate.Equal(log.EndDate) {
  309. // day += utils.GetTimeSubDay(startDate, log.RealEndDate)
  310. // } else {
  311. // day += utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  312. // }
  313. // endDate = log.RealEndDate
  314. // }
  315. // }
  316. // }
  317. // companyProduct.TryOutDayTotal = day
  318. // companyProduct.Update([]string{"TryOutDayTotal"})
  319. // }
  320. // fmt.Println("结束")
  321. //}
  322. // StaticCompanyTryDay 定时任务每天更新试用天数
  323. //func StaticCompanyTryDay() {
  324. // list, err := company.GetCompanyProductTryOutUpdateNoStopGroup()
  325. // if err != nil {
  326. // fmt.Println("获取客户变更数据失败:", err)
  327. // return
  328. // }
  329. //
  330. // lenList := len(list)
  331. // for k, v := range list {
  332. // isAdd := false //是否要增加一天,默认不加
  333. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  334. // permissionList, tmpErr := company.GetCompanyReportPermissionByStatus(v.CompanyId, v.ProductId, "试用")
  335. // if tmpErr == nil {
  336. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";寻找对应的试用品种失败,Err:", tmpErr)
  337. // continue
  338. // }
  339. // currPermissionIdList := make([]int, 0) //当前试用的品种
  340. // for _, permission := range permissionList {
  341. // currPermissionIdList = append(currPermissionIdList, permission.ChartPermissionId)
  342. // }
  343. //
  344. // //获取所有未停止的记录列表
  345. // logList, err := company.GetCompanyProductTryOutUpdateNoStopListByEndDate(v.CompanyId, v.ProductId)
  346. // if err != nil {
  347. // fmt.Println("查找客户日志失败,err:", err)
  348. // continue
  349. // }
  350. //
  351. // logPermissionIdList := make([]int, 0) //当前日志中的试用的品种
  352. // for _, log := range logList {
  353. // //获取所有未停止的品种试用记录列表
  354. // logPermissionList, tmpErr := company.GetCompanyProductTryOutPermissionUpdateNoStopListByEndDate(log.Id)
  355. // if tmpErr != nil {
  356. // fmt.Println("查找客户品种变更日志失败,err:", tmpErr)
  357. // continue
  358. // }
  359. // lenLogPermissionList := len(logPermissionList) //当前日志存在试用的品种数量
  360. // stopPermission := 0 //当前日志需要停止的品种数量
  361. //
  362. // currTime := time.Now() //当前时间
  363. // for _, logPermission := range logPermissionList {
  364. // if utils.InArrayByInt(logPermissionIdList, logPermission.ChartPermissionId) {
  365. // // 如果已经被其他记录使用了,那么就将当前记录给标记停止
  366. // logPermission.IsStop = 1
  367. //
  368. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  369. // if currTime.After(logPermission.EndDate) {
  370. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  371. // } else {
  372. // logPermission.RealEndDate = time.Now()
  373. // }
  374. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  375. // stopPermission++
  376. // continue
  377. // } else if !utils.InArrayByInt(currPermissionIdList, logPermission.ChartPermissionId) {
  378. // // 如果该品种不在当前客户的品种里面,那么也要将当前记录给标记停止
  379. // logPermission.IsStop = 1
  380. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  381. // if currTime.After(logPermission.EndDate) {
  382. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  383. // } else {
  384. // logPermission.RealEndDate = time.Now()
  385. // }
  386. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  387. // stopPermission++
  388. // continue
  389. // }
  390. //
  391. // // 剩下的说明还处于试用状态,需要添加1天试用期,且需要把该品种加入到当前日志中的试用的品种列表
  392. // isAdd = true
  393. // logPermissionIdList = append(logPermissionIdList, logPermission.ChartPermissionId)
  394. // }
  395. //
  396. // //如果当前日志存在试用的品种数量 == 当前日志需要停止的品种数量
  397. // // 那么当前日志也是处于停用状态
  398. // if lenLogPermissionList == stopPermission {
  399. // log.IsStop = 1
  400. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  401. // if currTime.After(log.EndDate) {
  402. // log.RealEndDate = currTime.AddDate(0, 0, -1)
  403. // } else {
  404. // log.RealEndDate = time.Now()
  405. // }
  406. // log.Update([]string{"IsStop", "RealEndDate"})
  407. // }
  408. //
  409. // }
  410. //
  411. // // 如果需要添加,那么将该客户品种添加1天
  412. // if isAdd {
  413. //
  414. // }
  415. // // 更新客户产品的试用天数
  416. // tmpErr = company.AddCompanyProductTryOutDayTotal(v.CompanyId, v.ProductId)
  417. // if tmpErr != nil {
  418. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";更新客户产品的试用天数,Err:", tmpErr)
  419. // continue
  420. // }
  421. // }
  422. // fmt.Println("结束")
  423. //}
  424. //func Task2() {
  425. // fmt.Println("task start")
  426. //
  427. // //_, _ = maycur.GetPublicOfferingSaleLeader()
  428. // //_ = maycur.TestSyncCompanyProfile()
  429. // //_ = maycur.SyncCompanyProfile()
  430. // //_ = maycur.ImportExcelEmployeeId()
  431. // //_ = maycur.ApiTest()
  432. // //_ = maycur.ApiTest2()
  433. // fmt.Println("task end")
  434. //}
  435. // FixEnCompanyPermission 英文权限上线时修复英文客户拥有所有权限(一次性)
  436. func FixEnCompanyPermission() {
  437. var err error
  438. defer func() {
  439. if err != nil {
  440. fmt.Println("FixEnCompanyPermission Err: ", err.Error())
  441. }
  442. }()
  443. // 获取正式客户
  444. companies := make([]*models.EnglishCompany, 0)
  445. {
  446. cond := ` AND status = ?`
  447. pars := make([]interface{}, 0)
  448. pars = append(pars, 1)
  449. list, e := models.GetEnglishCompanyList(cond, pars, "")
  450. if e != nil {
  451. err = fmt.Errorf("GetEnglishCompanyList err: %s", e.Error())
  452. return
  453. }
  454. companies = list
  455. }
  456. // 获取所有权限
  457. permissions := make([]*models.EnPermission, 0)
  458. {
  459. cond := ` AND parent_id > ?`
  460. pars := make([]interface{}, 0)
  461. pars = append(pars, 0)
  462. ob := new(models.EnPermission)
  463. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  464. if e != nil {
  465. err = fmt.Errorf("GetPermissionItemsByCondition err: %s", e.Error())
  466. return
  467. }
  468. permissions = list
  469. }
  470. for _, c := range companies {
  471. ps := make([]*models.EnCompanyPermission, 0)
  472. for _, p := range permissions {
  473. ps = append(ps, &models.EnCompanyPermission{
  474. EnCompanyId: c.CompanyId,
  475. EnPermissionId: p.EnPermissionId,
  476. CreateTime: time.Now().Local(),
  477. })
  478. }
  479. if e := models.ClearAndCreateEnCompanyPermissions(c.CompanyId, ps); e != nil {
  480. err = fmt.Errorf("ClearAndCreateEnCompanyPermissions err: %s", e.Error())
  481. return
  482. }
  483. }
  484. fmt.Println("修复完成")
  485. }
  486. // ModifyEsEnglishReport 批量修改es里的英文研报信息和线上路演信息
  487. func ModifyEsEnglishReport() {
  488. fmt.Println("开始")
  489. err := ModifyAllEsEnglishReportVideo()
  490. if err != nil {
  491. err = fmt.Errorf("重置es中的英文研报信息失败:ModifyAllEnglishReportInEs err: %s", err.Error())
  492. return
  493. }
  494. fmt.Println("结束")
  495. }
  496. //func InsertBloombergIndex() {
  497. // fmt.Println("开始写入")
  498. //
  499. // start := 100000
  500. // now := time.Now()
  501. // for i := 1; i <= 100; i++ {
  502. // fmt.Printf("写入第%d个\n", i)
  503. //
  504. // start += 1
  505. // index := new(data_manage.BaseFromBloombergIndex)
  506. // index.IndexCode = fmt.Sprintf("BLID%d", start)
  507. // index.IndexName = fmt.Sprintf("模拟Bloomberg-%s", index.IndexCode)
  508. // index.Unit = "无"
  509. // index.Source = utils.DATA_SOURCE_BLOOMBERG
  510. // index.Frequency = "日度"
  511. // index.StartDate = now.AddDate(0, 0, -i)
  512. // index.EndDate = now
  513. // index.CreateTime = time.Now().Local()
  514. // index.ModifyTime = time.Now().Local()
  515. // if e := index.Create(); e != nil {
  516. // fmt.Printf("新增指标失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  517. // return
  518. // }
  519. //
  520. // insertData := make([]*data_manage.BaseFromBloombergData, 0)
  521. // for ii := 0; ii <= 50; ii++ {
  522. // indexData := new(data_manage.BaseFromBloombergData)
  523. // indexData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
  524. // indexData.IndexCode = index.IndexCode
  525. // indexData.DataTime = now.AddDate(0, 0, -ii)
  526. // va := GenerateRandomFloat64InRange()
  527. // va += float64(ii)
  528. // indexData.Value = va
  529. // indexData.CreateTime = time.Now().Local()
  530. // indexData.ModifyTime = time.Now().Local()
  531. // indexData.DataTimestamp = int(indexData.DataTime.UnixNano() / 1e6)
  532. // insertData = append(insertData, indexData)
  533. // }
  534. // ob := new(data_manage.BaseFromBloombergData)
  535. // if e := ob.CreateMulti(insertData); e != nil {
  536. // fmt.Printf("新增指标数据失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  537. // return
  538. // }
  539. // }
  540. //
  541. // fmt.Println("结束写入")
  542. //}
  543. //
  544. //func GenerateRandomFloat64InRange() float64 {
  545. // var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) // 设置随机数种子
  546. //
  547. // return rnd.Float64()*11000 - 1000
  548. //}
  549. // HandleSearchByWechatOp
  550. // @Description: 处理微信爬虫
  551. func HandleWechatArticleOp() {
  552. defer func() {
  553. if err := recover(); err != nil {
  554. fmt.Println("[ImportManualDataRefresh]", err)
  555. }
  556. }()
  557. obj := rag.WechatPlatform{}
  558. for {
  559. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE, func(b []byte) {
  560. wechatArticleOp := new(llm.WechatArticleOp)
  561. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  562. fmt.Println("json unmarshal wrong!")
  563. return
  564. }
  565. item, tmpErr := obj.GetByID(wechatArticleOp.WechatPlatformId)
  566. if tmpErr != nil {
  567. // 找不到就处理失败
  568. return
  569. }
  570. switch wechatArticleOp.Source {
  571. case `add`:
  572. llm.AddWechatPlatform(item)
  573. case `refresh`:
  574. llm.BeachAddWechatArticle(item, 2)
  575. }
  576. })
  577. }
  578. }
  579. // HandleWechatArticleLLmOp
  580. // @Description: 处理微信文章加入知识库
  581. func HandleWechatArticleLLmOp() {
  582. defer func() {
  583. if err := recover(); err != nil {
  584. fmt.Println("[ImportManualDataRefresh]", err)
  585. }
  586. }()
  587. obj := rag.WechatArticle{}
  588. for {
  589. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE_KNOWLEDGE, func(b []byte) {
  590. wechatArticleOp := new(llm.WechatArticleOp)
  591. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  592. fmt.Println("json unmarshal wrong!")
  593. return
  594. }
  595. item, tmpErr := obj.GetById(wechatArticleOp.WechatPlatformId)
  596. if tmpErr != nil {
  597. // 找不到就处理失败
  598. return
  599. }
  600. // 文章加入到知识库
  601. llm.ArticleToKnowledge(item)
  602. // 生成摘要
  603. llm.GenerateArticleAbstract(item)
  604. })
  605. }
  606. }