Browse Source

Merge branch 'master' of http://8.136.199.33:3000/eta_server/eta_python

hsun 1 year ago
parent
commit
30ab106417
3 changed files with 214 additions and 0 deletions
  1. 86 0
      mail.py
  2. 87 0
      mail_xiangyu.py
  3. 41 0
      ths_api.py

+ 86 - 0
mail.py

@@ -0,0 +1,86 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+import poplib
+import datetime
+import time
+from email.parser import Parser
+from email.header import decode_header
+
+# 输入邮件地址, 口令和POP3服务器地址:
+email = ''
+password = '' # 注意使用开通POP,SMTP等的授权码
+pop3_server = 'pop.qiye.aliyun.com'
+
+
+def decode_str(s):  # 字符编码转换
+    value, charset = decode_header(s)[0]
+    if charset:
+        value = value.decode(charset)
+    return value
+
+
+def get_att(msg):
+    import email
+    attachment_files = []
+
+    for part in msg.walk():
+        file_name = part.get_filename()  # 获取附件名称类型
+        contType = part.get_content_type()
+        print(file_name)
+        if file_name:
+            h = email.header.Header(file_name)
+            dh = email.header.decode_header(h)  # 对附件名称进行解码
+            filename = dh[0][0]
+            if dh[0][1]:
+                filename = decode_str(str(filename, dh[0][1]))  # 将附件名称可读化
+                print(filename)
+                # filename = filename.encode("utf-8")
+            data = part.get_payload(decode=True)  # 下载附件
+            att_file = open('/home/code/python/coal_mail/emailFile/' + filename, 'wb')  # 在指定目录下创建文件,注意二进制文件需要用wb模式打开
+            attachment_files.append(filename)
+            att_file.write(data)  # 保存附件
+            att_file.close()
+    return attachment_files
+
+
+# 连接到POP3服务器,有些邮箱服务器需要ssl加密,对于不需要加密的服务器可以使用poplib.POP3()
+server = poplib.POP3_SSL(pop3_server)
+server.set_debuglevel(1)
+# 打印POP3服务器的欢迎文字:
+print(server.getwelcome().decode('utf-8'))
+# 身份认证:
+server.user(email)
+server.pass_(password)
+# 返回邮件数量和占用空间:
+print('Messages: %s. Size: %s' % server.stat())
+# list()返回所有邮件的编号:
+resp, mails, octets = server.list()
+# 可以查看返回的列表类似[b'1 82923', b'2 2184', ...]
+print(mails)
+index = len(mails)
+print(index)
+for i in range(index, 0, -1):
+    # 倒序遍历邮件
+    resp, lines, octets = server.retr(i)
+    # lines存储了邮件的原始文本的每一行,
+    # 邮件的原始文本:
+    msg_content = b'\r\n'.join(lines).decode('utf-8')
+    # 解析邮件:
+    msg = Parser().parsestr(msg_content)
+    # 获取邮件时间
+    date1 = time.strptime(msg.get("Date")[0:24], '%a, %d %b %Y %H:%M:%S')  # 格式化收件时间
+    date2 = time.strftime("%Y%m%d", date1)  # 邮件时间格式转换
+    now_time = datetime.datetime.now()
+    # // 自定义下载附件时间范围
+    getFileDay = (now_time + datetime.timedelta(days=-1)).strftime("%Y%m%d")
+    # 邮件最早的时间
+    print(date1)
+    # 邮件最近的时间
+    print(date2)
+    if date2 < getFileDay:
+        continue
+    f_list = get_att(msg)  # 获取附件
+    # print_info(msg)
+
+server.quit()
+

+ 87 - 0
mail_xiangyu.py

