Source code for eletter.decompose

from datetime import datetime
from email import headerregistry as hr
from email.message import EmailMessage
from functools import partial
from typing import Dict, List, Optional, Tuple, Union
import attr
from mailbits import ContentType, parse_addresses
from .classes import (
    Alternative,
    Attachment,
    BytesAttachment,
    EmailAttachment,
    HTMLBody,
    MailItem,
    Mixed,
    Multipart,
    Related,
    TextAttachment,
    TextBody,
)
from .core import compose
from .errors import DecompositionError, MixedContentError, SimplificationError


@attr.s
class Eletter:
    """
    .. versionadded:: 0.5.0

    A decomposed e-mail message: a `MailItem` tree together with the
    message-level headers
    """

    #: The body of the message
    content: MailItem = attr.ib()
    #: The subject line, or `None` if there is none
    subject: Optional[str] = attr.ib(default=None)
    #: Addresses from the :mailheader:`From` header
    from_: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: Addresses from the :mailheader:`To` header
    to: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: Addresses from the :mailheader:`CC` header
    cc: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: Addresses from the :mailheader:`BCC` header
    bcc: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: Addresses from the :mailheader:`Reply-To` header
    reply_to: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: The :mailheader:`Sender` address, or `None` if there is none
    sender: Optional[hr.Address] = attr.ib(default=None)
    #: The value of the :mailheader:`Date` header, or `None` if unset
    date: Optional[datetime] = attr.ib(default=None)
    #: All remaining headers, keyed by lowercase header name
    headers: Dict[str, List[str]] = attr.ib(factory=dict)

    def compose(self) -> EmailMessage:
        """
        Build an `~email.message.EmailMessage` from the `Eletter` by composing
        its content with the stored headers
        """
        body = self.content
        return body.compose(
            subject=self.subject,
            from_=self.from_,
            to=self.to,
            cc=self.cc,
            bcc=self.bcc,
            reply_to=self.reply_to,
            sender=self.sender,
            date=self.date,
            headers=self.headers,
        )

    def simplify(self, unmix: bool = False) -> "SimpleEletter":
        """
        Reduce the `Eletter` to a `SimpleEletter` by splitting
        `Eletter.content` into a text body, an HTML body, and a list of
        attachments.

        A :mimetype:`multipart/mixed` message is normally only simplifiable
        when every attachment follows every body part.  Pass ``unmix=True`` to
        pull the attachments apart from the bodies whatever their order.

        :raises SimplificationError:
            if the message's content cannot be simplified
        :raises MixedContentError:
            if attachments are interspersed with body parts and ``unmix`` is
            false
        """
        content = smooth(self.content)
        text: Optional[str]
        html: Optional[str]
        attachments: List[Attachment]
        if not isinstance(content, Alternative):
            text, html, attachments = simplify_alt_part(content, unmix=unmix)
        else:
            text = None
            html = None
            attachments = []
            for part in content:
                t, h, attach = simplify_alt_part(part, unmix=unmix)
                # Each alternative must contribute exactly one kind of body.
                if t is None and h is None:
                    raise SimplificationError(
                        "Alternative part contains neither text/plain nor text/html"
                    )
                if t is not None and h is not None:
                    raise SimplificationError(
                        "Alternative part contains both text/plain and text/html"
                    )
                if t is not None:
                    if text is not None:
                        raise SimplificationError(
                            "Multiple text/plain parts in multipart/alternative"
                        )
                    text = t
                else:
                    if html is not None:
                        raise SimplificationError(
                            "Multiple text/html parts in multipart/alternative"
                        )
                    html = h
                # Alternatives may repeat the same attachments; keep one copy.
                for a in attach:
                    if a not in attachments:
                        attachments.append(a)
        if text is None and html is None:
            raise SimplificationError("No text or HTML bodies in message")
        return SimpleEletter(
            text=text,
            html=html,
            attachments=attachments,
            subject=self.subject,
            from_=self.from_,
            to=self.to,
            cc=self.cc,
            bcc=self.bcc,
            reply_to=self.reply_to,
            sender=self.sender,
            date=self.date,
            headers=self.headers,
        )
