Python源码示例:email.message()

示例1
def __init__(self, info):
        # info is either an email.message or
        # an httplib.HTTPResponse object.
        if isinstance(info, http.client.HTTPResponse):
            for key, value in info.getheaders():
                key = key.lower()
                prev = self.get(key)
                if prev is not None:
                    value = ", ".join((prev, value))
                self[key] = value
            self.status = info.status
            self["status"] = str(self.status)
            self.reason = info.reason
            self.version = info.version
        elif isinstance(info, email.message.Message):
            for key, value in list(info.items()):
                self[key.lower()] = value
            self.status = int(self["status"])
        else:
            for key, value in info.items():
                self[key.lower()] = value
            self.status = int(self.get("status", self.status)) 
示例2
def replace_str_in_msg(msg: Message, fr: str, to: str):
    if msg.get_content_maintype() != "text":
        return msg
    new_body = msg.get_payload(decode=True).replace(fr.encode(), to.encode())

    # If utf-8 decoding fails, do not touch message part
    try:
        new_body = new_body.decode("utf-8")
    except:
        return msg

    cte = (
        msg["Content-Transfer-Encoding"].lower()
        if msg["Content-Transfer-Encoding"]
        else None
    )
    subtype = msg.get_content_subtype()
    delete_header(msg, "Content-Transfer-Encoding")
    delete_header(msg, "Content-Type")

    email.contentmanager.set_text_content(msg, new_body, subtype=subtype, cte=cte)
    return msg 
示例3
def handle_sender_email(envelope: Envelope):
    filename = (
        arrow.now().format("YYYY-MM-DD_HH-mm-ss") + "_" + random_string(10) + ".eml"
    )
    filepath = os.path.join(SENDER_DIR, filename)

    with open(filepath, "wb") as f:
        f.write(envelope.original_content)

    LOG.d("Write email to sender at %s", filepath)

    msg = email.message_from_bytes(envelope.original_content)
    orig = get_orig_message_from_bounce(msg)
    if orig:
        LOG.warning(
            "Original message %s -> %s saved at %s", orig["From"], orig["To"], filepath
        )

    return "250 email to sender accepted" 
示例4
def handle_DATA(self, server, session, envelope: Envelope):
        LOG.debug(
            "===>> New message, mail from %s, rctp tos %s ",
            envelope.mail_from,
            envelope.rcpt_tos,
        )

        if POSTFIX_SUBMISSION_TLS:
            smtp = SMTP(POSTFIX_SERVER, 587)
            smtp.starttls()
        else:
            smtp = SMTP(POSTFIX_SERVER, POSTFIX_PORT or 25)

        app = new_app()
        with app.app_context():
            return handle(envelope, smtp) 
示例5
def get_spam_info(msg: Message) -> (bool, str):
    """parse SpamAssassin header to detect whether a message is classified as spam.
    Return (is spam, spam status detail)
    The header format is
    ```X-Spam-Status: No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
  DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
  URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2```
    """
    spamassassin_status = msg["X-Spam-Status"]
    if not spamassassin_status:
        return False, ""

    # yes or no
    spamassassin_answer = spamassassin_status[: spamassassin_status.find(",")]

    return spamassassin_answer.lower() == "yes", spamassassin_status 
示例6
def test_base64(self):
    """Test automatic decoding of a base-64-encoded message."""
    # Create upload.
    upload_data = (
        """--================1234==
Content-Type: text/plain
MIME-Version: 1.0
Content-Disposition: form-data; name="field1"; filename="stuff.txt"
Content-Transfer-Encoding: base64

%s
--================1234==--""" % base64.urlsafe_b64encode('value'))

    upload_url = blobstore.create_upload_url('/success')

    upload, forward_environ, _ = self._run_test_success(
        upload_data, upload_url)

    self.assertEquals('/success', forward_environ['PATH_INFO'])
    self.assertEquals(
        ('form-data', {'filename': 'stuff.txt', 'name': 'field1'}),
        cgi.parse_header(upload['content-disposition'])) 
