Py-imaplib读取163邮箱

Py-imaplib读取163邮箱

  • 引入包
# -*- coding: utf-8 -*-
import os
import email
import imaplib
import quopri
import datetime
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from optparse import OptionParser
import re
  • 连接邮箱
    def login(self):
        """Open an IMAP-over-SSL connection, authenticate, and send an IMAP ``ID`` command.

        Reads ``self.host``, ``self.account`` and ``self.password``.

        :return: the ``imaplib.IMAP4_SSL`` connection after ``login`` and the ``ID`` command.
        """
        imap_server = imaplib.IMAP4_SSL(self.host)
        imap_server.login(self.account, self.password)
        # Works around the NetEase mailbox error: "Unsafe Login. Please contact xxx@163.com for help"
        # Register "ID" in imaplib's command table with 'AUTH' as its permitted state.
        imaplib.Commands["ID"] = ('AUTH',)
        args = ("name", self.account, "contact", self.account, "version", "1.0.0", "vendor", "myclient")
        # str(args) with commas removed and single quotes swapped for double quotes
        # produces a parenthesised, space-separated list of double-quoted strings.
        imap_server._simple_command("ID", str(args).replace(",", "").replace("\'", "\""))
        return imap_server
  • 通过select选中邮箱,注意如果仅读取邮件内容,不改变邮件未读属性,设置readonly=True。该属性默认为False,读取内容后会将邮件设置为已读。
self.imap_server.select(mailbox='INBOX', readonly=True)
  • 通过search获取邮件ID列表,默认邮件接收时间排序是由远到近。
self.imap_server.search(None, message_type)
  • 通过reversed倒排邮件ID循环fetch读取,获取的邮件内容需要处理,通过email获取邮件可读内容。
 for message_index in reversed(items[0].split()):
            msg_data = Message()
            fetch_status, message = self.imap_server.fetch(message_index, "(RFC822)")
            msg = email.message_from_bytes(message[0][1])
  # walk遍历邮件的各个部分,获取附件和正文
  for part in msg.walk():
            if not part.is_multipart():
                content_type = part.get_content_type()
                filename = part.get_filename()
                # 是否有附件
                if filename:
                    file_header = email.header.Header(filename)
                    decode_header = email.header.decode_header(file_header)
                    file_name = decode_header[0][0]
                    data = part.get_payload(decode=True)
                    try:
                        print('Attachment : ' + file_name)
                        # 保存附件
                        if file_name:
                            save_file(file_name, data, save_path)
                            files.append(file_name)
                    except:
                        print(file_name)

                else:
                    if content_type in ['text/plain']:
                        suffix = '.txt'
                    if content_type in ['text/html']:
                        suffix = '.htm'
                    if part.get_charsets() is None:
                        message_content = part.get_payload(decode=True)
                    else:
                        message_content = part.get_payload(decode=True).decode(part.get_charsets()[0])
  • 完整代码
# -*- coding: utf-8 -*-
import os
import email
import imaplib
import quopri
import datetime
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from optparse import OptionParser
import re


def save_file(file_name, data, save_path=''):
    """Write ``data`` to a file named ``file_name`` inside ``save_path``.

    ``file_name`` usually comes from an e-mail attachment header, i.e. it is
    untrusted. Only its final path component is used, so names such as
    ``../../etc/passwd`` cannot escape ``save_path``.

    :param file_name: attachment name (any directory part is discarded).
    :param data: bytes to write.
    :param save_path: target directory; created if missing. An empty string
        means the current working directory.
    :return: path of the file that was written.
    :raises ValueError: if ``file_name`` has no usable final component.
    :raises OSError: if the directory or file cannot be created/written.
    """
    # Normalise Windows separators first so basename() strips those prefixes too.
    safe_name = os.path.basename(file_name.replace('\\', '/'))
    if safe_name in ('', '.', '..'):
        raise ValueError(f'invalid attachment file name: {file_name!r}')
    if save_path:
        os.makedirs(save_path, exist_ok=True)
    file_path = os.path.join(save_path, safe_name)
    with open(file_path, 'wb') as fp:
        fp.write(data)
    return file_path


