123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import hug
- from bottle import route, run,NORUN
- import iFinDPy as THS
- import json
- import time
- hug.API(__name__).http.output_format = hug.output_format.json
- username = "账号"
- password = "密码"
- @hug.get('/hz_server')
- def hello():
- return 1
- # return 'wind true'
- # 同花顺登录函数
- def thslogindemo():
- # 输入用户的帐号和密码
- thsLogin = THS.THS_iFinDLogin(username, password)
- print(thsLogin)
- if (thsLogin == 0 or thsLogin == -201):
- print('ths 登录成功')
- else:
- print('ths 登录失败')
- @hug.get('/edbInfo/ths')
- def GetEdbDataByThs(EdbCode, StartDate, EndDate):
- print("GetEdbDataByThs")
- thsLogin = THS.THS_iFinDLogin(username, password)
- thsEDBDataQuery = THS.THS_EDBQuery(EdbCode, StartDate, EndDate, True)
- print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":start GetEdbDataByThs")
- print("thsEDBDataQuery")
- print(thsEDBDataQuery)
- thsEDBDataQuery = thsEDBDataQuery.decode("unicode_escape")
- result = json.loads(thsEDBDataQuery)
- print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":end GetEdbDataByThs")
- return result
- @hug.get('/edbInfo/ths/future_good')
- def GetFutureGoodEdbDataByThs(EdbCode, StartDate, EndDate):
- print("THS_HistoryQuotes start")
- thsLogin = THS.THS_iFinDLogin(username, password)
- filed = 'lastclose,open,high,low,close,avgprice,change,changeper,volume,amount,hsl,lastsettlement,settlement,zdsettlement,zdfsettlement,ccl,ccbd,zf,zjlx,zjcd'
- # filed = 'open,high,low,close,volume,amount,settlement,ccl,zjlx'
- params = 'Interval:D,CPS:1,baseDate:1900-01-01,Currency:YSHB,fill:Previous'
- # params = 'YSHB;Tradedays'
- # print(params)
- thsEDBDataQuery = THS.THS_HistoryQuotes(EdbCode, filed, params, StartDate, EndDate, True)
- # thsEDBDataQuery = THS.THS_HQ(EdbCode, filed, params, StartDate, EndDate)
- # thsEDBDataQuery = THS.THS_EDBQuery(EdbCode, StartDate, EndDate, True)
- print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":start GetFutureGoodEdbDataByThs")
- print("THS_HistoryQuotes end")
- print(thsEDBDataQuery)
- thsEDBDataQuery = thsEDBDataQuery.decode("unicode_escape")
- result = json.loads(thsEDBDataQuery)
- print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":end GetFutureGoodEdbDataByThs")
- return result
- def ths_data_to_dict(ths_instance):
- if ths_instance is not None and ths_instance.data is not None:
- return {
- 'errorcode': ths_instance.errorcode,
- 'errmsg': ths_instance.errmsg,
- 'data': ths_instance.data.to_dict(orient='records')
- }
- else:
- # 处理 ths_instance 为 None 的情况
- return {
- 'errorcode': 'N/A',
- 'errmsg': 'N/A',
- 'data': []
- }
- def has_comma(input_string):
- return isinstance(input_string, str) and ',' in input_string
- def process_single_edb_code(StockCode, SingleEdbCode, StartDate, EndDate):
- thsEDBDataQuery = THS.THS_DS(StockCode, SingleEdbCode, '', '', StartDate, EndDate)
- print(thsEDBDataQuery)
- result = json.dumps(ths_data_to_dict(thsEDBDataQuery))
- return result
- @hug.get('/edbInfo/ths/ds')
- def GetEdbDataByThsDs(StockCode, EdbCode, StartDate, EndDate):
- print("THS_DataSequence start")
- print(EdbCode)
- result_list = []
- if isinstance(EdbCode, list):
- for SingleEdbCode in EdbCode:
- result_list.append(process_single_edb_code(StockCode, SingleEdbCode, StartDate, EndDate))
- print("THS_DS end")
- print(result_list)
- return result_list
- else:
- # 如果不包含逗号,直接处理
- result = process_single_edb_code(StockCode, EdbCode, StartDate, EndDate)
- print(result)
- return result
- if __name__ == "__main__":
- # ths登录函数
- thslogindemo()
- app = __hug__.http.server()
- run(app=app, reloader=True, port=7000)
|