Python源码示例:email.policy()

示例1
def write_to(self, fp):
        """Serialize the collected files as one MHTML document.

        A ``multipart/related`` container with a freshly generated boundary
        is built; the root file is attached first, followed by every entry
        of ``self._files`` in sorted order.

        Args:
            fp: The file-object to write to, opened in "wb" mode.
        """
        boundary = '---=_qute-{}'.format(uuid.uuid4())
        container = email.mime.multipart.MIMEMultipart('related', boundary)

        container.attach(self._create_root_file())

        for entry in sorted(self._files.items()):
            # Only the value is attached; the key just fixes the ordering.
            container.attach(self._create_file(entry[1]))

        writer = email.generator.BytesGenerator(fp, policy=MHTMLPolicy)
        writer.flatten(container)
示例2
def get_imap_messages() -> Generator[EmailMessage, None, None]:
    """Yield each message of the configured IMAP folder, deleting as it goes.

    Logs in to the server named in ``settings``, selects the configured
    folder and runs an ``ALL`` search.  Every hit is fetched, parsed with
    ``email.policy.default`` and yielded.  When the consumer resumes the
    generator, the message just yielded is flagged as deleted; after the
    last message the folder is expunged.  The folder is closed and the
    session logged out on the way out, including when an error occurs.
    """
    connection = IMAP4_SSL(settings.EMAIL_GATEWAY_IMAP_SERVER, settings.EMAIL_GATEWAY_IMAP_PORT)
    connection.login(settings.EMAIL_GATEWAY_LOGIN, settings.EMAIL_GATEWAY_PASSWORD)
    try:
        connection.select(settings.EMAIL_GATEWAY_IMAP_FOLDER)
        try:
            _, search_data = connection.search(None, 'ALL')
            for message_id in search_data[0].split():
                _, fetched = connection.fetch(message_id, '(RFC822)')
                envelope = fetched[0]
                assert isinstance(envelope, tuple)
                message = email.message_from_bytes(envelope[1], policy=email.policy.default)
                assert isinstance(message, EmailMessage)  # https://github.com/python/typeshed/issues/2417
                yield message
                # Runs only once the consumer asks for the next message.
                connection.store(message_id, '+FLAGS', '\\Deleted')
            connection.expunge()
        finally:
            connection.close()
    finally:
        connection.logout()
示例3
def test_charset_not_specified(self) -> None:
        # Drop the charset parameter from the fixture's Content-Type header
        # and check the content of the resulting message.
        with_charset = "Content-Type: text/plain; charset=\"us-ascii\""
        without_charset = "Content-Type: text/plain"
        raw_email = self.fixture_data('1.txt', type='email').replace(with_charset, without_charset)
        incoming_message = message_from_string(raw_email, policy=email.policy.default)
        assert isinstance(incoming_message, EmailMessage)  # https://github.com/python/typeshed/issues/2417

        hamlet = self.example_user('hamlet')
        self.login_user(hamlet)
        self.subscribe(hamlet, "Denmark")
        denmark = get_stream("Denmark", hamlet.realm)

        # Swap the fixture's recipient for the stream's email address.
        del incoming_message['To']
        incoming_message['To'] = encode_email_address(denmark)
        process_message(incoming_message)

        latest = most_recent_message(hamlet)
        self.assertEqual(latest.content, "Email fixture 1.txt body")
示例4
def consume(self, event: Mapping[str, Any]) -> None:
        """Decode one queued email event and mirror it, honouring rate limits.

        ``event["msg_base64"]`` is base64-decoded and parsed with
        ``email.policy.default``.  Unless the recipient is a missed-message
        address, the recipient realm is rate limited first; a rate-limited
        email is logged and dropped instead of being mirrored.
        """
        rcpt_to = event['rcpt_to']
        raw_bytes = base64.b64decode(event["msg_base64"])
        msg = email.message_from_bytes(raw_bytes, policy=email.policy.default)
        assert isinstance(msg, EmailMessage)  # https://github.com/python/typeshed/issues/2417

        # Missed message addresses are one-time use, so we don't need
        # to worry about emails to them resulting in message spam.
        if is_missed_message_address(rcpt_to):
            mirror_email(msg, rcpt_to=rcpt_to)
            return

        recipient_realm = decode_stream_email_address(rcpt_to)[0].realm
        try:
            rate_limit_mirror_by_realm(recipient_realm)
        except RateLimited:
            logger.warning("MirrorWorker: Rejecting an email from: %s "
                           "to realm: %s - rate limited.",
                           msg['From'], recipient_realm.name)
            return

        mirror_email(msg, rcpt_to=rcpt_to)
