Python源码示例:email.message_from_bytes()

示例1
def handle_sender_email(envelope: Envelope):
    filename = (
        arrow.now().format("YYYY-MM-DD_HH-mm-ss") + "_" + random_string(10) + ".eml"
    )
    filepath = os.path.join(SENDER_DIR, filename)

    with open(filepath, "wb") as f:
        f.write(envelope.original_content)

    LOG.d("Write email to sender at %s", filepath)

    msg = email.message_from_bytes(envelope.original_content)
    orig = get_orig_message_from_bounce(msg)
    if orig:
        LOG.warning(
            "Original message %s -> %s saved at %s", orig["From"], orig["To"], filepath
        )

    return "250 email to sender accepted" 
示例2
def get_imap_messages() -> Generator[EmailMessage, None, None]:
    mbox = IMAP4_SSL(settings.EMAIL_GATEWAY_IMAP_SERVER, settings.EMAIL_GATEWAY_IMAP_PORT)
    mbox.login(settings.EMAIL_GATEWAY_LOGIN, settings.EMAIL_GATEWAY_PASSWORD)
    try:
        mbox.select(settings.EMAIL_GATEWAY_IMAP_FOLDER)
        try:
            status, num_ids_data = mbox.search(None, 'ALL')
            for message_id in num_ids_data[0].split():
                status, msg_data = mbox.fetch(message_id, '(RFC822)')
                assert isinstance(msg_data[0], tuple)
                msg_as_bytes = msg_data[0][1]
                message = email.message_from_bytes(msg_as_bytes, policy=email.policy.default)
                assert isinstance(message, EmailMessage)  # https://github.com/python/typeshed/issues/2417
                yield message
                mbox.store(message_id, '+FLAGS', '\\Deleted')
            mbox.expunge()
        finally:
            mbox.close()
    finally:
        mbox.logout() 
示例3
def consume(self, event: Mapping[str, Any]) -> None:
        rcpt_to = event['rcpt_to']
        msg = email.message_from_bytes(
            base64.b64decode(event["msg_base64"]),
            policy=email.policy.default,
        )
        assert isinstance(msg, EmailMessage)  # https://github.com/python/typeshed/issues/2417
        if not is_missed_message_address(rcpt_to):
            # Missed message addresses are one-time use, so we don't need
            # to worry about emails to them resulting in message spam.
            recipient_realm = decode_stream_email_address(rcpt_to)[0].realm
            try:
                rate_limit_mirror_by_realm(recipient_realm)
            except RateLimited:
                logger.warning("MirrorWorker: Rejecting an email from: %s "
                               "to realm: %s - rate limited.",
                               msg['From'], recipient_realm.name)
                return

        mirror_email(msg, rcpt_to=rcpt_to) 
示例4
def parse(self):
        try:
            if self.message_type == ParserMessageType.STRING:
                msg = email.message_from_string(self.raw_message, Message)
            elif self.message_type == ParserMessageType.BYTES:
                msg = email.message_from_bytes(self.raw_message, Message)
            elif self.message_type == ParserMessageType.BINARY_FILE:
                msg = email.message_from_binary_file(self.raw_message, Message)
            else:
                raise ValueError("Invalid message_type, could not parse message.")
        except Exception:
            raise ParseEmailException

        # Do basic post-processing of the message, checking it for defects or
        # other missing information.
        if msg.defects:
            raise DefectMessageException

        # Add headers used by LEGO
        msg.original_size = len(self.raw_message)
        msg["X-MailFrom"] = self.mail_from

        return msg 
示例5
def __init__(self, message=None):
        """Initialize a Message instance."""
        if isinstance(message, email.message.Message):
            self._become_message(copy.deepcopy(message))
            if isinstance(message, Message):
                message._explain_to(self)
        elif isinstance(message, bytes):
            self._become_message(email.message_from_bytes(message))
        elif isinstance(message, str):
            self._become_message(email.message_from_string(message))
        elif isinstance(message, io.TextIOWrapper):
            self._become_message(email.message_from_file(message))
        elif hasattr(message, "read"):
            self._become_message(email.message_from_binary_file(message))
        elif message is None:
            email.message.Message.__init__(self)
        else:
            raise TypeError('Invalid message type: %s' % type(message)) 
