// Command main downloads Rystad Energy (RZD) workbooks through a
// chromedp-driven Chrome session and pushes the parsed Excel data to the EDB
// library service.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/chromedp/cdproto/cdp"
	"github.com/chromedp/chromedp"
	"github.com/xuri/excelize/v2"

	"eta/eta_data_analysis/models"
	"eta/eta_data_analysis/utils"
)

// Paths, batch size and page selectors used by the download and parse flows.
var (
	downloadDir  = "D:\\download"
	defaultDir   = "C:\\Users\\Guo Mengyuan\\Downloads"
	rzdLoginPath = "https://clients.rystadenergy.com/clients/"
	rzdBatchSize = 500

	// NOTE(review): supplyRevisionAnalysisSelector uses jQuery's :contains and
	// the two li[contains(...)] selectors use an XPath predicate; none of the
	// three is valid CSS, so they cannot be used with chromedp.ByQuery.
	// downloadData no longer relies on oilSupplyAnalysisSelector for that reason.
	clientSearchLink               = `div.d-none.d-lg-flex.flex-grow-1 a[href="/clients/search/"]`
	clientsCubeDashboardsLink      = `div.d-none.d-lg-flex.flex-grow-1 a[href="/clients/cube-dashboards/"]`
	supplyRevisionAnalysisSelector = `div.ais-Hits li h5:contains("Supply Revision Analysis")`
	oilDemandAnalysisSelector      = `div.ais-Hits li[contains(., 'Oil Demand Analysis')]`
	oilSupplyAnalysisSelector      = `div.ais-Hits li[contains(., 'Oil Supply Analysis')]`
	dateSlicerInputSelector        = `div.visualContainer.unselectable.readMode.hideBorder.visualHeaderBelow.droppableElement.ui-droppable div.date-slicer-control input.date-slicer-input.enable-hover`
	downloadButtonSelector         = `div.btn.btn-link.btn-sm.dashboard-action.dashboard-action--download-data`

	// Update to match the selector of the actual iframe.
	oilDemandIframeSelector = `div#WithPollingInFrame iframe`

	// Base selector for each tab inside the H3 elements.
	tabSelectorBase = `h3.preTextWithEllipsis`
)

// clickDownload clicks the dashboard's "download data" button.
func clickDownload(ctx context.Context) error {
	click := chromedp.Click(downloadButtonSelector, chromedp.ByQuery)
	return chromedp.Run(ctx, click)
}