@attr.s
class SimpleEletter:
    """
    .. versionadded:: 0.5.0

    A decomposed simple e-mail message: a text body and/or an HTML body,
    followed by any number of attachments, plus headers
    """

    #: The text body, or `None` if there is none
    text: Optional[str] = attr.ib(default=None)
    #: The HTML body, or `None` if there is none
    html: Optional[str] = attr.ib(default=None)
    #: The message's attachments
    attachments: List[Attachment] = attr.ib(factory=list)
    #: The subject line, or `None` if there is none
    subject: Optional[str] = attr.ib(default=None)
    #: Addresses from the :mailheader:`From` header
    from_: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: Addresses from the :mailheader:`To` header
    to: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: Addresses from the :mailheader:`CC` header
    cc: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: Addresses from the :mailheader:`BCC` header
    bcc: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: Addresses from the :mailheader:`Reply-To` header
    reply_to: List[Union[hr.Address, hr.Group]] = attr.ib(factory=list)
    #: The :mailheader:`Sender` address, or `None` if there is none
    sender: Optional[hr.Address] = attr.ib(default=None)
    #: The value of the :mailheader:`Date` header, or `None` if unset
    date: Optional[datetime] = attr.ib(default=None)
    #: All remaining headers, keyed by lowercase header name
    headers: Dict[str, List[str]] = attr.ib(factory=dict)

    def compose(self) -> EmailMessage:
        """
        Build an `~email.message.EmailMessage` from the `SimpleEletter` by
        handing its bodies, attachments and headers to the module-level
        ``compose()`` function
        """
        message = compose(
            subject=self.subject,
            from_=self.from_,
            to=self.to,
            cc=self.cc,
            bcc=self.bcc,
            reply_to=self.reply_to,
            sender=self.sender,
            date=self.date,
            headers=self.headers,
            text=self.text,
            html=self.html,
            attachments=self.attachments,
        )
        return message
# Lowercase names of the headers that decompose() does not copy into
# ``Eletter.headers``.
STANDARD_HEADERS = {
    # Headers with a dedicated Eletter attribute
    "subject",
    "from",
    "to",
    "cc",
    "bcc",
    "reply-to",
    "sender",
    "date",
    # Content-description and encoding headers
    "content-type",
    "content-id",
    "content-disposition",
    "content-transfer-encoding",
    "mime-version",
}
def decompose(msg: EmailMessage) -> Eletter:
    """
    .. versionadded:: 0.5.0

    Decompose an `~email.message.EmailMessage` into an `Eletter` instance
    containing a `MailItem` and a collection of headers.

    Only structures that can be represented by ``eletter`` classes are
    supported.  All message parts that are not :mimetype:`text/plain`,
    :mimetype:`text/html`, :mimetype:`multipart/*`, or :mimetype:`message/*`
    are treated as attachments.  Attachments without filenames or an explicit
    "attachment" :mailheader:`Content-Disposition` are treated as inline.

    Any information specific to how the message is encoded is discarded
    (namely, "charset" parameters on :mimetype:`text/*` parts,
    :mailheader:`Content-Transfer-Encoding` headers, and
    :mailheader:`MIME-Version` headers).

    Headers on message sub-parts that do not have representations on
    `MailItem`\\s are discarded (namely, everything other than
    :mailheader:`Content-Type`, :mailheader:`Content-Disposition`, and
    :mailheader:`Content-ID`).

    :raises TypeError:
        if any sub-part of ``msg`` is not an `~email.message.EmailMessage`
        instance
    :raises DecompositionError:
        if ``msg`` contains a part with an unrepresentable
        :mailheader:`Content-Type`
    """
    subject = get_str_header(msg, "Subject")
    from_ = get_address_list(msg, "From")
    to = get_address_list(msg, "To")
    cc = get_address_list(msg, "CC")
    bcc = get_address_list(msg, "BCC")
    reply_to = get_address_list(msg, "Reply-To")

    sender_head = msg.get("Sender")
    sender: Optional[hr.Address]
    if sender_head is not None:
        assert isinstance(sender_head, hr.SingleAddressHeader)
        sender = sender_head.address
    else:
        sender = None

    date_head = msg.get("Date")
    date: Optional[datetime]
    if date_head is not None:
        assert isinstance(date_head, hr.DateHeader)
        date = date_head.datetime
    else:
        date = None

    headers: Dict[str, List[str]] = {}
    for h in msg.keys():
        h = h.lower()
        # keys() yields a repeated header once per occurrence, whereas
        # get_all() already returns every value for that name.  Skipping names
        # that have been collected avoids rebuilding the same list for each
        # duplicate.
        if h in STANDARD_HEADERS or h in headers:
            continue
        headers[h] = list(map(str, msg.get_all(h, [])))

    content = get_content(msg)
    return Eletter(
        subject=subject,
        from_=from_,
        to=to,
        cc=cc,
        bcc=bcc,
        reply_to=reply_to,
        sender=sender,
        date=date,
        headers=headers,
        content=content,
    )
