Browse Source

粮油商务网数据对接-大豆-加工报告

gmy 9 months ago
parent
commit
0931abf6ea

+ 67 - 8
cmd/commodity_trade_liangyou.go → cmd/commodity_liangyou.go

@@ -3,6 +3,8 @@ package main
 import (
 	"context"
 	"encoding/json"
+	models "eta/eta_crawler/models"
+	"eta/eta_crawler/utils"
 	"fmt"
 	"github.com/beego/beego/v2/core/logs"
 	"os"
@@ -111,10 +113,6 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 	selector := fmt.Sprintf(`//dl[contains(@class, 'dl_hot')]//a[text()='%s']`, product)
 	logs.Info("选择器表达式: %s", selector)
 
-	// 增加超时
-	/*ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
-	defer cancel()*/
-
 	err = chromedp.Run(ctx,
 		chromedp.WaitReady(selector, chromedp.BySearch),
 		chromedp.Click(selector, chromedp.BySearch),
@@ -124,7 +122,7 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 	if err != nil {
 		return err
 	}
-	logs.Info("productPageURL: %s", productPageURL)
+	logs.Info("productPageURL: %s: %s: %s", product, category, productPageURL)
 
 	// Navigate to the category page
 	var categoryPageURL string
@@ -138,7 +136,7 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 	if err != nil {
 		return err
 	}
-	logs.Info("categoryPageURL: %s", categoryPageURL)
+	logs.Info("categoryPageURL: %s: %s: %s", product, category, categoryPageURL)
 
 	var allReportURLs []string
 	for {
@@ -181,7 +179,7 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 		}
 	}
 
-	logs.Info("所有报告 URLs: %v", allReportURLs)
+	logs.Info("所有报告 URLs: %s: %s: %v", product, category, allReportURLs)
 
 	if len(allReportURLs) == 0 {
 		return fmt.Errorf("未找到报告 URL")
@@ -224,6 +222,7 @@ func extractReportURLs(htmlContent, keyword string) []string {
 	return reportURLs
 }
 
+// Process the report data
 func processReport(ctx context.Context, product string, category string, reportURL string, keywords []string) error {
 	// Navigate to the report page
 	var reportContent string
@@ -236,6 +235,7 @@ func processReport(ctx context.Context, product string, category string, reportU
 		return err
 	}
 
+	var lyIndexDataList []models.BaseFromLyData
 	// Process the data based on keywords
 	for _, keyword := range keywords {
 		partialKeyword := strings.Split(keyword, ":")
@@ -245,10 +245,69 @@ func processReport(ctx context.Context, product string, category string, reportU
 			return err
 		}
 
+		// 查询报告所属分类
+		classify, err := models.GetLyClassifyByName(product)
+		if err != nil {
+			return err
+		}
+
+		// 指标名称
+		indexName := strings.Join(partialKeyword[:len(partialKeyword)-2], ":")
+		// 指标编码
+		indexCode := utils.GenerateIndexCode(sourceName, indexName)
+		// 判断指标是否存在
+		var indexId int
+		indexInfo, err := models.GetLyIndexByCode(indexCode)
+		if err != nil {
+			// 新增指标
+			index, err := addLyIndex(classify.BaseFromLyClassifyId, indexCode, indexName, partialKeyword[len(partialKeyword)-2], partialKeyword[len(partialKeyword)-1])
+			if err != nil {
+				return err
+			}
+			indexId = index
+		} else {
+			indexId = indexInfo.BaseFromLyIndexId
+		}
+
 		// Process the report content using the selected processor
-		err = processor.Process(ctx, product, reportContent, partialKeyword)
+		baseFromLyData, err := processor.Process(ctx, product, reportContent, partialKeyword, indexId)
+		if err != nil {
+			// 这块逻辑会导致,如果有一个指标处理失败,后续的指标也无法处理,目前出现失败的原因有 产品提供表格中的字段在页面找不到
+			return err
+		}
 
+		if baseFromLyData.DataTime != "" && baseFromLyData.Value != 0 {
+			baseFromLyData.IndexCode = indexCode
+			baseFromLyData.BaseFromLyIndexId = indexId
+			baseFromLyData.CreateTime = utils.GetCurrentTime()
+			baseFromLyData.ModifyTime = utils.GetCurrentTime()
+			lyIndexDataList = append(lyIndexDataList, baseFromLyData)
+		}
+	}
+	// 新增指标数据
+	err = models.AddLyDataList(lyIndexDataList)
+	if err != nil {
+		return err
 	}
 
 	return nil
 }
+
+// addLyIndex builds a new BaseFromLyIndex record from the given attributes,
+// stores it through models.AddLyIndex and returns the new record's ID.
+// On failure it returns 0 together with the error from the model layer.
+func addLyIndex(classifyId int, indexCode string, indexName string, unit string, frequency string) (int, error) {
+	// A freshly crawled index starts with EdbExist = 0.
+	newIndex := models.BaseFromLyIndex{
+		CreateTime:           utils.GetCurrentTime(),
+		ModifyTime:           utils.GetCurrentTime(),
+		BaseFromLyClassifyId: classifyId,
+		IndexCode:            indexCode,
+		IndexName:            indexName,
+		Frequency:            frequency,
+		Unit:                 unit,
+		EdbExist:             0,
+	}
+
+	insertedId, insertErr := models.AddLyIndex(&newIndex)
+	if insertErr != nil {
+		return 0, insertErr
+	}
+	return int(insertedId), nil
+}

+ 59 - 12
cmd/processor_business_logic.go

@@ -4,33 +4,40 @@ package main
 import (
 	"context"
 	"encoding/json"
+	"eta/eta_crawler/models"
 	"eta/eta_crawler/utils"
 	"fmt"
 	"github.com/PuerkitoBio/goquery"
 	"github.com/beego/beego/v2/core/logs"
 	"github.com/chromedp/chromedp"
 	"log"
+	"regexp"
+	"strconv"
 	"strings"
 )
 
+const (
+	sourceName = "lysww" // source identifier for the Liangyou (grain & oil) business network site
+)
+
 // ImportCostProcessor
 // @Description: 进口成本处理器
 type ImportCostProcessor struct{}
 
-func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string) error {
+func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
 	fmt.Println("Processing import cost...")
 	// 实现具体的处理逻辑
-	return nil
+	return models.BaseFromLyData{}, nil
 }
 
 // ProcessingProfitProcessor
 // @Description: 加工利润处理器
 type ProcessingProfitProcessor struct{}
 
-func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string) error {
+func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
 	fmt.Println("Processing processing profit...")
 	// 实现具体的处理逻辑
-	return nil
+	return models.BaseFromLyData{}, nil
 }
 
 // ProcessingReportProcessor