// downloadData drives the logged-in browser session through three downloads
// (the Analytics Library weekly report, the "Supply Revision Analysis"
// dashboard and the "Oil Supply Analysis" dashboard). After each download the
// newest .xlsx in defaultDir is moved into downloadDir under a fixed name.
//
// It returns the first error hit by a browser action or a file move.
func downloadData(ctx context.Context) error {
	// Top navigation bar that holds the section links.
	const navBar = `div.d-none.d-lg-flex.flex-grow-1`

	// Search box on the Analytics Library search page.
	const searchInput = `input[class="ais-SearchBox-input rounded border py-2 px-3 shadow-sm font-size-14 w-100"]`

	// Search hit whose badge text is exactly "Data".
	const dataLinkXPath = `//div[@id='search-page-hits']//li//a[.//div//span[@class='align-middle' and text()='Data']]`

	// Title element of a dashboard hit.
	const dashboardTitle = `h5.text-body.overflow-hidden.mb-1.mr-3.font-weight-bold.line-height-1.dashboards-hit__name`

	// XPath form of "div.ais-Hits li containing 'Oil Supply Analysis'". The
	// package-level oilSupplyAnalysisSelector carries an XPath predicate and is
	// therefore not a valid CSS selector for chromedp.ByQuery.
	const oilSupplyXPath = `//div[contains(concat(' ', normalize-space(@class), ' '), ' ais-Hits ')]//li[contains(., 'Oil Supply Analysis')]`

	// Analytics Library: open the search page and type the report name.
	if err := chromedp.Run(ctx,
		chromedp.Sleep(5*time.Second), // consider removing this if it proves unnecessary
		chromedp.Navigate(rzdLoginPath),
		chromedp.WaitVisible(navBar, chromedp.ByQuery),
		chromedp.Click(clientSearchLink, chromedp.ByQuery),
		chromedp.WaitVisible(searchInput, chromedp.ByQuery),
		chromedp.SetValue(searchInput, "oil demand signals weekly report", chromedp.ByQuery),
		//chromedp.Click(`div.ais-InfiniteHits li a:has(img[src="/Static/img/icons/xls.png"])`, chromedp.ByQuery),
	); err != nil {
		return fmt.Errorf("下载 Analytics Library 数据错误: %w", err)
	}

	// Click the first matching "Data" hit, then give the download time to start.
	if err := chromedp.Run(ctx,
		chromedp.ActionFunc(func(ctx context.Context) error {
			var nodes []*cdp.Node
			if err := chromedp.Nodes(dataLinkXPath, &nodes, chromedp.BySearch).Do(ctx); err != nil {
				return fmt.Errorf("检查节点失败: %w", err)
			}
			fmt.Printf("找到 %d 个匹配的元素\n", len(nodes))
			if len(nodes) == 0 {
				return nil
			}
			return chromedp.MouseClickNode(nodes[0]).Do(ctx)
		}),
		chromedp.Sleep(10*time.Second),
	); err != nil {
		return fmt.Errorf("下载 Analytics Library 数据错误: %w", err)
	}

	// Move the downloaded file into the target directory.
	weeklyReport := "Oil_Demand_Signals_Weekly_Report_" + utils.GetCurrentYearMonth() + ".xlsx"
	if err := waitAndRenameDownloadedFile(weeklyReport, downloadDir); err != nil {
		return err
	}

	// Cube Dashboards: Supply Revision Analysis.
	if err := chromedp.Run(ctx,
		chromedp.WaitVisible(navBar, chromedp.ByQuery),
		chromedp.Click(clientsCubeDashboardsLink, chromedp.ByQuery),
		chromedp.Sleep(5*time.Second),
		chromedp.WaitVisible(`div.ais-Hits`, chromedp.ByQuery),
		chromedp.ActionFunc(func(ctx context.Context) error {
			// Collect the text of every dashboard title.
			var titles []string
			script := `Array.from(document.querySelectorAll('div.ais-Hits li ` + dashboardTitle + `')).map(h => h.textContent)`
			if err := chromedp.Evaluate(script, &titles).Do(ctx); err != nil {
				return err
			}

			// Click the first title that mentions the dashboard.
			for i, title := range titles {
				if !strings.Contains(title, "Supply Revision Analysis") {
					continue
				}
				// NOTE(review): nth-child is 1-based, so i+2 targets the <li>
				// after the one at slice index i; kept as written — confirm
				// against the page markup.
				selector := fmt.Sprintf(`div.ais-Hits ol li:nth-child(%d) `+dashboardTitle, i+2)
				if err := chromedp.Click(selector, chromedp.ByQuery).Do(ctx); err != nil {
					return fmt.Errorf("点击 'Supply Revision Analysis' 失败: %w", err)
				}
				break
			}
			return nil
		}),
	); err != nil {
		return err
	}
	if err := clickDownload(ctx); err != nil {
		return err
	}
	// NOTE(review): main reads "<table>_<year-month>.xlsx", which this fixed
	// "_2020" name does not produce — confirm which naming is intended.
	if err := waitAndRenameDownloadedFile("Supply_Revision_Analysis_2020.xlsx", downloadDir); err != nil {
		return err
	}

	// Oil Supply Analysis.
	if err := chromedp.Run(ctx,
		chromedp.Click(`a[href="/clients/subscription/"]`, chromedp.ByQuery),
		chromedp.Click(oilSupplyXPath, chromedp.BySearch),
	); err != nil {
		return err
	}
	if err := clickDownload(ctx); err != nil {
		return err
	}
	return waitAndRenameDownloadedFile("Oil_Supply_Analysis_2010.xlsx", downloadDir)
}

// waitAndRenameDownloadedFile sleeps for 60 seconds, then moves the most
// recently modified .xlsx file found in defaultDir to targetDir/newFileName.
//
// It returns an error when globbing or stat fails, when defaultDir holds no
// .xlsx file, or when the move fails.
func waitAndRenameDownloadedFile(newFileName, targetDir string) error {
	// Fixed wait for the browser to finish the download; tune as needed.
	time.Sleep(60 * time.Second)

	files, err := filepath.Glob(filepath.Join(defaultDir, "*.xlsx"))
	if err != nil {
		return fmt.Errorf("查找文件时出错: %w", err)
	}
	if len(files) == 0 {
		return fmt.Errorf("未找到任何下载的文件")
	}

	// Pick the file with the newest modification time.
	var (
		latestFile string
		latestTime time.Time
	)
	for _, file := range files {
		info, err := os.Stat(file)
		if err != nil {
			return fmt.Errorf("获取文件信息时出错: %w", err)
		}
		if modTime := info.ModTime(); modTime.After(latestTime) {
			latestTime = modTime
			latestFile = file
		}
	}
	if latestFile == "" {
		return nil
	}

	targetFilePath := filepath.Join(targetDir, newFileName)
	if err := moveFile(latestFile, targetFilePath); err != nil {
		return fmt.Errorf("重命名文件时出错: %w", err)
	}
	fmt.Printf("文件重命名并移动到: %s\n", targetFilePath)
	return nil
}

