Browse Source

粮油商务网数据对接-大豆

gmy 9 tháng trước cách đây
mục cha
commit
29b140dadd

+ 0 - 1
cmd/commodity_liangyou.go

@@ -255,7 +255,6 @@ func processReport(ctx context.Context, product string, category string, reportU
 		// Process the report content using the selected processor
 		baseFromLyDataList, err := processor.Process(ctx, product, reportContent, partialKeyword, classify.BaseFromLyClassifyId)
 		if err != nil {
-			// 这块逻辑会导致,如果有一个指标处理失败,后续的指标也无法处理,目前出现失败的原因有 产品提供表格中的字段在页面找不到
 			return err
 		}
 		if len(baseFromLyDataList) > 0 {

+ 337 - 3
cmd/processor_business_logic.go

@@ -390,6 +390,7 @@ func (p *ShippingCostProcessor) Process(ctx context.Context, product string, rep
 type SupplyDemandBalanceProcessor struct{}
 
 func (p *SupplyDemandBalanceProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
+	// https://www.fao.com.cn/art/gG7gKTCNDHLJNsq9QRYjoQ==.htm
 	logs.Info("Processing processing report...")
 	// 解析关键字
 	if len(keywords) < 4 {
@@ -435,7 +436,145 @@ func (p *SupplyDemandBalanceProcessor) Process(ctx context.Context, product stri
 	rows := tableData.Rows
 
 	for _, year := range currentYearAndNextYear {
-		columnName = year + monthSuffix
+		columnName = year + month + monthSuffix
+		isCurrentYear, err := utils.IsCurrentYear(year)
+		if err != nil {
+			logs.Error("SupplyDemandBalanceProcessor Process() : Failed to determine if year is current year: %v", err)
+			continue
+		}
+		if !isCurrentYear {
+			format, err = utils.GetNextYearLastDay(format)
+			if err != nil {
+				logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get next year last day: %v", err)
+				continue
+			}
+		}
+
+		// 查找目标列
+		columnIdx := -1
+		for i, header := range headers {
+			if strings.Contains(columnName, header) {
+				columnIdx = i
+				break
+			}
+		}
+		if columnIdx == -1 {
+			logs.Error("SupplyDemandBalanceProcessor Process() : Column '%s' not found in table", columnName)
+			continue
+		}
+		// 处理表格中的每一行
+		for _, row := range rows {
+			if len(row) >= len(headers) && row[0] == rowVariety {
+				if columnIdx < len(row) {
+					// 指标名称
+					indexName := strings.Join(keywords[:len(keywords)-2], ":")
+					// 指标编码
+					indexCode := utils.GenerateIndexCode(sourceName, indexName)
+					// 指标id获取
+					indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+					if err != nil {
+						logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get index id: %v", err)
+						continue
+					}
+					valueStr := row[columnIdx]
+					value, err := strconv.ParseFloat(valueStr, 64)
+					if err != nil {
+						return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : failed to parse value '%s': %v", valueStr, err)
+					}
+					yearMonth, err := utils.GetYearMonth(format)
+					if err != nil {
+						logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get year month: %v", err)
+						continue
+					}
+					indexData, err := models.GetLyDataByIndexIdAndDataTimeYM(indexId, yearMonth)
+					if err != nil {
+						logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get data by index id and date: %v", err)
+						continue
+					}
+					if len(indexData) > 0 {
+						logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+						// 存在走更新逻辑 主要更新今年在去年的预估值
+						indexData := indexData[0]
+						if indexData.Value != value {
+							err := models.UpdateLyDataById(indexData.BaseFromLyDataId, value)
+							if err != nil {
+								logs.Error("SupplyDemandBalanceProcessor Process() : Failed to update data: %v", err)
+								continue
+							}
+						}
+						continue
+					}
+
+					// 创建并添加到结果列表
+					baseFromLyData := models.BaseFromLyData{
+						DataTime:          format,
+						Value:             value,
+						BaseFromLyIndexId: indexId,
+						IndexCode:         indexCode,
+					}
+					result = append(result, baseFromLyData)
+				} else {
+					log.Printf("SupplyDemandBalanceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
+				}
+				break
+			}
+		}
+	}
+	return result, nil
+}
+
+// PurchaseShippingProcessor
+// @Description: 采购装船处理器
+type PurchaseShippingProcessor struct{}
+
+func (p *PurchaseShippingProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
+	logs.Info("Processing purchase shipping...")
+	// TODO 卡住了
+	// 解析关键字
+	if len(keywords) < 3 {
+		return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : keywords must contain at least 3 elements")
+	}
+
+	// 拿到 行关键字和列关键字
+	var columnName string
+	rowVariety := keywords[1]
+
+	// 提取所有表格数据
+	tableData := getPurchaseShippingTableData(reportContent)
+	logs.Info("SupplyDemandBalanceProcessor Process() : Table data: %v", tableData)
+
+	// 提取日期信息
+	dateText, err := getDateInfo(ctx)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// 时间格式转换
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	currentYearAndNextYear, err := utils.GetCurrentYearAndNextYear(format)
+	if err != nil {
+		return nil, err
+	}
+
+	month, err := utils.GetCurrentMonth(format)
+	if err != nil {
+		return nil, err
+	}
+	monthSuffix := "预估"
+	logs.Info("SupplyDemandBalanceProcessor Process() : Target Year: %s:%s\n", currentYearAndNextYear, month+monthSuffix)
+
+	// 处理提取的表格数据
+	var result []models.BaseFromLyData
+
+	headers := tableData.Headers
+	rows := tableData.Rows
+
+	for _, year := range currentYearAndNextYear {
+		columnName = year + month + monthSuffix
 
 		// 查找目标列
 		columnIdx := -1
@@ -617,6 +756,117 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 
 }
 
+// InventoryAnalysisProcessor
+// @Description: Processor for inventory-analysis (库存分析) reports.
+type InventoryAnalysisProcessor struct{}
+
+// Process extracts, for each year label returned by utils.GetCurrentYearAndNextYear,
+// the value found at (row == keywords[1], column matching year+month+"预估") in the
+// report table and returns it as a BaseFromLyData row dated with the converted report date.
+//
+// keywords must hold at least 4 elements: keywords[1] is the row label compared with the
+// first cell of each table row, every element except the last two forms the index name
+// (joined with ":"), and the last two elements are passed to getIndexId
+// (presumably unit and frequency, e.g. "万吨", "周度" — TODO confirm against getIndexId).
+//
+// Lookup failures (index id, existing data) are logged and the row is skipped; values that
+// already exist for the index and date are not returned again. A cell that cannot be parsed
+// as a float aborts processing and returns an error.
+func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
+	// Validate the keyword list before indexing into it.
+	if len(keywords) < 4 {
+		return []models.BaseFromLyData{}, fmt.Errorf("InventoryAnalysisProcessor Process() : keywords must contain at least 4 elements")
+	}
+
+	// Row label to look for, and the index name (loop-invariant, so computed once).
+	rowVariety := keywords[1]
+	indexName := strings.Join(keywords[:len(keywords)-2], ":")
+
+	// Extract the table from the report HTML.
+	tableData := getTableData(reportContent)
+	logs.Info("InventoryAnalysisProcessor Process() : Table data: %v", tableData)
+
+	// Report date as returned by getDateInfo.
+	dateText, err := getDateInfo(ctx)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	// Converted date; used both as the lookup key and as DataTime of new rows.
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return []models.BaseFromLyData{}, err
+	}
+
+	currentYearAndNextYear, err := utils.GetCurrentYearAndNextYear(format)
+	if err != nil {
+		return nil, err
+	}
+
+	month, err := utils.GetCurrentMonth(format)
+	if err != nil {
+		return nil, err
+	}
+	monthSuffix := "预估"
+	logs.Info("InventoryAnalysisProcessor Process() : Target Year: %s:%s\n", currentYearAndNextYear, month+monthSuffix)
+
+	var result []models.BaseFromLyData
+
+	headers := tableData.Headers
+	rows := tableData.Rows
+
+	for _, year := range currentYearAndNextYear {
+		columnName := year + month + monthSuffix
+
+		// Find the target column. Empty headers are skipped: strings.Contains(x, "")
+		// is always true, so a blank header cell would otherwise match every column name.
+		columnIdx := -1
+		for i, header := range headers {
+			if header != "" && strings.Contains(columnName, header) {
+				columnIdx = i
+				break
+			}
+		}
+		if columnIdx == -1 {
+			logs.Error("InventoryAnalysisProcessor Process() : Column '%s' not found in table", columnName)
+			continue
+		}
+
+		for _, row := range rows {
+			// Only rows at least as wide as the header line are considered, which also
+			// guarantees columnIdx < len(row) below.
+			if len(row) < len(headers) || row[0] != rowVariety {
+				continue
+			}
+
+			indexCode := utils.GenerateIndexCode(sourceName, indexName)
+			indexId, err := getIndexId(indexCode, indexName, classifyId, sourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
+			if err != nil {
+				logs.Error("InventoryAnalysisProcessor Process() : Failed to get index id: %v", err)
+				continue
+			}
+
+			// Skip values that are already stored for this index and date.
+			indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+			if err != nil {
+				logs.Error("InventoryAnalysisProcessor Process() : Failed to get data by index id and date: %v", err)
+				continue
+			}
+			if len(indexData) > 0 {
+				logs.Info("InventoryAnalysisProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+				continue
+			}
+
+			valueStr := row[columnIdx]
+			value, err := strconv.ParseFloat(valueStr, 64)
+			if err != nil {
+				return []models.BaseFromLyData{}, fmt.Errorf("InventoryAnalysisProcessor Process() : failed to parse value '%s': %w", valueStr, err)
+			}
+
+			result = append(result, models.BaseFromLyData{
+				DataTime:          format,
+				Value:             value,
+				BaseFromLyIndexId: indexId,
+				IndexCode:         indexCode,
+			})
+			// One value per year column: stop at the first row that produced a result.
+			break
+		}
+	}
+	return result, nil
+}
+
 // ExtractValueInParentheses 从字符串中提取括号中的值
 func extractValueInParentheses(input string) (string, error) {
 	re := regexp.MustCompile(`(([^)]+))`)
@@ -767,7 +1017,11 @@ func getTableData(reportContent string) TableData {
 		if i == 0 {
 			// 第一行处理合并单元格,保存到 combinedHeaders
 			row.Find("td").Each(func(j int, cell *goquery.Selection) {
-				if j > 0 { // 跳过左上角的“年度(10/9月)”
+				if j == 0 {
+					// 把左上角的“年度(10/9月)”放入 Headers 第一个元素
+					tableData.Headers = append(tableData.Headers, strings.TrimSpace(cell.Text()))
+				} else {
+					// 处理其他单元格
 					colspan, exists := cell.Attr("colspan")
 					if exists {
 						spanCount := 0
@@ -782,7 +1036,7 @@ func getTableData(reportContent string) TableData {
 			})
 		} else if i == 1 {
 			// 第二行处理具体标题,组合后保存到 Headers
-			row.Find("td").Each(func(j int, cell *goquery.Selection) {
+			row.Find("").Each(func(j int, cell *goquery.Selection) {
 				if j < len(combinedHeaders) {
 					fullHeader := combinedHeaders[j] + strings.TrimSpace(cell.Text())
 					tableData.Headers = append(tableData.Headers, fullHeader)
@@ -803,6 +1057,86 @@ func getTableData(reportContent string) TableData {
 	return *tableData
 }
 
+// getPurchaseShippingTableData parses the purchase/shipping report HTML and returns the
+// headers and data rows of the first table found inside div#a_content.
+//
+// The first two table rows are treated as header rows: their cells are expanded by colspan
+// into combinedHeaders, and one header is emitted per cell of the second row. Rows from the
+// third onwards are data rows; only their <td> cells are collected. If the HTML cannot be
+// parsed, the error is logged and an empty TableData is returned instead of terminating
+// the process.
+//
+// NOTE(review): the header mapping is kept exactly as originally written; in particular the
+// colspan adjustment in the second-row pass is a no-op for ordinary values like "2" and its
+// intent is unclear — confirm against a real report page before relying on multi-row headers.
+func getPurchaseShippingTableData(reportContent string) TableData {
+	doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
+	if err != nil {
+		// A single unparsable report must not kill the whole job.
+		log.Printf("getPurchaseShippingTableData() : failed to parse report content: %v", err)
+		return TableData{}
+	}
+
+	tableData := &TableData{}
+
+	// Only the first table inside the div with id a_content is used.
+	firstTable := doc.Find("div#a_content table").First()
+
+	headers := []string{}
+	combinedHeaders := []string{}
+
+	firstTable.Find("tr").Each(func(rowIndex int, row *goquery.Selection) {
+		// Data rows: collect the trimmed text of every <td>.
+		if rowIndex >= 2 {
+			var rowData []string
+			row.Find("td").Each(func(_ int, cell *goquery.Selection) {
+				rowData = append(rowData, strings.TrimSpace(cell.Text()))
+			})
+			if len(rowData) > 0 {
+				tableData.Rows = append(tableData.Rows, rowData)
+			}
+			return
+		}
+
+		// Header rows (0 and 1): repeat each cell's text once per spanned column.
+		row.Find("th, td").Each(func(_ int, cell *goquery.Selection) {
+			text := strings.TrimSpace(cell.Text())
+
+			spanCount := 1
+			if colspan, exists := cell.Attr("colspan"); exists {
+				parsed := 0
+				// A missing, malformed or non-positive colspan counts as one column,
+				// so the cell is never silently dropped.
+				if _, scanErr := fmt.Sscanf(colspan, "%d", &parsed); scanErr == nil && parsed > 0 {
+					spanCount = parsed
+				}
+			}
+			for k := 0; k < spanCount; k++ {
+				combinedHeaders = append(combinedHeaders, text)
+			}
+		})
+
+		if rowIndex != 1 {
+			return
+		}
+
+		// Second header row: emit one header per cell, reading from combinedHeaders
+		// (which at this point holds the expanded cells of rows 0 and 1).
+		combinedHeadersIndex := 0
+		row.Find("th, td").Each(func(_ int, cell *goquery.Selection) {
+			if colspan, _ := cell.Attr("colspan"); colspan != "" {
+				combinedHeadersIndex += len(strings.Split(colspan, " ")) - 1
+			}
+			// Re-check the bound after the adjustment to avoid indexing past the end.
+			if combinedHeadersIndex >= len(combinedHeaders) {
+				return
+			}
+			headers = append(headers, combinedHeaders[combinedHeadersIndex])
+			combinedHeadersIndex++
+		})
+	})
+
+	tableData.Headers = headers
+	return *tableData
+}
+
 // 判断字符串是否是数字
 func isNumeric(value string) bool {
 	// 正则表达式匹配整数和浮点数

+ 4 - 0
cmd/processor_factory.go

@@ -24,6 +24,10 @@ func GetProcessor(product string, category string) (ReportProcessor, error) {
 			return &ShippingCostProcessor{}, nil
 		case "供需平衡":
 			return &SupplyDemandBalanceProcessor{}, nil
+		case "采购装船":
+			return &PurchaseShippingProcessor{}, nil
+		case "库存分析":
+			return &InventoryAnalysisProcessor{}, nil
 
 		default:
 			return nil, fmt.Errorf("unknown category: %s", category)

+ 17 - 1
models/base_from_liangyou_data.go

@@ -24,10 +24,26 @@ func AddLyDataList(items []BaseFromLyData) (err error) {
 	return
 }
 
-// 根据指标id和数据日期查询数据
+// GetLyDataByIndexIdAndDataTime 根据指标id和数据日期查询数据
 func GetLyDataByIndexIdAndDataTime(indexId int, dataTime string) (items []BaseFromLyData, err error) {
 	o := orm.NewOrmUsingDB("data")
 	sql := `SELECT * FROM base_from_ly_data WHERE base_from_ly_index_id=? AND data_time=?`
 	_, err = o.Raw(sql, indexId, dataTime).QueryRows(&items)
 	return
 }
+
+// GetLyDataByIndexIdAndDataTimeYM 根据指标id和数据日期的年月查询数据 2024-08 数据库存的年月日 格式为 2024-08-01
+func GetLyDataByIndexIdAndDataTimeYM(indexId int, dataTime string) (items []BaseFromLyData, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM base_from_ly_data WHERE base_from_ly_index_id=? AND data_time like ?`
+	_, err = o.Raw(sql, indexId, dataTime+"%").QueryRows(&items)
+	return
+}
+
+// UpdateLyDataById 根据主键id更新数据
+func UpdateLyDataById(dataId int, value float64) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `UPDATE base_from_ly_data SET value=? WHERE base_from_ly_data_id=?`
+	_, err = o.Raw(sql, value, dataId).Exec()
+	return
+}

+ 9 - 15
static/liangyou.json

@@ -1,20 +1,14 @@
 {
   "大豆": {
-    "供需平衡": {
-      "年度中国大豆市场供需报告": [
-        "中国大豆市场供需:期初库存:万吨:月度",
-        "中国大豆市场供需:种植面积:万吨:月度",
-        "中国大豆市场供需:国内产量:万吨:月度",
-        "中国大豆市场供需:进口量:万吨:月度",
-        "中国大豆市场供需:总供应量:万吨:月度",
-        "中国大豆市场供需:压榨用量:万吨:月度",
-        "中国大豆市场供需:其中:国产大豆:万吨:月度",
-        "中国大豆市场供需:进口大豆:万吨:月度",
-        "中国大豆市场供需:出口量:万吨:月度",
-        "中国大豆市场供需:食用量:万吨:月度",
-        "中国大豆市场供需:种用及其他:万吨:月度",
-        "中国大豆市场供需:总需求量:万吨:月度",
-        "中国大豆市场供需:期末库存:万吨:月度"
+    "采购装船": {
+      "中国大豆采购进度周统计": [
+        "中国大豆计划采购量:万吨:周度",
+        "中国大豆已采购量:美国:万吨:周度",
+        "中国大豆已采购量:巴西:万吨:周度",
+        "中国大豆已采购量:阿根廷/乌拉圭:万吨:周度",
+        "中国大豆已采购量:小计:万吨:周度",
+        "中国大豆未采购量:万吨:周度",
+        "中国大豆采购进度:%:周度"
       ]
     }
   }

+ 56 - 0
utils/date_util.go

@@ -3,6 +3,7 @@ package utils
 
 import (
 	"fmt"
+	"strconv"
 	"strings"
 	"time"
 )
@@ -108,6 +109,61 @@ func GetCurrentYearAndNextYear(dateText string) ([]string, error) {
 	return years, nil
 }
 
+// IsCurrentYear reports whether dateText, a marketing-year label of the form "2023/24年度",
+// denotes the year that ends in the current calendar year, i.e. whether its start year
+// plus one equals time.Now().Year(). The result therefore depends on the wall clock:
+// when run in 2024, "2023/24年度" yields true and "2024/25年度" yields false.
+//
+// An error is returned when dateText does not contain exactly one "/" or when the part
+// before the "/" is not an integer; the strconv error is wrapped and can be inspected
+// with errors.Is / errors.As.
+func IsCurrentYear(dateText string) (bool, error) {
+	// Drop surrounding whitespace and the trailing "年度" suffix.
+	label := strings.TrimSuffix(strings.TrimSpace(dateText), "年度")
+
+	// Split the label, e.g. "2023/24" -> ["2023", "24"].
+	parts := strings.Split(label, "/")
+	if len(parts) != 2 {
+		return false, fmt.Errorf("invalid date format: %s", dateText)
+	}
+
+	// Only the start year is needed; the two-digit end year is not validated.
+	startYear, err := strconv.Atoi(strings.TrimSpace(parts[0]))
+	if err != nil {
+		return false, fmt.Errorf("failed to parse start year: %w", err)
+	}
+
+	return time.Now().Year() == startYear+1, nil
+}
+
+// GetNextYearLastDay returns the last day of the same month one year after dateText,
+// formatted as "2006-01-02"; for example "2024-08-03" --> "2025-08-31".
+// dateText must be in "2006-01-02" layout (surrounding whitespace is ignored); otherwise
+// the wrapped time.Parse error is returned.
+func GetNextYearLastDay(dateText string) (string, error) {
+	reportDate, err := time.Parse("2006-01-02", strings.TrimSpace(dateText))
+	if err != nil {
+		return "", fmt.Errorf("failed to parse report date: %w", err)
+	}
+
+	nextYear := reportDate.Year() + 1
+	// Day 0 of the following month is normalised by time.Date to the last day of this
+	// month; a month value of 13 (December+1) likewise rolls over correctly.
+	lastDay := time.Date(nextYear, reportDate.Month()+1, 0, 0, 0, 0, 0, reportDate.Location())
+
+	return lastDay.Format("2006-01-02"), nil
+}
+
+// GetYearMonth returns the year-month part of dateText, e.g. "2024-08-03" --> "2024-08".
+// dateText must be in "2006-01-02" layout (surrounding whitespace is ignored); otherwise
+// the wrapped time.Parse error is returned.
+func GetYearMonth(dateText string) (string, error) {
+	reportDate, err := time.Parse("2006-01-02", strings.TrimSpace(dateText))
+	if err != nil {
+		return "", fmt.Errorf("failed to parse report date: %w", err)
+	}
+
+	return reportDate.Format("2006-01"), nil
+}
+
 // GetCurrentMonth 获取当前月份 2024-08-03 --> 8月
 func GetCurrentMonth(dateText string) (string, error) {
 	// 解析日期