Browse Source

中石油新加坡-bloomberg

hsun 10 months ago
parent
commit
51f63358b0
4 changed files with 186 additions and 11 deletions
  1. 2 1
      .gitignore
  2. 3 0
      pscg_bloomberg/.gitignore
  3. 171 0
      pscg_bloomberg/main.py
  4. 10 10
      wind_api.py

+ 2 - 1
.gitignore

@@ -1 +1,2 @@
-/.idea
+/.idea
+/venv

+ 3 - 0
pscg_bloomberg/.gitignore

@@ -0,0 +1,3 @@
+/.idea
+/data
+/ps-credential.txt

+ 171 - 0
pscg_bloomberg/main.py

@@ -0,0 +1,171 @@
+import io, json, os
+from oauthlib.oauth2 import BackendApplicationClient
+from requests_oauthlib import OAuth2Session
+from dataclasses import dataclass
+from flask import Flask
+from datetime import datetime
+
+
+@dataclass
+class bbg_schedule_run:
+    credentail_file: str
+
+    def get_sched_data(self, identifier):
+        with io.open(self.credentail_file, encoding="utf-8") as credential_file:
+            CREDENTIALS = json.load(credential_file)
+
+        CLIENT = BackendApplicationClient(client_id=CREDENTIALS['client_id'])
+
+        OAUTH2_ENDPOINT = 'https://bsso.blpprofessional.com/ext/api/as/token.oauth2'
+        SESSION = OAuth2Session(client=CLIENT)
+        SESSION.fetch_token(token_url=OAUTH2_ENDPOINT, client_secret=CREDENTIALS['client_secret'])
+
+        URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/?requestIdentifier=' + identifier
+
+        rep = SESSION.get(URL, headers={'api-version': "2"})
+        # rep.json()
+
+        DATA_URL = 'https://api.bloomberg.com/eap/catalogs/47929/content/responses/' + rep.json()['contains'][0]['key']
+
+        jdata = SESSION.get(DATA_URL, headers={'api-version': "2"}).json()
+
+        # return (pd.json_normalize(jdata).to_json())
+        return jdata
+
+
+# hug接口输出为json格式
+app = Flask(__name__)
+
+server_port = 7110
+ps_credential_path = "ps-credential.txt"  # 鉴权文件地址
+daily_price_sched_key = "IDpcsgDailyType6Price"  # 日度Price任务key
+daily_general_sched_key = "IDpcsgDailyGeneral"  # 日度general任务key
+weekly_sched_key = "IDpcsgWeeklyRunETA223"  # 周度任务key
+month_sched_key = "IDpcsgMonthRun"  # 月度任务key
+daily_data_price_dir = "data/day/price"
+daily_data_general_dir = "data/day/general"
+weekly_data_dir = "data/week"
+monthly_data_dir = "data/month"
+
+
+@app.route('/api/bloomberg/server')
+def bloomberg_server():
+    return {"code": 200, "data": "ok", "error": ""}
+
+
+@app.route('/api/bloomberg/daily_data', methods=['POST'])
+def get_bloomberg_daily_data():
+    try:
+        bbg_down = bbg_schedule_run(ps_credential_path)
+        df_daily_price = bbg_down.get_sched_data(daily_price_sched_key)
+        df_daily_general = bbg_down.get_sched_data(daily_general_sched_key)
+
+        # 写入文件留档
+        write_data_to_file(df_daily_price, daily_data_price_dir)
+        write_data_to_file(df_daily_general, daily_data_general_dir)
+
+        resp_temp = {
+            "code": 200,
+            "data": {
+                "price_data": df_daily_price,
+                "general_data": df_daily_general
+            },
+            "msg": "获取成功"
+        }
+        return json.dumps(resp_temp, indent=2)
+    except Exception as e:
+        err_msg = str(e)
+        print(err_msg)
+        return {"code": 403, "data": "", "error": err_msg}
+
+
+@app.route('/api/bloomberg/weekly_data', methods=['POST'])
+def get_bloomberg_weekly_data():
+    try:
+        bbg_down = bbg_schedule_run(ps_credential_path)
+        df_weekly = bbg_down.get_sched_data(weekly_sched_key)
+
+        # 写入文件留档
+        write_data_to_file(df_weekly, weekly_data_dir)
+
+        resp_temp = {
+            "code": 200,
+            "data": df_weekly,
+            "msg": "获取成功"
+        }
+        return json.dumps(resp_temp, indent=2)
+    except Exception as e:
+        err_msg = str(e)
+        print(err_msg)
+        return {"code": 403, "data": "", "error": err_msg}
+
+
+@app.route('/api/bloomberg/monthly_data', methods=['POST'])
+def get_bloomberg_monthly_data():
+    try:
+        bbg_down = bbg_schedule_run(ps_credential_path)
+        df_monthly = bbg_down.get_sched_data(month_sched_key)
+
+        # 写入文件留档
+        write_data_to_file(df_monthly, monthly_data_dir)
+
+        resp_temp = {
+            "code": 200,
+            "data": df_monthly,
+            "msg": "获取成功"
+        }
+        return json.dumps(resp_temp, indent=2)
+    except Exception as e:
+        err_msg = str(e)
+        print(err_msg)
+        return {"code": 403, "data": "", "error": err_msg}
+
+
+# write_data_to_file 数据写入文件
+def write_data_to_file(json_data, dir_path=None):
+    # 获取当前日期并格式化为YYYY-MM-DD形式
+    today_date_str = datetime.now().strftime('%Y-%m-%d')
+    file_name = f"{today_date_str}.json"
+
+    # 确定输出目录
+    directory = dir_path if dir_path else os.path.join("output", "data")
+
+    # 构建完整的文件路径
+    file_path = os.path.join(directory, file_name)
+
+    # 检查目录是否存在,如果不存在则创建
+    if not os.path.exists(directory):
+        os.makedirs(directory)
+        print(f"目录 '{directory}' 已创建。")
+
+    # 写入JSON数据到文件
+    try:
+        with open(file_path, 'w', encoding='utf-8') as json_file:
+            json.dump(json_data, json_file, ensure_ascii=False, indent=4)
+            print(f"JSON数据已成功写入/更新至 '{file_path}'。")
+    except Exception as e:
+        print(f"写入文件时发生错误:{e}")
+
+
+def main():
+    ### Download data from Bloomberg website scheduled run
+    ### Be aware different column fields for different run job
+    ### For example: price data clolumn:
+    ### Date: PX_CLOSE_DT, Value: PX_YEST_CLOSE
+    ### Other type of data download cloumn
+    ### Date: LAST_UPDATE_DT, Value: PX_LAST
+
+    bbg_down = bbg_schedule_run("./ps-credential.txt")
+    # "IDpcsgDailyType6Price", "IDpcsgDailyGeneral" "IDpcsgWeeklyRunETA222", "IDpcsgMonthRun"
+    df_daily_price = bbg_down.get_sched_data("IDpcsgDailyType6Price")
+    df_daily_general = bbg_down.get_sched_data("IDpcsgDailyGeneral")
+    # df_weekly = bbg_down.get_sched_data("IDpcsgWeeklyRunETA223")
+    df_monthly = bbg_down.get_sched_data("IDpcsgMonthRun")
+
+    print(df_daily_price)
+    print(df_daily_general)
+    print(df_monthly)
+
+
+if __name__ == "__main__":
+    app.run(host='0.0.0.0', port=server_port, debug=True)