示例7
def test_check_line_endings(self):
    """Ensure the upload message uses correct RFC-2821 line terminators."""
    # Create upload.
    upload_data = (
        """--================1234==
Content-Type: text/plain
MIME-Version: 1.0
Content-Disposition: form-data; name="field1"; filename="stuff.txt"

value
--================1234==--""")

    upload_url = blobstore.create_upload_url('/success')

    request_path = urlparse.urlparse(upload_url)[2]
    self.environ['PATH_INFO'] = request_path
    self.environ['CONTENT_TYPE'] = (
        'multipart/form-data; boundary="================1234=="')

    status, _, _, _, forward_body = self.run_dispatcher(upload_data)

    self.assertEquals('200 OK', status)
    forward_body = forward_body.replace('\r\n', '')
    self.assertEqual(forward_body.rfind('\n'), -1) 
示例8
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        old_subpath = self._lookup(key)
        temp_key = self.add(message)
        temp_subpath = self._lookup(temp_key)
        if isinstance(message, MaildirMessage):
            # temp's subdir and suffix were specified by message.
            dominant_subpath = temp_subpath
        else:
            # temp's subdir and suffix were defaults from add().
            dominant_subpath = old_subpath
        subdir = os.path.dirname(dominant_subpath)
        if self.colon in dominant_subpath:
            suffix = self.colon + dominant_subpath.split(self.colon)[-1]
        else:
            suffix = ''
        self.discard(key)
        new_path = os.path.join(self._path, subdir, key + suffix)
        os.rename(os.path.join(self._path, temp_subpath), new_path)
        if isinstance(message, MaildirMessage):
            os.utime(new_path, (os.path.getatime(new_path),
                                message.get_date())) 
示例9
def remove_folder(self, folder):
        """Delete the named folder, which must be empty."""
        path = os.path.join(self._path, '.' + folder)
        for entry in os.listdir(os.path.join(path, 'new')) + \
                     os.listdir(os.path.join(path, 'cur')):
            if len(entry) < 1 or entry[0] != '.':
                raise NotEmptyError('Folder contains message(s): %s' % folder)
        for entry in os.listdir(path):
            if entry != 'new' and entry != 'cur' and entry != 'tmp' and \
               os.path.isdir(os.path.join(path, entry)):
                raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                    (folder, entry))
        for root, dirs, files in os.walk(path, topdown=False):
            for entry in files:
                os.remove(os.path.join(root, entry))
            for entry in dirs:
                os.rmdir(os.path.join(root, entry))
        os.rmdir(path) 
示例10
def test_get_decoded_payload(self):
        eq = self.assertEqual
        msg = self._msgobj('msg_10.txt')
        # The outer message is a multipart
        eq(msg.get_payload(decode=True), None)
        # Subpart 1 is 7bit encoded
        eq(msg.get_payload(0).get_payload(decode=True),
           'This is a 7bit encoded message.\n')
        # Subpart 2 is quopri
        eq(msg.get_payload(1).get_payload(decode=True),
           '\xa1This is a Quoted Printable encoded message!\n')
        # Subpart 3 is base64
        eq(msg.get_payload(2).get_payload(decode=True),
           'This is a Base64 encoded message.')
        # Subpart 4 is base64 with a trailing newline, which
        # used to be stripped (issue 7143).
        eq(msg.get_payload(3).get_payload(decode=True),
           'This is a Base64 encoded message.\n')
        # Subpart 5 has no Content-Transfer-Encoding: header.
        eq(msg.get_payload(4).get_payload(decode=True),
           'This has no Content-Transfer-Encoding: header.\n') 
示例11
def fail(self, object, name=None, *args):
        """Raise an exception for unimplemented types."""
        message = "don't know how to document object%s of type %s" % (
            name and ' ' + repr(name), type(object).__name__)
        raise TypeError(message) 
示例12
def should_append_alias(msg: Message, address: str):
    """whether an alias should be appended to TO header in message"""

    # # force convert header to string, sometimes addrs is Header object
    if msg["To"] and address.lower() in str(msg["To"]).lower():
        return False
    if msg["Cc"] and address.lower() in str(msg["Cc"]).lower():
        return False

    return True 
