task.go 21 KB


  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/cache"
  5. "eta/eta_api/models"
  6. "eta/eta_api/models/rag"
  7. "eta/eta_api/services/binlog"
  8. "eta/eta_api/services/data"
  9. edbmonitor "eta/eta_api/services/edb_monitor"
  10. "eta/eta_api/services/llm"
  11. "eta/eta_api/utils"
  12. "fmt"
  13. "strings"
  14. "time"
  15. )
  16. func Task() {
  17. fmt.Println("task start")
  18. {
  19. // 修复客户试用数据
  20. //FixCompanyUpdateData()
  21. //FixCompanyTryDay()
  22. //FixCompanyTryDay()
  23. //StaticCompanyTryDay()
  24. //return
  25. }
  26. //FixPermissionStatus()
  27. //GetCompanyInfo()
  28. //ReportCount()
  29. //windSourceUrl:=`http://47.100.166.55:7002/edbInfo/wind/?EdbCode=M0001427&StartDate=2020-11-01&EndDate=2021-03-01`
  30. //data.AddAllArticle()
  31. //data.GetSmmIndex()
  32. //data.GetSmmIndexData()
  33. go AutoInsertLogToDB()
  34. //手工数据表格导入后的指标库刷新
  35. go ImportManualDataRefresh()
  36. // 加入钢联化工指标数据之后的刷新
  37. go MysteelChemicalDataAdd()
  38. //修复用户关注标识
  39. //GetWxUsersSubscribe()
  40. go AutoInsertAdminOperateRecordToDB()
  41. // 指标刷新
  42. go data.HandleEdbRefreshQueue()
  43. // 进行指标替换操作
  44. go DealReplaceEdbCache()
  45. // TODO:监听这里需要找下达梦的方案,主流程先跑起来再处理
  46. // 监听binlog
  47. if utils.MYSQL_DATA_BINLOG_URL != "" && utils.DbDriverName == `mysql` {
  48. go binlog.ListenMysql()
  49. go edbmonitor.HandleEdbMonitorEdbInfo()
  50. // 监听数据源binlog写入es
  51. go binlog.HandleDataSourceChange2Es()
  52. }
  53. go StartSessionManager()
  54. go llm.SaveAllChatRecordsToDB()
  55. // 定时任务进行微信公众号添加、文章刷新操作(暂不处理了,改成其他服务处理)
  56. //go HandleWechatArticleOp()
  57. // 定时任务进行微信文章LLM操作
  58. go HandleWechatArticleLLmOp()
  59. // 队列任务将eta报告同步到知识库操作
  60. go HandleEtaReportUpdateOp()
  61. // 定时任务进行eta报告进行LLM操作
  62. go HandleEtaReportKnowledgeLLmOp()
  63. // 权益报告监听入库
  64. go AutoInsertRaiReport()
  65. // TODO:数据修复
  66. //FixNewEs()
  67. fmt.Println("task end")
  68. }
  69. //// 每日发布晨报
  70. //func AutoPublishDayReport() {
  71. // defer func() {
  72. // if err := recover(); err != nil {
  73. // fmt.Println("[AutoPublishDayReport]", err)
  74. // }
  75. // }()
  76. //
  77. // // 每日8:42发布晨报
  78. // ticker := time.Tick(50 * time.Second)
  79. // for range ticker {
  80. // nowTime := time.Now()
  81. // clock := nowTime.Format("1504")
  82. // if clock == "0842" {
  83. // if err := PublishTodayDayReport(); err != nil {
  84. // go alarm_msg.SendAlarmMsg(fmt.Sprint("每日晨报自动发送 AutoPublishDayReport ERR:", err), 3)
  85. // //utils.SendEmail(utils.APPNAME+" "+utils.RunMode+" 失败提醒", fmt.Sprint("AutoPublishDayReport ERR:", err), utils.EmailSendToUsers)
  86. // }
  87. // }
  88. // }
  89. //}
  90. // ImportManualDataRefresh 导入手工数据后的刷新
  91. func ImportManualDataRefresh() {
  92. defer func() {
  93. if err := recover(); err != nil {
  94. fmt.Println("[ImportManualDataRefresh]", err)
  95. }
  96. }()
  97. for {
  98. utils.Rc.Brpop(utils.CACHE_IMPORT_MANUAL_DATA, func(b []byte) {
  99. edbCode := string(b)
  100. edbCode = strings.TrimPrefix(edbCode, `"`)
  101. edbCode = strings.TrimSuffix(edbCode, `"`)
  102. data.RefreshManualData(edbCode)
  103. })
  104. }
  105. }
  106. // MysteelChemicalDataAdd 加入钢联化工指标数据之后的刷新
  107. func MysteelChemicalDataAdd() {
  108. defer func() {
  109. if err := recover(); err != nil {
  110. fmt.Println("[ImportManualDataRefresh]", err)
  111. }
  112. }()
  113. for {
  114. utils.Rc.Brpop(utils.CACHE_MYSTEEL_CHEMICAL_ADD_DATA, func(b []byte) {
  115. edbCode := string(b)
  116. edbCode = strings.TrimPrefix(edbCode, `"`)
  117. edbCode = strings.TrimSuffix(edbCode, `"`)
  118. data.RefreshMysteelChemicalData(edbCode)
  119. })
  120. }
  121. }
  122. //func init() {
  123. // fmt.Println("start task init")
  124. // UpdateEnglishEmailLogErrMsg()
  125. // fmt.Println("end task init")
  126. //}
  127. //
  128. //// UpdateEnglishEmailLogErrMsg 更新英文邮件日志的ErrMsg(研报后台4.2上线后执行, 仅一次)
  129. //func UpdateEnglishEmailLogErrMsg() {
  130. // var cond string
  131. // var pars []interface{}
  132. // list, e := models.GetEnglishReportEmailLogList(cond, pars)
  133. // if e != nil {
  134. // fmt.Println("获取日志列表失败")
  135. // return
  136. // }
  137. // for _, v := range list {
  138. // if v.SendStatus != 0 || v.Source != 1 || v.Result == "" || v.ErrMsg != "" {
  139. // continue
  140. // }
  141. // // 取出错误信息
  142. // fmt.Printf("正在更新%d\n", v.Id)
  143. // r := new(AliyunEmailResult)
  144. // if e = json.Unmarshal([]byte(v.Result), &r); e != nil {
  145. // fmt.Println("JSON解析报错了1" + e.Error())
  146. // continue
  147. // }
  148. // rd := new(AliyunEmailResultData)
  149. // res := strings.Replace(r.Data, `\`, ``, -1)
  150. // if e = json.Unmarshal([]byte(res), &rd); e != nil {
  151. // fmt.Println("JSON解析报错了2" + e.Error())
  152. // continue
  153. // }
  154. // v.ErrMsg = rd.Message
  155. // if e = v.Update([]string{"ErrMsg"}); e != nil {
  156. // fmt.Println("更新失败了" + e.Error())
  157. // continue
  158. // }
  159. // }
  160. // fmt.Println("更新成功")
  161. //}
  162. //func FixCompanyUpdateData1() {
  163. // list, err := company.GetCompanyProductUpdateLogList()
  164. // if err != nil {
  165. // fmt.Println("获取客户变更数据失败:", err)
  166. // return
  167. // }
  168. //
  169. // nowTime := time.Now()
  170. // for _, v := range list {
  171. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  172. // //if tmpErr != nil {
  173. // // fmt.Println(v.Id, "找数据,", tmpErr)
  174. // // continue
  175. // //}
  176. //
  177. // //permissionList := make([]*company.CompanyReportPermission, 0)
  178. // //switch v.Source {
  179. // //case "add", "receive", "thaw", "delay", "apply_receive":
  180. // //
  181. // //}
  182. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  183. // if err != nil {
  184. // fmt.Println("err:", err)
  185. // continue
  186. // }
  187. //
  188. // startDate := v.CreateTime
  189. // endDate := v.CreateTime.AddDate(0, 2, 0)
  190. // isStop := 1
  191. // realEndDate := endDate
  192. // if realEndDate.After(nowTime) {
  193. // realEndDate = nowTime
  194. // isStop = 0
  195. // }
  196. // for _, permission := range permissionList {
  197. // permission.StartDate = startDate.Format(utils.FormatDate)
  198. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  199. // }
  200. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  201. // Id: 0,
  202. // CompanyId: v.CompanyId,
  203. // ProductId: v.ProductId,
  204. // SellerId: v.SellerId,
  205. // SellerName: v.SellerName,
  206. // Source: v.Source,
  207. // StartDate: startDate,
  208. // EndDate: endDate,
  209. // RealEndDate: realEndDate,
  210. // IsStop: isStop,
  211. // CreateTime: v.CreateTime,
  212. // }
  213. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  214. // }
  215. // //fmt.Println("结束")
  216. //}
  217. //func FixCompanyUpdateData2() {
  218. // list, err := company.GetTryOutCompanyOperationRecordList()
  219. // if err != nil {
  220. // fmt.Println("获取客户变更数据失败:", err)
  221. // return
  222. // }
  223. //
  224. // nowTime := time.Now()
  225. // for _, v := range list {
  226. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  227. // //if tmpErr != nil {
  228. // // fmt.Println(v.Id, "找数据,", tmpErr)
  229. // // continue
  230. // //}
  231. //
  232. // //permissionList := make([]*company.CompanyReportPermission, 0)
  233. // //switch v.Source {
  234. // //case "add", "receive", "thaw", "delay", "apply_receive":
  235. // //
  236. // //}
  237. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  238. // if err != nil {
  239. // fmt.Println("err:", err)
  240. // continue
  241. // }
  242. //
  243. // startDate := v.CreateTime
  244. // endDate := v.CreateTime.AddDate(0, 2, 0)
  245. // isStop := 1
  246. // realEndDate := endDate
  247. // if realEndDate.After(nowTime) {
  248. // realEndDate = nowTime
  249. // isStop = 0
  250. // }
  251. // for _, permission := range permissionList {
  252. // permission.StartDate = startDate.Format(utils.FormatDate)
  253. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  254. // }
  255. // sellerName := ``
  256. // {
  257. // sysUser, _ := system.GetSysAdminById(v.SellerId)
  258. // if sysUser != nil {
  259. // sellerName = sysUser.RealName
  260. // }
  261. // }
  262. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  263. // Id: 0,
  264. // CompanyId: v.CompanyId,
  265. // ProductId: v.ProductId,
  266. // SellerId: v.SellerId,
  267. // SellerName: sellerName,
  268. // Source: "formal_to_try_out",
  269. // StartDate: startDate,
  270. // EndDate: endDate,
  271. // RealEndDate: realEndDate,
  272. // IsStop: isStop,
  273. // CreateTime: v.CreateTime,
  274. // }
  275. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  276. // }
  277. // fmt.Println("结束")
  278. //}
  279. // FixCompanyTryDay 修复试用天数
  280. //func FixCompanyTryDay() {
  281. // list, err := company.GetCompanyProductTryOutUpdateGroup()
  282. // if err != nil {
  283. // fmt.Println("获取客户变更数据失败:", err)
  284. // return
  285. // }
  286. //
  287. // lenList := len(list)
  288. // for k, v := range list {
  289. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  290. // companyProduct, tmpErr := company.GetCompanyProductByCompanyIdAndProductId(v.CompanyId, v.ProductId)
  291. // if tmpErr != nil {
  292. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";找不到对应的客户,Err:", tmpErr)
  293. // continue
  294. // }
  295. //
  296. // logList, err := company.GetCompanyProductTryOutUpdateList(v.CompanyId, v.ProductId)
  297. // if err != nil {
  298. // fmt.Println("查找客户日志失败,err:", err)
  299. // continue
  300. // }
  301. //
  302. // //lenLog := len(logList)
  303. // var day int //实际试用天数
  304. // var endDate time.Time
  305. // for _, log := range logList {
  306. // startDate := log.StartDate
  307. // if endDate.IsZero() {
  308. // endDate = log.RealEndDate
  309. // day = utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  310. // } else {
  311. // if log.RealEndDate.After(endDate) {
  312. // if endDate.After(startDate) {
  313. // startDate = endDate
  314. // }
  315. // if startDate.Equal(log.EndDate) {
  316. // day += utils.GetTimeSubDay(startDate, log.RealEndDate)
  317. // } else {
  318. // day += utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  319. // }
  320. // endDate = log.RealEndDate
  321. // }
  322. // }
  323. // }
  324. // companyProduct.TryOutDayTotal = day
  325. // companyProduct.Update([]string{"TryOutDayTotal"})
  326. // }
  327. // fmt.Println("结束")
  328. //}
  329. // StaticCompanyTryDay 定时任务每天更新试用天数
  330. //func StaticCompanyTryDay() {
  331. // list, err := company.GetCompanyProductTryOutUpdateNoStopGroup()
  332. // if err != nil {
  333. // fmt.Println("获取客户变更数据失败:", err)
  334. // return
  335. // }
  336. //
  337. // lenList := len(list)
  338. // for k, v := range list {
  339. // isAdd := false //是否要增加一天,默认不加
  340. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  341. // permissionList, tmpErr := company.GetCompanyReportPermissionByStatus(v.CompanyId, v.ProductId, "试用")
  342. // if tmpErr == nil {
  343. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";寻找对应的试用品种失败,Err:", tmpErr)
  344. // continue
  345. // }
  346. // currPermissionIdList := make([]int, 0) //当前试用的品种
  347. // for _, permission := range permissionList {
  348. // currPermissionIdList = append(currPermissionIdList, permission.ChartPermissionId)
  349. // }
  350. //
  351. // //获取所有未停止的记录列表
  352. // logList, err := company.GetCompanyProductTryOutUpdateNoStopListByEndDate(v.CompanyId, v.ProductId)
  353. // if err != nil {
  354. // fmt.Println("查找客户日志失败,err:", err)
  355. // continue
  356. // }
  357. //
  358. // logPermissionIdList := make([]int, 0) //当前日志中的试用的品种
  359. // for _, log := range logList {
  360. // //获取所有未停止的品种试用记录列表
  361. // logPermissionList, tmpErr := company.GetCompanyProductTryOutPermissionUpdateNoStopListByEndDate(log.Id)
  362. // if tmpErr != nil {
  363. // fmt.Println("查找客户品种变更日志失败,err:", tmpErr)
  364. // continue
  365. // }
  366. // lenLogPermissionList := len(logPermissionList) //当前日志存在试用的品种数量
  367. // stopPermission := 0 //当前日志需要停止的品种数量
  368. //
  369. // currTime := time.Now() //当前时间
  370. // for _, logPermission := range logPermissionList {
  371. // if utils.InArrayByInt(logPermissionIdList, logPermission.ChartPermissionId) {
  372. // // 如果已经被其他记录使用了,那么就将当前记录给标记停止
  373. // logPermission.IsStop = 1
  374. //
  375. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  376. // if currTime.After(logPermission.EndDate) {
  377. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  378. // } else {
  379. // logPermission.RealEndDate = time.Now()
  380. // }
  381. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  382. // stopPermission++
  383. // continue
  384. // } else if !utils.InArrayByInt(currPermissionIdList, logPermission.ChartPermissionId) {
  385. // // 如果该品种不在当前客户的品种里面,那么也要将当前记录给标记停止
  386. // logPermission.IsStop = 1
  387. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  388. // if currTime.After(logPermission.EndDate) {
  389. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  390. // } else {
  391. // logPermission.RealEndDate = time.Now()
  392. // }
  393. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  394. // stopPermission++
  395. // continue
  396. // }
  397. //
  398. // // 剩下的说明还处于试用状态,需要添加1天试用期,且需要把该品种加入到当前日志中的试用的品种列表
  399. // isAdd = true
  400. // logPermissionIdList = append(logPermissionIdList, logPermission.ChartPermissionId)
  401. // }
  402. //
  403. // //如果当前日志存在试用的品种数量 == 当前日志需要停止的品种数量
  404. // // 那么当前日志也是处于停用状态
  405. // if lenLogPermissionList == stopPermission {
  406. // log.IsStop = 1
  407. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  408. // if currTime.After(log.EndDate) {
  409. // log.RealEndDate = currTime.AddDate(0, 0, -1)
  410. // } else {
  411. // log.RealEndDate = time.Now()
  412. // }
  413. // log.Update([]string{"IsStop", "RealEndDate"})
  414. // }
  415. //
  416. // }
  417. //
  418. // // 如果需要添加,那么将该客户品种添加1天
  419. // if isAdd {
  420. //
  421. // }
  422. // // 更新客户产品的试用天数
  423. // tmpErr = company.AddCompanyProductTryOutDayTotal(v.CompanyId, v.ProductId)
  424. // if tmpErr != nil {
  425. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";更新客户产品的试用天数,Err:", tmpErr)
  426. // continue
  427. // }
  428. // }
  429. // fmt.Println("结束")
  430. //}
  431. //func Task2() {
  432. // fmt.Println("task start")
  433. //
  434. // //_, _ = maycur.GetPublicOfferingSaleLeader()
  435. // //_ = maycur.TestSyncCompanyProfile()
  436. // //_ = maycur.SyncCompanyProfile()
  437. // //_ = maycur.ImportExcelEmployeeId()
  438. // //_ = maycur.ApiTest()
  439. // //_ = maycur.ApiTest2()
  440. // fmt.Println("task end")
  441. //}
  442. // FixEnCompanyPermission 英文权限上线时修复英文客户拥有所有权限(一次性)
  443. func FixEnCompanyPermission() {
  444. var err error
  445. defer func() {
  446. if err != nil {
  447. fmt.Println("FixEnCompanyPermission Err: ", err.Error())
  448. }
  449. }()
  450. // 获取正式客户
  451. companies := make([]*models.EnglishCompany, 0)
  452. {
  453. cond := ` AND status = ?`
  454. pars := make([]interface{}, 0)
  455. pars = append(pars, 1)
  456. list, e := models.GetEnglishCompanyList(cond, pars, "")
  457. if e != nil {
  458. err = fmt.Errorf("GetEnglishCompanyList err: %s", e.Error())
  459. return
  460. }
  461. companies = list
  462. }
  463. // 获取所有权限
  464. permissions := make([]*models.EnPermission, 0)
  465. {
  466. cond := ` AND parent_id > ?`
  467. pars := make([]interface{}, 0)
  468. pars = append(pars, 0)
  469. ob := new(models.EnPermission)
  470. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  471. if e != nil {
  472. err = fmt.Errorf("GetPermissionItemsByCondition err: %s", e.Error())
  473. return
  474. }
  475. permissions = list
  476. }
  477. for _, c := range companies {
  478. ps := make([]*models.EnCompanyPermission, 0)
  479. for _, p := range permissions {
  480. ps = append(ps, &models.EnCompanyPermission{
  481. EnCompanyId: c.CompanyId,
  482. EnPermissionId: p.EnPermissionId,
  483. CreateTime: time.Now().Local(),
  484. })
  485. }
  486. if e := models.ClearAndCreateEnCompanyPermissions(c.CompanyId, ps); e != nil {
  487. err = fmt.Errorf("ClearAndCreateEnCompanyPermissions err: %s", e.Error())
  488. return
  489. }
  490. }
  491. fmt.Println("修复完成")
  492. }
  493. // ModifyEsEnglishReport 批量修改es里的英文研报信息和线上路演信息
  494. func ModifyEsEnglishReport() {
  495. fmt.Println("开始")
  496. err := ModifyAllEsEnglishReportVideo()
  497. if err != nil {
  498. err = fmt.Errorf("重置es中的英文研报信息失败:ModifyAllEnglishReportInEs err: %s", err.Error())
  499. return
  500. }
  501. fmt.Println("结束")
  502. }
  503. //func InsertBloombergIndex() {
  504. // fmt.Println("开始写入")
  505. //
  506. // start := 100000
  507. // now := time.Now()
  508. // for i := 1; i <= 100; i++ {
  509. // fmt.Printf("写入第%d个\n", i)
  510. //
  511. // start += 1
  512. // index := new(data_manage.BaseFromBloombergIndex)
  513. // index.IndexCode = fmt.Sprintf("BLID%d", start)
  514. // index.IndexName = fmt.Sprintf("模拟Bloomberg-%s", index.IndexCode)
  515. // index.Unit = "无"
  516. // index.Source = utils.DATA_SOURCE_BLOOMBERG
  517. // index.Frequency = "日度"
  518. // index.StartDate = now.AddDate(0, 0, -i)
  519. // index.EndDate = now
  520. // index.CreateTime = time.Now().Local()
  521. // index.ModifyTime = time.Now().Local()
  522. // if e := index.Create(); e != nil {
  523. // fmt.Printf("新增指标失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  524. // return
  525. // }
  526. //
  527. // insertData := make([]*data_manage.BaseFromBloombergData, 0)
  528. // for ii := 0; ii <= 50; ii++ {
  529. // indexData := new(data_manage.BaseFromBloombergData)
  530. // indexData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
  531. // indexData.IndexCode = index.IndexCode
  532. // indexData.DataTime = now.AddDate(0, 0, -ii)
  533. // va := GenerateRandomFloat64InRange()
  534. // va += float64(ii)
  535. // indexData.Value = va
  536. // indexData.CreateTime = time.Now().Local()
  537. // indexData.ModifyTime = time.Now().Local()
  538. // indexData.DataTimestamp = int(indexData.DataTime.UnixNano() / 1e6)
  539. // insertData = append(insertData, indexData)
  540. // }
  541. // ob := new(data_manage.BaseFromBloombergData)
  542. // if e := ob.CreateMulti(insertData); e != nil {
  543. // fmt.Printf("新增指标数据失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  544. // return
  545. // }
  546. // }
  547. //
  548. // fmt.Println("结束写入")
  549. //}
  550. //
  551. //func GenerateRandomFloat64InRange() float64 {
  552. // var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) // 设置随机数种子
  553. //
  554. // return rnd.Float64()*11000 - 1000
  555. //}
  556. // HandleWechatArticleOp
  557. // @Description: 处理ETA报告加入到知识库
  558. func HandleWechatArticleOp() {
  559. defer func() {
  560. if err := recover(); err != nil {
  561. fmt.Println("[HandleWechatArticleOp]", err)
  562. }
  563. }()
  564. obj := rag.WechatPlatform{}
  565. for {
  566. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE, func(b []byte) {
  567. wechatArticleOp := new(cache.WechatArticleOp)
  568. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  569. fmt.Println("json unmarshal wrong!")
  570. return
  571. }
  572. item, tmpErr := obj.GetById(wechatArticleOp.WechatPlatformId)
  573. if tmpErr != nil {
  574. // 找不到就处理失败
  575. return
  576. }
  577. switch wechatArticleOp.Source {
  578. case `add`:
  579. AddWechatPlatform(item)
  580. case `refresh`:
  581. BeachAddWechatArticle(item, 2)
  582. }
  583. })
  584. }
  585. }
  586. // HandleWechatArticleLLmOp
  587. // @Description: 处理微信文章加入知识库
  588. func HandleWechatArticleLLmOp() {
  589. defer func() {
  590. if err := recover(); err != nil {
  591. fmt.Println("[HandleWechatArticleLLmOp]", err)
  592. }
  593. }()
  594. obj := rag.WechatArticle{}
  595. for {
  596. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE_KNOWLEDGE, func(b []byte) {
  597. wechatArticleOp := new(cache.WechatArticleOp)
  598. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  599. fmt.Println("json unmarshal wrong!")
  600. return
  601. }
  602. item, tmpErr := obj.GetById(wechatArticleOp.WechatPlatformId)
  603. if tmpErr != nil {
  604. // 找不到就处理失败
  605. return
  606. }
  607. // 文章加入到知识库
  608. ArticleToKnowledge(item)
  609. // 生成摘要
  610. GenerateWechatArticleAbstract(item, false)
  611. })
  612. }
  613. }
  614. // HandleEtaReportUpdateOp
  615. // @Description: 处理eta报告加入知识库操作
  616. func HandleEtaReportUpdateOp() {
  617. defer func() {
  618. if err := recover(); err != nil {
  619. fmt.Println("[HandleEtaReportKnowledgeLLmOp]", err)
  620. }
  621. }()
  622. for {
  623. utils.Rc.Brpop(utils.CACHE_ETA_REPORT_KNOWLEDGE, func(b []byte) {
  624. ragEtaReportOpOp := new(cache.RagEtaReportOp)
  625. if err := json.Unmarshal(b, &ragEtaReportOpOp); err != nil {
  626. fmt.Println("json unmarshal wrong!")
  627. return
  628. }
  629. switch ragEtaReportOpOp.Source {
  630. case `publish`:
  631. ReportAddOrModifyKnowledgeByReportId(ragEtaReportOpOp.ReportId)
  632. case `un_publish`:
  633. ReportUnPublishedKnowledgeByReportId(ragEtaReportOpOp.ReportId)
  634. }
  635. })
  636. }
  637. }
  638. // HandleEtaReportKnowledgeLLmOp
  639. // @Description: 处理微信文章加入知识库
  640. func HandleEtaReportKnowledgeLLmOp() {
  641. defer func() {
  642. if err := recover(); err != nil {
  643. fmt.Println("[HandleWechatArticleLLmOp]", err)
  644. }
  645. }()
  646. obj := rag.RagEtaReport{}
  647. for {
  648. utils.Rc.Brpop(utils.CACHE_ETA_REPORT_KNOWLEDGE_LLM, func(b []byte) {
  649. wechatArticleOp := new(cache.RagEtaReportLlmOp)
  650. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  651. fmt.Println("json unmarshal wrong!")
  652. return
  653. }
  654. item, tmpErr := obj.GetById(wechatArticleOp.RagEtaReportId)
  655. if tmpErr != nil {
  656. // 找不到就处理失败
  657. return
  658. }
  659. // 已经删除的就不做操作了
  660. if item.IsDeleted == 1 {
  661. return
  662. }
  663. // 未发布的就不操作了
  664. if item.IsPublished != 1 {
  665. return
  666. }
  667. // 文章加入到知识库
  668. //ArticleToKnowledge(item)
  669. // 生成摘要
  670. GenerateRagEtaReportAbstract(item, true)
  671. })
  672. }
  673. }