def get_time_date(time):
    """Parse an RFC 2822 ``Date`` header value into a naive ``datetime``.

    The previous implementation only understood dates carrying a ``+0800``
    offset and returned their wall-clock time. That result is preserved:
    dates with any UTC offset are converted to UTC+08:00 and returned without
    tzinfo. Dates that carry no offset information are returned as parsed.

    :param time: the raw ``Date`` header string,
        e.g. ``'Mon, 01 Jan 2024 10:00:00 +0800'``.
    :return: naive ``datetime.datetime`` expressed in UTC+08:00.
    :raises ValueError: if the value cannot be parsed as a date.
    """
    # Local import: the file only imports the top-level ``email`` package.
    from email.utils import parsedate_to_datetime

    try:
        parsed = parsedate_to_datetime(time)
    except (TypeError, ValueError, IndexError) as exc:
        # Older Python versions raise TypeError for unparsable input.
        raise ValueError(f'unparsable Date header: {time!r}') from exc

    if parsed.tzinfo is None:
        return parsed
    china_standard_time = datetime.timezone(datetime.timedelta(hours=8))
    return parsed.astimezone(china_standard_time).replace(tzinfo=None)


def get_body(msg):
    """Return the decoded payload of the first leaf part of ``msg``.

    Descends through the first sub-part of every multipart container until a
    non-multipart part is reached, then returns
    ``part.get_payload(decode=True)``.

    If a multipart container has no sub-parts, the descent stops there and
    the container's own ``get_payload(decode=True)`` is returned (``None``
    for multipart messages).

    :param msg: an ``email.message.Message``.
    :return: the payload bytes, or ``None``.
    """
    part = msg
    while part.is_multipart():
        try:
            part = part.get_payload(0)
        except (IndexError, TypeError):
            # Empty/odd container: fall back to this part, as before,
            # but without a bare ``except`` that would hide Ctrl-C.
            break
    return part.get_payload(decode=True)


# Decode MIME quoted-printable text.
def decode_mime(text):
    """Decode quoted-printable ``text`` and return the result as a UTF-8 string."""
    raw_bytes = quopri.decodestring(text)
    return str(raw_bytes, "u8")


def filter_tags(htmlstr):
    """Strip HTML markup from ``htmlstr`` and return the remaining text.

    Steps, in order: remove ``//<![CDATA[ ... //]]>`` blocks, ``<script>``
    and ``<style>`` elements (including their content), and HTML comments;
    turn ``<br>`` into newlines; remove all remaining tags; collapse runs of
    newlines; finally replace character entities via ``replaceCharEntity``.

    :param htmlstr: HTML text (``str``).
    :return: the text with markup removed.
    """
    # Raw strings: sequences such as ``\s`` or ``\[`` are invalid escapes in
    # normal string literals and trigger warnings on current Python versions.
    re_cdata = re.compile(r'//<!\[CDATA\[[^>]*//\]\]>', re.I)  # CDATA
    # Non-greedy + DOTALL so bodies containing '<' (e.g. ``a < b``) or
    # newlines are removed as a whole instead of leaking into the text.
    re_script = re.compile(r'<\s*script[^>]*>.*?<\s*/\s*script\s*>', re.I | re.S)
    re_style = re.compile(r'<\s*style[^>]*>.*?<\s*/\s*style\s*>', re.I | re.S)
    re_comment = re.compile(r'<!--.*?-->', re.S)  # HTML comments
    re_br = re.compile(r'<br\s*/?>', re.I)  # line breaks
    re_tag = re.compile(r'</?\w+[^>]*>')  # any other HTML tag
    blank_line = re.compile(r'\n+')

    s = re_cdata.sub('', htmlstr)
    s = re_script.sub('', s)
    s = re_style.sub('', s)
    # Comments go before generic tag stripping; otherwise tags inside a
    # comment are removed first and the comment delimiters may survive.
    s = re_comment.sub('', s)
    s = re_br.sub('\n', s)
    s = re_tag.sub('', s)
    # Collapse consecutive blank lines.
    s = blank_line.sub('\n', s)
    return replaceCharEntity(s)