示例13
def prepare_pgp_message(orig_msg: Message, pgp_fingerprint: str):
    msg = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")

    # copy all headers from original message except all standard MIME headers
    for i in reversed(range(len(orig_msg._headers))):
        header_name = orig_msg._headers[i][0].lower()
        if header_name.lower() not in _MIME_HEADERS:
            msg[header_name] = orig_msg._headers[i][1]

    # Delete unnecessary headers in orig_msg except to save space
    delete_all_headers_except(
        orig_msg, _MIME_HEADERS,
    )

    first = MIMEApplication(
        _subtype="pgp-encrypted", _encoder=encoders.encode_7or8bit, _data=""
    )
    first.set_payload("Version: 1")
    msg.attach(first)

    second = MIMEApplication("octet-stream", _encoder=encoders.encode_7or8bit)
    second.add_header("Content-Disposition", "inline")
    # encrypt original message
    encrypted_data = pgp_utils.encrypt_file(
        BytesIO(orig_msg.as_bytes()), pgp_fingerprint
    )
    second.set_payload(encrypted_data)
    msg.attach(second)

    return msg 
示例14
def delete_header(msg: Message, header: str):
    """a header can appear several times in message."""
    # inspired from https://stackoverflow.com/a/47903323/1428034
    for i in reversed(range(len(msg._headers))):
        header_name = msg._headers[i][0].lower()
        if header_name == header.lower():
            del msg._headers[i] 
示例15
def get_orig_message_from_bounce(msg: Message) -> Message:
    """parse the original email from Bounce"""
    i = 0
    for part in msg.walk():
        i += 1

        # the original message is the 4th part
        # 1st part is the root part,  multipart/report
        # 2nd is text/plain, Postfix log
        # ...
        # 7th is original message
        if i == 7:
            return part 
示例16
def get_orig_message_from_spamassassin_report(msg: Message) -> Message:
    """parse the original email from Spamassassin report"""
    i = 0
    for part in msg.walk():
        i += 1

        # the original message is the 4th part
        # 1st part is the root part,  multipart/report
        # 2nd is text/plain, SpamAssassin part
        # 3rd is the original message in message/rfc822 content type
        # 4th is original message
        if i == 4:
            return part 
示例17
def copy(msg: Message) -> Message:
    """return a copy of message"""
    return email.message_from_bytes(msg.as_bytes()) 
示例18
def normalize_header_lines(self, message):
    """Normalize blocks of header lines in a message.

    This sorts blocks of consecutive header lines and then for certain
    headers (Content-Type and -Disposition) sorts the parameter values.
    """
    lines = message.splitlines(True)
    # Normalize groups of header-like lines.
    output = []
    headers = []
    for line in lines:
      if re.match(r'^\S+: ', line):
        # It's a header line.  Maybe normalize the parameter values.
        line = self.normalize_header(line)
        headers.append(line)
      else:
        # Not a header.  Flush the list of headers.
        if headers:
          headers.sort()
          output.extend(headers)
          headers = []
        output.append(line)
    # Flush the final list of headers.
    if headers:
      headers.sort()
      output.extend(headers)
    # Put it all back together.
    return ''.join(output) 
示例19
def test_store_and_build_forward_message_with_gs_bucket(self):
    """Test the high-level method to store a blob and build a MIME message."""
    self.generate_blob_key().AndReturn(blobstore.BlobKey('item1'))

    self.now().AndReturn(datetime.datetime(2008, 11, 12, 10, 40))

    self.mox.ReplayAll()

    form = FakeForm({
        'field1': FakeForm(name='field1',
                           file=StringIO.StringIO('file1'),
                           type='image/png',
                           type_options={'a': 'b', 'x': 'y'},
                           filename='stuff.png',
                           headers={'h1': 'v1',
                                    'h2': 'v2',
                                   }),
        })

    content_type, content_text = self.handler.store_and_build_forward_message(
        form, '================1234==', bucket_name='my-test-bucket')

    self.mox.VerifyAll()

    self.assertEqual(EXPECTED_GENERATED_CONTENT_TYPE_WITH_BUCKET, content_type)
    self.assertMessageEqual(EXPECTED_GENERATED_MIME_MESSAGE_WITH_BUCKET,
                            content_text)

    blob1 = blobstore.get('item1')
    self.assertEquals('stuff.png', blob1.filename) 
