commodity_liangyou.go 19 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta/eta_data_analysis/models"
  6. "eta/eta_data_analysis/services/alarm_msg"
  7. "eta/eta_data_analysis/utils"
  8. "fmt"
  9. "github.com/beego/beego/v2/core/logs"
  10. "github.com/chromedp/cdproto/cdp"
  11. "log"
  12. "os"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "github.com/chromedp/chromedp"
  18. )
  19. var (
  20. lyLoginPath = "https://www.fao.com.cn/"
  21. )
  22. // func LyDataDeal(cont context.Context) (err error) {
  23. func main() {
  24. // 读取 JSON 文件
  25. configFile, err := os.ReadFile(utils.LY_JSON_PATH)
  26. if err != nil {
  27. fmt.Printf("读取配置文件错误: %v\n", err)
  28. return
  29. }
  30. // 定义通用的 map 结构体来解析 JSON
  31. var data map[string]map[string]map[string][]string
  32. // 解析 JSON 文件内容
  33. err = json.Unmarshal(configFile, &data)
  34. if err != nil {
  35. fmt.Printf("解析配置文件错误: %v\n", err)
  36. return
  37. }
  38. // 打印解析后的数据以验证
  39. fmt.Printf("%+v\n", data)
  40. // 创建 chromedp 执行上下文
  41. options := []chromedp.ExecAllocatorOption{
  42. chromedp.Flag("headless", false),
  43. chromedp.Flag("disable-blink-features", "AutomationControlled"),
  44. chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
  45. }
  46. allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...)
  47. defer cancel()
  48. ctx, cancel := chromedp.NewContext(allocCtx)
  49. defer cancel()
  50. // 登录操作
  51. err = login(ctx)
  52. if err != nil {
  53. fmt.Printf("登录错误: %v\n", err)
  54. return
  55. }
  56. // 遍历配置并爬取数据
  57. for product, productData := range data {
  58. for category, categoryData := range productData {
  59. for report, keywords := range categoryData {
  60. fmt.Printf("正在获取数据: %s -> %s -> %s\n", product, category, report)
  61. err = fetchReportData(ctx, product, category, report, keywords)
  62. if err != nil {
  63. fmt.Printf("获取数据错误: %s -> %s -> %s: %v\n", product, category, report, err)
  64. // 您看文章的速度太快了,歇一会再看吧
  65. if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") {
  66. return
  67. }
  68. }
  69. }
  70. }
  71. }
  72. //return nil
  73. }
  74. func login(ctx context.Context) error {
  75. return chromedp.Run(ctx,
  76. chromedp.Navigate(lyLoginPath),
  77. chromedp.Sleep(5*time.Second),
  78. chromedp.Click(`a[id="btnLogin"]`, chromedp.ByQuery),
  79. chromedp.Sleep(2*time.Second),
  80. chromedp.SetValue(`input[id="userName"]`, utils.LY_USERNAME, chromedp.ByQuery),
  81. chromedp.SetValue(`input[id="pwd"]`, utils.LY_PASSWORD, chromedp.ByQuery),
  82. chromedp.Sleep(2*time.Second),
  83. chromedp.Click(`input[id="btn_Login"]`, chromedp.ByQuery),
  84. chromedp.Sleep(5*time.Second),
  85. )
  86. }
  87. func fetchReportData(ctx context.Context, product, category, report string, keywords []string) error {
  88. // Navigate to the main page
  89. err := chromedp.Run(ctx,
  90. chromedp.Navigate(lyLoginPath),
  91. chromedp.Sleep(5*time.Second),
  92. )
  93. if err != nil {
  94. return err
  95. }
  96. // Navigate to the product page
  97. productPageURL, err := fillProductPageURL(ctx, product, category)
  98. if err != nil {
  99. return err
  100. }
  101. // Navigate to the category page
  102. var categoryPageURL string
  103. err = chromedp.Run(ctx,
  104. chromedp.Navigate(productPageURL),
  105. chromedp.Sleep(5*time.Second),
  106. chromedp.Click(fmt.Sprintf(`//div[contains(@class, "newBox")]//a[contains(text(), '%s')]`, category), chromedp.BySearch),
  107. chromedp.Sleep(5*time.Second),
  108. chromedp.Location(&categoryPageURL),
  109. )
  110. if err != nil {
  111. return err
  112. }
  113. logs.Info("categoryPageURL: %s: %s: %s", product, category, categoryPageURL)
  114. //var allReportURLs []string
  115. var allReportURLMap = make(map[string]string)
  116. for {
  117. var htmlContent string
  118. err = chromedp.Run(ctx,
  119. chromedp.Navigate(categoryPageURL),
  120. chromedp.Sleep(5*time.Second),
  121. chromedp.OuterHTML("html", &htmlContent),
  122. )
  123. if err != nil {
  124. return err
  125. }
  126. fmt.Printf("页面内容: %s\n", htmlContent)
  127. // Extract report URLs containing the partial keyword
  128. reportURLMap := extractReportURLs(htmlContent, report)
  129. //allReportURLs = append(allReportURLs, reportURLs...)
  130. for key, value := range reportURLMap {
  131. allReportURLMap[key] = value
  132. }
  133. // 测试环境跑部分数据,上线放开
  134. //break
  135. // Check if next page button is disabled
  136. // 测试环境跑部分数据,上线放开
  137. var nextPageDisabled bool
  138. err = chromedp.Run(ctx,
  139. chromedp.Evaluate(`document.querySelector('div.my-page-next').classList.contains('my-page-forbid')`, &nextPageDisabled),
  140. )
  141. if err != nil {
  142. return err
  143. }
  144. if nextPageDisabled {
  145. break
  146. }
  147. // 校验获取到的url key在数据库是否存在
  148. if IsExistInDB(allReportURLMap) {
  149. logs.Info("改页报告已存在处理的报告,无需再翻页: %s: %s: %s", product, category, report)
  150. break
  151. }
  152. // Click the next page button
  153. err = chromedp.Run(ctx,
  154. chromedp.Click(`div.my-page-next`, chromedp.ByQuery),
  155. chromedp.Sleep(10*time.Second),
  156. chromedp.Location(&categoryPageURL),
  157. )
  158. if err != nil {
  159. return err
  160. }
  161. }
  162. logs.Info("所有报告 URLs: %s: %s: %v", product, category, allReportURLMap)
  163. if len(allReportURLMap) == 0 {
  164. return fmt.Errorf("未找到报告 URL")
  165. }
  166. // 处理报告数据
  167. for key, value := range allReportURLMap {
  168. // 查询报告是否已经处理 这里只对近7天的数据进行处理
  169. paramsLib := make(map[string]interface{})
  170. paramsLib["Url"] = key
  171. postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.GET_LY_INDEX_RECORD_BY_URL)
  172. if err != nil {
  173. // 有错误就不继续执行
  174. log.Printf("postEdbLib err: %v", err)
  175. continue
  176. }
  177. var requestResponse models.RequestResponse[models.BaseFromLyIndexRecord]
  178. err = json.Unmarshal(postEdbLib, &requestResponse)
  179. lyIndexRecord := requestResponse.Data
  180. if lyIndexRecord.DataTime != "" {
  181. toTime, err := utils.StringToTime(lyIndexRecord.DataTime + " 00:00:00")
  182. if err != nil {
  183. logs.Error("时间格式转换错误: %s: %s: %s: %s: %v", product, category, report, key, err)
  184. continue
  185. }
  186. if time.Now().Sub(toTime) > 7*24*time.Hour {
  187. logs.Info("报告已处理: %s: %s: %s: %s", product, category, report, key)
  188. continue
  189. }
  190. }
  191. // 随机睡眠
  192. rand := utils.RangeRand(20, 100)
  193. fmt.Println(report+";sleep:", strconv.Itoa(int(rand)))
  194. time.Sleep(time.Duration(rand) * time.Second)
  195. err = processReport(ctx, product, category, key, keywords)
  196. if err != nil {
  197. logs.Error("处理报告错误: %s: %s: %s: %s: %v", product, category, report, key, err)
  198. if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") {
  199. // 如果报告内容包含 “您看文章的速度太快了,歇一会再看吧” 则停止处理,发短信通知
  200. // 发送短信通知
  201. alarm_msg.SendAlarmMsg(fmt.Sprintf("粮油商务网-爬取指标数据被限制,请稍后重试, ErrMsg: %s", err.Error()), 1)
  202. return nil
  203. }
  204. continue
  205. }
  206. format, err := utils.ConvertTimeFormat(value)
  207. if err != nil {
  208. logs.Error("时间格式转换错误: %s, %s, %v: %v", product, category, value, err)
  209. continue
  210. }
  211. // 处理报告成功,将维护指标数据读取进度到数据库,避免后面重复读取
  212. record := &models.BaseFromLyIndexRecord{
  213. CreateTime: utils.GetCurrentTime(),
  214. ModifyTime: utils.GetCurrentTime(),
  215. Product: product,
  216. Category: category,
  217. Url: key,
  218. DataTime: format,
  219. }
  220. // 转换成json
  221. marshal, err := json.Marshal(record)
  222. if err != nil {
  223. logs.Error("维护指标数据读取进度错误: %s, %s, %v: %v", product, category, key, err)
  224. continue
  225. }
  226. // json 转 interface
  227. var result map[string]interface{}
  228. err = json.Unmarshal(marshal, &result)
  229. postEdbLib, err = utils.PostEdbLibRequest(result, utils.ADD_LY_INDEX_RECORD)
  230. if err != nil {
  231. // 有错误就不继续执行
  232. logs.Error("维护指标数据读取进度错误: %s, %s, %v: %v", product, category, key, err)
  233. continue
  234. }
  235. logs.Info("维护指标数据读取进度成功: %s, %s, %v", product, category, key)
  236. }
  237. return nil
  238. }
  239. func IsExistInDB(urlMap map[string]string) bool {
  240. var urlList []string
  241. for key, _ := range urlMap {
  242. urlList = append(urlList, key)
  243. }
  244. paramsLib := make(map[string]interface{})
  245. paramsLib["UrlList"] = urlList
  246. postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.CHECK_LY_INDEX_RECORD_IS_EXIST)
  247. if err != nil {
  248. // 有错误就不继续执行
  249. log.Printf("postEdbLib err: %v", err)
  250. return false
  251. }
  252. var requestResponse models.RequestResponse[bool]
  253. err = json.Unmarshal(postEdbLib, &requestResponse)
  254. if err != nil {
  255. return false
  256. }
  257. return requestResponse.Data
  258. }
  259. func fillProductPageURL(ctx context.Context, product string, category string) (string, error) {
  260. // 选择 dl 标签下所有 a 标签的 XPath
  261. selector := `//dl[contains(@class, 'dl_hot')]//a`
  262. logs.Info("选择器表达式: %s", selector)
  263. var nodes []*cdp.Node
  264. var productPageURL string
  265. // 获取 dl 标签下的所有 a 标签节点
  266. err := chromedp.Run(ctx,
  267. chromedp.WaitReady(selector, chromedp.BySearch),
  268. chromedp.Nodes(selector, &nodes, chromedp.BySearch),
  269. )
  270. if err != nil {
  271. return "", err
  272. }
  273. // 提取并打印所有 a 标签的 OuterHTML
  274. var targetURL string
  275. for _, node := range nodes {
  276. var outerHTML string
  277. // 获取 a 标签的 OuterHTML
  278. err = chromedp.Run(ctx,
  279. chromedp.OuterHTML(node.FullXPath(), &outerHTML, chromedp.BySearch),
  280. )
  281. if err != nil {
  282. return "", err
  283. }
  284. // 打印获取的 OuterHTML 内容
  285. logs.Info("Link OuterHTML: %s", outerHTML)
  286. // 从 OuterHTML 中提取 href 和文本内容
  287. // 使用正则或字符串处理提取 href 和文本内容
  288. href, linkText := extractHrefAndText(outerHTML)
  289. // 打印提取的 href 和文本内容
  290. logs.Info("Link Text: %s, Href: %s", linkText, href)
  291. // 如果文本内容匹配目标产品
  292. if linkText == product {
  293. // 拼接完整的 URL
  294. /*if !strings.HasPrefix(href, "http") {
  295. href = lyLoginPath + href
  296. }*/
  297. targetURL = href
  298. break
  299. }
  300. }
  301. if targetURL == "" {
  302. return "", fmt.Errorf("未找到匹配的产品链接")
  303. }
  304. // 显示更多内容
  305. err = chromedp.Run(ctx,
  306. chromedp.Evaluate(`document.getElementById("moreSpeList").style.display = "block";`, nil),
  307. )
  308. if err != nil {
  309. return "", err
  310. }
  311. // 点击目标产品的链接
  312. clickSelector := fmt.Sprintf(`//a[@href='%s']`, targetURL)
  313. err = chromedp.Run(ctx,
  314. chromedp.WaitReady(clickSelector, chromedp.BySearch),
  315. chromedp.Click(clickSelector, chromedp.BySearch),
  316. chromedp.Sleep(5*time.Second),
  317. chromedp.Location(&productPageURL),
  318. )
  319. if err != nil {
  320. return "", err
  321. }
  322. // 返回点击后的页面URL
  323. logs.Info("productPageURL: %s", productPageURL)
  324. return productPageURL, nil
  325. }
  326. // extractHrefAndText 从 OuterHTML 提取 href 和文本内容的辅助函数
  327. func extractHrefAndText(outerHTML string) (string, string) {
  328. // 使用正则表达式或其他字符串处理方法提取 href 和文本内容
  329. // 这里只是一个简单的例子,具体实现需要根据 HTML 结构来调整
  330. hrefRegex := `href="([^"]+)"`
  331. textRegex := `>([^<]+)<`
  332. hrefMatches := regexp.MustCompile(hrefRegex).FindStringSubmatch(outerHTML)
  333. textMatches := regexp.MustCompile(textRegex).FindStringSubmatch(outerHTML)
  334. href := ""
  335. linkText := ""
  336. if len(hrefMatches) > 1 {
  337. href = hrefMatches[1]
  338. }
  339. if len(textMatches) > 1 {
  340. linkText = textMatches[1]
  341. }
  342. return href, linkText
  343. }
  344. // Extract report URLs from the HTML content
  345. func extractReportURLs(htmlContent, keyword string) map[string]string {
  346. //var reportURLs []string
  347. var reportURLMap = make(map[string]string)
  348. var reportURL string
  349. // Find all occurrences of the keyword and extract report URLs
  350. content := htmlContent
  351. for {
  352. startIdx := strings.Index(content, keyword)
  353. if startIdx == -1 {
  354. break
  355. }
  356. startIdx += len(keyword)
  357. // Extract the URL from the HTML content
  358. urlStartIdx := strings.LastIndex(content[:startIdx], `href="`) + len(`href="`)
  359. urlEndIdx := strings.Index(content[urlStartIdx:], `"`) + urlStartIdx
  360. if urlStartIdx > 0 && urlEndIdx > urlStartIdx {
  361. reportURL = content[urlStartIdx:urlEndIdx]
  362. //reportURLs = append(reportURLs, content[urlStartIdx:urlEndIdx])
  363. }
  364. content = content[startIdx:]
  365. // Now extract the content inside the first <div class="short_right">
  366. divStartIdx := strings.Index(content, `<div class="short_right">`)
  367. if divStartIdx != -1 {
  368. divStartIdx += len(`<div class="short_right">`)
  369. divEndIdx := strings.Index(content[divStartIdx:], `</div>`) + divStartIdx
  370. if divEndIdx > divStartIdx {
  371. shortRightContent := content[divStartIdx:divEndIdx]
  372. // Extract the first <div> content inside <div class="short_right">
  373. innerDivStartIdx := strings.Index(shortRightContent, `<div>`)
  374. if innerDivStartIdx != -1 {
  375. innerDivStartIdx += len(`<div>`)
  376. //innerDivEndIdx := strings.Index(shortRightContent[innerDivStartIdx:], `</div>`) + innerDivStartIdx
  377. innerDivContent := shortRightContent[innerDivStartIdx:]
  378. fmt.Println("Inner Div Content:", innerDivContent)
  379. reportURLMap[reportURL] = innerDivContent
  380. }
  381. }
  382. }
  383. }
  384. return reportURLMap
  385. }
  386. // Process the report data
  387. func processReport(ctx context.Context, product string, category string, reportURL string, keywords []string) error {
  388. // Navigate to the report page
  389. var reportContent string
  390. err := chromedp.Run(ctx,
  391. chromedp.Navigate(lyLoginPath+reportURL),
  392. chromedp.WaitVisible("body", chromedp.ByQuery), // 等待 body 元素可见,确保页面已加载
  393. chromedp.Sleep(5*time.Second), // 等待额外时间,以确保动态内容加载
  394. chromedp.OuterHTML("html", &reportContent), // 获取页面 HTML 内容
  395. )
  396. if err != nil {
  397. return err
  398. }
  399. // 如果文章内容包含 “您看文章的速度太快了,歇一会再看吧” 则返回指定错误
  400. if strings.Contains(reportContent, "您看文章的速度太快了,歇一会再看吧") {
  401. return fmt.Errorf("您看文章的速度太快了,歇一会再看吧")
  402. }
  403. var lyIndexDataList []models.BaseFromLyData
  404. var edbDataLyList []models.EdbDataLy
  405. // Process the data based on keywords
  406. for _, keyword := range keywords {
  407. partialKeyword := strings.Split(keyword, ":")
  408. // Select appropriate processor based on product and category
  409. processor, err := GetProcessor(product, category)
  410. if err != nil {
  411. return err
  412. }
  413. // 查询报告所属分类
  414. paramsLib := make(map[string]interface{})
  415. paramsLib["CategoryName"] = product
  416. postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.GET_LY_CLASSIFY_BY_NAME)
  417. if err != nil {
  418. // 有错误就不继续执行
  419. log.Printf("postEdbLib err: %v", err)
  420. continue
  421. }
  422. var requestResponse models.RequestResponse[models.BaseFromLyClassify]
  423. err = json.Unmarshal(postEdbLib, &requestResponse)
  424. if err != nil {
  425. return err
  426. }
  427. classify := requestResponse.Data
  428. // Process the report content using the selected processor
  429. baseFromLyDataList, err := processor.Process(ctx, product, reportContent, partialKeyword, classify.BaseFromLyClassifyId)
  430. if err != nil {
  431. return err
  432. }
  433. if len(baseFromLyDataList) > 0 {
  434. for _, baseFromLyData := range baseFromLyDataList {
  435. if baseFromLyData.DataTime != "" && baseFromLyData.IndexCode != "" && baseFromLyData.IndexCode != "lysww" {
  436. baseFromLyData.CreateTime = utils.GetCurrentTime()
  437. baseFromLyData.ModifyTime = utils.GetCurrentTime()
  438. // 补充 判断是否存在于指标库
  439. paramsLib = make(map[string]interface{})
  440. paramsLib["IndexCode"] = baseFromLyData.IndexCode
  441. paramsLib["Source"] = utils.DATA_SOURCE_LY
  442. postEdbLib, err := httpRequestFill(paramsLib, utils.GET_EDB_INFO_BY_INDEX_CODE)
  443. if err != nil {
  444. // 有错误就不继续执行
  445. log.Printf("postEdbLib err: %v", err)
  446. continue
  447. }
  448. var requestResponse models.RequestResponse[models.EdbInfo]
  449. err = json.Unmarshal(postEdbLib, &requestResponse)
  450. if err != nil {
  451. log.Printf("postEdbLib err: %v", err)
  452. continue
  453. }
  454. lyIndexDataList = append(lyIndexDataList, baseFromLyData)
  455. edbIndexData := requestResponse.Data
  456. if edbIndexData.EdbCode == "" {
  457. // 不存在 不用维护指标库数据
  458. continue
  459. }
  460. // 存在 则同步新增指标库中的指标数据
  461. edbDataLy := models.EdbDataLy{
  462. CreateTime: utils.GetCurrentTime(),
  463. ModifyTime: utils.GetCurrentTime(),
  464. EdbInfoId: edbIndexData.EdbInfoId,
  465. EdbCode: edbIndexData.EdbCode,
  466. DataTime: baseFromLyData.DataTime,
  467. Value: baseFromLyData.Value,
  468. DataTimestamp: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
  469. }
  470. edbDataLyList = append(edbDataLyList, edbDataLy)
  471. }
  472. }
  473. }
  474. }
  475. // 新增指标数据
  476. if len(lyIndexDataList) > 0 {
  477. // 转换成json
  478. marshal, err := json.Marshal(lyIndexDataList)
  479. if err != nil {
  480. log.Printf("postEdbLib err: %v", err)
  481. return err
  482. }
  483. _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_LY_DATA_LIST, string(marshal), "application/json")
  484. if err != nil {
  485. // 有错误就不继续执行
  486. log.Printf("postEdbLib err: %v", err)
  487. return err
  488. }
  489. }
  490. // 新增指标库数据
  491. if len(edbDataLyList) > 0 {
  492. // 转换成json
  493. marshal, err := json.Marshal(edbDataLyList)
  494. if err != nil {
  495. log.Printf("postEdbLib err: %v", err)
  496. return err
  497. }
  498. _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_LY_EDB_DATA, string(marshal), "application/json")
  499. //_, err := httpRequestFill(edbDataLyList, utils.ADD_BATCH_LY_EDB_DATA)
  500. if err != nil {
  501. // 有错误就不继续执行
  502. log.Printf("postEdbLib err: %v", err)
  503. return err
  504. }
  505. }
  506. return nil
  507. }
  508. func addLyIndex(classifyId int, indexCode string, indexName string, unit string, frequency string) (int, error) {
  509. // 添加指标
  510. index := &models.BaseFromLyIndex{
  511. CreateTime: utils.GetCurrentTime(),
  512. ModifyTime: utils.GetCurrentTime(),
  513. BaseFromLyClassifyId: classifyId,
  514. IndexCode: indexCode,
  515. IndexName: indexName,
  516. Frequency: frequency,
  517. Unit: unit,
  518. EdbExist: 0,
  519. }
  520. postEdbLib, err := httpRequestFill(index, utils.ADD_LY_INDEX)
  521. if err != nil {
  522. // 有错误就不继续执行
  523. log.Printf("postEdbLib err: %v", err)
  524. return 0, err
  525. }
  526. var requestResponse models.RequestResponse[int64]
  527. err = json.Unmarshal(postEdbLib, &requestResponse)
  528. indexId := requestResponse.Data
  529. return int(indexId), nil
  530. }
  531. func httpRequestFill(data interface{}, urlMethod string) (postEdbLib []byte, err error) {
  532. // 转换成json
  533. marshal, err := json.Marshal(data)
  534. if err != nil {
  535. return nil, err
  536. }
  537. // json 转 interface
  538. var result map[string]interface{}
  539. err = json.Unmarshal(marshal, &result)
  540. if err != nil {
  541. return nil, err
  542. }
  543. postEdbLib, err = utils.PostEdbLibRequest(result, urlMethod)
  544. if err != nil {
  545. // 有错误就不继续执行
  546. log.Printf("postEdbLib err: %v", err)
  547. return nil, err
  548. }
  549. return postEdbLib, nil
  550. }