"""HTTP bridge that exposes iFinD (TongHuaShun) data queries as hug endpoints."""
import json
import time

from bottle import route, run, NORUN
import hug
import iFinDPy as THS

hug.API(__name__).http.output_format = hug.output_format.json

username = ""
password = ""

# Login return codes this module treats as "login succeeded".
# NOTE(review): -201 presumably means "already logged in" -- confirm with the
# iFinD documentation.
_LOGIN_OK_CODES = (0, -201)

# Query error code that makes the endpoints log in again and retry once.
_RELOGIN_ERROR_CODE = -1010


@hug.get('/hz_server')
def hello():
    """Liveness probe: always returns 1."""
    return 1


def thslogin():
    """Log in to iFinD with the module-level credentials and print the outcome."""
    thsLogin = THS.THS_iFinDLogin(username, password)
    print(thsLogin)
    if thsLogin in _LOGIN_OK_CODES:
        print('ths 登录成功')
    else:
        print('ths 登录失败')


def _timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def _parse_response(raw):
    """Decode a raw iFinD response and parse it as JSON.

    NOTE(review): the "unicode_escape" codec is kept from the original code.
    It also rewrites escapes such as ``\\"``, which could corrupt payloads
    containing escaped quotes -- confirm the SDK's actual encoding before
    changing it.
    """
    return json.loads(raw.decode("unicode_escape"))


def _query_with_relogin(run_query):
    """Run a raw iFinD query, logging in again and retrying once if required.

    Args:
        run_query: zero-argument callable that performs the SDK call and
            returns the raw response (an object with ``.decode``).

    Returns:
        The parsed JSON response. If the first response carries error code
        -1010 and the re-login does not succeed, that first response is
        returned unchanged.
    """
    raw = run_query()
    print(raw)
    result = _parse_response(raw)
    if result["errorcode"] == _RELOGIN_ERROR_CODE:
        thsLogin = THS.THS_iFinDLogin(username, password)
        print("登录一次")
        print(thsLogin)
        if thsLogin in _LOGIN_OK_CODES:
            result = _parse_response(run_query())
    else:
        print("已登录")
    return result


@hug.get('/edbInfo/ths')
def GetEdbDataByThs(EdbCode, StartDate, EndDate):
    """Query ``THS_EDBQuery`` for ``EdbCode`` between the two dates.

    Returns the parsed JSON response, retrying once after a re-login when the
    first response reports error code -1010.
    """
    print("GetEdbDataByThs")
    # Log the start time before the remote call so start/end bracket it.
    print(_timestamp() + ":start GetEdbDataByThs")
    result = _query_with_relogin(
        lambda: THS.THS_EDBQuery(EdbCode, StartDate, EndDate, True)
    )
    print(_timestamp() + ":end GetEdbDataByThs")
    return result


@hug.get('/edbInfo/ths/future_good')
def GetFutureGoodEdbDataByThs(EdbCode, StartDate, EndDate):
    """Query ``THS_HistoryQuotes`` for ``EdbCode`` between the two dates.

    Uses a fixed indicator list and parameter string. Returns the parsed JSON
    response, retrying once after a re-login when the first response reports
    error code -1010.
    """
    print("THS_HistoryQuotes start")
    fields = 'lastclose,open,high,low,close,avgprice,change,changeper,volume,amount,hsl,lastsettlement,settlement,zdsettlement,zdfsettlement,ccl,ccbd,zf,zjlx,zjcd'
    params = 'Interval:D,CPS:1,baseDate:1900-01-01,Currency:YSHB,fill:Previous'
    # Log the start time before the remote call so start/end bracket it.
    print(_timestamp() + ":start GetFutureGoodEdbDataByThs")
    result = _query_with_relogin(
        lambda: THS.THS_HistoryQuotes(EdbCode, fields, params, StartDate, EndDate, True)
    )
    print("THS_HistoryQuotes end")
    print(_timestamp() + ":end GetFutureGoodEdbDataByThs")
    return result


def ths_data_to_dict(ths_instance):
    """Convert an iFinD result object into a plain dict.

    Args:
        ths_instance: object exposing ``errorcode``, ``errmsg`` and ``data``
            (``data`` must offer ``to_dict(orient='records')``), or None.

    Returns:
        A dict with keys ``errorcode``, ``errmsg`` and ``data``. When
        ``ths_instance`` is None the code and message are the placeholder
        ``'N/A'``. When only ``data`` is None the real code and message are
        kept, so the caller can see why no rows came back.
    """
    if ths_instance is None:
        return {
            'errorcode': 'N/A',
            'errmsg': 'N/A',
            'data': []
        }
    data = ths_instance.data
    return {
        'errorcode': ths_instance.errorcode,
        'errmsg': ths_instance.errmsg,
        'data': data.to_dict(orient='records') if data is not None else []
    }


def has_comma(input_string):
    """Return True when ``input_string`` is a ``str`` containing a comma."""
    return isinstance(input_string, str) and ',' in input_string


def process_single_edb_code(StockCode, SingleEdbCode, StartDate, EndDate):
    """Run ``THS_DS`` for one indicator and return the result as a JSON string."""
    thsEDBDataQuery = THS.THS_DS(StockCode, SingleEdbCode, '', '', StartDate, EndDate)
    print(thsEDBDataQuery)
    return json.dumps(ths_data_to_dict(thsEDBDataQuery))


@hug.get('/edbInfo/ths/ds')
def GetEdbDataByThsDs(StockCode, EdbCode, StartDate, EndDate):
    """Query ``THS_DS`` for one indicator code or a list of codes.

    Returns a list of JSON strings when ``EdbCode`` is a list, otherwise a
    single JSON string.
    """
    print("THS_DataSequence start")
    print(EdbCode)
    # NOTE(review): presumably hug delivers a comma-separated query value as a
    # list -- confirm against the hug/falcon query-string handling.
    if isinstance(EdbCode, list):
        result_list = [
            process_single_edb_code(StockCode, SingleEdbCode, StartDate, EndDate)
            for SingleEdbCode in EdbCode
        ]
        print("THS_DS end")
        print(result_list)
        return result_list

    # Single code: query it directly.
    result = process_single_edb_code(StockCode, EdbCode, StartDate, EndDate)
    print(result)
    return result


if __name__ == "__main__":
    # Log in to iFinD before serving requests.
    thslogin()
    # NOTE(review): `__hug__` is not defined in this file; presumably hug
    # attaches it to the module when the API is created -- confirm.
    app = __hug__.http.server()
    run(app=app, reloader=True, port=7000)