def get_content(msg: EmailMessage) -> MailItem:
    """
    Convert ``msg`` (a whole message or one of its parts) into the matching
    `MailItem`, recursing into the sub-parts of multipart messages.

    A :mailheader:`Content-Type` that fails to parse is treated as
    :mimetype:`text/plain`.  A part with a filename but no
    :mailheader:`Content-Disposition` is treated as an attachment.

    :raises TypeError:
        if a sub-part or an embedded message is not an
        `~email.message.EmailMessage`
    :raises DecompositionError:
        if the content type is a :mimetype:`multipart/*` or
        :mimetype:`message/*` subtype that is not handled here
    """
    try:
        ct = ContentType.parse(str(msg.get("Content-Type", msg.get_default_type())))
    except ValueError:
        ct = ContentType.parse("text/plain")
    disposition = msg.get_content_disposition()
    filename = msg.get_filename(None)
    content_id = get_str_header(msg, "Content-ID")
    if disposition is None and filename is not None:
        disposition = "attachment"
    # Anything not explicitly (or implicitly, via a filename) an attachment is
    # inline.
    inline = disposition != "attachment"

    if ct.maintype == "multipart":
        container: Multipart
        if ct.subtype == "related":
            container = Related(content_id=content_id, start=ct.params.get("start"))
        elif ct.subtype == "mixed":
            container = Mixed(content_id=content_id)
        elif ct.subtype == "alternative":
            container = Alternative(content_id=content_id)
        else:
            raise DecompositionError(f"Unsupported Content-Type: {ct.content_type}")
        for part in msg.iter_parts():
            if not isinstance(part, EmailMessage):  # pragma: no cover
                raise TypeError("EmailMessage parts must be EmailMessage instances")
            container.append(get_content(part))
        return container

    if ct.maintype == "message":
        if ct.subtype != "rfc822":
            raise DecompositionError(f"Unsupported Content-Type: {ct.content_type}")
        embedded = msg.get_content()
        if not isinstance(embedded, EmailMessage):  # pragma: no cover
            raise TypeError("EmailMessage parts must be EmailMessage instances")
        return EmailAttachment(
            content=embedded,
            filename=filename,
            inline=inline,
            content_id=content_id,
        )

    if ct.maintype == "text":
        body_text = msg.get_content()
        assert isinstance(body_text, str)
        # Only an unnamed, inline text/plain or text/html part is a body; every
        # other text part becomes an attachment.
        is_body = filename is None and inline and ct.subtype in ("plain", "html")
        if is_body:
            if ct.subtype == "plain":
                return TextBody(body_text, content_id=content_id)
            return HTMLBody(body_text, content_id=content_id)
        # The charset parameter is dropped before the type is stored.
        ct.params.pop("charset", None)
        return TextAttachment(
            content=body_text,
            filename=filename,
            content_type=str(ct),
            inline=inline,
            content_id=content_id,
        )

    payload = msg.get_content()
    assert isinstance(payload, bytes)
    return BytesAttachment(
        content=payload,
        filename=filename,
        content_type=str(ct),
        content_id=content_id,
        inline=inline,
    )


def get_str_header(msg: EmailMessage, header: str) -> Optional[str]:
    """
    Return the value of ``header`` in ``msg`` converted to `str`, or `None`
    when the header is absent
    """
    value = msg.get(header)
    return None if value is None else str(value)


def get_address_list(
    msg: EmailMessage, header: str
) -> List[Union[hr.Address, hr.Group]]:
    """
    Gather the addresses from every occurrence of ``header`` in ``msg`` into
    one list, in order of appearance
    """
    collected = []
    for field in msg.get_all(header, []):
        assert isinstance(field, hr.AddressHeader)
        collected.extend(parse_addresses(field))
    return collected
def decompose_simple(msg: EmailMessage, unmix: bool = False) -> SimpleEletter:
    """
    .. versionadded:: 0.5.0

    Decompose an `~email.message.EmailMessage` into a `SimpleEletter`: a text
    body and/or HTML body, any number of attachments, and a collection of
    headers.

    This is `decompose()` followed by `Eletter.simplify()` on the result.

    A :mimetype:`multipart/mixed` message is normally only simplifiable when
    every attachment follows every body part.  Pass ``unmix=True`` to pull the
    attachments apart from the bodies whatever their order.

    :raises TypeError:
        if any sub-part of ``msg`` is not an `~email.message.EmailMessage`
        instance
    :raises DecompositionError:
        if ``msg`` contains a part with an unrepresentable
        :mailheader:`Content-Type`
    :raises SimplificationError: if ``msg`` cannot be simplified
    """
    eletter = decompose(msg)
    return eletter.simplify(unmix=unmix)