@@ -0,0 +1,87 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+import poplib
+import datetime
+import time
+from email.parser import Parser
+from email.header import decode_header
+
+# 输入邮件地址, 口令和POP3服务器地址:
+email = ''
+password = '' # 注意使用开通POP,SMTP等的授权码
+pop3_server = 'pop3.xiangyu.cn'
+
+
+def decode_str(s):  # 字符编码转换
+    value, charset = decode_header(s)[0]
+    if charset:
+        value = value.decode(charset)
+    return value
+
+
+def get_att(msg):
+    import email
+    attachment_files = []
+
+    for part in msg.walk():
+        file_name = part.get_filename()  # 获取附件名称类型
+        contType = part.get_content_type()
+        print(file_name)
+        if file_name:
+            h = email.header.Header(file_name)
+            dh = email.header.decode_header(h)  # 对附件名称进行解码
+            filename = dh[0][0]
+            if dh[0][1]:
+                filename = decode_str(str(filename, dh[0][1]))  # 将附件名称可读化
+                print(filename)
+                # filename = filename.encode("utf-8")
+            data = part.get_payload(decode=True)  # 下载附件
+#             att_file = open('/home/code/python/coal_mail/emailFile/' + filename, 'wb')  # 在指定目录下创建文件,注意二进制文件需要用wb模式打开
+            att_file = open('/Users/xi/Desktop/coal_mail/emailFile/' + filename, 'wb')  # 在指定目录下创建文件,注意二进制文件需要用wb模式打开
+            attachment_files.append(filename)
+            att_file.write(data)  # 保存附件
+            att_file.close()
+    return attachment_files
+
+
+# 连接到POP3服务器,有些邮箱服务器需要ssl加密,对于不需要加密的服务器可以使用poplib.POP3()
+server = poplib.POP3_SSL(pop3_server,'995')
+server.set_debuglevel(1)
+# 打印POP3服务器的欢迎文字:
+print(server.getwelcome().decode('utf-8'))
+# 身份认证:
+server.user(email)
+server.pass_(password)
+# 返回邮件数量和占用空间:
+print('Messages: %s. Size: %s' % server.stat())
+# list()返回所有邮件的编号:
+resp, mails, octets = server.list()
+# 可以查看返回的列表类似[b'1 82923', b'2 2184', ...]
+print(mails)
+index = len(mails)
+print(index)
+for i in range(index, 0, -1):
+    # 倒序遍历邮件
+    resp, lines, octets = server.retr(i)
+    # lines存储了邮件的原始文本的每一行,
+    # 邮件的原始文本:
+    msg_content = b'\r\n'.join(lines).decode('utf-8')
+    # 解析邮件:
+    msg = Parser().parsestr(msg_content)
+    # 获取邮件时间
+    date1 = time.strptime(msg.get("Date")[0:24], '%a, %d %b %Y %H:%M:%S')  # 格式化收件时间
+    date2 = time.strftime("%Y%m%d", date1)  # 邮件时间格式转换
+    now_time = datetime.datetime.now()
+    # // 自定义下载附件时间范围
+    getFileDay = (now_time + datetime.timedelta(days=-1)).strftime("%Y%m%d")
+    # 邮件最早的时间
+    print(date1)
+    # 邮件最近的时间
+    print(date2)
+    if date2 < getFileDay:
+        continue
+    f_list = get_att(msg)  # 获取附件
+    # print_info(msg)
+
+server.quit()
+

+ 41 - 0
ths_api.py

@@ -61,6 +61,47 @@ def GetFutureGoodEdbDataByThs(EdbCode, StartDate, EndDate):
     print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":end GetFutureGoodEdbDataByThs")
     return result
 
+def ths_data_to_dict(ths_instance):
+    if ths_instance is not None and ths_instance.data is not None:
+        return {
+            'errorcode': ths_instance.errorcode,
+            'errmsg': ths_instance.errmsg,
+            'data': ths_instance.data.to_dict(orient='records')
+        }
+    else:
+        # 处理 ths_instance 为 None 的情况
+        return {
+            'errorcode': 'N/A',
+            'errmsg': 'N/A',
+            'data': []
+        }
+
+def has_comma(input_string):
+    return isinstance(input_string, str) and ',' in input_string
+
+def process_single_edb_code(StockCode, SingleEdbCode, StartDate, EndDate):
+    thsEDBDataQuery = THS.THS_DS(StockCode, SingleEdbCode, '', '', StartDate, EndDate)
+    print(thsEDBDataQuery)
+    result = json.dumps(ths_data_to_dict(thsEDBDataQuery))
+    return result
+
+@hug.get('/edbInfo/ths/ds')
+def GetEdbDataByThsDs(StockCode, EdbCode, StartDate, EndDate):
+    print("THS_DataSequence start")
+    print(EdbCode)
+    result_list = []
+
+    if isinstance(EdbCode, list):
+        for SingleEdbCode in EdbCode:
+            result_list.append(process_single_edb_code(StockCode, SingleEdbCode, StartDate, EndDate))
+            print("THS_DS end")
+        print(result_list)
+        return result_list
+    else:
+        # 如果不包含逗号,直接处理
+        result = process_single_edb_code(StockCode, EdbCode, StartDate, EndDate)
+        print(result)
+        return result
 
 if __name__ == "__main__":
     # ths登录函数