Ver código fonte

粮油商务网数据对接-更新指标库

gmy 6 meses atrás
pai
commit
bceb877988

+ 51 - 13
cmd/commodity_liangyou.go

@@ -7,6 +7,7 @@ import (
 	"eta/eta_crawler/utils"
 	"fmt"
 	"github.com/beego/beego/v2/core/logs"
+	"github.com/chromedp/cdproto/cdp"
 	"os"
 	"strings"
 	"time"
@@ -108,21 +109,10 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 	}
 
 	// Navigate to the product page
-	var productPageURL string
-
-	selector := fmt.Sprintf(`//dl[contains(@class, 'dl_hot')]//a[text()='%s']`, product)
-	logs.Info("选择器表达式: %s", selector)
-
-	err = chromedp.Run(ctx,
-		chromedp.WaitReady(selector, chromedp.BySearch),
-		chromedp.Click(selector, chromedp.BySearch),
-		chromedp.Sleep(5*time.Second),
-		chromedp.Location(&productPageURL),
-	)
+	productPageURL, err := fillProductPageURL(ctx, product, category)
 	if err != nil {
 		return err
 	}
-	logs.Info("productPageURL: %s: %s: %s", product, category, productPageURL)
 
 	// Navigate to the category page
 	var categoryPageURL string
@@ -155,8 +145,10 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 		reportURLs := extractReportURLs(htmlContent, report)
 		allReportURLs = append(allReportURLs, reportURLs...)
 
+		// todo 测试环境跑部分数据,上线放开
 		break
 		// Check if next page button is disabled
+		// todo 测试环境跑部分数据,上线放开
 		/*var nextPageDisabled bool
 		err = chromedp.Run(ctx,
 			chromedp.Evaluate(`document.querySelector('div.my-page-next').classList.contains('my-page-forbid')`, &nextPageDisabled),
@@ -177,7 +169,8 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 		)
 		if err != nil {
 			return err
-		}*/
+		}
+		*/
 	}
 
 	logs.Info("所有报告 URLs: %s: %s: %v", product, category, allReportURLs)
@@ -197,6 +190,51 @@ func fetchReportData(ctx context.Context, product, category, report string, keyw
 	return nil
 }
 
