Python源码示例:email.message_from_file()
示例1
def read_file(self, fileob):
    """Load metadata fields from an open file object.

    The file is parsed with ``message_from_file``.  The raw
    ``metadata-version`` header is stored first, then every name in
    ``_ALL_FIELDS`` that is present in the parsed headers is copied
    through ``self.set``.  Finally ``self.set_metadata_version()`` is
    called.
    """
    parsed = message_from_file(fileob)
    self._fields['Metadata-Version'] = parsed['metadata-version']

    for name in _ALL_FIELDS:
        if name not in parsed:
            # Header absent from the file: nothing to copy.
            continue

        if name not in _LISTFIELDS:
            # Single-valued header; the 'UNKNOWN' placeholder is skipped.
            single = parsed[name]
            if single is None:
                continue
            if single != 'UNKNOWN':
                self.set(name, single)
            continue

        # Multi-valued header: collect every occurrence.
        collected = parsed.get_all(name)
        if collected is not None and name in _LISTTUPLEFIELDS:
            # These entries are stored as tuples split on commas.
            collected = [tuple(entry.split(',')) for entry in collected]
        self.set(name, collected)

    self.set_metadata_version()
示例2
def read_file(self, fileob):
    """Load metadata fields from an open file object.

    The file is parsed with ``message_from_file``.  The raw
    ``metadata-version`` header is stored first, then every name in
    ``_ALL_FIELDS`` that is present in the parsed headers is copied
    through ``self.set``.  Finally ``self.set_metadata_version()`` is
    called.
    """
    parsed = message_from_file(fileob)
    self._fields['Metadata-Version'] = parsed['metadata-version']

    for name in _ALL_FIELDS:
        if name not in parsed:
            # Header absent from the file: nothing to copy.
            continue

        if name not in _LISTFIELDS:
            # Single-valued header; the 'UNKNOWN' placeholder is skipped.
            single = parsed[name]
            if single is None:
                continue
            if single != 'UNKNOWN':
                self.set(name, single)
            continue

        # Multi-valued header: collect every occurrence.
        collected = parsed.get_all(name)
        if collected is not None and name in _LISTTUPLEFIELDS:
            # These entries are stored as tuples split on commas.
            collected = [tuple(entry.split(',')) for entry in collected]
        self.set(name, collected)

    self.set_metadata_version()
示例3
def test_default_type(self):
    """Check default and content types of the parts of msg_30.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_30.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例4
def test_default_type_with_explicit_container_type(self):
    """Check default and content types of the parts of msg_28.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_28.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例5
def test__all__(self):
    """Verify that the sorted ``email.__all__`` matches the expected names.

    A copy of ``__all__`` is sorted rather than the list itself, so the
    test does not reorder the imported module's attribute as a side
    effect.
    """
    module = __import__('email')
    # Copy before sorting: list.sort() works in place and would otherwise
    # mutate email.__all__ for everything else in the process.
    exported = module.__all__[:]
    exported.sort()
    self.assertEqual(exported, [
        # Old names
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
        # new names
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
        ])
示例6
def test_default_type(self):
    """Check default and content types of the parts of msg_30.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_30.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例7
def test_default_type_with_explicit_container_type(self):
    """Check default and content types of the parts of msg_28.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_28.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例8
def test_message_from_file_with_class(self):
    """Check that message_from_file() builds objects of the given class.

    msg_01.txt is parsed and the returned object must be an instance of
    the custom subclass; msg_02.txt is parsed and every object yielded
    by walk() must be an instance of it as well.
    """
    class MyMessage(Message):
        pass

    def parse(filename):
        # Open the fixture, parse it with the subclass, always close it.
        handle = openfile(filename)
        try:
            return email.message_from_file(handle, MyMessage)
        finally:
            handle.close()

    parsed = parse('msg_01.txt')
    self.assertIsInstance(parsed, MyMessage)

    # Second fixture: check every part of the tree, not just the root.
    parsed = parse('msg_02.txt')
    for part in parsed.walk():
        self.assertIsInstance(part, MyMessage)
示例9
def test__all__(self):
    """Verify that a sorted copy of ``email.__all__`` matches the expected names."""
    expected = [
        # Old names
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
        # new names
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
        ]
    package = __import__('email')
    # sorted() is avoided here for Python 2.3 compatibility, so a copy of
    # the list is made and sorted in place instead.
    exported = list(package.__all__)
    exported.sort()
    self.assertEqual(exported, expected)
