瀏覽代碼

粮油商务网数据对接-大豆

gmy 8 月之前
父節點
當前提交
cde734e446
共有 3 個文件被更改,包括 413 次插入44 次删除
  1. 346 33
      cmd/processor_business_logic.go
  2. 50 1
      cmd/processor_factory.go
  3. 17 10
      static/liangyou.json

+ 346 - 33
cmd/processor_business_logic.go

@@ -13,6 +13,7 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
+	"unicode"
 )
 
 const (
@@ -53,19 +54,24 @@ func (p *ImportCostProcessor) Process(ctx context.Context, product string, repor
 		return []models.BaseFromLyData{}, err
 	}
 
-	// 解析日期并计算当前月份
-	targetMonths, err := utils.ParseDateAndMonth(dateText)
-	if err != nil {
-		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingImportCostProcessor Process() : Failed to parse date: %v", err)
-	}
-	fmt.Printf("Target Month: %s\n", targetMonths)
-
 	// 时间格式转换
 	format, err := utils.ConvertTimeFormat(dateText)
 	if err != nil {
 		return []models.BaseFromLyData{}, err
 	}
 
+	// 解析日期并计算当前月份
+	var targetMonths []string
+	if product == "油菜籽" {
+		targetMonths, err = utils.ParseDateAndMonthColzaOil(format)
+	} else {
+		targetMonths, err = utils.ParseDateAndMonth(dateText)
+	}
+	if err != nil {
+		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingImportCostProcessor Process() : Failed to parse date: %v", err)
+	}
+	fmt.Printf("Target Month: %s\n", targetMonths)
+
 	// 处理提取的表格数据
 	var result []models.BaseFromLyData
 
@@ -103,7 +109,7 @@ func (p *ImportCostProcessor) Process(ctx context.Context, product string, repor
 				tableRows[rowIndex] = row
 			}
 			for _, targetMonth := range targetMonths {
-				if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth && row[len(row)-1] == rowPort {
+				if len(row) >= len(tableHeaders) && strings.Contains(rowVariety, row[0]) && row[1] == targetMonth && row[len(row)-1] == rowPort {
 					if columnIdx < len(row) {
 						// 指标名称
 						indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
@@ -402,7 +408,7 @@ func (p *SupplyDemandBalanceProcessor) Process(ctx context.Context, product stri
 	rowVariety := keywords[1]
 
 	// 提取所有表格数据
-	tableData := getTableData(reportContent)
+	tableData := getTableData(reportContent, true)
 	logs.Info("SupplyDemandBalanceProcessor Process() : Table data: %v", tableData)
 
 	// 提取日期信息
@@ -761,17 +767,23 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 type InventoryAnalysisProcessor struct{}
 
 func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
+	// https://www.fao.com.cn/art/yg1IKj9FpPEIDv2LefnPhQ==.htm
+	logs.Info("Processing inventory analysis...")
+
 	// 解析关键字
 	if len(keywords) < 4 {
 		return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : keywords must contain at least 4 elements")
 	}
 
 	// 拿到 行关键字和列关键字
-	var columnName string
+	columnName := keywords[0]
 	rowVariety := keywords[1]
 
+	columnSuffix := "本周"
+	columnName = columnName + columnSuffix
+
 	// 提取所有表格数据
-	tableData := getTableData(reportContent)
+	tableData := getTableData(reportContent, true)
 	logs.Info("SupplyDemandBalanceProcessor Process() : Table data: %v", tableData)
 
 	// 提取日期信息
@@ -786,42 +798,132 @@ func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string
 		return []models.BaseFromLyData{}, err
 	}
 
-	currentYearAndNextYear, err := utils.GetCurrentYearAndNextYear(format)
+	// 处理提取的表格数据
+	var result []models.BaseFromLyData
+
+	headers := tableData.Headers
+	rows := tableData.Rows
+
+	// 查找目标列
+	columnIdx := -1
+	for i, header := range headers {
+		header := removeParentheses(header)
+		if strings.Contains(columnName, header) {
+			columnIdx = i
+			break
+		}
+	}
+	if columnIdx == -1 {
+		logs.Error("SupplyDemandBalanceProcessor Process() : Column '%s' not found in table", columnName)
+	} else {
+		// 处理表格中的每一行
+		for _, row := range rows {
+			if len(row) >= len(headers) && strings.Contains(row[0], rowVariety) {
+				if columnIdx < len(row) {
+					// 指标名称
+					indexName := strings.Join(keywords[:len(keywords)-2], ":")
+					indexName = removeParentheses(indexName)
+					// 指标编码
+					indexCode := utils.GenerateIndexCode(sourceName, indexName)
+					// 指标id获取
+					indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+					if err != nil {
+						logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get index id: %v", err)
+						continue
+					}
+
+					indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+					if err != nil {
+						logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get data by index id and date: %v", err)
+						continue
+					}
+					if len(indexData) > 0 {
+						logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+						continue
+					}
+
+					valueStr := row[columnIdx]
+					value, err := strconv.ParseFloat(valueStr, 64)
+					if err != nil {
+						return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : failed to parse value '%s': %v", valueStr, err)
+					}
+					// 创建并添加到结果列表
+					baseFromLyData := models.BaseFromLyData{
+						DataTime:          format,
+						Value:             value,
+						BaseFromLyIndexId: indexId,
+						IndexCode:         indexCode,
+					}
+					result = append(result, baseFromLyData)
+				} else {
+					log.Printf("SupplyDemandBalanceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
+				}
+				break
+			}
+		}
+	}
+
+	return result, nil
+}
+
+// PriceSpreadArbitrageProcessor
+// @Description: 价差套利处理器
+type PriceSpreadArbitrageProcessor struct{}
+
+func (p *PriceSpreadArbitrageProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
+	fmt.Println("Processing processing profit...")
+	// 解析关键字
+	if len(keywords) < 4 {
+		return []models.BaseFromLyData{}, fmt.Errorf("ProcessingProfitProcessor Process() : keywords must contain at least 4 elements")
+	}
+
+	// 拿到 行关键字和列关键字
+	var columnDate string
+	rowVariety := keywords[0]
+	rowBourse := keywords[1]
+	// 提取所有表格数据
+	tableData := getNoHeadTableData(reportContent)
+
+	// 提取日期信息
+	dateText, err := getDateInfo(ctx)
 	if err != nil {
-		return nil, err
+		return []models.BaseFromLyData{}, err
 	}
 
-	month, err := utils.GetCurrentMonth(format)
+	// 时间格式转换
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+	day, err := utils.ConvertTimeFormatToYearMonthDay(format)
 	if err != nil {
 		return nil, err
 	}
-	monthSuffix := "预估"
-	logs.Info("SupplyDemandBalanceProcessor Process() : Target Year: %s:%s\n", currentYearAndNextYear, month+monthSuffix)
+	columnDate = day
 
 	// 处理提取的表格数据
 	var result []models.BaseFromLyData
 
-	headers := tableData.Headers
-	rows := tableData.Rows
-
-	for _, year := range currentYearAndNextYear {
-		columnName = year + month + monthSuffix
+	for _, data := range tableData {
+		tableHeaders := data.Headers
+		tableRows := data.Rows
 
 		// 查找目标列
 		columnIdx := -1
-		for i, header := range headers {
-			if strings.Contains(columnName, header) {
+		for i, header := range tableHeaders {
+			if strings.Contains(header, columnDate) {
 				columnIdx = i
 				break
 			}
 		}
 		if columnIdx == -1 {
-			log.Printf("SupplyDemandBalanceProcessor Process() : Column '%s' not found in table", columnName)
+			log.Printf("ProcessingProfitProcessor Process() : Column '%s' not found in table", columnDate)
 			continue
 		}
+
 		// 处理表格中的每一行
-		for _, row := range rows {
-			if len(row) >= len(headers) && row[0] == rowVariety {
+		for _, row := range tableRows {
+			if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == rowBourse {
 				if columnIdx < len(row) {
 					// 指标名称
 					indexName := strings.Join(keywords[:len(keywords)-2], ":")
@@ -829,6 +931,195 @@ func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string
 					indexCode := utils.GenerateIndexCode(sourceName, indexName)
 					// 指标id获取
 					indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+					if err != nil {
+						logs.Error("ProcessingProfitProcessor Process() : Failed to get index id: %v", err)
+						continue
+					}
+
+					indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+					if err != nil {
+						logs.Error("ProcessingProfitProcessor Process() : Failed to get data by index id and date: %v", err)
+						continue
+					}
+					if len(indexData) > 0 {
+						logs.Info("ProcessingProfitProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+						continue
+					}
+
+					valueStr := row[columnIdx]
+					value, err := strconv.ParseFloat(valueStr, 64)
+					if err != nil {
+						return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
+					}
+					// 创建并添加到结果列表
+					baseFromLyData := models.BaseFromLyData{
+						DataTime:          format,
+						Value:             value,
+						BaseFromLyIndexId: indexId,
+						IndexCode:         indexCode,
+					}
+					result = append(result, baseFromLyData)
+				} else {
+					log.Printf("ProcessingProfitProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnDate)
+				}
+				break
+			}
+
+		}
+	}
+
+	return result, nil
+}
+
+// DailyTransactionProcessor
+// @Description: 每日成交处理器
+type DailyTransactionProcessor struct{}
+
+func (p *DailyTransactionProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
+	fmt.Println("Processing processing profit...")
+	// 解析关键字
+	if len(keywords) < 4 {
+		return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : keywords must contain at least 4 elements")
+	}
+
+	// 获取第一个表格
+	areaTableData := getNoHeadTableData(reportContent)[0]
+	// 获取第二个表格
+	blocTableData := getTableData(reportContent, false)
+	logs.Info("SupplyDemandBalanceProcessor Process() : areaTableData data: %v, blocTableData data: %v", areaTableData, blocTableData)
+	// 提取日期信息
+	dateText, err := getDateInfo(ctx)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 时间格式转换
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 处理提取的表格数据
+	var result []models.BaseFromLyData
+
+	areaHeaders := areaTableData.Headers
+	areaRows := areaTableData.Rows
+
+	// 第一个表格
+	// 拿到 行关键字和列关键字
+	columnArea := keywords[1]
+	var rowAreaMonthDays []string
+	rowWeek := "均价"
+
+	monthDay, err := utils.GetWeekdaysInSameWeek(format)
+	if err != nil {
+		return nil, err
+	}
+	rowAreaMonthDays = monthDay
+
+	// 查找目标列
+	areaColumnIdx := -1
+	for i, header := range areaHeaders {
+		if strings.Contains(header, columnArea) {
+			areaColumnIdx = i
+			break
+		}
+	}
+	if areaColumnIdx == -1 {
+		log.Printf("DailyTransactionProcessor Process() : One Column '%s' not found in table", columnArea)
+	} else {
+		for _, row := range areaRows {
+			for _, monthDay := range rowAreaMonthDays {
+				if len(row) >= len(areaHeaders) && (row[0] == monthDay || row[0] == rowWeek) {
+					if areaColumnIdx < len(row) {
+						// 指标名称
+						indexName := strings.Join(keywords[:len(keywords)-3], ":")
+						// 指标编码
+						indexCode := utils.GenerateIndexCode(sourceName, indexName)
+						// 指标id获取
+						indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+						if err != nil {
+							logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err)
+							continue
+						}
+
+						indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+						if err != nil {
+							logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err)
+							continue
+						}
+						if len(indexData) > 0 {
+							logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+							continue
+						}
+
+						valueStr := row[areaColumnIdx]
+						isChinese := IsChinese(valueStr)
+						if isChinese {
+							continue
+						}
+						value, err := strconv.ParseFloat(valueStr, 64)
+						if err != nil {
+							return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
+						}
+						// 创建并添加到结果列表
+						var dealDate string
+						if row[0] == rowWeek {
+							dealDate = format
+						} else {
+							date, err := utils.ConvertToDate(row[0])
+							if err != nil {
+								return nil, err
+							}
+							dealDate = date
+						}
+						baseFromLyData := models.BaseFromLyData{
+							DataTime:          dealDate,
+							Value:             value,
+							BaseFromLyIndexId: indexId,
+							IndexCode:         indexCode,
+						}
+						result = append(result, baseFromLyData)
+					} else {
+						log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", monthDay, columnArea)
+					}
+					break
+				}
+			}
+		}
+	}
+
+	// 第二个表格
+	// 拿到 行关键字和列关键字
+	columnBloc := keywords[len(keywords)-3]
+	rowBloc := keywords[1]
+
+	blocHeaders := blocTableData.Headers
+	blocRows := blocTableData.Rows
+
+	// 查找目标列
+	blocColumnIdx := -1
+	for i, header := range blocHeaders {
+		if strings.Contains(header, columnBloc) {
+			blocColumnIdx = i
+			break
+		}
+	}
+
+	if blocColumnIdx == -1 {
+		log.Printf("DailyTransactionProcessor Process() : Two Column '%s' not found in table", columnBloc)
+	} else {
+		// 处理表格中的每一行
+		for _, row := range blocRows {
+			if len(row) >= len(blocHeaders) && strings.Contains(row[0], rowBloc) {
+				if blocColumnIdx < len(row) {
+					// 指标名称
+					indexName := strings.Join(keywords[:len(keywords)-3], ":")
+					indexName = removeParentheses(indexName)
+					// 指标编码
+					indexCode := utils.GenerateIndexCode(sourceName, indexName)
+					// 指标id获取
+					indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
 					if err != nil {
 						logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get index id: %v", err)
 						continue
@@ -844,7 +1135,7 @@ func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string
 						continue
 					}
 
-					valueStr := row[columnIdx]
+					valueStr := row[blocColumnIdx]
 					value, err := strconv.ParseFloat(valueStr, 64)
 					if err != nil {
 						return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : failed to parse value '%s': %v", valueStr, err)
@@ -858,12 +1149,13 @@ func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string
 					}
 					result = append(result, baseFromLyData)
 				} else {
-					log.Printf("SupplyDemandBalanceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
+					log.Printf("SupplyDemandBalanceProcessor Process() : Column index out of range for row '%s', '%s'", rowBloc, columnBloc)
 				}
 				break
 			}
 		}
 	}
+
 	return result, nil
 }
 
@@ -998,8 +1290,8 @@ func getNoHeadTableData(reportContent string) []TableData {
 	return tableData
 }
 
-// 获取表格数据 获取id 为 a_content 的 div 中的第一个表格 会拼上列上的合并单元格
-func getTableData(reportContent string) TableData {
+// 获取表格数据 获取id 为 a_content 的 div 中的第一个表格 左上角那个单元格会拼在第一个,会拼上列上的合并单元格
+func getTableData(reportContent string, isFirst bool) TableData {
 	doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
 	if err != nil {
 		log.Fatal(err)
@@ -1008,7 +1300,12 @@ func getTableData(reportContent string) TableData {
 	tableData := &TableData{}
 
 	// 只提取 id 为 a_content 的 div 中的第一个表格
-	firstTable := doc.Find("div#a_content table").First()
+	var firstTable *goquery.Selection
+	if isFirst {
+		firstTable = doc.Find("div#a_content table").First()
+	} else {
+		firstTable = doc.Find("div#a_content table").Last()
+	}
 
 	var combinedHeaders []string
 
@@ -1016,7 +1313,7 @@ func getTableData(reportContent string) TableData {
 	firstTable.Find("tr").Each(func(i int, row *goquery.Selection) {
 		if i == 0 {
 			// 第一行处理合并单元格,保存到 combinedHeaders
-			row.Find("td").Each(func(j int, cell *goquery.Selection) {
+			row.Find("td,th").Each(func(j int, cell *goquery.Selection) {
 				if j == 0 {
 					// 把左上角的“年度(10/9月)”放入 Headers 第一个元素
 					tableData.Headers = append(tableData.Headers, strings.TrimSpace(cell.Text()))
@@ -1036,7 +1333,7 @@ func getTableData(reportContent string) TableData {
 			})
 		} else if i == 1 {
 			// 第二行处理具体标题,组合后保存到 Headers
-			row.Find("").Each(func(j int, cell *goquery.Selection) {
+			row.Find("td,th").Each(func(j int, cell *goquery.Selection) {
 				if j < len(combinedHeaders) {
 					fullHeader := combinedHeaders[j] + strings.TrimSpace(cell.Text())
 					tableData.Headers = append(tableData.Headers, fullHeader)
@@ -1149,3 +1446,19 @@ func extractChinese(text string) string {
 	re := regexp.MustCompile(`[^\p{Han}]`) // 匹配非汉字字符
 	return re.ReplaceAllString(text, "")
 }
+
+// 去除括号中的内容 包含括号 ()
+func removeParentheses(text string) string {
+	re := regexp.MustCompile(`\([^)]*\)`)
+	return re.ReplaceAllString(text, "")
+}
+
+// IsChinese 判断传入的是否是汉字
+func IsChinese(str string) bool {
+	for _, r := range str {
+		if unicode.Is(unicode.Han, r) {
+			return true
+		}
+	}
+	return false
+}

+ 50 - 1
cmd/processor_factory.go

@@ -28,7 +28,56 @@ func GetProcessor(product string, category string) (ReportProcessor, error) {
 			return &PurchaseShippingProcessor{}, nil
 		case "库存分析":
 			return &InventoryAnalysisProcessor{}, nil
-
+		default:
+			return nil, fmt.Errorf("unknown category: %s", category)
+		}
+	} else if product == "豆粕" {
+		switch category {
+		case "库存分析":
+			return &InventoryAnalysisProcessor{}, nil
+		default:
+			return nil, fmt.Errorf("unknown category: %s", category)
+		}
+	} else if product == "大豆油" {
+		switch category {
+		case "库存分析":
+			return &InventoryAnalysisProcessor{}, nil
+		case "价差套利":
+			return &PriceSpreadArbitrageProcessor{}, nil
+		case "每日成交":
+			return &DailyTransactionProcessor{}, nil
+		default:
+			return nil, fmt.Errorf("unknown category: %s", category)
+		}
+	} else if product == "棕榈油" {
+		switch category {
+		case "库存分析":
+			return &InventoryAnalysisProcessor{}, nil
+		case "每日成交":
+			return &DailyTransactionProcessor{}, nil
+		default:
+			return nil, fmt.Errorf("unknown category: %s", category)
+		}
+	} else if product == "油菜籽" {
+		switch category {
+		case "进口成本":
+			return &ImportCostProcessor{}, nil
+		case "库存分析":
+			return &InventoryAnalysisProcessor{}, nil
+		default:
+			return nil, fmt.Errorf("unknown category: %s", category)
+		}
+	} else if product == "菜粕" {
+		switch category {
+		case "库存分析":
+			return &InventoryAnalysisProcessor{}, nil
+		default:
+			return nil, fmt.Errorf("unknown category: %s", category)
+		}
+	} else if product == "菜籽油" {
+		switch category {
+		case "库存分析":
+			return &InventoryAnalysisProcessor{}, nil
 		default:
 			return nil, fmt.Errorf("unknown category: %s", category)
 		}

+ 17 - 10
static/liangyou.json

@@ -1,14 +1,21 @@
 {
-  "大豆": {
-    "采购装船": {
-      "中国大豆采购进度周统计": [
-        "中国大豆计划采购量:万吨:周度",
-        "中国大豆已采购量:美国:万吨:周度",
-        "中国大豆已采购量:巴西:万吨:周度",
-        "中国大豆已采购量:阿根廷/乌拉圭:万吨:周度",
-        "中国大豆已采购量:小计:万吨:周度",
-        "中国大豆未采购量:万吨:周度",
-        "中国大豆采购进度:%:周度"
+  "菜粕": {
+    "库存分析": {
+      "全国油厂进口压榨菜粕库存与合同统计周报": [
+        "全国油厂进口压榨菜粕库存量:广西地区:万吨:周度",
+        "全国油厂进口压榨菜粕库存量:广东地区:万吨:周度",
+        "全国油厂进口压榨菜粕库存量:福建地区:万吨:周度",
+        "全国油厂进口压榨菜粕库存量:江苏地区:万吨:周度",
+        "全国油厂进口压榨菜粕库存量:辽宁地区:万吨:周度",
+        "全国油厂进口压榨菜粕库存量:其他地区:万吨:周度",
+        "全国油厂进口压榨菜粕库存量:全国合计:万吨:周度",
+        "全国油厂进口压榨菜粕合同量:广西地区:万吨:周度",
+        "全国油厂进口压榨菜粕合同量:广东地区:万吨:周度",
+        "全国油厂进口压榨菜粕合同量:福建地区:万吨:周度",
+        "全国油厂进口压榨菜粕合同量:江苏地区:万吨:周度",
+        "全国油厂进口压榨菜粕合同量:辽宁地区:万吨:周度",
+        "全国油厂进口压榨菜粕合同量:其他地区:万吨:周度",
+        "全国油厂进口压榨菜粕合同量:全国合计:万吨:周度"
       ]
     }
   }