task.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953
  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/cache"
  5. "eta/eta_api/models"
  6. "eta/eta_api/models/rag"
  7. "eta/eta_api/services/binlog"
  8. "eta/eta_api/services/data"
  9. edbmonitor "eta/eta_api/services/edb_monitor"
  10. "eta/eta_api/services/llm"
  11. "eta/eta_api/utils"
  12. "fmt"
  13. "strings"
  14. "time"
  15. )
  16. func Task() {
  17. fmt.Println("task start")
  18. {
  19. // 修复客户试用数据
  20. //FixCompanyUpdateData()
  21. //FixCompanyTryDay()
  22. //FixCompanyTryDay()
  23. //StaticCompanyTryDay()
  24. //return
  25. }
  26. //FixPermissionStatus()
  27. //GetCompanyInfo()
  28. //ReportCount()
  29. //windSourceUrl:=`http://47.100.166.55:7002/edbInfo/wind/?EdbCode=M0001427&StartDate=2020-11-01&EndDate=2021-03-01`
  30. //data.AddAllArticle()
  31. //data.GetSmmIndex()
  32. //data.GetSmmIndexData()
  33. go AutoInsertLogToDB()
  34. //手工数据表格导入后的指标库刷新
  35. go ImportManualDataRefresh()
  36. // 加入上海钢联指标数据之后的刷新
  37. go MysteelChemicalDataAdd()
  38. //修复用户关注标识
  39. //GetWxUsersSubscribe()
  40. go AutoInsertAdminOperateRecordToDB()
  41. // 指标刷新
  42. go data.HandleEdbRefreshQueue()
  43. // 进行指标替换操作
  44. go DealReplaceEdbCache()
  45. // TODO:监听这里需要找下达梦的方案,主流程先跑起来再处理
  46. // 监听binlog
  47. if utils.MYSQL_DATA_BINLOG_URL != "" && utils.DbDriverName == `mysql` {
  48. go binlog.ListenMysql()
  49. go edbmonitor.HandleEdbMonitorEdbInfo()
  50. // 监听数据源binlog写入es
  51. go binlog.HandleDataSourceChange2Es()
  52. }
  53. go StartSessionManager()
  54. go llm.SaveAllChatRecordsToDB()
  55. // 定时任务进行微信文章操作
  56. // 定时任务进行微信公众号添加、文章刷新操作(暂不处理了,改成其他服务处理)
  57. //go HandleWechatArticleOp()
  58. // 定时任务进行微信文章LLM操作
  59. go HandleWechatArticleLLmOp()
  60. // 队列任务将eta报告同步到知识库操作
  61. go HandleEtaReportUpdateOp()
  62. // 定时任务进行eta报告进行LLM操作
  63. go HandleEtaReportKnowledgeLLmOp()
  64. // 定时任务进行进行AI报告/文章的摘要任务LLM操作
  65. go HandleAiArticleAbstractLlmOp()
  66. // 权益报告监听入库
  67. go AutoInsertRaiReport()
  68. go llm.DealHistoryArticleDafTags()
  69. // 巡检信息发送
  70. go AutoCheckWebsocketMessageList()
  71. go StartGenerateReportPDF()
  72. // TODO:数据修复
  73. //FixNewEs()
  74. fmt.Println("task end")
  75. }
  76. //// 每日发布晨报
  77. //func AutoPublishDayReport() {
  78. // defer func() {
  79. // if err := recover(); err != nil {
  80. // fmt.Println("[AutoPublishDayReport]", err)
  81. // }
  82. // }()
  83. //
  84. // // 每日8:42发布晨报
  85. // ticker := time.Tick(50 * time.Second)
  86. // for range ticker {
  87. // nowTime := time.Now()
  88. // clock := nowTime.Format("1504")
  89. // if clock == "0842" {
  90. // if err := PublishTodayDayReport(); err != nil {
  91. // go alarm_msg.SendAlarmMsg(fmt.Sprint("每日晨报自动发送 AutoPublishDayReport ERR:", err), 3)
  92. // //utils.SendEmail(utils.APPNAME+" "+utils.RunMode+" 失败提醒", fmt.Sprint("AutoPublishDayReport ERR:", err), utils.EmailSendToUsers)
  93. // }
  94. // }
  95. // }
  96. //}
  97. // ImportManualDataRefresh 导入手工数据后的刷新
  98. func ImportManualDataRefresh() {
  99. defer func() {
  100. if err := recover(); err != nil {
  101. fmt.Println("[ImportManualDataRefresh]", err)
  102. }
  103. }()
  104. for {
  105. utils.Rc.Brpop(utils.CACHE_IMPORT_MANUAL_DATA, func(b []byte) {
  106. edbCode := string(b)
  107. edbCode = strings.TrimPrefix(edbCode, `"`)
  108. edbCode = strings.TrimSuffix(edbCode, `"`)
  109. data.RefreshManualData(edbCode)
  110. })
  111. }
  112. }
  113. // MysteelChemicalDataAdd 加入上海钢联指标数据之后的刷新
  114. func MysteelChemicalDataAdd() {
  115. defer func() {
  116. if err := recover(); err != nil {
  117. fmt.Println("[ImportManualDataRefresh]", err)
  118. }
  119. }()
  120. for {
  121. utils.Rc.Brpop(utils.CACHE_MYSTEEL_CHEMICAL_ADD_DATA, func(b []byte) {
  122. edbCode := string(b)
  123. edbCode = strings.TrimPrefix(edbCode, `"`)
  124. edbCode = strings.TrimSuffix(edbCode, `"`)
  125. data.RefreshMysteelChemicalData(edbCode)
  126. })
  127. }
  128. }
  129. //func init() {
  130. // fmt.Println("start task init")
  131. // UpdateEnglishEmailLogErrMsg()
  132. // fmt.Println("end task init")
  133. //}
  134. //
  135. //// UpdateEnglishEmailLogErrMsg 更新英文邮件日志的ErrMsg(研报后台4.2上线后执行, 仅一次)
  136. //func UpdateEnglishEmailLogErrMsg() {
  137. // var cond string
  138. // var pars []interface{}
  139. // list, e := models.GetEnglishReportEmailLogList(cond, pars)
  140. // if e != nil {
  141. // fmt.Println("获取日志列表失败")
  142. // return
  143. // }
  144. // for _, v := range list {
  145. // if v.SendStatus != 0 || v.Source != 1 || v.Result == "" || v.ErrMsg != "" {
  146. // continue
  147. // }
  148. // // 取出错误信息
  149. // fmt.Printf("正在更新%d\n", v.Id)
  150. // r := new(AliyunEmailResult)
  151. // if e = json.Unmarshal([]byte(v.Result), &r); e != nil {
  152. // fmt.Println("JSON解析报错了1" + e.Error())
  153. // continue
  154. // }
  155. // rd := new(AliyunEmailResultData)
  156. // res := strings.Replace(r.Data, `\`, ``, -1)
  157. // if e = json.Unmarshal([]byte(res), &rd); e != nil {
  158. // fmt.Println("JSON解析报错了2" + e.Error())
  159. // continue
  160. // }
  161. // v.ErrMsg = rd.Message
  162. // if e = v.Update([]string{"ErrMsg"}); e != nil {
  163. // fmt.Println("更新失败了" + e.Error())
  164. // continue
  165. // }
  166. // }
  167. // fmt.Println("更新成功")
  168. //}
  169. //func FixCompanyUpdateData1() {
  170. // list, err := company.GetCompanyProductUpdateLogList()
  171. // if err != nil {
  172. // fmt.Println("获取客户变更数据失败:", err)
  173. // return
  174. // }
  175. //
  176. // nowTime := time.Now()
  177. // for _, v := range list {
  178. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  179. // //if tmpErr != nil {
  180. // // fmt.Println(v.Id, "找数据,", tmpErr)
  181. // // continue
  182. // //}
  183. //
  184. // //permissionList := make([]*company.CompanyReportPermission, 0)
  185. // //switch v.Source {
  186. // //case "add", "receive", "thaw", "delay", "apply_receive":
  187. // //
  188. // //}
  189. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  190. // if err != nil {
  191. // fmt.Println("err:", err)
  192. // continue
  193. // }
  194. //
  195. // startDate := v.CreateTime
  196. // endDate := v.CreateTime.AddDate(0, 2, 0)
  197. // isStop := 1
  198. // realEndDate := endDate
  199. // if realEndDate.After(nowTime) {
  200. // realEndDate = nowTime
  201. // isStop = 0
  202. // }
  203. // for _, permission := range permissionList {
  204. // permission.StartDate = startDate.Format(utils.FormatDate)
  205. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  206. // }
  207. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  208. // Id: 0,
  209. // CompanyId: v.CompanyId,
  210. // ProductId: v.ProductId,
  211. // SellerId: v.SellerId,
  212. // SellerName: v.SellerName,
  213. // Source: v.Source,
  214. // StartDate: startDate,
  215. // EndDate: endDate,
  216. // RealEndDate: realEndDate,
  217. // IsStop: isStop,
  218. // CreateTime: v.CreateTime,
  219. // }
  220. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  221. // }
  222. // //fmt.Println("结束")
  223. //}
  224. //func FixCompanyUpdateData2() {
  225. // list, err := company.GetTryOutCompanyOperationRecordList()
  226. // if err != nil {
  227. // fmt.Println("获取客户变更数据失败:", err)
  228. // return
  229. // }
  230. //
  231. // nowTime := time.Now()
  232. // for _, v := range list {
  233. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  234. // //if tmpErr != nil {
  235. // // fmt.Println(v.Id, "找数据,", tmpErr)
  236. // // continue
  237. // //}
  238. //
  239. // //permissionList := make([]*company.CompanyReportPermission, 0)
  240. // //switch v.Source {
  241. // //case "add", "receive", "thaw", "delay", "apply_receive":
  242. // //
  243. // //}
  244. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  245. // if err != nil {
  246. // fmt.Println("err:", err)
  247. // continue
  248. // }
  249. //
  250. // startDate := v.CreateTime
  251. // endDate := v.CreateTime.AddDate(0, 2, 0)
  252. // isStop := 1
  253. // realEndDate := endDate
  254. // if realEndDate.After(nowTime) {
  255. // realEndDate = nowTime
  256. // isStop = 0
  257. // }
  258. // for _, permission := range permissionList {
  259. // permission.StartDate = startDate.Format(utils.FormatDate)
  260. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  261. // }
  262. // sellerName := ``
  263. // {
  264. // sysUser, _ := system.GetSysAdminById(v.SellerId)
  265. // if sysUser != nil {
  266. // sellerName = sysUser.RealName
  267. // }
  268. // }
  269. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  270. // Id: 0,
  271. // CompanyId: v.CompanyId,
  272. // ProductId: v.ProductId,
  273. // SellerId: v.SellerId,
  274. // SellerName: sellerName,
  275. // Source: "formal_to_try_out",
  276. // StartDate: startDate,
  277. // EndDate: endDate,
  278. // RealEndDate: realEndDate,
  279. // IsStop: isStop,
  280. // CreateTime: v.CreateTime,
  281. // }
  282. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  283. // }
  284. // fmt.Println("结束")
  285. //}
  286. // FixCompanyTryDay 修复试用天数
  287. //func FixCompanyTryDay() {
  288. // list, err := company.GetCompanyProductTryOutUpdateGroup()
  289. // if err != nil {
  290. // fmt.Println("获取客户变更数据失败:", err)
  291. // return
  292. // }
  293. //
  294. // lenList := len(list)
  295. // for k, v := range list {
  296. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  297. // companyProduct, tmpErr := company.GetCompanyProductByCompanyIdAndProductId(v.CompanyId, v.ProductId)
  298. // if tmpErr != nil {
  299. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";找不到对应的客户,Err:", tmpErr)
  300. // continue
  301. // }
  302. //
  303. // logList, err := company.GetCompanyProductTryOutUpdateList(v.CompanyId, v.ProductId)
  304. // if err != nil {
  305. // fmt.Println("查找客户日志失败,err:", err)
  306. // continue
  307. // }
  308. //
  309. // //lenLog := len(logList)
  310. // var day int //实际试用天数
  311. // var endDate time.Time
  312. // for _, log := range logList {
  313. // startDate := log.StartDate
  314. // if endDate.IsZero() {
  315. // endDate = log.RealEndDate
  316. // day = utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  317. // } else {
  318. // if log.RealEndDate.After(endDate) {
  319. // if endDate.After(startDate) {
  320. // startDate = endDate
  321. // }
  322. // if startDate.Equal(log.EndDate) {
  323. // day += utils.GetTimeSubDay(startDate, log.RealEndDate)
  324. // } else {
  325. // day += utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  326. // }
  327. // endDate = log.RealEndDate
  328. // }
  329. // }
  330. // }
  331. // companyProduct.TryOutDayTotal = day
  332. // companyProduct.Update([]string{"TryOutDayTotal"})
  333. // }
  334. // fmt.Println("结束")
  335. //}
  336. // StaticCompanyTryDay 定时任务每天更新试用天数
  337. //func StaticCompanyTryDay() {
  338. // list, err := company.GetCompanyProductTryOutUpdateNoStopGroup()
  339. // if err != nil {
  340. // fmt.Println("获取客户变更数据失败:", err)
  341. // return
  342. // }
  343. //
  344. // lenList := len(list)
  345. // for k, v := range list {
  346. // isAdd := false //是否要增加一天,默认不加
  347. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  348. // permissionList, tmpErr := company.GetCompanyReportPermissionByStatus(v.CompanyId, v.ProductId, "试用")
  349. // if tmpErr == nil {
  350. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";寻找对应的试用品种失败,Err:", tmpErr)
  351. // continue
  352. // }
  353. // currPermissionIdList := make([]int, 0) //当前试用的品种
  354. // for _, permission := range permissionList {
  355. // currPermissionIdList = append(currPermissionIdList, permission.ChartPermissionId)
  356. // }
  357. //
  358. // //获取所有未停止的记录列表
  359. // logList, err := company.GetCompanyProductTryOutUpdateNoStopListByEndDate(v.CompanyId, v.ProductId)
  360. // if err != nil {
  361. // fmt.Println("查找客户日志失败,err:", err)
  362. // continue
  363. // }
  364. //
  365. // logPermissionIdList := make([]int, 0) //当前日志中的试用的品种
  366. // for _, log := range logList {
  367. // //获取所有未停止的品种试用记录列表
  368. // logPermissionList, tmpErr := company.GetCompanyProductTryOutPermissionUpdateNoStopListByEndDate(log.Id)
  369. // if tmpErr != nil {
  370. // fmt.Println("查找客户品种变更日志失败,err:", tmpErr)
  371. // continue
  372. // }
  373. // lenLogPermissionList := len(logPermissionList) //当前日志存在试用的品种数量
  374. // stopPermission := 0 //当前日志需要停止的品种数量
  375. //
  376. // currTime := time.Now() //当前时间
  377. // for _, logPermission := range logPermissionList {
  378. // if utils.InArrayByInt(logPermissionIdList, logPermission.ChartPermissionId) {
  379. // // 如果已经被其他记录使用了,那么就将当前记录给标记停止
  380. // logPermission.IsStop = 1
  381. //
  382. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  383. // if currTime.After(logPermission.EndDate) {
  384. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  385. // } else {
  386. // logPermission.RealEndDate = time.Now()
  387. // }
  388. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  389. // stopPermission++
  390. // continue
  391. // } else if !utils.InArrayByInt(currPermissionIdList, logPermission.ChartPermissionId) {
  392. // // 如果该品种不在当前客户的品种里面,那么也要将当前记录给标记停止
  393. // logPermission.IsStop = 1
  394. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  395. // if currTime.After(logPermission.EndDate) {
  396. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  397. // } else {
  398. // logPermission.RealEndDate = time.Now()
  399. // }
  400. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  401. // stopPermission++
  402. // continue
  403. // }
  404. //
  405. // // 剩下的说明还处于试用状态,需要添加1天试用期,且需要把该品种加入到当前日志中的试用的品种列表
  406. // isAdd = true
  407. // logPermissionIdList = append(logPermissionIdList, logPermission.ChartPermissionId)
  408. // }
  409. //
  410. // //如果当前日志存在试用的品种数量 == 当前日志需要停止的品种数量
  411. // // 那么当前日志也是处于停用状态
  412. // if lenLogPermissionList == stopPermission {
  413. // log.IsStop = 1
  414. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  415. // if currTime.After(log.EndDate) {
  416. // log.RealEndDate = currTime.AddDate(0, 0, -1)
  417. // } else {
  418. // log.RealEndDate = time.Now()
  419. // }
  420. // log.Update([]string{"IsStop", "RealEndDate"})
  421. // }
  422. //
  423. // }
  424. //
  425. // // 如果需要添加,那么将该客户品种添加1天
  426. // if isAdd {
  427. //
  428. // }
  429. // // 更新客户产品的试用天数
  430. // tmpErr = company.AddCompanyProductTryOutDayTotal(v.CompanyId, v.ProductId)
  431. // if tmpErr != nil {
  432. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";更新客户产品的试用天数,Err:", tmpErr)
  433. // continue
  434. // }
  435. // }
  436. // fmt.Println("结束")
  437. //}
  438. //func Task2() {
  439. // fmt.Println("task start")
  440. //
  441. // //_, _ = maycur.GetPublicOfferingSaleLeader()
  442. // //_ = maycur.TestSyncCompanyProfile()
  443. // //_ = maycur.SyncCompanyProfile()
  444. // //_ = maycur.ImportExcelEmployeeId()
  445. // //_ = maycur.ApiTest()
  446. // //_ = maycur.ApiTest2()
  447. // fmt.Println("task end")
  448. //}
  449. // FixEnCompanyPermission 英文权限上线时修复英文客户拥有所有权限(一次性)
  450. func FixEnCompanyPermission() {
  451. var err error
  452. defer func() {
  453. if err != nil {
  454. fmt.Println("FixEnCompanyPermission Err: ", err.Error())
  455. }
  456. }()
  457. // 获取正式客户
  458. companies := make([]*models.EnglishCompany, 0)
  459. {
  460. cond := ` AND status = ?`
  461. pars := make([]interface{}, 0)
  462. pars = append(pars, 1)
  463. list, e := models.GetEnglishCompanyList(cond, pars, "")
  464. if e != nil {
  465. err = fmt.Errorf("GetEnglishCompanyList err: %s", e.Error())
  466. return
  467. }
  468. companies = list
  469. }
  470. // 获取所有权限
  471. permissions := make([]*models.EnPermission, 0)
  472. {
  473. cond := ` AND parent_id > ?`
  474. pars := make([]interface{}, 0)
  475. pars = append(pars, 0)
  476. ob := new(models.EnPermission)
  477. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  478. if e != nil {
  479. err = fmt.Errorf("GetPermissionItemsByCondition err: %s", e.Error())
  480. return
  481. }
  482. permissions = list
  483. }
  484. for _, c := range companies {
  485. ps := make([]*models.EnCompanyPermission, 0)
  486. for _, p := range permissions {
  487. ps = append(ps, &models.EnCompanyPermission{
  488. EnCompanyId: c.CompanyId,
  489. EnPermissionId: p.EnPermissionId,
  490. CreateTime: time.Now().Local(),
  491. })
  492. }
  493. if e := models.ClearAndCreateEnCompanyPermissions(c.CompanyId, ps); e != nil {
  494. err = fmt.Errorf("ClearAndCreateEnCompanyPermissions err: %s", e.Error())
  495. return
  496. }
  497. }
  498. fmt.Println("修复完成")
  499. }
  500. // ModifyEsEnglishReport 批量修改es里的英文研报信息和线上路演信息
  501. func ModifyEsEnglishReport() {
  502. fmt.Println("开始")
  503. err := ModifyAllEsEnglishReportVideo()
  504. if err != nil {
  505. err = fmt.Errorf("重置es中的英文研报信息失败:ModifyAllEnglishReportInEs err: %s", err.Error())
  506. return
  507. }
  508. fmt.Println("结束")
  509. }
  510. //func InsertBloombergIndex() {
  511. // fmt.Println("开始写入")
  512. //
  513. // start := 100000
  514. // now := time.Now()
  515. // for i := 1; i <= 100; i++ {
  516. // fmt.Printf("写入第%d个\n", i)
  517. //
  518. // start += 1
  519. // index := new(data_manage.BaseFromBloombergIndex)
  520. // index.IndexCode = fmt.Sprintf("BLID%d", start)
  521. // index.IndexName = fmt.Sprintf("模拟Bloomberg-%s", index.IndexCode)
  522. // index.Unit = "无"
  523. // index.Source = utils.DATA_SOURCE_BLOOMBERG
  524. // index.Frequency = "日度"
  525. // index.StartDate = now.AddDate(0, 0, -i)
  526. // index.EndDate = now
  527. // index.CreateTime = time.Now().Local()
  528. // index.ModifyTime = time.Now().Local()
  529. // if e := index.Create(); e != nil {
  530. // fmt.Printf("新增指标失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  531. // return
  532. // }
  533. //
  534. // insertData := make([]*data_manage.BaseFromBloombergData, 0)
  535. // for ii := 0; ii <= 50; ii++ {
  536. // indexData := new(data_manage.BaseFromBloombergData)
  537. // indexData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
  538. // indexData.IndexCode = index.IndexCode
  539. // indexData.DataTime = now.AddDate(0, 0, -ii)
  540. // va := GenerateRandomFloat64InRange()
  541. // va += float64(ii)
  542. // indexData.Value = va
  543. // indexData.CreateTime = time.Now().Local()
  544. // indexData.ModifyTime = time.Now().Local()
  545. // indexData.DataTimestamp = int(indexData.DataTime.UnixNano() / 1e6)
  546. // insertData = append(insertData, indexData)
  547. // }
  548. // ob := new(data_manage.BaseFromBloombergData)
  549. // if e := ob.CreateMulti(insertData); e != nil {
  550. // fmt.Printf("新增指标数据失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  551. // return
  552. // }
  553. // }
  554. //
  555. // fmt.Println("结束写入")
  556. //}
  557. //
  558. //func GenerateRandomFloat64InRange() float64 {
  559. // var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) // 设置随机数种子
  560. //
  561. // return rnd.Float64()*11000 - 1000
  562. //}
  563. // HandleWechatArticleOp
  564. // @Description: 处理ETA报告加入到知识库
  565. func HandleWechatArticleOp() {
  566. defer func() {
  567. if err := recover(); err != nil {
  568. fmt.Println("[HandleWechatArticleOp]", err)
  569. }
  570. }()
  571. obj := rag.WechatPlatform{}
  572. for {
  573. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE, func(b []byte) {
  574. wechatArticleOp := new(cache.WechatPlatformOp)
  575. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  576. fmt.Println("json unmarshal wrong!")
  577. return
  578. }
  579. item, tmpErr := obj.GetById(wechatArticleOp.WechatPlatformId)
  580. if tmpErr != nil {
  581. // 找不到就处理失败
  582. return
  583. }
  584. switch wechatArticleOp.Source {
  585. case `add`:
  586. AddWechatPlatform(item)
  587. case `refresh`:
  588. BeachAddWechatArticle(item, 2)
  589. }
  590. })
  591. }
  592. }
  593. // HandleWechatArticleLLmOp
  594. // @Description: 处理微信文章加入知识库
  595. func HandleWechatArticleLLmOp() {
  596. defer func() {
  597. if err := recover(); err != nil {
  598. fmt.Println("[HandleWechatArticleLLmOp]", err)
  599. }
  600. }()
  601. for {
  602. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE_KNOWLEDGE, handleWechatArticleLLmOp)
  603. }
  604. }
  605. // handleWechatArticleLLmOp
  606. // @Description: 处理微信文章加入知识库
  607. // @author: Roc
  608. // @datetime 2025-04-24 13:33:08
  609. // @param b []byte
  610. func handleWechatArticleLLmOp(b []byte) {
  611. var err error
  612. defer func() {
  613. if err != nil {
  614. utils.FileLog.Error("[handleWechatArticleLLmOp] params:%s;err:%s", string(b), err.Error())
  615. }
  616. }()
  617. obj := rag.WechatArticle{}
  618. wechatArticleOp := new(cache.WechatArticleOp)
  619. if err = json.Unmarshal(b, &wechatArticleOp); err != nil {
  620. fmt.Println("json unmarshal wrong!")
  621. return
  622. }
  623. item, err := obj.GetById(wechatArticleOp.WechatArticleId)
  624. if err != nil {
  625. // 找不到就处理失败
  626. return
  627. }
  628. // 文章加入到知识库
  629. ArticleToKnowledge(item)
  630. // 生成摘要
  631. if wechatArticleOp.QuestionId <= 0 {
  632. // 全部摘要生成
  633. GenerateWechatArticleAbstract(item, false)
  634. } else {
  635. questionObj := rag.Question{}
  636. questionInfo, tmpErr := questionObj.GetByID(wechatArticleOp.QuestionId)
  637. if tmpErr != nil {
  638. err = tmpErr
  639. return
  640. }
  641. // 指定指定摘要生成
  642. err = GenerateWechatArticleAbstractByQuestion(item, questionInfo, false)
  643. }
  644. }
  645. // HandleEtaReportUpdateOp
  646. // @Description: 处理eta报告加入知识库操作
  647. func HandleEtaReportUpdateOp() {
  648. defer func() {
  649. if err := recover(); err != nil {
  650. fmt.Println("[HandleEtaReportUpdateOp]", err)
  651. }
  652. }()
  653. for {
  654. utils.Rc.Brpop(utils.CACHE_ETA_REPORT_KNOWLEDGE, func(b []byte) {
  655. ragEtaReportOpOp := new(cache.RagEtaReportOp)
  656. if err := json.Unmarshal(b, &ragEtaReportOpOp); err != nil {
  657. fmt.Println("json unmarshal wrong!")
  658. return
  659. }
  660. switch ragEtaReportOpOp.Source {
  661. case `publish`:
  662. ReportAddOrModifyKnowledgeByReportId(ragEtaReportOpOp.ReportId)
  663. case `un_publish`:
  664. ReportUnPublishedKnowledgeByReportId(ragEtaReportOpOp.ReportId)
  665. }
  666. })
  667. }
  668. }
  669. // HandleEtaReportKnowledgeLLmOp
  670. // @Description: 处理微信文章加入知识库
  671. func HandleEtaReportKnowledgeLLmOp() {
  672. defer func() {
  673. if err := recover(); err != nil {
  674. fmt.Println("[HandleEtaReportKnowledgeLLmOp]", err)
  675. }
  676. }()
  677. for {
  678. utils.Rc.Brpop(utils.CACHE_ETA_REPORT_KNOWLEDGE_LLM, handleEtaReportKnowledgeLLmOp)
  679. }
  680. }
  681. // handleEtaReportKnowledgeLLmOp
  682. // @Description: 处理微信文章加入知识库操作
  683. // @author: Roc
  684. // @datetime 2025-04-24 14:04:10
  685. // @param b []byte
  686. func handleEtaReportKnowledgeLLmOp(b []byte) {
  687. var err error
  688. defer func() {
  689. if err != nil {
  690. utils.FileLog.Error("[handleEtaReportKnowledgeLLmOp] params:%s;err:%s", string(b), err.Error())
  691. }
  692. }()
  693. obj := rag.RagEtaReport{}
  694. wechatArticleOp := new(cache.RagEtaReportLlmOp)
  695. if err = json.Unmarshal(b, &wechatArticleOp); err != nil {
  696. fmt.Println("json unmarshal wrong!")
  697. return
  698. }
  699. item, err := obj.GetById(wechatArticleOp.RagEtaReportId)
  700. if err != nil {
  701. // 找不到就处理失败
  702. return
  703. }
  704. // 已经删除的就不做操作了
  705. if item.IsDeleted == 1 {
  706. return
  707. }
  708. // 未发布的就不操作了
  709. if item.IsPublished != 1 {
  710. return
  711. }
  712. // 文章加入到知识库
  713. //ArticleToKnowledge(item)
  714. // 生成摘要
  715. if wechatArticleOp.QuestionId <= 0 {
  716. // 全部提示词摘要生成
  717. GenerateRagEtaReportAbstract(item, wechatArticleOp.ForceGenerate)
  718. } else {
  719. questionObj := rag.Question{}
  720. questionInfo, tmpErr := questionObj.GetByID(wechatArticleOp.QuestionId)
  721. if tmpErr != nil {
  722. err = tmpErr
  723. return
  724. }
  725. // 全部提示词摘要生成
  726. err = GenerateRagEtaReportAbstractByQuestion(item, questionInfo, wechatArticleOp.ForceGenerate)
  727. }
  728. }
  729. // HandleAiArticleAbstractLlmOp
  730. // @Description: 处理AI库的报告摘要生成(批量任务)
  731. // @author: Roc
  732. // @datetime 2025-04-24 10:25:51
  733. func HandleAiArticleAbstractLlmOp() {
  734. defer func() {
  735. if err := recover(); err != nil {
  736. fmt.Println("[HandleAiArticleAbstractLlmOp]", err)
  737. }
  738. }()
  739. for {
  740. utils.Rc.Brpop(utils.CACHE_AI_ARTICLE_ABSTRACT_LLM_TASK, handleAiArticleAbstractLlmOp)
  741. }
  742. }
  743. var aiTaskHandleIdMap = map[int]bool{}
  744. // todo 任务开始时间
  745. // handleAiArticleAbstractLlmOp
  746. // @Description: 处理AI库的报告摘要生成(批量任务)
  747. // @author: Roc
  748. // @datetime 2025-04-24 11:26:05
  749. // @param b []byte
  750. func handleAiArticleAbstractLlmOp(b []byte) {
  751. var err error
  752. defer func() {
  753. if err != nil {
  754. utils.FileLog.Error("[handleAiArticleAbstractLlmOp] params:%s;err:%s", string(b), err.Error())
  755. }
  756. }()
  757. obj := rag.AiTaskRecord{}
  758. aiTaskRecordOp := new(cache.AiTaskRecordOp)
  759. if err = json.Unmarshal(b, &aiTaskRecordOp); err != nil {
  760. fmt.Println("json unmarshal wrong!")
  761. return
  762. }
  763. item, err := obj.GetByID(aiTaskRecordOp.AiTaskRecordId)
  764. if err != nil {
  765. err = fmt.Errorf("查找任务记录状态失败, err: %s", err.Error())
  766. return
  767. }
  768. // 如果没有处理过该任务,那么就标记该任务开始
  769. if _, ok := aiTaskHandleIdMap[item.AiTaskID]; !ok {
  770. aiTaskObj := rag.AiTask{}
  771. aiTaskInfo, tmpErr := aiTaskObj.GetByID(item.AiTaskID)
  772. if tmpErr != nil {
  773. err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
  774. return
  775. }
  776. // 如果任务是初始化,那么就标记开始
  777. if aiTaskInfo.Status == `init` {
  778. aiTaskInfo.StartTime = time.Now()
  779. aiTaskInfo.Status = `processing`
  780. aiTaskInfo.UpdateTime = time.Now()
  781. tmpErr = aiTaskInfo.Update([]string{`start_time`, "status", "update_time"})
  782. if tmpErr != nil {
  783. utils.FileLog.Error("标记任务开始状态失败, err: %s", tmpErr.Error())
  784. }
  785. }
  786. }
  787. // 处理完成后标记任务状态
  788. defer func() {
  789. // 修改任务状态
  790. todoCount, tmpErr := obj.GetCountByCondition(fmt.Sprintf(` AND %s = ? AND %s = ? `, rag.AiTaskColumns.AiTaskID, rag.AiTaskColumns.Status), []interface{}{item.AiTaskID, `待处理`})
  791. if tmpErr != nil {
  792. err = fmt.Errorf("查找剩余任务数量失败, err: %s", tmpErr.Error())
  793. return
  794. }
  795. if todoCount <= 0 {
  796. aiTaskObj := rag.AiTask{}
  797. aiTaskInfo, tmpErr := aiTaskObj.GetByID(item.AiTaskID)
  798. if tmpErr != nil {
  799. err = fmt.Errorf("查找任务失败, err: %s", tmpErr.Error())
  800. return
  801. }
  802. aiTaskInfo.EndTime = time.Now()
  803. aiTaskInfo.Status = `done`
  804. aiTaskInfo.UpdateTime = time.Now()
  805. tmpErr = aiTaskInfo.Update([]string{`end_time`, "status", "update_time"})
  806. if tmpErr != nil {
  807. utils.FileLog.Error("标记任务状态失败, err: %s", tmpErr.Error())
  808. }
  809. }
  810. return
  811. }()
  812. // 不是待处理就不处理
  813. if item.Status != `待处理` {
  814. return
  815. }
  816. // 处理完成后标记记录状态
  817. defer func() {
  818. status := `处理成功`
  819. remark := ``
  820. if err != nil {
  821. status = `处理失败`
  822. remark = err.Error()
  823. }
  824. item.Status = status
  825. item.Remark = remark
  826. item.ModifyTime = time.Now()
  827. tmpErr := item.Update([]string{"status", "remark", "modify_time"})
  828. if tmpErr != nil {
  829. utils.FileLog.Error("标记任务记录状态失败, err: %s", tmpErr.Error())
  830. }
  831. }()
  832. var params rag.QuestionGenerateAbstractParam
  833. if err = json.Unmarshal([]byte(item.Parameters), &params); err != nil {
  834. fmt.Println("json unmarshal wrong!")
  835. return
  836. }
  837. // 查找提示词
  838. questionObj := rag.Question{}
  839. questionInfo, tmpErr := questionObj.GetByID(params.QuestionId)
  840. if tmpErr != nil {
  841. // 找不到就处理失败
  842. err = fmt.Errorf("查找提示词失败, err: %s", err.Error())
  843. return
  844. }
  845. switch params.ArticleType {
  846. case `wechat_article`:
  847. articleObj := rag.WechatArticle{}
  848. articleInfo, tmpErr := articleObj.GetById(params.ArticleId)
  849. if tmpErr != nil {
  850. err = tmpErr
  851. // 找不到就处理失败
  852. return
  853. }
  854. // 生成摘要
  855. err = GenerateWechatArticleAbstractByQuestion(articleInfo, questionInfo, true)
  856. case `rag_eta_report`:
  857. articleObj := rag.RagEtaReport{}
  858. articleInfo, tmpErr := articleObj.GetById(params.ArticleId)
  859. if tmpErr != nil {
  860. err = tmpErr
  861. // 找不到就处理失败
  862. return
  863. }
  864. // 已经删除的就不做操作了
  865. if articleInfo.IsDeleted == 1 {
  866. return
  867. }
  868. // 未发布的就不操作了
  869. if articleInfo.IsPublished != 1 {
  870. return
  871. }
  872. // 生成摘要
  873. err = GenerateRagEtaReportAbstractByQuestion(articleInfo, questionInfo, true)
  874. }
  875. }