示例10
def _get_version_from_pkg_metadata(package_name):
    """Get the version from package metadata if present.

    This looks for PKG-INFO (for sdists) and METADATA (for wheels) in the
    current working directory and failing that will return None.

    Both filenames are tried in order; a file that cannot be opened or
    parsed is skipped, and a later file that parses successfully replaces
    the result of an earlier one.

    :param package_name: expected value of the ``Name`` header.
    :returns: the ``Version`` header, or None when no metadata was read,
        the ``Name`` header does not equal *package_name*, or there is
        no ``Version`` header.
    """
    pkg_metadata_filenames = ['PKG-INFO', 'METADATA']
    pkg_metadata = {}
    for filename in pkg_metadata_filenames:
        try:
            pkg_metadata_file = open(filename, 'r')
        except (IOError, OSError):
            continue
        # The context manager closes the handle on every path; it was
        # previously left open.
        with pkg_metadata_file:
            try:
                pkg_metadata = email.message_from_file(pkg_metadata_file)
            except email.errors.MessageError:
                continue

    # Check to make sure we're in our own dir
    if pkg_metadata.get('Name', None) != package_name:
        return None
    return pkg_metadata.get('Version', None)
示例11
def read_file(self, fileob):
    """Load metadata fields from an open file object.

    The file is parsed with ``message_from_file``.  The raw
    ``metadata-version`` header is stored first, then every name in
    ``_ALL_FIELDS`` that is present in the parsed headers is copied
    through ``self.set``.  A debug message is logged and
    ``self.set_metadata_version()`` is called at the end.
    """
    parsed = message_from_file(fileob)
    self._fields['Metadata-Version'] = parsed['metadata-version']

    for name in _ALL_FIELDS:
        if name not in parsed:
            # Header absent from the file: nothing to copy.
            continue

        if name not in _LISTFIELDS:
            # Single-valued header; the 'UNKNOWN' placeholder is skipped.
            single = parsed[name]
            if single is None:
                continue
            if single != 'UNKNOWN':
                self.set(name, single)
            continue

        # Multi-valued header: collect every occurrence.
        collected = parsed.get_all(name)
        if collected is not None and name in _LISTTUPLEFIELDS:
            # These entries are stored as tuples split on commas.
            collected = [tuple(entry.split(',')) for entry in collected]
        self.set(name, collected)

    logger.debug('Attempting to set metadata for %s', self)
    self.set_metadata_version()
示例12
def read_file(self, fileob):
    """Load metadata fields from an open file object.

    The file is parsed with ``message_from_file``.  The raw
    ``metadata-version`` header is stored first, then every name in
    ``_ALL_FIELDS`` that is present in the parsed headers is copied
    through ``self.set``.  Finally ``self.set_metadata_version()`` is
    called.
    """
    parsed = message_from_file(fileob)
    self._fields['Metadata-Version'] = parsed['metadata-version']

    for name in _ALL_FIELDS:
        if name not in parsed:
            # Header absent from the file: nothing to copy.
            continue

        if name not in _LISTFIELDS:
            # Single-valued header; the 'UNKNOWN' placeholder is skipped.
            single = parsed[name]
            if single is None:
                continue
            if single != 'UNKNOWN':
                self.set(name, single)
            continue

        # Multi-valued header: collect every occurrence.
        collected = parsed.get_all(name)
        if collected is not None and name in _LISTTUPLEFIELDS:
            # These entries are stored as tuples split on commas.
            collected = [tuple(entry.split(',')) for entry in collected]
        self.set(name, collected)

    self.set_metadata_version()
示例13
def read_file(self, fileob):
    """Load metadata fields from an open file object.

    The file is parsed with ``message_from_file``.  The raw
    ``metadata-version`` header is stored first, then every name in
    ``_ALL_FIELDS`` that is present in the parsed headers is copied
    through ``self.set``.  Finally ``self.set_metadata_version()`` is
    called.
    """
    parsed = message_from_file(fileob)
    self._fields['Metadata-Version'] = parsed['metadata-version']

    for name in _ALL_FIELDS:
        if name not in parsed:
            # Header absent from the file: nothing to copy.
            continue

        if name not in _LISTFIELDS:
            # Single-valued header; the 'UNKNOWN' placeholder is skipped.
            single = parsed[name]
            if single is None:
                continue
            if single != 'UNKNOWN':
                self.set(name, single)
            continue

        # Multi-valued header: collect every occurrence.
        collected = parsed.get_all(name)
        if collected is not None and name in _LISTTUPLEFIELDS:
            # These entries are stored as tuples split on commas.
            collected = [tuple(entry.split(',')) for entry in collected]
        self.set(name, collected)

    self.set_metadata_version()
