"""Small Flask service exposing Bloomberg scheduled-request output as JSON.

Each data endpoint downloads the response of one scheduled request from
the Bloomberg REST API (``api.bloomberg.com``), archives a copy on disk
under a per-day file name, and returns the payload to the caller.
"""
import io
import json
import os
from dataclasses import dataclass
from datetime import datetime

from flask import Flask
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session

# OAuth2 token endpoint and REST root used by bbg_schedule_run.
_OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
_CATALOGS_URL = 'https://api.bloomberg.com/eap/catalogs'


@dataclass
class bbg_schedule_run:
    """Download the output of a scheduled request from the Bloomberg API.

    Attributes:
        credentail_file: Path to a UTF-8 JSON file containing the
            ``client_id`` and ``client_secret`` keys. (The misspelling is
            kept because it is part of the public interface.)
        catalog_id: Catalog segment used in the request URLs.
        request_timeout: Timeout in seconds applied to every HTTP call so
            that a stalled upstream cannot block a worker forever.
    """

    credentail_file: str
    catalog_id: str = "47929"
    request_timeout: float = 60.0

    def get_sched_data(self, identifier):
        """Return the parsed JSON payload for the request *identifier*.

        Steps: read the credentials, obtain an OAuth2 token with the
        client-credentials flow, list the responses matching
        ``requestIdentifier``, then download the first listed entry.
        NOTE(review): the first entry of ``contains`` is presumably the
        most recent response - confirm against the API documentation.

        Args:
            identifier: Request identifier of the scheduled job.

        Returns:
            The decoded JSON body of the downloaded response.

        Raises:
            OSError: The credential file cannot be opened.
            KeyError: The credentials or the listing lack an expected key.
            IndexError: The listing contains no response for *identifier*.
            requests.HTTPError: The API answered with an error status.
        """
        with io.open(self.credentail_file, encoding="utf-8") as credential_file:
            credentials = json.load(credential_file)

        responses_url = f"{_CATALOGS_URL}/{self.catalog_id}/content/responses/"
        headers = {'api-version': "2"}
        client = BackendApplicationClient(client_id=credentials['client_id'])

        # The session is a requests.Session; the context manager releases
        # its connection pool even when one of the calls below raises.
        with OAuth2Session(client=client) as session:
            session.fetch_token(
                token_url=_OAUTH2_ENDPOINT,
                client_secret=credentials['client_secret'],
                timeout=self.request_timeout,
            )

            # Passing the identifier through ``params`` lets requests
            # URL-encode it instead of splicing it into the URL verbatim.
            listing = session.get(
                responses_url,
                params={'requestIdentifier': identifier},
                headers=headers,
                timeout=self.request_timeout,
            )
            # Fail loudly on HTTP errors rather than parsing an error body.
            listing.raise_for_status()

            entries = listing.json()['contains']
            if not entries:
                raise IndexError(
                    f"no responses available for request identifier {identifier!r}"
                )

            download = session.get(
                responses_url + entries[0]['key'],
                headers=headers,
                timeout=self.request_timeout,
            )
            download.raise_for_status()
            # return (pd.json_normalize(jdata).to_json())
            return download.json()


# Endpoints emit JSON (the original note here referred to the "hug" interface).
app = Flask(__name__)

# Port the service listens on.
server_port = 7110

# Path of the credential (authentication) file.
ps_credential_path = "ps-credential.txt"

# Daily
# daily_price_sched_key = "IDpcsgDailyType6Price"  # daily Price job key (first version)
# daily_general_sched_key = "IDpcsgDailyGeneral"  # daily General job key (first version)
# daily_data_price_dir = "data/day/price"  # daily Price data directory
# daily_data_general_dir = "data/day/general"  # daily General data directory
daily_sched_key = "IDpcsgDailyRun2"  # daily job key
daily_data_dir = "data/day2"  # daily data directory

# Weekly
# weekly_sched_key = "IDpcsgWeeklyRunETA223"  # weekly job key (paused)
# weekly_data_dir = "data/week"

# Monthly
# month_sched_key = "IDpcsgMonthRun"  # monthly job key (first version)
month_sched_key = "IDpcsgMonthRun2"  # monthly job key
monthly_data_dir = "data/month2"  # monthly data directory


