Python源码示例:email.utils()

示例1
def _encode_field(self, name, value):
        self._log(DEBUG1, '_FormDataPart._encode_field: %s %s',
                  type(name), type(value))
        if not _rfc2231_encode:
            s = '%s="%s"' % (name, value)
            self._log(DEBUG1, '_FormDataPart._encode_field: %s %s',
                      type(s), s)
            if _isunicode(s):
                s = s.encode('utf-8')
                self._log(DEBUG1, '_FormDataPart._encode_field: %s %s',
                          type(s), s)
            return s

        if not [ch for ch in '\r\n\\' if ch in value]:
            try:
                return ('%s="%s"' % (name, value)).encode('ascii')
            except UnicodeEncodeError:
                self._log(DEBUG1, 'UnicodeEncodeError 3.x')
            except UnicodeDecodeError:  # 2.x
                self._log(DEBUG1, 'UnicodeDecodeError 2.x')
        # RFC 2231
        value = email.utils.encode_rfc2231(value, 'utf-8')
        return ('%s*=%s' % (name, value)).encode('ascii') 
示例2
def dateToUTCstr(str_date):
    # this fails to parse timezones out of formats like
    # Tue, 17 Jun 2010 08:33:51 EDT
    # so it will assume the local timezone for those cases

    try:
        dt = dateutil.parser.parse(str_date)
    except (TypeError, ValueError) as e:
        # print u"Failed to parse date with dateutil, using email utils: date={}".format(str_date)
        parsed_dt = parsedate_tz(str_date)
        # Make an arbitrary tz info object name can be anything NSTZ "Newman Seconds Time Zone"
        nstz_info = dateutil.tz.tzoffset("NSTZ",parsed_dt[9])
        dt= datetime.datetime(*parsed_dt[:6], tzinfo=nstz_info)


    if not dt.tzinfo:
        print "WARNING:  Failed to parse timezone defaulting to UTC for Date: {}".format(str_date)
        dt = dt.replace(tzinfo=dateutil.tz.tzutc())

    dt_tz = dt.astimezone(dateutil.tz.tzutc())
    time_str =  dt_tz.strftime('%Y-%m-%dT%H:%M:%S')
    # print u"Parsed date={} ====> {}".format(str_date, time_str)

    return time_str 
示例3
def generate_email_files(msg):
    counter = 1
    upload_date = time.mktime(email.utils.parsedate(msg["Date"]))
    for part in msg.walk():
        # multipart/* are just containers
        if part.get_content_maintype() == 'multipart':
            continue
        # Applications should really sanitize the given filename so that an
        # email message can't be used to overwrite important files
        filename = part.get_filename()
        if not filename:
            ext = mimetypes.guess_extension(part.get_content_type())
            if not ext:
                # Use a generic bag-of-bits extension
                ext = '.bin'
            filename = 'part-%03d%s' % (counter, ext)
        counter += 1

        data = part.get_payload(decode=True)
        if parse_pathname(filename).ext == '.zip':
            for zipfn, zipdata, zipdt in generate_zip_files(data):
                yield zipfn, zipdata, zipdt
        else:
            yield filename, data, upload_date 
示例4
def getmailaddresses(self, prop):
        """retrieve From:, To: and Cc: addresses"""
        addrs=email.utils.getaddresses(self.msg.get_all(prop, []))
        for i, (name, addr) in enumerate(addrs):
            if not name and addr:
                # only one string! Is it the address or is it the name ?
                # use the same for both and see later
                name=addr

            try:
                # address must be ascii only
                addr=addr.encode('ascii')
            except UnicodeError:
                addr=''
            else:
                # address must match adress regex
                if not email_address_re.match(addr.decode("utf-8")):
                    addr=''
            if not isinstance(addr, str):
                # Python 2 imaplib returns a bytearray,
                # Python 3 imaplib returns a str.
                addrs[i]=(self.getmailheader(name), addr.decode("utf-8"))
        return addrs 
示例5
def _get_last_chromium_modification():
    """Returns the last modification date of the chromium-browser-official tar file"""
    with _get_requests_session() as session:
        response = session.head(
            'https://storage.googleapis.com/chromium-browser-official/chromium-{}.tar.xz'.format(
                get_chromium_version()))
        response.raise_for_status()
        return email.utils.parsedate_to_datetime(response.headers['Last-Modified']) 
