package main import ( "context" "encoding/json" "eta/eta_data_analysis/models" "eta/eta_data_analysis/services/alarm_msg" "eta/eta_data_analysis/utils" "fmt" "github.com/beego/beego/v2/core/logs" "github.com/chromedp/cdproto/cdp" "log" "os" "regexp" "strconv" "strings" "time" "github.com/chromedp/chromedp" ) var ( lyLoginPath = "https://www.fao.com.cn/" ) // func LyDataDeal(cont context.Context) (err error) { func main() { // 读取 JSON 文件 configFile, err := os.ReadFile(utils.LY_JSON_PATH) if err != nil { fmt.Printf("读取配置文件错误: %v\n", err) return } // 定义通用的 map 结构体来解析 JSON var data map[string]map[string]map[string][]string // 解析 JSON 文件内容 err = json.Unmarshal(configFile, &data) if err != nil { fmt.Printf("解析配置文件错误: %v\n", err) return } // 打印解析后的数据以验证 fmt.Printf("%+v\n", data) // 创建 chromedp 执行上下文 options := []chromedp.ExecAllocatorOption{ chromedp.Flag("headless", false), chromedp.Flag("disable-blink-features", "AutomationControlled"), chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`), } allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...) defer cancel() ctx, cancel := chromedp.NewContext(allocCtx) defer cancel() // 登录操作 err = login(ctx) if err != nil { fmt.Printf("登录错误: %v\n", err) return } // 遍历配置并爬取数据 for product, productData := range data { for category, categoryData := range productData { for report, keywords := range categoryData { fmt.Printf("正在获取数据: %s -> %s -> %s\n", product, category, report) err = fetchReportData(ctx, product, category, report, keywords) if err != nil { fmt.Printf("获取数据错误: %s -> %s -> %s: %v\n", product, category, report, err) // 您看文章的速度太快了,歇一会再看吧 if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") { return } } } } } //return nil } func login(ctx context.Context) error { return chromedp.Run(ctx, chromedp.Navigate(lyLoginPath), chromedp.Sleep(5*time.Second), chromedp.Click(`a[id="btnLogin"]`, chromedp.ByQuery), chromedp.Sleep(2*time.Second), chromedp.SetValue(`input[id="userName"]`, utils.LY_USERNAME, chromedp.ByQuery), chromedp.SetValue(`input[id="pwd"]`, utils.LY_PASSWORD, chromedp.ByQuery), chromedp.Sleep(2*time.Second), chromedp.Click(`input[id="btn_Login"]`, chromedp.ByQuery), chromedp.Sleep(5*time.Second), ) } func fetchReportData(ctx context.Context, product, category, report string, keywords []string) error { // Navigate to the main page err := chromedp.Run(ctx, chromedp.Navigate(lyLoginPath), chromedp.Sleep(5*time.Second), ) if err != nil { return err } // Navigate to the product page productPageURL, err := fillProductPageURL(ctx, product, category) if err != nil { return err } // Navigate to the category page var categoryPageURL string err = chromedp.Run(ctx, chromedp.Navigate(productPageURL), chromedp.Sleep(5*time.Second), chromedp.Click(fmt.Sprintf(`//div[contains(@class, "newBox")]//a[contains(text(), '%s')]`, category), chromedp.BySearch), chromedp.Sleep(5*time.Second), chromedp.Location(&categoryPageURL), ) if err != nil { return err } logs.Info("categoryPageURL: %s: %s: %s", product, category, categoryPageURL) //var allReportURLs []string var allReportURLMap = make(map[string]string) for { var htmlContent string err = chromedp.Run(ctx, chromedp.Navigate(categoryPageURL), chromedp.Sleep(5*time.Second), chromedp.OuterHTML("html", &htmlContent), ) if err != nil { return err } fmt.Printf("页面内容: %s\n", htmlContent) // Extract report URLs containing the partial keyword reportURLMap := extractReportURLs(htmlContent, report) //allReportURLs = append(allReportURLs, reportURLs...) for key, value := range reportURLMap { allReportURLMap[key] = value } // 测试环境跑部分数据,上线放开 //break // Check if next page button is disabled // 测试环境跑部分数据,上线放开 var nextPageDisabled bool err = chromedp.Run(ctx, chromedp.Evaluate(`document.querySelector('div.my-page-next').classList.contains('my-page-forbid')`, &nextPageDisabled), ) if err != nil { return err } if nextPageDisabled { break } // 校验获取到的url key在数据库是否存在 if IsExistInDB(allReportURLMap) { logs.Info("改页报告已存在处理的报告,无需再翻页: %s: %s: %s", product, category, report) break } // Click the next page button err = chromedp.Run(ctx, chromedp.Click(`div.my-page-next`, chromedp.ByQuery), chromedp.Sleep(10*time.Second), chromedp.Location(&categoryPageURL), ) if err != nil { return err } } logs.Info("所有报告 URLs: %s: %s: %v", product, category, allReportURLMap) if len(allReportURLMap) == 0 { return fmt.Errorf("未找到报告 URL") } // 处理报告数据 for key, value := range allReportURLMap { // 查询报告是否已经处理 这里只对近7天的数据进行处理 paramsLib := make(map[string]interface{}) paramsLib["Url"] = key postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.GET_LY_INDEX_RECORD_BY_URL) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) continue } var requestResponse models.RequestResponse[models.BaseFromLyIndexRecord] err = json.Unmarshal(postEdbLib, &requestResponse) lyIndexRecord := requestResponse.Data if lyIndexRecord.DataTime != "" { toTime, err := utils.StringToTime(lyIndexRecord.DataTime + " 00:00:00") if err != nil { logs.Error("时间格式转换错误: %s: %s: %s: %s: %v", product, category, report, key, err) continue } if time.Now().Sub(toTime) > 7*24*time.Hour { logs.Info("报告已处理: %s: %s: %s: %s", product, category, report, key) continue } } // 随机睡眠 rand := utils.RangeRand(20, 100) fmt.Println(report+";sleep:", strconv.Itoa(int(rand))) time.Sleep(time.Duration(rand) * time.Second) err = processReport(ctx, product, category, key, keywords) if err != nil { logs.Error("处理报告错误: %s: %s: %s: %s: %v", product, category, report, key, err) if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") { // 如果报告内容包含 “您看文章的速度太快了,歇一会再看吧” 则停止处理,发短信通知 // 发送短信通知 alarm_msg.SendAlarmMsg(fmt.Sprintf("粮油商务网-爬取指标数据被限制,请稍后重试, ErrMsg: %s", err.Error()), 1) return nil } continue } format, err := utils.ConvertTimeFormat(value) if err != nil { logs.Error("时间格式转换错误: %s, %s, %v: %v", product, category, value, err) continue } // 处理报告成功,将维护指标数据读取进度到数据库,避免后面重复读取 record := &models.BaseFromLyIndexRecord{ CreateTime: utils.GetCurrentTime(), ModifyTime: utils.GetCurrentTime(), Product: product, Category: category, Url: key, DataTime: format, } // 转换成json marshal, err := json.Marshal(record) if err != nil { logs.Error("维护指标数据读取进度错误: %s, %s, %v: %v", product, category, key, err) continue } // json 转 interface var result map[string]interface{} err = json.Unmarshal(marshal, &result) postEdbLib, err = utils.PostEdbLibRequest(result, utils.ADD_LY_INDEX_RECORD) if err != nil { // 有错误就不继续执行 logs.Error("维护指标数据读取进度错误: %s, %s, %v: %v", product, category, key, err) continue } logs.Info("维护指标数据读取进度成功: %s, %s, %v", product, category, key) } return nil } func IsExistInDB(urlMap map[string]string) bool { var urlList []string for key, _ := range urlMap { urlList = append(urlList, key) } paramsLib := make(map[string]interface{}) paramsLib["UrlList"] = urlList postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.CHECK_LY_INDEX_RECORD_IS_EXIST) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) return false } var requestResponse models.RequestResponse[bool] err = json.Unmarshal(postEdbLib, &requestResponse) if err != nil { return false } return requestResponse.Data } func fillProductPageURL(ctx context.Context, product string, category string) (string, error) { // 选择 dl 标签下所有 a 标签的 XPath selector := `//dl[contains(@class, 'dl_hot')]//a` logs.Info("选择器表达式: %s", selector) var nodes []*cdp.Node var productPageURL string // 获取 dl 标签下的所有 a 标签节点 err := chromedp.Run(ctx, chromedp.WaitReady(selector, chromedp.BySearch), chromedp.Nodes(selector, &nodes, chromedp.BySearch), ) if err != nil { return "", err } // 提取并打印所有 a 标签的 OuterHTML var targetURL string for _, node := range nodes { var outerHTML string // 获取 a 标签的 OuterHTML err = chromedp.Run(ctx, chromedp.OuterHTML(node.FullXPath(), &outerHTML, chromedp.BySearch), ) if err != nil { return "", err } // 打印获取的 OuterHTML 内容 logs.Info("Link OuterHTML: %s", outerHTML) // 从 OuterHTML 中提取 href 和文本内容 // 使用正则或字符串处理提取 href 和文本内容 href, linkText := extractHrefAndText(outerHTML) // 打印提取的 href 和文本内容 logs.Info("Link Text: %s, Href: %s", linkText, href) // 如果文本内容匹配目标产品 if linkText == product { // 拼接完整的 URL /*if !strings.HasPrefix(href, "http") { href = lyLoginPath + href }*/ targetURL = href break } } if targetURL == "" { return "", fmt.Errorf("未找到匹配的产品链接") } // 显示更多内容 err = chromedp.Run(ctx, chromedp.Evaluate(`document.getElementById("moreSpeList").style.display = "block";`, nil), ) if err != nil { return "", err } // 点击目标产品的链接 clickSelector := fmt.Sprintf(`//a[@href='%s']`, targetURL) err = chromedp.Run(ctx, chromedp.WaitReady(clickSelector, chromedp.BySearch), chromedp.Click(clickSelector, chromedp.BySearch), chromedp.Sleep(5*time.Second), chromedp.Location(&productPageURL), ) if err != nil { return "", err } // 返回点击后的页面URL logs.Info("productPageURL: %s", productPageURL) return productPageURL, nil } // extractHrefAndText 从 OuterHTML 提取 href 和文本内容的辅助函数 func extractHrefAndText(outerHTML string) (string, string) { // 使用正则表达式或其他字符串处理方法提取 href 和文本内容 // 这里只是一个简单的例子,具体实现需要根据 HTML 结构来调整 hrefRegex := `href="([^"]+)"` textRegex := `>([^<]+)<` hrefMatches := regexp.MustCompile(hrefRegex).FindStringSubmatch(outerHTML) textMatches := regexp.MustCompile(textRegex).FindStringSubmatch(outerHTML) href := "" linkText := "" if len(hrefMatches) > 1 { href = hrefMatches[1] } if len(textMatches) > 1 { linkText = textMatches[1] } return href, linkText } // Extract report URLs from the HTML content func extractReportURLs(htmlContent, keyword string) map[string]string { //var reportURLs []string var reportURLMap = make(map[string]string) var reportURL string // Find all occurrences of the keyword and extract report URLs content := htmlContent for { startIdx := strings.Index(content, keyword) if startIdx == -1 { break } startIdx += len(keyword) // Extract the URL from the HTML content urlStartIdx := strings.LastIndex(content[:startIdx], `href="`) + len(`href="`) urlEndIdx := strings.Index(content[urlStartIdx:], `"`) + urlStartIdx if urlStartIdx > 0 && urlEndIdx > urlStartIdx { reportURL = content[urlStartIdx:urlEndIdx] //reportURLs = append(reportURLs, content[urlStartIdx:urlEndIdx]) } content = content[startIdx:] // Now extract the content inside the first
divStartIdx := strings.Index(content, `
`) if divStartIdx != -1 { divStartIdx += len(`
`) divEndIdx := strings.Index(content[divStartIdx:], `
`) + divStartIdx if divEndIdx > divStartIdx { shortRightContent := content[divStartIdx:divEndIdx] // Extract the first
content inside
innerDivStartIdx := strings.Index(shortRightContent, `
`) if innerDivStartIdx != -1 { innerDivStartIdx += len(`
`) //innerDivEndIdx := strings.Index(shortRightContent[innerDivStartIdx:], `
`) + innerDivStartIdx innerDivContent := shortRightContent[innerDivStartIdx:] fmt.Println("Inner Div Content:", innerDivContent) reportURLMap[reportURL] = innerDivContent } } } } return reportURLMap } // Process the report data func processReport(ctx context.Context, product string, category string, reportURL string, keywords []string) error { // Navigate to the report page var reportContent string err := chromedp.Run(ctx, chromedp.Navigate(lyLoginPath+reportURL), chromedp.WaitVisible("body", chromedp.ByQuery), // 等待 body 元素可见,确保页面已加载 chromedp.Sleep(5*time.Second), // 等待额外时间,以确保动态内容加载 chromedp.OuterHTML("html", &reportContent), // 获取页面 HTML 内容 ) if err != nil { return err } // 如果文章内容包含 “您看文章的速度太快了,歇一会再看吧” 则返回指定错误 if strings.Contains(reportContent, "您看文章的速度太快了,歇一会再看吧") { return fmt.Errorf("您看文章的速度太快了,歇一会再看吧") } var lyIndexDataList []models.BaseFromLyData var edbDataLyList []models.EdbDataLy // Process the data based on keywords for _, keyword := range keywords { partialKeyword := strings.Split(keyword, ":") // Select appropriate processor based on product and category processor, err := GetProcessor(product, category) if err != nil { return err } // 查询报告所属分类 paramsLib := make(map[string]interface{}) paramsLib["CategoryName"] = product postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.GET_LY_CLASSIFY_BY_NAME) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) continue } var requestResponse models.RequestResponse[models.BaseFromLyClassify] err = json.Unmarshal(postEdbLib, &requestResponse) if err != nil { return err } classify := requestResponse.Data // Process the report content using the selected processor baseFromLyDataList, err := processor.Process(ctx, product, reportContent, partialKeyword, classify.BaseFromLyClassifyId) if err != nil { return err } if len(baseFromLyDataList) > 0 { for _, baseFromLyData := range baseFromLyDataList { if baseFromLyData.DataTime != "" && baseFromLyData.IndexCode != "" && baseFromLyData.IndexCode != "lysww" { baseFromLyData.CreateTime = utils.GetCurrentTime() baseFromLyData.ModifyTime = utils.GetCurrentTime() // 补充 判断是否存在于指标库 paramsLib = make(map[string]interface{}) paramsLib["IndexCode"] = baseFromLyData.IndexCode paramsLib["Source"] = utils.DATA_SOURCE_LY postEdbLib, err := httpRequestFill(paramsLib, utils.GET_EDB_INFO_BY_INDEX_CODE) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) continue } var requestResponse models.RequestResponse[models.EdbInfo] err = json.Unmarshal(postEdbLib, &requestResponse) if err != nil { log.Printf("postEdbLib err: %v", err) continue } lyIndexDataList = append(lyIndexDataList, baseFromLyData) edbIndexData := requestResponse.Data if edbIndexData.EdbCode == "" { // 不存在 不用维护指标库数据 continue } // 存在 则同步新增指标库中的指标数据 edbDataLy := models.EdbDataLy{ CreateTime: utils.GetCurrentTime(), ModifyTime: utils.GetCurrentTime(), EdbInfoId: edbIndexData.EdbInfoId, EdbCode: edbIndexData.EdbCode, DataTime: baseFromLyData.DataTime, Value: baseFromLyData.Value, DataTimestamp: uint64(time.Now().UnixNano() / int64(time.Millisecond)), } edbDataLyList = append(edbDataLyList, edbDataLy) } } } } // 新增指标数据 if len(lyIndexDataList) > 0 { // 转换成json marshal, err := json.Marshal(lyIndexDataList) if err != nil { log.Printf("postEdbLib err: %v", err) return err } _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_LY_DATA_LIST, string(marshal), "application/json") if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) return err } } // 新增指标库数据 if len(edbDataLyList) > 0 { // 转换成json marshal, err := json.Marshal(edbDataLyList) if err != nil { log.Printf("postEdbLib err: %v", err) return err } _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_LY_EDB_DATA, string(marshal), "application/json") //_, err := httpRequestFill(edbDataLyList, utils.ADD_BATCH_LY_EDB_DATA) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) return err } } return nil } func addLyIndex(classifyId int, indexCode string, indexName string, unit string, frequency string) (int, error) { // 添加指标 index := &models.BaseFromLyIndex{ CreateTime: utils.GetCurrentTime(), ModifyTime: utils.GetCurrentTime(), BaseFromLyClassifyId: classifyId, IndexCode: indexCode, IndexName: indexName, Frequency: frequency, Unit: unit, EdbExist: 0, } postEdbLib, err := httpRequestFill(index, utils.ADD_LY_INDEX) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) return 0, err } var requestResponse models.RequestResponse[int64] err = json.Unmarshal(postEdbLib, &requestResponse) indexId := requestResponse.Data return int(indexId), nil } func httpRequestFill(data interface{}, urlMethod string) (postEdbLib []byte, err error) { // 转换成json marshal, err := json.Marshal(data) if err != nil { return nil, err } // json 转 interface var result map[string]interface{} err = json.Unmarshal(marshal, &result) if err != nil { return nil, err } postEdbLib, err = utils.PostEdbLibRequest(result, urlMethod) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) return nil, err } return postEdbLib, nil }