main.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import io, json, os
  2. from oauthlib.oauth2 import BackendApplicationClient
  3. from requests_oauthlib import OAuth2Session
  4. from dataclasses import dataclass
  5. from flask import Flask
  6. from flask import request
  7. from datetime import datetime
  8. import pandas as pd
  9. @dataclass
  10. class bbg_schedule_run:
  11. credentail_file: str
  12. # 获取BBG现值数据
  13. def get_sched_data(self, identifier):
  14. with io.open(self.credentail_file, encoding="utf-8") as credential_file:
  15. CREDENTIALS = json.load(credential_file)
  16. CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id'])
  17. OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
  18. SESSION = OAuth2Session(client=CLIENT)
  19. SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret'])
  20. URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier
  21. rep = SESSION.get(URL, headers={'api-version': "2"})
  22. # rep.json()
  23. DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key']
  24. jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json()
  25. # return (pd.json_normalize(jdata).to_json())
  26. return jdata
  27. # 获取BBG历史数据文件
  28. def get_sched_history_file(self, identifier, file_name):
  29. with io.open(self.credentail_file, encoding="utf-8") as credential_file:
  30. CREDENTIALS = json.load(credential_file)
  31. CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id'])
  32. OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
  33. SESSION = OAuth2Session(client=CLIENT)
  34. SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret'])
  35. URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier
  36. rep = SESSION.get(URL, headers={'api-version': "2"})
  37. DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key']
  38. jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json()
  39. df = pd.json_normalize(jdata)
  40. df.to_csv(file_name, index=False)
  41. return True
# Flask application on which the HTTP routes in this file are registered;
# the route handlers return JSON payloads.
app = Flask(__name__)
  44. # 服务端口号
  45. server_port = 7110
  46. # 鉴权文件地址
  47. ps_credential_path = "ps-credential.txt" # 鉴权文件地址
  48. # 任务对应的归档目录
  49. task_key_dir_dict = {
  50. "IDpcsgDailyRunHistU2": "data/histu2", "IDpcsgDailyRunHist4": "data/hist4", "IDpcsgDailyRunHist1": "data/hist1",
  51. "IDpcsgDailyRunHist2": "data/hist2", "IDpcsgDailyRunHistV1": "data/histv1", "IDpcsgDailyRun4": "data/run4",
  52. "IDpcsgDailyRun6": "data/run6", "IDpcsgDailyRun7": "data/run7", "IDpcsgDailyRun8": "data/run8",
  53. "IDpcsgDailySnap0000": "data/snap0000", "IDpcsgDailySnap0330": "data/snap0330",
  54. "IDpcsgDailySnap0345": "data/snap0345", "IDpcsgDailyRun10": "data/day10", "IDpcsgDailyRun11": "data/run11"
  55. }
  56. @app.route('/api/bloomberg/server')
  57. def bloomberg_server():
  58. return {"code": 200, "data": "ok", "error": ""}
  59. @app.route('/api/bloomberg/general_data', methods=['GET'])
  60. def get_bloomberg_general_data():
  61. task_key = request.args.get('task_key', default="", type=str)
  62. if task_key is None or task_key == "":
  63. return {"code": 403, "data": "", "error": "task_key is missing"}
  64. print("task_key: ", task_key)
  65. save_dir = task_key_dir_dict[task_key]
  66. if dir == "":
  67. return {"code": 403, "data": "", "error": "save_dir is missing"}
  68. print("save_dir: ", save_dir)
  69. try:
  70. bbg_down = bbg_schedule_run(ps_credential_path)
  71. bbg_data = bbg_down.get_sched_data(task_key)
  72. # TEST
  73. # with open("data/day3/2024-06-19.json", 'r', encoding='utf-8') as file:
  74. # bbg_data = file.read() # 读取文件全部内容
  75. # print(bbg_data) # 打印文件内容
  76. # 写入文件留档
  77. write_data_to_file(bbg_data, save_dir)
  78. resp_data = {
  79. "code": 200,
  80. "data": bbg_data,
  81. "msg": "获取成功"
  82. }
  83. return json.dumps(resp_data, indent=2)
  84. except Exception as e:
  85. err_msg = str(e)
  86. print(err_msg)
  87. return {"code": 403, "data": "", "error": err_msg}
  88. @app.route('/api/bloomberg/down', methods=['GET'])
  89. def bloomberg_test():
  90. task_key = request.args.get('task_key', default="", type=str)
  91. if task_key is None or task_key == "":
  92. return {"code": 403, "data": "", "error": "task key is none"}
  93. try:
  94. bbg_down = bbg_schedule_run(ps_credential_path)
  95. today_date_str = datetime.now().strftime('%Y%m%d')
  96. file_name = f"dailyHistory-{task_key}-{today_date_str}.csv"
  97. bbg_down.get_sched_history_file(task_key, file_name)
  98. return {"code": 200, "data": "", "error": ""}
  99. except Exception as e:
  100. err_msg = str(e)
  101. print(err_msg)
  102. return {"code": 403, "data": "", "error": err_msg}
  103. # write_data_to_file 数据写入文件
  104. def write_data_to_file(json_data, dir_path=None):
  105. # 获取当前日期并格式化为YYYY-MM-DD形式
  106. today_date_str = datetime.now().strftime('%Y-%m-%d')
  107. file_name = f"{today_date_str}.json"
  108. # 确定输出目录
  109. directory = dir_path if dir_path else os.path.join("output", "data")
  110. # 构建完整的文件路径
  111. file_path = os.path.join(directory, file_name)
  112. # 检查目录是否存在,如果不存在则创建
  113. if not os.path.exists(directory):
  114. os.makedirs(directory)
  115. print(f"目录 '{directory}' 已创建。")
  116. # 写入JSON数据到文件
  117. try:
  118. with open(file_path, 'w', encoding='utf-8') as json_file:
  119. json.dump(json_data, json_file, ensure_ascii=False, indent=4)
  120. print(f"JSON数据已成功写入/更新至 '{file_path}'。")
  121. except Exception as e:
  122. print(f"写入文件时发生错误:{e}")
  123. if __name__ == "__main__":
  124. app.run(host='0.0.0.0', port=server_port, debug=True)