示例6
def _get_gitiles_git_log_date(log_entry):
    """Helper for _get_gitiles_git_log_date"""
    return email.utils.parsedate_to_datetime(log_entry['committer']['time']) 
示例7
def open_local_file(self, req):
        import email.utils
        import mimetypes
        host = req.host
        filename = req.selector
        localfile = url2pathname(filename)
        try:
            stats = os.stat(localfile)
            size = stats.st_size
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            mtype = mimetypes.guess_type(filename)[0]
            headers = email.message_from_string(
                'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
                (mtype or 'text/plain', size, modified))
            if host:
                host, port = splitport(host)
            if not host or \
                (not port and _safe_gethostbyname(host) in self.get_names()):
                if host:
                    origurl = 'file://' + host + filename
                else:
                    origurl = 'file://' + filename
                return addinfourl(open(localfile, 'rb'), headers, origurl)
        except OSError as exp:
            # users shouldn't expect OSErrors coming from urlopen()
            raise URLError(exp)
        raise URLError('file not on local host') 
示例8
def open_local_file(self, url):
        """Use local file."""
        import email.utils
        import mimetypes
        host, file = splithost(url)
        localname = url2pathname(file)
        try:
            stats = os.stat(localname)
        except OSError as e:
            raise URLError(e.strerror, e.filename)
        size = stats.st_size
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        mtype = mimetypes.guess_type(url)[0]
        headers = email.message_from_string(
            'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
            (mtype or 'text/plain', size, modified))
        if not host:
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            return addinfourl(open(localname, 'rb'), headers, urlfile)
        host, port = splitport(host)
        if (not port
           and socket.gethostbyname(host) in ((localhost(),) + thishost())):
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            elif file[:2] == './':
                raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
            return addinfourl(open(localname, 'rb'), headers, urlfile)
        raise URLError('local file error: not on local host') 
示例9
def _send_to_sendmail(self, message, email_addresses, cc=None, bcc=None,
                          sender=None):
        msg = email.mime.text.MIMEText(base.handle_encoding(message),
                                       'html' if self.html else 'plain')
        msg['Subject'] = base.handle_encoding(self._get_summary())
        msg['From'] = _get_sender(sender)
        msg['To'] = ', '.join(email_addresses)
        # We could pass the priority in the 'Importance' header, but
        # since nobody pays attention to that (and we can't even set
        # that header when sending from appengine), we just use the
        # fact it's embedded in the subject line.
        if cc:
            if not isinstance(cc, six.string_types):
                cc = ', '.join(cc)
            msg['Cc'] = cc
        if bcc:
            if not isinstance(bcc, six.string_types):
                bcc = ', '.join(bcc)
            msg['Bcc'] = bcc

        # I think sendmail wants just email addresses, so extract
        # them in case the user specified "Name <email>".
        to_emails = [email.utils.parseaddr(a) for a in email_addresses]
        to_emails = [email_addr for (_, email_addr) in to_emails]

        s = smtplib.SMTP('localhost')
        s.sendmail('no-reply@khanacademy.org', to_emails, msg.as_string())
        s.quit() 
示例10
def open_local_file(self, req):
        import email.utils
        import mimetypes
        host = req.host
        filename = req.selector
        localfile = url2pathname(filename)
        try:
            stats = os.stat(localfile)
            size = stats.st_size
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            mtype = mimetypes.guess_type(filename)[0]
            headers = email.message_from_string(
                'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
                (mtype or 'text/plain', size, modified))
            if host:
                host, port = splitport(host)
            if not host or \
                (not port and _safe_gethostbyname(host) in self.get_names()):
                if host:
                    origurl = 'file://' + host + filename
                else:
                    origurl = 'file://' + filename
                return addinfourl(open(localfile, 'rb'), headers, origurl)
        except OSError as exp:
            raise URLError(exp)
        raise URLError('file not on local host') 
示例11
def open_local_file(self, url):
        """Use local file."""
        import email.utils
        import mimetypes
        host, file = splithost(url)
        localname = url2pathname(file)
        try:
            stats = os.stat(localname)
        except OSError as e:
            raise URLError(e.strerror, e.filename)
        size = stats.st_size
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        mtype = mimetypes.guess_type(url)[0]
        headers = email.message_from_string(
            'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
            (mtype or 'text/plain', size, modified))
        if not host:
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            return addinfourl(open(localname, 'rb'), headers, urlfile)
        host, port = splitport(host)
        if (not port
           and socket.gethostbyname(host) in ((localhost(),) + thishost())):
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            elif file[:2] == './':
                raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
            return addinfourl(open(localname, 'rb'), headers, urlfile)
        raise URLError('local file error: not on local host') 