+func fillProductPageURL(ctx context.Context, product string, category string) (string, error) {
+	selector := `//dl[contains(@class, 'dl_hot')]`
+	logs.Info("选择器表达式: %s", selector)
+
+	var nodes []*cdp.Node
+	var productPageURL string
+
+	// 获取 dl 标签下的所有 a 标签节点
+	err := chromedp.Run(ctx,
+		chromedp.WaitReady(selector, chromedp.BySearch),
+		chromedp.Nodes(selector+"//a", &nodes, chromedp.BySearch),
+	)
+	if err != nil {
+		return "", err
+	}
+
+	// 提取并打印所有 a 标签的文本内容
+	for _, node := range nodes {
+		var linkText string
+		err = chromedp.Run(ctx,
+			chromedp.Text(node.FullXPath(), &linkText),
+		)
+		if err != nil {
+			return "", err
+		}
+		logs.Info("Link Text: %s", linkText)
+	}
+
+	// 点击目标产品的链接
+	clickSelector := fmt.Sprintf(`//dl[contains(@class, 'dl_hot')]//a[text()='%s']`, product)
+	err = chromedp.Run(ctx,
+		chromedp.WaitReady(clickSelector, chromedp.BySearch),
+		chromedp.Click(clickSelector, chromedp.BySearch),
+		chromedp.Sleep(5*time.Second),
+		chromedp.Location(&productPageURL),
+	)
+	if err != nil {
+		return "", err
+	}
+
+	// 返回点击后的页面URL
+	logs.Info("productPageURL: %s", productPageURL)
+	return productPageURL, nil
+}
+
 // Extract report URLs from the HTML content
 func extractReportURLs(htmlContent, keyword string) []string {
 	var reportURLs []string

+ 94 - 31
cmd/processor_business_logic.go

@@ -502,10 +502,34 @@ func (p *SupplyDemandBalanceProcessor) Process(ctx context.Context, product stri
 						// 存在走更新逻辑 主要更新今年在去年的预估值
 						indexData := indexData[0]
 						if indexData.Value != value {
-							err := models.UpdateLyDataById(indexData.BaseFromLyDataId, value)
+							time, err := utils.StringToTime(indexData.ModifyTime)
 							if err != nil {
-								logs.Error("SupplyDemandBalanceProcessor Process() : Failed to update data: %v", err)
-								continue
+								return nil, err
+							}
+
+							timeZero, err := utils.StringToTimeZero(format)
+							if err != nil {
+								return nil, err
+							}
+
+							if time.Before(timeZero) {
+								// 更新指标数据
+								err := models.UpdateLyDataById(indexData.BaseFromLyDataId, value)
+								if err != nil {
+									logs.Error("SupplyDemandBalanceProcessor Process() : Failed to update data: %v", err)
+									continue
+								}
+								// 更新指标库数据
+								edbIndexData, err := models.GetLyEdbDataByIndexCodeAndDataTime(indexData.IndexCode, yearMonth)
+								if err != nil {
+									return nil, err
+								}
+								if len(edbIndexData) > 0 {
+									err := models.UpdateLyEdbDataById(edbIndexData[0].EdbInfoId, value)
+									if err != nil {
+										return nil, err
+									}
+								}
 							}
 						}
 						continue
@@ -629,23 +653,40 @@ func (p *PurchaseShippingProcessor) Process(ctx context.Context, product string,
 						continue
 					}
 					if len(indexData) > 0 {
-						logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
-						err := models.UpdateLyDataById(indexData[0].BaseFromLyDataId, value)
-						if err != nil {
-							return nil, err
-						}
-						// 同步更新指标库数据 须根据指标编码和日期更新
-						edbData, err := models.GetLyEdbDataByIndexCodeAndDataTime(indexData[0].IndexCode, month)
-						if err != nil {
-							return nil, err
-						}
-						if len(edbData) > 0 {
-							err := models.UpdateLyEdbDataById(edbData[0].BaseFromLyDataId, value)
+						if indexData[0].Value != value {
+							logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+
+							lyData := indexData[0]
+							time, err := utils.StringToTime(lyData.ModifyTime)
 							if err != nil {
 								return nil, err
 							}
-						}
 
+							timeZero, err := utils.StringToTimeZero(format)
+							if err != nil {
+								return nil, err
+							}
+
+							if time.Before(timeZero) {
+								// 更新指标数据
+								err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value)
+								if err != nil {
+									return nil, err
+								}
+
+								// 同步更新指标库数据 须根据指标编码和日期更新
+								edbIndexData, err := models.GetLyEdbDataByIndexCodeAndDataTime(lyData.IndexCode, month)
+								if err != nil {
+									return nil, err
+								}
+								if len(edbIndexData) > 0 {
+									err := models.UpdateLyEdbDataById(edbIndexData[0].EdbInfoId, value)
+									if err != nil {
+										return nil, err
+									}
+								}
+							}
+						}
 						continue
 					}
 
@@ -713,6 +754,7 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 	}
 	if len(indexData) > 0 {
 		logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+		// 不必做更新处理,报告每周刷新,即使本周和上周数据一致,也需要每周记录
 		return []models.BaseFromLyData{}, nil
 	}
 
@@ -784,9 +826,8 @@ func (p *ProcessingReportProcessor) Process(ctx context.Context, product string,
 			}
 		}
 	}
-	// TODO 后面把这个日志打印,不做返回错误处理,一个指标找不到会导致后续指标无法处理
-	return result, nil
 
+	return result, nil
 }
 
 // InventoryAnalysisProcessor
@@ -866,6 +907,7 @@ func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string
 					}
 					if len(indexData) > 0 {
 						logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+						// 无需走更新逻辑,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
 						continue
 					}
 
