commodity_liangyou.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. package liangyou
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta/eta_data_analysis/models"
  6. "eta/eta_data_analysis/services/alarm_msg"
  7. "eta/eta_data_analysis/utils"
  8. "fmt"
  9. "github.com/beego/beego/v2/core/logs"
  10. "github.com/chromedp/cdproto/cdp"
  11. "log"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "github.com/chromedp/chromedp"
  18. )
  // lyLoginPath is the fao.com.cn home page: the crawler logs in from it,
  // returns to it before each report type, and prefixes it to relative report
  // paths when opening a report.
  var lyLoginPath = "https://www.fao.com.cn/"
  // LyDataDeal crawls the fao.com.cn reports described by the JSON config at
  // utils.LY_JSON_PATH and ingests their index data.
  //
  // The config is decoded as product -> category -> report -> keywords and every
  // (product, category, report) triple is handed to fetchReportData after a
  // single login. The browser allocator is derived from cont (context.Background()
  // when cont is nil), so cancelling cont stops the crawl.
  //
  // Returns the error from reading/decoding the config or from login. Per-report
  // errors are printed and skipped, except an error whose text contains the
  // site's rate-limit message, which aborts the crawl and is returned.
  func LyDataDeal(cont context.Context) (err error) {
  	// Never hand a nil context to chromedp.
  	if cont == nil {
  		cont = context.Background()
  	}

  	// Read and decode the crawl configuration.
  	configFile, err := os.ReadFile(utils.LY_JSON_PATH)
  	if err != nil {
  		fmt.Printf("读取配置文件错误: %v\n", err)
  		return
  	}
  	var data map[string]map[string]map[string][]string
  	if err = json.Unmarshal(configFile, &data); err != nil {
  		fmt.Printf("解析配置文件错误: %v\n", err)
  		return
  	}
  	// Print the decoded config for verification.
  	fmt.Printf("%+v\n", data)

  	// Browser setup: visible (non-headless) Chrome with the automation flag hidden.
  	options := []chromedp.ExecAllocatorOption{
  		chromedp.Flag("headless", false),
  		chromedp.Flag("disable-blink-features", "AutomationControlled"),
  		chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
  	}
  	allocCtx, cancelAlloc := chromedp.NewExecAllocator(cont, options...)
  	defer cancelAlloc()
  	ctx, cancelBrowser := chromedp.NewContext(allocCtx)
  	defer cancelBrowser()

  	if err = login(ctx); err != nil {
  		fmt.Printf("登录错误: %v\n", err)
  		return
  	}

  	// Walk the configuration and crawl every report type.
  	for product, productData := range data {
  		for category, categoryData := range productData {
  			for report, keywords := range categoryData {
  				// Once cancelled, every further browser action would fail; stop now.
  				if ctxErr := ctx.Err(); ctxErr != nil {
  					return ctxErr
  				}
  				fmt.Printf("正在获取数据: %s -> %s -> %s\n", product, category, report)
  				err = fetchReportData(ctx, product, category, report, keywords)
  				if err != nil {
  					fmt.Printf("获取数据错误: %s -> %s -> %s: %v\n", product, category, report, err)
  					// The site is throttling us: abort the whole crawl.
  					if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") {
  						return
  					}
  				}
  			}
  		}
  	}
  	return nil
  }
  73. func login(ctx context.Context) error {
  74. return chromedp.Run(ctx,
  75. chromedp.Navigate(lyLoginPath),
  76. chromedp.Sleep(5*time.Second),
  77. chromedp.Click(`a[id="btnLogin"]`, chromedp.ByQuery),
  78. chromedp.Sleep(2*time.Second),
  79. chromedp.SetValue(`input[id="userName"]`, utils.LY_USERNAME, chromedp.ByQuery),
  80. chromedp.SetValue(`input[id="pwd"]`, utils.LY_PASSWORD, chromedp.ByQuery),
  81. chromedp.Sleep(2*time.Second),
  82. chromedp.Click(`input[id="btn_Login"]`, chromedp.ByQuery),
  83. chromedp.Sleep(5*time.Second),
  84. )
  85. }
  // fetchReportData crawls one report type of a product/category and ingests
  // every matching report that has not been handled recently.
  //
  // Steps:
  //  1. Reload the home page and open the product page.
  //  2. Open the category listing.
  //  3. Collect, across all listing pages, the URLs of entries that contain the
  //     text report.
  //  4. For each report that should not be skipped (see lyReportShouldSkip),
  //     sleep a random 20-100s, run processReport, and store a progress record.
  //
  // Returns an error when navigation fails or no report URL is found. When
  // processReport hits the site's rate-limit page, an alarm is sent and that
  // error is returned so the caller can stop crawling. Other per-report
  // failures are logged and skipped.
  func fetchReportData(ctx context.Context, product, category, report string, keywords []string) error {
  	// Start from the home page, where the product links live.
  	err := chromedp.Run(ctx,
  		chromedp.Navigate(lyLoginPath),
  		chromedp.Sleep(5*time.Second),
  	)
  	if err != nil {
  		return err
  	}
  	productPageURL, err := fillProductPageURL(ctx, product, category)
  	if err != nil {
  		return err
  	}

  	// Open the category listing from the product page.
  	var categoryPageURL string
  	err = chromedp.Run(ctx,
  		chromedp.Navigate(productPageURL),
  		chromedp.Sleep(5*time.Second),
  		chromedp.Click(fmt.Sprintf(`//div[contains(@class, "newBox")]//a[contains(text(), '%s')]`, category), chromedp.BySearch),
  		chromedp.Sleep(5*time.Second),
  		chromedp.Location(&categoryPageURL),
  	)
  	if err != nil {
  		return err
  	}
  	logs.Info("categoryPageURL: %s: %s: %s", product, category, categoryPageURL)

  	allReportURLMap, err := collectLyReportURLs(ctx, categoryPageURL, report)
  	if err != nil {
  		return err
  	}
  	logs.Info("所有报告 URLs: %s: %s: %v", product, category, allReportURLMap)
  	if len(allReportURLMap) == 0 {
  		return fmt.Errorf("未找到报告 URL")
  	}

  	for reportURL, publishText := range allReportURLMap {
  		if lyReportShouldSkip(product, category, report, reportURL) {
  			continue
  		}
  		// Random pause between reports, presumably to stay under the site's rate limit.
  		delay := utils.RangeRand(20, 100)
  		fmt.Println(report+";sleep:", strconv.Itoa(int(delay)))
  		time.Sleep(time.Duration(delay) * time.Second)

  		if err := processReport(ctx, product, category, reportURL, keywords); err != nil {
  			logs.Error("处理报告错误: %s: %s: %s: %s: %v", product, category, report, reportURL, err)
  			if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") {
  				// Rate limited: alert, then propagate so the caller stops crawling
  				// instead of moving on to the next report.
  				alarm_msg.SendAlarmMsg(fmt.Sprintf("粮油商务网-爬取指标数据被限制,请稍后重试, ErrMsg: %s", err.Error()), 1)
  				return err
  			}
  			continue
  		}
  		saveLyReportRecord(product, category, reportURL, publishText)
  	}
  	return nil
  }

  // collectLyReportURLs walks the paginated listing starting at pageURL and merges
  // extractReportURLs(page, report) for every page (URL -> publish-date text).
  //
  // It stops when the next-page button (div.my-page-next) is missing or carries
  // my-page-forbid. It also stops when clicking that button leaves the URL
  // unchanged, since re-navigating to the same URL would loop forever.
  func collectLyReportURLs(ctx context.Context, pageURL, report string) (map[string]string, error) {
  	reportURLs := make(map[string]string)
  	for {
  		var htmlContent string
  		if err := chromedp.Run(ctx,
  			chromedp.Navigate(pageURL),
  			chromedp.Sleep(5*time.Second),
  			chromedp.OuterHTML("html", &htmlContent),
  		); err != nil {
  			return nil, err
  		}
  		fmt.Printf("页面内容: %s\n", htmlContent)
  		for reportURL, publishText := range extractReportURLs(htmlContent, report) {
  			reportURLs[reportURL] = publishText
  		}

  		// A listing without a next-page button has only one page.
  		var lastPage bool
  		if err := chromedp.Run(ctx,
  			chromedp.Evaluate(`(() => { const next = document.querySelector('div.my-page-next'); return !next || next.classList.contains('my-page-forbid'); })()`, &lastPage),
  		); err != nil {
  			return nil, err
  		}
  		if lastPage {
  			return reportURLs, nil
  		}

  		var nextPageURL string
  		if err := chromedp.Run(ctx,
  			chromedp.Click(`div.my-page-next`, chromedp.ByQuery),
  			chromedp.Sleep(5*time.Second),
  			chromedp.Location(&nextPageURL),
  		); err != nil {
  			return nil, err
  		}
  		if nextPageURL == pageURL {
  			logs.Warn("下一页 URL 未变化, 停止翻页: %s", pageURL)
  			return reportURLs, nil
  		}
  		pageURL = nextPageURL
  	}
  }

  // lyReportShouldSkip reports whether reportURL must not be processed now.
  //
  // It fetches the stored progress record (GET_LY_INDEX_RECORD_BY_URL). The
  // report is skipped when that request fails, when the stored DataTime cannot
  // be parsed, or when the stored DataTime is more than 7 days old. A report
  // with no stored DataTime, or a recent one, is processed again.
  func lyReportShouldSkip(product, category, report, reportURL string) bool {
  	params := map[string]interface{}{"Url": reportURL}
  	resp, err := utils.PostEdbLibRequest(params, utils.GET_LY_INDEX_RECORD_BY_URL)
  	if err != nil {
  		log.Printf("postEdbLib err: %v", err)
  		return true
  	}
  	var recordResp models.RequestResponse[models.BaseFromLyIndexRecord]
  	if err = json.Unmarshal(resp, &recordResp); err != nil {
  		// Continue with whatever was decoded (normally nothing, i.e. "no
  		// record"), but make the failure visible.
  		logs.Error("解析报告处理记录错误: %s: %s: %s: %s: %v", product, category, report, reportURL, err)
  	}
  	dataTime := recordResp.Data.DataTime
  	if dataTime == "" {
  		return false
  	}
  	recorded, err := utils.StringToTime(dataTime + " 00:00:00")
  	if err != nil {
  		logs.Error("时间格式转换错误: %s: %s: %s: %s: %v", product, category, report, reportURL, err)
  		return true
  	}
  	if time.Now().Sub(recorded) > 7*24*time.Hour {
  		logs.Info("报告已处理: %s: %s: %s: %s", product, category, report, reportURL)
  		return true
  	}
  	return false
  }

  // saveLyReportRecord stores the crawl progress for reportURL through
  // ADD_LY_INDEX_RECORD. publishText is the date text scraped from the listing
  // and is converted with utils.ConvertTimeFormat. Failures are logged only.
  func saveLyReportRecord(product, category, reportURL, publishText string) {
  	dataTime, err := utils.ConvertTimeFormat(publishText)
  	if err != nil {
  		logs.Error("时间格式转换错误: %s, %s, %v: %v", product, category, publishText, err)
  		return
  	}
  	record := &models.BaseFromLyIndexRecord{
  		CreateTime: utils.GetCurrentTime(),
  		ModifyTime: utils.GetCurrentTime(),
  		Product:    product,
  		Category:   category,
  		Url:        reportURL,
  		DataTime:   dataTime,
  	}
  	if _, err = httpRequestFill(record, utils.ADD_LY_INDEX_RECORD); err != nil {
  		logs.Error("维护指标数据读取进度错误: %s, %s, %v: %v", product, category, reportURL, err)
  		return
  	}
  	logs.Info("维护指标数据读取进度成功: %s, %s, %v", product, category, reportURL)
  }
  // fillProductPageURL finds the home-page link whose text equals product
  // (ignoring surrounding whitespace), clicks it and returns the resulting page
  // URL.
  //
  // Candidate links are the <a> elements under <dl> elements whose class
  // contains "dl_hot". Before clicking, the #moreSpeList element is forced to
  // display:block, presumably so the link is visible and clickable. category is
  // currently unused.
  //
  // Returns an error when no link matches or any browser action fails.
  func fillProductPageURL(ctx context.Context, product string, category string) (string, error) {
  	selector := `//dl[contains(@class, 'dl_hot')]//a`
  	logs.Info("选择器表达式: %s", selector)

  	var nodes []*cdp.Node
  	if err := chromedp.Run(ctx,
  		chromedp.WaitReady(selector, chromedp.BySearch),
  		chromedp.Nodes(selector, &nodes, chromedp.BySearch),
  	); err != nil {
  		return "", err
  	}

  	wanted := strings.TrimSpace(product)
  	var targetURL string
  	for _, node := range nodes {
  		var outerHTML string
  		if err := chromedp.Run(ctx,
  			chromedp.OuterHTML(node.FullXPath(), &outerHTML, chromedp.BySearch),
  		); err != nil {
  			return "", err
  		}
  		logs.Info("Link OuterHTML: %s", outerHTML)
  		href, linkText := extractHrefAndText(outerHTML)
  		logs.Info("Link Text: %s, Href: %s", linkText, href)
  		// Markup commonly pads link text with spaces/newlines; an exact
  		// comparison would then never match.
  		if strings.TrimSpace(linkText) == wanted {
  			targetURL = href
  			break
  		}
  	}
  	if targetURL == "" {
  		return "", fmt.Errorf("未找到匹配的产品链接")
  	}

  	// Reveal the "more" list.
  	if err := chromedp.Run(ctx,
  		chromedp.Evaluate(`document.getElementById("moreSpeList").style.display = "block";`, nil),
  	); err != nil {
  		return "", err
  	}

  	// Click the matched product link and report where it leads.
  	var productPageURL string
  	clickSelector := fmt.Sprintf(`//a[@href='%s']`, targetURL)
  	if err := chromedp.Run(ctx,
  		chromedp.WaitReady(clickSelector, chromedp.BySearch),
  		chromedp.Click(clickSelector, chromedp.BySearch),
  		chromedp.Sleep(5*time.Second),
  		chromedp.Location(&productPageURL),
  	); err != nil {
  		return "", err
  	}
  	logs.Info("productPageURL: %s", productPageURL)
  	return productPageURL, nil
  }
  // Patterns used by extractHrefAndText, compiled once at package load instead
  // of on every call.
  var (
  	lyHrefPattern     = regexp.MustCompile(`href="([^"]+)"`)
  	lyLinkTextPattern = regexp.MustCompile(`>([^<]+)<`)
  )

  // extractHrefAndText returns the first double-quoted href attribute value and
  // the first non-empty text run between two tags in outerHTML. Either result is
  // "" when not found. This is a lightweight regex scan, not an HTML parser, so
  // entities are returned undecoded and surrounding whitespace is kept.
  func extractHrefAndText(outerHTML string) (string, string) {
  	href, linkText := "", ""
  	if m := lyHrefPattern.FindStringSubmatch(outerHTML); len(m) > 1 {
  		href = m[1]
  	}
  	if m := lyLinkTextPattern.FindStringSubmatch(outerHTML); len(m) > 1 {
  		linkText = m[1]
  	}
  	return href, linkText
  }
  // extractReportURLs scans a listing page's HTML for every occurrence of keyword.
  // It returns a map from report URL to the text that follows the first <div>
  // inside the next <div class="short_right"> after the occurrence. The caller
  // uses that text as the report's date.
  //
  // For each occurrence, the URL is the nearest href="..." value preceding it.
  // The search covers only the part of the page after the previous occurrence.
  // An occurrence without such an href is skipped rather than paired with a
  // garbage or stale URL; a second match inside the same link is one example.
  // An empty keyword yields an empty map.
  func extractReportURLs(htmlContent, keyword string) map[string]string {
  	const (
  		hrefAttr      = `href="`
  		shortRightTag = `<div class="short_right">`
  		innerDivTag   = `<div>`
  	)
  	reportURLMap := make(map[string]string)
  	if keyword == "" {
  		// strings.Index would match at offset 0 forever.
  		return reportURLMap
  	}

  	content := htmlContent
  	for {
  		idx := strings.Index(content, keyword)
  		if idx == -1 {
  			break
  		}
  		end := idx + len(keyword)

  		// URL: value of the last href="..." before the end of this occurrence.
  		reportURL := ""
  		if hrefIdx := strings.LastIndex(content[:end], hrefAttr); hrefIdx != -1 {
  			urlStart := hrefIdx + len(hrefAttr)
  			if urlLen := strings.Index(content[urlStart:], `"`); urlLen > 0 {
  				reportURL = content[urlStart : urlStart+urlLen]
  			}
  		}
  		content = content[end:]
  		if reportURL == "" {
  			continue
  		}

  		// Date text: everything after <div> up to the first </div> inside the
  		// next short_right block.
  		divStart := strings.Index(content, shortRightTag)
  		if divStart == -1 {
  			continue
  		}
  		divStart += len(shortRightTag)
  		divLen := strings.Index(content[divStart:], `</div>`)
  		if divLen <= 0 {
  			continue
  		}
  		shortRight := content[divStart : divStart+divLen]
  		innerStart := strings.Index(shortRight, innerDivTag)
  		if innerStart == -1 {
  			continue
  		}
  		innerDivContent := shortRight[innerStart+len(innerDivTag):]
  		fmt.Println("Inner Div Content:", innerDivContent)
  		reportURLMap[reportURL] = innerDivContent
  	}
  	return reportURLMap
  }
  // processReport opens one report page (lyLoginPath+reportURL), extracts index
  // data for each keyword with the product/category processor, and stores it.
  //
  // Parameters:
  //   - product, category: select the processor via GetProcessor. product also
  //     names the LY classify looked up via GET_LY_CLASSIFY_BY_NAME.
  //   - reportURL: report path appended to lyLoginPath.
  //   - keywords: each entry is split on ":" and passed to the processor.
  //
  // Rows with a non-empty DataTime and an IndexCode other than "" or "lysww" are
  // posted in one batch to ADD_LY_DATA_LIST. Rows whose IndexCode already has an
  // EDB entry (GET_EDB_INFO_BY_INDEX_CODE) are also posted to
  // ADD_BATCH_LY_EDB_DATA.
  //
  // Returns an error whose message is exactly "您看文章的速度太快了,歇一会再看吧"
  // when the site served its rate-limit page; callers match on that text.
  func processReport(ctx context.Context, product string, category string, reportURL string, keywords []string) error {
  	var reportContent string
  	err := chromedp.Run(ctx,
  		chromedp.Navigate(lyLoginPath+reportURL),
  		chromedp.WaitVisible("body", chromedp.ByQuery),
  		// Extra wait for dynamically loaded content.
  		chromedp.Sleep(5*time.Second),
  		chromedp.OuterHTML("html", &reportContent),
  	)
  	if err != nil {
  		return err
  	}
  	if strings.Contains(reportContent, "您看文章的速度太快了,歇一会再看吧") {
  		return fmt.Errorf("您看文章的速度太快了,歇一会再看吧")
  	}
  	if len(keywords) == 0 {
  		return nil
  	}

  	// The processor and the classify depend only on product/category, so
  	// resolve them once instead of once per keyword.
  	processor, err := GetProcessor(product, category)
  	if err != nil {
  		return err
  	}
  	classifyResp, err := utils.PostEdbLibRequest(map[string]interface{}{"CategoryName": product}, utils.GET_LY_CLASSIFY_BY_NAME)
  	if err != nil {
  		// Fail the report instead of returning nil: on success the caller
  		// stores a progress record, which would mark this data-less report as done.
  		return fmt.Errorf("querying LY classify %q: %w", product, err)
  	}
  	var classifyResponse models.RequestResponse[models.BaseFromLyClassify]
  	if err = json.Unmarshal(classifyResp, &classifyResponse); err != nil {
  		return err
  	}
  	classifyID := classifyResponse.Data.BaseFromLyClassifyId

  	var lyIndexDataList []models.BaseFromLyData
  	var edbDataLyList []models.EdbDataLy
  	for _, keyword := range keywords {
  		partialKeyword := strings.Split(keyword, ":")
  		baseFromLyDataList, err := processor.Process(ctx, product, reportContent, partialKeyword, classifyID)
  		if err != nil {
  			return err
  		}
  		for _, baseFromLyData := range baseFromLyDataList {
  			if baseFromLyData.DataTime == "" || baseFromLyData.IndexCode == "" || baseFromLyData.IndexCode == "lysww" {
  				continue
  			}
  			baseFromLyData.CreateTime = utils.GetCurrentTime()
  			baseFromLyData.ModifyTime = utils.GetCurrentTime()
  			// Keep the raw row regardless of the EDB lookup below; that lookup
  			// only decides whether the row is also synced into the EDB.
  			lyIndexDataList = append(lyIndexDataList, baseFromLyData)

  			edbResp, err := httpRequestFill(map[string]interface{}{
  				"IndexCode": baseFromLyData.IndexCode,
  				"Source":    utils.DATA_SOURCE_LY,
  			}, utils.GET_EDB_INFO_BY_INDEX_CODE)
  			if err != nil {
  				log.Printf("postEdbLib err: %v", err)
  				continue
  			}
  			var edbResponse models.RequestResponse[models.EdbInfo]
  			if err = json.Unmarshal(edbResp, &edbResponse); err != nil {
  				log.Printf("postEdbLib err: %v", err)
  				continue
  			}
  			edbInfo := edbResponse.Data
  			if edbInfo.EdbCode == "" {
  				// Not in the EDB: nothing to sync.
  				continue
  			}

  			// Timestamp (ms) of the data date rather than of the crawl.
  			// NOTE(review): assumes DataTime is "2006-01-02" in local time;
  			// other formats fall back to the current time.
  			dataTimestamp := time.Now().UnixNano() / int64(time.Millisecond)
  			if day, parseErr := time.ParseInLocation("2006-01-02", baseFromLyData.DataTime, time.Local); parseErr == nil {
  				dataTimestamp = day.UnixNano() / int64(time.Millisecond)
  			}
  			edbDataLyList = append(edbDataLyList, models.EdbDataLy{
  				CreateTime:    utils.GetCurrentTime(),
  				ModifyTime:    utils.GetCurrentTime(),
  				EdbInfoId:     edbInfo.EdbInfoId,
  				EdbCode:       edbInfo.EdbCode,
  				DataTime:      baseFromLyData.DataTime,
  				Value:         baseFromLyData.Value,
  				DataTimestamp: uint64(dataTimestamp),
  			})
  		}
  	}

  	// Store the raw LY data.
  	if len(lyIndexDataList) > 0 {
  		payload, err := json.Marshal(lyIndexDataList)
  		if err != nil {
  			log.Printf("postEdbLib err: %v", err)
  			return err
  		}
  		if _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_LY_DATA_LIST, string(payload), "application/json"); err != nil {
  			log.Printf("postEdbLib err: %v", err)
  			return err
  		}
  	}
  	// Sync rows whose index exists in the EDB.
  	if len(edbDataLyList) > 0 {
  		payload, err := json.Marshal(edbDataLyList)
  		if err != nil {
  			log.Printf("postEdbLib err: %v", err)
  			return err
  		}
  		if _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_LY_EDB_DATA, string(payload), "application/json"); err != nil {
  			log.Printf("postEdbLib err: %v", err)
  			return err
  		}
  	}
  	return nil
  }
  // addLyIndex creates a BaseFromLyIndex (EdbExist = 0) through ADD_LY_INDEX and
  // returns the id found in the response's Data field.
  //
  // Returns an error when the request fails or the response cannot be decoded,
  // so a malformed answer is never mistaken for a successful insert with id 0.
  func addLyIndex(classifyId int, indexCode string, indexName string, unit string, frequency string) (int, error) {
  	index := &models.BaseFromLyIndex{
  		CreateTime:           utils.GetCurrentTime(),
  		ModifyTime:           utils.GetCurrentTime(),
  		BaseFromLyClassifyId: classifyId,
  		IndexCode:            indexCode,
  		IndexName:            indexName,
  		Frequency:            frequency,
  		Unit:                 unit,
  		EdbExist:             0,
  	}
  	resp, err := httpRequestFill(index, utils.ADD_LY_INDEX)
  	if err != nil {
  		log.Printf("postEdbLib err: %v", err)
  		return 0, err
  	}
  	var response models.RequestResponse[int64]
  	if err = json.Unmarshal(resp, &response); err != nil {
  		return 0, fmt.Errorf("decoding ADD_LY_INDEX response: %w", err)
  	}
  	return int(response.Data), nil
  }
  // httpRequestFill posts data to the edb-lib endpoint urlMethod through
  // utils.PostEdbLibRequest and returns the raw response body.
  //
  // data is first round-tripped through JSON into a map[string]interface{}, so
  // it must encode as a JSON object (struct, pointer to struct, or map). JSON
  // arrays, numbers and strings fail at the decode step. A failed request is
  // logged here before its error is returned.
  func httpRequestFill(data interface{}, urlMethod string) (postEdbLib []byte, err error) {
  	var encoded []byte
  	if encoded, err = json.Marshal(data); err != nil {
  		return nil, err
  	}
  	var payload map[string]interface{}
  	if err = json.Unmarshal(encoded, &payload); err != nil {
  		return nil, err
  	}
  	if postEdbLib, err = utils.PostEdbLibRequest(payload, urlMethod); err != nil {
  		log.Printf("postEdbLib err: %v", err)
  		return nil, err
  	}
  	return postEdbLib, nil
  }