浏览代码

粮油商务网数据对接-大豆

gmy 8 月之前
父节点
当前提交
d9f10ed44f
共有 6 个文件被更改,包括 814 次插入 和 151 次删除
  1. 13 28
      cmd/commodity_liangyou.go
  2. 684 65
      cmd/processor_business_logic.go
  3. 5 1
      cmd/processor_factory.go
  4. 15 54
      static/liangyou.json
  5. 83 1
      utils/date_util.go
  6. 14 2
      utils/index_code_util.go

+ 13 - 28
cmd/commodity_liangyou.go

@@ -155,8 +155,9 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 		reportURLs := extractReportURLs(htmlContent, report)
 		allReportURLs = append(allReportURLs, reportURLs...)
 
+		break
 		// Check if next page button is disabled
-		var nextPageDisabled bool
+		/*var nextPageDisabled bool
 		err = chromedp.Run(ctx,
 			chromedp.Evaluate(`document.querySelector('div.my-page-next').classList.contains('my-page-forbid')`, &nextPageDisabled),
 		)
@@ -176,7 +177,7 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 		)
 		if err != nil {
 			return err
-		}
+		}*/
 	}
 
 	logs.Info("所有报告 URLs: %s: %s: %v", product, category, allReportURLs)
@@ -251,38 +252,22 @@ func processReport(ctx context.Context, product string, category string, reportU
 			return err
 		}
 
-		// 指标名称
-		indexName := strings.Join(partialKeyword[:len(partialKeyword)-2], ":")
-		// 指标编码
-		indexCode := utils.GenerateIndexCode(sourceName, indexName)
-		// 判断指标是否存在
-		var indexId int
-		indexInfo, err := models.GetLyIndexByCode(indexCode)
-		if err != nil {
-			// 新增指标
-			index, err := addLyIndex(classify.BaseFromLyClassifyId, indexCode, indexName, partialKeyword[len(partialKeyword)-2], partialKeyword[len(partialKeyword)-1])
-			if err != nil {
-				return err
-			}
-			indexId = index
-		} else {
-			indexId = indexInfo.BaseFromLyIndexId
-		}
-
 		// Process the report content using the selected processor
-		baseFromLyData, err := processor.Process(ctx, product, reportContent, partialKeyword, indexId)
+		baseFromLyDataList, err := processor.Process(ctx, product, reportContent, partialKeyword, classify.BaseFromLyClassifyId)
 		if err != nil {
 			// 这块逻辑会导致,如果有一个指标处理失败,后续的指标也无法处理,目前出现失败的原因有 产品提供表格中的字段在页面找不到
 			return err
 		}
-
-		if baseFromLyData.DataTime != "" && baseFromLyData.Value != 0 {
-			baseFromLyData.IndexCode = indexCode
-			baseFromLyData.BaseFromLyIndexId = indexId
-			baseFromLyData.CreateTime = utils.GetCurrentTime()
-			baseFromLyData.ModifyTime = utils.GetCurrentTime()
-			lyIndexDataList = append(lyIndexDataList, baseFromLyData)
+		if len(baseFromLyDataList) > 0 {
+			for _, baseFromLyData := range baseFromLyDataList {
+				if baseFromLyData.DataTime != "" && baseFromLyData.Value != 0 {
+					baseFromLyData.CreateTime = utils.GetCurrentTime()
+					baseFromLyData.ModifyTime = utils.GetCurrentTime()
+					lyIndexDataList = append(lyIndexDataList, baseFromLyData)
+				}
+			}
 		}
+
 	}
 	// 新增指标数据
 	err = models.AddLyDataList(lyIndexDataList)

+ 684 - 65
cmd/processor_business_logic.go

