task.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/cache"
  5. "eta/eta_api/models"
  6. "eta/eta_api/models/rag"
  7. "eta/eta_api/services/binlog"
  8. "eta/eta_api/services/data"
  9. edbmonitor "eta/eta_api/services/edb_monitor"
  10. "eta/eta_api/services/llm"
  11. "eta/eta_api/utils"
  12. "fmt"
  13. "strings"
  14. "time"
  15. )
  16. func Task() {
  17. fmt.Println("task start")
  18. {
  19. // 修复客户试用数据
  20. //FixCompanyUpdateData()
  21. //FixCompanyTryDay()
  22. //FixCompanyTryDay()
  23. //StaticCompanyTryDay()
  24. //return
  25. }
  26. //FixPermissionStatus()
  27. //GetCompanyInfo()
  28. //ReportCount()
  29. //windSourceUrl:=`http://47.100.166.55:7002/edbInfo/wind/?EdbCode=M0001427&StartDate=2020-11-01&EndDate=2021-03-01`
  30. //data.AddAllArticle()
  31. //data.GetSmmIndex()
  32. //data.GetSmmIndexData()
  33. go AutoInsertLogToDB()
  34. //手工数据表格导入后的指标库刷新
  35. go ImportManualDataRefresh()
  36. // 加入上海钢联指标数据之后的刷新
  37. go MysteelChemicalDataAdd()
  38. //修复用户关注标识
  39. //GetWxUsersSubscribe()
  40. go AutoInsertAdminOperateRecordToDB()
  41. // 指标刷新
  42. go data.HandleEdbRefreshQueue()
  43. // 进行指标替换操作
  44. go DealReplaceEdbCache()
  45. // TODO:监听这里需要找下达梦的方案,主流程先跑起来再处理
  46. // 监听binlog
  47. if utils.MYSQL_DATA_BINLOG_URL != "" && utils.DbDriverName == `mysql` {
  48. go binlog.ListenMysql()
  49. go edbmonitor.HandleEdbMonitorEdbInfo()
  50. // 监听数据源binlog写入es
  51. go binlog.HandleDataSourceChange2Es()
  52. }
  53. go StartSessionManager()
  54. go llm.SaveAllChatRecordsToDB()
  55. // 定时任务进行微信文章操作
  56. // 定时任务进行微信公众号添加、文章刷新操作(暂不处理了,改成其他服务处理)
  57. //go HandleWechatArticleOp()
  58. // 定时任务进行微信文章LLM操作
  59. go HandleWechatArticleLLmOp()
  60. // 队列任务将eta报告同步到知识库操作
  61. go HandleEtaReportKnowledgeLLmOp()
  62. // 权益报告监听入库
  63. go AutoInsertRaiReport()
  64. // TODO:数据修复
  65. //FixNewEs()
  66. fmt.Println("task end")
  67. }
  68. //// 每日发布晨报
  69. //func AutoPublishDayReport() {
  70. // defer func() {
  71. // if err := recover(); err != nil {
  72. // fmt.Println("[AutoPublishDayReport]", err)
  73. // }
  74. // }()
  75. //
  76. // // 每日8:42发布晨报
  77. // ticker := time.Tick(50 * time.Second)
  78. // for range ticker {
  79. // nowTime := time.Now()
  80. // clock := nowTime.Format("1504")
  81. // if clock == "0842" {
  82. // if err := PublishTodayDayReport(); err != nil {
  83. // go alarm_msg.SendAlarmMsg(fmt.Sprint("每日晨报自动发送 AutoPublishDayReport ERR:", err), 3)
  84. // //utils.SendEmail(utils.APPNAME+" "+utils.RunMode+" 失败提醒", fmt.Sprint("AutoPublishDayReport ERR:", err), utils.EmailSendToUsers)
  85. // }
  86. // }
  87. // }
  88. //}
  89. // ImportManualDataRefresh 导入手工数据后的刷新
  90. func ImportManualDataRefresh() {
  91. defer func() {
  92. if err := recover(); err != nil {
  93. fmt.Println("[ImportManualDataRefresh]", err)
  94. }
  95. }()
  96. for {
  97. utils.Rc.Brpop(utils.CACHE_IMPORT_MANUAL_DATA, func(b []byte) {
  98. edbCode := string(b)
  99. edbCode = strings.TrimPrefix(edbCode, `"`)
  100. edbCode = strings.TrimSuffix(edbCode, `"`)
  101. data.RefreshManualData(edbCode)
  102. })
  103. }
  104. }
  105. // MysteelChemicalDataAdd 加入上海钢联指标数据之后的刷新
  106. func MysteelChemicalDataAdd() {
  107. defer func() {
  108. if err := recover(); err != nil {
  109. fmt.Println("[ImportManualDataRefresh]", err)
  110. }
  111. }()
  112. for {
  113. utils.Rc.Brpop(utils.CACHE_MYSTEEL_CHEMICAL_ADD_DATA, func(b []byte) {
  114. edbCode := string(b)
  115. edbCode = strings.TrimPrefix(edbCode, `"`)
  116. edbCode = strings.TrimSuffix(edbCode, `"`)
  117. data.RefreshMysteelChemicalData(edbCode)
  118. })
  119. }
  120. }
  121. //func init() {
  122. // fmt.Println("start task init")
  123. // UpdateEnglishEmailLogErrMsg()
  124. // fmt.Println("end task init")
  125. //}
  126. //
  127. //// UpdateEnglishEmailLogErrMsg 更新英文邮件日志的ErrMsg(研报后台4.2上线后执行, 仅一次)
  128. //func UpdateEnglishEmailLogErrMsg() {
  129. // var cond string
  130. // var pars []interface{}
  131. // list, e := models.GetEnglishReportEmailLogList(cond, pars)
  132. // if e != nil {
  133. // fmt.Println("获取日志列表失败")
  134. // return
  135. // }
  136. // for _, v := range list {
  137. // if v.SendStatus != 0 || v.Source != 1 || v.Result == "" || v.ErrMsg != "" {
  138. // continue
  139. // }
  140. // // 取出错误信息
  141. // fmt.Printf("正在更新%d\n", v.Id)
  142. // r := new(AliyunEmailResult)
  143. // if e = json.Unmarshal([]byte(v.Result), &r); e != nil {
  144. // fmt.Println("JSON解析报错了1" + e.Error())
  145. // continue
  146. // }
  147. // rd := new(AliyunEmailResultData)
  148. // res := strings.Replace(r.Data, `\`, ``, -1)
  149. // if e = json.Unmarshal([]byte(res), &rd); e != nil {
  150. // fmt.Println("JSON解析报错了2" + e.Error())
  151. // continue
  152. // }
  153. // v.ErrMsg = rd.Message
  154. // if e = v.Update([]string{"ErrMsg"}); e != nil {
  155. // fmt.Println("更新失败了" + e.Error())
  156. // continue
  157. // }
  158. // }
  159. // fmt.Println("更新成功")
  160. //}
  161. //func FixCompanyUpdateData1() {
  162. // list, err := company.GetCompanyProductUpdateLogList()
  163. // if err != nil {
  164. // fmt.Println("获取客户变更数据失败:", err)
  165. // return
  166. // }
  167. //
  168. // nowTime := time.Now()
  169. // for _, v := range list {
  170. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  171. // //if tmpErr != nil {
  172. // // fmt.Println(v.Id, "找数据,", tmpErr)
  173. // // continue
  174. // //}
  175. //
  176. // //permissionList := make([]*company.CompanyReportPermission, 0)
  177. // //switch v.Source {
  178. // //case "add", "receive", "thaw", "delay", "apply_receive":
  179. // //
  180. // //}
  181. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  182. // if err != nil {
  183. // fmt.Println("err:", err)
  184. // continue
  185. // }
  186. //
  187. // startDate := v.CreateTime
  188. // endDate := v.CreateTime.AddDate(0, 2, 0)
  189. // isStop := 1
  190. // realEndDate := endDate
  191. // if realEndDate.After(nowTime) {
  192. // realEndDate = nowTime
  193. // isStop = 0
  194. // }
  195. // for _, permission := range permissionList {
  196. // permission.StartDate = startDate.Format(utils.FormatDate)
  197. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  198. // }
  199. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  200. // Id: 0,
  201. // CompanyId: v.CompanyId,
  202. // ProductId: v.ProductId,
  203. // SellerId: v.SellerId,
  204. // SellerName: v.SellerName,
  205. // Source: v.Source,
  206. // StartDate: startDate,
  207. // EndDate: endDate,
  208. // RealEndDate: realEndDate,
  209. // IsStop: isStop,
  210. // CreateTime: v.CreateTime,
  211. // }
  212. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  213. // }
  214. // //fmt.Println("结束")
  215. //}
  216. //func FixCompanyUpdateData2() {
  217. // list, err := company.GetTryOutCompanyOperationRecordList()
  218. // if err != nil {
  219. // fmt.Println("获取客户变更数据失败:", err)
  220. // return
  221. // }
  222. //
  223. // nowTime := time.Now()
  224. // for _, v := range list {
  225. // //item, tmpErr := company.GetCompanyProductLogItem(v.CompanyId, v.ProductId, v.CreateTime)
  226. // //if tmpErr != nil {
  227. // // fmt.Println(v.Id, "找数据,", tmpErr)
  228. // // continue
  229. // //}
  230. //
  231. // //permissionList := make([]*company.CompanyReportPermission, 0)
  232. // //switch v.Source {
  233. // //case "add", "receive", "thaw", "delay", "apply_receive":
  234. // //
  235. // //}
  236. // permissionList, err := company.GetCompanyReportPermission(v.CompanyId, v.ProductId)
  237. // if err != nil {
  238. // fmt.Println("err:", err)
  239. // continue
  240. // }
  241. //
  242. // startDate := v.CreateTime
  243. // endDate := v.CreateTime.AddDate(0, 2, 0)
  244. // isStop := 1
  245. // realEndDate := endDate
  246. // if realEndDate.After(nowTime) {
  247. // realEndDate = nowTime
  248. // isStop = 0
  249. // }
  250. // for _, permission := range permissionList {
  251. // permission.StartDate = startDate.Format(utils.FormatDate)
  252. // permission.EndDate = v.CreateTime.Format(utils.FormatDate)
  253. // }
  254. // sellerName := ``
  255. // {
  256. // sysUser, _ := system.GetSysAdminById(v.SellerId)
  257. // if sysUser != nil {
  258. // sellerName = sysUser.RealName
  259. // }
  260. // }
  261. // companyProductTryOutUpdateLogItem := &company.CompanyProductTryOutUpdateLog{
  262. // Id: 0,
  263. // CompanyId: v.CompanyId,
  264. // ProductId: v.ProductId,
  265. // SellerId: v.SellerId,
  266. // SellerName: sellerName,
  267. // Source: "formal_to_try_out",
  268. // StartDate: startDate,
  269. // EndDate: endDate,
  270. // RealEndDate: realEndDate,
  271. // IsStop: isStop,
  272. // CreateTime: v.CreateTime,
  273. // }
  274. // err = company.AddCompanyProductTryOutUpdateLog(companyProductTryOutUpdateLogItem, permissionList)
  275. // }
  276. // fmt.Println("结束")
  277. //}
  278. // FixCompanyTryDay 修复试用天数
  279. //func FixCompanyTryDay() {
  280. // list, err := company.GetCompanyProductTryOutUpdateGroup()
  281. // if err != nil {
  282. // fmt.Println("获取客户变更数据失败:", err)
  283. // return
  284. // }
  285. //
  286. // lenList := len(list)
  287. // for k, v := range list {
  288. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  289. // companyProduct, tmpErr := company.GetCompanyProductByCompanyIdAndProductId(v.CompanyId, v.ProductId)
  290. // if tmpErr != nil {
  291. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";找不到对应的客户,Err:", tmpErr)
  292. // continue
  293. // }
  294. //
  295. // logList, err := company.GetCompanyProductTryOutUpdateList(v.CompanyId, v.ProductId)
  296. // if err != nil {
  297. // fmt.Println("查找客户日志失败,err:", err)
  298. // continue
  299. // }
  300. //
  301. // //lenLog := len(logList)
  302. // var day int //实际试用天数
  303. // var endDate time.Time
  304. // for _, log := range logList {
  305. // startDate := log.StartDate
  306. // if endDate.IsZero() {
  307. // endDate = log.RealEndDate
  308. // day = utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  309. // } else {
  310. // if log.RealEndDate.After(endDate) {
  311. // if endDate.After(startDate) {
  312. // startDate = endDate
  313. // }
  314. // if startDate.Equal(log.EndDate) {
  315. // day += utils.GetTimeSubDay(startDate, log.RealEndDate)
  316. // } else {
  317. // day += utils.GetTimeSubDay(startDate, log.RealEndDate) + 1
  318. // }
  319. // endDate = log.RealEndDate
  320. // }
  321. // }
  322. // }
  323. // companyProduct.TryOutDayTotal = day
  324. // companyProduct.Update([]string{"TryOutDayTotal"})
  325. // }
  326. // fmt.Println("结束")
  327. //}
  328. // StaticCompanyTryDay 定时任务每天更新试用天数
  329. //func StaticCompanyTryDay() {
  330. // list, err := company.GetCompanyProductTryOutUpdateNoStopGroup()
  331. // if err != nil {
  332. // fmt.Println("获取客户变更数据失败:", err)
  333. // return
  334. // }
  335. //
  336. // lenList := len(list)
  337. // for k, v := range list {
  338. // isAdd := false //是否要增加一天,默认不加
  339. // fmt.Println("剩余", lenList-k-1, "条数据修复")
  340. // permissionList, tmpErr := company.GetCompanyReportPermissionByStatus(v.CompanyId, v.ProductId, "试用")
  341. // if tmpErr == nil {
  342. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";寻找对应的试用品种失败,Err:", tmpErr)
  343. // continue
  344. // }
  345. // currPermissionIdList := make([]int, 0) //当前试用的品种
  346. // for _, permission := range permissionList {
  347. // currPermissionIdList = append(currPermissionIdList, permission.ChartPermissionId)
  348. // }
  349. //
  350. // //获取所有未停止的记录列表
  351. // logList, err := company.GetCompanyProductTryOutUpdateNoStopListByEndDate(v.CompanyId, v.ProductId)
  352. // if err != nil {
  353. // fmt.Println("查找客户日志失败,err:", err)
  354. // continue
  355. // }
  356. //
  357. // logPermissionIdList := make([]int, 0) //当前日志中的试用的品种
  358. // for _, log := range logList {
  359. // //获取所有未停止的品种试用记录列表
  360. // logPermissionList, tmpErr := company.GetCompanyProductTryOutPermissionUpdateNoStopListByEndDate(log.Id)
  361. // if tmpErr != nil {
  362. // fmt.Println("查找客户品种变更日志失败,err:", tmpErr)
  363. // continue
  364. // }
  365. // lenLogPermissionList := len(logPermissionList) //当前日志存在试用的品种数量
  366. // stopPermission := 0 //当前日志需要停止的品种数量
  367. //
  368. // currTime := time.Now() //当前时间
  369. // for _, logPermission := range logPermissionList {
  370. // if utils.InArrayByInt(logPermissionIdList, logPermission.ChartPermissionId) {
  371. // // 如果已经被其他记录使用了,那么就将当前记录给标记停止
  372. // logPermission.IsStop = 1
  373. //
  374. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  375. // if currTime.After(logPermission.EndDate) {
  376. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  377. // } else {
  378. // logPermission.RealEndDate = time.Now()
  379. // }
  380. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  381. // stopPermission++
  382. // continue
  383. // } else if !utils.InArrayByInt(currPermissionIdList, logPermission.ChartPermissionId) {
  384. // // 如果该品种不在当前客户的品种里面,那么也要将当前记录给标记停止
  385. // logPermission.IsStop = 1
  386. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  387. // if currTime.After(logPermission.EndDate) {
  388. // logPermission.RealEndDate = currTime.AddDate(0, 0, -1)
  389. // } else {
  390. // logPermission.RealEndDate = time.Now()
  391. // }
  392. // logPermission.Update([]string{"IsStop", "RealEndDate"})
  393. // stopPermission++
  394. // continue
  395. // }
  396. //
  397. // // 剩下的说明还处于试用状态,需要添加1天试用期,且需要把该品种加入到当前日志中的试用的品种列表
  398. // isAdd = true
  399. // logPermissionIdList = append(logPermissionIdList, logPermission.ChartPermissionId)
  400. // }
  401. //
  402. // //如果当前日志存在试用的品种数量 == 当前日志需要停止的品种数量
  403. // // 那么当前日志也是处于停用状态
  404. // if lenLogPermissionList == stopPermission {
  405. // log.IsStop = 1
  406. // // 如果当前时间晚于结束日期,那么实际结束日期就是今天之前一天,否则就是当天
  407. // if currTime.After(log.EndDate) {
  408. // log.RealEndDate = currTime.AddDate(0, 0, -1)
  409. // } else {
  410. // log.RealEndDate = time.Now()
  411. // }
  412. // log.Update([]string{"IsStop", "RealEndDate"})
  413. // }
  414. //
  415. // }
  416. //
  417. // // 如果需要添加,那么将该客户品种添加1天
  418. // if isAdd {
  419. //
  420. // }
  421. // // 更新客户产品的试用天数
  422. // tmpErr = company.AddCompanyProductTryOutDayTotal(v.CompanyId, v.ProductId)
  423. // if tmpErr != nil {
  424. // fmt.Println("company_id:", v.CompanyId, ";product_id:", v.ProductId, ";更新客户产品的试用天数,Err:", tmpErr)
  425. // continue
  426. // }
  427. // }
  428. // fmt.Println("结束")
  429. //}
  430. //func Task2() {
  431. // fmt.Println("task start")
  432. //
  433. // //_, _ = maycur.GetPublicOfferingSaleLeader()
  434. // //_ = maycur.TestSyncCompanyProfile()
  435. // //_ = maycur.SyncCompanyProfile()
  436. // //_ = maycur.ImportExcelEmployeeId()
  437. // //_ = maycur.ApiTest()
  438. // //_ = maycur.ApiTest2()
  439. // fmt.Println("task end")
  440. //}
  441. // FixEnCompanyPermission 英文权限上线时修复英文客户拥有所有权限(一次性)
  442. func FixEnCompanyPermission() {
  443. var err error
  444. defer func() {
  445. if err != nil {
  446. fmt.Println("FixEnCompanyPermission Err: ", err.Error())
  447. }
  448. }()
  449. // 获取正式客户
  450. companies := make([]*models.EnglishCompany, 0)
  451. {
  452. cond := ` AND status = ?`
  453. pars := make([]interface{}, 0)
  454. pars = append(pars, 1)
  455. list, e := models.GetEnglishCompanyList(cond, pars, "")
  456. if e != nil {
  457. err = fmt.Errorf("GetEnglishCompanyList err: %s", e.Error())
  458. return
  459. }
  460. companies = list
  461. }
  462. // 获取所有权限
  463. permissions := make([]*models.EnPermission, 0)
  464. {
  465. cond := ` AND parent_id > ?`
  466. pars := make([]interface{}, 0)
  467. pars = append(pars, 0)
  468. ob := new(models.EnPermission)
  469. list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
  470. if e != nil {
  471. err = fmt.Errorf("GetPermissionItemsByCondition err: %s", e.Error())
  472. return
  473. }
  474. permissions = list
  475. }
  476. for _, c := range companies {
  477. ps := make([]*models.EnCompanyPermission, 0)
  478. for _, p := range permissions {
  479. ps = append(ps, &models.EnCompanyPermission{
  480. EnCompanyId: c.CompanyId,
  481. EnPermissionId: p.EnPermissionId,
  482. CreateTime: time.Now().Local(),
  483. })
  484. }
  485. if e := models.ClearAndCreateEnCompanyPermissions(c.CompanyId, ps); e != nil {
  486. err = fmt.Errorf("ClearAndCreateEnCompanyPermissions err: %s", e.Error())
  487. return
  488. }
  489. }
  490. fmt.Println("修复完成")
  491. }
  492. // ModifyEsEnglishReport 批量修改es里的英文研报信息和线上路演信息
  493. func ModifyEsEnglishReport() {
  494. fmt.Println("开始")
  495. err := ModifyAllEsEnglishReportVideo()
  496. if err != nil {
  497. err = fmt.Errorf("重置es中的英文研报信息失败:ModifyAllEnglishReportInEs err: %s", err.Error())
  498. return
  499. }
  500. fmt.Println("结束")
  501. }
  502. //func InsertBloombergIndex() {
  503. // fmt.Println("开始写入")
  504. //
  505. // start := 100000
  506. // now := time.Now()
  507. // for i := 1; i <= 100; i++ {
  508. // fmt.Printf("写入第%d个\n", i)
  509. //
  510. // start += 1
  511. // index := new(data_manage.BaseFromBloombergIndex)
  512. // index.IndexCode = fmt.Sprintf("BLID%d", start)
  513. // index.IndexName = fmt.Sprintf("模拟Bloomberg-%s", index.IndexCode)
  514. // index.Unit = "无"
  515. // index.Source = utils.DATA_SOURCE_BLOOMBERG
  516. // index.Frequency = "日度"
  517. // index.StartDate = now.AddDate(0, 0, -i)
  518. // index.EndDate = now
  519. // index.CreateTime = time.Now().Local()
  520. // index.ModifyTime = time.Now().Local()
  521. // if e := index.Create(); e != nil {
  522. // fmt.Printf("新增指标失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  523. // return
  524. // }
  525. //
  526. // insertData := make([]*data_manage.BaseFromBloombergData, 0)
  527. // for ii := 0; ii <= 50; ii++ {
  528. // indexData := new(data_manage.BaseFromBloombergData)
  529. // indexData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
  530. // indexData.IndexCode = index.IndexCode
  531. // indexData.DataTime = now.AddDate(0, 0, -ii)
  532. // va := GenerateRandomFloat64InRange()
  533. // va += float64(ii)
  534. // indexData.Value = va
  535. // indexData.CreateTime = time.Now().Local()
  536. // indexData.ModifyTime = time.Now().Local()
  537. // indexData.DataTimestamp = int(indexData.DataTime.UnixNano() / 1e6)
  538. // insertData = append(insertData, indexData)
  539. // }
  540. // ob := new(data_manage.BaseFromBloombergData)
  541. // if e := ob.CreateMulti(insertData); e != nil {
  542. // fmt.Printf("新增指标数据失败, IndexCode: %s, err: %s", index.IndexCode, e.Error())
  543. // return
  544. // }
  545. // }
  546. //
  547. // fmt.Println("结束写入")
  548. //}
  549. //
  550. //func GenerateRandomFloat64InRange() float64 {
  551. // var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) // 设置随机数种子
  552. //
  553. // return rnd.Float64()*11000 - 1000
  554. //}
  555. // HandleSearchByWechatOp
  556. // @Description: 处理微信爬虫
  557. func HandleWechatArticleOp() {
  558. defer func() {
  559. if err := recover(); err != nil {
  560. fmt.Println("[HandleWechatArticleOp]", err)
  561. }
  562. }()
  563. obj := rag.WechatPlatform{}
  564. for {
  565. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE, func(b []byte) {
  566. wechatArticleOp := new(cache.WechatArticleOp)
  567. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  568. fmt.Println("json unmarshal wrong!")
  569. return
  570. }
  571. item, tmpErr := obj.GetById(wechatArticleOp.WechatPlatformId)
  572. if tmpErr != nil {
  573. // 找不到就处理失败
  574. return
  575. }
  576. switch wechatArticleOp.Source {
  577. case `add`:
  578. AddWechatPlatform(item)
  579. case `refresh`:
  580. BeachAddWechatArticle(item, 2)
  581. }
  582. })
  583. }
  584. }
  585. // HandleWechatArticleLLmOp
  586. // @Description: 处理微信文章加入知识库
  587. func HandleWechatArticleLLmOp() {
  588. defer func() {
  589. if err := recover(); err != nil {
  590. fmt.Println("[HandleWechatArticleLLmOp]", err)
  591. }
  592. }()
  593. obj := rag.WechatArticle{}
  594. for {
  595. utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE_KNOWLEDGE, func(b []byte) {
  596. wechatArticleOp := new(cache.WechatArticleOp)
  597. if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
  598. fmt.Println("json unmarshal wrong!")
  599. return
  600. }
  601. item, tmpErr := obj.GetById(wechatArticleOp.WechatPlatformId)
  602. if tmpErr != nil {
  603. // 找不到就处理失败
  604. return
  605. }
  606. // 文章加入到知识库
  607. ArticleToKnowledge(item)
  608. // 生成摘要
  609. //GenerateArticleAbstract(item)
  610. })
  611. }
  612. }
  613. // HandleEtaReportKnowledgeLLmOp
  614. // @Description: 处理eta报告加入知识库操作
  615. func HandleEtaReportKnowledgeLLmOp() {
  616. defer func() {
  617. if err := recover(); err != nil {
  618. fmt.Println("[HandleEtaReportKnowledgeLLmOp]", err)
  619. }
  620. }()
  621. for {
  622. utils.Rc.Brpop(utils.CACHE_ETA_REPORT_KNOWLEDGE, func(b []byte) {
  623. ragEtaReportOpOp := new(cache.RagEtaReportOpOp)
  624. if err := json.Unmarshal(b, &ragEtaReportOpOp); err != nil {
  625. fmt.Println("json unmarshal wrong!")
  626. return
  627. }
  628. switch ragEtaReportOpOp.Source {
  629. case `publish`:
  630. ReportAddOrModifyKnowledgeByReportId(ragEtaReportOpOp.ReportId)
  631. case `un_publish`:
  632. ReportUnPublishedKnowledgeByReportId(ragEtaReportOpOp.ReportId)
  633. }
  634. })
  635. }
  636. }