123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import io, json, os
- from oauthlib.oauth2 import BackendApplicationClient
- from requests_oauthlib import OAuth2Session
- from dataclasses import dataclass
- from flask import Flask
- from flask import request
- from datetime import datetime
- import pandas as pd
- @dataclass
- class bbg_schedule_run:
- credentail_file: str
- # 获取BBG现值数据
- def get_sched_data(self, identifier):
- with io.open(self.credentail_file, encoding="utf-8") as credential_file:
- CREDENTIALS = json.load(credential_file)
- CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id'])
- OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
- SESSION = OAuth2Session(client=CLIENT)
- SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret'])
- URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier
- rep = SESSION.get(URL, headers={'api-version': "2"})
- # rep.json()
- DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key']
- jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json()
- # return (pd.json_normalize(jdata).to_json())
- return jdata
- # 获取BBG历史数据文件
- def get_sched_history_file(self, identifier, file_name):
- with io.open(self.credentail_file, encoding="utf-8") as credential_file:
- CREDENTIALS = json.load(credential_file)
- CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id'])
- OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
- SESSION = OAuth2Session(client=CLIENT)
- SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret'])
- URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier
- rep = SESSION.get(URL, headers={'api-version': "2"})
- DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key']
- jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json()
- df = pd.json_normalize(jdata)
- df.to_csv(file_name, index=False)
- return True
- # hug接口输出为json格式
- app = Flask(__name__)
- # 服务端口号
- server_port = 7110
- # 鉴权文件地址
- ps_credential_path = "ps-credential.txt" # 鉴权文件地址
- # 任务对应的归档目录
- task_key_dir_dict = {
- "IDpcsgDailyRunHistU2": "data/histu2", "IDpcsgDailyRunHist4": "data/hist4", "IDpcsgDailyRunHist1": "data/hist1",
- "IDpcsgDailyRunHist2": "data/hist2", "IDpcsgDailyRunHistV1": "data/histv1", "IDpcsgDailyRun4": "data/run4",
- "IDpcsgDailyRun6": "data/run6", "IDpcsgDailyRun7": "data/run7", "IDpcsgDailyRun8": "data/run8",
- "IDpcsgDailySnap0000": "data/snap0000", "IDpcsgDailySnap0330": "data/snap0330",
- "IDpcsgDailySnap0345": "data/snap0345", "IDpcsgDailyRun10": "data/day10", "IDpcsgDailyRun11": "data/run11"
- }
- @app.route('/api/bloomberg/server')
- def bloomberg_server():
- return {"code": 200, "data": "ok", "error": ""}
- @app.route('/api/bloomberg/general_data', methods=['GET'])
- def get_bloomberg_general_data():
- task_key = request.args.get('task_key', default="", type=str)
- if task_key is None or task_key == "":
- return {"code": 403, "data": "", "error": "task_key is missing"}
- print("task_key: ", task_key)
- save_dir = task_key_dir_dict[task_key]
- if dir == "":
- return {"code": 403, "data": "", "error": "save_dir is missing"}
- print("save_dir: ", save_dir)
- try:
- bbg_down = bbg_schedule_run(ps_credential_path)
- bbg_data = bbg_down.get_sched_data(task_key)
- # TEST
- # with open("data/day3/2024-06-19.json", 'r', encoding='utf-8') as file:
- # bbg_data = file.read() # 读取文件全部内容
- # print(bbg_data) # 打印文件内容
- # 写入文件留档
- write_data_to_file(bbg_data, save_dir)
- resp_data = {
- "code": 200,
- "data": bbg_data,
- "msg": "获取成功"
- }
- return json.dumps(resp_data, indent=2)
- except Exception as e:
- err_msg = str(e)
- print(err_msg)
- return {"code": 403, "data": "", "error": err_msg}
- @app.route('/api/bloomberg/down', methods=['GET'])
- def bloomberg_test():
- task_key = request.args.get('task_key', default="", type=str)
- if task_key is None or task_key == "":
- return {"code": 403, "data": "", "error": "task key is none"}
- try:
- bbg_down = bbg_schedule_run(ps_credential_path)
- today_date_str = datetime.now().strftime('%Y%m%d')
- file_name = f"dailyHistory-{task_key}-{today_date_str}.csv"
- bbg_down.get_sched_history_file(task_key, file_name)
- return {"code": 200, "data": "", "error": ""}
- except Exception as e:
- err_msg = str(e)
- print(err_msg)
- return {"code": 403, "data": "", "error": err_msg}
- # write_data_to_file 数据写入文件
- def write_data_to_file(json_data, dir_path=None):
- # 获取当前日期并格式化为YYYY-MM-DD形式
- today_date_str = datetime.now().strftime('%Y-%m-%d')
- file_name = f"{today_date_str}.json"
- # 确定输出目录
- directory = dir_path if dir_path else os.path.join("output", "data")
- # 构建完整的文件路径
- file_path = os.path.join(directory, file_name)
- # 检查目录是否存在,如果不存在则创建
- if not os.path.exists(directory):
- os.makedirs(directory)
- print(f"目录 '{directory}' 已创建。")
- # 写入JSON数据到文件
- try:
- with open(file_path, 'w', encoding='utf-8') as json_file:
- json.dump(json_data, json_file, ensure_ascii=False, indent=4)
- print(f"JSON数据已成功写入/更新至 '{file_path}'。")
- except Exception as e:
- print(f"写入文件时发生错误:{e}")
- if __name__ == "__main__":
- app.run(host='0.0.0.0', port=server_port, debug=True)
|