123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595 |
- package liangyou
- import (
- "context"
- "encoding/json"
- "eta/eta_data_analysis/models"
- "eta/eta_data_analysis/services/alarm_msg"
- "eta/eta_data_analysis/utils"
- "fmt"
- "github.com/beego/beego/v2/core/logs"
- "github.com/chromedp/cdproto/cdp"
- "log"
- "os"
- "regexp"
- "strconv"
- "strings"
- "time"
- "github.com/chromedp/chromedp"
- )
// lyLoginPath is the home page of the crawled site. It is used as the login
// entry point, as the starting page for every report lookup, and as the prefix
// that report URLs are appended to.
var lyLoginPath = "https://www.fao.com.cn/"
- func LyDataDeal(cont context.Context) (err error) {
- // 读取 JSON 文件
- configFile, err := os.ReadFile(utils.LY_JSON_PATH)
- if err != nil {
- fmt.Printf("读取配置文件错误: %v\n", err)
- return
- }
- // 定义通用的 map 结构体来解析 JSON
- var data map[string]map[string]map[string][]string
- // 解析 JSON 文件内容
- err = json.Unmarshal(configFile, &data)
- if err != nil {
- fmt.Printf("解析配置文件错误: %v\n", err)
- return
- }
- // 打印解析后的数据以验证
- fmt.Printf("%+v\n", data)
- // 创建 chromedp 执行上下文
- options := []chromedp.ExecAllocatorOption{
- chromedp.Flag("headless", false),
- chromedp.Flag("disable-blink-features", "AutomationControlled"),
- chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
- }
- allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...)
- defer cancel()
- ctx, cancel := chromedp.NewContext(allocCtx)
- defer cancel()
- // 登录操作
- err = login(ctx)
- if err != nil {
- fmt.Printf("登录错误: %v\n", err)
- return
- }
- // 遍历配置并爬取数据
- for product, productData := range data {
- for category, categoryData := range productData {
- for report, keywords := range categoryData {
- fmt.Printf("正在获取数据: %s -> %s -> %s\n", product, category, report)
- err = fetchReportData(ctx, product, category, report, keywords)
- if err != nil {
- fmt.Printf("获取数据错误: %s -> %s -> %s: %v\n", product, category, report, err)
- // 您看文章的速度太快了,歇一会再看吧
- if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") {
- return
- }
- }
- }
- }
- }
- return nil
- }
- func login(ctx context.Context) error {
- return chromedp.Run(ctx,
- chromedp.Navigate(lyLoginPath),
- chromedp.Sleep(5*time.Second),
- chromedp.Click(`a[id="btnLogin"]`, chromedp.ByQuery),
- chromedp.Sleep(2*time.Second),
- chromedp.SetValue(`input[id="userName"]`, utils.LY_USERNAME, chromedp.ByQuery),
- chromedp.SetValue(`input[id="pwd"]`, utils.LY_PASSWORD, chromedp.ByQuery),
- chromedp.Sleep(2*time.Second),
- chromedp.Click(`input[id="btn_Login"]`, chromedp.ByQuery),
- chromedp.Sleep(5*time.Second),
- )
- }
- func fetchReportData(ctx context.Context, product, category, report string, keywords []string) error {
- // Navigate to the main page
- err := chromedp.Run(ctx,
- chromedp.Navigate(lyLoginPath),
- chromedp.Sleep(5*time.Second),
- )
- if err != nil {
- return err
- }
- // Navigate to the product page
- productPageURL, err := fillProductPageURL(ctx, product, category)
- if err != nil {
- return err
- }
- // Navigate to the category page
- var categoryPageURL string
- err = chromedp.Run(ctx,
- chromedp.Navigate(productPageURL),
- chromedp.Sleep(5*time.Second),
- chromedp.Click(fmt.Sprintf(`//div[contains(@class, "newBox")]//a[contains(text(), '%s')]`, category), chromedp.BySearch),
- chromedp.Sleep(5*time.Second),
- chromedp.Location(&categoryPageURL),
- )
- if err != nil {
- return err
- }
- logs.Info("categoryPageURL: %s: %s: %s", product, category, categoryPageURL)
- //var allReportURLs []string
- var allReportURLMap = make(map[string]string)
- for {
- var htmlContent string
- err = chromedp.Run(ctx,
- chromedp.Navigate(categoryPageURL),
- chromedp.Sleep(5*time.Second),
- chromedp.OuterHTML("html", &htmlContent),
- )
- if err != nil {
- return err
- }
- fmt.Printf("页面内容: %s\n", htmlContent)
- // Extract report URLs containing the partial keyword
- reportURLMap := extractReportURLs(htmlContent, report)
- //allReportURLs = append(allReportURLs, reportURLs...)
- for key, value := range reportURLMap {
- allReportURLMap[key] = value
- }
- // 测试环境跑部分数据,上线放开
- //break
- // Check if next page button is disabled
- // 测试环境跑部分数据,上线放开
- var nextPageDisabled bool
- err = chromedp.Run(ctx,
- chromedp.Evaluate(`document.querySelector('div.my-page-next').classList.contains('my-page-forbid')`, &nextPageDisabled),
- )
- if err != nil {
- return err
- }
- if nextPageDisabled {
- break
- }
- // Click the next page button
- err = chromedp.Run(ctx,
- chromedp.Click(`div.my-page-next`, chromedp.ByQuery),
- chromedp.Sleep(5*time.Second),
- chromedp.Location(&categoryPageURL),
- )
- if err != nil {
- return err
- }
- }
- logs.Info("所有报告 URLs: %s: %s: %v", product, category, allReportURLMap)
- if len(allReportURLMap) == 0 {
- return fmt.Errorf("未找到报告 URL")
- }
- // 处理报告数据
- for key, value := range allReportURLMap {
- // 查询报告是否已经处理 这里只对近7天的数据进行处理
- paramsLib := make(map[string]interface{})
- paramsLib["Url"] = key
- postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.GET_LY_INDEX_RECORD_BY_URL)
- if err != nil {
- // 有错误就不继续执行
- log.Printf("postEdbLib err: %v", err)
- continue
- }
- var requestResponse models.RequestResponse[models.BaseFromLyIndexRecord]
- err = json.Unmarshal(postEdbLib, &requestResponse)
- lyIndexRecord := requestResponse.Data
- if lyIndexRecord.DataTime != "" {
- toTime, err := utils.StringToTime(lyIndexRecord.DataTime + " 00:00:00")
- if err != nil {
- logs.Error("时间格式转换错误: %s: %s: %s: %s: %v", product, category, report, key, err)
- continue
- }
- if time.Now().Sub(toTime) > 7*24*time.Hour {
- logs.Info("报告已处理: %s: %s: %s: %s", product, category, report, key)
- continue
- }
- }
- // 随机睡眠
- rand := utils.RangeRand(20, 100)
- fmt.Println(report+";sleep:", strconv.Itoa(int(rand)))
- time.Sleep(time.Duration(rand) * time.Second)
- err = processReport(ctx, product, category, key, keywords)
- if err != nil {
- logs.Error("处理报告错误: %s: %s: %s: %s: %v", product, category, report, key, err)
- if strings.Contains(err.Error(), "您看文章的速度太快了,歇一会再看吧") {
- // 如果报告内容包含 “您看文章的速度太快了,歇一会再看吧” 则停止处理,发短信通知
- // 发送短信通知
- alarm_msg.SendAlarmMsg(fmt.Sprintf("粮油商务网-爬取指标数据被限制,请稍后重试, ErrMsg: %s", err.Error()), 1)
- return nil
- }
- continue
- }
- format, err := utils.ConvertTimeFormat(value)
- if err != nil {
- logs.Error("时间格式转换错误: %s, %s, %v: %v", product, category, value, err)
- continue
- }
- // 处理报告成功,将维护指标数据读取进度到数据库,避免后面重复读取
- record := &models.BaseFromLyIndexRecord{
- CreateTime: utils.GetCurrentTime(),
- ModifyTime: utils.GetCurrentTime(),
- Product: product,
- Category: category,
- Url: key,
- DataTime: format,
- }
- // 转换成json
- marshal, err := json.Marshal(record)
- if err != nil {
- logs.Error("维护指标数据读取进度错误: %s, %s, %v: %v", product, category, key, err)
- continue
- }
- // json 转 interface
- var result map[string]interface{}
- err = json.Unmarshal(marshal, &result)
- postEdbLib, err = utils.PostEdbLibRequest(result, utils.ADD_LY_INDEX_RECORD)
- if err != nil {
- // 有错误就不继续执行
- logs.Error("维护指标数据读取进度错误: %s, %s, %v: %v", product, category, key, err)
- continue
- }
- logs.Info("维护指标数据读取进度成功: %s, %s, %v", product, category, key)
- }
- return nil
- }
- func fillProductPageURL(ctx context.Context, product string, category string) (string, error) {
- // 选择 dl 标签下所有 a 标签的 XPath
- selector := `//dl[contains(@class, 'dl_hot')]//a`
- logs.Info("选择器表达式: %s", selector)
- var nodes []*cdp.Node
- var productPageURL string
- // 获取 dl 标签下的所有 a 标签节点
- err := chromedp.Run(ctx,
- chromedp.WaitReady(selector, chromedp.BySearch),
- chromedp.Nodes(selector, &nodes, chromedp.BySearch),
- )
- if err != nil {
- return "", err
- }
- // 提取并打印所有 a 标签的 OuterHTML
- var targetURL string
- for _, node := range nodes {
- var outerHTML string
- // 获取 a 标签的 OuterHTML
- err = chromedp.Run(ctx,
- chromedp.OuterHTML(node.FullXPath(), &outerHTML, chromedp.BySearch),
- )
- if err != nil {
- return "", err
- }
- // 打印获取的 OuterHTML 内容
- logs.Info("Link OuterHTML: %s", outerHTML)
- // 从 OuterHTML 中提取 href 和文本内容
- // 使用正则或字符串处理提取 href 和文本内容
- href, linkText := extractHrefAndText(outerHTML)
- // 打印提取的 href 和文本内容
- logs.Info("Link Text: %s, Href: %s", linkText, href)
- // 如果文本内容匹配目标产品
- if linkText == product {
- // 拼接完整的 URL
- /*if !strings.HasPrefix(href, "http") {
- href = lyLoginPath + href
- }*/
- targetURL = href
- break
- }
- }
- if targetURL == "" {
- return "", fmt.Errorf("未找到匹配的产品链接")
- }
- // 显示更多内容
- err = chromedp.Run(ctx,
- chromedp.Evaluate(`document.getElementById("moreSpeList").style.display = "block";`, nil),
- )
- if err != nil {
- return "", err
- }
- // 点击目标产品的链接
- clickSelector := fmt.Sprintf(`//a[@href='%s']`, targetURL)
- err = chromedp.Run(ctx,
- chromedp.WaitReady(clickSelector, chromedp.BySearch),
- chromedp.Click(clickSelector, chromedp.BySearch),
- chromedp.Sleep(5*time.Second),
- chromedp.Location(&productPageURL),
- )
- if err != nil {
- return "", err
- }
- // 返回点击后的页面URL
- logs.Info("productPageURL: %s", productPageURL)
- return productPageURL, nil
- }
// Patterns and replacer used by extractHrefAndText. They are built once at
// package level instead of on every call.
var (
	// lyAnchorHrefPattern captures the value of the first double-quoted href.
	lyAnchorHrefPattern = regexp.MustCompile(`href="([^"]+)"`)
	// lyAnchorTextPattern captures the first non-empty run of text that sits
	// between a '>' and the next '<'.
	lyAnchorTextPattern = regexp.MustCompile(`>([^<]+)<`)
	// lySerializedHTMLUnescaper undoes the character references that HTML
	// serialization writes into attribute values and text. The replacer works
	// in a single pass, so "&amp;lt;" becomes "&lt;" and is not unescaped
	// twice.
	lySerializedHTMLUnescaper = strings.NewReplacer(
		"&amp;", "&",
		"&quot;", `"`,
		"&lt;", "<",
		"&gt;", ">",
		"&nbsp;", "\u00a0",
	)
)