# Replace common HTML character entities with the characters they stand for.
# Add more entries to ``char_entities`` to support additional entities.
def replaceCharEntity(html_str):
    """Replace HTML character entities in ``html_str``.

    Entities listed in the table (by name or decimal code) are replaced by
    their character; every other ``&name;`` / ``&#code;`` sequence is
    replaced by an empty string.

    The replacement is done in a single pass, so text produced by one
    replacement is never decoded again (``&amp;lt;`` becomes ``&lt;``, not
    ``<``).

    :param html_str: text possibly containing entities.
    :return: the text with entities replaced.
    """
    char_entities = {'nbsp': ' ', '160': ' ',
                     'lt': '<', '60': '<',
                     'gt': '>', '62': '>',
                     'amp': '&', '38': '&',
                     'quot': '"', '34': '"', }

    re_char_entity = re.compile(r'&#?(?P<name>\w+);')
    # A callable replacement avoids template-escape handling and lets unknown
    # entities fall back to the empty string.
    return re_char_entity.sub(
        lambda match: char_entities.get(match.group('name'), ''),
        html_str,
    )


def get_string(text):
    """Return ``text`` converted to ``str`` and prefixed with the alert label."""
    return f"邮箱报警 - {text!s}"

def notification(datas):
    """Send ``datas`` as JSON to the DingTalk robot webhook and return the response text.

    The request URL is the webhook address plus a millisecond timestamp and a
    signature: HMAC-SHA256 over ``"<timestamp>\\n<secret>"`` keyed with the
    secret, base64-encoded and URL-quoted.

    :param datas: JSON-serialisable message payload.
    :return: body of the HTTP response as text.
    :raises requests.exceptions.RequestException: on connection errors or
        when the request exceeds the timeout.
    """
    headers = {'Content-Type': 'application/json', "Charset": "UTF-8"}
    # Replace with the complete webhook address copied from the robot settings.
    prefix = 'https://oapi.dingtalk.com/robot/send?access_token=xxx'
    timestamp = str(round(time.time() * 1000))
    # Replace with the signing secret copied from the robot settings.
    secret = 'xxxxx'
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

    url = f'{prefix}&timestamp={timestamp}&sign={sign}&keyword=邮箱报警'

    # Without a timeout a stalled connection would block the caller forever.
    response = requests.post(url=url, data=json.dumps(datas), headers=headers, timeout=10)
    return response.text


def check_content(content):
    """Send an alert via ``notification`` when ``content`` contains ``keyword``.

    ``keyword`` is a module-level name that is not defined in this function;
    it must be set elsewhere in the script before this is called with text.

    :param content: mail body text. ``None`` (a mail without a text part) is
        ignored; ``bytes`` are decoded as UTF-8 with replacement characters.
    :return: None
    """
    if content is None:
        # Nothing to inspect; ``keyword in None`` would raise TypeError.
        return
    if isinstance(content, bytes):
        content = content.decode('utf-8', errors='replace')
    if keyword not in content:
        return

    payload = {
        "msgtype": "markdown",
        "markdown": {"title": "邮箱报警",
                     # Put the alert text into the request body.
                     "text": get_string(content)
                     },
        "at": {
            "isAtAll": False
        }
    }
    notification(payload)


class Message(dict):
    """Storage format for the content of one e-mail (a plain ``dict`` subclass)."""