示例5
def flatten_message(
    message: email.message.Message, utf8: bool = False, cte_type: str = "8bit"
) -> bytes:
    """Return *message* flattened to bytes, without its Bcc headers.

    The message is flattened with a clone of its own policy that uses
    ``LINE_SEP`` as line separator and the requested ``cte_type``; ``utf8``
    is applied as well unless the policy is a ``Compat32`` instance.
    """
    # Make a local copy so the Bcc headers can be deleted from it.
    stripped = copy.copy(message)
    for header in ("Bcc", "Resent-Bcc"):
        del stripped[header]

    overrides = {"linesep": LINE_SEP, "cte_type": cte_type}
    # Compat32 cannot use UTF8
    if not isinstance(message.policy, email.policy.Compat32):  # type: ignore
        overrides["utf8"] = utf8  # type: ignore
    policy = message.policy.clone(**overrides)  # type: ignore

    with io.BytesIO() as buffer:
        email.generator.BytesGenerator(buffer, policy=policy).flatten(stripped)
        return buffer.getvalue()
示例6
def _parse_email_fixture(self, fixture_path: str) -> EmailMessage:
        """Load the email fixture at *fixture_path* as an EmailMessage.

        Paths ending in ``.json`` are delegated to
        ``_parse_email_json_fixture``; anything else is read in binary mode
        and parsed with ``email.policy.default``.

        Raises:
            CommandError: if the fixture path does not exist.
        """
        if not self._does_fixture_path_exist(fixture_path):
            raise CommandError(f'Fixture {fixture_path} does not exist')

        is_json = fixture_path.endswith('.json')
        if is_json:
            return self._parse_email_json_fixture(fixture_path)

        with open(fixture_path, "rb") as fixture_file:
            parsed = email.message_from_binary_file(fixture_file, policy=email.policy.default)
        assert isinstance(parsed, EmailMessage)  # https://github.com/python/typeshed/issues/2417
        return parsed
示例7
def __init__(self, _maintype, _subtype, *, policy=None, **_params):
        """Initialize the message with Content-Type and MIME-Version headers.

        The Content-Type value is "_maintype/_subtype"; every additional
        keyword argument is passed on to add_header() as a parameter of
        that header.  When no policy is given, email.policy.compat32 is
        used.  MIME-Version is always set to 1.0.
        """
        effective_policy = email.policy.compat32 if policy is None else policy
        message.Message.__init__(self, policy=effective_policy)
        content_type = '%s/%s' % (_maintype, _subtype)
        self.add_header('Content-Type', content_type, **_params)
        self['MIME-Version'] = '1.0'
示例8
def test_mbox_with_8bit_tags(self):
        # Import the fixture mbox, fetch the message's mbox view back and
        # check that both expected lines appear in the decoded payload.
        self.cli_login()
        self.cli_import("0028-tags-need-8bit-encoding.mbox.gz")
        self.cli_logout()
        response = self.client.get("/QEMU/20181126152836.25379-1-rkagan@virtuozzo.com/mbox")
        parsed = email.parser.BytesParser(policy=email.policy.SMTP).parsebytes(response.content)
        body = decode_payload(parsed)
        for expected in (
            "SynICState *synic = get_synic(cs);",
            "Reviewed-by: Philippe Mathieu-Daudé <philmd@redhat.com>",
        ):
            self.assertIn(expected, body)
示例9
def new():
    """Return a fresh MIMEPart built with ``policy``.

    ``policy`` is not a parameter; it is resolved from the surrounding
    scope, which is not visible in this snippet.
    """
    part = MIMEPart(policy)
    return part
示例10
def test_as_string_policy(self):
        # as_string(policy=...) must equal what a Generator configured with
        # the same policy writes for the message.
        msg = self._msgobj('msg_01.txt')
        crlf_policy = msg.policy.clone(linesep='\r\n')
        via_method = msg.as_string(policy=crlf_policy)
        sink = StringIO()
        Generator(sink, policy=crlf_policy).flatten(msg)
        self.assertEqual(via_method, sink.getvalue())
示例11
def test_as_bytes_policy(self):
        # as_bytes(policy=...) must equal what a BytesGenerator configured
        # with the same policy writes for the message.
        msg = self._msgobj('msg_01.txt')
        crlf_policy = msg.policy.clone(linesep='\r\n')
        via_method = msg.as_bytes(policy=crlf_policy)
        sink = BytesIO()
        BytesGenerator(sink, policy=crlf_policy).flatten(msg)
        self.assertEqual(via_method, sink.getvalue())

    # test_headerregistry.TestContentTypeHeader.bad_params 