// moveFile copies source to destination and then deletes source.
//
// Both file handles are closed before the delete: a file that this process
// still holds open cannot be removed on Windows, which is the platform the
// configured paths target.
func moveFile(source, destination string) error {
	if err := rzdCopyFile(source, destination); err != nil {
		return err
	}

	// NOTE(review): 60-second pause kept from the original implementation;
	// presumably a settle delay before deleting — confirm whether it is still
	// needed now that the handles are closed first.
	time.Sleep(60 * time.Second)

	if err := os.Remove(source); err != nil {
		return fmt.Errorf("删除源文件时出错: %w", err)
	}
	return nil
}

// rzdCopyFile streams source into a newly created (or truncated) destination
// and closes both files before returning. A failure while closing the
// destination is reported, since it can mean the data was not fully written.
func rzdCopyFile(source, destination string) error {
	srcFile, err := os.Open(source)
	if err != nil {
		return fmt.Errorf("打开源文件时出错: %w", err)
	}
	defer srcFile.Close()

	dstFile, err := os.Create(destination)
	if err != nil {
		return fmt.Errorf("创建目标文件时出错: %w", err)
	}

	_, copyErr := io.Copy(dstFile, srcFile)
	closeErr := dstFile.Close()
	if copyErr != nil {
		return fmt.Errorf("复制文件时出错: %w", copyErr)
	}
	if closeErr != nil {
		return fmt.Errorf("复制文件时出错: %w", closeErr)
	}
	return nil
}

// resolverNet runs the browser flow: it creates downloadDir, starts a
// non-headless Chrome through chromedp, logs in and downloads the data files.
// Errors are printed and end the run.
//
// Rename this function to main (and rename main below) to run the download
// flow instead of the local-file parser.
func resolverNet() {
	if err := os.MkdirAll(downloadDir, os.ModePerm); err != nil {
		fmt.Printf("创建下载目录时出错: %v\n", err)
		return
	}

	options := []chromedp.ExecAllocatorOption{
		chromedp.Flag("headless", false),
		chromedp.Flag("disable-blink-features", "AutomationControlled"),
		chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),

		// Setting the download directory had no effect, so files are picked up
		// from the browser's default download path instead.
		//chromedp.Flag("download.default_directory", downloadDir),
		//chromedp.Flag("download.prompt_for_download", false), // do not show the download dialog
		chromedp.Flag("safebrowsing.enabled", true), // enable safe browsing
		//chromedp.UserDataDir(filepath.Join(downloadDir, "user-data")),
	}

	allocCtx, cancelAlloc := chromedp.NewExecAllocator(context.Background(), options...)
	defer cancelAlloc()

	ctx, cancelBrowser := chromedp.NewContext(allocCtx)
	defer cancelBrowser()

	// Running with no actions starts the Chrome instance.
	if err := chromedp.Run(ctx); err != nil {
		fmt.Printf("启动 Chrome 实例时出错: %v\n", err)
		return
	}

	// Download behaviour setup (disabled).
	/*if err := setDownloadBehavior(ctx); err != nil {
		fmt.Printf("设置下载路径时出错: %v\n", err)
		return
	}*/

	if err := login(ctx); err != nil {
		fmt.Printf("登录错误: %v\n", err)
		return
	}

	if err := downloadData(ctx); err != nil {
		fmt.Printf("数据下载错误: %v\n", err)
		return
	}

	fmt.Println("数据下载完成")
}