@@ -44,11 +51,11 @@ type TableData struct {
 	Rows    [][]string `json:"rows"`
 }
 
-func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string) error {
+func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
 	logs.Info("Processing processing report...")
 	// 解析关键字
 	if len(keywords) < 3 {
-		return fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements")
+		return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements")
 	}
 
 	// 拿到 行关键字和列关键字
@@ -102,15 +109,29 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 		chromedp.Evaluate(`document.querySelector('div.a_date span').innerText`, &dateText),
 	)
 	if err != nil {
-		return err
+		return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to extract date: %v", err)
 	}
 
 	logs.Info("ProcessingReportProcessor Process() : Extracted Date: %s", dateText)
 
+	// 校验指标数据是否存在 根据指标id和日期 存在则跳过,不存在正常往下走
+	format, err := utils.ConvertTimeFormat(dateText)
+	if err != nil {
+		return models.BaseFromLyData{}, err
+	}
+	indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
+	if err != nil {
+		return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err)
+	}
+	if len(indexData) > 0 {
+		logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+		return models.BaseFromLyData{}, nil
+	}
+
 	// 解析日期并计算当前周数
 	targetWeek, err := utils.ParseDateAndWeek(dateText)
 	if err != nil {
-		return err
+		return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to parse date: %v", err)
 	}
 
 	fmt.Printf("Target Week: %s\n", targetWeek)
