Python源码示例:email.message()
示例1
def __init__(self, info):
    """Fill this header mapping from ``info`` and record the status.

    ``info`` may be an ``http.client.HTTPResponse``, an
    ``email.message.Message``, or any other object exposing ``items()``.
    Header names are always stored lower-cased.
    """
    if isinstance(info, http.client.HTTPResponse):
        for name, raw in info.getheaders():
            name = name.lower()
            existing = self.get(name)
            # A header seen more than once is folded into a single
            # comma-separated value.
            self[name] = raw if existing is None else ", ".join((existing, raw))
        self.status = info.status
        self["status"] = str(self.status)
        self.reason = info.reason
        self.version = info.version
        return

    if isinstance(info, email.message.Message):
        # Snapshot the items before copying them over.
        for name, raw in list(info.items()):
            self[name.lower()] = raw
        # The "status" entry is looked up directly here (no fallback).
        self.status = int(self["status"])
        return

    for name, raw in info.items():
        self[name.lower()] = raw
    # Fall back to the status already present on the instance/class.
    self.status = int(self.get("status", self.status))
示例2
def replace_str_in_msg(msg: Message, fr: str, to: str):
    """Replace every occurrence of ``fr`` with ``to`` in a text message part.

    The part is modified in place and also returned. It is returned
    untouched when its main content type is not ``text`` or when the body,
    after replacement, cannot be decoded as UTF-8.

    The original content subtype and Content-Transfer-Encoding (if any) are
    reused when the new body is written back.
    """
    if msg.get_content_maintype() != "text":
        return msg

    # Work on the decoded bytes so the replacement is independent of the
    # transfer encoding used on the wire.
    new_body = msg.get_payload(decode=True).replace(fr.encode(), to.encode())

    # If utf-8 decoding fails, do not touch message part
    try:
        new_body = new_body.decode("utf-8")
    except UnicodeDecodeError:
        return msg

    cte_header = msg["Content-Transfer-Encoding"]
    cte = cte_header.lower() if cte_header else None
    subtype = msg.get_content_subtype()

    # ``del msg[name]`` removes all occurrences of a header and is silent
    # when the header is absent; set_text_content() then re-adds both.
    del msg["Content-Transfer-Encoding"]
    del msg["Content-Type"]

    email.contentmanager.set_text_content(msg, new_body, subtype=subtype, cte=cte)
    return msg
示例3
def handle_sender_email(envelope: Envelope):
    """Save the raw incoming email under SENDER_DIR and acknowledge it.

    The file name is built from the current time plus a random suffix. If
    an original message can be extracted from the email (treated as a
    bounce), its From/To headers and the saved path are logged as a warning.

    Returns the SMTP reply string "250 email to sender accepted".
    """
    timestamp = arrow.now().format("YYYY-MM-DD_HH-mm-ss")
    filename = "_".join((timestamp, random_string(10))) + ".eml"
    filepath = os.path.join(SENDER_DIR, filename)

    raw_content = envelope.original_content
    with open(filepath, "wb") as eml_file:
        eml_file.write(raw_content)
        LOG.d("Write email to sender at %s", filepath)

    parsed = email.message_from_bytes(raw_content)
    orig = get_orig_message_from_bounce(parsed)
    if orig:
        LOG.warning(
            "Original message %s -> %s saved at %s", orig["From"], orig["To"], filepath
        )

    return "250 email to sender accepted"
示例4
def handle_DATA(self, server, session, envelope: Envelope):
    """Open a connection to the Postfix server and process ``envelope``.

    Port 587 with STARTTLS is used when POSTFIX_SUBMISSION_TLS is set;
    otherwise POSTFIX_PORT is used, defaulting to 25. The envelope is then
    passed to ``handle`` inside a fresh application context and its result
    is returned.
    """
    LOG.debug(
        "===>> New message, mail from %s, rctp tos %s ",
        envelope.mail_from,
        envelope.rcpt_tos,
    )

    port = 587 if POSTFIX_SUBMISSION_TLS else (POSTFIX_PORT or 25)
    # NOTE(review): this connection is never explicitly closed in this
    # function - confirm whether handle() is expected to do so.
    smtp = SMTP(POSTFIX_SERVER, port)
    if POSTFIX_SUBMISSION_TLS:
        smtp.starttls()

    with new_app().app_context():
        return handle(envelope, smtp)
