Python源码示例:email.utils()
示例1
def _encode_field(self, name, value):
self._log(DEBUG1, '_FormDataPart._encode_field: %s %s',
type(name), type(value))
if not _rfc2231_encode:
s = '%s="%s"' % (name, value)
self._log(DEBUG1, '_FormDataPart._encode_field: %s %s',
type(s), s)
if _isunicode(s):
s = s.encode('utf-8')
self._log(DEBUG1, '_FormDataPart._encode_field: %s %s',
type(s), s)
return s
if not [ch for ch in '\r\n\\' if ch in value]:
try:
return ('%s="%s"' % (name, value)).encode('ascii')
except UnicodeEncodeError:
self._log(DEBUG1, 'UnicodeEncodeError 3.x')
except UnicodeDecodeError: # 2.x
self._log(DEBUG1, 'UnicodeDecodeError 2.x')
# RFC 2231
value = email.utils.encode_rfc2231(value, 'utf-8')
return ('%s*=%s' % (name, value)).encode('ascii')
示例2
def dateToUTCstr(str_date):
# this fails to parse timezones out of formats like
# Tue, 17 Jun 2010 08:33:51 EDT
# so it will assume the local timezone for those cases
try:
dt = dateutil.parser.parse(str_date)
except (TypeError, ValueError) as e:
# print u"Failed to parse date with dateutil, using email utils: date={}".format(str_date)
parsed_dt = parsedate_tz(str_date)
# Make an arbitrary tz info object name can be anything NSTZ "Newman Seconds Time Zone"
nstz_info = dateutil.tz.tzoffset("NSTZ",parsed_dt[9])
dt= datetime.datetime(*parsed_dt[:6], tzinfo=nstz_info)
if not dt.tzinfo:
print "WARNING: Failed to parse timezone defaulting to UTC for Date: {}".format(str_date)
dt = dt.replace(tzinfo=dateutil.tz.tzutc())
dt_tz = dt.astimezone(dateutil.tz.tzutc())
time_str = dt_tz.strftime('%Y-%m-%dT%H:%M:%S')
# print u"Parsed date={} ====> {}".format(str_date, time_str)
return time_str
示例3
def generate_email_files(msg):
counter = 1
upload_date = time.mktime(email.utils.parsedate(msg["Date"]))
for part in msg.walk():
# multipart/* are just containers
if part.get_content_maintype() == 'multipart':
continue
# Applications should really sanitize the given filename so that an
# email message can't be used to overwrite important files
filename = part.get_filename()
if not filename:
ext = mimetypes.guess_extension(part.get_content_type())
if not ext:
# Use a generic bag-of-bits extension
ext = '.bin'
filename = 'part-%03d%s' % (counter, ext)
counter += 1
data = part.get_payload(decode=True)
if parse_pathname(filename).ext == '.zip':
for zipfn, zipdata, zipdt in generate_zip_files(data):
yield zipfn, zipdata, zipdt
else:
yield filename, data, upload_date
示例4
def getmailaddresses(self, prop):
"""retrieve From:, To: and Cc: addresses"""
addrs=email.utils.getaddresses(self.msg.get_all(prop, []))
for i, (name, addr) in enumerate(addrs):
if not name and addr:
# only one string! Is it the address or is it the name ?
# use the same for both and see later
name=addr
try:
# address must be ascii only
addr=addr.encode('ascii')
except UnicodeError:
addr=''
else:
# address must match adress regex
if not email_address_re.match(addr.decode("utf-8")):
addr=''
if not isinstance(addr, str):
# Python 2 imaplib returns a bytearray,
# Python 3 imaplib returns a str.
addrs[i]=(self.getmailheader(name), addr.decode("utf-8"))
return addrs
示例5
def _get_last_chromium_modification():
"""Returns the last modification date of the chromium-browser-official tar file"""
with _get_requests_session() as session:
response = session.head(
'https://storage.googleapis.com/chromium-browser-official/chromium-{}.tar.xz'.format(
get_chromium_version()))
response.raise_for_status()
return email.utils.parsedate_to_datetime(response.headers['Last-Modified'])
示例6
def _get_gitiles_git_log_date(log_entry):
"""Helper for _get_gitiles_git_log_date"""
return email.utils.parsedate_to_datetime(log_entry['committer']['time'])
示例7
def open_local_file(self, req):
import email.utils
import mimetypes
host = req.host
filename = req.selector
localfile = url2pathname(filename)
try:
stats = os.stat(localfile)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(filename)[0]
headers = email.message_from_string(
'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if host:
host, port = splitport(host)
if not host or \
(not port and _safe_gethostbyname(host) in self.get_names()):
if host:
origurl = 'file://' + host + filename
else:
origurl = 'file://' + filename
return addinfourl(open(localfile, 'rb'), headers, origurl)
except OSError as exp:
# users shouldn't expect OSErrors coming from urlopen()
raise URLError(exp)
raise URLError('file not on local host')
示例8
def open_local_file(self, url):
"""Use local file."""
import email.utils
import mimetypes
host, file = splithost(url)
localname = url2pathname(file)
try:
stats = os.stat(localname)
except OSError as e:
raise URLError(e.strerror, e.filename)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(url)[0]
headers = email.message_from_string(
'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if not host:
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
return addinfourl(open(localname, 'rb'), headers, urlfile)
host, port = splitport(host)
if (not port
and socket.gethostbyname(host) in ((localhost(),) + thishost())):
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
elif file[:2] == './':
raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
return addinfourl(open(localname, 'rb'), headers, urlfile)
raise URLError('local file error: not on local host')
示例9
def _send_to_sendmail(self, message, email_addresses, cc=None, bcc=None,
sender=None):
msg = email.mime.text.MIMEText(base.handle_encoding(message),
'html' if self.html else 'plain')
msg['Subject'] = base.handle_encoding(self._get_summary())
msg['From'] = _get_sender(sender)
msg['To'] = ', '.join(email_addresses)
# We could pass the priority in the 'Importance' header, but
# since nobody pays attention to that (and we can't even set
# that header when sending from appengine), we just use the
# fact it's embedded in the subject line.
if cc:
if not isinstance(cc, six.string_types):
cc = ', '.join(cc)
msg['Cc'] = cc
if bcc:
if not isinstance(bcc, six.string_types):
bcc = ', '.join(bcc)
msg['Bcc'] = bcc
# I think sendmail wants just email addresses, so extract
# them in case the user specified "Name <email>".
to_emails = [email.utils.parseaddr(a) for a in email_addresses]
to_emails = [email_addr for (_, email_addr) in to_emails]
s = smtplib.SMTP('localhost')
s.sendmail('no-reply@khanacademy.org', to_emails, msg.as_string())
s.quit()
示例10
def open_local_file(self, req):
import email.utils
import mimetypes
host = req.host
filename = req.selector
localfile = url2pathname(filename)
try:
stats = os.stat(localfile)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(filename)[0]
headers = email.message_from_string(
'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if host:
host, port = splitport(host)
if not host or \
(not port and _safe_gethostbyname(host) in self.get_names()):
if host:
origurl = 'file://' + host + filename
else:
origurl = 'file://' + filename
return addinfourl(open(localfile, 'rb'), headers, origurl)
except OSError as exp:
raise URLError(exp)
raise URLError('file not on local host')
示例11
def open_local_file(self, url):
"""Use local file."""
import email.utils
import mimetypes
host, file = splithost(url)
localname = url2pathname(file)
try:
stats = os.stat(localname)
except OSError as e:
raise URLError(e.strerror, e.filename)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(url)[0]
headers = email.message_from_string(
'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if not host:
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
return addinfourl(open(localname, 'rb'), headers, urlfile)
host, port = splitport(host)
if (not port
and socket.gethostbyname(host) in ((localhost(),) + thishost())):
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
elif file[:2] == './':
raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
return addinfourl(open(localname, 'rb'), headers, urlfile)
raise URLError('local file error: not on local host')
示例12
def parse_email(message):
"""Parse email message"""
msg = email.message_from_string(message)
parts = []
for part in msg.walk():
if part.get_content_type() == 'text/plain':
parts.append(part.get_payload(decode=True).decode(
part.get_content_charset() or 'utf-8'
))
from_addr = email.utils.parseaddr(decode_header(msg['From']))
to_addr = email.utils.getaddresses([decode_header(h) for h in msg.get_all('To')])
subject = decode_header(msg['Subject'])
return {'from': from_addr, 'to': to_addr, 'subject': subject, 'body': parts}
示例13
def parse_address(addr_str):
name, addr = email.utils.parseaddr(_parse_header(addr_str))
return name, addr
示例14
def _get_addr_list(self, field, text):
ret = []
f = self._m.get_all(field, [])
f = (_parse_header(x) for x in f)
addrs = email.utils.getaddresses(f)
for name, addr in addrs:
name = name or addr
if text:
ret.append(_addr_fmt_text(name, addr))
else:
ret.append((name, addr))
if text:
ret = ", ".join(ret)
return ret
示例15
def get_date(self, timestamp=False):
tup = email.utils.parsedate_tz(self._m["date"])
if tup:
stamp = email.utils.mktime_tz(tup)
if timestamp:
return stamp
return datetime.datetime.utcfromtimestamp(stamp)
示例16
def get_reviewed_by(self):
"""Try to find a "Reviewed-by:" line in message body"""
prefix = "Reviewed-by:"
r = self._find_line("^" + prefix + ".*>$")
if r:
return email.utils.parseaddr(r[len(prefix) :].strip())
else:
return None
示例17
def open_local_file(self, req):
import email.utils
import mimetypes
host = req.host
filename = req.selector
localfile = url2pathname(filename)
try:
stats = os.stat(localfile)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(filename)[0]
headers = email.message_from_string(
'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if host:
host, port = splitport(host)
if not host or \
(not port and _safe_gethostbyname(host) in self.get_names()):
if host:
origurl = 'file://' + host + filename
else:
origurl = 'file://' + filename
return addinfourl(open(localfile, 'rb'), headers, origurl)
except OSError as exp:
# users shouldn't expect OSErrors coming from urlopen()
raise URLError(exp)
raise URLError('file not on local host')
示例18
def open_local_file(self, url):
"""Use local file."""
import email.utils
import mimetypes
host, file = splithost(url)
localname = url2pathname(file)
try:
stats = os.stat(localname)
except OSError as e:
raise URLError(e.strerror, e.filename)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(url)[0]
headers = email.message_from_string(
'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if not host:
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
return addinfourl(open(localname, 'rb'), headers, urlfile)
host, port = splitport(host)
if (not port
and socket.gethostbyname(host) in ((localhost(),) + thishost())):
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
elif file[:2] == './':
raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
return addinfourl(open(localname, 'rb'), headers, urlfile)
raise URLError('local file error: not on local host')
示例19
def open_local_file(self, req):
import email.utils
import mimetypes
host = req.host
filename = req.selector
localfile = url2pathname(filename)
try:
stats = os.stat(localfile)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(filename)[0]
headers = email.message_from_string(
'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if host:
host, port = splitport(host)
if not host or \
(not port and _safe_gethostbyname(host) in self.get_names()):
if host:
origurl = 'file://' + host + filename
else:
origurl = 'file://' + filename
return addinfourl(open(localfile, 'rb'), headers, origurl)
except OSError as exp:
raise URLError(exp)
raise URLError('file not on local host')
示例20
def open_local_file(self, url):
"""Use local file."""
import email.utils
import mimetypes
host, file = splithost(url)
localname = url2pathname(file)
try:
stats = os.stat(localname)
except OSError as e:
raise URLError(e.strerror, e.filename)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(url)[0]
headers = email.message_from_string(
'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if not host:
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
return addinfourl(open(localname, 'rb'), headers, urlfile)
host, port = splitport(host)
if (not port
and socket.gethostbyname(host) in ((localhost(),) + thishost())):
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
elif file[:2] == './':
raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
return addinfourl(open(localname, 'rb'), headers, urlfile)
raise URLError('local file error: not on local host')
示例21
def get_addr_header(self, header_name):
"""Get a list of the first addresses from this header."""
values = list()
for value in self.get_decoded_header(header_name):
for dummy, addr in email.utils.getaddresses([value]):
if addr:
values.append(addr)
break
return values
示例22
def get_all_addr_header(self, header_name):
"""Get a list of all the addresses from this header."""
values = list()
for value in self.get_decoded_header(header_name):
for dummy, addr in email.utils.getaddresses([value]):
if addr:
values.append(addr)
return values
示例23
def get_name_header(self, header_name):
"""Get a list of the first names from this header."""
values = list()
for value in self.get_decoded_header(header_name):
for name, dummy in email.utils.getaddresses([value]):
if name:
values.append(name)
break
return values
示例24
def receive_date(self):
"""Get the date from the headers."""
received = self.msg.get_all("Received") or list()
for header in received:
try:
ts = header.rsplit(";", 1)[1]
except IndexError:
continue
ts = email.utils.parsedate(ts)
return calendar.timegm(ts)
# SA will look in other headers too. Perhaps we should also?
return time.time()
示例25
def normalizeDate(self, datestr):
if not datestr:
print("No date for '%s'. Using Unix Epoch instead." % self.directory)
datestr="Thu, 1 Jan 1970 00:00:00 +0000"
t = email.utils.parsedate_tz(datestr)
timeval = time.mktime(t[:-1])
date = email.utils.formatdate(timeval, True)
utc = time.gmtime(email.utils.mktime_tz(t))
rfc2822 = '{} {:+03d}00'.format(date[:-6], t[9]//3600)
iso8601 = time.strftime('%Y%m%dT%H%M%SZ', utc)
return (rfc2822, iso8601)
示例26
def on_event(self, event, **params):
class EmailCancelled(Exception):
pass
po = None
mo = None
for v in list(params.values()):
if isinstance(v, Message):
mo = v
po = mo.project
break
elif isinstance(v, Project):
po = v
break
if not po:
return
for nt in list(self.get_notifications(po).values()):
headers = {}
if not nt["enabled"]:
continue
if nt["event"] != event:
continue
def cancel_email():
raise EmailCancelled
params["cancel"] = cancel_email
ctx = Context(params, autoescape=False)
try:
subject = Template(nt["subject_template"]).render(ctx).strip()
body = Template(nt["body_template"]).render(ctx).strip()
to = [x.strip() for x in Template(nt["to"]).render(ctx).strip().split()]
cc = [x.strip() for x in Template(nt["cc"]).render(ctx).strip().split()]
except EmailCancelled:
continue
if mo:
if nt["reply_to_all"] or not len(to):
to += [mo.get_sender_addr()]
if nt["reply_to_all"]:
cc += [x[1] for x in mo.recipients]
if mo and nt["in_reply_to"]:
headers["In-Reply-To"] = "<%s>" % mo.message_id
if mo and nt["set_reply_to"]:
headers["Reply-To"] = "<%s>" % mo.project.mailing_list
if nt["reply_subject"] and mo:
subject = (
"Re: " + mo.subject
if not mo.subject.startswith("Re:")
else mo.subject
)
if nt["to_user"] and "user" in params and params["user"].email:
to += params["user"].email
if not (subject and body and (to or cc)):
continue
headers["Subject"] = subject
headers["Message-ID"] = email.utils.make_msgid()
self._send_email(to, cc, headers, body)
示例27
def sending(msg, linesep='\r\n', maxlinelen=70):
def _fold(v, name=None):
try:
v.encode('ascii')
except UnicodeEncodeError:
v = email.header.Header(v, charset='utf-8', header_name=name)
v = v.encode(maxlinelen=maxlinelen, linesep=linesep)
return v
def fold(name, value):
return '%s: %s%s' % (name, _fold(value, name), linesep)
def fold_addrs(name, value):
addrs = email.utils.getaddresses([value])
parts = []
length = 0
for n, a in addrs:
part = '%s <%s>' % (_fold(n), a)
length += len(part)
if len(part) > maxlinelen:
part = '%s %s' % (linesep, part)
length = len(part)
parts.append(part)
addrs = ','.join(parts)
return '%s: %s%s' % (name, addrs, linesep)
params = [
[a for n, a in email.utils.getaddresses([msg[name]])]
for name in ('From', 'To') if msg[name]
]
if len(params) < 2:
raise ValueError('"From" and "To" shouldn\'t be empty')
# These new email policies work pretty strange,
# so this machinery is to encode "Subject", "From" and "To" headers
# and keep mime body as is
headers = []
for n, f in (('Subject', fold), ('From', fold_addrs), ('To', fold_addrs)):
headers.append(f(n, msg[n]))
del msg[n]
msg = b''.join([''.join(headers).encode(), msg.as_bytes()])
params.append(msg)
return params
示例28
def send_email(self, subject, body, to, **kwargs):
"""
发送邮件
:param subject:
:param body:
:param to:
:param kwargs:
:return: str: 成功为 'success'
有异常为 traceback信息
"""
try:
if not to:
logger.warning('收件人为空,无法发送邮件')
return
if not isinstance(to, list):
raise TypeError('收件人需要为列表')
list_cc = kwargs.get('list_cc_addr', [])
if not isinstance(list_cc, list):
raise TypeError('抄送人需要为列表')
# 构造MIMEMultipart对象做为根容器
main_msg = email.mime.multipart.MIMEMultipart()
# 添加文本内容
text_msg = email.mime.text.MIMEText(body, 'plain', 'utf-8')
main_msg.attach(text_msg)
# 添加附件
filename_list = kwargs.get('filename_list')
if filename_list:
for filename in kwargs['filename_list']:
file_msg = self._add_attachment(filename)
main_msg.attach(file_msg)
# 消息内容:
main_msg['Subject'] = Header(subject, "utf-8").encode()
main_msg['From'] = formataddr(["Archery 通知", self.MAIL_REVIEW_FROM_ADDR])
main_msg['To'] = ','.join(list(set(to)))
main_msg['Cc'] = ', '.join(str(cc) for cc in list(set(list_cc)))
main_msg['Date'] = email.utils.formatdate()
if self.MAIL_SSL:
server = smtplib.SMTP_SSL(self.MAIL_REVIEW_SMTP_SERVER, self.MAIL_REVIEW_SMTP_PORT, timeout=3)
else:
server = smtplib.SMTP(self.MAIL_REVIEW_SMTP_SERVER, self.MAIL_REVIEW_SMTP_PORT, timeout=3)
# 如果提供的密码为空,则不需要登录
if self.MAIL_REVIEW_FROM_PASSWORD:
server.login(self.MAIL_REVIEW_FROM_ADDR, self.MAIL_REVIEW_FROM_PASSWORD)
server.sendmail(self.MAIL_REVIEW_FROM_ADDR, to + list_cc, main_msg.as_string())
server.quit()
logger.debug(f'邮件推送成功\n消息标题:{subject}\n通知对象:{to + list_cc}\n消息内容:{body}')
return 'success'
except Exception:
errmsg = '邮件推送失败\n{}'.format(traceback.format_exc())
logger.error(errmsg)
return errmsg