+ 10 - 10
wind_api.py

@@ -1,5 +1,5 @@
 import hug
-from bottle import route, run,NORUN
+from bottle import route, run, NORUN
 from WindPy import w
 import json
 import pandas as pd
@@ -15,6 +15,7 @@ def hello():
     return 1
     # return 'wind true'
 
+
 @hug.get('/edbInfo/wind')
 def GetEdbDataByWind(EdbCode, StartDate, EndDate):
     print("GetEdbDataByWind:", EdbCode)
@@ -34,7 +35,7 @@ def GetEdbDataByWind(EdbCode, StartDate, EndDate):
     print("wind data")
     print(data)
     df = pd.DataFrame()
-    if data.ErrorCode == -40521010: # Internet Timeout 超时退出
+    if data.ErrorCode == -40521010:  # Internet Timeout 超时退出
         os._exit(0)
         return "a"
     df['DT'] = data.Times
@@ -50,7 +51,7 @@ def GetEdbDataByWind(EdbCode, StartDate, EndDate):
 
 
 @hug.get('/edbInfo/wind/wsd')
-def GetEdbDataWindWsd(StockCode,EdbCode, StartDate, EndDate):
+def GetEdbDataWindWsd(StockCode, EdbCode, StartDate, EndDate):
     print("GetEdbDataByWind:", EdbCode)
     isConnected = w.isconnected()
     print("isconnected")
@@ -63,19 +64,18 @@ def GetEdbDataWindWsd(StockCode,EdbCode, StartDate, EndDate):
             return "{'ErrMsg':'启动Wind接口失败'}"
 
     option = "Fill=Previous"
-    wsd_data = w.wsd(StockCode,EdbCode, StartDate, EndDate, option)
+    wsd_data = w.wsd(StockCode, EdbCode, StartDate, EndDate, option)
 
-    if wsd_data.ErrorCode == -40521010: # Internet Timeout 超时退出
+    if wsd_data.ErrorCode == -40521010:  # Internet Timeout 超时退出
         os._exit(0)
         return "a"
-   
-    fm=pd.DataFrame(wsd_data.Data,index=wsd_data.Fields,columns=wsd_data.Times)
-    json_data=fm.to_json()
+
+    fm = pd.DataFrame(wsd_data.Data, index=wsd_data.Fields, columns=wsd_data.Times)
+    json_data = fm.to_json()
     result = json.loads(json_data)
     return result
 
 
-
 if __name__ == "__main__":
     # wind 登录
     wStart = w.start()
@@ -83,4 +83,4 @@ if __name__ == "__main__":
         print("启动万得API接口失败")
     print(wStart)
     app = __hug__.http.server()
-    run(app=app, reloader=True,host='0.0.0.0', port=7000)
+    run(app=app, reloader=True, host='0.0.0.0', port=7000)