示例6
def __init__(self, message=None):
        """Initialize a Message instance."""
        if isinstance(message, email.message.Message):
            self._become_message(copy.deepcopy(message))
            if isinstance(message, Message):
                message._explain_to(self)
        elif isinstance(message, bytes):
            self._become_message(email.message_from_bytes(message))
        elif isinstance(message, str):
            self._become_message(email.message_from_string(message))
        elif isinstance(message, io.TextIOWrapper):
            self._become_message(email.message_from_file(message))
        elif hasattr(message, "read"):
            self._become_message(email.message_from_binary_file(message))
        elif message is None:
            email.message.Message.__init__(self)
        else:
            raise TypeError('Invalid message type: %s' % type(message)) 
示例7
def __init__(self, message=None):
        """Initialize a Message instance."""
        if isinstance(message, email.message.Message):
            self._become_message(copy.deepcopy(message))
            if isinstance(message, Message):
                message._explain_to(self)
        elif isinstance(message, bytes):
            self._become_message(email.message_from_bytes(message))
        elif isinstance(message, str):
            self._become_message(email.message_from_string(message))
        elif isinstance(message, io.TextIOWrapper):
            self._become_message(email.message_from_file(message))
        elif hasattr(message, "read"):
            self._become_message(email.message_from_binary_file(message))
        elif message is None:
            email.message.Message.__init__(self)
        else:
            raise TypeError('Invalid message type: %s' % type(message)) 
