Py-imaplib读取163邮箱
import os
import email
import imaplib
import quopri
import datetime
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from optparse import OptionParser
import re
def login(self):
    """Open an SSL IMAP connection, authenticate and send the client ``ID`` command.

    Reads ``self.host``, ``self.account`` and ``self.password``.

    :return: the connected ``imaplib.IMAP4_SSL`` instance.
    """
    imap_server = imaplib.IMAP4_SSL(self.host)
    imap_server.login(self.account, self.password)
    # imaplib does not know the ID command; register it as valid in the
    # authenticated state so _simple_command accepts it.
    imaplib.Commands["ID"] = ('AUTH',)
    id_fields = ("name", self.account, "contact", self.account, "version", "1.0.0", "vendor", "myclient")
    # Build the parenthesised list of quoted strings explicitly. The previous
    # str(tuple) + replace() trick dropped commas and rewrote apostrophes that
    # were part of the account value itself.
    quoted_fields = ' '.join(
        '"{}"'.format(str(field).replace('\\', '\\\\').replace('"', '\\"'))
        for field in id_fields
    )
    imap_server._simple_command("ID", '(' + quoted_fields + ')')
    return imap_server
- 通过select选中邮箱,注意如果仅读取邮件内容,不改变邮件未读属性,设置readonly=True。该属性默认为False,读取内容后会将邮件设置为已读。
self.imap_server.select(mailbox='INBOX', readonly=True)
- 通过search获取邮件ID列表,返回的ID默认按邮件接收时间从旧到新排序。
self.imap_server.search(None, message_type)
- 用reversed将邮件ID倒序(最新的在前),逐个fetch读取;fetch返回的是邮件原始字节,需要用email模块解析后才能得到可读内容。
for message_index in reversed(items[0].split()):
msg_data = Message()
fetch_status, message = self.imap_server.fetch(message_index, "(RFC822)")
msg = email.message_from_bytes(message[0][1])
for part in msg.walk():
if not part.is_multipart():
content_type = part.get_content_type()
filename = part.get_filename()
if filename:
file_header = email.header.Header(filename)
decode_header = email.header.decode_header(file_header)
file_name = decode_header[0][0]
data = part.get_payload(decode=True)
try:
print('Attachment : ' + file_name)
if file_name:
save_file(file_name, data, save_path)
files.append(file_name)
except:
print(file_name)
else:
if content_type in ['text/plain']:
suffix = '.txt'
if content_type in ['text/html']:
suffix = '.htm'
if part.get_charsets() is None:
message_content = part.get_payload(decode=True)
else:
message_content = part.get_payload(decode=True).decode(part.get_charsets()[0])
import os
import email
import imaplib
import quopri
import datetime
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from optparse import OptionParser
import re
def save_file(file_name, data, save_path=''):
    """Write ``data`` to ``save_path/<base name of file_name>`` and return the path.

    Only the final path component of ``file_name`` is used, so a name taken
    from an e-mail attachment (untrusted input) such as ``../../x`` cannot
    escape ``save_path``.

    :param file_name: attachment name (str or bytes); directory parts are dropped.
    :param data: bytes to write.
    :param save_path: target directory; the empty default means the current directory.
    :return: the path of the written file.
    :raises ValueError: if ``file_name`` has no usable base name.
    :raises OSError: if the file cannot be opened or written.
    """
    # Treat both separators as directory separators regardless of platform.
    normalized = os.fsdecode(file_name).replace('\\', '/')
    safe_name = os.path.basename(normalized)
    if safe_name in ('', '.', '..'):
        raise ValueError('invalid attachment file name: {!r}'.format(file_name))
    file_path = os.path.join(save_path, safe_name)
    with open(file_path, 'wb') as fp:
        fp.write(data)
    return file_path
def get_time_date(time):
    """Parse an RFC 2822 ``Date`` header into a naive ``datetime``.

    The previous implementation only understood headers carrying a literal
    ``+0800`` offset and returned that wall-clock time with the offset
    removed. To stay compatible, any header that carries a UTC offset is
    converted to UTC+8 and returned without tzinfo, so ``+0800`` input
    yields exactly the same value as before while other offsets no longer
    raise. A header without offset information is returned as parsed.

    :param time: the raw ``Date`` header string (the name shadows the
        ``time`` module inside this function only).
    :return: naive ``datetime.datetime``.
    :raises ValueError: if the header cannot be parsed.
    """
    # Local import: the file only does ``import email``, which does not
    # guarantee that the ``email.utils`` submodule is loaded.
    from email.utils import parsedate_to_datetime

    try:
        parsed = parsedate_to_datetime(time)
    except (TypeError, ValueError) as err:
        # Older Python versions raise TypeError for unparseable input.
        raise ValueError('unparseable Date header: {!r}'.format(time)) from err
    if parsed.tzinfo is not None:
        utc_plus_8 = datetime.timezone(datetime.timedelta(hours=8))
        parsed = parsed.astimezone(utc_plus_8).replace(tzinfo=None)
    return parsed