示例5
def get_spam_info(msg: Message) -> (bool, str):
    """parse SpamAssassin header to detect whether a message is classified as spam.

    Return (is spam, spam status detail)
    The header format is
    ```X-Spam-Status: No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
      DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
      URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2```

    When the header is missing or empty, ``(False, "")`` is returned.
    """
    spamassassin_status = msg["X-Spam-Status"]
    if not spamassassin_status:
        return False, ""

    # The verdict ("Yes"/"No") is everything before the first comma.
    # partition() keeps the whole value when there is no comma at all,
    # whereas slicing with find() == -1 would drop the last character.
    spamassassin_answer = spamassassin_status.partition(",")[0].strip()

    return spamassassin_answer.lower() == "yes", spamassassin_status
示例6
def test_base64(self):
    """Check that a base-64-encoded upload part is handled successfully."""
    # Create upload.
    encoded_value = base64.urlsafe_b64encode('value')
    upload_data = (
"""--================1234==
Content-Type: text/plain
MIME-Version: 1.0
Content-Disposition: form-data; name="field1"; filename="stuff.txt"
Content-Transfer-Encoding: base64
%s
--================1234==--""" % encoded_value)

    upload_url = blobstore.create_upload_url('/success')
    upload, forward_environ, _ = self._run_test_success(upload_data, upload_url)

    # The request is forwarded to the success path ...
    self.assertEquals('/success', forward_environ['PATH_INFO'])
    # ... and the forwarded part keeps its original disposition.
    expected_disposition = (
        'form-data', {'filename': 'stuff.txt', 'name': 'field1'})
    self.assertEquals(
        expected_disposition,
        cgi.parse_header(upload['content-disposition']))
示例7
def test_check_line_endings(self):
    """Verify the forwarded body only uses CRLF line terminators."""
    # Create upload.
    upload_data = (
"""--================1234==
Content-Type: text/plain
MIME-Version: 1.0
Content-Disposition: form-data; name="field1"; filename="stuff.txt"
value
--================1234==--""")

    upload_url = blobstore.create_upload_url('/success')
    # Index 2 of the parse result is the path component.
    self.environ['PATH_INFO'] = urlparse.urlparse(upload_url)[2]
    self.environ['CONTENT_TYPE'] = (
        'multipart/form-data; boundary="================1234=="')

    status, _, _, _, forward_body = self.run_dispatcher(upload_data)
    self.assertEquals('200 OK', status)

    # Once every CRLF pair is removed, no bare LF may remain.
    without_crlf = forward_body.replace('\r\n', '')
    self.assertEqual(without_crlf.rfind('\n'), -1)
