// @Author gmy 2024/8/6 10:50:00 package main import ( "context" "encoding/json" "eta/eta_crawler/models" "eta/eta_crawler/utils" "fmt" "github.com/PuerkitoBio/goquery" "github.com/beego/beego/v2/core/logs" "github.com/chromedp/chromedp" "log" "regexp" "strconv" "strings" ) const ( sourceName = "lysww" // 粮油商务网 ) // ImportCostProcessor // @Description: 进口成本处理器 type ImportCostProcessor struct{} func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) { fmt.Println("Processing import cost...") // 实现具体的处理逻辑 return models.BaseFromLyData{}, nil } // ProcessingProfitProcessor // @Description: 加工利润处理器 type ProcessingProfitProcessor struct{} func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) { fmt.Println("Processing processing profit...") // 实现具体的处理逻辑 return models.BaseFromLyData{}, nil } // ProcessingReportProcessor // @Description: 加工报告处理器 type ProcessingReportProcessor struct { } // TableData 用于存储表格的数据 type TableData struct { Headers []string `json:"headers"` Rows [][]string `json:"rows"` } func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) { logs.Info("Processing processing report...") // 解析关键字 if len(keywords) < 3 { return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements") } // 拿到 行关键字和列关键字 columnName := keywords[0] rowName := keywords[1] // 提取所有表格数据 var tableData []TableData doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent)) if err != nil { log.Fatal(err) } // 选择 id 为 "a_content" 的 div doc.Find("#a_content").Each(func(index int, item *goquery.Selection) { item.Find("table").Each(func(index int, table *goquery.Selection) { var headers []string var rows [][]string // 提取表头 table.Find("thead th").Each(func(index int, th *goquery.Selection) { headers = append(headers, th.Text()) }) // 提取表格行数据 table.Find("tbody tr").Each(func(index int, row *goquery.Selection) { var rowData []string row.Find("td").Each(func(index int, td *goquery.Selection) { rowData = append(rowData, td.Text()) }) rows = append(rows, rowData) }) // 仅在表头存在时添加到结果中 if len(headers) > 0 { tableData = append(tableData, TableData{ Headers: headers, Rows: rows, }) } }) }) // 打印提取的数据以进行调试 dataJSON, _ := json.MarshalIndent(tableData, "", " ") fmt.Printf("Extracted Table Data: %s\n", dataJSON) // 提取日期信息 var dateText string err = chromedp.Run(ctx, chromedp.Evaluate(`document.querySelector('div.a_date span').innerText`, &dateText), ) if err != nil { return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to extract date: %v", err) } logs.Info("ProcessingReportProcessor Process() : Extracted Date: %s", dateText) // 校验指标数据是否存在 根据指标id和日期 存在则跳过,不存在正常往下走 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return models.BaseFromLyData{}, err } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err) } if len(indexData) > 0 { logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) return models.BaseFromLyData{}, nil } // 解析日期并计算当前周数 targetWeek, err := utils.ParseDateAndWeek(dateText) if err != nil { return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to parse date: %v", err) } fmt.Printf("Target Week: %s\n", targetWeek) // 处理提取的表格数据 for _, data := range tableData { tableHeaders := data.Headers tableRows := data.Rows // 查找目标列 columnIdx := -1 for i, header := range tableHeaders { headerString := extractChinese(header) if strings.Contains(columnName, headerString) { columnIdx = i break } } if columnIdx == -1 { logs.Error("ProcessingReportProcessor Process() : Column '%s' not found in table", columnName) continue } // 查找本周的列位置 weekIdx := -1 for i, header := range tableHeaders { if strings.Contains(header, targetWeek) && i > columnIdx { weekIdx = i break } } if weekIdx == -1 { fmt.Printf("Week column '%s' not found in table\n", targetWeek) continue } // 查找目标行 for _, row := range tableRows { if len(row) > 0 && strings.Contains(row[0], rowName) { if weekIdx < len(row) { logs.Info("Value in column '%s' - '%s': %s", columnName, rowName, row[columnIdx]) numFlag := isNumeric(row[columnIdx]) if numFlag { value, err := strconv.ParseFloat(row[columnIdx], 64) if err != nil { logs.Error("ProcessingReportProcessor Process() : Error converting value to float64: %v", err) return models.BaseFromLyData{}, err } // 返回BaseFromLyData对象的数据 baseFromLyData := models.BaseFromLyData{ DataTime: dateText, Value: value, } return baseFromLyData, nil } } else { logs.Error("ProcessingReportProcessor Process() : Column index out of range") } } } } // TODO 后面把这个日志打印,不做返回错误处理,一个指标找不到会导致后续指标无法处理 return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : No matching row found for '%s'", rowName) } // 判断字符串是否是数字 func isNumeric(value string) bool { // 正则表达式匹配整数和浮点数 re := regexp.MustCompile(`^[+-]?(\d+(\.\d*)?|\.\d+)$`) return re.MatchString(value) } // 只保留汉字 func extractChinese(text string) string { re := regexp.MustCompile(`[^\p{Han}]`) // 匹配非汉字字符 return re.ReplaceAllString(text, "") }