def get_body(msg):
    """Return the decoded payload of the first leaf part of ``msg``.

    Multipart containers are descended through their first sub-part until a
    non-multipart part is reached.

    :param msg: an ``email.message.Message``.
    :return: the decoded payload as returned by ``get_payload(decode=True)``;
        ``None`` when a multipart container has no usable first sub-part.
    """
    try:
        if msg.is_multipart():
            return get_body(msg.get_payload(0))
        return msg.get_payload(decode=True)
    except (IndexError, TypeError):
        # An empty or malformed container has no first sub-part; fall back to
        # the container's own decoded payload. The handler is narrowed from a
        # bare ``except`` so KeyboardInterrupt/SystemExit are not swallowed.
        return msg.get_payload(decode=True)
def decode_mime(text):
    """Decode quoted-printable ``text`` and return the result as a UTF-8 string."""
    return str(quopri.decodestring(text), "u8")
def filter_tags(htmlstr):
    """Strip HTML markup from ``htmlstr`` and return the remaining text.

    In order: CDATA sections, ``<script>`` and ``<style>`` elements are
    removed, ``<br>`` becomes a newline, every other tag and HTML comments
    are removed, runs of newlines are collapsed to one, and character
    entities are resolved through ``replaceCharEntity``.

    :param htmlstr: HTML text (str).
    :return: the text with markup removed.
    """
    # Raw strings: the former plain literals relied on invalid escape
    # sequences (``\[``, ``\s``, ``\w``), which newer Python versions warn about.
    # script/style bodies are matched lazily across lines so that content
    # containing '<' (e.g. ``a < b`` in JavaScript) is removed as a whole.
    substitutions = (
        (re.compile(r'//<!\[CDATA\[[^>]*//\]\]>', re.I), ''),
        (re.compile(r'<\s*script[^>]*>.*?<\s*/\s*script\s*>', re.I | re.S), ''),
        (re.compile(r'<\s*style[^>]*>.*?<\s*/\s*style\s*>', re.I | re.S), ''),
        (re.compile(r'<br\s*?/?>'), '\n'),
        (re.compile(r'</?\w+[^>]*>'), ''),
        (re.compile(r'<!--[^>]*-->'), ''),
        (re.compile(r'\n+'), '\n'),
    )
    s = htmlstr
    for pattern, replacement in substitutions:
        s = pattern.sub(replacement, s)
    return replaceCharEntity(s)
def replaceCharEntity(html_str):
    """Replace known HTML character entities in ``html_str``; drop unknown ones.

    Recognised names/codes are nbsp/160, lt/60, gt/62, amp/38 and quot/34.
    Any other ``&name;`` or ``&#code;`` sequence is removed.

    The text is processed in a single pass. The earlier loop rescanned the
    string from the start after every substitution, so the output of one
    replacement could be decoded again (``&amp;lt;`` ended up as ``<``) and
    the work grew quadratically with the number of entities.

    :param html_str: text that may contain character entities.
    :return: the text with entities replaced or removed.
    """
    char_entities = {'nbsp': ' ', '160': ' ',
                     'lt': '<', '60': '<',
                     'gt': '>', '62': '>',
                     'amp': '&', '38': '&',
                     'quot': '"', '34': '"', }
    re_char_entity = re.compile(r'&#?(?P<name>\w+);')
    return re_char_entity.sub(
        lambda match: char_entities.get(match.group('name'), ''),
        html_str,
    )
def get_string(text):
    """Return ``text`` converted to str and prefixed with the alert label."""
    label = "邮箱报警 - "
    body = str(text)
    return label + body