@@ -123,7 +144,8 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 		// 查找目标列
 		columnIdx := -1
 		for i, header := range tableHeaders {
-			if strings.Contains(columnName, header) {
+			headerString := extractChinese(header)
+			if strings.Contains(columnName, headerString) {
 				columnIdx = i
 				break
 			}
@@ -151,16 +173,41 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 		// 查找目标行
 		for _, row := range tableRows {
 			if len(row) > 0 && strings.Contains(row[0], rowName) {
-				fmt.Printf("Row matching '%s':\n", rowName)
 				if weekIdx < len(row) {
 					logs.Info("Value in column '%s' - '%s': %s", columnName, rowName, row[columnIdx])
+					numFlag := isNumeric(row[columnIdx])
+					if numFlag {
+						value, err := strconv.ParseFloat(row[columnIdx], 64)
+						if err != nil {
+							logs.Error("ProcessingReportProcessor Process() : Error converting value to float64: %v", err)
+							return models.BaseFromLyData{}, err
+						}
+						// 返回BaseFromLyData对象的数据
+						baseFromLyData := models.BaseFromLyData{
+							DataTime: dateText,
+							Value:    value,
+						}
+						return baseFromLyData, nil
+					}
 				} else {
 					logs.Error("ProcessingReportProcessor Process() : Column index out of range")
 				}
 			}
 		}
-
 	}
+	// TODO 后面把这个日志打印,不做返回错误处理,一个指标找不到会导致后续指标无法处理
+	return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : No matching row found for '%s'", rowName)
+}
+
+// numericPattern matches an optionally signed integer or decimal number
+// (e.g. "12", "-3.5", ".5", "5."). It is compiled once at package scope
+// instead of on every call.
+var numericPattern = regexp.MustCompile(`^[+-]?(\d+(\.\d*)?|\.\d+)$`)
+
+// isNumeric reports whether value is an integer or floating-point literal.
+// Exponent notation, surrounding whitespace and the empty string are rejected.
+func isNumeric(value string) bool {
+	return numericPattern.MatchString(value)
+}
 
-	return nil
+// nonHanPattern matches any single character that is not a Han character.
+// It is compiled once at package scope instead of on every call.
+var nonHanPattern = regexp.MustCompile(`[^\p{Han}]`)
+
+// extractChinese returns text with every non-Han character removed,
+// keeping the Han characters in their original order.
+func extractChinese(text string) string {
+	return nonHanPattern.ReplaceAllString(text, "")
+}

+ 2 - 1
cmd/processor_factory.go