示例12
def test_bytes_parser_on_exception_does_not_close_file(self):
        # Parsing under the strict policy must raise the defect while
        # leaving the caller's binary file object open.
        with openfile('msg_15.txt', 'rb') as fp:
            strict_parser = email.parser.BytesParser(policy=email.policy.strict)
            with self.assertRaises(email.errors.StartBoundaryNotFoundDefect):
                strict_parser.parse(fp)
            self.assertFalse(fp.closed)
示例13
def test_parser_on_exception_does_not_close_file(self):
        # Parsing under the strict policy must raise the defect while
        # leaving the caller's text file object open.
        with openfile('msg_15.txt', 'r') as fp:
            strict_parser = email.parser.Parser(policy=email.policy.strict)
            with self.assertRaises(email.errors.StartBoundaryNotFoundDefect):
                strict_parser.parse(fp)
            self.assertFalse(fp.closed)
示例14
def __init__(self, policy):
            """Remember the policy passed at construction in ``check_policy``."""
            # The policy is stored on the instance and is not forwarded: the
            # base-class constructor is called with no arguments.
            self.check_policy = policy
            super().__init__()
示例15
def test_custom_message_gets_policy_if_possible_from_string(self):
        # The parsed object must be a MyMessage whose check_policy is the
        # very policy object handed to message_from_string.
        source = "Subject: bogus\n\nmsg\n"
        parsed = email.message_from_string(source, self.MyMessage, policy=self.MyPolicy)
        self.assertIsInstance(parsed, self.MyMessage)
        self.assertIs(parsed.check_policy, self.MyPolicy)
示例16
def test_custom_message_gets_policy_if_possible_from_file(self):
        # Same check as the from_string variant, but the source is read
        # from an in-memory text file.
        parsed = email.message_from_file(
            io.StringIO("Subject: bogus\n\nmsg\n"),
            self.MyMessage,
            policy=self.MyPolicy,
        )
        self.assertIsInstance(parsed, self.MyMessage)
        self.assertIs(parsed.check_policy, self.MyPolicy)

    # XXX add tests for other functions that take Message arg. 
示例17
def test_only_split_on_cr_lf(self):
        # The unicode line splitter splits on unicode linebreaks, which are
        # more numerous than allowed by the email RFCs; make sure we are only
        # splitting on those two.
        raw_headers = (
            "Next-Line: not\x85broken\r\n"
            "Null: not\x00broken\r\n"
            "Vertical-Tab: not\vbroken\r\n"
            "Form-Feed: not\fbroken\r\n"
            "File-Separator: not\x1Cbroken\r\n"
            "Group-Separator: not\x1Dbroken\r\n"
            "Record-Separator: not\x1Ebroken\r\n"
            "Line-Separator: not\u2028broken\r\n"
            "Paragraph-Separator: not\u2029broken\r\n"
            "\r\n"
        )
        # Every header value must come back with its control character intact.
        expected_items = [
            ("Next-Line", "not\x85broken"),
            ("Null", "not\x00broken"),
            ("Vertical-Tab", "not\vbroken"),
            ("Form-Feed", "not\fbroken"),
            ("File-Separator", "not\x1Cbroken"),
            ("Group-Separator", "not\x1Dbroken"),
            ("Record-Separator", "not\x1Ebroken"),
            ("Line-Separator", "not\u2028broken"),
            ("Paragraph-Separator", "not\u2029broken"),
        ]
        parsed = self.parser(raw_headers, policy=default)
        self.assertEqual(parsed.items(), expected_items)
        self.assertEqual(parsed.get_payload(), "")
示例18
def __init__(self, _maintype, _subtype, *, policy=None, **_params):
        """Initialize the message with Content-Type and MIME-Version headers.

        The Content-Type value is "_maintype/_subtype"; every additional
        keyword argument is passed on to add_header() as a parameter of
        that header.  When no policy is given, email.policy.compat32 is
        used.  MIME-Version is always set to 1.0.
        """
        effective_policy = email.policy.compat32 if policy is None else policy
        message.Message.__init__(self, policy=effective_policy)
        content_type = '%s/%s' % (_maintype, _subtype)
        self.add_header('Content-Type', content_type, **_params)
        self['MIME-Version'] = '1.0'