def notification(datas):
    """Post ``datas`` as JSON to the DingTalk robot webhook and return the response text.

    The request is signed: the millisecond timestamp and the secret are
    joined with a newline, HMAC-SHA256 signed with the secret, base64
    encoded and URL-quoted; timestamp and signature are sent as query
    parameters.

    :param datas: JSON-serialisable message payload.
    :return: the body of the HTTP response as text.
    """
    headers = {'Content-Type': 'application/json', "Charset": "UTF-8"}
    prefix = 'https://oapi.dingtalk.com/robot/send?access_token=xxx'
    secret = 'xxxxx'
    timestamp = str(round(time.time() * 1000))
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    hmac_code = hmac.new(
        secret.encode('utf-8'),
        string_to_sign.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    # The query parameter was garbled to "×tamp" (an HTML-entity mangling of
    # "&timestamp"), so the timestamp that was signed never reached the server.
    url = f'{prefix}&timestamp={timestamp}&sign={sign}&keyword=邮箱报警'
    # A timeout keeps a stalled connection from blocking the caller forever.
    return requests.post(url=url, data=json.dumps(datas), headers=headers, timeout=10).text
def check_content(content):
    """Send a DingTalk alert when the mail body contains the module-level ``keyword``.

    :param content: mail body; ``None``, empty values and bytes are accepted.
        Empty or missing content is ignored, bytes are decoded as UTF-8
        (undecodable bytes replaced) before the check.
    :return: None
    """
    # A mail without a text part yields no content; ``keyword in None``
    # would raise TypeError.
    if not content:
        return
    if isinstance(content, bytes):
        content = content.decode('utf-8', errors='replace')
    if keyword in content:
        # Local renamed from ``dict`` so the builtin is not shadowed.
        payload = {
            "msgtype": "markdown",
            "markdown": {"title": "邮箱报警",
                         "text": get_string(content)
                         },
            "at": {
                "isAtAll": False
            }
        }
        notification(payload)
class Message(dict):
    """Storage format for the content of one e-mail (a plain ``dict`` subclass)."""
class Email(object):
    """IMAP mailbox reader: fetches INBOX messages, saves attachments, triggers alerts."""

    # Search criteria accepted by ``check_email``.
    All, Unseen, Seen, Recent, Answered, Flagged = "All,Unseen,Seen,Recent,Answered,Flagged".split(',')

    def __init__(self, imap, account, password, file_save_path='', count=1):
        """Store the settings and connect when host, account and password are all given.

        :param imap: IMAP server host name.
        :param account: login name.
        :param password: login password (or authorization code).
        :param file_save_path: directory attachments are written to.
        :param count: stored as ``self.count`` and passed on to ``check_email``.
        """
        self.host = imap
        self.account = account
        self.password = password
        self.save_path = file_save_path
        self.count = count
        # Without complete credentials no connection is attempted; the
        # attribute still exists so later access does not raise AttributeError.
        self.imap_server = self.login() if (imap and account and password) else None

    @staticmethod
    def _id_arguments(fields):
        """Format ``fields`` as a parenthesised list of IMAP quoted strings."""
        quoted = (
            '"{}"'.format(str(field).replace('\\', '\\\\').replace('"', '\\"'))
            for field in fields
        )
        return '(' + ' '.join(quoted) + ')'

    def login(self):
        """Connect over SSL, authenticate and send the client ``ID`` command.

        :return: the connected ``imaplib.IMAP4_SSL`` instance.
        """
        imap_server = imaplib.IMAP4_SSL(self.host)
        imap_server.login(self.account, self.password)
        # imaplib does not know the ID command; register it for the
        # authenticated state so _simple_command accepts it.
        imaplib.Commands["ID"] = ('AUTH',)
        args = ("name", self.account, "contact", self.account, "version", "1.0.0", "vendor", "myclient")
        # Built explicitly: the former str(tuple)+replace() trick removed
        # commas and rewrote apostrophes that belonged to the account value.
        imap_server._simple_command("ID", self._id_arguments(args))
        return imap_server

    def get_newest(self):
        """Collect unread mails received within the last two minutes.

        Iterates the unread messages newest first, runs ``check_content`` on
        each body and stops at the first message older than two minutes.

        :return: list of dicts with the subject, date, attachment list and body.
        """
        msg_data_list = []
        for msg_data in self.check_email(last_message=True, message_type=self.Unseen, count=self.count):
            received = msg_data.get('date')
            oldest_allowed = datetime.datetime.now() - datetime.timedelta(minutes=2)
            # Compare datetimes directly rather than their string forms.
            if received < oldest_allowed:
                print('超出读取范围')
                return msg_data_list
            check_content(msg_data.get('content'))
            msg_data_list.append(
                {
                    '邮件主题': msg_data.get('subject'),
                    '邮件日期': str(received),
                    '附件列表': msg_data.get('files'),
                    '邮件正文': msg_data.get('content')
                }
            )
        return msg_data_list

    def check_email(self, last_message=True, message_type="All", count=1):
        """Yield parsed INBOX messages matching ``message_type``, newest first.

        The mailbox is selected read-only, so fetching does not mark
        messages as read.

        :param last_message: accepted for compatibility.
            NOTE(review): not applied by this method - confirm intended use.
        :param message_type: IMAP search criterion, one of
            "All,Unseen,Seen,Recent,Answered,Flagged".
        :param count: accepted for compatibility.
            NOTE(review): not applied by this method - confirm intended use.
        :return: generator of ``Message`` dicts with the keys ``date``,
            ``subject``, ``content``, ``type`` and ``files``.
        """
        select_status, info = self.imap_server.select(mailbox='INBOX', readonly=True)
        if select_status != 'OK':
            print(info)
            # ``raise StopIteration`` inside a generator is turned into
            # RuntimeError (PEP 479); a plain return ends the generator.
            return
        search_status, items = self.imap_server.search(None, message_type)
        # Was checking select_status a second time, so a failed search went unnoticed.
        if search_status != 'OK':
            print(items)
            return
        message_ids = items[0].split() if items and items[0] else []
        print("Read messages within the last 30 days,total {0} {1}type message".format(len(message_ids),
                                                                                      message_type))
        for message_index in reversed(message_ids):
            fetch_status, message = self.imap_server.fetch(message_index, "(RFC822)")
            if fetch_status != 'OK' or not message or not isinstance(message[0], tuple):
                # Nothing parseable came back for this id; skip it.
                print(message)
                continue
            msg = email.message_from_bytes(message[0][1])
            msg_data = Message()
            msg_data['date'] = get_time_date(msg['Date'])
            msg_data['subject'] = self._decode_header_text(msg["Subject"])
            msg_data.update(self.parse_message(msg, save_path=self.save_path))
            yield msg_data

    @staticmethod
    def str_to_unicode(s, encoding=None):
        """Return ``s`` as text.

        Bytes are decoded with ``encoding`` (UTF-8 when missing or unknown,
        undecodable bytes replaced); anything else goes through ``str()``.
        Previously bytes without an encoding came back as ``"b'...'"`` and a
        str combined with an encoding raised TypeError.
        """
        if isinstance(s, bytes):
            try:
                return s.decode(encoding or 'utf-8', errors='replace')
            except LookupError:
                # Charset label not known to Python (e.g. 'unknown-8bit').
                return s.decode('utf-8', errors='replace')
        return str(s)

    @staticmethod
    def _decode_header_text(raw):
        """Decode an RFC 2047 header value into text, joining all fragments."""
        import email.header  # plain ``import email`` does not guarantee the submodule is loaded

        if raw is None:
            return ''
        return ''.join(
            Email.str_to_unicode(chunk, charset)
            for chunk, charset in email.header.decode_header(raw)
        )

    @staticmethod
    def parse_message(msg, save_path=''):
        """Extract the text body of ``msg`` and save its attachments.

        :param msg: an ``email.message.Message``.
        :param save_path: directory attachments are written to via ``save_file``.
        :return: dict with ``content`` (body text with tags removed, or None),
            ``type`` ('.txt', '.htm' or None) and ``files`` (saved attachment names).
        """
        message_content, suffix = None, None
        files = []
        for part in msg.walk():
            if part.is_multipart():
                continue
            filename = part.get_filename()
            if filename:
                # Decode the name to text first: the old code ended up with
                # bytes, so the string concatenation below raised and the
                # bare ``except`` hid that the attachment was never saved.
                file_name = Email._decode_header_text(filename)
                data = part.get_payload(decode=True)
                print('Attachment : ' + file_name)
                if file_name and data is not None:
                    try:
                        save_file(file_name, data, save_path)
                    except (OSError, ValueError) as err:
                        # Best effort: report the failure and keep parsing.
                        print(file_name, err)
                    else:
                        files.append(file_name)
                continue
            if part.get_content_maintype() != 'text':
                # Non-text parts without a file name are not body text.
                continue
            content_type = part.get_content_type()
            if content_type == 'text/plain':
                suffix = '.txt'
            elif content_type == 'text/html':
                suffix = '.htm'
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            # get_charsets() never returns None (it returns [None] when no
            # charset is declared), so the old fallback branch was dead and
            # ``.decode(None)`` raised for parts without a charset.
            message_content = Email.str_to_unicode(payload, part.get_content_charset())
        if message_content is not None:
            try:
                message_content = filter_tags(message_content)
                # Kept as in the original: removes CRLF and every space character.
                message_content = message_content.replace('\r\n', '').replace(' ', '')
            except Exception:
                # Best effort: keep the unfiltered text when tag stripping fails.
                print('处理html标签失败')
        msg_data = {
            'content': message_content,
            'type': suffix,
            'files': files
        }
        return msg_data