示例12
def parse_email(message):
    """Parse email message"""
    msg = email.message_from_string(message)
    parts = []
    for part in msg.walk():
        if part.get_content_type() == 'text/plain':
            parts.append(part.get_payload(decode=True).decode(
                part.get_content_charset() or 'utf-8'
            ))

    from_addr = email.utils.parseaddr(decode_header(msg['From']))
    to_addr = email.utils.getaddresses([decode_header(h) for h in msg.get_all('To')])
    subject = decode_header(msg['Subject'])
    return {'from': from_addr, 'to': to_addr, 'subject': subject, 'body': parts} 
示例13
def parse_address(addr_str):
    name, addr = email.utils.parseaddr(_parse_header(addr_str))
    return name, addr 
示例14
def _get_addr_list(self, field, text):
        ret = []
        f = self._m.get_all(field, [])
        f = (_parse_header(x) for x in f)
        addrs = email.utils.getaddresses(f)
        for name, addr in addrs:
            name = name or addr
            if text:
                ret.append(_addr_fmt_text(name, addr))
            else:
                ret.append((name, addr))
        if text:
            ret = ", ".join(ret)
        return ret 
示例15
def get_date(self, timestamp=False):
        tup = email.utils.parsedate_tz(self._m["date"])
        if tup:
            stamp = email.utils.mktime_tz(tup)
            if timestamp:
                return stamp
            return datetime.datetime.utcfromtimestamp(stamp) 
示例16
def get_reviewed_by(self):
        """Try to find a "Reviewed-by:" line in message body"""
        prefix = "Reviewed-by:"
        r = self._find_line("^" + prefix + ".*>$")
        if r:
            return email.utils.parseaddr(r[len(prefix) :].strip())
        else:
            return None 
示例17
def open_local_file(self, req):
        import email.utils
        import mimetypes
        host = req.host
        filename = req.selector
        localfile = url2pathname(filename)
        try:
            stats = os.stat(localfile)
            size = stats.st_size
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            mtype = mimetypes.guess_type(filename)[0]
            headers = email.message_from_string(
                'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
                (mtype or 'text/plain', size, modified))
            if host:
                host, port = splitport(host)
            if not host or \
                (not port and _safe_gethostbyname(host) in self.get_names()):
                if host:
                    origurl = 'file://' + host + filename
                else:
                    origurl = 'file://' + filename
                return addinfourl(open(localfile, 'rb'), headers, origurl)
        except OSError as exp:
            # users shouldn't expect OSErrors coming from urlopen()
            raise URLError(exp)
        raise URLError('file not on local host') 
示例18
def open_local_file(self, url):
        """Use local file."""
        import email.utils
        import mimetypes
        host, file = splithost(url)
        localname = url2pathname(file)
        try:
            stats = os.stat(localname)
        except OSError as e:
            raise URLError(e.strerror, e.filename)
        size = stats.st_size
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        mtype = mimetypes.guess_type(url)[0]
        headers = email.message_from_string(
            'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
            (mtype or 'text/plain', size, modified))
        if not host:
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            return addinfourl(open(localname, 'rb'), headers, urlfile)
        host, port = splitport(host)
        if (not port
           and socket.gethostbyname(host) in ((localhost(),) + thishost())):
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            elif file[:2] == './':
                raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
            return addinfourl(open(localname, 'rb'), headers, urlfile)
        raise URLError('local file error: not on local host') 
示例19
def open_local_file(self, req):
        import email.utils
        import mimetypes
        host = req.host
        filename = req.selector
        localfile = url2pathname(filename)
        try:
            stats = os.stat(localfile)
            size = stats.st_size
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            mtype = mimetypes.guess_type(filename)[0]
            headers = email.message_from_string(
                'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
                (mtype or 'text/plain', size, modified))
            if host:
                host, port = splitport(host)
            if not host or \
                (not port and _safe_gethostbyname(host) in self.get_names()):
                if host:
                    origurl = 'file://' + host + filename
                else:
                    origurl = 'file://' + filename
                return addinfourl(open(localfile, 'rb'), headers, origurl)
        except OSError as exp:
            raise URLError(exp)
        raise URLError('file not on local host') 