def smooth(mi: MailItem) -> MailItem:
    """
    Recursively tidy a `MailItem` tree.

    For each multipart: children of the same type are spliced into the parent
    (never for `Related`), empty multipart children are dropped, and a
    multipart left with exactly one child is replaced by that child.
    Anything else is rebuilt by calling the multipart's own type on the list of
    surviving children.  Non-multipart items are returned unchanged.
    """
    if not isinstance(mi, Multipart):
        return mi
    # Nested Mixed and Alternative are flattened, but not Related.
    mergeable = not isinstance(mi, Related)
    kept: List[MailItem] = []
    for child in mi:
        child = smooth(child)
        if mergeable and type(child) is type(mi):
            assert isinstance(child, Multipart)
            kept.extend(child)
        elif isinstance(child, Multipart) and len(child) == 0:
            continue
        else:
            kept.append(child)
    if len(kept) == 1:
        return kept[0]
    return type(mi)(kept)


def alt2text_html(alt: Alternative) -> Tuple[str, str]:
    """
    Return ``(text, html)`` for an `Alternative` made of exactly one
    `TextBody` and one `HTMLBody`, in either order.

    :raises SimplificationError: if ``alt`` has any other shape
    """
    if len(alt) == 2:
        first, second = alt[0], alt[1]
        if isinstance(first, TextBody) and isinstance(second, HTMLBody):
            return (first.content, second.content)
        if isinstance(first, HTMLBody) and isinstance(second, TextBody):
            return (second.content, first.content)
    raise SimplificationError(
        "multipart/alternative inside multipart/mixed is not a text/plain part"
        " plus a text/html part"
    )


def simplify_alt_part(
    content: MailItem, unmix: bool = False
) -> Tuple[Optional[str], Optional[str], List[Attachment]]:
    """
    Split ``content`` into ``(text, html, attachments)``.

    A lone `TextBody` or `HTMLBody` gives just that body.  A `Mixed` is walked
    in order: successive bodies of the same kind are joined with a newline
    between them, text + HTML `Alternative` children extend both bodies at
    once, and attachments are collected.

    :param unmix:
        when false, a body part that comes after an attachment is an error
    :raises MixedContentError:
        if a body part follows an attachment and ``unmix`` is false
    :raises SimplificationError:
        if the text and HTML bodies cannot be kept in step, if a
        :mimetype:`multipart/related` is encountered, or if ``content`` itself
        is an attachment
    """
    text: Optional[str] = None
    html: Optional[str] = None
    attachments: List[Attachment] = []

    def joined(so_far: Optional[str], extra: str) -> str:
        # Append ``extra`` to ``so_far``, ensuring a newline separates them.
        if so_far is None:
            return extra
        if not so_far.endswith("\n"):
            so_far += "\n"
        return so_far + extra

    def forbid_interspersed() -> None:
        # A body part after an attachment is only acceptable when unmixing.
        if attachments and not unmix:
            raise MixedContentError("Message intersperses attachments with text")

    if isinstance(content, TextBody):
        text = content.content
    elif isinstance(content, HTMLBody):
        html = content.content
    elif isinstance(content, Mixed):
        for item in content:
            if isinstance(item, TextBody):
                forbid_interspersed()
                if html is not None:
                    raise SimplificationError(
                        "No matching HTML alternative for text part"
                    )
                text = joined(text, item.content)
            elif isinstance(item, HTMLBody):
                forbid_interspersed()
                if text is not None:
                    raise SimplificationError(
                        "No matching text alternative for HTML part"
                    )
                html = joined(html, item.content)
            # A nested Mixed cannot occur here: smooth() has flattened it.
            elif isinstance(item, Alternative):
                # The Alternative must be exactly text | html; any further
                # nesting is an error.
                text_part, html_part = alt2text_html(item)
                forbid_interspersed()
                # Both bodies must be present, or both absent, to stay in step.
                if (text is None) != (html is None):
                    if text is not None:
                        raise SimplificationError(
                            "Text + HTML alternative follows text-only body"
                        )
                    raise SimplificationError(
                        "Text + HTML alternative follows HTML-only body"
                    )
                text = joined(text, text_part)
                html = joined(html, html_part)
            elif isinstance(item, Related):
                raise SimplificationError("Cannot simplify multipart/related")
            elif isinstance(item, Attachment):
                attachments.append(item)
            else:
                raise TypeError(str(type(item)))  # pragma: no cover
    elif isinstance(content, Related):
        raise SimplificationError("Cannot simplify multipart/related")
    elif isinstance(content, Attachment):
        raise SimplificationError("Body is an attachment")
    else:
        raise TypeError(str(type(content)))  # pragma: no cover
    return (text, html, attachments)