// Package liangyou // @Author gmy 2024/8/6 10:50:00 package liangyou import ( "context" "eta/eta_crawler/models" "eta/eta_crawler/utils" "fmt" "github.com/PuerkitoBio/goquery" "github.com/beego/beego/v2/core/logs" "github.com/chromedp/chromedp" "log" "regexp" "strconv" "strings" "unicode" ) var ( lySourceName = "lysww" // 粮油商务网 ) // TableData 用于存储表格的数据 type TableData struct { Headers []string `json:"headers"` Rows [][]string `json:"rows"` } // ImportCostProcessor // @Description: 进口成本处理器 type ImportCostProcessor struct{} func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { logs.Info("Processing import cost...") // 解析关键字 if len(keywords) < 5 { return []models.BaseFromLyData{}, fmt.Errorf("ImportCostProcessor Process() : keywords must contain at least 5 elements") } // 拿到 行关键字和列关键字 columnName := keywords[len(keywords)-4] rowVariety := keywords[0] rowPort := keywords[len(keywords)-3] indexNamePrefix := keywords[:1] indexNameSuffix := keywords[1:] // 提取所有表格数据 tableData := getNoHeadTableData(reportContent) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } // 时间格式转换 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return []models.BaseFromLyData{}, err } // 解析日期并计算当前月份 var targetMonths []string if product == "油菜籽" { targetMonths, err = utils.ParseDateAndMonthColzaOil(format) } else { targetMonths, err = utils.ParseDateAndMonth(dateText) } if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("ImportCostProcessor Process() : Failed to parse date: %v", err) } fmt.Printf("Target Month: %s\n", targetMonths) // 处理提取的表格数据 var result []models.BaseFromLyData for _, data := range tableData { tableHeaders := data.Headers tableRows := data.Rows // 查找目标列 columnIdx := -1 for i, header := range tableHeaders { if strings.Contains(header, columnName) { columnIdx = i break } } if columnIdx == -1 { 
log.Printf("ImportCostProcessor Process() : Column '%s' not found in table", columnName) continue } // 处理表格中的每一行 //var flag bool = true var previousRowVariety string var previousRowPort string for rowIndex, row := range tableRows { if len(row) == len(tableHeaders) { previousRowVariety = row[0] previousRowPort = row[1] } else if len(row) == len(tableHeaders)-1 { previousRowPort = row[0] row = append([]string{previousRowVariety}, row...) tableRows[rowIndex] = row } else if len(row) == len(tableHeaders)-2 { row = append([]string{previousRowVariety, previousRowPort}, row...) tableRows[rowIndex] = row } for _, targetMonth := range targetMonths { if len(row) >= len(tableHeaders) && strings.Contains(rowVariety, row[0]) && row[1] == targetMonth && row[len(row)-1] == rowPort { if columnIdx < len(row) { // 指标名称 indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...) indexName := strings.Join(indexNameList[:len(keywords)-2], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("ImportCostProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { logs.Error("ImportCostProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("ImportCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) continue } valueStr := row[columnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: format, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) } else { 
log.Printf("ImportCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, rowPort) } break } } } } return result, nil } // ProcessingProfitProcessor // @Description: 加工利润处理器 type ProcessingProfitProcessor struct{} func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { fmt.Println("Processing processing profit...") // 解析关键字 if len(keywords) < 4 { return []models.BaseFromLyData{}, fmt.Errorf("ProcessingProfitProcessor Process() : keywords must contain at least 4 elements") } // 拿到 行关键字和列关键字 columnName := keywords[1] rowVariety := keywords[0] indexNamePrefix := keywords[:1] indexNameSuffix := keywords[1:] // 提取所有表格数据 tableData := getNoHeadTableData(reportContent) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } // 时间格式转换 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return []models.BaseFromLyData{}, err } // 解析日期并计算当前月份 和 后两月 yearMonths, err := utils.ConvertTimeFormatToYearMonth(format) if err != nil { return nil, err } fmt.Printf("Target yearMonth: %s\n", yearMonths) // 处理提取的表格数据 var result []models.BaseFromLyData for _, data := range tableData { tableHeaders := data.Headers tableRows := data.Rows // 查找目标列 columnIdx := -1 for i, header := range tableHeaders { if strings.Contains(columnName, header) { columnIdx = i break } } if columnIdx == -1 { log.Printf("ProcessingProfitProcessor Process() : Column '%s' not found in table", columnName) continue } // 处理表格中的每一行 var previousRowVariety string for rowIndex, row := range tableRows { if len(row) == len(tableHeaders) { previousRowVariety = row[0] } else if len(row) == len(tableHeaders)-1 { row = append([]string{previousRowVariety}, row...) 
tableRows[rowIndex] = row } for _, targetMonth := range yearMonths { if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth { if columnIdx < len(row) { // 指标名称 indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...) indexName := strings.Join(indexNameList[:len(keywords)-1], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("ProcessingProfitProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { logs.Error("ProcessingProfitProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("ProcessingProfitProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) continue } valueStr := row[columnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: format, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) } else { log.Printf("ProcessingProfitProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName) } break } } } } return result, nil } // ShippingCostProcessor // @Description: 船运费用处理器 type ShippingCostProcessor struct{} func (p *ShippingCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { fmt.Println("Processing processing profit...") // 解析关键字 if len(keywords) < 4 { return []models.BaseFromLyData{}, fmt.Errorf("ShippingCostProcessor Process() : keywords must contain at least 5 
elements") } // 拿到 行关键字和列关键字 columnName := keywords[len(keywords)-3] rowVariety := keywords[0] rowDestination := keywords[1] rowShipType := keywords[2] // 提取所有表格数据 tableData := getNoHeadTableData(reportContent) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } // 时间格式转换 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return []models.BaseFromLyData{}, err } // 处理提取的表格数据 var result []models.BaseFromLyData for _, data := range tableData { tableHeaders := data.Headers tableRows := data.Rows // 查找目标列 columnIdx := -1 for i, header := range tableHeaders { if strings.Contains(header, columnName) { columnIdx = i break } } if columnIdx == -1 { log.Printf("ShippingCostProcessor Process() : Column '%s' not found in table", columnName) continue } // 处理表格中的每一行 for rowIndex, row := range tableRows { if len(row) == len(tableHeaders)-1 { row = append([]string{rowVariety}, row...) tableRows[rowIndex] = row rowShipType, err = extractValueInParentheses(rowVariety) if err != nil { logs.Error("ShippingCostProcessor Process() : Failed to extract value in parentheses: %v", err) continue } } if len(row) >= len(tableHeaders) && row[0] == rowVariety && (row[1] == rowDestination || strings.Contains(row[0], row[1])) && row[2] == rowShipType { if columnIdx < len(row) { // 指标名称 indexName := strings.Join(keywords[:len(keywords)-3], `:`) // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("ShippingCostProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { logs.Error("ShippingCostProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("ShippingCostProcessor Process() : Data already exists for 
index %d and date %s", indexId, dateText) continue } valueStr := row[columnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: format, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) } else { log.Printf("ShippingCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName) } break } } } return result, nil } // SupplyDemandBalanceProcessor // @Description: 供需平衡处理器 type SupplyDemandBalanceProcessor struct{} func (p *SupplyDemandBalanceProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { // https://www.fao.com.cn/art/gG7gKTCNDHLJNsq9QRYjoQ==.htm logs.Info("Processing processing report...") // 解析关键字 if len(keywords) < 4 { return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : keywords must contain at least 4 elements") } // 拿到 行关键字和列关键字 var columnName string rowVariety := keywords[1] // 提取所有表格数据 tableData := getTableData(reportContent, true) logs.Info("SupplyDemandBalanceProcessor Process() : Table data: %v", tableData) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } // 时间格式转换 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return []models.BaseFromLyData{}, err } currentYearAndNextYear, err := utils.GetCurrentYearAndNextYear(format) if err != nil { return nil, err } month, err := utils.GetCurrentMonth(format) if err != nil { return nil, err } monthSuffix := "预估" logs.Info("SupplyDemandBalanceProcessor Process() : Target Year: %s:%s\n", currentYearAndNextYear, month+monthSuffix) // 处理提取的表格数据 var result []models.BaseFromLyData headers := tableData.Headers rows := tableData.Rows for _, year := range 
currentYearAndNextYear { columnName = year + month + monthSuffix isCurrentYear, err := utils.IsCurrentYear(year) if err != nil { logs.Error("SupplyDemandBalanceProcessor Process() : Failed to determine if year is current year: %v", err) continue } if !isCurrentYear { format, err = utils.GetNextYearLastDay(format) if err != nil { logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get next year last day: %v", err) continue } } // 查找目标列 columnIdx := -1 for i, header := range headers { if strings.Contains(columnName, header) { columnIdx = i break } } if columnIdx == -1 { logs.Error("SupplyDemandBalanceProcessor Process() : Column '%s' not found in table", columnName) continue } // 处理表格中的每一行 for _, row := range rows { if len(row) >= len(headers) && row[0] == rowVariety { if columnIdx < len(row) { // 指标名称 indexName := strings.Join(keywords[:len(keywords)-2], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get index id: %v", err) continue } valueStr := row[columnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : failed to parse value '%s': %v", valueStr, err) } yearMonth, err := utils.GetYearMonth(format) if err != nil { logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get year month: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTimeYM(indexId, yearMonth) if err != nil { logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) // 存在走更新逻辑 主要更新今年在去年的预估值 indexData := indexData[0] if 
indexData.Value != value { time, err := utils.StringToTime(indexData.ModifyTime) if err != nil { return nil, err } timeZero, err := utils.StringToTimeZero(format) if err != nil { return nil, err } if time.Before(timeZero) { // 更新指标数据 err := models.UpdateLyDataById(indexData.BaseFromLyDataId, value) if err != nil { logs.Error("SupplyDemandBalanceProcessor Process() : Failed to update data: %v", err) continue } // 更新指标库数据 edbIndexData, err := models.GetLyEdbDataByIndexCodeAndDataTime(indexData.IndexCode, yearMonth) if err != nil { return nil, err } if len(edbIndexData) > 0 { err := models.UpdateLyEdbDataById(edbIndexData[0].EdbInfoId, value) if err != nil { return nil, err } } } } continue } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: format, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) } else { log.Printf("SupplyDemandBalanceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName) } break } } } return result, nil } // PurchaseShippingProcessor // @Description: 采购装船处理器 type PurchaseShippingProcessor struct{} func (p *PurchaseShippingProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { logs.Info("Processing purchase shipping...") // 解析关键字 if len(keywords) < 3 { return []models.BaseFromLyData{}, fmt.Errorf("PurchaseShippingProcessor Process() : keywords must contain at least 3 elements") } // 拿到 行关键字和列关键字 columnName := keywords[len(keywords)-3] // 提取所有表格数据 tableData := getPurchaseShippingTableData(reportContent) logs.Info("PurchaseShippingProcessor Process() : Table data: %v", tableData) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } // 时间格式转换 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return []models.BaseFromLyData{}, err } // 处理提取的表格数据 var result []models.BaseFromLyData headers := 
tableData.Headers rows := tableData.Rows // 查找目标列 columnIdx := -1 for i, header := range headers { if strings.Contains(columnName, header) { columnIdx = i break } } if columnIdx == -1 { log.Printf("PurchaseShippingProcessor Process() : Column '%s' not found in table", columnName) } else { // 处理表格中的每一行 for _, row := range rows { if len(row) >= len(headers) { if columnIdx < len(row) { if !isNumber(row[columnIdx]) { continue } // 指标名称 indexName := strings.Join(keywords[:len(keywords)-3], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("PurchaseShippingProcessor Process() : Failed to get index id: %v", err) continue } var yearMonth string number, err := utils.ConvertMonthToNumber(row[1]) if err != nil { return nil, err } yearMonth = row[0] + "-" + number isSameMonth, err := utils.IsSameMonth(format, yearMonth) if err != nil { return nil, err } if isSameMonth { yearMonth = format } else { lastDayOfMonth, err := utils.GetLastDayOfMonth(yearMonth) if err != nil { return nil, err } yearMonth = lastDayOfMonth } valueStr := row[columnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("PurchaseShippingProcessor Process() : failed to parse value '%s': %v", valueStr, err) } month, err := utils.GetYearMonth(yearMonth) if err != nil { return nil, err } indexData, err := models.GetLyDataByIndexIdAndDataTimeYM(indexId, month) if err != nil { logs.Error("PurchaseShippingProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { if indexData[0].Value != value { logs.Info("PurchaseShippingProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) lyData := indexData[0] time, err := utils.StringToTime(lyData.ModifyTime) if err != nil { return nil, err } 
timeZero, err := utils.StringToTimeZero(format) if err != nil { return nil, err } if time.Before(timeZero) { // 更新指标数据 err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value) if err != nil { return nil, err } // 同步更新指标库数据 须根据指标编码和日期更新 edbIndexData, err := models.GetLyEdbDataByIndexCodeAndDataTime(lyData.IndexCode, month) if err != nil { return nil, err } if len(edbIndexData) > 0 { err := models.UpdateLyEdbDataById(edbIndexData[0].EdbInfoId, value) if err != nil { return nil, err } } } } continue } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: yearMonth, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) continue } else { log.Printf("PurchaseShippingProcessor Process() : Column index out of range for row '%s'", columnName) } break } } } return result, nil } // ProcessingReportProcessor // @Description: 加工报告处理器 type ProcessingReportProcessor struct { } func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { logs.Info("Processing processing report...") // 解析关键字 if len(keywords) < 3 { return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements") } // 拿到 行关键字和列关键字 columnName := keywords[0] rowName := keywords[1] // 提取所有表格数据 tableData := getAllTableData(reportContent) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } indexName := strings.Join(keywords[:len(keywords)-2], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { return nil, err } // 校验指标数据是否存在 根据指标id和日期 存在则跳过,不存在正常往下走 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return []models.BaseFromLyData{}, err } 
indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err) } if len(indexData) > 0 { logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) // 不必做更新处理,报告每周刷新,即使本周和上周数据一致,也需要每周记录 return []models.BaseFromLyData{}, nil } // 解析日期并计算当前周数 targetWeek, err := utils.ParseDateAndWeek(dateText) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to parse date: %v", err) } fmt.Printf("Target Week: %s\n", targetWeek) var result []models.BaseFromLyData // 处理提取的表格数据 for _, data := range tableData { tableHeaders := data.Headers tableRows := data.Rows // 查找目标列 columnIdx := -1 for i, header := range tableHeaders { headerString := extractChinese(header) if strings.Contains(columnName, headerString) { // 这个表格不是很好处理,这里写的有些僵硬,后续需要优化 if columnName == "国内大豆开机率" { i = i + 2 } columnIdx = i break } } if columnIdx == -1 { logs.Error("ProcessingReportProcessor Process() : Column '%s' not found in table", columnName) continue } // 查找本周的列位置 weekIdx := -1 for i, header := range tableHeaders { if strings.Contains(header, targetWeek) && i > columnIdx { weekIdx = i break } } if weekIdx == -1 { fmt.Printf("Week column '%s' not found in table\n", targetWeek) continue } // 查找目标行 for _, row := range tableRows { if strings.Contains(row[0], rowName) { if columnIdx < len(row) { // 指标名称 indexName := strings.Join(keywords[:len(keywords)-2], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("ProcessingReportProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if 
err != nil { logs.Error("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) // 无需走更新逻辑,报告每日更新,即使今天和昨天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次 continue } valueStr := row[columnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: format, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) continue } else { log.Printf("ProcessingReportProcessor Process() : Column index out of range for row '%s', '%s'", rowName, columnName) } break } } } return result, nil } // InventoryAnalysisProcessor // @Description: 库存分析处理器 type InventoryAnalysisProcessor struct{} func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { // https://www.fao.com.cn/art/yg1IKj9FpPEIDv2LefnPhQ==.htm logs.Info("Processing inventory analysis...") // 解析关键字 if len(keywords) < 4 { return []models.BaseFromLyData{}, fmt.Errorf("InventoryAnalysisProcessor Process() : keywords must contain at least 4 elements") } // 拿到 行关键字和列关键字 columnName := keywords[0] rowVariety := keywords[1] columnSuffix := "本周" columnName = columnName + columnSuffix // 提取所有表格数据 tableData := getTableData(reportContent, true) logs.Info("InventoryAnalysisProcessor Process() : Table data: %v", tableData) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } // 时间格式转换 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return []models.BaseFromLyData{}, err } // 处理提取的表格数据 var result []models.BaseFromLyData headers := 
tableData.Headers rows := tableData.Rows // 查找目标列 columnIdx := -1 for i, header := range headers { header := removeParentheses(header) if strings.Contains(columnName, header) { columnIdx = i break } } if columnIdx == -1 { logs.Error("InventoryAnalysisProcessor Process() : Column '%s' not found in table", columnName) } else { // 处理表格中的每一行 for _, row := range rows { if len(row) >= len(headers) && strings.Contains(row[0], rowVariety) { if columnIdx < len(row) { // 指标名称 indexName := strings.Join(keywords[:len(keywords)-2], ":") indexName = removeParentheses(indexName) // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("InventoryAnalysisProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { logs.Error("InventoryAnalysisProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("InventoryAnalysisProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) // 无需走更新逻辑,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次 continue } valueStr := row[columnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("InventoryAnalysisProcessor Process() : failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: format, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) continue } else { log.Printf("InventoryAnalysisProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName) } break } } } return result, nil } // PriceSpreadArbitrageProcessor // @Description: 价差套利处理器 type PriceSpreadArbitrageProcessor struct{} func (p 
*PriceSpreadArbitrageProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { fmt.Println("Processing processing profit...") // 解析关键字 if len(keywords) < 4 { return []models.BaseFromLyData{}, fmt.Errorf("PriceSpreadArbitrageProcessor Process() : keywords must contain at least 4 elements") } // 拿到 行关键字和列关键字 var columnDate string rowVariety := keywords[0] rowBourse := keywords[1] // 提取所有表格数据 tableData := getNoHeadTableData(reportContent) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } // 时间格式转换 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return []models.BaseFromLyData{}, err } day, err := utils.ConvertTimeFormatToYearMonthDay(format) if err != nil { return nil, err } columnDate = day // 处理提取的表格数据 var result []models.BaseFromLyData for _, data := range tableData { tableHeaders := data.Headers tableRows := data.Rows // 查找目标列 columnIdx := -1 for i, header := range tableHeaders { if strings.Contains(header, columnDate) { columnIdx = i break } } if columnIdx == -1 { log.Printf("PriceSpreadArbitrageProcessor Process() : Column '%s' not found in table", columnDate) continue } // 处理表格中的每一行 for _, row := range tableRows { if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == rowBourse { if columnIdx < len(row) { // 指标名称 indexName := strings.Join(keywords[:len(keywords)-2], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("PriceSpreadArbitrageProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { logs.Error("PriceSpreadArbitrageProcessor Process() : Failed to get data by index id and date: %v", err) continue } if 
len(indexData) > 0 { logs.Info("PriceSpreadArbitrageProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) // 无需走更新逻辑,报告每天更新,即使今天和每天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次 continue } valueStr := row[columnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: format, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) } else { log.Printf("PriceSpreadArbitrageProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnDate) } break } } } return result, nil } // DailyTransactionProcessor // @Description: 每日成交处理器 type DailyTransactionProcessor struct{} func (p *DailyTransactionProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) { fmt.Println("Processing processing profit...") // 解析关键字 if len(keywords) < 4 { return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : keywords must contain at least 4 elements") } // 获取第一个表格 areaTableDataList := getNoHeadTableData(reportContent) if len(areaTableDataList) == 0 { return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : No table data found") } areaTableData := areaTableDataList[0] // 获取第二个表格 blocTableData := getTableData(reportContent, false) if blocTableData.Headers == nil { return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : No table data found") } logs.Info("SupplyDemandBalanceProcessor Process() : areaTableData data: %v, blocTableData data: %v", areaTableData, blocTableData) // 提取日期信息 dateText, err := getDateInfo(ctx) if err != nil { return []models.BaseFromLyData{}, err } // 时间格式转换 format, err := utils.ConvertTimeFormat(dateText) if err != nil { return 
[]models.BaseFromLyData{}, err } // 处理提取的表格数据 var result []models.BaseFromLyData areaHeaders := areaTableData.Headers areaRows := areaTableData.Rows // 第一个表格 // 拿到 行关键字和列关键字 columnArea := keywords[1] var rowAreaMonthDays []string rowWeek := "地区总计" monthDay, err := utils.GetWeekdaysInSameWeek(format) if err != nil { return nil, err } rowAreaMonthDays = monthDay // 查找目标列 areaColumnIdx := -1 for i, header := range areaHeaders { if strings.Contains(header, columnArea) { areaColumnIdx = i break } } if areaColumnIdx == -1 { log.Printf("DailyTransactionProcessor Process() : One Column '%s' not found in table", columnArea) } else if !strings.Contains(strings.Join(keywords[:len(keywords)-3], ":"), "主要集团") { for _, row := range areaRows { if len(row) >= len(areaHeaders) && row[0] == rowWeek { if areaColumnIdx < len(row) { // 指标名称 indexName := strings.Join(keywords[:len(keywords)-3], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) // 无需走更新逻辑,报告每周更新,一周出来一周中每天得数据,即使本周和上周数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次 continue } valueStr := row[areaColumnIdx] isChinese := IsChinese(valueStr) if isChinese { continue } value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 var dealDate string if row[0] == rowWeek { dealDate = format } else { date, err := 
utils.ConvertToDate(row[0]) if err != nil { return nil, err } dealDate = date } baseFromLyData := models.BaseFromLyData{ DataTime: dealDate, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) } else { log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", monthDay, columnArea) } break } else { for _, monthDay := range rowAreaMonthDays { if len(row) >= len(areaHeaders) && (row[0] == monthDay && !strings.Contains(strings.Join(keywords[:len(keywords)-3], ":"), "周度")) { if areaColumnIdx < len(row) { // 指标名称 indexName := strings.Join(keywords[:len(keywords)-3], ":") // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) // 无需走更新逻辑,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次 continue } valueStr := row[areaColumnIdx] isChinese := IsChinese(valueStr) if isChinese { continue } value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 var dealDate string if row[0] == rowWeek { dealDate = format } else { date, err := utils.ConvertToDate(row[0]) if err != nil { return nil, err } dealDate = date } baseFromLyData := models.BaseFromLyData{ DataTime: dealDate, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) } else { 
log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", monthDay, columnArea) } break } } } } } // 第二个表格 // 拿到 行关键字和列关键字 columnBloc := keywords[len(keywords)-3] rowBloc := keywords[1] blocHeaders := blocTableData.Headers blocRows := blocTableData.Rows // 查找目标列 blocColumnIdx := -1 for i, header := range blocHeaders { if strings.Contains(header, columnBloc) { blocColumnIdx = i break } } if blocColumnIdx == -1 { log.Printf("DailyTransactionProcessor Process() : Two Column '%s' not found in table", columnBloc) } else { // 处理表格中的每一行 for _, row := range blocRows { if len(row) >= len(blocHeaders) && strings.Contains(row[0], rowBloc) { if blocColumnIdx < len(row) { // 指标名称 indexName := strings.Join(keywords[:len(keywords)-3], ":") indexName = removeParentheses(indexName) // 指标编码 indexCode := utils.GenerateIndexCode(lySourceName, indexName) // 指标id获取 indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1]) if err != nil { logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err) continue } indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format) if err != nil { logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err) continue } if len(indexData) > 0 { logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText) continue } valueStr := row[blocColumnIdx] value, err := strconv.ParseFloat(valueStr, 64) if err != nil { return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : failed to parse value '%s': %v", valueStr, err) } // 创建并添加到结果列表 baseFromLyData := models.BaseFromLyData{ DataTime: format, Value: value, BaseFromLyIndexId: indexId, IndexCode: indexCode, } result = append(result, baseFromLyData) } else { log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", 
rowBloc, columnBloc)
				}
				break
			}
		}
	}
	return result, nil
}