示例20
def open_local_file(self, url):
        """Use local file."""
        import email.utils
        import mimetypes
        host, file = splithost(url)
        localname = url2pathname(file)
        try:
            stats = os.stat(localname)
        except OSError as e:
            raise URLError(e.strerror, e.filename)
        size = stats.st_size
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        mtype = mimetypes.guess_type(url)[0]
        headers = email.message_from_string(
            'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
            (mtype or 'text/plain', size, modified))
        if not host:
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            return addinfourl(open(localname, 'rb'), headers, urlfile)
        host, port = splitport(host)
        if (not port
           and socket.gethostbyname(host) in ((localhost(),) + thishost())):
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            elif file[:2] == './':
                raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
            return addinfourl(open(localname, 'rb'), headers, urlfile)
        raise URLError('local file error: not on local host') 
示例21
def get_addr_header(self, header_name):
        """Get a list of the first addresses from this header."""
        values = list()
        for value in self.get_decoded_header(header_name):
            for dummy, addr in email.utils.getaddresses([value]):
                if addr:
                    values.append(addr)
                    break
        return values 
示例22
def get_all_addr_header(self, header_name):
        """Get a list of all the addresses from this header."""
        values = list()
        for value in self.get_decoded_header(header_name):
            for dummy, addr in email.utils.getaddresses([value]):
                if addr:
                    values.append(addr)
        return values 
示例23
def get_name_header(self, header_name):
        """Get a list of the first names from this header."""
        values = list()
        for value in self.get_decoded_header(header_name):
            for name, dummy in email.utils.getaddresses([value]):
                if name:
                    values.append(name)
                    break
        return values 
示例24
def receive_date(self):
        """Get the date from the headers."""
        received = self.msg.get_all("Received") or list()
        for header in received:
            try:
                ts = header.rsplit(";", 1)[1]
            except IndexError:
                continue
            ts = email.utils.parsedate(ts)
            return calendar.timegm(ts)
        # SA will look in other headers too. Perhaps we should also?
        return time.time() 