示例14
def read_file(self, fileob):
    """Load metadata fields from an open file object.

    The file is parsed with ``message_from_file``.  The raw
    ``metadata-version`` header is stored first, then every name in
    ``_ALL_FIELDS`` that is present in the parsed headers is copied
    through ``self.set``.  Finally ``self.set_metadata_version()`` is
    called.
    """
    parsed = message_from_file(fileob)
    self._fields['Metadata-Version'] = parsed['metadata-version']

    for name in _ALL_FIELDS:
        if name not in parsed:
            # Header absent from the file: nothing to copy.
            continue

        if name not in _LISTFIELDS:
            # Single-valued header; the 'UNKNOWN' placeholder is skipped.
            single = parsed[name]
            if single is None:
                continue
            if single != 'UNKNOWN':
                self.set(name, single)
            continue

        # Multi-valued header: collect every occurrence.
        collected = parsed.get_all(name)
        if collected is not None and name in _LISTTUPLEFIELDS:
            # These entries are stored as tuples split on commas.
            collected = [tuple(entry.split(',')) for entry in collected]
        self.set(name, collected)

    self.set_metadata_version()
示例15
def _decode_payload(part):
    """Return the decoded payload of *part* as text, trying UTF-8 then GBK."""
    raw = part.get_payload(decode=True)
    try:
        return str(raw, encoding='utf-8')
    except UnicodeDecodeError:
        # Only a failed UTF-8 decode triggers the GBK fallback; any other
        # error propagates instead of being swallowed by a bare except.
        return str(raw, encoding='gbk')


def text_parser(path):
    """Extract the text of the first plain-text or HTML part of an email file.

    The message at *path* is parsed and its parts are visited in
    ``walk()`` order.  The first part whose content subtype is 'plain' or
    'html' determines the result:

    * plain: a known mail-client signature line is removed, everything
      from the first '----------------' separator onward is dropped, and
      the remainder is stripped.
    * html: the markup is converted with ``h2t.handle`` (a converter
      object defined elsewhere in the module), stripped, and spaces are
      removed.

    :param path: path of the email file to read.
    :returns: the extracted text, or None when no plain or html part
        exists.
    """
    with open(path) as eml:
        message = email.message_from_file(eml)

    # NOTE: the original guarded this loop with
    # ``m.get_content_type != 'mixed'``, which compares a bound method to a
    # string and is therefore always true.  The guard is dropped so that
    # behaviour is unchanged: every message is walked.
    for part in message.walk():
        subtype = part.get_content_subtype()
        if subtype == 'plain':
            text = _decode_payload(part)
            text = text.replace('--\n发自我的网易邮箱平板适配版', '')
            text = text.split('----------------')[0]
            return text.strip()
        if subtype == 'html':
            text = h2t.handle(_decode_payload(part))
            text = text.strip()
            return text.replace(' ', '')
    return None
示例16
def test_default_type(self):
    """Check default and content types of the parts of msg_30.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_30.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例17
def test_default_type_with_explicit_container_type(self):
    """Check default and content types of the parts of msg_28.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_28.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例18
def test__all__(self):
    """Verify that the sorted ``email.__all__`` matches the expected names.

    A copy of ``__all__`` is sorted rather than the list itself, so the
    test does not reorder the imported module's attribute as a side
    effect.
    """
    module = __import__('email')
    # Copy before sorting: list.sort() works in place and would otherwise
    # mutate email.__all__ for everything else in the process.
    exported = module.__all__[:]
    exported.sort()
    self.assertEqual(exported, [
        # Old names
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
        # new names
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
        ])
示例19
def test_default_type(self):
    """Check default and content types of the parts of msg_30.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_30.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例20
def test_default_type_with_explicit_container_type(self):
    """Check default and content types of the parts of msg_28.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_28.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例21
def test_message_from_file_with_class(self):
    """Check that message_from_file() builds objects of the given class.

    msg_01.txt is parsed and the returned object must be an instance of
    the custom subclass; msg_02.txt is parsed and every object yielded
    by walk() must be an instance of it as well.
    """
    # Create a subclass
    class MyMessage(Message):
        pass

    fp = openfile('msg_01.txt')
    try:
        msg = email.message_from_file(fp, MyMessage)
    finally:
        fp.close()
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)), which only reports "False is not true".
    self.assertIsInstance(msg, MyMessage)

    # Try something more complicated
    fp = openfile('msg_02.txt')
    try:
        msg = email.message_from_file(fp, MyMessage)
    finally:
        fp.close()
    for subpart in msg.walk():
        self.assertIsInstance(subpart, MyMessage)