示例8
def test_mangled_from_with_bad_bytes(self):
        source = textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            MIME-Version: 1.0
            Content-Transfer-Encoding: 8bit
            From: aaa@bbb.org

        """).encode('utf-8')
        msg = email.message_from_bytes(source + b'From R\xc3\xb6lli\n')
        b = BytesIO()
        g = BytesGenerator(b, mangle_from_=True)
        g.flatten(msg)
        self.assertEqual(b.getvalue(), source + b'>From R\xc3\xb6lli\n')


# Test the basic MIMEAudio class 
示例9
def test_binary_body_with_encode_noop(self):
        # Issue 16564: This does not produce an RFC valid message, since to be
        # valid it should have a CTE of binary.  But the below works in
        # Python2, and is documented as working this way.
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(bytesdata, _encoder=encoders.encode_noop)
        # Treated as a string, this will be invalid code points.
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg.get_payload(decode=True), bytesdata)
        s = BytesIO()
        g = BytesGenerator(s)
        g.flatten(msg)
        wireform = s.getvalue()
        msg2 = email.message_from_bytes(wireform)
        self.assertEqual(msg.get_payload(), '\uFFFD' * len(bytesdata))
        self.assertEqual(msg2.get_payload(decode=True), bytesdata) 
示例10
def test_8bit_in_quopri_body(self):
        # This is non-RFC compliant data...without 'decode' the library code
        # decodes the body using the charset from the headers, and because the
        # source byte really is utf-8 this works.  This is likely to fail
        # against real dirty data (ie: produce mojibake), but the data is
        # invalid anyway so it is as good a guess as any.  But this means that
        # this test just confirms the current behavior; that behavior is not
        # necessarily the best possible behavior.  With 'decode' it is
        # returning the raw bytes, so that test should be of correct behavior,
        # or at least produce the same result that email4 did.
        m = self.bodytest_msg.format(charset='utf-8',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6stál\n')
        self.assertEqual(msg.get_payload(decode=True),
                         'pöstál\n'.encode('utf-8')) 
示例11
def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
        # This is similar to the previous test, but proves that if the 8bit
        # byte is undecodeable in the specified charset, it gets replaced
        # by the unicode 'unknown' character.  Again, this may or may not
        # be the ideal behavior.  Note that if decode=False none of the
        # decoders will get involved, so this is the only test we need
        # for this behavior.
        m = self.bodytest_msg.format(charset='ascii',
                                     cte='quoted-printable',
                                     bodyline='p=C3=B6stál').encode('utf-8')
        msg = email.message_from_bytes(m)
        self.assertEqual(msg.get_payload(), 'p=C3=B6st\uFFFD\uFFFDl\n')
        self.assertEqual(msg.get_payload(decode=True),
                        'pöstál\n'.encode('utf-8'))

    # test_defect_handling:test_invalid_chars_in_base64_payload 
示例12
def decrypt(smime: bytes, key: rsa.RSAPrivateKey, serial: Optional[int] = None):
    """Decrypt an S/MIME message using the RSA Private Key given.

    The recipient can be hinted using the serial parameter, otherwise we assume single recipient = the given key.
    """
    string_content = smime.decode('utf8')
    msg: Message = email.message_from_string(string_content)
    assert msg.get_content_type() == 'application/pkcs7-mime'
    assert msg.get_filename() == 'smime.p7m'
    assert msg.get('Content-Description') == 'S/MIME Encrypted Message'

    b64payload = msg.get_payload()
    payload = b64decode(b64payload)
    decrypted_data = decrypt_smime_content(payload, key)
    decrypted_msg: Message = email.message_from_bytes(decrypted_data)

    return decrypted_msg.get_payload() 
示例13
def fetch_email(M, msg_id):
    """Returns the given email message as a unicode string."""
    res, data = M.fetch(msg_id, '(RFC822)')
    if res == 'OK':
        # Data here is a list with 1 element containing a tuple
        # whose 2nd element is a long string containing the email
        # The content is a bytes that must be decoded
        raw_msg_txt = data[0][1]
        # In Python3, we call message_from_bytes, but this function doesn't
        # exist in Python 2.
        try:
            msg = email.message_from_bytes(raw_msg_txt)
        except AttributeError:
            msg = email.message_from_string(raw_msg_txt)
        # At this point, we have a message containing bytes (not unicode)
        # fields that will still need to be decoded, ideally according to the
        # character set specified in the message.
        return msg
    else:
        return None 
示例14
def __init__(self, message=None):
        """Initialize a Message instance."""
        if isinstance(message, email.message.Message):
            self._become_message(copy.deepcopy(message))
            if isinstance(message, Message):
                message._explain_to(self)
        elif isinstance(message, bytes):
            self._become_message(email.message_from_bytes(message))
        elif isinstance(message, str):
            self._become_message(email.message_from_string(message))
        elif isinstance(message, io.TextIOWrapper):
            self._become_message(email.message_from_file(message))
        elif hasattr(message, "read"):
            self._become_message(email.message_from_binary_file(message))
        elif message is None:
            email.message.Message.__init__(self)
        else:
            raise TypeError('Invalid message type: %s' % type(message)) 
示例15
def append_literal(self, data):
        tag, mailbox_name, size = self.append_literal_command
        if data == CRLF:
            if 'UIDPLUS' in self.capabilities:
                self.send_tagged_line(tag, 'OK [APPENDUID %s %s] APPEND completed.' %
                                      (self.uidvalidity, self.server_state.max_uid(self.user_login, mailbox_name)))
            else:
                self.send_tagged_line(tag, 'OK APPEND completed.')
            self.append_literal_command = None
            return

        literal_data, rest = data[:size], data[size:]
        if len(literal_data) < size:
            self.send_tagged_line(self.append_literal_command[0],
                                  'BAD literal length : expected %s but was %s' % (size, len(literal_data)))
            self.append_literal_command = None
        elif rest and rest != CRLF:
            self.send_tagged_line(self.append_literal_command[0],
                                  'BAD literal trailing data : expected CRLF but got %s' % (rest))
        else:
            m = email.message_from_bytes(data)
            self.server_state.add_mail(self.user_login, Mail(m), mailbox_name)

            if rest:
                self.append_literal(rest) 
示例16
def copy(msg: Message) -> Message:
    """return a copy of message"""
    return email.message_from_bytes(msg.as_bytes()) 
示例17
def generateTestCases(self):
        for folder in self.args.spam_folder:
            for eml in Path(folder).glob("*.eml"):
                try:
                    msg = message_from_bytes(eml.read_bytes())
                except IOError:
                    print("! Failed to read from {}".format(eml.name))

                del msg["To"]

                yield msg 
示例18
def prepare(self):
        """
        Prepare the `self.message` object.
        """
        if self.prepared_message:
            self.message = message_from_bytes(self.prepared_message)
            self.prepared = True
            return

        self.text = self.text or BeautifulSoup(self.html, 'html.parser').get_text(strip=True)
        self.html = self.html or mistune.markdown(self.text)

        self.message['Sender'] = stringify_address(self.sender)
        self.message['From'] = stringify_addresses(self.authors) if self.authors else stringify_address(self.sender)
        self.message['To'] = stringify_addresses(self.receivers)
        self.message['Subject'] = self.subject
        if self.cc:
            self.message['CC'] = stringify_addresses(self.cc)
        if self.bcc:
            self.message['BCC'] = stringify_addresses(self.bcc)
        if self.reply_to:
            self.message['Reply-To'] = stringify_addresses(self.reply_to)
        if self.headers:
            for key, value in self.headers.items():
                self.message[key] = value

        body = MIMEMultipart('alternative')
        plaintext_part = MIMEText(self.text, 'plain')
        html_part = MIMEText(self.html, 'html')
        body.attach(plaintext_part)
        body.attach(html_part)
        self.message.attach(body)
        self.prepared = True 
示例19
def get_string(self, key):
        """Return a string representation or raise a KeyError.

        Uses email.message.Message to create a 7bit clean string
        representation of the message."""
        return email.message_from_bytes(self.get_bytes(key)).as_string() 
示例20
def get_string(self, key, from_=False):
        """Return a string representation or raise a KeyError."""
        return email.message_from_bytes(
            self.get_bytes(key)).as_string(unixfrom=from_) 
示例21
def _bytes_msg(self, bytestring, message=None, policy=None):
        if policy is None:
            policy = self.policy
        if message is None:
            message = self.message
        return email.message_from_bytes(bytestring, message, policy=policy) 
示例22
def msg_as_input(self, msg):
        m = message_from_bytes(msg, policy=policy.SMTP)
        b = io.BytesIO()
        g = BytesGenerator(b)
        g.flatten(m)
        self.assertEqual(b.getvalue(), msg)

    # XXX: spaces are not preserved correctly here yet in the general case. 
示例23
def test_cte_type_7bit_transforms_8bit_cte(self):
        source = textwrap.dedent("""\
            From: foo@bar.com
            To: Dinsdale
            Subject: Nudge nudge, wink, wink
            Mime-Version: 1.0
            Content-Type: text/plain; charset="latin-1"
            Content-Transfer-Encoding: 8bit

            oh là là, know what I mean, know what I mean?
            """).encode('latin1')
        msg = message_from_bytes(source)
        expected =  textwrap.dedent("""\
            From: foo@bar.com
            To: Dinsdale
            Subject: Nudge nudge, wink, wink
            Mime-Version: 1.0
            Content-Type: text/plain; charset="iso-8859-1"
            Content-Transfer-Encoding: quoted-printable

            oh l=E0 l=E0, know what I mean, know what I mean?
            """).encode('ascii')
        s = io.BytesIO()
        g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit',
                                                       linesep='\n'))
        g.flatten(msg)
        self.assertEqual(s.getvalue(), expected) 
示例24
def get_string(self, key):
        """Return a string representation or raise a KeyError.

        Uses email.message.Message to create a 7bit clean string
        representation of the message."""
        return email.message_from_bytes(self.get_bytes(key)).as_string() 
示例25
def get_string(self, key, from_=False):
        """Return a string representation or raise a KeyError."""
        return email.message_from_bytes(
            self.get_bytes(key)).as_string(unixfrom=from_) 
示例26
def get_item_as_eml(client: EWSClient, item_id, target_mailbox=None):
    """
    Retrieve item as an eml
    :param client: EWS Client
    :param item_id: Item id to retrieve
    :param (Optional) target_mailbox: target mailbox
    :return: Output tuple
    """
    account = client.get_account(target_mailbox)
    item = client.get_item_from_mailbox(account, item_id)

    if item.mime_content:
        mime_content = item.mime_content
        if isinstance(mime_content, bytes):
            email_content = email.message_from_bytes(mime_content)
        else:
            email_content = email.message_from_string(mime_content)
        if item.headers:
            attached_email_headers = [
                (h, " ".join(map(str.strip, v.split("\r\n"))))
                for (h, v) in list(email_content.items())
            ]
            for header in item.headers:
                if (
                    header.name,
                    header.value,
                ) not in attached_email_headers and header.name != "Content-Type":
                    email_content.add_header(header.name, header.value)

        eml_name = item.subject if item.subject else "demisto_untitled_eml"
        file_result = fileResult(eml_name + ".eml", email_content.as_string())
        file_result = (
            file_result if file_result else "Failed uploading eml file to war room"
        )

        return file_result 
示例27
def _msgs(box=None, uids='1:*', *, parsed=False, raw=False, policy=None):
    from mailur import local, message

    def flags(m):
        res = re.search(r'FLAGS \(([^)]*)\)', m).group(1).split()
        if '\\Recent' in res:
            res.remove('\\Recent')
        return ' '.join(res)

    def msg(res):
        msg = {
            'uid': re.search(r'UID (\d+)', res[0].decode()).group(1),
            'flags': flags(res[0].decode()),
        }
        if parsed:
            body = email.message_from_bytes(res[1], policy=policy)
            parts = [p.get_payload() for p in body.get_payload()]
            txt = [p.get_payload() for p in parts[1]]
            msg['meta'] = json.loads(parts[0])
            msg['body'] = txt[0]
            msg['body_txt'] = txt[1] if len(txt) > 1 else None
            msg['body_end'] = parts[2] if len(parts) > 2 else None
            msg['body_full'] = body
            msg['raw'] = res[1]
        else:
            body = res[1]
            if not raw:
                body = email.message_from_bytes(res[1], policy=policy)
            msg['body'] = body

        return msg

    policy = policy if policy else message.policy
    con_local.select(box or local.ALL)
    res = con_local.fetch(uids, '(uid flags body[])')
    return [msg(res[i]) for i in range(0, len(res), 2)] 
示例28
def raw_msg(uid, box, parsed=False, con=None):
    con.select(box)
    res = con.fetch(uid, 'BODY.PEEK[]')
    body = res[0][1] if res else None
    if body and parsed:
        body = email.message_from_bytes(body)
    return body 
示例29
def raw_part(uid, box, part, con=None):
    con.select(box)
    fields = '(BINARY.PEEK[{0}] BINARY.PEEK[{0}.mime])'.format(part)
    res = con.fetch(uid, fields)
    body = res[0][1]
    mime = res[1][1]
    content_type = email.message_from_bytes(mime).get_content_type()
    return body, content_type 
示例30
def uids_by_msgid_gmail(con):
    uids = {}
    res = con.fetch('1:*', 'BODY.PEEK[HEADER.FIELDS (X-GM-MSGID)]')
    for i in range(0, len(res), 2):
        uid = res[i][0].decode().split()[2]
        line = res[i][1].strip()
        if not line:
            continue
        gid = email.message_from_bytes(line)['X-GM-MSGID'].strip()
        uids[gid.strip('<>')] = uid
    return uids