main.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import io, json, os
  2. from oauthlib.oauth2 import BackendApplicationClient
  3. from requests_oauthlib import OAuth2Session
  4. from dataclasses import dataclass
  5. from flask import Flask
  6. from flask import request
  7. from datetime import datetime
  8. import pandas as pd
  9. @dataclass
  10. class bbg_schedule_run:
  11. credentail_file: str
  12. # 获取BBG现值数据
  13. def get_sched_data(self, identifier):
  14. with io.open(self.credentail_file, encoding="utf-8") as credential_file:
  15. CREDENTIALS = json.load(credential_file)
  16. CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id'])
  17. OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
  18. SESSION = OAuth2Session(client=CLIENT)
  19. SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret'])
  20. URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier
  21. rep = SESSION.get(URL, headers={'api-version': "2"})
  22. # rep.json()
  23. DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key']
  24. jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json()
  25. # return (pd.json_normalize(jdata).to_json())
  26. return jdata
  27. # 获取BBG历史数据文件
  28. def get_sched_history_file(self, identifier, file_name):
  29. with io.open(self.credentail_file, encoding="utf-8") as credential_file:
  30. CREDENTIALS = json.load(credential_file)
  31. CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id'])
  32. OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
  33. SESSION = OAuth2Session(client=CLIENT)
  34. SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret'])
  35. URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier
  36. rep = SESSION.get(URL, headers={'api-version': "2"})
  37. DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key']
  38. jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json()
  39. df = pd.json_normalize(jdata)
  40. df.to_csv(file_name, index=False)
  41. return True
  42. # hug接口输出为json格式
  43. app = Flask(__name__)
  44. # 服务端口号
  45. server_port = 7110
  46. # 鉴权文件地址
  47. ps_credential_path = "ps-credential.txt" # 鉴权文件地址
  48. # 任务对应的归档目录
  49. task_key_dir_dict = {
  50. "IDpcsgMonthRun2": "data/month2", "IDpcsgDailyRun4": "data/day4", "IDpcsgDailyRunHist2": "data/hist2",
  51. "IDpcsgDailyRunHist1": "data/hist1", "IDpcsgDailyRunHistV1": "data/histv1"
  52. }
  53. @app.route('/api/bloomberg/server')
  54. def bloomberg_server():
  55. return {"code": 200, "data": "ok", "error": ""}
  56. @app.route('/api/bloomberg/general_data', methods=['GET'])
  57. def get_bloomberg_general_data():
  58. task_key = request.args.get('task_key', default="", type=str)
  59. if task_key is None or task_key == "":
  60. return {"code": 403, "data": "", "error": "task_key is missing"}
  61. print("task_key: ", task_key)
  62. save_dir = task_key_dir_dict[task_key]
  63. if dir == "":
  64. return {"code": 403, "data": "", "error": "save_dir is missing"}
  65. print("save_dir: ", save_dir)
  66. try:
  67. bbg_down = bbg_schedule_run(ps_credential_path)
  68. bbg_data = bbg_down.get_sched_data(task_key)
  69. # TEST
  70. # with open("data/day3/2024-06-19.json", 'r', encoding='utf-8') as file:
  71. # bbg_data = file.read() # 读取文件全部内容
  72. # print(bbg_data) # 打印文件内容
  73. # 写入文件留档
  74. write_data_to_file(bbg_data, save_dir)
  75. resp_data = {
  76. "code": 200,
  77. "data": bbg_data,
  78. "msg": "获取成功"
  79. }
  80. return json.dumps(resp_data, indent=2)
  81. except Exception as e:
  82. err_msg = str(e)
  83. print(err_msg)
  84. return {"code": 403, "data": "", "error": err_msg}
  85. @app.route('/api/bloomberg/down', methods=['GET'])
  86. def bloomberg_test():
  87. task_key = request.args.get('task_key', default="", type=str)
  88. if task_key is None or task_key == "":
  89. return {"code": 403, "data": "", "error": "task key is none"}
  90. try:
  91. bbg_down = bbg_schedule_run(ps_credential_path)
  92. today_date_str = datetime.now().strftime('%Y%m%d')
  93. file_name = f"dailyHistory-{task_key}-{today_date_str}.csv"
  94. bbg_down.get_sched_history_file(task_key, file_name)
  95. return {"code": 200, "data": "", "error": ""}
  96. except Exception as e:
  97. err_msg = str(e)
  98. print(err_msg)
  99. return {"code": 403, "data": "", "error": err_msg}
  100. # write_data_to_file 数据写入文件
  101. def write_data_to_file(json_data, dir_path=None):
  102. # 获取当前日期并格式化为YYYY-MM-DD形式
  103. today_date_str = datetime.now().strftime('%Y-%m-%d')
  104. file_name = f"{today_date_str}.json"
  105. # 确定输出目录
  106. directory = dir_path if dir_path else os.path.join("output", "data")
  107. # 构建完整的文件路径
  108. file_path = os.path.join(directory, file_name)
  109. # 检查目录是否存在,如果不存在则创建
  110. if not os.path.exists(directory):
  111. os.makedirs(directory)
  112. print(f"目录 '{directory}' 已创建。")
  113. # 写入JSON数据到文件
  114. try:
  115. with open(file_path, 'w', encoding='utf-8') as json_file:
  116. json.dump(json_data, json_file, ensure_ascii=False, indent=4)
  117. print(f"JSON数据已成功写入/更新至 '{file_path}'。")
  118. except Exception as e:
  119. print(f"写入文件时发生错误:{e}")
  120. if __name__ == "__main__":
  121. app.run(host='0.0.0.0', port=server_port, debug=True)