package ruizide import ( "context" "encoding/json" "eta/eta_data_analysis/models" "eta/eta_data_analysis/utils" "fmt" "github.com/chromedp/cdproto/cdp" "github.com/xuri/excelize/v2" "io" "log" "os" "path/filepath" "time" "github.com/chromedp/chromedp" ) // 定义选择器 var ( downloadDir = utils.RZD_EXCEL_PATH defaultDir = utils.RZD_DOWNLOAD_PATH rzdLoginPath = "https://clients.rystadenergy.com/clients/" rzdBatchSize = 500 clientSearchLink = `div.d-none.d-lg-flex.flex-grow-1 a[href="/clients/search/"]` downloadButtonSelector = `div.btn.btn-link.btn-sm.dashboard-action.dashboard-action--download-data` ) // 处理数据下载的步骤 func downloadData(ctx context.Context) error { // Analytics Library if err := chromedp.Run(ctx, chromedp.Sleep(5*time.Second), // 考虑移除这一行,如果不必要的话 chromedp.Navigate(rzdLoginPath), chromedp.WaitVisible(`div.d-none.d-lg-flex.flex-grow-1`, chromedp.ByQuery), chromedp.Click(clientSearchLink, chromedp.ByQuery), chromedp.WaitVisible(`input[class="ais-SearchBox-input rounded border py-2 px-3 shadow-sm font-size-14 w-100"]`, chromedp.ByQuery), chromedp.SetValue(`input[class="ais-SearchBox-input rounded border py-2 px-3 shadow-sm font-size-14 w-100"]`, "oil demand signals weekly report", chromedp.ByQuery), chromedp.Sleep(10*time.Second), //chromedp.Click(`div.ais-InfiniteHits li a:has(img[src="/Static/img/icons/xls.png"])`, chromedp.ByQuery), ); err != nil { return fmt.Errorf("下载 Analytics Library 数据错误: %v", err) } xpath := `//div[@id='search-page-hits']//li//a[.//div//span[@class='align-middle' and text()='Data']]` var inputCount int var nodes []*cdp.Node // 使用 *cdp.Node if err := chromedp.Run(ctx, chromedp.ActionFunc(func(ctx context.Context) error { // 获取匹配的节点 if err := chromedp.Nodes(xpath, &nodes, chromedp.BySearch).Do(ctx); err != nil { return fmt.Errorf("检查节点失败: %v", err) } // 获取节点数量 inputCount = len(nodes) fmt.Printf("找到 %d 个匹配的元素\n", inputCount) if inputCount > 0 { // 点击第一个节点 return chromedp.MouseClickNode(nodes[0]).Do(ctx) // 使用 []cdp.NodeID } return nil }), chromedp.Sleep(10*time.Second), ); err != nil { return fmt.Errorf("下载 Analytics Library 数据错误: %v", err) } // 解析文件移动到目标目录 if err := waitAndRenameDownloadedFile("Oil_Demand_Signals_Weekly_Report_"+utils.GetCurrentYearMonth()+".xlsx", downloadDir); err != nil { return err } return nil } // 等待下载文件并重命名 func waitAndRenameDownloadedFile(newFileName, targetDir string) error { // 等待一段时间以确保文件下载完成 time.Sleep(100 * time.Second) // 可能需要根据实际情况调整 // 查找下载目录中的文件 files, err := filepath.Glob(filepath.Join(defaultDir, "*.xlsx")) if err != nil { return fmt.Errorf("查找文件时出错: %v", err) } // 如果没有找到文件,返回错误 if len(files) == 0 { return fmt.Errorf("未找到任何下载的文件") } // 找到最新的文件 var latestFile string var latestTime time.Time for _, file := range files { info, err := os.Stat(file) if err != nil { return fmt.Errorf("获取文件信息时出错: %v", err) } if info.ModTime().After(latestTime) { latestTime = info.ModTime() latestFile = file } } // 目标文件的完整路径 targetFilePath := filepath.Join(targetDir, newFileName) // 重命名并移动到目标目录 if latestFile != "" { if err := moveFile(latestFile, targetFilePath); err != nil { return fmt.Errorf("重命名文件时出错: %v", err) } // 打印重命名后的文件名 fmt.Printf("文件重命名并移动到: %s\n", targetFilePath) } return nil } func moveFile(source, destination string) error { // 复制文件 srcFile, err := os.Open(source) if err != nil { return fmt.Errorf("打开源文件时出错: %v", err) } defer srcFile.Close() dstFile, err := os.Create(destination) if err != nil { return fmt.Errorf("创建目标文件时出错: %v", err) } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { return fmt.Errorf("复制文件时出错: %v", err) } time.Sleep(60 * time.Second) // 删除源文件 /*if err := os.Remove(source); err != nil { return fmt.Errorf("删除源文件时出错: %v", err) }*/ return nil } // 解析网页数据,下载文件 // func main() { func ResolverNet(cont context.Context) (err error) { // 创建下载目录 if err := os.MkdirAll(downloadDir, os.ModePerm); err != nil { fmt.Printf("创建下载目录时出错: %v\n", err) return nil } // 创建 chromedp 执行上下文 options := []chromedp.ExecAllocatorOption{ chromedp.Flag("headless", false), chromedp.Flag("disable-blink-features", "AutomationControlled"), chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`), // 设置了但并不生效,直接从默认下载路径读取过来 //chromedp.Flag("download.default_directory", downloadDir), //chromedp.Flag("download.prompt_for_download", false), // 不弹出下载对话框 chromedp.Flag("safebrowsing.enabled", true), // 启用安全浏览 //chromedp.UserDataDir(filepath.Join(downloadDir, "user-data")), } allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...) defer cancel() ctx, cancel := chromedp.NewContext(allocCtx) defer cancel() // 启动 Chrome 实例 if err := chromedp.Run(ctx); err != nil { fmt.Printf("启动 Chrome 实例时出错: %v\n", err) return nil } // 设置下载行为 /*if err := setDownloadBehavior(ctx); err != nil { fmt.Printf("设置下载路径时出错: %v\n", err) return }*/ // 登录操作 if err := login(ctx); err != nil { fmt.Printf("登录错误: %v\n", err) return nil } // 下载数据 if err := downloadData(ctx); err != nil { fmt.Printf("数据下载错误: %v\n", err) return nil } fmt.Println("数据下载完成") // 解析表格 读取数据 fileResolver() return nil } // 解析本地文件 func fileResolver() { //func main() { var tableNameList = []string{ "Oil_Demand_Signals_Weekly_Report", "Oil_Supply_Analysis", "Supply_Revision_Analysis", "Oil_Market_Cube_Upstream_Supply_Oil_Quality_Api", "Oil_Market_Cube_Upstream_Supply_Oil_Quality_Sulphur", "Oil_Market_Cube_Upstream_Supply_Capacity_Capacity", "Oil_Market_Cube_Upstream_Supply_Production", "Oil_Market_Cube_Upstream_Supply_Production_Wo_Seasonality", "Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Reference_Production", "Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Target_Production", "Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Target_Cut", "Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Actual_Cut", "Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Compliance", "Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Production_Subject_To_Cut", "Oil_Market_Cube_Products_Demand_Products_Demand_Mean", "Oil_Market_Cube_Products_Demand_Products_Demand+Sigma", "Oil_Market_Cube_Products_Demand_Products_Demand-Sigma", "Oil_Market_Cube_Balances_Total_Liquids_Balances", "Oil_Market_Cube_Geography_Latitude", "Oil_Market_Cube_Geography_Longitude", "Oil_Demand_Analysis_Product_Detail", "Oil_Demand_Analysis_Region", "Oil_Demand_Analysis_Scenario", "Oil_Demand_Analysis_Continent", "Oil_Demand_Analysis_Country", "Oil_Demand_Analysis_Product_Category", "Oil_Demand_Analysis_Sector_Category", "Oil_Demand_Analysis_Sector_Detail", } for _, tableName := range tableNameList { var fileName string // 解析表格 fileName = tableName + "_" + utils.GetCurrentYearMonth() + ".xlsx" filePath := filepath.Join(downloadDir, fileName) // 打开 Excel 文件 f, err := excelize.OpenFile(filePath) if err != nil { log.Fatalf("无法打开 Excel 文件: %v", err) } // 获取所有工作表 sheetNames := f.GetSheetList() for _, sheetName := range sheetNames { fmt.Printf("读取工作表: %s\n", sheetName) // 获取工作表的最大行数 maxRow, err := f.GetRows(sheetName) // 直接获取所有行数据 if err != nil { log.Fatalf("获取工作表数据时出错: %v", err) continue } // 遍历行并打印内容 indexData := []models.BaseFromRzdData{} for rowIndex, rowData := range maxRow { // 因为excel文件中的sheet表格不固定 对于 Supply_Revision_Analysis, Oil_Supply_Analysis 文件 手动调整sheet表格顺序 if tableName == "Supply_Revision_Analysis" && rowIndex == 0 { if rowData[0] == "YearQuarter" && rowData[1] == "Revision" && rowData[2] == "CountryRevisionGroup" { sheetName = "Chart1" } if rowData[0] == "YearQuarter" && rowData[1] == "Current" && rowData[2] == "Previous" { sheetName = "Chart2" } if rowData[0] == "Year" && rowData[1] == "Revision" && rowData[2] == "CountryRevisionGroup" { sheetName = "Chart3" } if rowData[0] == "Year" && rowData[1] == "Current" && rowData[2] == "Previous" { sheetName = "Chart4" } if rowData[0] == "Previous" && rowData[1] == "Current" && rowData[2] == "YearMonth" { sheetName = "Chart5" } if rowData[0] == "YearMonth" && rowData[1] == "CountryRevisionGroup" && rowData[2] == "Revision" { sheetName = "Chart6" } } else if tableName == "Oil_Supply_Analysis" && rowIndex == 0 { if rowData[0] == "Viz Date" && rowData[1] == "OilAndGasCategory" && rowData[2] == "supply_kbbld" { sheetName = "Chart1" } if rowData[0] == "Viz Date" && rowData[1] == "supply_kbbld" && rowData[2] == "Region" { sheetName = "Chart2" } if rowData[0] == "Viz Date" && rowData[1] == "CapacityDetail" && rowData[2] == "Capacity_kbbld" { sheetName = "Chart3" } if rowData[0] == "Viz Date" && rowData[1] == "Oil Classification Group" && rowData[2] == "supply_kbbld" { sheetName = "Chart4" } } processor, err := GetProcessor(tableName, sheetName) if err != nil { continue } baseFromLyDataList, err := processor.Process(tableName, sheetName, rowIndex, rowData) if err != nil { log.Printf("processor.Process err: %v", err) return } indexData = append(indexData, baseFromLyDataList...) } // 新增数据源指标数据 if len(indexData) > 0 { for i := 0; i < len(indexData); i += rzdBatchSize { // 计算当前批次的结束索引 end := i + rzdBatchSize if end > len(indexData) { end = len(indexData) } // 获取当前批次的数据 batchData := indexData[i:end] // 转换成json marshal, err := json.Marshal(batchData) if err != nil { log.Printf("json.Marshal err: %v", err) return } // 发送 HTTP POST 请求 _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_RZD_DATA, string(marshal), "application/json") if err != nil { log.Printf("postEdbLib err: %v", err) return } } } // 新增指标库数据 edbDataList := []models.EdbDataRzd{} for _, index := range indexData { // 补充 判断是否存在于指标库 paramsLib := make(map[string]interface{}) paramsLib["IndexCode"] = index.IndexCode paramsLib["Source"] = utils.DATA_SOURCE_RZD postEdbLib, err := httpRequestFill(paramsLib, utils.GET_RZD_EDB_INFO_BY_INDEX_CODE) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) continue } var requestResponse models.RequestResponse[models.EdbInfo] err = json.Unmarshal(postEdbLib, &requestResponse) if err != nil { log.Printf("postEdbLib err: %v", err) continue } if requestResponse.Data.EdbInfoId != 0 { edbDataRzd := models.EdbDataRzd{ CreateTime: utils.GetCurrentTime(), ModifyTime: utils.GetCurrentTime(), EdbInfoId: index.BaseFromRzdIndexId, EdbCode: index.IndexCode, DataTime: index.DataTime, Value: index.Value, DataTimestamp: uint64(time.Now().UnixNano() / int64(time.Millisecond)), } edbDataList = append(edbDataList, edbDataRzd) } } if len(edbDataList) > 0 { // 转换成json marshal, err := json.Marshal(edbDataList) if err != nil { log.Printf("postEdbLib err: %v", err) return } _, err = utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_RZD_EDB_DATA, string(marshal), "application/json") if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) return } } } } } func login(ctx context.Context) error { return chromedp.Run(ctx, chromedp.Navigate(rzdLoginPath), chromedp.SetValue(`input[id="Username"]`, utils.RZD_USERNAME, chromedp.ByQuery), chromedp.SetValue(`input[id="Password"]`, utils.RZD_PASSWORD, chromedp.ByQuery), chromedp.WaitEnabled(`//button[text()='Login']`, chromedp.BySearch), chromedp.Click(`//button[text()='Login']`, chromedp.BySearch), chromedp.Sleep(5*time.Second), // 等待并点击登录后页面的链接 chromedp.WaitVisible(`a[href="/clients/"]`, chromedp.ByQuery), // 等待 Analytics Library 链接可见 chromedp.Sleep(5*time.Second), // 等待页面加载完成 ) } func httpRequestFill(data interface{}, urlMethod string) (postEdbLib []byte, err error) { // 转换成json marshal, err := json.Marshal(data) if err != nil { return nil, err } // json 转 interface var result map[string]interface{} err = json.Unmarshal(marshal, &result) if err != nil { return nil, err } postEdbLib, err = utils.PostEdbLibRequest(result, urlMethod) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) return nil, err } return postEdbLib, nil }