@@ -970,6 +1012,7 @@ func (p *PriceSpreadArbitrageProcessor) Process(ctx context.Context, product str
 					}
 					if len(indexData) > 0 {
 						logs.Info("ProcessingProfitProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+						// 无需走更新逻辑,报告每天更新,即使今天和每天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
 						continue
 					}
 
@@ -1076,6 +1119,7 @@ func (p *DailyTransactionProcessor) Process(ctx context.Context, product string,
 					}
 					if len(indexData) > 0 {
 						logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+						// 无需走更新逻辑,报告每周更新,一周出来一周中每天得数据,即使本周和上周数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
 						continue
 					}
 
@@ -1133,6 +1177,7 @@ func (p *DailyTransactionProcessor) Process(ctx context.Context, product string,
 							}
 							if len(indexData) > 0 {
 								logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+								// 无需走更新逻辑,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
 								continue
 							}
 
@@ -1428,7 +1473,6 @@ func (p *ImportEstimateProcessor) Process(ctx context.Context, product string, r
 
 		// 查找目标列
 		for _, columnDate := range columnDates {
-
 			columnIdx := -1
 			for i, tableHeader := range tableHeaders {
 				if strings.Contains(tableHeader, columnDate) {
@@ -1479,24 +1523,41 @@ func (p *ImportEstimateProcessor) Process(ctx context.Context, product string, r
 							}
 
 							if len(indexData) > 0 {
-								logs.Info("ImportEstimateProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
-								lyData := indexData[0]
-								time, err := utils.StringToTime(lyData.ModifyTime)
-								if err != nil {
-									return nil, err
-								}
+								if indexData[0].Value != value {
+									logs.Info("ImportEstimateProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
 
-								timeZero, err := utils.StringToTimeZero(format)
-								if err != nil {
-									return nil, err
-								}
+									lyData := indexData[0]
+									time, err := utils.StringToTime(lyData.ModifyTime)
+									if err != nil {
+										return nil, err
+									}
 
-								if lyData.Value != value && time.Before(timeZero) {
-									err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value)
+									timeZero, err := utils.StringToTimeZero(format)
 									if err != nil {
 										return nil, err
 									}
+
+									if lyData.Value != value && time.Before(timeZero) {
+										// 更新指标数据
+										err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value)
+										if err != nil {
+											return nil, err
+										}
+
+										// 同步更新指标库数据
+										lyEdbIndexData, err := models.GetLyEdbDataByIndexCodeAndExactDataTime(lyData.IndexCode, lyData.DataTime)
+										if err != nil {
+											return nil, err
+										}
+										if len(lyEdbIndexData) > 0 {
+											err := models.UpdateLyEdbDataById(lyEdbIndexData[0].EdbInfoId, value)
+											if err != nil {
+												return nil, err
+											}
+										}
+									}
 								}
+
 								continue
 							}
 
@@ -1612,6 +1673,7 @@ func (p *InternationalPriceProcessor) Process(ctx context.Context, product strin
 						}
 						if len(indexData) > 0 {
 							logs.Info("InternationalPriceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+							// 无需更新 指标展示本月和后两月的数据,报告每天更新,每天的值可能会改变,即使今天和每天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
 							continue
 						}
 
@@ -1714,6 +1776,7 @@ func (p *CanadaStatisticsBureauProcessor) Process(ctx context.Context, product s
 					}
 					if len(indexData) > 0 {
 						logs.Info("CanadaStatisticsBureauProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
+						// 无需更新 指标展示本周的数据,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
 						continue
 					}
 

+ 1 - 1
models/base_from_ly_data.go

@@ -32,7 +32,7 @@ func GetLyDataByIndexIdAndDataTime(indexId int, dataTime string) (items []BaseFr
 	return
 }
 
-// GetLyDataByIndexIdAndDataTimeYM 根据指标id和数据日期的年月查询数据 2024-08 数据库存的年月日 格式为 2024-08-01
+// GetLyDataByIndexIdAndDataTimeYM 根据指标id和数据日期的年月查询数据
 func GetLyDataByIndexIdAndDataTimeYM(indexId int, dataTime string) (items []BaseFromLyData, err error) {
 	o := orm.NewOrmUsingDB("data")
 	sql := `SELECT * FROM base_from_ly_data WHERE base_from_ly_index_id=? AND data_time like ?`

+ 16 - 9
models/ebd_data_ly.go

@@ -5,14 +5,14 @@ package models
 import "github.com/beego/beego/v2/client/orm"
 
 type EdbDataLy struct {
-	BaseFromLyDataId int     `orm:"column(base_from_ly_data_id);pk"` // 数据ID
-	CreateTime       string  `orm:"column(create_time)"`             // 创建时间
-	ModifyTime       string  `orm:"column(modify_time)"`             // 修改时间
-	EdbInfoId        int     `orm:"column(edb_info_id)"`             // 指标id
-	EdbCode          string  `orm:"column(edb_code)"`                // 指标编码
-	DataTime         string  `orm:"column(data_time)"`               // 数据日期
-	Value            float64 `orm:"column(value)"`                   // 数据值
-	DataTimestamp    uint64  `orm:"column(data_timestamp)"`          // 数据日期时间戳
+	edbDataId     int     `orm:"column(edb_data_id);pk"` // 数据ID
+	CreateTime    string  `orm:"column(create_time)"`    // 创建时间
+	ModifyTime    string  `orm:"column(modify_time)"`    // 修改时间
+	EdbInfoId     int     `orm:"column(edb_info_id)"`    // 指标id
+	EdbCode       string  `orm:"column(edb_code)"`       // 指标编码
+	DataTime      string  `orm:"column(data_time)"`      // 数据日期
+	Value         float64 `orm:"column(value)"`          // 数据值
+	DataTimestamp uint64  `orm:"column(data_timestamp)"` // 数据日期时间戳
 }
 
 func GetLyEdbDataByIndexCodeAndDataTime(indexCode string, dataTime string) (items []EdbDataLy, err error) {
@@ -22,10 +22,17 @@ func GetLyEdbDataByIndexCodeAndDataTime(indexCode string, dataTime string) (item
 	return
 }
 
+func GetLyEdbDataByIndexCodeAndExactDataTime(indexCode string, dataTime string) (items []EdbDataLy, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM edb_data_ly WHERE edb_code=? AND data_time=?`
+	_, err = o.Raw(sql, indexCode, dataTime).QueryRows(&items)
+	return
+}
+
 // UpdateLyEdbDataById 更新指标库数据 须根据指标编码和日期更新 仅适合月度数据
 func UpdateLyEdbDataById(id int, value float64) (err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := `UPDATE edb_data_ly SET value=? WHERE base_from_ly_data_id=?`
+	sql := `UPDATE edb_data_ly SET value=? WHERE edb_data_id=?`
 	_, err = o.Raw(sql, value, id).Exec()
 	return
 }

+ 41 - 2
utils/index_code_util.go

@@ -2,14 +2,53 @@
 package utils
 
 import (
+	"fmt"
 	"github.com/mozillazg/go-pinyin"
-	"regexp"
 	"strings"
+	"unicode"
 )
 
 // GenerateIndexCode 指标编码规则:粮油商务网拼音首字母+指标名称拼音首字母,数字、字母保留,特殊字符拿掉
 // 例:美湾:9月U:国际大豆进口成本价:期货收盘:张家港 -----> lyswwmw9yUgjddjkcbjqhspzjg
 func GenerateIndexCode(sourceName string, indexName string) string {
+
+	// 获取汉字的拼音首字母,保留数字和大写字母
+	firstLetters := getFirstLetters(indexName)
+
+	// 组合 sourceName 和处理后的拼音首字母
+	indexCode := fmt.Sprintf("%s%s", sourceName, firstLetters)
+
+	return indexCode
+}
+
+// getFirstLetters 获取汉字的拼音首字母,并保留数字和大写字母
+func getFirstLetters(input string) string {
+	// 设置拼音转换选项,只获取首字母
+	args := pinyin.NewArgs()
+	args.Style = pinyin.FirstLetter
+
+	// 定义用于存储结果的字符串
+	var result strings.Builder
+
+	// 遍历输入字符串中的每个字符
+	for _, r := range input {
+		if unicode.IsDigit(r) || unicode.IsUpper(r) {
+			// 保留数字和大写字母
+			result.WriteRune(r)
+		} else if unicode.Is(unicode.Han, r) {
+			// 如果是汉字,则获取其拼音首字母
+			py := pinyin.Pinyin(string(r), args)
+			if len(py) > 0 && len(py[0]) > 0 {
+				result.WriteString(py[0][0])
+			}
+		}
+		// 对于其他字符,忽略处理
+	}
+
+	return result.String()
+}
+
+/*func GenerateIndexCode(sourceName string, indexName string) string {
 	// 获取拼音首字母
 	indexInitials := getFirstLetter(indexName)
 
@@ -46,4 +85,4 @@ func getFirstLetter(s string) string {
 func filterAlphanumeric(s string) string {
 	re := regexp.MustCompile(`[^a-zA-Z0-9]`)
 	return re.ReplaceAllString(s, "")
-}
+}*/