示例20
def test_store_and_build_forward_message_utf8_values(self):
    """Test store and build message method with UTF-8 values."""
    self.generate_blob_key().AndReturn(blobstore.BlobKey('item1'))

    self.now().AndReturn(datetime.datetime(2008, 11, 12, 10, 40))

    self.mox.ReplayAll()

    form = FakeForm({
        'field1': FakeForm(name='field1',
                           file=StringIO.StringIO('file1'),
                           type='text/plain',
                           type_options={'a': 'b', 'x': 'y'},
                           filename='chinese_char_name_\xe6\xb1\x89.txt',
                           headers={'h1': 'v1',
                                    'h2': 'v2',
                                   }),
        })

    content_type, content_text = self.handler.store_and_build_forward_message(
        form, '================1234==')

    self.mox.VerifyAll()

    self.assertEqual(EXPECTED_GENERATED_UTF8_CONTENT_TYPE, content_type)
    self.assertMessageEqual(EXPECTED_GENERATED_UTF8_MIME_MESSAGE,
                            content_text)

    blob1 = blobstore.get('item1')
    self.assertEquals(u'chinese_char_name_\u6c49.txt', blob1.filename) 
示例21
def test_store_and_build_forward_message_latin1_values(self):
    """Test store and build message method with Latin-1 values."""
    # There is a special exception class for this case. This is designed to
    # emulate production, which currently fails silently. See b/6722082.
    self.generate_blob_key().AndReturn(blobstore.BlobKey('item1'))

    self.now().AndReturn(datetime.datetime(2008, 11, 12, 10, 40))

    self.mox.ReplayAll()

    form = FakeForm({
        'field1': FakeForm(name='field1',
                           file=StringIO.StringIO('file1'),
                           type='text/plain',
                           type_options={'a': 'b', 'x': 'y'},
                           filename='german_char_name_f\xfc\xdfe.txt',
                           headers={'h1': 'v1',
                                    'h2': 'v2',
                                   }),
        })

    self.assertRaises(blob_upload._InvalidMetadataError,
                      self.handler.store_and_build_forward_message, form,
                      '================1234==')

    self.mox.VerifyAll()

    blob1 = blobstore.get('item1')
    self.assertIsNone(blob1) 
示例22
def add(self, message):
        """Add message and return assigned key."""
        raise NotImplementedError('Method must be implemented by subclass') 
示例23
def remove(self, key):
        """Remove the keyed message; raise KeyError if it doesn't exist."""
        raise NotImplementedError('Method must be implemented by subclass') 
示例24
def discard(self, key):
        """If the keyed message exists, remove it."""
        try:
            self.remove(key)
        except KeyError:
            pass 
示例25
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        raise NotImplementedError('Method must be implemented by subclass') 
示例26
def get(self, key, default=None):
        """Return the keyed message, or default if it doesn't exist."""
        try:
            return self.__getitem__(key)
        except KeyError:
            return default 
示例27
def iteritems(self):
        """Return an iterator over (key, message) tuples."""
        for key in self.iterkeys():
            try:
                value = self[key]
            except KeyError:
                continue
            yield (key, value) 
示例28
def items(self):
        """Return a list of (key, message) tuples. Memory intensive."""
        return list(self.iteritems()) 
示例29
def has_key(self, key):
        """Return True if the keyed message exists, False otherwise."""
        raise NotImplementedError('Method must be implemented by subclass') 
示例30
def pop(self, key, default=None):
        """Delete the keyed message and return it, or default."""
        try:
            result = self[key]
        except KeyError:
            return default
        self.discard(key)
        return result