import io, json, os from oauthlib.oauth2 import BackendApplicationClient from requests_oauthlib import OAuth2Session from dataclasses import dataclass from flask import Flask from flask import request from datetime import datetime import pandas as pd @dataclass class bbg_schedule_run: credentail_file: str # 获取BBG现值数据 def get_sched_data(self, identifier): with io.open(self.credentail_file, encoding="utf-8") as credential_file: CREDENTIALS = json.load(credential_file) CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id']) OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2' SESSION = OAuth2Session(client=CLIENT) SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret']) URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier rep = SESSION.get(URL, headers={'api-version': "2"}) # rep.json() DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key'] jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json() # return (pd.json_normalize(jdata).to_json()) return jdata # 获取BBG历史数据文件 def get_sched_history_file(self, identifier, file_name): with io.open(self.credentail_file, encoding="utf-8") as credential_file: CREDENTIALS = json.load(credential_file) CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id']) OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2' SESSION = OAuth2Session(client=CLIENT) SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret']) URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier rep = SESSION.get(URL, headers={'api-version': "2"}) DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key'] jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json() df = pd.json_normalize(jdata) df.to_csv(file_name, index=False) return True # hug接口输出为json格式 app = Flask(__name__) # 服务端口号 server_port = 7110 # 鉴权文件地址 ps_credential_path = "ps-credential.txt" # 鉴权文件地址 # 任务对应的归档目录 task_key_dir_dict = { "IDpcsgDailyRunHistU2": "data/histu2", "IDpcsgDailyRunHist4": "data/hist4", "IDpcsgDailyRunHist1": "data/hist1", "IDpcsgDailyRunHist2": "data/hist2", "IDpcsgDailyRunHistV1": "data/histv1", "IDpcsgDailyRun4": "data/run4", "IDpcsgDailyRun6": "data/run6", "IDpcsgDailyRun7": "data/run7", "IDpcsgDailyRun8": "data/run8", "IDpcsgDailySnap0000": "data/snap0000", "IDpcsgDailySnap0330": "data/snap0330", "IDpcsgDailySnap0345": "data/snap0345", "IDpcsgDailyRun10": "data/day10", "IDpcsgDailyRun11": "data/run11" } @app.route('/api/bloomberg/server') def bloomberg_server(): return {"code": 200, "data": "ok", "error": ""} @app.route('/api/bloomberg/general_data', methods=['GET']) def get_bloomberg_general_data(): task_key = request.args.get('task_key', default="", type=str) if task_key is None or task_key == "": return {"code": 403, "data": "", "error": "task_key is missing"} print("task_key: ", task_key) save_dir = task_key_dir_dict[task_key] if dir == "": return {"code": 403, "data": "", "error": "save_dir is missing"} print("save_dir: ", save_dir) try: bbg_down = bbg_schedule_run(ps_credential_path) bbg_data = bbg_down.get_sched_data(task_key) # TEST # with open("data/day3/2024-06-19.json", 'r', encoding='utf-8') as file: # bbg_data = file.read() # 读取文件全部内容 # print(bbg_data) # 打印文件内容 # 写入文件留档 write_data_to_file(bbg_data, save_dir) resp_data = { "code": 200, "data": bbg_data, "msg": "获取成功" } return json.dumps(resp_data, indent=2) except Exception as e: err_msg = str(e) print(err_msg) return {"code": 403, "data": "", "error": err_msg} @app.route('/api/bloomberg/down', methods=['GET']) def bloomberg_test(): task_key = request.args.get('task_key', default="", type=str) if task_key is None or task_key == "": return {"code": 403, "data": "", "error": "task key is none"} try: bbg_down = bbg_schedule_run(ps_credential_path) today_date_str = datetime.now().strftime('%Y%m%d') file_name = f"dailyHistory-{task_key}-{today_date_str}.csv" bbg_down.get_sched_history_file(task_key, file_name) return {"code": 200, "data": "", "error": ""} except Exception as e: err_msg = str(e) print(err_msg) return {"code": 403, "data": "", "error": err_msg} # write_data_to_file 数据写入文件 def write_data_to_file(json_data, dir_path=None): # 获取当前日期并格式化为YYYY-MM-DD形式 today_date_str = datetime.now().strftime('%Y-%m-%d') file_name = f"{today_date_str}.json" # 确定输出目录 directory = dir_path if dir_path else os.path.join("output", "data") # 构建完整的文件路径 file_path = os.path.join(directory, file_name) # 检查目录是否存在,如果不存在则创建 if not os.path.exists(directory): os.makedirs(directory) print(f"目录 '{directory}' 已创建。") # 写入JSON数据到文件 try: with open(file_path, 'w', encoding='utf-8') as json_file: json.dump(json_data, json_file, ensure_ascii=False, indent=4) print(f"JSON数据已成功写入/更新至 '{file_path}'。") except Exception as e: print(f"写入文件时发生错误:{e}") if __name__ == "__main__": app.run(host='0.0.0.0', port=server_port, debug=True)