@@ -3,11 +3,12 @@ package main
 
 import (
 	"context"
+	"eta/eta_crawler/models"
 	"fmt"
 )
 
 type ReportProcessor interface {
-	Process(ctx context.Context, product string, reportContent string, keywords []string) error
+	Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error)
 }
 
 func GetProcessor(product string, category string) (ReportProcessor, error) {

+ 26 - 0
models/base_from_liangyou_classify.go

@@ -0,0 +1,26 @@
+// @Author gmy 2024/8/7 9:26:00
+package models
+
+import "github.com/beego/beego/v2/client/orm"
+
+// BaseFromLyClassify is the ORM model for a classify (category) row; its
+// columns are bound through the orm tags below.
+type BaseFromLyClassify struct {
+	BaseFromLyClassifyId int    `orm:"column(base_from_ly_classify_id);pk"` // classify ID (primary key)
+	CreateTime           string `orm:"column(create_time)"`                 // creation time
+	ModifyTime           string `orm:"column(modify_time)"`                 // modification time
+	ClassifyName         string `orm:"column(classify_name)"`               // classify name
+	ParentId             int    `orm:"column(parent_id)"`                   // parent classify ID
+	Sort                 int    `orm:"column(sort)"`                        // sort key; smaller values come first
+	ClassifyNameEn       string `orm:"column(classify_name_en)"`            // English classify name
+}
+
+// init registers the BaseFromLyClassify model with the beego ORM.
+func init() {
+	orm.RegisterModel(&BaseFromLyClassify{})
+}
+
+// GetLyClassifyByName looks up a single classify row by its classify_name
+// on the "data" database and returns it together with any query error.
+func GetLyClassifyByName(classifyName string) (item *BaseFromLyClassify, err error) {
+	const query = `SELECT * FROM base_from_ly_classify WHERE classify_name=?`
+	err = orm.NewOrmUsingDB("data").Raw(query, classifyName).QueryRow(&item)
+	return item, err
+}

+ 33 - 0
models/base_from_liangyou_data.go

@@ -0,0 +1,33 @@
+// @Author gmy 2024/8/7 9:50:00
+package models
+
+import "github.com/beego/beego/v2/client/orm"
+
+// BaseFromLyData is the ORM model for one data point of an index; its
+// columns are bound through the orm tags below.
+type BaseFromLyData struct {
+	BaseFromLyDataId  int     `orm:"column(base_from_ly_data_id);pk"` // data ID (primary key)
+	CreateTime        string  `orm:"column(create_time)"`             // creation time
+	ModifyTime        string  `orm:"column(modify_time)"`             // modification time
+	BaseFromLyIndexId int     `orm:"column(base_from_ly_index_id)"`   // index ID
+	IndexCode         string  `orm:"column(index_code)"`              // index code
+	DataTime          string  `orm:"column(data_time)"`               // data date
+	Value             float64 `orm:"column(value)"`                   // data value
+}
+
+// init registers the BaseFromLyData model with the beego ORM.
+func init() {
+	orm.RegisterModel(&BaseFromLyData{})
+}
+
+// AddLyDataList inserts the given data records in one batch on the "data"
+// database.
+//
+// An empty (or nil) slice is a no-op that returns nil: beego's InsertMulti
+// reports an argument error for an empty slice, which would otherwise turn
+// "nothing new to store" into a failure for the caller.
+func AddLyDataList(items []BaseFromLyData) (err error) {
+	if len(items) == 0 {
+		return nil
+	}
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.InsertMulti(len(items), items)
+	return err
+}
+
+// GetLyDataByIndexIdAndDataTime returns every data row on the "data" database
+// whose base_from_ly_index_id and data_time equal the given values.
+func GetLyDataByIndexIdAndDataTime(indexId int, dataTime string) (items []BaseFromLyData, err error) {
+	const query = `SELECT * FROM base_from_ly_data WHERE base_from_ly_index_id=? AND data_time=?`
+	_, err = orm.NewOrmUsingDB("data").Raw(query, indexId, dataTime).QueryRows(&items)
+	return items, err
+}

+ 50 - 0
models/base_from_liangyou_index.go

@@ -0,0 +1,50 @@
+// @Author gmy 2024/8/7 9:38:00
+package models
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+// BaseFromLyIndex is the ORM model for an index (indicator) row; its
+// columns are bound through the orm tags below.
+type BaseFromLyIndex struct {
+	BaseFromLyIndexId    int    `orm:"column(base_from_ly_index_id);pk"` // index ID (primary key)
+	CreateTime           string `orm:"column(create_time)"`              // creation time
+	ModifyTime           string `orm:"column(modify_time)"`              // modification time
+	BaseFromLyClassifyId int    `orm:"column(base_from_ly_classify_id)"` // classify ID of the raw-data index
+	IndexCode            string `orm:"column(index_code)"`               // index code
+	IndexName            string `orm:"column(index_name)"`               // index name
+	Frequency            string `orm:"column(frequency)"`                // frequency
+	Unit                 string `orm:"column(unit)"`                     // unit
+	EdbExist             int    `orm:"column(edb_exist)"`                // whether already added to the index library: 0 = no, 1 = yes
+}
+
+// init registers the BaseFromLyIndex model with the beego ORM.
+func init() {
+	orm.RegisterModel(&BaseFromLyIndex{})
+}
+
+// AddLyIndexList inserts the given index records in one batch on the "data"
+// database.
+//
+// An empty (or nil) slice is a no-op that returns nil, because beego's
+// InsertMulti reports an argument error when handed an empty slice.
+func AddLyIndexList(items []*BaseFromLyIndex) (err error) {
+	if len(items) == 0 {
+		return nil
+	}
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.InsertMulti(len(items), items)
+	return err
+}
+
+// AddLyIndex stamps item.CreateTime with the current time, inserts item on
+// the "data" database and returns the value reported by the ORM insert
+// (0 together with the error when the insert fails).
+func AddLyIndex(item *BaseFromLyIndex) (int64, error) {
+	// Note: any CreateTime set by the caller is overwritten here.
+	item.CreateTime = time.Now().Format("2006-01-02 15:04:05")
+
+	newId, insertErr := orm.NewOrmUsingDB("data").Insert(item)
+	if insertErr != nil {
+		return 0, insertErr
+	}
+	return newId, nil
+}
+
+// GetLyIndexByCode looks up a single index row by its index_code on the
+// "data" database and returns it together with any query error.
+func GetLyIndexByCode(indexCode string) (item *BaseFromLyIndex, err error) {
+	const query = `SELECT * FROM base_from_ly_index WHERE index_code=?`
+	err = orm.NewOrmUsingDB("data").Raw(query, indexCode).QueryRow(&item)
+	return item, err
+}

+ 2 - 2
static/liangyou.json

@@ -11,7 +11,7 @@
         "国内大豆加工量:内蒙古:万吨:周度",
         "国内大豆加工量:河北省:万吨:周度",
         "国内大豆加工量:天津市:万吨:周度",
-        "国内大豆加工量:西省:万吨:周度",
+        "国内大豆加工量:西省:万吨:周度",
         "国内大豆加工量:山东省:万吨:周度",
         "国内大豆加工量:安徽省:万吨:周度",
         "国内大豆加工量:江苏省:万吨:周度",
@@ -37,7 +37,7 @@
         "国内大豆开机率:内蒙古:%:周度",
         "国内大豆开机率:河北省:%:周度",
         "国内大豆开机率:天津市:%:周度",
-        "国内大豆开机率:西省:%:周度",
+        "国内大豆开机率:西省:%:周度",
         "国内大豆开机率:山东省:%:周度",
         "国内大豆开机率:安徽省:%:周度",
         "国内大豆开机率:江苏省:%:周度",

+ 18 - 2
utils/date_util.go

@@ -1,4 +1,4 @@
-// @Author gmy 2024/8/6 16:06:00
+// Package utils @Author gmy 2024/8/6 16:06:00
 package utils
 
 import (
@@ -7,7 +7,7 @@ import (
 	"time"
 )
 
-// parseDateAndWeek 解析日期并计算当前周数
+// ParseDateAndWeek 解析日期并计算当前周数
 func ParseDateAndWeek(dateText string) (string, error) {
 	// 解析日期
 	reportDate, err := time.Parse("2006-01-02", strings.TrimSpace(strings.Split(dateText, " ")[0]))
@@ -23,3 +23,19 @@ func ParseDateAndWeek(dateText string) (string, error) {
 
 	return targetWeek, nil
 }
+
+// GetCurrentTime returns the current local time formatted as
+// "2006-01-02 15:04:05", e.g. 2024-08-07 15:29:58.
+func GetCurrentTime() string {
+	const layout = "2006-01-02 15:04:05"
+	now := time.Now()
+	return now.Format(layout)
+}
+
+// ConvertTimeFormat converts a timestamp of the form "2024-08-03 07:53"
+// into its date part "2024-08-03". Leading and trailing whitespace in
+// dateText is ignored.
+//
+// It returns an error wrapping the underlying parse error when dateText does
+// not match the "2006-01-02 15:04" layout.
+func ConvertTimeFormat(dateText string) (string, error) {
+	reportDate, err := time.Parse("2006-01-02 15:04", strings.TrimSpace(dateText))
+	if err != nil {
+		// %w keeps the original *time.ParseError reachable via errors.As.
+		return "", fmt.Errorf("failed to parse report date: %w", err)
+	}
+
+	return reportDate.Format("2006-01-02"), nil
+}

+ 37 - 0
utils/index_code_util.go

@@ -0,0 +1,37 @@
+// @Author gmy 2024/8/7 10:41:00
+package utils
+
+import (
+	"github.com/mozillazg/go-pinyin"
+	"regexp"
+	"strings"
+)
+
+// nonAlnumPattern matches every character that is not an ASCII letter or
+// digit. It is compiled once at package scope instead of on every call.
+var nonAlnumPattern = regexp.MustCompile(`[^a-zA-Z0-9]`)
+
+// GenerateIndexCode builds an index code from a source name and an index name.
+//
+// The code is sourceName followed by the initials that getFirstLetter derives
+// from indexName; every character other than an ASCII letter or digit is then
+// removed and the result is lower-cased.
+func GenerateIndexCode(sourceName string, indexName string) string {
+	code := sourceName + getFirstLetter(indexName)
+	code = nonAlnumPattern.ReplaceAllString(code, "")
+	return strings.ToLower(code)
+}
+
+// getFirstLetter returns, for each character of s in order, the upper-cased
+// first letter of its pinyin; ASCII letters and digits are carried over
+// (letters upper-cased), and all other characters are dropped.
+//
+// go-pinyin's default fallback discards every character it has no pinyin for,
+// which would also discard the digits and Latin letters that the index-code
+// rule says must be kept, so a custom fallback is installed for them.
+func getFirstLetter(s string) string {
+	a := pinyin.NewArgs()
+	a.Fallback = func(r rune, _ pinyin.Args) []string {
+		if (r >= '0' && r <= '9') || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') {
+			return []string{string(r)}
+		}
+		return nil
+	}
+
+	var firstLetters strings.Builder
+	for _, syllables := range pinyin.Pinyin(s, a) {
+		// Guard against an empty syllable before slicing its first byte.
+		if len(syllables) > 0 && syllables[0] != "" {
+			firstLetters.WriteString(strings.ToUpper(syllables[0][:1]))
+		}
+	}
+	return firstLetters.String()
+}