123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- # coding:utf-8
- from time import sleep
- import datetime
- import requests
- import openpyxl
- from selenium import webdriver
- # 设置Chrome浏览器选项
- from selenium.common import exceptions
- from selenium.webdriver.chrome.service import Service
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.wait import WebDriverWait
- """
- 根据table的id属性和table中的某一个元素定位其在table中的位置
- table包括表头,位置坐标都是从1开始算
- tableId:table的id属性
- queryContent:需要确定位置的内容
- """
- def get_table_content(driver, tableId, queryContent):
- # 按行查询表格的数据,取出的数据是一整行,按空格分隔每一列的数据
- table_tr_list = driver.find_element(By.ID, tableId).find_elements(By.TAG_NAME, "tr")
- table_list = [] # 存放table数据
- for tr in table_tr_list: # 遍历每一个tr
- # 将每一个tr的数据根据td查询出来,返回结果为list对象
- table_td_list = tr.find_elements(By.TAG_NAME, "td")
- row_list = []
- print(table_td_list)
- for td in table_td_list: # 遍历每一个td
- row_list.append(td.text) # 取出表格的数据,并放入行列表里
- table_list.append(row_list)
- # 循环遍历table数据,确定查询数据的位置
- # for i in range(len(table_list)):
- # for j in range(len(table_list[i])):
- # if queryContent == table_list[i][j]:
- # print("%r坐标为(%r,%r)" % (queryContent, i + 1, j + 1))
- # 写入文件
- def write_excel_xlsx(path, sheet_name, value):
- index = len(value) # 列表中所含元组的个数,从而确定写入Excel的行数
- # 打开Excel
- wb = openpyxl.Workbook()
- # wb = load_workbook(path)
- sheet = wb.active # 获得一个的工作表
- sheet.title = sheet_name
- # 设置格式
- sheet.column_dimensions['B'].width = 115
- # 按行加入
- for i in range(index):
- sheet.append(value[i])
- # 保存文件
- print(sheet.values)
- wb.save(path)
- print("题目写入数据成功!")
- def send_file(url, file_path):
- with open(file_path, 'rb') as file:
- files = {'file': file}
- response2 = requests.post(url, files=files)
- return response2
- def get_element(my_driver, xpaths):
- """
- 判断是否存在元素并获取元素对象
- :param my_driver:
- :param xpaths: xpaths表达式
- :return: 元素对象或为空
- """
- try:
- target = my_driver.find_element(By.XPATH, xpaths)
- except exceptions.NoSuchElementException:
- return False
- else:
- return target
- if __name__ == "__main__":
- # 创建一个 Chrome WebDriver 实例
- options = webdriver.ChromeOptions()
- # options.add_argument("headless")
- options.add_argument('--headless')
- options.add_argument('--disable-gpu')
- options.add_argument('--no-sandbox')
- options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36')
- options.add_argument(" window-size=1920,1080")
- # options.add_argument(" window-size=1920,1080")
- # s = Service(executable_path='/home/code/python/meeting_probabilities/chromedriver')
- s = Service(executable_path='D:\download\chromedriver119-win64\chromedriver.exe')
- driver = webdriver.Chrome(service=s, options=options)
- # driver.maximize_window()
- driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
- "source": """
- Object.defineProperty(navigator, 'webdriver', {
- get: () => undefined
- })
- """
- })
- driver.get(
- 'https://www.cmegroup.com/markets/interest-rates/cme-fedwatch-tool.html?redirect=/trading/interest-rates/countdown-to-fomc.html')
- sleep(2)
- agent = driver.execute_script("return navigator.userAgent")
- print(agent)
- text = driver.find_element(By.XPATH, '//*[@id="page_default_sections"]/div/div/div/div/div/p[1]/span')
- print("网页中段,文本内容:")
- print(text.text)
- btn_pop = get_element(driver, '/html/body/div[4]/div[3]/div/section/span')
- if btn_pop:
- btn_pop.click()
- accept_btn = get_element(driver, '//*[@id="onetrust-accept-btn-handler"]')
- if accept_btn:
- accept_btn.click()
- WebDriverWait(driver, 10).until(
- EC.visibility_of_element_located((By.XPATH, '/html/body/main/div/div[4]/div/iframe'))) # iframe是否出现
- # page_height = driver.execute_script('return document.documentElement.scrollHeight') # 页面高度
- driver.execute_script("window.scrollBy(0,{})".format(600))
- driver.switch_to.frame("cmeIframe-jtxelq2f")
- sleep(2)
- # button = driver.find_element(By.XPATH, '//*[@id="ctl00_MainContent_ucViewControl_IntegratedFedWatchTool_lbPTree"]')
- driver.execute_script(
- "javascript:__doPostBack('ctl00$MainContent$ucViewControl_IntegratedFedWatchTool$lbPTree','')")
- sleep(2)
- table = driver.find_element(By.XPATH, '//*[@id="MainContent_pnlContainer"]/div[3]/div/div/table[2]')
- table.screenshot(r'meeting.png')
- print(table.text)
- # 按行查询表格的数据,取出的数据是一整行,按空格分隔每一列的数据
- table_tr_list = table.find_elements(By.TAG_NAME, "tr")
- table_list = [] # 存放table数据
- th_flag = False
- title = 'MEETING PROBABILITIES'
- i = 0
- for tr in table_tr_list: # 遍历每一个tr
- # 将每一个tr的数据根据td查询出来,返回结果为list对象
- if i == 0:
- title = 'MEETING PROBABILITIES'
- i = i + 1
- continue
- if i == 1:
- i = i + 1
- table_th_list = tr.find_elements(By.TAG_NAME, "th")
- row_list = []
- for th in table_th_list:
- row_list.append(th.text)
- if len(row_list) == 0:
- continue
- row_tuple = tuple(row_list)
- table_list.append(row_list)
- else:
- i = i + 1
- table_td_list = tr.find_elements(By.TAG_NAME, "td")
- row_list = []
- for td in table_td_list: # 遍历每一个td
- row_list.append(td.text) # 取出表格的数据,并放入行列表里
- if len(row_list) == 0:
- continue
- row_tuple = tuple(row_list)
- table_list.append(row_list)
- driver.quit()
- # list_text = content.strip().split('\n')
- # print(list_text)
- # ls = list()
- # title = ""
- # length = len(list_text)
- # for i in range(length):
- # line = list_text[i]
- # if i == 0:
- # title = line
- # continue
- # if i == 1:
- # line = line.replace('MEETING DATE', 'MEETING_DATE')
- # dataList = line.split(' ')
- # dataList[0] = 'MEETING DATE'
- # my_tuple = tuple(dataList)
- # ls.append(my_tuple) # 以元组的形式追加进空列表
- # continue
- # dataList = line.split(' ')
- # my_tuple = tuple(dataList)
- # ls.append(my_tuple) # 以元组的形式追加进空列表
- # 获取当前时间,并将其格式化为指定的形式
- current_time = datetime.datetime.now().strftime("%Y-%m-%d")
- # 构建新的文件路径
- book_name_xlsx = f'D:\pythonProject\metting\{current_time}.xlsx'
- # book_name_xlsx = f'/Users/xi/Desktop/{current_time}.xlsx'
- write_excel_xlsx(book_name_xlsx, title, table_list)
- url = 'http://47.102.213.75:8809/v1/test/resource/upload'
- file_path = book_name_xlsx # 替换为本地文件路径
- # file_path = '/Users/xi/Desktop/2023-10-15.xlsx' # 替换为本地文件路径
- print(file_path)
- print(datetime.datetime.now())
- response = send_file(url, file_path)
- print(response)
|