// main parses the local workbooks: for every enabled table name it opens
// downloadDir/<table>_<current year-month>.xlsx and uploads the data of each
// sheet. A workbook that cannot be opened terminates the process; any other
// upload or processing error is logged and ends the run.
//
// Rename to fileResolver (and rename resolverNet to main) to run the download
// flow instead.
func main() {
	var tableNameList = []string{
		//"Oil_Demand_Signals_Weekly_Report",
		"Supply_Revision_Analysis",
		/*"Oil_Market_Cube_Upstream_Supply_Oil_Quality_Api",
		"Oil_Market_Cube_Upstream_Supply_Oil_Quality_Sulphur",
		"Oil_Market_Cube_Upstream_Supply_Capacity_Capacity",
		"Oil_Market_Cube_Upstream_Supply_Production",
		"Oil_Market_Cube_Upstream_Supply_Production_Wo_Seasonality",
		"Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Reference_Production",
		"Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Target_Production",
		"Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Target_Cut",
		"Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Actual_Cut",
		"Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Compliance",
		"Oil_Market_Cube_Upstream_Supply_OPEC_Policy_Production_Subject_To_Cut",
		"Oil_Market_Cube_Products_Demand_Products_Demand_Mean",
		"Oil_Market_Cube_Products_Demand_Products_Demand+Sigma",
		"Oil_Market_Cube_Products_Demand_Products_Demand-Sigma",
		"Oil_Market_Cube_Balances_Total_Liquids_Balances",
		"Oil_Market_Cube_Geography_Latitude",
		"Oil_Market_Cube_Geography_Longitude",*/
	}

	for _, tableName := range tableNameList {
		fileName := tableName + "_" + utils.GetCurrentYearMonth() + ".xlsx"
		filePath := filepath.Join(downloadDir, fileName)

		f, err := excelize.OpenFile(filePath)
		if err != nil {
			log.Fatalf("无法打开 Excel 文件: %v", err)
		}

		err = rzdProcessWorkbook(f, tableName)
		// Release the workbook before deciding how to proceed.
		if closeErr := f.Close(); closeErr != nil {
			log.Printf("关闭 Excel 文件时出错: %v", closeErr)
		}
		if err != nil {
			log.Print(err)
			return
		}
	}
}

// rzdProcessWorkbook walks every sheet of f, skipping chart sheets, and
// uploads the data parsed from each remaining sheet. A sheet whose rows cannot
// be read is logged and skipped; the first processing or upload error is
// returned.
func rzdProcessWorkbook(f *excelize.File, tableName string) error {
	for _, sheetName := range f.GetSheetList() {
		fmt.Printf("读取工作表: %s\n", sheetName)
		if rzdIsChartSheet(sheetName) {
			continue
		}

		rows, err := f.GetRows(sheetName)
		if err != nil {
			// Skip the unreadable sheet and carry on with the next one.
			log.Printf("获取工作表数据时出错: %v", err)
			continue
		}

		indexData, err := rzdCollectIndexData(tableName, sheetName, rows)
		if err != nil {
			return err
		}
		if err := rzdPostIndexData(indexData); err != nil {
			return err
		}
		if err := rzdPostEdbData(indexData); err != nil {
			return err
		}
	}
	return nil
}

// rzdIsChartSheet reports whether sheetName contains one of the chart markers
// "Chart1" … "Chart5".
func rzdIsChartSheet(sheetName string) bool {
	// Markers that were skipped in an earlier revision and are currently
	// disabled: "Content", "Road Index", "Road Active Fleet", "Aviation Index",
	// "Aviation Active Fleet", "Demand - Gasoline", "Demand - Diesel",
	// "Demand - Jet Fuel", "Demand - Maritime Bunker".
	for _, marker := range []string{"Chart1", "Chart2", "Chart3", "Chart4", "Chart5"} {
		if strings.Contains(sheetName, marker) {
			return true
		}
	}
	return false
}

// rzdCollectIndexData runs the processor for (tableName, sheetName) over every
// row and concatenates the results. Rows for which no processor is available
// are skipped; a Process error aborts the collection.
func rzdCollectIndexData(tableName, sheetName string, rows [][]string) ([]models.BaseFromRzdData, error) {
	indexData := []models.BaseFromRzdData{}
	for rowIndex, rowData := range rows {
		// NOTE(review): looked up once per row as in the original; GetProcessor
		// is defined elsewhere in the package, so it is unknown here whether
		// the lookup could safely be hoisted out of the loop.
		processor, err := GetProcessor(tableName, sheetName)
		if err != nil {
			continue
		}
		rowItems, err := processor.Process(tableName, sheetName, rowIndex, rowData)
		if err != nil {
			return nil, fmt.Errorf("processor.Process err: %w", err)
		}
		indexData = append(indexData, rowItems...)
	}
	return indexData, nil
}