class Email(object):
    """Read messages from an IMAP mailbox and extract subject, date, body and attachments."""

    # Message types usable as the IMAP search criterion in ``check_email``.
    All, Unseen, Seen, Recent, Answered, Flagged = "All,Unseen,Seen,Recent,Answered,Flagged".split(',')

    def __init__(self, imap, account, password, file_save_path='', count=1):
        """Store the connection settings and log in when all credentials are given.

        :param imap: IMAP server host name.
        :param account: mailbox account.
        :param password: password (or authorisation code) for the account.
        :param file_save_path: directory passed to ``save_file`` for attachments.
        :param count: stored as ``self.count`` and forwarded to ``check_email``.

        As before, no connection is made when ``imap``, ``account`` or
        ``password`` is empty; ``self.imap_server`` is then ``None``.
        """
        self.host = imap
        self.account = account
        self.password = password
        self.save_path = file_save_path
        self.count = count
        self.imap_server = None
        if imap and account and password:
            self.imap_server = self.login()

    def login(self):
        """Connect over SSL, authenticate, and send an IMAP ``ID`` command.

        :return: the authenticated ``imaplib.IMAP4_SSL`` connection.
        :raises imaplib.IMAP4.error: if authentication fails (the socket is
            closed before the error propagates).
        """
        imap_server = imaplib.IMAP4_SSL(self.host)
        try:
            imap_server.login(self.account, self.password)
            # Works around the NetEase mailbox error:
            # "Unsafe Login. Please contact kefu@188.com for help"
            imaplib.Commands["ID"] = ('AUTH',)
            args = ("name", self.account, "contact", self.account, "version", "1.0.0", "vendor", "myclient")
            # Build ("k" "v" ...) explicitly instead of post-processing
            # str(tuple), which breaks on values containing commas or quotes.
            id_params = '(' + ' '.join('"{}"'.format(item) for item in args) + ')'
            imap_server._simple_command("ID", id_params)
        except Exception:
            # Do not leak the open socket when login fails.
            imap_server.shutdown()
            raise
        return imap_server

    def get_newest(self):
        """Return the unread messages received within the last two minutes.

        Iterates ``check_email`` (newest first), runs ``check_content`` on each
        body, and stops at the first message older than two minutes.

        :return: list of dicts with the keys ``邮件主题``, ``邮件日期``,
            ``附件列表`` and ``邮件正文``.
        """
        msg_data_list = []
        for msg_data in self.check_email(last_message=True, message_type=self.Unseen, count=self.count):
            received = msg_data.get('date')

            # Compare datetimes directly instead of their string forms.
            # The cutoff is two minutes *before* now.
            if received < datetime.datetime.now() - datetime.timedelta(minutes=2):
                print('超出读取范围')
                return msg_data_list
            # Inspect the mail body and raise an alert if needed.
            check_content(msg_data.get('content'))
            msg_data_list.append(
                {
                    '邮件主题': msg_data.get('subject'),
                    '邮件日期': str(received),
                    '附件列表': msg_data.get('files'),
                    '邮件正文': msg_data.get('content')
                }
            )

        return msg_data_list

    def check_email(self, last_message=True, message_type="All", count=1):
        """Yield parsed messages from INBOX, newest first.

        The mailbox is selected read-only, so fetching does not mark messages
        as seen.

        :param last_message: accepted for compatibility; not used by the
            current implementation.
        :param message_type: IMAP search criterion, one of
            "All,Unseen,Seen,Recent,Answered,Flagged".
        :param count: accepted for compatibility; not used by the current
            implementation (all matching messages are yielded).
        :return: generator of ``Message`` dicts with the keys ``date``,
            ``subject``, ``content``, ``type`` and ``files``.
        """
        # Select the inbox.
        select_status, info = self.imap_server.select(mailbox='INBOX', readonly=True)
        if select_status != 'OK':
            print(info)
            # ``raise StopIteration`` inside a generator becomes RuntimeError
            # (PEP 479); a plain return ends the iteration.
            return
        # Search by message type.
        search_status, items = self.imap_server.search(None, message_type)
        if search_status != 'OK':
            print(items)
            return
        message_ids = items[0].split() if items and items[0] else []
        print("Read messages within the last 30 days,total {0} {1}type message".format(len(message_ids),
                                                                                       message_type))

        # Iterate in reverse so the most recent message comes first.
        for message_index in reversed(message_ids):
            fetch_status, message = self.imap_server.fetch(message_index, "(RFC822)")
            if fetch_status != 'OK' or not message or not isinstance(message[0], tuple):
                # Nothing usable was returned for this id; skip it.
                continue
            msg_data = Message()
            msg = email.message_from_bytes(message[0][1])
            # Message date as a datetime.
            msg_data['date'] = get_time_date(msg['Date'])
            # Message subject (all encoded fragments joined).
            msg_data['subject'] = self._decode_header_text(msg["Subject"])
            # Body, body type and attachments.
            msg_data.update(self.parse_message(msg, save_path=self.save_path))
            yield msg_data

    @staticmethod
    def str_to_unicode(s, encoding=None):
        """Return ``s`` as ``str``.

        Bytes are decoded with ``encoding`` (UTF-8 when it is missing or
        unknown), replacing undecodable bytes; other values go through
        ``str()``.
        """
        if isinstance(s, (bytes, bytearray)):
            try:
                return bytes(s).decode(encoding or 'utf-8', errors='replace')
            except LookupError:
                # Charset name not known to Python.
                return bytes(s).decode('utf-8', errors='replace')
        return str(s)

    @staticmethod
    def _decode_header_text(raw):
        """Decode an RFC 2047 header value (subject, file name) into ``str``."""
        from email.errors import HeaderParseError
        from email.header import decode_header

        if raw is None:
            return ''
        text = str(raw)
        try:
            fragments = decode_header(text)
        except (HeaderParseError, ValueError):
            return text
        return ''.join(Email.str_to_unicode(chunk, charset) for chunk, charset in fragments)

    @staticmethod
    def parse_message(msg, save_path=''):
        """Parse ``msg``, save its attachments, and return a summary dict.

        Parts with a file name are written with ``save_file``; failures are
        printed and do not abort parsing. Among the remaining ``text/*``
        parts the last one becomes the body. The body is passed through
        ``filter_tags`` and then has ``\\r\\n`` and spaces removed.

        :param msg: an ``email.message.Message``.
        :param save_path: directory for attachments.
        :return: dict with ``content`` (str or None), ``type`` (``'.txt'``,
            ``'.htm'`` or None) and ``files`` (list of saved file names).
        """
        message_content, suffix = None, None
        files = []
        for part in msg.walk():
            if part.is_multipart():
                continue
            content_type = part.get_content_type()
            filename = part.get_filename()
            if filename:
                # Attachment. Decode the name to ``str`` first: the old
                # Header()/decode_header() round trip always produced bytes,
                # so the string concatenation below failed and nothing was
                # ever saved.
                file_name = Email._decode_header_text(filename)
                data = part.get_payload(decode=True)
                try:
                    print('Attachment : ' + file_name)
                    if file_name:
                        save_file(file_name, data, save_path)
                        files.append(file_name)
                except Exception as exc:
                    # Best effort: report and continue with the next part.
                    print(file_name, exc)
                continue

            if part.get_content_maintype() != 'text':
                # Inline non-text parts (e.g. images) are not the body.
                continue
            if content_type == 'text/plain':
                suffix = '.txt'
            elif content_type == 'text/html':
                suffix = '.htm'
            payload = part.get_payload(decode=True) or b''
            # get_content_charset() is None when no charset is declared.
            message_content = Email.str_to_unicode(payload, part.get_content_charset())

        if message_content is not None:
            try:
                # Strip HTML tags plus script/style content.
                cleaned = filter_tags(message_content)
                # Remove line breaks and spaces.
                message_content = cleaned.replace('\r\n', '').replace(' ', '')
            except Exception:
                print('处理html标签失败')

        msg_data = {
            'content': message_content,
            'type': suffix,
            'files': files
        }
        return msg_data