def _fetch_and_archive(sched_key, data_dir):
    """Fetch one scheduled job's data, archive it, and build the reply.

    Shared implementation of the data endpoints.

    Args:
        sched_key: Request identifier passed to ``get_sched_data``.
        data_dir: Directory in which the payload is archived.

    Returns:
        On success, a JSON response whose body is
        ``{"code": 200, "data": <payload>, "msg": ...}``. On any failure,
        a dict ``{"code": 403, "data": "", "error": <message>}``; the
        HTTP status itself is left at Flask's default in both cases.
    """
    try:
        bbg_down = bbg_schedule_run(ps_credential_path)
        payload = bbg_down.get_sched_data(sched_key)

        # Keep a copy on disk for the record.
        write_data_to_file(payload, data_dir)

        resp_data = {
            "code": 200,
            "data": payload,
            "msg": "获取成功"
        }
        # Returning the bare string would be served as text/html; declare
        # the JSON content type so it matches the dict-based responses.
        return app.response_class(
            json.dumps(resp_data, indent=2), mimetype="application/json"
        )
    except Exception as e:  # endpoint boundary: report instead of crashing
        err_msg = str(e)
        print(err_msg)
        return {"code": 403, "data": "", "error": err_msg}


@app.route('/api/bloomberg/server')
def bloomberg_server():
    """Health-check endpoint: always answers with a fixed "ok" payload."""
    return {"code": 200, "data": "ok", "error": ""}


@app.route('/api/bloomberg/daily_data', methods=['POST'])
def get_bloomberg_daily_data():
    """Return (and archive) the data of the daily scheduled job."""
    # First version, kept for reference:
    # bbg_down = bbg_schedule_run(ps_credential_path)
    # df_daily_price = bbg_down.get_sched_data(daily_price_sched_key)
    # df_daily_general = bbg_down.get_sched_data(daily_general_sched_key)
    #
    # # Keep a copy on disk for the record
    # write_data_to_file(df_daily_price, daily_data_price_dir)
    # write_data_to_file(df_daily_general, daily_data_general_dir)
    #
    # resp_data = {
    #     "code": 200,
    #     "data": {
    #         "price_data": df_daily_price,
    #         "general_data": df_daily_general
    #     },
    #     "msg": "获取成功"
    # }
    # return json.dumps(resp_data, indent=2)
    return _fetch_and_archive(daily_sched_key, daily_data_dir)


# The weekly job is temporarily disabled.
# @app.route('/api/bloomberg/weekly_data', methods=['POST'])
# def get_bloomberg_weekly_data():
#     try:
#         bbg_down = bbg_schedule_run(ps_credential_path)
#         df_weekly = bbg_down.get_sched_data(weekly_sched_key)
#
#         # Keep a copy on disk for the record
#         write_data_to_file(df_weekly, weekly_data_dir)
#
#         resp_data = {
#             "code": 200,
#             "data": df_weekly,
#             "msg": "获取成功"
#         }
#         return json.dumps(resp_data, indent=2)
#     except Exception as e:
#         err_msg = str(e)
#         print(err_msg)
#         return {"code": 403, "data": "", "error": err_msg}


@app.route('/api/bloomberg/monthly_data', methods=['POST'])
def get_bloomberg_monthly_data():
    """Return (and archive) the data of the monthly scheduled job."""
    return _fetch_and_archive(month_sched_key, monthly_data_dir)


def write_data_to_file(json_data, dir_path=None):
    """Archive *json_data* as ``<dir>/<YYYY-MM-DD>.json`` (best effort).

    The file is named after the current local date, so a second call on
    the same day overwrites the earlier file. Archiving never raises:
    any failure is printed and swallowed so that a disk problem cannot
    discard data that was fetched successfully.

    Args:
        json_data: JSON-serialisable object to store.
        dir_path: Target directory; defaults to ``output/data`` when
            empty or ``None``.
    """
    # Current date formatted as YYYY-MM-DD.
    today_date_str = datetime.now().strftime('%Y-%m-%d')
    file_name = f"{today_date_str}.json"

    # Resolve the output directory and the full file path.
    directory = dir_path if dir_path else os.path.join("output", "data")
    file_path = os.path.join(directory, file_name)

    try:
        # exist_ok avoids the exists()/makedirs() race between two
        # concurrent requests; the message is still only printed when
        # the directory was missing beforehand.
        directory_missing = not os.path.exists(directory)
        os.makedirs(directory, exist_ok=True)
        if directory_missing:
            print(f"目录 '{directory}' 已创建。")

        with open(file_path, 'w', encoding='utf-8') as json_file:
            json.dump(json_data, json_file, ensure_ascii=False, indent=4)
        print(f"JSON数据已成功写入/更新至 '{file_path}'。")
    except Exception as e:  # deliberate best effort, see docstring
        print(f"写入文件时发生错误:{e}")


if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, so it must
    # not be enabled by default on a server bound to all interfaces.
    # Opt in explicitly with BBG_SERVER_DEBUG=1 for local development.
    debug_enabled = os.environ.get("BBG_SERVER_DEBUG", "") == "1"
    app.run(host='0.0.0.0', port=server_port, debug=debug_enabled)