Browse Source

睿姿得数据爬取

gmy 4 months ago
parent
commit
cc6825930a

+ 105 - 87
services/ruizide/data_processor.go

@@ -23,6 +23,7 @@ var (
 	downloadDir  = "D:\\download"
 	defaultDir   = "C:\\Users\\Guo Mengyuan\\Downloads"
 	rzdLoginPath = "https://clients.rystadenergy.com/clients/"
+	rzdBatchSize = 500
 
 	clientSearchLink               = `div.d-none.d-lg-flex.flex-grow-1 a[href="/clients/search/"]`
 	clientsCubeDashboardsLink      = `div.d-none.d-lg-flex.flex-grow-1 a[href="/clients/cube-dashboards/"]`
@@ -270,112 +271,129 @@ func resolverNet() {
 // Parse local files: read the downloaded Excel workbook(s) from downloadDir and POST the processed rows to the utils.EDB_LIB_URL endpoints
 // func fileResolver() {
 func main() {
-	var fileName string
-	var tableName = "Oil_Demand_Signals_Weekly_Report"
-	// 解析Oil_Demand_Signals_Weekly_Report_表格
-	fileName = tableName + "_" + utils.GetCurrentYearMonth() + ".xlsx"
-	filePath := filepath.Join(downloadDir, fileName)
-
-	// 打开 Excel 文件
-	f, err := excelize.OpenFile(filePath)
-	if err != nil {
-		log.Fatalf("无法打开 Excel 文件: %v", err)
+	var tableNameList = []string{
+		"Oil_Demand_Signals_Weekly_Report",
+		"RE_Dashboard_Export",
 	}
-
-	// 获取所有工作表
-	sheetNames := f.GetSheetList()
-	for sheetIndex, sheetName := range sheetNames {
-		fmt.Printf("读取工作表: %s\n", sheetName)
-		if sheetIndex == 0 {
-			continue
-		}
-
-		// 获取工作表的最大行数
-		maxRow, err := f.GetRows(sheetName) // 直接获取所有行数据
+	for _, tableName := range tableNameList {
+		var fileName string
+		// Build the workbook file name for this table: <tableName>_<utils.GetCurrentYearMonth()>.xlsx
+		fileName = tableName + "_" + utils.GetCurrentYearMonth() + ".xlsx"
+		filePath := filepath.Join(downloadDir, fileName)
+
+		// Open the Excel file; NOTE(review): f is never closed anywhere in this function
+		f, err := excelize.OpenFile(filePath)
 		if err != nil {
-			log.Fatalf("获取工作表数据时出错: %v", err)
+			log.Fatalf("无法打开 Excel 文件: %v", err)
 		}
 
-		// 遍历行并打印内容
-		indexData := []models.BaseFromRzdData{}
-		for rowIndex, rowData := range maxRow {
-			if rowIndex < 4 {
+		// List all worksheets; the first one (sheetIndex 0) is skipped below
+		sheetNames := f.GetSheetList()
+		for sheetIndex, sheetName := range sheetNames {
+			fmt.Printf("读取工作表: %s\n", sheetName)
+			if sheetIndex == 0 {
 				continue
 			}
-			processor, err := GetProcessor(tableName, sheetName)
-			if err != nil {
-				return
-			}
-			baseFromLyDataList, err := processor.Process(tableName, sheetName, rowData)
-			indexData = append(indexData, baseFromLyDataList...)
-		}
 
-		// 新增数据源指标数据
-		if len(indexData) > 0 {
-			// 转换成json
-			marshal, err := json.Marshal(indexData)
-			if err != nil {
-				log.Printf("postEdbLib err: %v", err)
-				return
-			}
-			_, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_RZD_DATA, string(marshal), "application/json")
+			// Load every row of the sheet (despite its name, maxRow holds the rows, not a count)
+			maxRow, err := f.GetRows(sheetName) // fetch all row data directly
 			if err != nil {
-				// 有错误就不继续执行
-				log.Printf("postEdbLib err: %v", err)
-				return
+				log.Fatalf("获取工作表数据时出错: %v", err)
 			}
-		}
 
-		// 新增指标库数据
-		edbDataList := []models.EdbDataRzd{}
-		for _, index := range indexData {
-			// 补充 判断是否存在于指标库
-			paramsLib := make(map[string]interface{})
-			paramsLib["IndexCode"] = index.IndexCode
-			paramsLib["Source"] = utils.DATA_SOURCE_RZD
-			postEdbLib, err := httpRequestFill(paramsLib, utils.GET_RZD_EDB_INFO_BY_INDEX_CODE)
-			if err != nil {
-				// 有错误就不继续执行
-				log.Printf("postEdbLib err: %v", err)
-				continue
-			}
-			var requestResponse models.RequestResponse[models.EdbInfo]
-			err = json.Unmarshal(postEdbLib, &requestResponse)
-			if err != nil {
-				log.Printf("postEdbLib err: %v", err)
-				continue
+			// Run each row through the processor and collect the returned BaseFromRzdData records
+			indexData := []models.BaseFromRzdData{}
+			for rowIndex, rowData := range maxRow {
+				processor, err := GetProcessor(tableName, sheetName) // NOTE(review): loop-invariant lookup; on error every row of the sheet is skipped without logging
+				if err != nil {
+					continue
+				}
+				baseFromLyDataList, err := processor.Process(tableName, sheetName, rowIndex, rowData)
+				if err != nil {
+					log.Printf("processor.Process err: %v", err)
+					return // leaves main entirely, so remaining sheets and tables are not processed
+				}
+				indexData = append(indexData, baseFromLyDataList...)
 			}
 
-			if requestResponse.Data.EdbInfoId != 0 {
-				edbDataRzd := models.EdbDataRzd{
-					CreateTime:    utils.GetCurrentTime(),
-					ModifyTime:    utils.GetCurrentTime(),
-					EdbInfoId:     index.BaseFromRzdIndexId,
-					EdbCode:       index.IndexCode,
-					DataTime:      index.DataTime,
-					Value:         index.Value,
-					DataTimestamp: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
+			// Post the collected source data to ADD_BATCH_RZD_DATA in batches of rzdBatchSize
+			if len(indexData) > 0 {
+				for i := 0; i < len(indexData); i += rzdBatchSize {
+					// Compute the end index of the current batch, clamped to the slice length
+					end := i + rzdBatchSize
+					if end > len(indexData) {
+						end = len(indexData)
+					}
+
+					// Slice out the current batch
+					batchData := indexData[i:end]
+
+					// Serialize the batch to JSON
+					marshal, err := json.Marshal(batchData)
+					if err != nil {
+						log.Printf("json.Marshal err: %v", err)
+						return
+					}
+
+					// Send the HTTP POST request; any error stops main
+					_, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_RZD_DATA, string(marshal), "application/json")
+					if err != nil {
+						log.Printf("postEdbLib err: %v", err)
+						return
+					}
 				}
-				edbDataList = append(edbDataList, edbDataRzd)
 			}
 
-		}
+			// Add index-library data: keep only entries whose index code resolves to a non-zero EdbInfoId
+			edbDataList := []models.EdbDataRzd{}
+			for _, index := range indexData {
+				// Look up the index code to check whether it exists in the index library
+				paramsLib := make(map[string]interface{})
+				paramsLib["IndexCode"] = index.IndexCode
+				paramsLib["Source"] = utils.DATA_SOURCE_RZD
+				postEdbLib, err := httpRequestFill(paramsLib, utils.GET_RZD_EDB_INFO_BY_INDEX_CODE)
+				if err != nil {
+					// On error, log and skip this entry (processing continues with the next one)
+					log.Printf("postEdbLib err: %v", err)
+					continue
+				}
+				var requestResponse models.RequestResponse[models.EdbInfo]
+				err = json.Unmarshal(postEdbLib, &requestResponse)
+				if err != nil {
+					log.Printf("postEdbLib err: %v", err)
+					continue
+				}
 
-		if len(edbDataList) > 0 {
-			// 转换成json
-			marshal, err := json.Marshal(edbDataList)
-			if err != nil {
-				log.Printf("postEdbLib err: %v", err)
-				return
+				if requestResponse.Data.EdbInfoId != 0 {
+					edbDataRzd := models.EdbDataRzd{
+						CreateTime:    utils.GetCurrentTime(),
+						ModifyTime:    utils.GetCurrentTime(),
+						EdbInfoId:     index.BaseFromRzdIndexId, // NOTE(review): source index id, not requestResponse.Data.EdbInfoId — confirm intended
+						EdbCode:       index.IndexCode,
+						DataTime:      index.DataTime,
+						Value:         index.Value,
+						DataTimestamp: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
+					}
+					edbDataList = append(edbDataList, edbDataRzd)
+				}
 			}
-			_, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_RZD_EDB_DATA, string(marshal), "application/json")
-			if err != nil {
-				// 有错误就不继续执行
-				log.Printf("postEdbLib err: %v", err)
-				return
+
+			if len(edbDataList) > 0 {
+				// Serialize to JSON (sent as a single request, not batched)
+				marshal, err := json.Marshal(edbDataList)
+				if err != nil {
+					log.Printf("postEdbLib err: %v", err)
+					return
+				}
+				_, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_RZD_EDB_DATA, string(marshal), "application/json")
+				if err != nil {
+					// On error, stop processing
+					log.Printf("postEdbLib err: %v", err)
+					return
+				}
 			}
-		}
 
+		}
 	}
 }
 

+ 44 - 1
services/ruizide/processor_business_logic.go

@@ -30,8 +30,12 @@ var classifyMap = map[string]string{
 // @Description: AnalyticsLibrary processor
 type RoadIndexProcessor struct{}
 
-func (p *RoadIndexProcessor) Process(tableName string, sheetName string, rowData []string) ([]models.BaseFromRzdData, error) {
+func (p *RoadIndexProcessor) Process(tableName string, sheetName string, rowIndex int, rowData []string) ([]models.BaseFromRzdData, error) {
 	logs.Info("Processing AnalyticsLibrary...")
+	if rowIndex < 4 {
+		return nil, nil
+	}
+
 	frequency := "日度"
 	unit := "%"
 	indexNameColOne := "Index"
@@ -61,6 +65,45 @@ func (p *RoadIndexProcessor) Process(tableName string, sheetName string, rowData
 	return dataList, err
 }
 
+// ReDashboardExportOneProcessor
+// @Description: ReDashboardExportOneProcessor row processor; Process returns (nil, nil) for rowIndex < 4, otherwise resolves the classification, the two indexes and the row data via dealClassify, dealIndex and dealData, returning the first error. NOTE(review): the GetProcessor hunk in this commit returns RoadIndexProcessor for RE_Dashboard_Export/"Chart1", so nothing visible here uses this type — confirm wiring.
+type ReDashboardExportOneProcessor struct{}
+
+func (p *ReDashboardExportOneProcessor) Process(tableName string, sheetName string, rowIndex int, rowData []string) ([]models.BaseFromRzdData, error) {
+	logs.Info("Processing ReDashboardExportOne...")
+	if rowIndex < 4 { // rows 0-3 yield no data (presumably sheet header rows — confirm)
+		return nil, nil
+	}
+
+	frequency := "季度" // "quarterly"
+	unit := "千桶每天" // "thousand barrels per day"
+	indexNameColOne := "Index" // NOTE(review): same literal RoadIndexProcessor uses — confirm it matches this sheet's columns
+	indexNameColTwo := "Index 7DMA"
+
+	// step_1: classification
+	classifyId, err := dealClassify("cube dashboards", "Supply Revision Analysis")
+	if err != nil {
+		return nil, err
+	}
+	logs.Info("classifyId: %v", classifyId)
+
+	// step_2: indexes (two ids and two codes are resolved from the row)
+	indexOneId, indexTwoId, indexCodeOne, indexCodeTwo, err := dealIndex(sheetName, rowData, indexNameColOne, indexNameColTwo, frequency, unit, classifyId)
+	if err != nil {
+		return nil, err
+	}
+	logs.Info("indexOneId: %v, indexTwoId: %v, indexCodeOne: %v, indexCodeTwo: %v", indexOneId, indexTwoId, indexCodeOne, indexCodeTwo)
+
+	// step_3: index data
+	dataList, err := dealData(indexOneId, indexTwoId, indexCodeOne, indexCodeTwo, rowData)
+	if err != nil {
+		return nil, err
+	}
+	logs.Info("dataList: %v", dataList)
+
+	return dataList, err // err is always nil here; it was checked just above
+}
+
 func dealData(indexOneId, indexTwoId int, indexCodeOne, indexCodeTwo string, rowData []string) ([]models.BaseFromRzdData, error) {
 	var dataList []models.BaseFromRzdData
 

+ 4 - 2
services/ruizide/processor_factory.go

@@ -6,7 +6,7 @@ import (
 )
 
 type ReportProcessor interface {
-	Process(string, string, []string) ([]models.BaseFromRzdData, error)
+	Process(string, string, int, []string) ([]models.BaseFromRzdData, error) // args in order: tableName, sheetName, rowIndex, rowData
 }
 
 func GetProcessor(tableName string, sheetName string) (ReportProcessor, error) {
@@ -17,8 +17,10 @@ func GetProcessor(tableName string, sheetName string) (ReportProcessor, error) {
 		default:
 			return nil, fmt.Errorf("unknown sheetName: %s", sheetName)
 		}
-	} else if tableName == "豆粕" {
+	} else if tableName == "RE_Dashboard_Export" {
 		switch sheetName {
+		case "Chart1":
+			return &RoadIndexProcessor{}, nil
 		default:
 			return nil, fmt.Errorf("unknown sheetName: %s", sheetName)
 		}