示例19
def __init__(self, _maintype, _subtype, *, policy=None, **_params):
        """Initialize the message with Content-Type and MIME-Version headers.

        The Content-Type value is "_maintype/_subtype"; every additional
        keyword argument is passed on to add_header() as a parameter of
        that header.  When no policy is given, email.policy.compat32 is
        used.  MIME-Version is always set to 1.0.
        """
        effective_policy = email.policy.compat32 if policy is None else policy
        message.Message.__init__(self, policy=effective_policy)
        content_type = '%s/%s' % (_maintype, _subtype)
        self.add_header('Content-Type', content_type, **_params)
        self['MIME-Version'] = '1.0'
示例20
def __init__(self, _maintype, _subtype, *, policy=None, **_params):
        """Initialize the message with Content-Type and MIME-Version headers.

        The Content-Type value is "_maintype/_subtype"; every additional
        keyword argument is passed on to add_header() as a parameter of
        that header.  When no policy is given, email.policy.compat32 is
        used.  MIME-Version is always set to 1.0.
        """
        effective_policy = email.policy.compat32 if policy is None else policy
        message.Message.__init__(self, policy=effective_policy)
        content_type = '%s/%s' % (_maintype, _subtype)
        self.add_header('Content-Type', content_type, **_params)
        self['MIME-Version'] = '1.0'
示例21
def __init__(self, policy):
            """Remember the policy passed at construction in ``check_policy``."""
            # The policy is stored on the instance and is not forwarded: the
            # base-class constructor is called with no arguments.
            self.check_policy = policy
            super().__init__()
示例22
def test_custom_message_gets_policy_if_possible_from_string(self):
        # The parsed object must be a MyMessage whose check_policy is the
        # very policy object handed to message_from_string.
        source = "Subject: bogus\n\nmsg\n"
        parsed = email.message_from_string(source, self.MyMessage, policy=self.MyPolicy)
        self.assertIsInstance(parsed, self.MyMessage)
        self.assertIs(parsed.check_policy, self.MyPolicy)
示例23
def test_custom_message_gets_policy_if_possible_from_file(self):
        # Same check as the from_string variant, but the source is read
        # from an in-memory text file.
        parsed = email.message_from_file(
            io.StringIO("Subject: bogus\n\nmsg\n"),
            self.MyMessage,
            policy=self.MyPolicy,
        )
        self.assertIsInstance(parsed, self.MyMessage)
        self.assertIs(parsed.check_policy, self.MyPolicy)

    # XXX add tests for other functions that take Message arg. 
示例24
def test_only_split_on_cr_lf(self):
        # The unicode line splitter splits on unicode linebreaks, which are
        # more numerous than allowed by the email RFCs; make sure we are only
        # splitting on those two.
        raw_headers = (
            "Next-Line: not\x85broken\r\n"
            "Null: not\x00broken\r\n"
            "Vertical-Tab: not\vbroken\r\n"
            "Form-Feed: not\fbroken\r\n"
            "File-Separator: not\x1Cbroken\r\n"
            "Group-Separator: not\x1Dbroken\r\n"
            "Record-Separator: not\x1Ebroken\r\n"
            "Line-Separator: not\u2028broken\r\n"
            "Paragraph-Separator: not\u2029broken\r\n"
            "\r\n"
        )
        # Every header value must come back with its control character intact.
        expected_items = [
            ("Next-Line", "not\x85broken"),
            ("Null", "not\x00broken"),
            ("Vertical-Tab", "not\vbroken"),
            ("Form-Feed", "not\fbroken"),
            ("File-Separator", "not\x1Cbroken"),
            ("Group-Separator", "not\x1Dbroken"),
            ("Record-Separator", "not\x1Ebroken"),
            ("Line-Separator", "not\u2028broken"),
            ("Paragraph-Separator", "not\u2029broken"),
        ]
        for parse in self.parsers:
            with self.subTest(parser=parse.__name__):
                parsed = parse(raw_headers, policy=default)
                self.assertEqual(parsed.items(), expected_items)
                self.assertEqual(parsed.get_payload(), "")
示例25
def test_factory_arg_overrides_policy(self):
        # An explicitly passed factory must win over the message_factory
        # configured on the policy.
        for parse in self.parsers:
            with self.subTest(parser=parse.__name__):
                policy_with_factory = default.clone(message_factory=self.MyMessage)
                parsed = parse("To: foo\n\ntest", Message, policy=policy_with_factory)
                self.assertNotIsInstance(parsed, self.MyMessage)
                self.assertIsInstance(parsed, Message)

# Play some games to get nice output in subTest.  This code could be clearer
# if staticmethod supported __name__.