// extractHrefAndText pulls the href attribute value and the first piece of
// text content out of the outer HTML of an anchor element.
//
// Both values are unescaped, because the input is serialized markup: a URL
// such as "/list?a=1&b=2" shows up as "/list?a=1&amp;b=2" and would no longer
// equal the attribute value seen by a selector. The text is additionally
// trimmed of surrounding whitespace so it can be compared with a plain name.
//
// A part that cannot be found is returned as the empty string.
func extractHrefAndText(outerHTML string) (string, string) {
	href := ""
	if m := lyAnchorHrefPattern.FindStringSubmatch(outerHTML); len(m) > 1 {
		href = lySerializedHTMLUnescaper.Replace(m[1])
	}

	linkText := ""
	if m := lyAnchorTextPattern.FindStringSubmatch(outerHTML); len(m) > 1 {
		linkText = strings.TrimSpace(lySerializedHTMLUnescaper.Replace(m[1]))
	}
	return href, linkText
}
// extractReportURLs scans a listing page for every occurrence of keyword and
// returns a map from report URL to the text of the first inner <div> of the
// next `<div class="short_right">` block (the caller treats that text as the
// report date).
//
// For each occurrence the URL is the value of the closest `href="` that starts
// before the end of the keyword, looking no further back than the end of the
// previous occurrence. An occurrence is skipped when no such href exists, when
// the href value is empty or unterminated, or when no `short_right` block with
// an inner <div> follows. This keeps a keyword that appears twice inside one
// link (for example in a title attribute and in the link text) from producing
// a bogus second entry.
//
// An empty keyword yields an empty map; it would otherwise match at every
// position and never advance.
func extractReportURLs(htmlContent, keyword string) map[string]string {
	const (
		hrefOpen      = `href="`
		shortRightTag = `<div class="short_right">`
		innerDivTag   = `<div>`
		divClose      = `</div>`
	)

	reportURLMap := make(map[string]string)
	if keyword == "" {
		return reportURLMap
	}

	content := htmlContent
	for {
		keywordIdx := strings.Index(content, keyword)
		if keywordIdx == -1 {
			break
		}
		keywordEnd := keywordIdx + len(keyword)

		// Resolve the link this occurrence belongs to. The URL is reset for
		// every occurrence so a miss never reuses the previous report's URL.
		reportURL := ""
		if hrefIdx := strings.LastIndex(content[:keywordEnd], hrefOpen); hrefIdx != -1 {
			urlStart := hrefIdx + len(hrefOpen)
			if urlLen := strings.Index(content[urlStart:], `"`); urlLen > 0 {
				reportURL = content[urlStart : urlStart+urlLen]
			}
		}

		// Continue the scan right after this occurrence.
		content = content[keywordEnd:]
		if reportURL == "" {
			continue
		}

		// Take the text of the first inner <div> of the next short_right block.
		divIdx := strings.Index(content, shortRightTag)
		if divIdx == -1 {
			continue
		}
		bodyStart := divIdx + len(shortRightTag)
		bodyLen := strings.Index(content[bodyStart:], divClose)
		if bodyLen <= 0 {
			continue
		}
		shortRightContent := content[bodyStart : bodyStart+bodyLen]

		innerIdx := strings.Index(shortRightContent, innerDivTag)
		if innerIdx == -1 {
			continue
		}
		// shortRightContent already ends at the first </div>, i.e. at the end
		// of the inner div, so everything after the opening tag is its text.
		innerDivContent := shortRightContent[innerIdx+len(innerDivTag):]
		fmt.Println("Inner Div Content:", innerDivContent)
		reportURLMap[reportURL] = innerDivContent
	}
	return reportURLMap
}
- // Process the report data
- func processReport(ctx context.Context, product string, category string, reportURL string, keywords []string) error {
- // Navigate to the report page
- var reportContent string
- err := chromedp.Run(ctx,
- chromedp.Navigate(lyLoginPath+reportURL),
- chromedp.WaitVisible("body", chromedp.ByQuery), // 等待 body 元素可见,确保页面已加载
- chromedp.Sleep(5*time.Second), // 等待额外时间,以确保动态内容加载
- chromedp.OuterHTML("html", &reportContent), // 获取页面 HTML 内容
- )
- if err != nil {
- return err
- }
- // 如果文章内容包含 “您看文章的速度太快了,歇一会再看吧” 则返回指定错误
- if strings.Contains(reportContent, "您看文章的速度太快了,歇一会再看吧") {
- return fmt.Errorf("您看文章的速度太快了,歇一会再看吧")
- }
- var lyIndexDataList []models.BaseFromLyData
- var edbDataLyList []models.EdbDataLy
- // Process the data based on keywords
- for _, keyword := range keywords {
- partialKeyword := strings.Split(keyword, ":")
- // Select appropriate processor based on product and category
- processor, err := GetProcessor(product, category)
- if err != nil {
- return err
- }
- // 查询报告所属分类
- paramsLib := make(map[string]interface{})
- paramsLib["CategoryName"] = product
- postEdbLib, err := utils.PostEdbLibRequest(paramsLib, utils.GET_LY_CLASSIFY_BY_NAME)
- if err != nil {
- // 有错误就不继续执行
- log.Printf("postEdbLib err: %v", err)
- continue
- }
- var requestResponse models.RequestResponse[models.BaseFromLyClassify]
- err = json.Unmarshal(postEdbLib, &requestResponse)
- if err != nil {
- return err
- }
- classify := requestResponse.Data
- // Process the report content using the selected processor
- baseFromLyDataList, err := processor.Process(ctx, product, reportContent, partialKeyword, classify.BaseFromLyClassifyId)
- if err != nil {
- return err
- }
- if len(baseFromLyDataList) > 0 {
- for _, baseFromLyData := range baseFromLyDataList {
- if baseFromLyData.DataTime != "" && baseFromLyData.IndexCode != "" && baseFromLyData.IndexCode != "lysww" {
- baseFromLyData.CreateTime = utils.GetCurrentTime()
- baseFromLyData.ModifyTime = utils.GetCurrentTime()
- // 补充 判断是否存在于指标库
- paramsLib = make(map[string]interface{})
- paramsLib["IndexCode"] = baseFromLyData.IndexCode
- paramsLib["Source"] = utils.DATA_SOURCE_LY
- postEdbLib, err := httpRequestFill(paramsLib, utils.GET_EDB_INFO_BY_INDEX_CODE)
- if err != nil {
- // 有错误就不继续执行
- log.Printf("postEdbLib err: %v", err)
- continue
- }
- var requestResponse models.RequestResponse[models.EdbInfo]
- err = json.Unmarshal(postEdbLib, &requestResponse)
- if err != nil {
- log.Printf("postEdbLib err: %v", err)
- continue
- }
- lyIndexDataList = append(lyIndexDataList, baseFromLyData)
- edbIndexData := requestResponse.Data
- if edbIndexData.EdbCode == "" {
- // 不存在 不用维护指标库数据
- continue
- }
- // 存在 则同步新增指标库中的指标数据
- edbDataLy := models.EdbDataLy{
- CreateTime: utils.GetCurrentTime(),
- ModifyTime: utils.GetCurrentTime(),
- EdbInfoId: edbIndexData.EdbInfoId,
- EdbCode: edbIndexData.EdbCode,
- DataTime: baseFromLyData.DataTime,
- Value: baseFromLyData.Value,
- DataTimestamp: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
- }
- edbDataLyList = append(edbDataLyList, edbDataLy)
- }
- }
- }
- }
- // 新增指标数据
- if len(lyIndexDataList) > 0 {
- // 转换成json
- marshal, err := json.Marshal(lyIndexDataList)
- if err != nil {
- log.Printf("postEdbLib err: %v", err)
- return err
- }
- _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_LY_DATA_LIST, string(marshal), "application/json")
- if err != nil {
- // 有错误就不继续执行
- log.Printf("postEdbLib err: %v", err)
- return err
- }
- }
- // 新增指标库数据
- if len(edbDataLyList) > 0 {
- // 转换成json
- marshal, err := json.Marshal(edbDataLyList)
- if err != nil {
- log.Printf("postEdbLib err: %v", err)
- return err
- }
- _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_LY_EDB_DATA, string(marshal), "application/json")
- //_, err := httpRequestFill(edbDataLyList, utils.ADD_BATCH_LY_EDB_DATA)
- if err != nil {
- // 有错误就不继续执行
- log.Printf("postEdbLib err: %v", err)
- return err
- }
- }
- return nil
- }
- func addLyIndex(classifyId int, indexCode string, indexName string, unit string, frequency string) (int, error) {
- // 添加指标
- index := &models.BaseFromLyIndex{
- CreateTime: utils.GetCurrentTime(),
- ModifyTime: utils.GetCurrentTime(),
- BaseFromLyClassifyId: classifyId,
- IndexCode: indexCode,
- IndexName: indexName,
- Frequency: frequency,
- Unit: unit,
- EdbExist: 0,
- }
- postEdbLib, err := httpRequestFill(index, utils.ADD_LY_INDEX)
- if err != nil {
- // 有错误就不继续执行
- log.Printf("postEdbLib err: %v", err)
- return 0, err
- }
- var requestResponse models.RequestResponse[int64]
- err = json.Unmarshal(postEdbLib, &requestResponse)
- indexId := requestResponse.Data
- return int(indexId), nil
- }
- func httpRequestFill(data interface{}, urlMethod string) (postEdbLib []byte, err error) {
- // 转换成json
- marshal, err := json.Marshal(data)
- if err != nil {
- return nil, err
- }
- // json 转 interface
- var result map[string]interface{}
- err = json.Unmarshal(marshal, &result)
- if err != nil {
- return nil, err
- }
- postEdbLib, err = utils.PostEdbLibRequest(result, urlMethod)
- if err != nil {
- // 有错误就不继续执行
- log.Printf("postEdbLib err: %v", err)
- return nil, err
- }
- return postEdbLib, nil
- }
|