@@ -3,7 +3,6 @@ package main
 
 import (
 	"context"
-	"encoding/json"
 	"eta/eta_crawler/models"
 	"eta/eta_crawler/utils"
 	"fmt"
@@ -20,122 +19,556 @@ const (
 	sourceName = "lysww" // 粮油商务网
 )
 
+// TableData 用于存储表格的数据
+type TableData struct {
+	Headers []string   `json:"headers"`
+	Rows    [][]string `json:"rows"`
+}
+
 // ImportCostProcessor
 // @Description: 进口成本处理器
 type ImportCostProcessor struct{}
 
-func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
+func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
 	fmt.Println("Processing import cost...")
-	// 实现具体的处理逻辑
-	return models.BaseFromLyData{}, nil
+
+	// 解析关键字
+	if len(keywords) < 5 {
+		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingImportCostProcessor Process() : keywords must contain at least 5 elements")
+	}
+
+	// 拿到 行关键字和列关键字
+	columnName := keywords[len(keywords)-4]
+	rowVariety := keywords[0]
+	rowPort := keywords[len(keywords)-3]
+	indexNamePrefix := keywords[:1]
+	indexNameSuffix := keywords[1:]
+
+	// 提取所有表格数据
+	tableData := getNoHeadTableData(reportContent)
+
+	// 提取日期信息
+	dateText, err := getDateInfo(ctx)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 解析日期并计算当前月份
+	targetMonths, err := utils.ParseDateAndMonth(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingImportCostProcessor Process() : Failed to parse date: %v", err)
+	}
+	fmt.Printf("Target Month: %s\n", targetMonths)
+
+	// 时间格式转换
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 处理提取的表格数据
+	var result []models.BaseFromLyData
+
+	for _, data := range tableData {
+		tableHeaders := data.Headers
+		tableRows := data.Rows
+
+		// 查找目标列
+		columnIdx := -1
+		for i, header := range tableHeaders {
+			if strings.Contains(header, columnName) {
+				columnIdx = i
+				break
+			}
+		}
+		if columnIdx == -1 {
+			log.Printf("ProcessingImportCostProcessor Process() : Column '%s' not found in table", columnName)
+			continue
+		}
+
+		// 处理表格中的每一行
+		//var flag bool = true
+		var previousRowVariety string
+		var previousRowPort string
+		for rowIndex, row := range tableRows {
+			if len(row) == len(tableHeaders) {
+				previousRowVariety = row[0]
+				previousRowPort = row[1]
+			} else if len(row) == len(tableHeaders)-1 {
+				previousRowPort = row[0]
+				row = append([]string{previousRowVariety}, row...)
+				tableRows[rowIndex] = row
+			} else if len(row) == len(tableHeaders)-2 {
+				row = append([]string{previousRowVariety, previousRowPort}, row...)
+				tableRows[rowIndex] = row
+			}
+			for _, targetMonth := range targetMonths {
+				if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth && row[len(row)-1] == rowPort {
+					if columnIdx < len(row) {
+						// 指标名称
+						indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
+						indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
+						// 指标编码
+						indexCode := utils.GenerateIndexCode(sourceName, indexName)
+						// 指标id获取
+						indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+						if err != nil {
+							logs.Error("ProcessingImportCostProcessor Process() : Failed to get index id: %v", err)
+							continue
+						}
+
+						indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+						if err != nil {
+							logs.Error("ProcessingImportCostProcessor Process() : Failed to get data by index id and date: %v", err)
+							continue
+						}
+						if len(indexData) > 0 {
+							logs.Info("ProcessingImportCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+							continue
+						}
+
+						valueStr := row[columnIdx]
+						value, err := strconv.ParseFloat(valueStr, 64)
+						if err != nil {
+							return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
+						}
+						// 创建并添加到结果列表
+						baseFromLyData := models.BaseFromLyData{
+							DataTime:          format,
+							Value:             value,
+							BaseFromLyIndexId: indexId,
+							IndexCode:         indexCode,
+						}
+						result = append(result, baseFromLyData)
+					} else {
+						log.Printf("ProcessingImportCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, rowPort)
+					}
+					break
+				}
+			}
+
+		}
+
+	}
+
+	return result, nil
 }
 
 // ProcessingProfitProcessor
 // @Description: 加工利润处理器
 type ProcessingProfitProcessor struct{}
 
-func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
+func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
 	fmt.Println("Processing processing profit...")
-	// 实现具体的处理逻辑
-	return models.BaseFromLyData{}, nil
-}
+	// 解析关键字
+	if len(keywords) < 4 {
+		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingProfitProcessor Process() : keywords must contain at least 4 elements")
+	}
 
-// ProcessingReportProcessor
-// @Description: 加工报告处理器
-type ProcessingReportProcessor struct {
+	// 拿到 行关键字和列关键字
+	columnName := keywords[1]
+	rowVariety := keywords[0]
+	indexNamePrefix := keywords[:1]
+	indexNameSuffix := keywords[1:]
+
+	// 提取所有表格数据
+	tableData := getNoHeadTableData(reportContent)
+
+	// 提取日期信息
+	dateText, err := getDateInfo(ctx)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 时间格式转换
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 解析日期并计算当前月份 和 后两月
+	yearMonths, err := utils.ConvertTimeFormatToYearMonth(format)
+	if err != nil {
+		return nil, err
+	}
+	fmt.Printf("Target yearMonth: %s\n", yearMonths)
+
+	// 处理提取的表格数据
+	var result []models.BaseFromLyData
+
+	for _, data := range tableData {
+		tableHeaders := data.Headers
+		tableRows := data.Rows
+
+		// 查找目标列
+		columnIdx := -1
+		for i, header := range tableHeaders {
+			if strings.Contains(columnName, header) {
+				columnIdx = i
+				break
+			}
+		}
+		if columnIdx == -1 {
+			log.Printf("ProcessingProfitProcessor Process() : Column '%s' not found in table", columnName)
+			continue
+		}
+
+		// 处理表格中的每一行
+		var previousRowVariety string
+		for rowIndex, row := range tableRows {
+			if len(row) == len(tableHeaders) {
+				previousRowVariety = row[0]
+			} else if len(row) == len(tableHeaders)-1 {
+				row = append([]string{previousRowVariety}, row...)
+				tableRows[rowIndex] = row
+			}
+
+			for _, targetMonth := range yearMonths {
+				if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth {
+					if columnIdx < len(row) {
+						// 指标名称
+						indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
+						indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
+						// 指标编码
+						indexCode := utils.GenerateIndexCode(sourceName, indexName)
+						// 指标id获取
+						indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+						if err != nil {
+							logs.Error("ProcessingProfitProcessor Process() : Failed to get index id: %v", err)
+							continue
+						}
+
+						indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+						if err != nil {
+							logs.Error("ProcessingProfitProcessor Process() : Failed to get data by index id and date: %v", err)
+							continue
+						}
+						if len(indexData) > 0 {
+							logs.Info("ProcessingProfitProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+							continue
+						}
+
+						valueStr := row[columnIdx]
+						value, err := strconv.ParseFloat(valueStr, 64)
+						if err != nil {
+							return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
+						}
+						// 创建并添加到结果列表
+						baseFromLyData := models.BaseFromLyData{
+							DataTime:          format,
+							Value:             value,
+							BaseFromLyIndexId: indexId,
+							IndexCode:         indexCode,
+						}
+						result = append(result, baseFromLyData)
+					} else {
+						log.Printf("ProcessingProfitProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
+					}
+					break
+				}
+			}
+
+		}
+	}
+
+	return result, nil
 }
 
-// TableData 用于存储表格的数据
-type TableData struct {
-	Headers []string   `json:"headers"`
-	Rows    [][]string `json:"rows"`
+// ShippingCostProcessor
+// @Description: 船运费用处理器
+type ShippingCostProcessor struct{}
+
+func (p *ShippingCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
+	fmt.Println("Processing shipping cost...")
+	// 解析关键字
+	if len(keywords) < 4 {
+		return []models.BaseFromLyData{}, fmt.Errorf("ShippingCostProcessor Process() : keywords must contain at least 4 elements")
+	}
+
+	// 拿到 行关键字和列关键字
+	columnName := keywords[len(keywords)-3]
+	rowVariety := keywords[0]
+	rowDestination := keywords[1]
+	rowShipType := keywords[2]
+
+	// 提取所有表格数据
+	tableData := getNoHeadTableData(reportContent)
+
+	// 提取日期信息
+	dateText, err := getDateInfo(ctx)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 时间格式转换
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 处理提取的表格数据
+	var result []models.BaseFromLyData
+
+	for _, data := range tableData {
+		tableHeaders := data.Headers
+		tableRows := data.Rows
+
+		// 查找目标列
+		columnIdx := -1
+		for i, header := range tableHeaders {
+			if strings.Contains(header, columnName) {
+				columnIdx = i
+				break
+			}
+		}
+		if columnIdx == -1 {
+			log.Printf("ShippingCostProcessor Process() : Column '%s' not found in table", columnName)
+			continue
+		}
+
+		// 处理表格中的每一行
+		for rowIndex, row := range tableRows {
+			if len(row) == len(tableHeaders)-1 {
+				row = append([]string{rowVariety}, row...)
+				tableRows[rowIndex] = row
+				rowShipType, err = extractValueInParentheses(rowVariety)
+				if err != nil {
+					logs.Error("ShippingCostProcessor Process() : Failed to extract value in parentheses: %v", err)
+					continue
+				}
+
+			}
+			if len(row) >= len(tableHeaders) && row[0] == rowVariety && (row[1] == rowDestination || strings.Contains(row[0], row[1])) && row[2] == rowShipType {
+				if columnIdx < len(row) {
+					// 指标名称
+					indexName := strings.Join(keywords[:len(keywords)-3], `:`)
+					// 指标编码
+					indexCode := utils.GenerateIndexCode(sourceName, indexName)
+					// 指标id获取
+					indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+					if err != nil {
+						logs.Error("ShippingCostProcessor Process() : Failed to get index id: %v", err)
+						continue
+					}
+
+					indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+					if err != nil {
+						logs.Error("ShippingCostProcessor Process() : Failed to get data by index id and date: %v", err)
+						continue
+					}
+					if len(indexData) > 0 {
+						logs.Info("ShippingCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+						continue
+					}
+
+					valueStr := row[columnIdx]
+					value, err := strconv.ParseFloat(valueStr, 64)
+					if err != nil {
+						return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
+					}
+					// 创建并添加到结果列表
+					baseFromLyData := models.BaseFromLyData{
+						DataTime:          format,
+						Value:             value,
+						BaseFromLyIndexId: indexId,
+						IndexCode:         indexCode,
+					}
+					result = append(result, baseFromLyData)
+				} else {
+					log.Printf("ShippingCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
+				}
+				break
+			}
+		}
+	}
+
+	return result, nil
 }
 
-func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
+// SupplyDemandBalanceProcessor
+// @Description: 供需平衡处理器
+type SupplyDemandBalanceProcessor struct{}
+
+func (p *SupplyDemandBalanceProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
 	logs.Info("Processing processing report...")
 	// 解析关键字
-	if len(keywords) < 3 {
-		return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements")
+	if len(keywords) < 4 {
+		return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : keywords must contain at least 4 elements")
 	}
 
 	// 拿到 行关键字和列关键字
-	columnName := keywords[0]
-	rowName := keywords[1]
+	/*columnName := keywords[1]
+	rowVariety := keywords[0]
+	indexNamePrefix := keywords[:1]
+	indexNameSuffix := keywords[1:]*/
 
 	// 提取所有表格数据
-	var tableData []TableData
+	tableData := getTableData(reportContent)
+	logs.Info("SupplyDemandBalanceProcessor Process() : Table data: %v", tableData)
 
-	doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
+	// 提取日期信息
+	dateText, err := getDateInfo(ctx)
 	if err != nil {
-		log.Fatal(err)
+		return []models.BaseFromLyData{}, err
 	}
 
-	// 选择 id 为 "a_content" 的 div
-	doc.Find("#a_content").Each(func(index int, item *goquery.Selection) {
-		item.Find("table").Each(func(index int, table *goquery.Selection) {
-			var headers []string
-			var rows [][]string
+	// 时间格式转换
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
 
-			// 提取表头
-			table.Find("thead th").Each(func(index int, th *goquery.Selection) {
-				headers = append(headers, th.Text())
-			})
+	currentYearAndNextYear, err := utils.GetCurrentYearAndNextYear(format)
+	if err != nil {
+		return nil, err
+	}
 
-			// 提取表格行数据
-			table.Find("tbody tr").Each(func(index int, row *goquery.Selection) {
-				var rowData []string
-				row.Find("td").Each(func(index int, td *goquery.Selection) {
-					rowData = append(rowData, td.Text())
-				})
-				rows = append(rows, rowData)
-			})
+	month, err := utils.GetCurrentMonth(format)
+	if err != nil {
+		return nil, err
+	}
+	monthSuffix := "预估"
+	logs.Info("SupplyDemandBalanceProcessor Process() : Target Year: %s:%s\n", currentYearAndNextYear, month+monthSuffix)
 
-			// 仅在表头存在时添加到结果中
-			if len(headers) > 0 {
-				tableData = append(tableData, TableData{
-					Headers: headers,
-					Rows:    rows,
-				})
+	// 处理提取的表格数据
+	var result []models.BaseFromLyData
+
+	/*for _, data := range tableData {
+		//tableHeaders := data.Headers
+		tableRows := data.Rows
+
+		tableHeaders := tableRows[0]
+
+		// 查找目标列
+		columnIdx := -1
+		for i, header := range tableHeaders {
+			if strings.Contains(columnName, header) {
+				columnIdx = i
+				break
 			}
-		})
-	})
-	// 打印提取的数据以进行调试
-	dataJSON, _ := json.MarshalIndent(tableData, "", "  ")
-	fmt.Printf("Extracted Table Data: %s\n", dataJSON)
+		}
+		if columnIdx == -1 {
+			log.Printf("SupplyDemandBalanceProcessor Process() : Column '%s' not found in table", columnName)
+			continue
+		}
+
+		// 处理表格中的每一行
+		var previousRowVariety string
+		for rowIndex, row := range tableRows {
+
+			if len(row) == len(tableHeaders) {
+				previousRowVariety = row[0]
+			} else if len(row) == len(tableHeaders)-1 {
+				row = append([]string{previousRowVariety}, row...)
+				tableRows[rowIndex] = row
+			}
+
+			for _, targetMonth := range yearMonths {
+				if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth {
+					if columnIdx < len(row) {
+						// 指标名称
+						indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
+						indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
+						// 指标编码
+						indexCode := utils.GenerateIndexCode(sourceName, indexName)
+						// 指标id获取
+						indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+						if err != nil {
+							logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get index id: %v", err)
+							continue
+						}
+
+						indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+						if err != nil {
+							logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get data by index id and date: %v", err)
+							continue
+						}
+						if len(indexData) > 0 {
+							logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+							continue
+						}
+
+						valueStr := row[columnIdx]
+						value, err := strconv.ParseFloat(valueStr, 64)
+						if err != nil {
+							return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : failed to parse value '%s': %v", valueStr, err)
+						}
+						// 创建并添加到结果列表
+						baseFromLyData := models.BaseFromLyData{
+							DataTime:          format,
+							Value:             value,
+							BaseFromLyIndexId: indexId,
+							IndexCode:         indexCode,
+						}
+						result = append(result, baseFromLyData)
+					} else {
+						log.Printf("SupplyDemandBalanceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
+					}
+					break
+				}
+			}
+
+		}
+	}*/
+	return result, nil
+}
+
+// ProcessingReportProcessor
+// @Description: 加工报告处理器
+type ProcessingReportProcessor struct {
+}
+
+func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
+	logs.Info("Processing processing report...")
+	// 解析关键字
+	if len(keywords) < 3 {
+		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements")
+	}
+
+	// 拿到 行关键字和列关键字
+	columnName := keywords[0]
+	rowName := keywords[1]
+
+	// 提取所有表格数据
+	tableData := getAllTableData(reportContent)
 
 	// 提取日期信息
-	var dateText string
-	err = chromedp.Run(ctx,
-		chromedp.Evaluate(`document.querySelector('div.a_date span').innerText`, &dateText),
-	)
+	dateText, err := getDateInfo(ctx)
 	if err != nil {
-		return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to extract date: %v", err)
+		return []models.BaseFromLyData{}, err
+	}
+	indexName := strings.Join(keywords[:len(keywords)-2], ":")
+	// 指标编码
+	indexCode := utils.GenerateIndexCode(sourceName, indexName)
+	// 指标id获取
+	indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+	if err != nil {
+		return nil, err
 	}
-
-	logs.Info("ProcessingReportProcessor Process() : Extracted Date: %s", dateText)
 
 	// 校验指标数据是否存在 根据指标id和日期 存在则跳过,不存在正常往下走
 	format, err := utils.ConvertTimeFormat(dateText)
 	if err != nil {
-		return models.BaseFromLyData{}, err
+		return []models.BaseFromLyData{}, err
 	}
 	indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
 	if err != nil {
-		return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err)
+		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err)
 	}
 	if len(indexData) > 0 {
 		logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
-		return models.BaseFromLyData{}, nil
+		return []models.BaseFromLyData{}, nil
 	}
 
 	// 解析日期并计算当前周数
 	targetWeek, err := utils.ParseDateAndWeek(dateText)
 	if err != nil {
-		return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to parse date: %v", err)
+		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to parse date: %v", err)
 	}
 
 	fmt.Printf("Target Week: %s\n", targetWeek)
 
+	var result []models.BaseFromLyData
 	// 处理提取的表格数据
 	for _, data := range tableData {
 		tableHeaders := data.Headers
@@ -180,14 +613,14 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 						value, err := strconv.ParseFloat(row[columnIdx], 64)
 						if err != nil {
 							logs.Error("ProcessingReportProcessor Process() : Error converting value to float64: %v", err)
-							return models.BaseFromLyData{}, err
+							return []models.BaseFromLyData{}, err
 						}
 						// 返回BaseFromLyData对象的数据
 						baseFromLyData := models.BaseFromLyData{
 							DataTime: dateText,
 							Value:    value,
 						}
-						return baseFromLyData, nil
+						result = append(result, baseFromLyData)
 					}
 				} else {
 					logs.Error("ProcessingReportProcessor Process() : Column index out of range")
@@ -196,7 +629,193 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 		}
 	}
 	// TODO 后面把这个日志打印,不做返回错误处理,一个指标找不到会导致后续指标无法处理
-	return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : No matching row found for '%s'", rowName)
+	return result, nil
+
+}
+
+// extractValueInParentheses 从字符串中提取括号中的值
+func extractValueInParentheses(input string) (string, error) {
+	re := regexp.MustCompile(`[(（]([^)）]+)[)）]`)
+	matches := re.FindStringSubmatch(input)
+
+	if len(matches) > 1 {
+		return matches[1], nil
+	}
+
+	return "", fmt.Errorf("no value found in parentheses")
+}
+
+// 获取指标id,根据指标名称判断,没有插入指标生成返回
+func getIndexId(indexCode string, indexName string, classifyId int, sourceName string, frequency string, unit string) (int, error) {
+	// 判断指标是否存在
+	var indexId int
+	indexInfo, err := models.GetLyIndexByCode(indexCode)
+	if err != nil {
+		// 新增指标
+		index, err := addLyIndex(classifyId, indexCode, indexName, frequency, unit)
+		if err != nil {
+			return 0, err
+		}
+		indexId = index
+	} else {
+		indexId = indexInfo.BaseFromLyIndexId
+	}
+	return indexId, nil
+}
+
+// 获取页面时间信息
+func getDateInfo(ctx context.Context) (string, error) {
+	var dateText string
+	err := chromedp.Run(ctx,
+		chromedp.Evaluate(`document.querySelector('div.a_date span').innerText`, &dateText),
+	)
+	if err != nil {
+		return "", fmt.Errorf("processing Process() : Failed to extract report date: %v", err)
+	}
+
+	logs.Info("Processing Process() : Report Extracted Date: %s", dateText)
+	return dateText, nil
+}
+
+// 获取所有表格数据 获取表格中有thead标签的数据
+func getAllTableData(reportContent string) []TableData {
+	var tableData []TableData
+
+	doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// 选择 id 为 "a_content" 的 div
+	doc.Find("#a_content").Each(func(index int, item *goquery.Selection) {
+		item.Find("table").Each(func(index int, table *goquery.Selection) {
+			var headers []string
+			var rows [][]string
+
+			// 提取表头
+			table.Find("thead th").Each(func(index int, th *goquery.Selection) {
+				headers = append(headers, th.Text())
+			})
+
+			// 提取表格行数据
+			table.Find("tbody tr").Each(func(index int, row *goquery.Selection) {
+				var rowData []string
+				row.Find("td").Each(func(index int, td *goquery.Selection) {
+					rowData = append(rowData, td.Text())
+				})
+				rows = append(rows, rowData)
+			})
+
+			// 仅在表头存在时添加到结果中
+			if len(headers) > 0 {
+				tableData = append(tableData, TableData{
+					Headers: headers,
+					Rows:    rows,
+				})
+			}
+		})
+	})
+	return tableData
+}
+
+// 获取无头表格数据
+func getNoHeadTableData(reportContent string) []TableData {
+	var tableData []TableData
+
+	doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Find the div with id "a_content"
+	doc.Find("#a_content").Each(func(index int, div *goquery.Selection) {
+		// Find all tables within the div
+		div.Find("table").Each(func(index int, table *goquery.Selection) {
+			var headers []string
+			var rows [][]string
+
+			// Extract table headers if any
+			table.Find("tr").Each(func(index int, tr *goquery.Selection) {
+				var rowData []string
+				tr.Find("td, th").Each(func(index int, cell *goquery.Selection) {
+					cellText := cell.Text()
+					rowData = append(rowData, cellText)
+				})
+
+				if index == 0 && len(rowData) > 0 {
+					// The first row is treated as the header row
+					headers = rowData
+				} else if len(rowData) > 0 {
+					// Add the row data to the rows slice
+					rows = append(rows, rowData)
+				}
+			})
+
+			// Only add table data if headers are present
+			if len(headers) > 0 {
+				tableData = append(tableData, TableData{
+					Headers: headers,
+					Rows:    rows,
+				})
+			}
+		})
+	})
+
+	return tableData
+}
+
+// 获取表格数据 有tr td标签的数据 列转行存储==>Rows, 行转头存储==>Headers
+func getTableData(reportContent string) TableData {
+	doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
+	if err != nil {
+		fmt.Println("Error:", err)
+	}
+
+	var tableData TableData
+
+	// 查找 id 为 a_content 的 div
+	doc.Find("#a_content").Each(func(index int, divHtml *goquery.Selection) {
+		divHtml.Find("table").Each(func(index int, tableHtml *goquery.Selection) {
+			// 提取 Headers(列信息)
+			tableHtml.Find("tr").Each(func(rowIndex int, rowHtml *goquery.Selection) {
+				if rowIndex == 0 { // 处理第一行(包含年度信息)
+					var headerRow []string
+					rowHtml.Find("td").Each(func(colIndex int, colHtml *goquery.Selection) {
+						text := colHtml.Text()
+						if colIndex > 0 { // 忽略第一列“年度(10/9月)”
+							headerRow = append(headerRow, strings.TrimSpace(text))
+						}
+					})
+					if len(headerRow) > 0 {
+						tableData.Headers = append(tableData.Headers, headerRow...)
+					}
+				} else if rowIndex == 1 { // 处理第二行(列标题)
+					rowHtml.Find("td").Each(func(colIndex int, colHtml *goquery.Selection) {
+						text := colHtml.Text()
+						if colIndex > 0 { // 忽略第一列“年度(10/9月)”
+							tableData.Headers = append(tableData.Headers, strings.TrimSpace(text))
+						}
+					})
+				}
+			})
+
+			// 提取数据行
+			tableHtml.Find("tr").Each(func(rowIndex int, rowHtml *goquery.Selection) {
+				if rowIndex > 1 { // 从第三行开始
+					var row []string
+					rowHtml.Find("td").Each(func(colIndex int, colHtml *goquery.Selection) {
+						text := colHtml.Text()
+						row = append(row, strings.TrimSpace(text))
+					})
+					if len(row) > 0 {
+						tableData.Rows = append(tableData.Rows, row)
+					}
+				}
+			})
+		})
+	})
+
+	return tableData
 }
 
 // 判断字符串是否是数字

+ 5 - 1
cmd/processor_factory.go

@@ -8,7 +8,7 @@ import (
 )
 
 type ReportProcessor interface {
-	Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error)
+	Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error)
 }
 
 func GetProcessor(product string, category string) (ReportProcessor, error) {
@@ -20,6 +20,10 @@ func GetProcessor(product string, category string) (ReportProcessor, error) {
 			return &ProcessingProfitProcessor{}, nil
 		case "加工报告":
 			return &ProcessingReportProcessor{}, nil
+		case "船运费用":
+			return &ShippingCostProcessor{}, nil
+		case "供需平衡":
+			return &SupplyDemandBalanceProcessor{}, nil
 
 		default:
 			return nil, fmt.Errorf("unknown category: %s", category)

+ 15 - 54
static/liangyou.json

@@ -1,59 +1,20 @@
 {
   "大豆": {
-    "加工报告": {
-      "国内大豆周度加工量调查": [
-        "国内大豆加工量:河南省:万吨:周度",
-        "国内大豆加工量:湖北省:万吨:周度",
-        "国内大豆加工量:湖南省:万吨:周度",
-        "国内大豆加工量:黑龙江:万吨:周度",
-        "国内大豆加工量:吉林省:万吨:周度",
-        "国内大豆加工量:辽宁省:万吨:周度",
-        "国内大豆加工量:内蒙古:万吨:周度",
-        "国内大豆加工量:河北省:万吨:周度",
-        "国内大豆加工量:天津市:万吨:周度",
-        "国内大豆加工量:江西省:万吨:周度",
-        "国内大豆加工量:山东省:万吨:周度",
-        "国内大豆加工量:安徽省:万吨:周度",
-        "国内大豆加工量:江苏省:万吨:周度",
-        "国内大豆加工量:上海市:万吨:周度",
-        "国内大豆加工量:浙江省:万吨:周度",
-        "国内大豆加工量:福建省:万吨:周度",
-        "国内大豆加工量:广东省:万吨:周度",
-        "国内大豆加工量:广西省:万吨:周度",
-        "国内大豆加工量:海南省:万吨:周度",
-        "国内大豆加工量:陕西省:万吨:周度",
-        "国内大豆加工量:四川省:万吨:周度",
-        "国内大豆加工量:重庆市:万吨:周度",
-        "国内大豆加工量:云南省:万吨:周度",
-        "国内大豆加工量:合计:万吨:周度",
-        "国内大豆加工量:其中:国产:万吨:周度",
-        "国内大豆加工量:进口:万吨:周度",
-        "国内大豆开机率:河南省:%:周度",
-        "国内大豆开机率:湖北省:%:周度",
-        "国内大豆开机率:湖南省:%:周度",
-        "国内大豆开机率:黑龙江:%:周度",
-        "国内大豆开机率:吉林省:%:周度",
-        "国内大豆开机率:辽宁省:%:周度",
-        "国内大豆开机率:内蒙古:%:周度",
-        "国内大豆开机率:河北省:%:周度",
-        "国内大豆开机率:天津市:%:周度",
-        "国内大豆开机率:江西省:%:周度",
-        "国内大豆开机率:山东省:%:周度",
-        "国内大豆开机率:安徽省:%:周度",
-        "国内大豆开机率:江苏省:%:周度",
-        "国内大豆开机率:上海市:%:周度",
-        "国内大豆开机率:浙江省:%:周度",
-        "国内大豆开机率:福建省:%:周度",
-        "国内大豆开机率:广东省:%:周度",
-        "国内大豆开机率:广西省:%:周度",
-        "国内大豆开机率:海南省:%:周度",
-        "国内大豆开机率:陕西省:%:周度",
-        "国内大豆开机率:四川省:%:周度",
-        "国内大豆开机率:重庆市:%:周度",
-        "国内大豆开机率:云南省:%:周度",
-        "国内大豆开机率:合计:%:周度",
-        "国内大豆开机率:其中:国产:%:周度",
-        "国内大豆开机率:进口:%:周度"
+    "供需平衡": {
+      "年度中国大豆市场供需报告": [
+        "中国大豆市场供需:期初库存:万吨:月度",
+        "中国大豆市场供需:种植面积:万吨:月度",
+        "中国大豆市场供需:国内产量:万吨:月度",
+        "中国大豆市场供需:进口量:万吨:月度",
+        "中国大豆市场供需:总供应量:万吨:月度",
+        "中国大豆市场供需:压榨用量:万吨:月度",
+        "中国大豆市场供需:其中:国产大豆:万吨:月度",
+        "中国大豆市场供需:进口大豆:万吨:月度",
+        "中国大豆市场供需:出口量:万吨:月度",
+        "中国大豆市场供需:食用量:万吨:月度",
+        "中国大豆市场供需:种用及其他:万吨:月度",
+        "中国大豆市场供需:总需求量:万吨:月度",
+        "中国大豆市场供需:期末库存:万吨:月度"
       ]
     }
   }

+ 83 - 1
utils/date_util.go

@@ -7,7 +7,7 @@ import (
 	"time"
 )
 
-// ParseDateAndWeek parseDateAndWeek 解析日期并计算当前周数
+// ParseDateAndWeek parseDateAndWeek 解析日期并计算当前周数 ==> 24年31周
 func ParseDateAndWeek(dateText string) (string, error) {
 	// 解析日期
 	reportDate, err := time.Parse("2006-01-02", strings.TrimSpace(strings.Split(dateText, " ")[0]))
@@ -24,6 +24,37 @@ func ParseDateAndWeek(dateText string) (string, error) {
 	return targetWeek, nil
 }
 
+// ParseDateAndMonth parses the date at the start of dateText and returns the
+// labels of the report month and of the two months that follow it.
+//
+// Only the text before the first space is parsed, and it must use the
+// "2006-01-02" layout. Each label is the month number followed by "月" and a
+// month letter code: F=1, G=2, H=3, J=4, K=5, M=6, N=7, Q=8, U=9, V=10, X=11,
+// Z=12 (for example "1月F" or "10月V").
+//
+// It returns a nil slice and a wrapped error when the date cannot be parsed.
+func ParseDateAndMonth(dateText string) ([]string, error) {
+	reportDate, err := time.Parse("2006-01-02", strings.TrimSpace(strings.Split(dateText, " ")[0]))
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse report date: %w", err)
+	}
+
+	// Indexed by month-1; the letters follow the rule documented above.
+	monthLabels := [12]string{
+		"1月F",
+		"2月G",
+		"3月H",
+		"4月J",
+		"5月K",
+		"6月M",
+		"7月N",
+		"8月Q",
+		"9月U",
+		"10月V",
+		"11月X",
+		"12月Z",
+	}
+
+	// Step from the first day of the month: AddDate normalizes overflowing
+	// days, so e.g. Jan 31 + 1 month would otherwise land in March and skip
+	// February entirely.
+	firstOfMonth := time.Date(reportDate.Year(), reportDate.Month(), 1, 0, 0, 0, 0, time.UTC)
+
+	months := make([]string, 3)
+	for i := range months {
+		months[i] = monthLabels[firstOfMonth.AddDate(0, i, 0).Month()-1]
+	}
+
+	return months, nil
+}
+
+
 // GetCurrentTime 获取当前时间 格式为 2024-08-07 15:29:58
 func GetCurrentTime() string {
 	return time.Now().Format("2006-01-02 15:04:05")
@@ -39,3 +70,54 @@ func ConvertTimeFormat(dateText string) (string, error) {
 
 	return reportDate.Format("2006-01-02"), nil
 }
+
+// ConvertTimeFormatToYearMonth parses dateText ("2006-01-02" layout,
+// surrounding whitespace ignored) and returns the report month and the two
+// months that follow it, each formatted as "2006年1月".
+//
+// Example: 2024-08-03 --> 2024年8月, 2024年9月, 2024年10月.
+//
+// It returns a nil slice and a wrapped error when the date cannot be parsed.
+func ConvertTimeFormatToYearMonth(dateText string) ([]string, error) {
+	reportDate, err := time.Parse("2006-01-02", strings.TrimSpace(dateText))
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse report date: %w", err)
+	}
+
+	// Step from the first day of the month: AddDate normalizes overflowing
+	// days, so e.g. Aug 31 + 1 month would otherwise become Oct 1 and
+	// September would be skipped.
+	firstOfMonth := time.Date(reportDate.Year(), reportDate.Month(), 1, 0, 0, 0, 0, time.UTC)
+
+	months := make([]string, 3)
+	for i := range months {
+		months[i] = firstOfMonth.AddDate(0, i, 0).Format("2006年1月")
+	}
+
+	return months, nil
+}
+
+
+// GetCurrentYearAndNextYear parses dateText ("2006-01-02" layout, surrounding
+// whitespace ignored) and returns two marketing-year labels: the one ending
+// in the date's calendar year and the one starting in it.
+//
+// Example: 2024-08-03 --> 2023/24年度, 2024/25年度.
+func GetCurrentYearAndNextYear(dateText string) ([]string, error) {
+	parsed, parseErr := time.Parse("2006-01-02", strings.TrimSpace(dateText))
+	if parseErr != nil {
+		return nil, fmt.Errorf("failed to parse report date: %v", parseErr)
+	}
+
+	thisYear := parsed.Year()
+
+	return []string{
+		// Label for the year that ends in thisYear.
+		fmt.Sprintf("%d/%02d年度", thisYear-1, thisYear%100),
+		// Label for the year that starts in thisYear.
+		fmt.Sprintf("%d/%02d年度", thisYear, (thisYear+1)%100),
+	}, nil
+}
+
+
+// GetCurrentMonth parses dateText ("2006-01-02" layout, surrounding
+// whitespace ignored) and returns its month number followed by "月".
+//
+// Example: 2024-08-03 --> 8月.
+func GetCurrentMonth(dateText string) (string, error) {
+	parsed, parseErr := time.Parse("2006-01-02", strings.TrimSpace(dateText))
+	if parseErr != nil {
+		return "", fmt.Errorf("failed to parse report date: %v", parseErr)
+	}
+
+	// The month number is printed without zero padding.
+	return fmt.Sprintf("%d月", int(parsed.Month())), nil
+}

+ 14 - 2
utils/index_code_util.go

@@ -10,10 +10,16 @@ import (
 // GenerateIndexCode 指标编码规则:粮油商务网拼音首字母+指标名称拼音首字母,数字、字母保留,特殊字符拿掉
 // 例:美湾:9月U:国际大豆进口成本价:期货收盘:张家港 -----> lyswwmw9yUgjddjkcbjqhspzjg
 func GenerateIndexCode(sourceName string, indexName string) string {
+	// 获取拼音首字母
 	indexInitials := getFirstLetter(indexName)
 
-	// 拼接后过滤特殊字符,保留数字和字母
-	indexCode := sourceName + indexInitials
+	// 保留源名称中的字母和数字
+	sourceNameFiltered := filterAlphanumeric(indexName)
+
+	// 拼接源名称和首字母
+	indexCode := sourceName + sourceNameFiltered + indexInitials
+
+	// 保留字母和数字,去掉其他特殊字符
 	re := regexp.MustCompile(`[^a-zA-Z0-9]`)
 	indexCode = re.ReplaceAllString(indexCode, "")
 
@@ -35,3 +41,9 @@ func getFirstLetter(s string) string {
 	}
 	return firstLetters
 }
+
+// 过滤只保留字母和数字
+func filterAlphanumeric(s string) string {
+	re := regexp.MustCompile(`[^a-zA-Z0-9]`)
+	return re.ReplaceAllString(s, "")
+}