示例8
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist."""
    # Look up the existing entry first so a missing key fails before
    # anything is written.
    previous_subpath = self._lookup(key)
    staged_subpath = self._lookup(self.add(message))

    carries_own_info = isinstance(message, MaildirMessage)
    # A MaildirMessage dictates its own subdir and suffix; for anything
    # else the staged file only has add()'s defaults, so the old entry wins.
    dominant_subpath = staged_subpath if carries_own_info else previous_subpath

    subdir = os.path.dirname(dominant_subpath)
    suffix = ''
    if self.colon in dominant_subpath:
        suffix = self.colon + dominant_subpath.split(self.colon)[-1]

    self.discard(key)
    new_path = os.path.join(self._path, subdir, key + suffix)
    os.rename(os.path.join(self._path, staged_subpath), new_path)

    if carries_own_info:
        # Keep the access time, set the modification time to the
        # message's date.
        os.utime(new_path, (os.path.getatime(new_path), message.get_date()))
示例9
def remove_folder(self, folder):
    """Delete the named folder, which must be empty."""
    path = os.path.join(self._path, '.' + folder)

    # Any entry in new/ or cur/ that is not a dot-file counts as a message.
    delivered = (os.listdir(os.path.join(path, 'new')) +
                 os.listdir(os.path.join(path, 'cur')))
    if any(len(name) < 1 or name[0] != '.' for name in delivered):
        raise NotEmptyError('Folder contains message(s): %s' % folder)

    # Directories other than new/cur/tmp are treated as subfolders.
    reserved = ('new', 'cur', 'tmp')
    for name in os.listdir(path):
        if name not in reserved and os.path.isdir(os.path.join(path, name)):
            raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                (folder, name))

    # Bottom-up walk so each directory is empty by the time it is removed.
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))
    os.rmdir(path)
示例10
def test_get_decoded_payload(self):
    """Check get_payload(decode=True) on a multipart and each subpart."""
    eq = self.assertEqual
    msg = self._msgobj('msg_10.txt')
    # The outer message is a multipart
    eq(msg.get_payload(decode=True), None)

    # Expected decoded payloads, in subpart order.
    expected_payloads = (
        # Subpart 1 is 7bit encoded
        'This is a 7bit encoded message.\n',
        # Subpart 2 is quopri
        '\xa1This is a Quoted Printable encoded message!\n',
        # Subpart 3 is base64
        'This is a Base64 encoded message.',
        # Subpart 4 is base64 with a trailing newline, which
        # used to be stripped (issue 7143).
        'This is a Base64 encoded message.\n',
        # Subpart 5 has no Content-Transfer-Encoding: header.
        'This has no Content-Transfer-Encoding: header.\n',
    )
    for index, expected in enumerate(expected_payloads):
        eq(msg.get_payload(index).get_payload(decode=True), expected)
示例11
def fail(self, object, name=None, *args):
    """Raise an exception for unimplemented types.

    Always raises TypeError. The message includes the type name of
    ``object`` and, when ``name`` is truthy, its repr. Extra positional
    arguments are accepted and ignored.
    """
    # Use a conditional expression rather than ``name and ...``: the latter
    # evaluates to None for the default name and would render "objectNone".
    label = ' ' + repr(name) if name else ''
    message = "don't know how to document object%s of type %s" % (
        label, type(object).__name__)
    raise TypeError(message)
示例12
def should_append_alias(msg: Message, address: str):
    """Tell whether ``address`` still needs to be added to the To header.

    Returns False when ``address`` already occurs (case-insensitive
    substring match) in the To or Cc header of ``msg``, True otherwise.
    """
    # Force-convert each header to str: the value is sometimes a Header
    # object rather than a plain string.
    already_listed = any(
        msg[field] and address.lower() in str(msg[field]).lower()
        for field in ("To", "Cc")
    )
    return not already_listed
示例13
def prepare_pgp_message(orig_msg: Message, pgp_fingerprint: str):
    """Wrap ``orig_msg`` in a new multipart/encrypted message.

    Headers whose lower-cased name is not in _MIME_HEADERS are copied to the
    new message and then removed from ``orig_msg`` (which is mutated). The
    new message gets two parts: a "Version: 1" control part, and a part
    holding the output of ``pgp_utils.encrypt_file`` for the original bytes.
    """
    msg = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")

    # copy all headers from original message except all standard MIME headers
    # (walked last-to-first, names stored lower-cased)
    for name, value in reversed(orig_msg._headers):
        lowered = name.lower()
        if lowered not in _MIME_HEADERS:
            msg[lowered] = value

    # Only the MIME headers stay on orig_msg, to save space
    delete_all_headers_except(orig_msg, _MIME_HEADERS)

    version_part = MIMEApplication(
        _subtype="pgp-encrypted", _encoder=encoders.encode_7or8bit, _data=""
    )
    version_part.set_payload("Version: 1")
    msg.attach(version_part)

    # NOTE(review): "octet-stream" is passed as the first positional
    # argument here; the payload is overwritten below either way.
    encrypted_part = MIMEApplication("octet-stream", _encoder=encoders.encode_7or8bit)
    encrypted_part.add_header("Content-Disposition", "inline")

    # encrypt original message
    original_bytes = BytesIO(orig_msg.as_bytes())
    encrypted_data = pgp_utils.encrypt_file(original_bytes, pgp_fingerprint)
    encrypted_part.set_payload(encrypted_data)
    msg.attach(encrypted_part)

    return msg
示例14
def delete_header(msg: Message, header: str):
    """Remove every occurrence of ``header`` from ``msg``.

    A header can appear several times in a message; all of them are
    removed. Matching is case-insensitive and nothing happens when the
    header is absent.
    """
    # Message.__delitem__ already implements "delete all occurrences,
    # ignore if missing", so there is no need to index into the private
    # ``_headers`` list by hand.
    del msg[header]
示例15
def get_orig_message_from_bounce(msg: Message) -> Message:
    """parse the original email from Bounce

    Returns the 7th part yielded by ``msg.walk()``, or None when the
    message has fewer than 7 parts.
    """
    # the original message is the 7th part
    # 1st part is the root part, multipart/report
    # 2nd is text/plain, Postfix log
    # ...
    # 7th is original message
    for position, part in enumerate(msg.walk(), start=1):
        if position == 7:
            return part

    return None
示例16
def get_orig_message_from_spamassassin_report(msg: Message) -> Message:
    """parse the original email from Spamassassin report

    Returns the 4th part yielded by ``msg.walk()``, or None when the
    message has fewer than 4 parts.
    """
    # the original message is the 4th part
    # 1st part is the root part, multipart/report
    # 2nd is text/plain, SpamAssassin part
    # 3rd is the original message in message/rfc822 content type
    # 4th is original message
    for position, part in enumerate(msg.walk(), start=1):
        if position == 4:
            return part

    return None
示例17
def copy(msg: Message) -> Message:
    """Return a new message built by serializing ``msg`` and parsing it back."""
    serialized = msg.as_bytes()
    duplicate = email.message_from_bytes(serialized)
    return duplicate
示例18
def normalize_header_lines(self, message):
    """Normalize blocks of header lines in a message.

    Each run of consecutive header-like lines is passed line by line through
    ``self.normalize_header`` and then sorted; all other lines are kept in
    place. Returns the reassembled text.
    """
    output = []
    pending = []  # header lines of the run currently being collected
    for line in message.splitlines(True):
        if re.match(r'^\S+: ', line):
            # It's a header line. Maybe normalize the parameter values.
            pending.append(self.normalize_header(line))
            continue
        # Not a header: emit the finished run (if any) in sorted order,
        # then the line itself.
        output.extend(sorted(pending))
        pending = []
        output.append(line)

    # Emit a run that reaches the end of the message.
    output.extend(sorted(pending))

    # Put it all back together.
    return ''.join(output)
示例19
def test_store_and_build_forward_message_with_gs_bucket(self):
    """Store a blob with a bucket name and check the generated MIME message."""
    # Record the expected calls, then switch mox to replay mode.
    self.generate_blob_key().AndReturn(blobstore.BlobKey('item1'))
    self.now().AndReturn(datetime.datetime(2008, 11, 12, 10, 40))
    self.mox.ReplayAll()

    upload_field = FakeForm(
        name='field1',
        file=StringIO.StringIO('file1'),
        type='image/png',
        type_options={'a': 'b', 'x': 'y'},
        filename='stuff.png',
        headers={'h1': 'v1', 'h2': 'v2'},
    )
    form = FakeForm({'field1': upload_field})

    content_type, content_text = self.handler.store_and_build_forward_message(
        form, '================1234==', bucket_name='my-test-bucket')
    self.mox.VerifyAll()

    self.assertEqual(EXPECTED_GENERATED_CONTENT_TYPE_WITH_BUCKET, content_type)
    self.assertMessageEqual(
        EXPECTED_GENERATED_MIME_MESSAGE_WITH_BUCKET, content_text)

    stored_blob = blobstore.get('item1')
    self.assertEquals('stuff.png', stored_blob.filename)
示例20
def test_store_and_build_forward_message_utf8_values(self):
    """Store a blob whose filename holds UTF-8 bytes and check the result."""
    # Record the expected calls, then switch mox to replay mode.
    self.generate_blob_key().AndReturn(blobstore.BlobKey('item1'))
    self.now().AndReturn(datetime.datetime(2008, 11, 12, 10, 40))
    self.mox.ReplayAll()

    upload_field = FakeForm(
        name='field1',
        file=StringIO.StringIO('file1'),
        type='text/plain',
        type_options={'a': 'b', 'x': 'y'},
        filename='chinese_char_name_\xe6\xb1\x89.txt',
        headers={'h1': 'v1', 'h2': 'v2'},
    )
    form = FakeForm({'field1': upload_field})

    content_type, content_text = self.handler.store_and_build_forward_message(
        form, '================1234==')
    self.mox.VerifyAll()

    self.assertEqual(EXPECTED_GENERATED_UTF8_CONTENT_TYPE, content_type)
    self.assertMessageEqual(EXPECTED_GENERATED_UTF8_MIME_MESSAGE, content_text)

    # The stored filename is expected back as a unicode string.
    stored_blob = blobstore.get('item1')
    self.assertEquals(u'chinese_char_name_\u6c49.txt', stored_blob.filename)
示例21
def test_store_and_build_forward_message_latin1_values(self):
    """Check that a Latin-1 filename is rejected and no blob is stored."""
    # There is a special exception class for this case. This is designed to
    # emulate production, which currently fails silently. See b/6722082.
    self.generate_blob_key().AndReturn(blobstore.BlobKey('item1'))
    self.now().AndReturn(datetime.datetime(2008, 11, 12, 10, 40))
    self.mox.ReplayAll()

    upload_field = FakeForm(
        name='field1',
        file=StringIO.StringIO('file1'),
        type='text/plain',
        type_options={'a': 'b', 'x': 'y'},
        filename='german_char_name_f\xfc\xdfe.txt',
        headers={'h1': 'v1', 'h2': 'v2'},
    )
    form = FakeForm({'field1': upload_field})

    build_message = self.handler.store_and_build_forward_message
    self.assertRaises(
        blob_upload._InvalidMetadataError,
        build_message, form, '================1234==')
    self.mox.VerifyAll()

    # Nothing must have been stored under the generated key.
    stored_blob = blobstore.get('item1')
    self.assertIsNone(stored_blob)
示例22
def add(self, message):
    """Add message and return assigned key."""
    # Abstract hook: this base version always raises.
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
示例23
def remove(self, key):
    """Remove the keyed message; raise KeyError if it doesn't exist."""
    # Abstract hook: this base version always raises.
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
示例24
def discard(self, key):
    """If the keyed message exists, remove it."""
    try:
        self.remove(key)
    except KeyError:
        # A missing key is not an error here; there is nothing to remove.
        return None
    return None
示例25
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist."""
    # Abstract hook: this base version always raises.
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
示例26
def get(self, key, default=None):
    """Return the keyed message, or default if it doesn't exist."""
    try:
        found = self.__getitem__(key)
    except KeyError:
        # Unknown key: hand back the caller's fallback instead.
        found = default
    return found
示例27
def iteritems(self):
    """Return an iterator over (key, message) tuples."""
    for key in self.iterkeys():
        try:
            message = self[key]
        except KeyError:
            # The key no longer resolves to a message; skip it.
            pass
        else:
            yield (key, message)
示例28
def items(self):
    """Return a list of (key, message) tuples. Memory intensive."""
    # Drain the lazy iterator into a concrete list.
    pairs = []
    pairs.extend(self.iteritems())
    return pairs
示例29
def has_key(self, key):
    """Return True if the keyed message exists, False otherwise."""
    # Abstract hook: this base version always raises.
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
示例30
def pop(self, key, default=None):
    """Delete the keyed message and return it, or default."""
    try:
        message = self[key]
    except KeyError:
        # Nothing stored under this key: nothing to delete.
        return default
    else:
        self.discard(key)
        return message