ths_api.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import hug
  2. from bottle import route, run,NORUN
  3. import iFinDPy as THS
  4. import json
  5. import time
  6. hug.API(__name__).http.output_format = hug.output_format.json
  7. username = ""
  8. password = ""
  9. @hug.get('/hz_server')
  10. def hello():
  11. return 1
  12. # return 'wind true'
  13. # 同花顺登录函数
  14. def thslogin():
  15. # 输入用户的帐号和密码
  16. thsLogin = THS.THS_iFinDLogin(username, password)
  17. print(thsLogin)
  18. if (thsLogin == 0 or thsLogin == -201):
  19. print('ths 登录成功')
  20. else:
  21. print('ths 登录失败')
  22. @hug.get('/edbInfo/ths')
  23. def GetEdbDataByThs(EdbCode, StartDate, EndDate):
  24. print("GetEdbDataByThs")
  25. thsEDBDataQuery = THS.THS_EDBQuery(EdbCode, StartDate, EndDate, True)
  26. print(thsEDBDataQuery)
  27. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":start GetEdbDataByThs")
  28. print("thsEDBDataQuery")
  29. print(thsEDBDataQuery)
  30. thsEDBDataQuery = thsEDBDataQuery.decode("unicode_escape")
  31. result = json.loads(thsEDBDataQuery)
  32. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":end GetEdbDataByThs")
  33. if result["errorcode"]==-1010:
  34. thsLogin = THS.THS_iFinDLogin(username, password)
  35. print("登录一次")
  36. print(thsLogin)
  37. if (thsLogin == 0 or thsLogin == -201):
  38. thsEDBDataQuery = THS.THS_EDBQuery(EdbCode, StartDate, EndDate, True)
  39. thsEDBDataQuery = thsEDBDataQuery.decode("unicode_escape")
  40. result = json.loads(thsEDBDataQuery)
  41. else:
  42. print("已登录")
  43. return result
  44. @hug.get('/edbInfo/ths/future_good')
  45. def GetFutureGoodEdbDataByThs(EdbCode, StartDate, EndDate):
  46. print("THS_HistoryQuotes start")
  47. filed = 'lastclose,open,high,low,close,avgprice,change,changeper,volume,amount,hsl,lastsettlement,settlement,zdsettlement,zdfsettlement,ccl,ccbd,zf,zjlx,zjcd'
  48. # filed = 'open,high,low,close,volume,amount,settlement,ccl,zjlx'
  49. params = 'Interval:D,CPS:1,baseDate:1900-01-01,Currency:YSHB,fill:Previous'
  50. # params = 'YSHB;Tradedays'
  51. # print(params)
  52. thsEDBDataQuery = THS.THS_HistoryQuotes(EdbCode, filed, params, StartDate, EndDate, True)
  53. # thsEDBDataQuery = THS.THS_HQ(EdbCode, filed, params, StartDate, EndDate)
  54. # thsEDBDataQuery = THS.THS_EDBQuery(EdbCode, StartDate, EndDate, True)
  55. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":start GetFutureGoodEdbDataByThs")
  56. print("THS_HistoryQuotes end")
  57. print(thsEDBDataQuery)
  58. thsEDBDataQuery = thsEDBDataQuery.decode("unicode_escape")
  59. result = json.loads(thsEDBDataQuery)
  60. if result["errorcode"]==-1010:
  61. thsLogin = THS.THS_iFinDLogin(username, password)
  62. print("登录一次")
  63. print(thsLogin)
  64. if (thsLogin == 0 or thsLogin == -201):
  65. thsEDBDataQuery = THS.THS_HistoryQuotes(EdbCode, filed, params, StartDate, EndDate, True)
  66. thsEDBDataQuery = thsEDBDataQuery.decode("unicode_escape")
  67. result = json.loads(thsEDBDataQuery)
  68. else:
  69. print("已登录")
  70. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":end GetFutureGoodEdbDataByThs")
  71. return result
  72. def ths_data_to_dict(ths_instance):
  73. if ths_instance is not None and ths_instance.data is not None:
  74. return {
  75. 'errorcode': ths_instance.errorcode,
  76. 'errmsg': ths_instance.errmsg,
  77. 'data': ths_instance.data.to_dict(orient='records')
  78. }
  79. else:
  80. # 处理 ths_instance 为 None 的情况
  81. return {
  82. 'errorcode': 'N/A',
  83. 'errmsg': 'N/A',
  84. 'data': []
  85. }
  86. def has_comma(input_string):
  87. return isinstance(input_string, str) and ',' in input_string
  88. def process_single_edb_code(StockCode, SingleEdbCode, StartDate, EndDate):
  89. thsEDBDataQuery = THS.THS_DS(StockCode, SingleEdbCode, '', '', StartDate, EndDate)
  90. print(thsEDBDataQuery)
  91. result = json.dumps(ths_data_to_dict(thsEDBDataQuery))
  92. return result
  93. @hug.get('/edbInfo/ths/ds')
  94. def GetEdbDataByThsDs(StockCode, EdbCode, StartDate, EndDate):
  95. print("THS_DataSequence start")
  96. print(EdbCode)
  97. result_list = []
  98. if isinstance(EdbCode, list):
  99. for SingleEdbCode in EdbCode:
  100. result_list.append(process_single_edb_code(StockCode, SingleEdbCode, StartDate, EndDate))
  101. print("THS_DS end")
  102. print(result_list)
  103. return result_list
  104. else:
  105. # 如果不包含逗号,直接处理
  106. result = process_single_edb_code(StockCode, EdbCode, StartDate, EndDate)
  107. print(result)
  108. return result
  109. if __name__ == "__main__":
  110. # ths登录函数
  111. thslogin()
  112. app = __hug__.http.server()
  113. run(app=app, reloader=True, port=7000)