示例22
def test__all__(self):
    """Verify that a sorted copy of ``email.__all__`` matches the expected names."""
    expected = [
        # Old names
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
        # new names
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
        ]
    package = __import__('email')
    # sorted() is avoided here for Python 2.3 compatibility, so a copy of
    # the list is made and sorted in place instead.
    exported = list(package.__all__)
    exported.sort()
    self.assertEqual(exported, expected)
示例23
def test_default_type(self):
    """Check default and content types of the parts of msg_30.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_30.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例24
def test_default_type_with_explicit_container_type(self):
    """Check default and content types of the parts of msg_28.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_28.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例25
def test__all__(self):
    """Verify that the sorted ``email.__all__`` matches the expected names.

    A copy of ``__all__`` is sorted rather than the list itself, so the
    test does not reorder the imported module's attribute as a side
    effect.
    """
    module = __import__('email')
    # Copy before sorting: list.sort() works in place and would otherwise
    # mutate email.__all__ for everything else in the process.
    exported = module.__all__[:]
    exported.sort()
    self.assertEqual(exported, [
        # Old names
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
        # new names
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
        ])
示例26
def test_default_type(self):
    """Check default and content types of the parts of msg_30.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_30.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例27
def test_default_type_with_explicit_container_type(self):
    """Check default and content types of the parts of msg_28.txt.

    The two top-level payloads must report 'message/rfc822' for both
    get_default_type() and get_content_type(); the first payload of
    each of them must report 'text/plain' for both.
    """
    check = self.assertEqual
    handle = openfile('msg_28.txt')
    try:
        parsed = email.message_from_file(handle)
    finally:
        handle.close()

    # Outer containers first, in payload order.
    containers = []
    for position in (0, 1):
        container = parsed.get_payload(position)
        check(container.get_default_type(), 'message/rfc822')
        check(container.get_content_type(), 'message/rfc822')
        containers.append(container)

    # Then the first sub-part of each container.
    for container in containers:
        inner = container.get_payload(0)
        check(inner.get_default_type(), 'text/plain')
        check(inner.get_content_type(), 'text/plain')
示例28
def test_message_from_file_with_class(self):
    """Check that message_from_file() builds objects of the given class.

    msg_01.txt is parsed and the returned object must be an instance of
    the custom subclass; msg_02.txt is parsed and every object yielded
    by walk() must be an instance of it as well.
    """
    class MyMessage(Message):
        pass

    def parse(filename):
        # Open the fixture, parse it with the subclass, always close it.
        handle = openfile(filename)
        try:
            return email.message_from_file(handle, MyMessage)
        finally:
            handle.close()

    parsed = parse('msg_01.txt')
    self.assertIsInstance(parsed, MyMessage)

    # Second fixture: check every part of the tree, not just the root.
    parsed = parse('msg_02.txt')
    for part in parsed.walk():
        self.assertIsInstance(part, MyMessage)
示例29
def test__all__(self):
    """Verify that a sorted copy of ``email.__all__`` matches the expected names."""
    expected = [
        # Old names
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
        # new names
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
        ]
    package = __import__('email')
    # sorted() is avoided here for Python 2.3 compatibility, so a copy of
    # the list is made and sorted in place instead.
    exported = list(package.__all__)
    exported.sort()
    self.assertEqual(exported, expected)
示例30
def read_file(self, fileob):
    """Load metadata fields from an open file object.

    The file is parsed with ``message_from_file``.  The raw
    ``metadata-version`` header is stored first, then every name in
    ``_ALL_FIELDS`` that is present in the parsed headers is copied
    through ``self.set``.  This version does not call
    ``set_metadata_version()`` afterwards; that call is commented out.
    """
    parsed = message_from_file(fileob)
    self._fields['Metadata-Version'] = parsed['metadata-version']

    for name in _ALL_FIELDS:
        if name not in parsed:
            # Header absent from the file: nothing to copy.
            continue

        if name not in _LISTFIELDS:
            # Single-valued header; the 'UNKNOWN' placeholder is skipped.
            single = parsed[name]
            if single is None:
                continue
            if single != 'UNKNOWN':
                self.set(name, single)
            continue

        # Multi-valued header: collect every occurrence.
        collected = parsed.get_all(name)
        if collected is not None and name in _LISTTUPLEFIELDS:
            # These entries are stored as tuples split on commas.
            collected = [tuple(entry.split(',')) for entry in collected]
        self.set(name, collected)

    # Disabled in this version of the method:
    # logger.debug('Attempting to set metadata for %s', self)
    # self.set_metadata_version()