// Scraper for 粮油商务网 (https://www.fao.com.cn/): logs in with chromedp,
// walks the product/category/report tree defined in a JSON config, and
// records crawl progress through the eta_data_analysis EDB service.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/beego/beego/v2/core/logs"
	"github.com/chromedp/cdproto/cdp"
	"github.com/chromedp/chromedp"

	"eta/eta_data_analysis/models"
	"eta/eta_data_analysis/services/alarm_msg"
	"eta/eta_data_analysis/utils"
)

var (
	lyLoginPath = "https://www.fao.com.cn/"
)

// func LyDataDeal(cont context.Context) (err error) {
func main() {
	// Read the JSON config file.
	configFile, err := os.ReadFile(utils.LY_JSON_PATH)
	if err != nil {
		fmt.Printf("failed to read config file: %v\n", err)
		return
	}

	// Generic nested map that mirrors the config layout:
	// product -> category -> report -> keywords.
	var data map[string]map[string]map[string][]string
	if err = json.Unmarshal(configFile, &data); err != nil {
		fmt.Printf("failed to parse config file: %v\n", err)
		return
	}
	// Print the parsed config for verification.
	fmt.Printf("%+v\n", data)

	// Build the chromedp allocator context.
	options := []chromedp.ExecAllocatorOption{
		chromedp.Flag("headless", false),
		chromedp.Flag("disable-blink-features", "AutomationControlled"),
		chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
	}
	allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...)
	defer cancel()
	ctx, cancel := chromedp.NewContext(allocCtx)
	defer cancel()

	// Log in before crawling.
	if err = login(ctx); err != nil {
		fmt.Printf("login error: %v\n", err)
		return
	}

	// Walk the config and fetch each report.
	for product, productData := range data {
		for category, categoryData := range productData {
			for report, keywords := range categoryData {
				fmt.Printf("fetching data: %s -> %s -> %s\n", product, category, report)
				err = fetchReportData(ctx, product, category, report, keywords)
				if err != nil {
					fmt.Printf("fetch error: %s -> %s -> %s: %v\n", product, category, report, err)
					// The site throttles readers with this message
					// ("you are reading too fast, take a break");
					// stop the whole run when it appears. The literal must
					// stay in Chinese to match the site's text.
					if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") {
						return
					}
				}
			}
		}
	}
	//return nil
}
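// For reference, the config parsed in main nests as
// product -> category -> report name -> keyword list. A minimal example of
// the expected file shape (keys here are placeholders; the real ones live
// in the file at utils.LY_JSON_PATH):
//
//	{
//	  "ProductA": {
//	    "CategoryX": {
//	      "Weekly Report": ["keyword1", "keyword2"]
//	    }
//	  }
//	}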
// login opens the home page and signs in with the configured credentials.
func login(ctx context.Context) error {
	return chromedp.Run(ctx,
		chromedp.Navigate(lyLoginPath),
		chromedp.Sleep(5*time.Second),
		chromedp.Click(`a[id="btnLogin"]`, chromedp.ByQuery),
		chromedp.Sleep(2*time.Second),
		chromedp.SetValue(`input[id="userName"]`, utils.LY_USERNAME, chromedp.ByQuery),
		chromedp.SetValue(`input[id="pwd"]`, utils.LY_PASSWORD, chromedp.ByQuery),
		chromedp.Sleep(2*time.Second),
		chromedp.Click(`input[id="btn_Login"]`, chromedp.ByQuery),
		chromedp.Sleep(5*time.Second),
	)
}

// fetchReportData pages through one category listing, collects report URLs
// matching the report keyword, and processes each report not yet handled.
func fetchReportData(ctx context.Context, product, category, report string, keywords []string) error {
	// Navigate to the main page.
	err := chromedp.Run(ctx,
		chromedp.Navigate(lyLoginPath),
		chromedp.Sleep(5*time.Second),
	)
	if err != nil {
		return err
	}

	// Navigate to the product page.
	productPageURL, err := fillProductPageURL(ctx, product, category)
	if err != nil {
		return err
	}

	// Navigate to the category page.
	var categoryPageURL string
	err = chromedp.Run(ctx,
		chromedp.Navigate(productPageURL),
		chromedp.Sleep(5*time.Second),
		chromedp.Click(fmt.Sprintf(`//div[contains(@class, "newBox")]//a[contains(text(), '%s')]`, category), chromedp.BySearch),
		chromedp.Sleep(5*time.Second),
		chromedp.Location(&categoryPageURL),
	)
	if err != nil {
		return err
	}
	logs.Info("categoryPageURL: %s: %s: %s", product, category, categoryPageURL)

	//var allReportURLs []string
	allReportURLMap := make(map[string]string)
	for {
		var htmlContent string
		err = chromedp.Run(ctx,
			chromedp.Navigate(categoryPageURL),
			chromedp.Sleep(5*time.Second),
			chromedp.OuterHTML("html", &htmlContent),
		)
		if err != nil {
			return err
		}
		fmt.Printf("page content: %s\n", htmlContent)

		// Extract report URLs matching the report keyword.
		reportURLMap := extractReportURLs(htmlContent, report)
		//allReportURLs = append(allReportURLs, reportURLs...)
		for key, value := range reportURLMap {
			allReportURLMap[key] = value
		}
		// For partial runs in the test environment; re-enable before release.
		//break

		// Check whether the next-page button is disabled.
		var nextPageDisabled bool
		err = chromedp.Run(ctx,
			chromedp.Evaluate(`document.querySelector('div.my-page-next').classList.contains('my-page-forbid')`, &nextPageDisabled),
		)
		if err != nil {
			return err
		}
		if nextPageDisabled {
			break
		}

		// If any URL collected so far is already recorded in the database,
		// older pages have been handled and paging can stop.
		if IsExistInDB(allReportURLMap) {
			logs.Info("page contains already-processed reports, no further paging needed: %s: %s: %s", product, category, report)
			break
		}

		// Click the next-page button.
		err = chromedp.Run(ctx,
			chromedp.Click(`div.my-page-next`, chromedp.ByQuery),
			chromedp.Sleep(10*time.Second),
			chromedp.Location(&categoryPageURL),
		)
		if err != nil {
			return err
		}
	}
	logs.Info("all report URLs: %s: %s: %v", product, category, allReportURLMap)
	if len(allReportURLMap) == 0 {
		return fmt.Errorf("no report URL found")
	}

	// Process the reports; only the last 7 days of data are handled.
	for key, value := range allReportURLMap {
		// Check whether this report has already been processed.
		paramsLib := make(map[string]interface{})
		paramsLib["Url"] = key
		postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.GET_LY_INDEX_RECORD_BY_URL)
		if err != nil {
			// Skip this report on error.
			log.Printf("postEdbLib err: %v", err)
			continue
		}
		var requestResponse models.RequestResponse[models.BaseFromLyIndexRecord]
		if err = json.Unmarshal(postEdbLib, &requestResponse); err != nil {
			log.Printf("unmarshal err: %v", err)
			continue
		}
		lyIndexRecord := requestResponse.Data
		if lyIndexRecord.DataTime != "" {
			toTime, err := utils.StringToTime(lyIndexRecord.DataTime + " 00:00:00")
			if err != nil {
				logs.Error("time parse error: %s: %s: %s: %s: %v", product, category, report, key, err)
				continue
			}
			if time.Since(toTime) > 7*24*time.Hour {
				logs.Info("report already processed: %s: %s: %s: %s", product, category, report, key)
				continue
			}
		}

		// Sleep for a random interval to avoid tripping the rate limit.
		rand := utils.RangeRand(20, 100)
		fmt.Println(report+";sleep:", strconv.Itoa(int(rand)))
		time.Sleep(time.Duration(rand) * time.Second)

		err = processReport(ctx, product, category, key, keywords)
		if err != nil {
			logs.Error("report processing error: %s: %s: %s: %s: %v", product, category, report, key, err)
			if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") {
				// The site is throttling us; stop processing and send an
				// SMS alert.
				alarm_msg.SendAlarmMsg(fmt.Sprintf("粮油商务网: indicator crawling was rate-limited, please retry later, ErrMsg: %s", err.Error()), 1)
				return nil
			}
			continue
		}

		format, err := utils.ConvertTimeFormat(value)
		if err != nil {
			logs.Error("time format conversion error: %s, %s, %v: %v", product, category, value, err)
			continue
		}
		// The report was processed successfully; persist the progress record
		// so the same report is not fetched again.
		record := &models.BaseFromLyIndexRecord{
			CreateTime: utils.GetCurrentTime(),
			ModifyTime: utils.GetCurrentTime(),
			Product:    product,
			Category:   category,
			Url:        key,
			DataTime:   format,
		}
		// Marshal the record to JSON, then back into a generic map for the
		// EDB request payload.
		marshal, err := json.Marshal(record)
		if err != nil {
			logs.Error("failed to save crawl progress: %s, %s, %v: %v", product, category, key, err)
			continue
		}
		var result map[string]interface{}
		if err = json.Unmarshal(marshal, &result); err != nil {
			logs.Error("failed to save crawl progress: %s, %s, %v: %v", product, category, key, err)
			continue
		}
		postEdbLib, err = utils.PostEdbLibRequest(result, utils.ADD_LY_INDEX_RECORD)
		if err != nil {
			logs.Error("failed to save crawl progress: %s, %s, %v: %v", product, category, key, err)
			continue
		}
		logs.Info("crawl progress saved: %s, %s, %v", product, category, key)
	}
	return nil
}

// IsExistInDB reports whether any of the given report URLs is already
// recorded in the database.
func IsExistInDB(urlMap map[string]string) bool {
	var urlList []string
	for key := range urlMap {
		urlList = append(urlList, key)
	}
	paramsLib := make(map[string]interface{})
	paramsLib["UrlList"] = urlList
	postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.CHECK_LY_INDEX_RECORD_IS_EXIST)
	if err != nil {
		log.Printf("postEdbLib err: %v", err)
		return false
	}
	var requestResponse models.RequestResponse[bool]
	if err = json.Unmarshal(postEdbLib, &requestResponse); err != nil {
		return false
	}
	return requestResponse.Data
}

// fillProductPageURL finds the link whose text matches the product in the
// hot-product list, clicks it, and returns the resulting page URL.
func fillProductPageURL(ctx context.Context, product string, category string) (string, error) {
	// XPath selecting every a tag under the dl_hot dl element.
	selector := `//dl[contains(@class, 'dl_hot')]//a`
	logs.Info("selector expression: %s", selector)

	var nodes []*cdp.Node
	var productPageURL string
	// Collect all a-tag nodes under the dl element.
	err := chromedp.Run(ctx,
		chromedp.WaitReady(selector, chromedp.BySearch),
		chromedp.Nodes(selector, &nodes, chromedp.BySearch),
	)
	if err != nil {
		return "", err
	}

	// Inspect each link's OuterHTML until the product link is found.
	var targetURL string
	for _, node := range nodes {
		var outerHTML string
		err = chromedp.Run(ctx,
			chromedp.OuterHTML(node.FullXPath(), &outerHTML, chromedp.BySearch),
		)
		if err != nil {
			return "", err
		}
		logs.Info("Link OuterHTML: %s", outerHTML)

		// Pull href and link text out of the OuterHTML.
		href, linkText := extractHrefAndText(outerHTML)
		logs.Info("Link Text: %s, Href: %s", linkText, href)

		// Match the link text against the target product.
		if linkText == product {
			// Prepend the base URL if the href is relative.
			/*if !strings.HasPrefix(href, "http") {
				href = lyLoginPath + href
			}*/
			targetURL = href
			break
		}
	}
	if targetURL == "" {
		return "", fmt.Errorf("no matching product link found")
	}

	// Expand the hidden "more" list so the link is clickable.
	err = chromedp.Run(ctx,
		chromedp.Evaluate(`document.getElementById("moreSpeList").style.display = "block";`, nil),
	)
	if err != nil {
		return "", err
	}

	// Click the product link and capture the resulting URL.
	clickSelector := fmt.Sprintf(`//a[@href='%s']`, targetURL)
	err = chromedp.Run(ctx,
		chromedp.WaitReady(clickSelector, chromedp.BySearch),
		chromedp.Click(clickSelector, chromedp.BySearch),
		chromedp.Sleep(5*time.Second),
		chromedp.Location(&productPageURL),
	)
	if err != nil {
		return "", err
	}
	logs.Info("productPageURL: %s", productPageURL)
	return productPageURL, nil
}

// extractHrefAndText pulls the href attribute and the inner text out of a
// link's OuterHTML. This is a simple regex-based helper; adjust the
// patterns if the HTML structure changes.
func extractHrefAndText(outerHTML string) (string, string) {
	hrefRegex := `href="([^"]+)"`
	textRegex := `>([^<]+)<`

	hrefMatches := regexp.MustCompile(hrefRegex).FindStringSubmatch(outerHTML)
	textMatches := regexp.MustCompile(textRegex).FindStringSubmatch(outerHTML)

	href := ""
	linkText := ""
	if len(hrefMatches) > 1 {
		href = hrefMatches[1]
	}
	if len(textMatches) > 1 {
		linkText = textMatches[1]
	}
	return href, linkText
}
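// As a quick illustration of the helper above (the input is a made-up
// fragment, not markup captured from the live site):
//
//	href, text := extractHrefAndText(`<a href="/news/12345.html">ProductA</a>`)
//	// href == "/news/12345.html", text == "ProductA"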
// extractReportURLs scans the listing page HTML for occurrences of the
// report keyword and collects the report URL nearest each occurrence.
func extractReportURLs(htmlContent, keyword string) map[string]string {
	//var reportURLs []string
	reportURLMap := make(map[string]string)
	var reportURL string

	// Find every occurrence of the keyword and extract the report URL from
	// the closest preceding href attribute.
	content := htmlContent
	for {
		startIdx := strings.Index(content, keyword)
		if startIdx == -1 {
			break
		}
		startIdx += len(keyword)

		// Extract the URL, guarding against a missing href or closing quote
		// (the original index arithmetic misread LastIndex's -1 sentinel).
		if hrefIdx := strings.LastIndex(content[:startIdx], `href="`); hrefIdx != -1 {
			urlStartIdx := hrefIdx + len(`href="`)
			if urlEndIdx := strings.Index(content[urlStartIdx:], `"`); urlEndIdx != -1 {
				reportURL = content[urlStartIdx : urlStartIdx+urlEndIdx]
				//reportURLs = append(reportURLs, reportURL)
			}
		}
		content = content[startIdx:]

		// Now extract the content inside the first
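		// (The original source is truncated at the comment above, mid-
		// sentence. What follows is a minimal, hedged reconstruction, not
		// the original logic: it assumes the publish date appears after the
		// link as a YYYY-MM-DD string and that utils.ConvertTimeFormat,
		// applied to the map value later in fetchReportData, accepts that
		// form. The inline MustCompile keeps the sketch local; hoist it to
		// a package-level var if this code is kept.)
		if reportURL != "" {
			if date := regexp.MustCompile(`\d{4}-\d{2}-\d{2}`).FindString(content); date != "" {
				reportURLMap[reportURL] = date
			}
		}
	}
	return reportURLMap
}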