123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- import io, json, os
- from oauthlib.oauth2 import BackendApplicationClient
- from requests_oauthlib import OAuth2Session
- from dataclasses import dataclass
- from flask import Flask
- from datetime import datetime
- @dataclass
- class bbg_schedule_run:
- credentail_file: str
- def get_sched_data(self, identifier):
- with io.open(self.credentail_file, encoding="utf-8") as credential_file:
- CREDENTIALS = json.load(credential_file)
- CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id'])
- OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
- SESSION = OAuth2Session(client=CLIENT)
- SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret'])
- URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier
- rep = SESSION.get(URL, headers={'api-version': "2"})
- # rep.json()
- DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key']
- jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json()
- # return (pd.json_normalize(jdata).to_json())
- return jdata
- # hug接口输出为json格式
- app = Flask(__name__)
- # 服务端口号
- server_port = 7110
- # 鉴权文件地址
- ps_credential_path = "ps-credential.txt" # 鉴权文件地址
- # 日度
- # daily_price_sched_key = "IDpcsgDailyType6Price" # 日度Price任务key(第一版)
- # daily_general_sched_key = "IDpcsgDailyGeneral" # 日度general任务key(第一版)
- # daily_data_price_dir = "data/day/price" # 日度Price数据文件夹
- # daily_data_general_dir = "data/day/general" # 日度Price数据文件夹
- daily_sched_key = "IDpcsgDailyRun2" # 日度任务key
- daily_data_dir = "data/day2" # 日度数据文件夹
- # 周度
- # weekly_sched_key = "IDpcsgWeeklyRunETA223" # 周度任务key(暂停)
- # weekly_data_dir = "data/week"
- # 月度
- # month_sched_key = "IDpcsgMonthRun" # 月度任务key(第一版)
- month_sched_key = "IDpcsgMonthRun2" # 月度任务key
- monthly_data_dir = "data/month2" # 月度数据文件夹
- @app.route('/api/bloomberg/server')
- def bloomberg_server():
- return {"code": 200, "data": "ok", "error": ""}
- @app.route('/api/bloomberg/daily_data', methods=['POST'])
- def get_bloomberg_daily_data():
- try:
- # bbg_down = bbg_schedule_run(ps_credential_path)
- # df_daily_price = bbg_down.get_sched_data(daily_price_sched_key)
- # df_daily_general = bbg_down.get_sched_data(daily_general_sched_key)
- #
- # # 写入文件留档
- # write_data_to_file(df_daily_price, daily_data_price_dir)
- # write_data_to_file(df_daily_general, daily_data_general_dir)
- #
- # resp_data = {
- # "code": 200,
- # "data": {
- # "price_data": df_daily_price,
- # "general_data": df_daily_general
- # },
- # "msg": "获取成功"
- # }
- # return json.dumps(resp_data, indent=2)
- bbg_down = bbg_schedule_run(ps_credential_path)
- df_daily = bbg_down.get_sched_data(daily_sched_key)
- # 写入文件留档
- write_data_to_file(df_daily, daily_data_dir)
- resp_data = {
- "code": 200,
- "data": df_daily,
- "msg": "获取成功"
- }
- return json.dumps(resp_data, indent=2)
- except Exception as e:
- err_msg = str(e)
- print(err_msg)
- return {"code": 403, "data": "", "error": err_msg}
- # 周度任务暂关闭
- # @app.route('/api/bloomberg/weekly_data', methods=['POST'])
- # def get_bloomberg_weekly_data():
- # try:
- # bbg_down = bbg_schedule_run(ps_credential_path)
- # df_weekly = bbg_down.get_sched_data(weekly_sched_key)
- #
- # # 写入文件留档
- # write_data_to_file(df_weekly, weekly_data_dir)
- #
- # resp_data = {
- # "code": 200,
- # "data": df_weekly,
- # "msg": "获取成功"
- # }
- # return json.dumps(resp_data, indent=2)
- # except Exception as e:
- # err_msg = str(e)
- # print(err_msg)
- # return {"code": 403, "data": "", "error": err_msg}
- @app.route('/api/bloomberg/monthly_data', methods=['POST'])
- def get_bloomberg_monthly_data():
- try:
- bbg_down = bbg_schedule_run(ps_credential_path)
- df_monthly = bbg_down.get_sched_data(month_sched_key)
- # 写入文件留档
- write_data_to_file(df_monthly, monthly_data_dir)
- resp_data = {
- "code": 200,
- "data": df_monthly,
- "msg": "获取成功"
- }
- return json.dumps(resp_data, indent=2)
- except Exception as e:
- err_msg = str(e)
- print(err_msg)
- return {"code": 403, "data": "", "error": err_msg}
- # write_data_to_file 数据写入文件
- def write_data_to_file(json_data, dir_path=None):
- # 获取当前日期并格式化为YYYY-MM-DD形式
- today_date_str = datetime.now().strftime('%Y-%m-%d')
- file_name = f"{today_date_str}.json"
- # 确定输出目录
- directory = dir_path if dir_path else os.path.join("output", "data")
- # 构建完整的文件路径
- file_path = os.path.join(directory, file_name)
- # 检查目录是否存在,如果不存在则创建
- if not os.path.exists(directory):
- os.makedirs(directory)
- print(f"目录 '{directory}' 已创建。")
- # 写入JSON数据到文件
- try:
- with open(file_path, 'w', encoding='utf-8') as json_file:
- json.dump(json_data, json_file, ensure_ascii=False, indent=4)
- print(f"JSON数据已成功写入/更新至 '{file_path}'。")
- except Exception as e:
- print(f"写入文件时发生错误:{e}")
- if __name__ == "__main__":
- app.run(host='0.0.0.0', port=server_port, debug=True)
|