// rzdPostIndexData posts indexData to the ADD_BATCH_RZD_DATA endpoint as JSON,
// in batches of at most rzdBatchSize items. Nothing is sent for empty input.
func rzdPostIndexData(indexData []models.BaseFromRzdData) error {
	for start := 0; start < len(indexData); start += rzdBatchSize {
		end := start + rzdBatchSize
		if end > len(indexData) {
			end = len(indexData)
		}

		payload, err := json.Marshal(indexData[start:end])
		if err != nil {
			return fmt.Errorf("json.Marshal err: %w", err)
		}
		if _, err := utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_RZD_DATA, string(payload), "application/json"); err != nil {
			return fmt.Errorf("postEdbLib err: %w", err)
		}
	}
	return nil
}

// rzdPostEdbData looks every item up by index code and, for those whose lookup
// response carries a non-zero EdbInfoId, posts one EdbDataRzd record to the
// ADD_BATCH_RZD_EDB_DATA endpoint in a single request. Items whose lookup or
// decoding fails are logged and skipped.
func rzdPostEdbData(indexData []models.BaseFromRzdData) error {
	edbDataList := []models.EdbDataRzd{}
	for _, index := range indexData {
		paramsLib := map[string]interface{}{
			"IndexCode": index.IndexCode,
			"Source":    utils.DATA_SOURCE_RZD,
		}
		postEdbLib, err := httpRequestFill(paramsLib, utils.GET_RZD_EDB_INFO_BY_INDEX_CODE)
		if err != nil {
			log.Printf("postEdbLib err: %v", err)
			continue
		}

		var requestResponse models.RequestResponse[models.EdbInfo]
		if err := json.Unmarshal(postEdbLib, &requestResponse); err != nil {
			log.Printf("postEdbLib err: %v", err)
			continue
		}
		if requestResponse.Data.EdbInfoId == 0 {
			continue
		}

		// NOTE(review): EdbInfoId is filled from index.BaseFromRzdIndexId, not
		// from the EdbInfoId returned by the lookup — confirm this is intended.
		edbDataList = append(edbDataList, models.EdbDataRzd{
			CreateTime:    utils.GetCurrentTime(),
			ModifyTime:    utils.GetCurrentTime(),
			EdbInfoId:     index.BaseFromRzdIndexId,
			EdbCode:       index.IndexCode,
			DataTime:      index.DataTime,
			Value:         index.Value,
			DataTimestamp: uint64(time.Now().UnixMilli()),
		})
	}
	if len(edbDataList) == 0 {
		return nil
	}

	payload, err := json.Marshal(edbDataList)
	if err != nil {
		return fmt.Errorf("postEdbLib err: %w", err)
	}
	if _, err := utils.HttpPostRequest(utils.EDB_LIB_URL+utils.ADD_BATCH_RZD_EDB_DATA, string(payload), "application/json"); err != nil {
		return fmt.Errorf("postEdbLib err: %w", err)
	}
	return nil
}

// login opens the client portal, fills in the configured username and
// password, clicks the Login button and waits for the post-login page link.
func login(ctx context.Context) error {
	const loginButton = `//button[text()='Login']`

	return chromedp.Run(ctx,
		chromedp.Navigate(rzdLoginPath),
		chromedp.SetValue(`input[id="Username"]`, utils.RZD_USERNAME, chromedp.ByQuery),
		chromedp.SetValue(`input[id="Password"]`, utils.RZD_PASSWORD, chromedp.ByQuery),
		chromedp.WaitEnabled(loginButton, chromedp.BySearch),
		chromedp.Click(loginButton, chromedp.BySearch),
		chromedp.Sleep(5*time.Second),
		// Wait for the post-login link to show up.
		chromedp.WaitVisible(`a[href="/clients/"]`, chromedp.ByQuery),
		// Extra pause for the page to finish loading.
		chromedp.Sleep(5*time.Second),
	)
}

// httpRequestFill converts data to a map[string]interface{} through a JSON
// round trip and sends it with utils.PostEdbLibRequest to urlMethod.
//
// It returns the raw response body. Errors are returned without being logged
// here; the caller decides how to report them.
func httpRequestFill(data interface{}, urlMethod string) (postEdbLib []byte, err error) {
	encoded, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}

	var params map[string]interface{}
	if err := json.Unmarshal(encoded, &params); err != nil {
		return nil, err
	}

	body, err := utils.PostEdbLibRequest(params, urlMethod)
	if err != nil {
		return nil, err
	}
	return body, nil
}