// PalmOilImportCostProcessor extracts palm oil import cost (棕榈油进口成本) figures.
type PalmOilImportCostProcessor struct{}

// Process scans every table of reportContent for rows whose first cell is
// contained in the variety keyword (keywords[0]), whose second cell equals one
// of the target months derived from the report date, and whose last cell equals
// the port keyword (keywords[len-3]). For each match it reads the cell under the
// first column whose header contains keywords[len-4] and returns it as a new
// data point dated on the report date. Points already stored for that index and
// date are skipped.
//
// keywords must have at least 5 elements; the last two are passed to getIndexId
// as frequency and unit. A value cell that does not parse as a float aborts the
// whole run with an error.
func (p *PalmOilImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
	logs.Info("Processing palm oil import cost...")
	if len(keywords) < 5 {
		return []models.BaseFromLyData{}, fmt.Errorf("PalmOilImportCostProcessor Process() : keywords must contain at least 5 elements")
	}
	// Row and column keywords.
	columnName := keywords[len(keywords)-4]
	rowVariety := keywords[0]
	rowPort := keywords[len(keywords)-3]
	indexNamePrefix := keywords[:1]
	indexNameSuffix := keywords[1:]

	tableData := getNoHeadTableData(reportContent)

	// Report date as displayed on the page, then normalized.
	dateText, err := getDateInfo(ctx)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	format, err := utils.ConvertTimeFormat(dateText)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	// Month labels to look for in the second column.
	targetMonths, err := utils.GetYearMonthNoYear(format)
	if err != nil {
		return []models.BaseFromLyData{}, fmt.Errorf("PalmOilImportCostProcessor Process() : Failed to parse date: %v", err)
	}
	fmt.Printf("Target Month: %s\n", targetMonths)

	var result []models.BaseFromLyData
	for _, data := range tableData {
		tableHeaders := data.Headers
		tableRows := data.Rows

		// Locate the value column.
		columnIdx := -1
		for i, header := range tableHeaders {
			if strings.Contains(header, columnName) {
				columnIdx = i
				break
			}
		}
		if columnIdx == -1 {
			log.Printf("PalmOilImportCostProcessor Process() : Column '%s' not found in table", columnName)
			continue
		}
		// The fill-down below reads up to row[2] on full-width rows, so a table
		// with fewer than 3 columns would panic; such a table cannot hold the
		// variety, month and port columns the match needs anyway.
		if len(tableHeaders) < 3 {
			log.Printf("PalmOilImportCostProcessor Process() : Table with %d columns skipped", len(tableHeaders))
			continue
		}

		// Rows missing 1-3 leading cells (presumably rowspan-merged cells) are
		// completed with values carried over from earlier rows.
		var previousRowVariety, previousRowPort, previousRowFob string
		for rowIndex, row := range tableRows {
			switch len(row) {
			case len(tableHeaders):
				previousRowVariety = row[0]
				previousRowPort = row[1]
				previousRowFob = row[2]
			case len(tableHeaders) - 1:
				previousRowPort = row[0]
				previousRowFob = row[1]
				row = append([]string{previousRowVariety}, row...)
				tableRows[rowIndex] = row
			case len(tableHeaders) - 2:
				// The original author noted this branch may be unnecessary; kept as-is.
				previousRowFob = row[0]
				row = append([]string{previousRowVariety, previousRowPort}, row...)
				tableRows[rowIndex] = row
			case len(tableHeaders) - 3:
				row = append([]string{previousRowVariety, previousRowPort, previousRowFob}, row...)
				tableRows[rowIndex] = row
			}

			for _, targetMonth := range targetMonths {
				if len(row) < len(tableHeaders) || !strings.Contains(rowVariety, row[0]) ||
					row[1] != targetMonth || row[len(row)-1] != rowPort {
					continue
				}
				if columnIdx >= len(row) {
					log.Printf("PalmOilImportCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, rowPort)
					break
				}

				// Build the name parts in a fresh slice. Appending straight onto
				// keywords[:1] would write into the caller's keywords array whenever
				// it has spare capacity, clobbering the frequency/unit keywords and
				// the suffix used by later months.
				indexNameList := make([]string, 0, len(keywords)+1)
				indexNameList = append(indexNameList, indexNamePrefix...)
				indexNameList = append(indexNameList, targetMonth)
				indexNameList = append(indexNameList, indexNameSuffix...)
				indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
				indexCode := utils.GenerateIndexCode(lySourceName, indexName)

				indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
				if err != nil {
					logs.Error("PalmOilImportCostProcessor Process() : Failed to get index id: %v", err)
					continue
				}
				indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
				if err != nil {
					logs.Error("PalmOilImportCostProcessor Process() : Failed to get data by index id and date: %v", err)
					continue
				}
				if len(indexData) > 0 {
					logs.Info("PalmOilImportCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
					continue
				}

				valueStr := row[columnIdx]
				value, err := strconv.ParseFloat(valueStr, 64)
				if err != nil {
					return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
				}
				result = append(result, models.BaseFromLyData{
					DataTime:          format,
					Value:             value,
					BaseFromLyIndexId: indexId,
					IndexCode:         indexCode,
				})
				break
			}
		}
	}
	return result, nil
}

// ImportEstimateProcessor handles import estimate (进口预估) tables.
type ImportEstimateProcessor struct{}

// Process reads, for each month label from utils.GetNextThreeMonthsNoYear, the
// first row whose first cell contains keywords[1] and whose cell under that
// month's column is numeric. The value is dated on the month's end date taken
// from utils.GetNextThreeMonthsLastDay.
//
// When a point already exists for that date with a different value, and its
// ModifyTime is before utils.StringToTimeZero(report date), the stored value
// and the matching edb data are updated in place. Existing points are never
// returned. keywords must have at least 4 elements; the last two are frequency
// and unit.
func (p *ImportEstimateProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
	logs.Info("Processing import estimate...")
	if len(keywords) < 4 {
		return []models.BaseFromLyData{}, fmt.Errorf("ImportEstimateProcessor Process() : keywords must contain at least 4 elements")
	}
	rowVariety := keywords[1]

	tableData := getNoHeadTableData(reportContent)

	dateText, err := getDateInfo(ctx)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	format, err := utils.ConvertTimeFormat(dateText)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	columnDates, err := utils.GetNextThreeMonthsNoYear(format)
	if err != nil {
		return nil, err
	}
	monthsLastDay, err := utils.GetNextThreeMonthsLastDay(format)
	if err != nil {
		return nil, err
	}

	var result []models.BaseFromLyData
	for _, data := range tableData {
		tableHeaders := data.Headers
		tableRows := data.Rows

		for _, columnDate := range columnDates {
			// NOTE(review): substring match. If labels look like "1月", this also
			// matches an earlier "11月"/"12月" header. Confirm the label format.
			columnIdx := -1
			for i, tableHeader := range tableHeaders {
				if strings.Contains(tableHeader, columnDate) {
					columnIdx = i
					break
				}
			}
			if columnIdx == -1 {
				log.Printf("ImportEstimateProcessor Process() : Column '%s' not found in table", columnDate)
				continue
			}

			// Only the first fully processed row per column counts (break at the end).
			for _, row := range tableRows {
				if len(row) < len(tableHeaders) || !strings.Contains(row[0], rowVariety) || !isNumber(row[columnIdx]) {
					continue
				}
				if columnIdx >= len(row) {
					log.Printf("ImportEstimateProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnDate)
					break
				}

				indexName := strings.Join(keywords[:len(keywords)-2], `:`)
				indexCode := utils.GenerateIndexCode(lySourceName, indexName)
				indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
				if err != nil {
					logs.Error("ImportEstimateProcessor Process() : Failed to get index id: %v", err)
					continue
				}
				toNumber, err := utils.ConvertMonthToNumber(columnDate)
				if err != nil {
					logs.Error("ImportEstimateProcessor Process() : Failed to convert month to number: %v", err)
					continue
				}
				monthEnd, err := utils.GetElementInSlice(monthsLastDay, toNumber)
				if err != nil {
					logs.Error("ImportEstimateProcessor Process() : Failed to get element in slice: %v", err)
					continue
				}
				indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, monthEnd)
				if err != nil {
					logs.Error("ImportEstimateProcessor Process() : Failed to get data by index id and date: %v", err)
					continue
				}

				valueStr := row[columnIdx]
				value, err := strconv.ParseFloat(valueStr, 64)
				if err != nil {
					return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
				}

				if len(indexData) > 0 {
					lyData := indexData[0]
					if lyData.Value != value {
						logs.Info("ImportEstimateProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
						modifyTime, err := utils.StringToTime(lyData.ModifyTime)
						if err != nil {
							return nil, err
						}
						timeZero, err := utils.StringToTimeZero(format)
						if err != nil {
							return nil, err
						}
						// Estimates may be revised. Overwrite only rows not already
						// modified on (presumably) the report day.
						if modifyTime.Before(timeZero) {
							if err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value); err != nil {
								return nil, err
							}
							// Keep the index-library copy in sync.
							lyEdbIndexData, err := models.GetLyEdbDataByIndexCodeAndExactDataTime(lyData.IndexCode, lyData.DataTime)
							if err != nil {
								return nil, err
							}
							if len(lyEdbIndexData) > 0 {
								// NOTE(review): passes EdbInfoId to an "...ById" data updater.
								// Confirm this is the intended key.
								if err := models.UpdateLyEdbDataById(lyEdbIndexData[0].EdbInfoId, value); err != nil {
									return nil, err
								}
							}
						}
					}
					continue
				}

				result = append(result, models.BaseFromLyData{
					DataTime:          monthEnd,
					Value:             value,
					BaseFromLyIndexId: indexId,
					IndexCode:         indexCode,
				})
				break
			}
		}
	}
	return result, nil
}

// InternationalPriceProcessor handles international price (国际价格) tables.
type InternationalPriceProcessor struct{}

// Process looks for rows whose first cell equals keywords[0] and whose second
// cell equals one of the month labels from utils.ConvertTimeFormatToYearMonth.
// For each match it reads the first column whose header contains keywords[1]
// and returns the value dated on the report date. Points already stored for
// that index and date are skipped.
//
// Rows one cell short inherit the first cell of the last full-width row.
// keywords must have at least 4 elements; the last two are frequency and unit.
func (p *InternationalPriceProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
	logs.Info("Processing international price...")
	if len(keywords) < 4 {
		return []models.BaseFromLyData{}, fmt.Errorf("InternationalPriceProcessor Process() : keywords must contain at least 4 elements")
	}
	columnName := keywords[1]
	rowVariety := keywords[0]
	indexNamePrefix := keywords[:1]
	indexNameSuffix := keywords[1:]

	tableData := getNoHeadTableData(reportContent)

	dateText, err := getDateInfo(ctx)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	format, err := utils.ConvertTimeFormat(dateText)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	// Current month and the following ones, as labels.
	yearMonths, err := utils.ConvertTimeFormatToYearMonth(format)
	if err != nil {
		return nil, err
	}
	fmt.Printf("Target yearMonth: %s\n", yearMonths)

	var result []models.BaseFromLyData
	for _, data := range tableData {
		tableHeaders := data.Headers
		tableRows := data.Rows

		columnIdx := -1
		for i, header := range tableHeaders {
			if strings.Contains(header, columnName) {
				columnIdx = i
				break
			}
		}
		if columnIdx == -1 {
			log.Printf("InternationalPriceProcessor Process() : Column '%s' not found in table", columnName)
			continue
		}

		var previousRowVariety string
		for rowIndex, row := range tableRows {
			switch len(row) {
			case len(tableHeaders):
				previousRowVariety = row[0]
			case len(tableHeaders) - 1:
				row = append([]string{previousRowVariety}, row...)
				tableRows[rowIndex] = row
			}

			for _, targetMonth := range yearMonths {
				// len(row) < 2 guards row[1] on single-column tables, which used to panic.
				if len(row) < len(tableHeaders) || len(row) < 2 || row[0] != rowVariety || row[1] != targetMonth {
					continue
				}
				if columnIdx >= len(row) {
					log.Printf("InternationalPriceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
					break
				}

				// Fresh slice: appending onto keywords[:1] could overwrite the
				// caller's keywords array when it has spare capacity.
				indexNameList := make([]string, 0, len(keywords)+1)
				indexNameList = append(indexNameList, indexNamePrefix...)
				indexNameList = append(indexNameList, targetMonth)
				indexNameList = append(indexNameList, indexNameSuffix...)
				indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
				indexCode := utils.GenerateIndexCode(lySourceName, indexName)

				indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
				if err != nil {
					logs.Error("InternationalPriceProcessor Process() : Failed to get index id: %v", err)
					continue
				}
				indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
				if err != nil {
					logs.Error("InternationalPriceProcessor Process() : Failed to get data by index id and date: %v", err)
					continue
				}
				if len(indexData) > 0 {
					// The report is re-read each day and every day gets its own point,
					// so an existing point only means this report was processed twice.
					logs.Info("InternationalPriceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
					continue
				}

				valueStr := row[columnIdx]
				value, err := strconv.ParseFloat(valueStr, 64)
				if err != nil {
					return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
				}
				result = append(result, models.BaseFromLyData{
					DataTime:          format,
					Value:             value,
					BaseFromLyIndexId: indexId,
					IndexCode:         indexCode,
				})
				break
			}
		}
	}
	return result, nil
}

// CanadaStatisticsBureauProcessor handles Statistics Canada (加拿大统计局) tables.
type CanadaStatisticsBureauProcessor struct{}

// Process reads, from each table with a column whose header contains "本周"
// ("this week"), the first row whose first cell equals keywords[1]. It returns
// that cell dated on the report date unless a point already exists for that
// index and date. keywords must have at least 4 elements; the last two are
// frequency and unit.
func (p *CanadaStatisticsBureauProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
	logs.Info("Processing Canada statistics bureau...")
	if len(keywords) < 4 {
		return []models.BaseFromLyData{}, fmt.Errorf("CanadaStatisticsBureauProcessor Process() : keywords must contain at least 4 elements")
	}
	columnDate := "本周"
	rowVariety := keywords[1]

	tableData := getNoHeadTableData(reportContent)

	dateText, err := getDateInfo(ctx)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	format, err := utils.ConvertTimeFormat(dateText)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}

	var result []models.BaseFromLyData
	for _, data := range tableData {
		tableHeaders := data.Headers
		tableRows := data.Rows

		columnIdx := -1
		for i, header := range tableHeaders {
			if strings.Contains(header, columnDate) {
				columnIdx = i
				break
			}
		}
		if columnIdx == -1 {
			log.Printf("CanadaStatisticsBureauProcessor Process() : Column '%s' not found in table", columnDate)
			continue
		}

		for _, row := range tableRows {
			if len(row) < len(tableHeaders) {
				continue
			}
			if columnIdx >= len(row) {
				log.Printf("CanadaStatisticsBureauProcessor Process() : Column index out of range for row '%s'", columnDate)
				break
			}
			if row[0] != rowVariety {
				continue
			}

			indexName := strings.Join(keywords[:len(keywords)-2], ":")
			indexCode := utils.GenerateIndexCode(lySourceName, indexName)
			indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
			if err != nil {
				logs.Error("CanadaStatisticsBureauProcessor Process() : Failed to get index id: %v", err)
				continue
			}
			indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
			if err != nil {
				logs.Error("CanadaStatisticsBureauProcessor Process() : Failed to get data by index id and date: %v", err)
				continue
			}
			if len(indexData) > 0 {
				// Weekly report: one point per week even if unchanged, so an
				// existing point only means this report was processed twice.
				logs.Info("CanadaStatisticsBureauProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
				continue
			}

			valueStr := row[columnIdx]
			value, err := strconv.ParseFloat(valueStr, 64)
			if err != nil {
				return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
			}
			result = append(result, models.BaseFromLyData{
				DataTime:          format,
				Value:             value,
				BaseFromLyIndexId: indexId,
				IndexCode:         indexCode,
			})
			break
		}
	}
	return result, nil
}

// ImportExportAnalysisProcessor handles import/export analysis (进出口分析) tables.
type ImportExportAnalysisProcessor struct{}

// Process reads monthly values from the columns labelled by
// utils.GetCurrentYearAndLastYear. Per column, it walks the rows whose first
// cell is a month number. Each value is dated on the last day of
// "<first 4 chars of the column label>-<month>". Points already stored for
// that index and date are skipped.
//
// The scan of a column stops at the first full-width row whose month or value
// cell is not numeric. keywords must have at least 3 elements; the last two
// are frequency and unit.
func (p *ImportExportAnalysisProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
	// The old message ("processing profit") was a copy-paste from another processor.
	logs.Info("Processing import export analysis...")
	if len(keywords) < 3 {
		return []models.BaseFromLyData{}, fmt.Errorf("ImportExportAnalysisProcessor Process() : keywords must contain at least 3 elements")
	}

	tableData := getNoHeadTableData(reportContent)

	dateText, err := getDateInfo(ctx)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	format, err := utils.ConvertTimeFormat(dateText)
	if err != nil {
		return []models.BaseFromLyData{}, err
	}
	// December data may only be published in January, so last year is included too.
	columnDates, err := utils.GetCurrentYearAndLastYear(format)
	if err != nil {
		return nil, err
	}

	var result []models.BaseFromLyData
	for _, data := range tableData {
		tableHeaders := data.Headers
		tableRows := data.Rows

		for _, columnDate := range columnDates {
			columnIdx := -1
			for i, header := range tableHeaders {
				if strings.Contains(header, columnDate) {
					columnIdx = i
					break
				}
			}
			if columnIdx == -1 {
				log.Printf("ImportExportAnalysisProcessor Process() : Column '%s' not found in table", columnDate)
				continue
			}

			for _, row := range tableRows {
				if len(row) < len(tableHeaders) {
					continue
				}
				if columnIdx >= len(row) || !isNumber(row[columnIdx]) || !isNumber(row[0]) {
					// Despite the message, this mostly fires on the first non-numeric
					// (e.g. not yet published) month, which ends this column's scan.
					log.Printf("ImportExportAnalysisProcessor Process() : Column index out of range for row '%s'", columnDate)
					break
				}

				indexName := strings.Join(keywords[:len(keywords)-2], ":")
				indexCode := utils.GenerateIndexCode(lySourceName, indexName)
				indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
				if err != nil {
					logs.Error("ImportExportAnalysisProcessor Process() : Failed to get index id: %v", err)
					continue
				}
				month, err := strconv.Atoi(row[0])
				if err != nil {
					return nil, err
				}
				date := columnDate[:4] + "-" + fmt.Sprintf("%02d", month)
				lastDayOfMonth, err := utils.GetLastDayOfMonth(date)
				if err != nil {
					return nil, err
				}
				indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, lastDayOfMonth)
				if err != nil {
					logs.Error("ImportExportAnalysisProcessor Process() : Failed to get data by index id and date: %v", err)
					continue
				}
				if len(indexData) > 0 {
					logs.Info("ImportExportAnalysisProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
					continue
				}

				valueStr := row[columnIdx]
				value, err := strconv.ParseFloat(valueStr, 64)
				if err != nil {
					return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
				}
				result = append(result, models.BaseFromLyData{
					DataTime:          lastDayOfMonth,
					Value:             value,
					BaseFromLyIndexId: indexId,
					IndexCode:         indexCode,
				})
			}
		}
	}
	return result, nil
}

// reValueInParentheses captures the text inside the first pair of ASCII "()"
// or full-width "（）" parentheses.
var reValueInParentheses = regexp.MustCompile(`[(（]([^)）]+)[)）]`)

// extractValueInParentheses returns the text inside the first pair of
// parentheses (ASCII or full-width) in input, or an error when there is none.
// The previous pattern `(([^)]+))` had no literal parentheses and returned
// everything up to the first ")".
func extractValueInParentheses(input string) (string, error) {
	if matches := reValueInParentheses.FindStringSubmatch(input); len(matches) > 1 {
		return matches[1], nil
	}
	return "", fmt.Errorf("no value found in parentheses")
}

// getIndexId returns the id of the ly index with the given code. When
// models.GetLyIndexByCode finds none, it creates the index via addLyIndex
// (classifyId, indexCode, indexName, frequency, unit) and returns the new id.
// sourceName is currently unused.
func getIndexId(indexCode string, indexName string, classifyId int, sourceName string, frequency string, unit string) (int, error) {
	// A code equal to the bare source prefix presumably means the index name
	// contributed nothing to the generated code; refuse to create such an index.
	if indexCode == "lysww" {
		return 0, fmt.Errorf("indexCode is error")
	}
	indexInfo, err := models.GetLyIndexByCode(indexCode)
	if err != nil {
		return 0, err
	}
	if indexInfo != nil {
		return indexInfo.BaseFromLyIndexId, nil
	}
	newIndexId, err := addLyIndex(classifyId, indexCode, indexName, frequency, unit)
	if err != nil {
		return 0, err
	}
	return newIndexId, nil
}

// getDateInfo reads the report date text from the page loaded in the chromedp context.
func
getDateInfo(ctx context.Context) (string, error) { var dateText string err := chromedp.Run(ctx, chromedp.Evaluate(`document.querySelector('div.a_date span').innerText`, &dateText), ) if err != nil { return "", fmt.Errorf("processing Process() : Failed to extract report date: %v", err) } logs.Info("Processing Process() : Report Extracted Date: %s", dateText) return dateText, nil } // 获取所有表格数据 获取表格中有thead标签的数据 func getAllTableData(reportContent string) []TableData { var tableData []TableData doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent)) if err != nil { log.Fatal(err) } // 选择 id 为 "a_content" 的 div doc.Find("#a_content").Each(func(index int, item *goquery.Selection) { item.Find("table").Each(func(index int, table *goquery.Selection) { var headers []string var rows [][]string // 提取表头 table.Find("thead th").Each(func(index int, th *goquery.Selection) { headers = append(headers, th.Text()) }) // 提取表格行数据 table.Find("tbody tr").Each(func(index int, row *goquery.Selection) { var rowData []string row.Find("td").Each(func(index int, td *goquery.Selection) { rowData = append(rowData, td.Text()) }) rows = append(rows, rowData) }) // 仅在表头存在时添加到结果中 if len(headers) > 0 { tableData = append(tableData, TableData{ Headers: headers, Rows: rows, }) } }) }) return tableData } // 获取无头表格数据 func getNoHeadTableData(reportContent string) []TableData { var tableData []TableData doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent)) if err != nil { log.Fatal(err) } // Find the div with id "a_content" doc.Find("#a_content").Each(func(index int, div *goquery.Selection) { // Find all tables within the div div.Find("table").Each(func(index int, table *goquery.Selection) { var headers []string var rows [][]string // Extract table headers if any table.Find("tr").Each(func(index int, tr *goquery.Selection) { var rowData []string tr.Find("td, th").Each(func(index int, cell *goquery.Selection) { cellText := cell.Text() rowData = append(rowData, 
cellText) }) if index == 0 && len(rowData) > 0 { // The first row is treated as the header row headers = rowData } else if len(rowData) > 0 { // Add the row data to the rows slice rows = append(rows, rowData) } }) // Only add table data if headers are present if len(headers) > 0 { tableData = append(tableData, TableData{ Headers: headers, Rows: rows, }) } }) }) return tableData } // 获取表格数据 获取id 为 a_content 的 div 中的第一个表格 左上角那个单元格会拼在第一个,会拼上列上的合并单元格 func getTableData(reportContent string, isFirst bool) TableData { doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent)) if err != nil { log.Fatal(err) } tableData := &TableData{} // 只提取 id 为 a_content 的 div 中的第一个表格 var firstTable *goquery.Selection if isFirst { firstTable = doc.Find("div#a_content table").First() } else { firstTable = doc.Find("div#a_content table").Last() } var combinedHeaders []string // 提取表头 firstTable.Find("tr").Each(func(i int, row *goquery.Selection) { if i == 0 { // 第一行处理合并单元格,保存到 combinedHeaders row.Find("td,th").Each(func(j int, cell *goquery.Selection) { if j == 0 { // 把左上角的“年度(10/9月)”放入 Headers 第一个元素 tableData.Headers = append(tableData.Headers, strings.TrimSpace(cell.Text())) } else { // 处理其他单元格 colspan, exists := cell.Attr("colspan") if exists { spanCount := 0 fmt.Sscanf(colspan, "%d", &spanCount) for k := 0; k < spanCount; k++ { combinedHeaders = append(combinedHeaders, strings.TrimSpace(cell.Text())) } } else { combinedHeaders = append(combinedHeaders, strings.TrimSpace(cell.Text())) } } }) } else if i == 1 { // 第二行处理具体标题,组合后保存到 Headers row.Find("td,th").Each(func(j int, cell *goquery.Selection) { if j < len(combinedHeaders) { fullHeader := combinedHeaders[j] + strings.TrimSpace(cell.Text()) tableData.Headers = append(tableData.Headers, fullHeader) } }) } else { // 处理数据行 var rowData []string row.Find("td").Each(func(j int, cell *goquery.Selection) { rowData = append(rowData, strings.TrimSpace(cell.Text())) }) if len(rowData) > 0 { tableData.Rows = append(tableData.Rows, 
rowData) } } }) return *tableData } // 获取采购装船表格数据 func getPurchaseShippingTableData(reportContent string) TableData { doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent)) if err != nil { log.Fatal(err) } var tableData TableData // 只提取 id 为 a_content 的 div 中的第一个表格 firstTable := doc.Find("div#a_content table").First() // 提取表头 var headers []string var subHeaders []string firstTable.Find("thead tr").Each(func(i int, row *goquery.Selection) { row.Find("th").Each(func(j int, cell *goquery.Selection) { headerText := strings.TrimSpace(cell.Text()) if i == 0 { // 处理第一行表头 colspan, exists := cell.Attr("colspan") if exists { spanCount := 0 fmt.Sscanf(colspan, "%d", &spanCount) for k := 0; k < spanCount; k++ { headers = append(headers, headerText) } } else { headers = append(headers, headerText) } } else if i == 1 { // 处理第二行表头 subHeaders = append(subHeaders, headerText) } }) }) // 合并第一行和第二行表头信息 if len(subHeaders) > 0 { for i := 0; i < len(subHeaders); i++ { // 从第四个单元格开始拼接 headers[3+i] = headers[3+i] + subHeaders[i] } } tableData.Headers = headers // 处理数据行 firstTable.Find("tbody tr").Each(func(i int, row *goquery.Selection) { var rowData []string row.Find("td").Each(func(j int, cell *goquery.Selection) { rowData = append(rowData, strings.TrimSpace(cell.Text())) }) if len(rowData) > 0 { tableData.Rows = append(tableData.Rows, rowData) } }) return tableData } // 判断字符串是否是数字 func isNumeric(value string) bool { // 正则表达式匹配整数和浮点数 re := regexp.MustCompile(`^[+-]?(\d+(\.\d*)?|\.\d+)$`) return re.MatchString(value) } // 只保留汉字 func extractChinese(text string) string { re := regexp.MustCompile(`[^\p{Han}]`) // 匹配非汉字字符 return re.ReplaceAllString(text, "") } // 去除括号中的内容 包含括号 () func removeParentheses(text string) string { re := regexp.MustCompile(`\([^)]*\)`) return re.ReplaceAllString(text, "") } // IsChinese 判断传入的是否是汉字 func IsChinese(str string) bool { for _, r := range str { if unicode.Is(unicode.Han, r) { return true } } return false } // 判断是否是数字 func isNumber(str 
string) bool { _, err := strconv.ParseFloat(str, 64) return err == nil }