示例25
def normalizeDate(self, datestr):
        if not datestr:
            print("No date for '%s'. Using Unix Epoch instead." % self.directory)
            datestr="Thu, 1 Jan 1970 00:00:00 +0000"
        t = email.utils.parsedate_tz(datestr)
        timeval = time.mktime(t[:-1])
        date = email.utils.formatdate(timeval, True)
        utc = time.gmtime(email.utils.mktime_tz(t))
        rfc2822 = '{} {:+03d}00'.format(date[:-6], t[9]//3600)
        iso8601 = time.strftime('%Y%m%dT%H%M%SZ', utc)

        return (rfc2822, iso8601) 
示例26
def on_event(self, event, **params):
        class EmailCancelled(Exception):
            pass

        po = None
        mo = None
        for v in list(params.values()):
            if isinstance(v, Message):
                mo = v
                po = mo.project
                break
            elif isinstance(v, Project):
                po = v
                break
        if not po:
            return
        for nt in list(self.get_notifications(po).values()):
            headers = {}
            if not nt["enabled"]:
                continue
            if nt["event"] != event:
                continue

            def cancel_email():
                raise EmailCancelled

            params["cancel"] = cancel_email

            ctx = Context(params, autoescape=False)

            try:
                subject = Template(nt["subject_template"]).render(ctx).strip()
                body = Template(nt["body_template"]).render(ctx).strip()
                to = [x.strip() for x in Template(nt["to"]).render(ctx).strip().split()]
                cc = [x.strip() for x in Template(nt["cc"]).render(ctx).strip().split()]
            except EmailCancelled:
                continue
            if mo:
                if nt["reply_to_all"] or not len(to):
                    to += [mo.get_sender_addr()]
                if nt["reply_to_all"]:
                    cc += [x[1] for x in mo.recipients]
            if mo and nt["in_reply_to"]:
                headers["In-Reply-To"] = "<%s>" % mo.message_id
            if mo and nt["set_reply_to"]:
                headers["Reply-To"] = "<%s>" % mo.project.mailing_list
            if nt["reply_subject"] and mo:
                subject = (
                    "Re: " + mo.subject
                    if not mo.subject.startswith("Re:")
                    else mo.subject
                )
            if nt["to_user"] and "user" in params and params["user"].email:
                to += params["user"].email
            if not (subject and body and (to or cc)):
                continue
            headers["Subject"] = subject
            headers["Message-ID"] = email.utils.make_msgid()
            self._send_email(to, cc, headers, body) 
示例27
def sending(msg, linesep='\r\n', maxlinelen=70):
    def _fold(v, name=None):
        try:
            v.encode('ascii')
        except UnicodeEncodeError:
            v = email.header.Header(v, charset='utf-8', header_name=name)
            v = v.encode(maxlinelen=maxlinelen, linesep=linesep)
        return v

    def fold(name, value):
        return '%s: %s%s' % (name, _fold(value, name), linesep)

    def fold_addrs(name, value):
        addrs = email.utils.getaddresses([value])
        parts = []
        length = 0
        for n, a in addrs:
            part = '%s <%s>' % (_fold(n), a)
            length += len(part)
            if len(part) > maxlinelen:
                part = '%s %s' % (linesep, part)
                length = len(part)
            parts.append(part)
        addrs = ','.join(parts)
        return '%s: %s%s' % (name, addrs, linesep)

    params = [
        [a for n, a in email.utils.getaddresses([msg[name]])]
        for name in ('From', 'To') if msg[name]
    ]
    if len(params) < 2:
        raise ValueError('"From" and "To" shouldn\'t be empty')

    # These new email policies work pretty strange,
    # so this machinery is to encode "Subject", "From" and "To" headers
    # and keep mime body as is
    headers = []
    for n, f in (('Subject', fold), ('From', fold_addrs), ('To', fold_addrs)):
        headers.append(f(n, msg[n]))
        del msg[n]
    msg = b''.join([''.join(headers).encode(), msg.as_bytes()])
    params.append(msg)
    return params 
示例28
def send_email(self, subject, body, to, **kwargs):
        """
        发送邮件
        :param subject:
        :param body:
        :param to:
        :param kwargs:
        :return: str: 成功为 'success'
                      有异常为 traceback信息
        """

        try:
            if not to:
                logger.warning('收件人为空,无法发送邮件')
                return
            if not isinstance(to, list):
                raise TypeError('收件人需要为列表')
            list_cc = kwargs.get('list_cc_addr', [])
            if not isinstance(list_cc, list):
                raise TypeError('抄送人需要为列表')

            # 构造MIMEMultipart对象做为根容器
            main_msg = email.mime.multipart.MIMEMultipart()

            # 添加文本内容
            text_msg = email.mime.text.MIMEText(body, 'plain', 'utf-8')
            main_msg.attach(text_msg)

            # 添加附件
            filename_list = kwargs.get('filename_list')
            if filename_list:
                for filename in kwargs['filename_list']:
                    file_msg = self._add_attachment(filename)
                    main_msg.attach(file_msg)

            # 消息内容:
            main_msg['Subject'] = Header(subject, "utf-8").encode()
            main_msg['From'] = formataddr(["Archery 通知", self.MAIL_REVIEW_FROM_ADDR])
            main_msg['To'] = ','.join(list(set(to)))
            main_msg['Cc'] = ', '.join(str(cc) for cc in list(set(list_cc)))
            main_msg['Date'] = email.utils.formatdate()

            if self.MAIL_SSL:
                server = smtplib.SMTP_SSL(self.MAIL_REVIEW_SMTP_SERVER, self.MAIL_REVIEW_SMTP_PORT, timeout=3)
            else:
                server = smtplib.SMTP(self.MAIL_REVIEW_SMTP_SERVER, self.MAIL_REVIEW_SMTP_PORT, timeout=3)

                # 如果提供的密码为空,则不需要登录
            if self.MAIL_REVIEW_FROM_PASSWORD:
                server.login(self.MAIL_REVIEW_FROM_ADDR, self.MAIL_REVIEW_FROM_PASSWORD)
            server.sendmail(self.MAIL_REVIEW_FROM_ADDR, to + list_cc, main_msg.as_string())
            server.quit()
            logger.debug(f'邮件推送成功\n消息标题:{subject}\n通知对象:{to + list_cc}\n消息内容:{body}')
            return 'success'
        except Exception:
            errmsg = '邮件推送失败\n{}'.format(traceback.format_exc())
            logger.error(errmsg)
            return errmsg