| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965 |
- """Header value parser implementing various email-related RFC parsing rules.
- The parsing methods defined in this module implement various email related
- parsing rules. Principal among them is RFC 5322, which is the follow-on
- to RFC 2822 and primarily a clarification of the former. It also implements
- RFC 2047 encoded word decoding.
- RFC 5322 goes to considerable trouble to maintain backward compatibility with
- RFC 822 in the parse phase, while cleaning up the structure on the generation
- phase. This parser supports correct RFC 5322 generation by tagging white space
- as folding white space only when folding is allowed in the non-obsolete rule
- sets. Actually, the parser is even more generous when accepting input than RFC
- 5322 mandates, following the spirit of Postel's Law, which RFC 5322 encourages.
- Where possible deviations from the standard are annotated on the 'defects'
- attribute of tokens that deviate.
- The general structure of the parser follows RFC 5322, and uses its terminology
- where there is a direct correspondence. Where the implementation requires a
- somewhat different structure than that used by the formal grammar, new terms
- that mimic the closest existing terms are used. Thus, it really helps to have
- a copy of RFC 5322 handy when studying this code.
- Input to the parser is a string that has already been unfolded according to
- RFC 5322 rules. According to the RFC this unfolding is the very first step, and
- this parser leaves the unfolding step to a higher level message parser, which
- will have already detected the line breaks that need unfolding while
- determining the beginning and end of each header.
- The output of the parser is a TokenList object, which is a list subclass. A
- TokenList is a recursive data structure. The terminal nodes of the structure
- are Terminal objects, which are subclasses of str. These do not correspond
- directly to terminal objects in the formal grammar, but are instead more
- practical higher level combinations of true terminals.
- All TokenList and Terminal objects have a 'value' attribute, which produces the
- semantically meaningful value of that part of the parse subtree. The value of
- all whitespace tokens (no matter how many sub-tokens they may contain) is a
- single space, as per the RFC rules. This includes 'CFWS', which is herein
- included in the general class of whitespace tokens. There is one exception to
- the rule that whitespace tokens are collapsed into single spaces in values: in
- the value of a 'bare-quoted-string' (a quoted-string with no leading or
- trailing whitespace), any whitespace that appeared between the quotation marks
- is preserved in the returned value. Note that in all Terminal strings quoted
- pairs are turned into their unquoted values.
- All TokenList and Terminal objects also have a string value, which attempts to
- be a "canonical" representation of the RFC-compliant form of the substring that
- produced the parsed subtree, including minimal use of quoted pair quoting.
- Whitespace runs are not collapsed.
- Comment tokens also have a 'content' attribute providing the string found
- between the parens (including any nested comments) with whitespace preserved.
- All TokenList and Terminal objects have a 'defects' attribute which is a
- possibly empty list of all the defects found while creating the token. Defects
- may appear on any token in the tree, and a composite list of all defects in the
- subtree is available through the 'all_defects' attribute of any node. (For
- Terminal nodes x.defects == x.all_defects.)
- Each object in a parse tree is called a 'token', and each has a 'token_type'
- attribute that gives the name from the RFC 5322 grammar that it represents.
- Not all RFC 5322 nodes are produced, and there is one non-RFC 5322 node that
- may be produced: 'ptext'. A 'ptext' is a string of printable ascii characters.
- It is returned in place of lists of (ctext/quoted-pair) and
- (qtext/quoted-pair).
- XXX: provide complete list of token types.
- """
- from __future__ import print_function
- from __future__ import unicode_literals
- from __future__ import division
- from __future__ import absolute_import
- from future.builtins import int, range, str, super, list
- import re
- from collections import namedtuple, OrderedDict
- from future.backports.urllib.parse import (unquote, unquote_to_bytes)
- from future.backports.email import _encoded_words as _ew
- from future.backports.email import errors
- from future.backports.email import utils
- #
- # Useful constants and functions
- #
# Character classes used to decide where the various RFC 5322 / RFC 2045
# lexical elements end.  Each one is a set of single-character strings.

# Linear white space.
WSP = set(' \t')
# White space plus the character that opens a comment.
CFWS_LEADER = WSP.union('(')
SPECIALS = set('()<>@,:;.\\"[]')
ATOM_ENDS = WSP.union(SPECIALS)
DOT_ATOM_ENDS = ATOM_ENDS.difference('.')
# '.', '"', and '(' do not end phrases in order to support obs-phrase
PHRASE_ENDS = SPECIALS.difference('."(')
TSPECIALS = SPECIALS.union('/?=').difference('.')
TOKEN_ENDS = WSP.union(TSPECIALS)
ASPECIALS = TSPECIALS.union("*'%")
ATTRIBUTE_ENDS = WSP.union(ASPECIALS)
EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS.difference('%')
def quote_string(value):
    """Return str(value) wrapped in double quotes.

    Backslashes are doubled first, then every double quote is preceded by a
    backslash, so the result can be embedded as a quoted-string.
    """
    escaped = str(value).replace('\\', '\\\\')
    escaped = escaped.replace('"', r'\"')
    return '"{}"'.format(escaped)
- #
- # Accumulator for header folding
- #
class _Folded(object):
    """Accumulator that collects the pieces of a folded header.

    Completed lines live in ``done`` (each terminated by
    ``policy.linesep``); the line being assembled lives in ``current`` and
    ``lastlen`` tracks its length.  ``stickyspace`` holds leading white space
    that was peeled off a token and must be emitted in front of the next
    piece of text.
    """

    def __init__(self, maxlen, policy):
        self.maxlen = maxlen
        self.policy = policy
        self.lastlen = 0
        self.stickyspace = None
        self.firstline = True
        self.done = []
        # Built with list() because newline() relies on .clear().
        self.current = list()

    def newline(self):
        """Close the current line: move it to ``done`` and add the linesep."""
        self.done.extend(self.current)
        self.done.append(self.policy.linesep)
        self.current.clear()
        self.lastlen = 0

    def finalize(self):
        """Flush a pending, non-empty current line."""
        if self.current:
            self.newline()

    def __str__(self):
        return ''.join(self.done)

    def append(self, stoken):
        """Add ``stoken`` to the current line unconditionally.

        ``lastlen`` is not touched here.
        """
        self.current.append(stoken)

    def append_if_fits(self, token, stoken=None):
        """Try to place ``token`` on the current or a fresh line.

        ``stoken`` is the text to emit; it defaults to ``str(token)``.
        Returns True when the text was placed (or the token folded itself
        into this accumulator), False when it is too long for any line.
        """
        text = str(token) if stoken is None else stoken
        size = len(text)

        if self.stickyspace is None:
            # Plain case: no pending white space to carry along.
            if self.lastlen + size <= self.maxlen:
                self.current.append(text)
                self.lastlen += size
                return True
            if size < self.maxlen:
                self.newline()
                self.current.append(text)
                self.lastlen = size
                return True
            return False

        sticky_len = len(self.stickyspace)
        if self.lastlen + sticky_len + size <= self.maxlen:
            # White space and text both fit on the line in progress.
            self.current.append(self.stickyspace)
            self.current.append(text)
            self.lastlen += sticky_len + size
            self.stickyspace = None
            self.firstline = False
            return True

        if token.has_fws:
            # The token can be split further: let it fold itself.
            ws = token.pop_leading_fws()
            if ws is not None:
                self.stickyspace += str(ws)
                sticky_len += len(ws)
            token._fold(self)
            return True

        if sticky_len and size + 1 <= self.maxlen:
            margin = self.maxlen - size
            if 0 < margin < sticky_len:
                # Leave part of the white space at the end of this line.
                trim = sticky_len - margin
                self.current.append(self.stickyspace[:trim])
                self.stickyspace = self.stickyspace[trim:]
                sticky_len = trim
            self.newline()
            self.current.append(self.stickyspace)
            self.current.append(text)
            self.lastlen = size + sticky_len
        else:
            if not self.firstline:
                self.newline()
            self.current.append(self.stickyspace)
            self.current.append(text)

        self.stickyspace = None
        self.firstline = False
        return True
- #
- # TokenList and its subclasses
- #
class TokenList(list):
    """A list of tokens forming one node of the parse tree.

    Elements are expected to be other TokenLists or terminal tokens that
    expose the same small protocol (``value``, ``all_defects``,
    ``token_type``, ``has_fws``, ``comments`` and the fold helpers).
    """

    token_type = None

    def __init__(self, *args, **kw):
        super(TokenList, self).__init__(*args, **kw)
        # Defects recorded directly on this node (not on its children).
        self.defects = []

    def __str__(self):
        return ''.join(str(x) for x in self)

    def __repr__(self):
        return '{}({})'.format(self.__class__.__name__,
                               super(TokenList, self).__repr__())

    @property
    def value(self):
        """Concatenation of the non-empty ``value`` of every child."""
        return ''.join(x.value for x in self if x.value)

    @property
    def all_defects(self):
        """This node's defects followed by those of every child (new list)."""
        return sum((x.all_defects for x in self), self.defects)

    #
    # Folding API
    #
    # parts():
    #
    # return a list of objects that constitute the "higher level syntactic
    # objects" specified by the RFC as the best places to fold a header line.
    # The returned objects must include leading folding white space, even if
    # this means mutating the underlying parse tree of the object.  Each object
    # is only responsible for returning *its* parts, and should not drill down
    # to any lower level except as required to meet the leading folding white
    # space constraint.
    #
    # _fold(folded):
    #
    #   folded: the result accumulator.  This is an instance of _Folded.
    #       (XXX: I haven't finished factoring this out yet, the folding code
    #       pretty much uses this as a state object.) When the folded.current
    #       contains as much text as will fit, the _fold method should call
    #       folded.newline.
    #  folded.lastlen: the current length of the text stored in folded.current.
    #  folded.maxlen: The maximum number of characters that may appear on a
    #       folded line.  Differs from the policy setting in that "no limit" is
    #       represented by +inf, which means it can be used in the trivially
    #       logical fashion in comparisons.
    #
    # Currently no subclasses implement parts, and I think this will remain
    # true.  A subclass only needs to implement _fold when the generic version
    # isn't sufficient.  _fold will need to be implemented primarily when it is
    # possible for encoded words to appear in the specialized token-list, since
    # there is no generic algorithm that can know where exactly the encoded
    # words are allowed.  A _fold implementation is responsible for filling
    # lines in the same general way that the top level _fold does. It may, and
    # should, call the _fold method of sub-objects in a similar fashion to that
    # of the top level _fold.
    #
    # XXX: I'm hoping it will be possible to factor the existing code further
    # to reduce redundancy and make the logic clearer.

    @property
    def parts(self):
        """Yield the foldable pieces of this list (see the comment above).

        Trailing white space is popped off each child and becomes the start
        of the next piece, so this mutates the children.
        """
        klass = self.__class__
        this = []
        for token in self:
            if token.startswith_fws():
                if this:
                    yield this[0] if len(this) == 1 else klass(this)
                    # Empty in place; works for any list type (the
                    # accumulator may be a plain list literal, see below).
                    del this[:]
            end_ws = token.pop_trailing_ws()
            this.append(token)
            if end_ws:
                yield klass(this)
                this = [end_ws]
        if this:
            yield this[0] if len(this) == 1 else klass(this)

    def startswith_fws(self):
        return self[0].startswith_fws()

    def pop_leading_fws(self):
        """Remove and return the leading 'fws' token, delegating downward."""
        if self[0].token_type == 'fws':
            return self.pop(0)
        return self[0].pop_leading_fws()

    def pop_trailing_ws(self):
        """Remove and return the trailing 'cfws' token, delegating downward."""
        if self[-1].token_type == 'cfws':
            return self.pop(-1)
        return self[-1].pop_trailing_ws()

    @property
    def has_fws(self):
        for part in self:
            if part.has_fws:
                return True
        return False

    def has_leading_comment(self):
        return self[0].has_leading_comment()

    @property
    def comments(self):
        """Flat list of the comments reported by every child."""
        comments = []
        for token in self:
            comments.extend(token.comments)
        return comments

    def fold(self, **_3to2kwargs):
        """Return this token list folded according to ``policy``.

        ``policy`` is a required keyword argument (KeyError if missing); its
        ``max_line_length`` and ``linesep`` drive the folding.
        """
        # max_line_length 0/None means no limit, ie: infinitely long.
        policy = _3to2kwargs['policy']; del _3to2kwargs['policy']
        maxlen = policy.max_line_length or float("+inf")
        folded = _Folded(maxlen, policy)
        self._fold(folded)
        folded.finalize()
        return str(folded)

    def as_encoded_word(self, charset):
        """Return the text as one encoded word, keeping edge white space."""
        # This works only for things returned by 'parts', which include
        # the leading fws, if any, that should be used.
        res = []
        ws = self.pop_leading_fws()
        if ws:
            res.append(ws)
        trailer = self.pop(-1) if self[-1].token_type == 'fws' else ''
        res.append(_ew.encode(str(self), charset))
        res.append(trailer)
        return ''.join(res)

    def cte_encode(self, charset, policy):
        res = []
        for part in self:
            res.append(part.cte_encode(charset, policy))
        return ''.join(res)

    def _fold(self, folded):
        """Generic fold: place each part, recursing where a part is too long."""
        for part in self.parts:
            tstr = str(part)
            try:
                tstr.encode('us-ascii')
            except UnicodeEncodeError:
                if any(isinstance(x, errors.UndecodableBytesDefect)
                       for x in part.all_defects):
                    charset = 'unknown-8bit'
                else:
                    # XXX: this should be a policy setting
                    charset = 'utf-8'
                tstr = part.cte_encode(charset, folded.policy)
            if folded.append_if_fits(part, tstr):
                continue
            # Peel off the leading whitespace if any and make it sticky, to
            # avoid infinite recursion.
            ws = part.pop_leading_fws()
            if ws is not None:
                # Use the white space that was just popped.  Popping again
                # from ``part`` would consume the first real token instead.
                folded.stickyspace = str(ws)
                if folded.append_if_fits(part):
                    continue
            if part.has_fws:
                part._fold(folded)
                continue
            # There are no fold points in this one; it is too long for a single
            # line and can't be split...we just have to put it on its own line.
            folded.append(tstr)
            folded.newline()

    def pprint(self, indent=''):
        """Print the tree rendering produced by ppstr()."""
        print(self.ppstr(indent=indent))

    def ppstr(self, indent=''):
        """Return the tree rendering, every line prefixed with ``indent``."""
        return '\n'.join(self._pp(indent=indent))

    def _pp(self, indent=''):
        yield '{}{}/{}('.format(
            indent,
            self.__class__.__name__,
            self.token_type)
        for token in self:
            if not hasattr(token, '_pp'):
                yield (indent + ' !! invalid element in token '
                       'list: {!r}'.format(token))
            else:
                for line in token._pp(indent+' '):
                    yield line
        if self.defects:
            extra = ' Defects: {}'.format(self.defects)
        else:
            extra = ''
        yield '{}){}'.format(indent, extra)
class WhiteSpaceTokenList(TokenList):
    """Token list whose semantic value is always a single space."""

    @property
    def value(self):
        return ' '

    @property
    def comments(self):
        """The ``content`` of each direct child of type 'comment'."""
        found = []
        for token in self:
            if token.token_type == 'comment':
                found.append(token.content)
        return found
class UnstructuredTokenList(TokenList):
    """Token list for unstructured text, with encoded-word aware folding."""

    token_type = 'unstructured'

    def _fold(self, folded):
        """Fold into ``folded``, encoding non-ASCII parts as encoded words.

        Consecutive non-ASCII parts are merged into a single encoded word
        when the merged word still fits on the current line.
        """
        if any(x.token_type == 'encoded-word' for x in self):
            return self._fold_encoded(folded)
        # Here we can have either a pure ASCII string that may or may not
        # have surrogateescape encoded bytes, or a unicode string.
        last_ew = None
        for part in self.parts:
            tstr = str(part)
            is_ew = False
            try:
                tstr.encode('us-ascii')
            except UnicodeEncodeError:
                if any(isinstance(x, errors.UndecodableBytesDefect)
                       for x in part.all_defects):
                    charset = 'unknown-8bit'
                else:
                    charset = 'utf-8'
                if last_ew is not None:
                    # We've already done an EW, combine this one with it
                    # if there's room.
                    chunk = get_unstructured(
                        ''.join(folded.current[last_ew:]+[tstr])).as_encoded_word(charset)
                    oldlastlen = sum(len(x) for x in folded.current[:last_ew])
                    schunk = str(chunk)
                    lchunk = len(schunk)
                    if oldlastlen + lchunk <= folded.maxlen:
                        del folded.current[last_ew:]
                        folded.append(schunk)
                        folded.lastlen = oldlastlen + lchunk
                        continue
                tstr = part.as_encoded_word(charset)
                is_ew = True
            if folded.append_if_fits(part, tstr):
                if is_ew:
                    last_ew = len(folded.current) - 1
                continue
            if is_ew or last_ew:
                # It's too big to fit on the line, but since we've
                # got encoded words we can use encoded word folding.
                part._fold_as_ew(folded)
                continue
            # Peel off the leading whitespace if any and make it sticky, to
            # avoid infinite recursion.
            ws = part.pop_leading_fws()
            if ws is not None:
                folded.stickyspace = str(ws)
                if folded.append_if_fits(part):
                    continue
            if part.has_fws:
                # Recurse through the internal fold hook; the public fold()
                # only accepts a ``policy`` keyword and builds its own
                # accumulator.
                part._fold(folded)
                continue
            # It can't be split...we just have to put it on its own line.
            folded.append(tstr)
            folded.newline()
            last_ew = None

    def cte_encode(self, charset, policy):
        """Return the text with non-ASCII parts turned into encoded words.

        Once an encoded word has been produced, every later non-ASCII part
        is merged with it (and with any text in between) into one encoded
        word that replaces those pieces in the result.
        """
        res = []
        last_ew = None
        for part in self:
            spart = str(part)
            try:
                spart.encode('us-ascii')
                res.append(spart)
            except UnicodeEncodeError:
                if last_ew is None:
                    # Remember where the first encoded word sits in res.
                    last_ew = len(res)
                    res.append(part.cte_encode(charset, policy))
                else:
                    tl = get_unstructured(''.join(res[last_ew:] + [spart]))
                    # Replace the merged pieces instead of appending, so the
                    # text between the encoded words is not duplicated.
                    res[last_ew:] = [tl.as_encoded_word(charset)]
        return ''.join(res)
class Phrase(TokenList):
    """Token list for an RFC 5322 phrase, with encoded-word aware folding."""

    token_type = 'phrase'

    def _fold(self, folded):
        """Fold into ``folded``, merging adjacent encoded words when they fit."""
        # As with Unstructured, we can have pure ASCII with or without
        # surrogateescape encoded bytes, or we could have unicode.  But this
        # case is more complicated, since we have to deal with the various
        # sub-token types and how they can be composed in the face of
        # unicode-that-needs-CTE-encoding, and the fact that if a token has a
        # comment that becomes a barrier across which we can't compose encoded
        # words.
        last_ew = None
        for part in self.parts:
            tstr = str(part)
            has_ew = False
            try:
                tstr.encode('us-ascii')
            except UnicodeEncodeError:
                if any(isinstance(x, errors.UndecodableBytesDefect)
                       for x in part.all_defects):
                    charset = 'unknown-8bit'
                else:
                    charset = 'utf-8'
                if last_ew is not None and not part.has_leading_comment():
                    # We've already done an EW, let's see if we can combine
                    # this one with it.  The last_ew logic ensures that all we
                    # have at this point is atoms, no comments or quoted
                    # strings.  So we can treat the text between the last
                    # encoded word and the content of this token as
                    # unstructured text, and things will work correctly.  But
                    # we have to strip off any trailing comment on this token
                    # first, and if it is a quoted string we have to pull out
                    # the content (we're encoding it, so it no longer needs to
                    # be quoted).
                    if part[-1].token_type == 'cfws' and part.comments:
                        # NOTE(review): the popped trailing CFWS is never
                        # re-attached anywhere in this method - confirm
                        # that is intended.
                        part.pop(-1)
                    for i, token in enumerate(part):
                        if token.token_type == 'bare-quoted-string':
                            part[i] = UnstructuredTokenList(token[:])
                    chunk = get_unstructured(
                        ''.join(folded.current[last_ew:]+[tstr])).as_encoded_word(charset)
                    schunk = str(chunk)
                    lchunk = len(schunk)
                    # last_ew is an index into folded.current, so measure the
                    # text that precedes it to get the resulting line length.
                    oldlastlen = sum(len(x) for x in folded.current[:last_ew])
                    if oldlastlen + lchunk <= folded.maxlen:
                        del folded.current[last_ew:]
                        folded.append(schunk)
                        folded.lastlen = sum(len(x) for x in folded.current)
                        continue
                tstr = part.as_encoded_word(charset)
                has_ew = True
            if folded.append_if_fits(part, tstr):
                if has_ew and not part.comments:
                    last_ew = len(folded.current) - 1
                elif part.comments or part.token_type == 'quoted-string':
                    # If a comment is involved we can't combine EWs.  And if a
                    # quoted string is involved, it's not worth the effort to
                    # try to combine them.
                    last_ew = None
                continue
            part._fold(folded)

    def cte_encode(self, charset, policy):
        """Return the text with non-ASCII parts turned into encoded words.

        Non-ASCII parts are merged with the previous encoded word unless a
        comment stands between them.
        """
        res = []
        last_ew = None
        is_ew = False
        for part in self:
            spart = str(part)
            try:
                spart.encode('us-ascii')
                res.append(spart)
            except UnicodeEncodeError:
                is_ew = True
                if last_ew is None:
                    if not part.comments:
                        last_ew = len(res)
                    res.append(part.cte_encode(charset, policy))
                elif not part.has_leading_comment():
                    if part[-1].token_type == 'cfws' and part.comments:
                        # NOTE(review): as in _fold, the popped trailing CFWS
                        # is not re-attached - confirm that is intended.
                        part.pop(-1)
                    for i, token in enumerate(part):
                        if token.token_type == 'bare-quoted-string':
                            part[i] = UnstructuredTokenList(token[:])
                    tl = get_unstructured(''.join(res[last_ew:] + [spart]))
                    res[last_ew:] = [tl.as_encoded_word(charset)]
            if part.comments or (not is_ew and part.token_type == 'quoted-string'):
                last_ew = None
        return ''.join(res)
class Word(TokenList):
    """Token list tagged with the 'word' token type."""

    token_type = 'word'
class CFWSList(WhiteSpaceTokenList):
    """White space token list tagged with the 'cfws' token type."""

    token_type = 'cfws'

    def has_leading_comment(self):
        """True when this list reports at least one comment."""
        return True if self.comments else False
class Atom(TokenList):
    """Token list tagged with the 'atom' token type."""

    token_type = 'atom'
class Token(TokenList):
    """Token list tagged with the 'token' token type."""

    token_type = 'token'
class EncodedWord(TokenList):
    """Token list tagged with the 'encoded-word' token type.

    ``cte``, ``charset`` and ``lang`` default to None at class level.
    """

    token_type = 'encoded-word'
    cte = None
    charset = None
    lang = None

    @property
    def encoded(self):
        """Return ``cte`` when set, else this text encoded with ``charset``."""
        if self.cte is not None:
            return self.cte
        # Return the freshly encoded form; without the return this property
        # always evaluated to None on this path.
        return _ew.encode(str(self), self.charset)
class QuotedString(TokenList):
    """Token list tagged 'quoted-string'; wraps a 'bare-quoted-string' child."""

    token_type = 'quoted-string'

    @property
    def content(self):
        """Value of the first 'bare-quoted-string' child, or None."""
        for child in self:
            if child.token_type != 'bare-quoted-string':
                continue
            return child.value
        return None

    @property
    def quoted_value(self):
        """Join str() of bare-quoted-string children with others' values."""
        return ''.join(
            str(child) if child.token_type == 'bare-quoted-string'
            else child.value
            for child in self)

    @property
    def stripped_value(self):
        """Value of the first 'bare-quoted-string' child, or None."""
        for child in self:
            if child.token_type != 'bare-quoted-string':
                continue
            return child.value
        return None
class BareQuotedString(QuotedString):
    """The text between the quotation marks of a quoted-string."""

    token_type = 'bare-quoted-string'

    def __str__(self):
        # Re-quote the raw text of the children.
        raw = ''.join(str(child) for child in self)
        return quote_string(raw)

    @property
    def value(self):
        """The children's text joined without quoting or collapsing."""
        pieces = [str(child) for child in self]
        return ''.join(pieces)
class Comment(WhiteSpaceTokenList):
    """A parenthesised comment; its children may be nested comments."""

    token_type = 'comment'

    def __str__(self):
        return '(' + ''.join(self.quote(x) for x in self) + ')'

    def quote(self, value):
        """Return ``value`` as text that is safe inside a comment.

        Nested comments are rendered as-is; for anything else backslashes
        and parentheses are backslash-escaped.
        """
        if value.token_type == 'comment':
            return str(value)
        # Raw strings: '\(' and '\)' are invalid escape sequences in a
        # normal literal.
        return str(value).replace('\\', '\\\\').replace(
                                  '(', r'\(').replace(
                                  ')', r'\)')

    @property
    def content(self):
        """The text between the parens, nested comments included."""
        return ''.join(str(x) for x in self)

    @property
    def comments(self):
        return [self.content]
class AddressList(TokenList):
    """Token list tagged with the 'address-list' token type."""

    token_type = 'address-list'

    @property
    def addresses(self):
        """The direct children whose token type is 'address'."""
        found = []
        for child in self:
            if child.token_type == 'address':
                found.append(child)
        return found

    @property
    def mailboxes(self):
        """The ``mailboxes`` of every address child, flattened into a list."""
        return [mbox
                for addr in self if addr.token_type == 'address'
                for mbox in addr.mailboxes]

    @property
    def all_mailboxes(self):
        """The ``all_mailboxes`` of every address child, flattened."""
        return [mbox
                for addr in self if addr.token_type == 'address'
                for mbox in addr.all_mailboxes]
class Address(TokenList):
    """Token list tagged 'address'; all properties look only at self[0]."""

    token_type = 'address'

    @property
    def display_name(self):
        """The group's display name, or None when self[0] is not a group."""
        first = self[0]
        return first.display_name if first.token_type == 'group' else None

    @property
    def mailboxes(self):
        """Mailboxes of this address; an invalid mailbox contributes none."""
        first = self[0]
        kind = first.token_type
        if kind == 'mailbox':
            return [first]
        if kind == 'invalid-mailbox':
            return []
        return first.mailboxes

    @property
    def all_mailboxes(self):
        """Like ``mailboxes`` but an invalid mailbox is included."""
        first = self[0]
        if first.token_type in ('mailbox', 'invalid-mailbox'):
            return [first]
        return first.all_mailboxes
class MailboxList(TokenList):
    """Token list tagged with the 'mailbox-list' token type."""

    token_type = 'mailbox-list'

    @property
    def mailboxes(self):
        """The direct children whose token type is 'mailbox'."""
        found = []
        for child in self:
            if child.token_type == 'mailbox':
                found.append(child)
        return found

    @property
    def all_mailboxes(self):
        """The direct children that are mailboxes, valid or invalid."""
        wanted = ('mailbox', 'invalid-mailbox')
        found = []
        for child in self:
            if child.token_type in wanted:
                found.append(child)
        return found
class GroupList(TokenList):
    """A group-list; delegates to a leading mailbox-list when there is one."""

    token_type = 'group-list'

    @property
    def mailboxes(self):
        """Mailboxes of the leading mailbox-list, or [] if there is none."""
        if self and self[0].token_type == 'mailbox-list':
            return self[0].mailboxes
        return []

    @property
    def all_mailboxes(self):
        """all_mailboxes of the leading mailbox-list, or [] if there is none."""
        if self and self[0].token_type == 'mailbox-list':
            return self[0].all_mailboxes
        return []
class Group(TokenList):
    """A group: child 0 supplies the display name, child 2 may be a group-list."""

    token_type = "group"

    @property
    def mailboxes(self):
        """Mailboxes of the group-list at index 2, or [] if it is not one."""
        members = self[2]
        if members.token_type == 'group-list':
            return members.mailboxes
        return []

    @property
    def all_mailboxes(self):
        """all_mailboxes of the group-list at index 2, or [] if it is not one."""
        members = self[2]
        if members.token_type == 'group-list':
            return members.all_mailboxes
        return []

    @property
    def display_name(self):
        """The display name reported by the first child."""
        return self[0].display_name
class NameAddr(TokenList):
    """A name-addr; address properties are read from the last child."""

    token_type = 'name-addr'

    @property
    def display_name(self):
        """The first child's display name; None for a single-child list."""
        return self[0].display_name if len(self) != 1 else None

    @property
    def local_part(self):
        """local_part of the last child."""
        return self[-1].local_part

    @property
    def domain(self):
        """domain of the last child."""
        return self[-1].domain

    @property
    def route(self):
        """route of the last child."""
        return self[-1].route

    @property
    def addr_spec(self):
        """addr_spec of the last child."""
        return self[-1].addr_spec
class AngleAddr(TokenList):
    """An angle-addr; looks up its addr-spec and obs-route children by type."""

    token_type = 'angle-addr'

    @property
    def local_part(self):
        """local_part of the first addr-spec child, or None without one."""
        for tok in self:
            if tok.token_type != 'addr-spec':
                continue
            return tok.local_part
        return None

    @property
    def domain(self):
        """domain of the first addr-spec child, or None without one."""
        for tok in self:
            if tok.token_type != 'addr-spec':
                continue
            return tok.domain
        return None

    @property
    def route(self):
        """domains of the first obs-route child, or None without one."""
        for tok in self:
            if tok.token_type != 'obs-route':
                continue
            return tok.domains
        return None

    @property
    def addr_spec(self):
        """addr_spec of the first addr-spec child, or '<>' without one."""
        for tok in self:
            if tok.token_type != 'addr-spec':
                continue
            return tok.addr_spec
        return '<>'
class ObsRoute(TokenList):
    """An obs-route token list."""

    token_type = 'obs-route'

    @property
    def domains(self):
        """The .domain of every 'domain' child, in order."""
        hops = []
        for tok in self:
            if tok.token_type == 'domain':
                hops.append(tok.domain)
        return hops
class Mailbox(TokenList):
    """A mailbox; every property delegates to the first child."""

    token_type = 'mailbox'

    @property
    def display_name(self):
        """Display name when the first child is a name-addr, else None."""
        inner = self[0]
        if inner.token_type != 'name-addr':
            return None
        return inner.display_name

    @property
    def local_part(self):
        """local_part of the first child."""
        return self[0].local_part

    @property
    def domain(self):
        """domain of the first child."""
        return self[0].domain

    @property
    def route(self):
        """Route when the first child is a name-addr, else None."""
        inner = self[0]
        if inner.token_type != 'name-addr':
            return None
        return inner.route

    @property
    def addr_spec(self):
        """addr_spec of the first child."""
        return self[0].addr_spec
class InvalidMailbox(TokenList):
    """A mailbox that failed to parse; all address properties are None."""

    token_type = 'invalid-mailbox'

    @property
    def display_name(self):
        return None

    # The remaining address properties share the same always-None property.
    local_part = display_name
    domain = display_name
    route = display_name
    addr_spec = display_name
class Domain(TokenList):
    """A domain token list."""

    token_type = 'domain'

    @property
    def domain(self):
        """The inherited value with all whitespace removed."""
        raw = super(Domain, self).value
        return ''.join(raw.split())
class DotAtom(TokenList):
    """Token list produced by get_dot_atom: [CFWS] dot-atom-text [CFWS]."""

    token_type = 'dot-atom'
class DotAtomText(TokenList):
    """Token list produced by get_dot_atom_text: atext runs separated by dots."""

    token_type = 'dot-atom-text'
class AddrSpec(TokenList):
    """An addr-spec: a local part, optionally followed by a separator and domain."""

    token_type = 'addr-spec'

    @property
    def local_part(self):
        """local_part of the first child."""
        return self[0].local_part

    @property
    def domain(self):
        """Domain of the last child, or None with fewer than three children."""
        return self[-1].domain if len(self) >= 3 else None

    @property
    def value(self):
        """The three children's values joined, trimming whitespace at the joins."""
        if len(self) < 3:
            return self[0].value
        left = self[0].value.rstrip()
        middle = self[1].value
        right = self[2].value.lstrip()
        return left + middle + right

    @property
    def addr_spec(self):
        """Canonical 'local@domain' text, quoting the local part when needed."""
        local = self.local_part
        # Quote if the local part holds any character from DOT_ATOM_ENDS.
        if set(local) & DOT_ATOM_ENDS:
            local = quote_string(local)
        domain = self.domain
        if domain is None:
            return local
        return local + '@' + domain
class ObsLocalPart(TokenList):
    """Token list produced by get_obs_local_part."""

    # get_obs_local_part rewrites token_type to 'invalid-obs-local-part' on
    # the instance when it records defects.
    token_type = 'obs-local-part'
class DisplayName(Phrase):
    """A display-name phrase."""

    token_type = 'display-name'

    @property
    def display_name(self):
        """The phrase's value with leading and trailing CFWS removed.

        Works on a shallow copy, so the parse tree itself is not modified.
        An empty phrase, or one made only of CFWS, gives the empty
        TokenList's value instead of raising IndexError.
        """
        res = TokenList(self)
        if not res:
            return res.value
        if res[0].token_type == 'cfws':
            res.pop(0)
        elif (isinstance(res[0], TokenList) and
                res[0][0].token_type == 'cfws'):
            # Only token lists can carry embedded CFWS; a Terminal (such
            # as a dot) must not be indexed into.
            res[0] = TokenList(res[0][1:])
        if not res:
            return res.value
        if res[-1].token_type == 'cfws':
            res.pop()
        elif (isinstance(res[-1], TokenList) and
                res[-1][-1].token_type == 'cfws'):
            res[-1] = TokenList(res[-1][:-1])
        return res.value

    @property
    def value(self):
        """The phrase's value, re-quoted if it has defects or a quoted-string.

        When quoting, a single space stands in for any leading or trailing
        CFWS. An empty phrase is never quoted.
        """
        quote = False
        if self.defects:
            quote = True
        else:
            for x in self:
                if x.token_type == 'quoted-string':
                    quote = True
        if len(self) != 0 and quote:
            pre = post = ''
            if (self[0].token_type == 'cfws' or
                    isinstance(self[0], TokenList) and
                    self[0][0].token_type == 'cfws'):
                pre = ' '
            if (self[-1].token_type == 'cfws' or
                    isinstance(self[-1], TokenList) and
                    self[-1][-1].token_type == 'cfws'):
                post = ' '
            return pre+quote_string(self.display_name)+post
        else:
            return super(DisplayName, self).value
class LocalPart(TokenList):
    """A local-part token list; the single child holds the parsed form."""

    token_type = 'local-part'

    @property
    def value(self):
        """The child's quoted_value for a quoted-string, else its value."""
        if self[0].token_type == "quoted-string":
            return self[0].quoted_value
        else:
            return self[0].value

    @property
    def local_part(self):
        """The local part with CFWS removed from the ends and around dots."""
        # Strip whitespace from front, back, and around dots.
        # DOT sentinels bracket the sequence so the front and back are
        # handled by the same "next to a dot" rules; they are sliced off
        # again once the loop is done.
        res = [DOT]
        last = DOT
        last_is_tl = False
        for tok in self[0] + [DOT]:
            if tok.token_type == 'cfws':
                continue
            # A dot follows a token list that ends in CFWS: drop that CFWS.
            if (last_is_tl and tok.token_type == 'dot' and
                    last[-1].token_type == 'cfws'):
                res[-1] = TokenList(last[:-1])
            is_tl = isinstance(tok, TokenList)
            # A token list that starts with CFWS follows a dot: drop it.
            if (is_tl and last.token_type == 'dot' and
                    tok[0].token_type == 'cfws'):
                res.append(TokenList(tok[1:]))
            else:
                res.append(tok)
            last = res[-1]
            last_is_tl = is_tl
        res = TokenList(res[1:-1])
        return res.value
class DomainLiteral(TokenList):
    """A bracketed domain-literal token list."""

    token_type = 'domain-literal'

    @property
    def domain(self):
        """The inherited value with all whitespace removed."""
        raw = super(DomainLiteral, self).value
        return ''.join(raw.split())

    @property
    def ip(self):
        """Value of the first 'ptext' child, or None if there is none."""
        for tok in self:
            if tok.token_type != 'ptext':
                continue
            return tok.value
        return None
class MIMEVersion(TokenList):
    """Token list for a MIME-Version header value."""

    token_type = 'mime-version'
    # Class-level defaults; both are None until set on an instance.
    major = None
    minor = None
class Parameter(TokenList):
    """A single MIME parameter (attribute, optional section, value)."""

    token_type = 'parameter'
    # Class-level defaults, overridden per instance where needed.
    sectioned = False
    extended = False
    charset = 'us-ascii'

    @property
    def section_number(self):
        """The section's number, or 0 for a parameter that is not sectioned."""
        # Because the first token, the attribute (name) eats CFWS, the second
        # token is always the section if there is one.
        if self.sectioned:
            return self[1].number
        return 0

    @property
    def param_value(self):
        """The stripped value of the parameter, or '' if none is found."""
        # This is part of the "handle quoted extended parameters" hack.
        for child in self:
            if child.token_type == 'value':
                return child.stripped_value
            if child.token_type != 'quoted-string':
                continue
            # A value may be buried inside quoted-string > bare-quoted-string.
            for quoted in child:
                if quoted.token_type != 'bare-quoted-string':
                    continue
                for inner in quoted:
                    if inner.token_type == 'value':
                        return inner.stripped_value
        return ''
class InvalidParameter(Parameter):
    """A Parameter subclass tagged with the 'invalid-parameter' token type."""

    token_type = 'invalid-parameter'
class Attribute(TokenList):
    """A parameter attribute (name) token list."""

    token_type = 'attribute'

    @property
    def stripped_value(self):
        """Value of the first child whose type ends in 'attrtext', else None."""
        for child in self:
            if not child.token_type.endswith('attrtext'):
                continue
            return child.value
        return None
class Section(TokenList):
    """Token list for the section marker of a parameter."""

    token_type = 'section'
    # Read by Parameter.section_number; None until set on an instance.
    number = None
class Value(TokenList):
    """A parameter value token list."""

    token_type = 'value'

    @property
    def stripped_value(self):
        """The value without surrounding CFWS or quoting.

        Skips one leading CFWS token, then delegates to the child's own
        stripped_value when that child is a quoted-string or (extended-)
        attribute; otherwise falls back to self.value.
        """
        head = self[0]
        if head.token_type == 'cfws':
            head = self[1]
        delegating = ('quoted-string', 'attribute', 'extended-attribute')
        if not head.token_type.endswith(delegating):
            return self.value
        return head.stripped_value
class MimeParameters(TokenList):
    """The parameter list of a MIME header value."""

    token_type = 'mime-parameters'

    @property
    def params(self):
        """Yield (name, value) pairs, reassembling RFC 2231 sections.

        Names are yielded in the order they are first seen. Defects found
        along the way are recorded on the parameter tokens concerned.
        """
        # The RFC specifically states that the ordering of parameters is not
        # guaranteed and may be reordered by the transport layer.  So we have
        # to assume the RFC 2231 pieces can come in any order.  However, we
        # output them in the order that we first see a given name, which gives
        # us a stable __str__.
        params = OrderedDict()
        for token in self:
            if not token.token_type.endswith('parameter'):
                continue
            if token[0].token_type != 'attribute':
                continue
            name = token[0].value.strip()
            if name not in params:
                params[name] = []
            params[name].append((token.section_number, token))
        for name, parts in params.items():
            # Sort on the section number only.  Comparing whole tuples falls
            # back to comparing the tokens when two sections share a number
            # (e.g. a duplicated parameter), and that comparison can raise.
            # The sort is stable, so ties keep their original order.
            parts = sorted(parts, key=lambda part: part[0])
            # XXX: there might be more recovery we could do here if, for
            # example, this is really a case of a duplicate attribute name.
            value_parts = []
            charset = parts[0][1].charset
            for i, (section_number, param) in enumerate(parts):
                if section_number != i:
                    param.defects.append(errors.InvalidHeaderDefect(
                        "inconsistent multipart parameter numbering"))
                value = param.param_value
                if param.extended:
                    try:
                        value = unquote_to_bytes(value)
                    except UnicodeEncodeError:
                        # source had surrogate escaped bytes.  What we do now
                        # is a bit of an open question.  I'm not sure this is
                        # the best choice, but it is what the old algorithm did
                        value = unquote(value, encoding='latin-1')
                    else:
                        try:
                            value = value.decode(charset, 'surrogateescape')
                        except LookupError:
                            # XXX: there should really be a custom defect for
                            # unknown character set to make it easy to find,
                            # because otherwise unknown charset is a silent
                            # failure.
                            value = value.decode('us-ascii', 'surrogateescape')
                        if utils._has_surrogates(value):
                            param.defects.append(errors.UndecodableBytesDefect())
                value_parts.append(value)
            value = ''.join(value_parts)
            yield name, value

    def __str__(self):
        """Render as ' name=value; name2=value2', or '' when there are none."""
        params = []
        for name, value in self.params:
            if value:
                params.append('{}={}'.format(name, quote_string(value)))
            else:
                params.append(name)
        params = '; '.join(params)
        return ' ' + params if params else ''
class ParameterizedHeaderValue(TokenList):
    """Base for header values that may end with a mime-parameters list."""

    @property
    def params(self):
        """Params of the last mime-parameters child, or {} if there is none."""
        for child in reversed(self):
            if child.token_type != 'mime-parameters':
                continue
            return child.params
        return {}

    @property
    def parts(self):
        if not self or self[-1].token_type != 'mime-parameters':
            return TokenList(self).parts
        # We don't want to start a new line if all of the params don't fit
        # after the value, so unwrap the parameter list.
        head, tail = self[:-1], self[-1]
        return TokenList(head + tail)
class ContentType(ParameterizedHeaderValue):
    """Token list for a Content-Type header value."""

    token_type = 'content-type'
    # Class-level defaults: 'text/plain' unless set on an instance.
    maintype = 'text'
    subtype = 'plain'
class ContentDisposition(ParameterizedHeaderValue):
    """Token list for a Content-Disposition header value."""

    token_type = 'content-disposition'
    # None until set on an instance.
    content_disposition = None
class ContentTransferEncoding(TokenList):
    """Token list for a Content-Transfer-Encoding header value."""

    token_type = 'content-transfer-encoding'
    # Class-level default encoding name.
    cte = '7bit'
class HeaderLabel(TokenList):
    """Token list tagged with the 'header-label' token type."""

    token_type = 'header-label'
class Header(TokenList):
    """A whole header: first token, optional CFWS, then one value token."""

    token_type = 'header'

    def _fold(self, folded):
        """Fold this header into *folded*, consuming self's tokens.

        The tokens are popped off self as they are handled, so the list is
        empty afterwards.  Raises ValueError if anything is left over after
        the value token has been taken.
        """
        # The first token is emitted verbatim and its length recorded.
        folded.append(str(self.pop(0)))
        folded.lastlen = len(folded.current[0])
        # The first line of the header is different from all others: we don't
        # want to start a new object on a new line if it has any fold points in
        # it that would allow part of it to be on the first header line.
        # Further, if the first fold point would fit on the new line, we want
        # to do that, but if it doesn't we want to put it on the first line.
        # Folded supports this via the stickyspace attribute.  If this
        # attribute is not None, it does the special handling.
        folded.stickyspace = str(self.pop(0)) if self[0].token_type == 'cfws' else ''
        rest = self.pop(0)
        if self:
            raise ValueError("Malformed Header token list")
        rest._fold(folded)
- #
- # Terminal classes and instances
- #
class Terminal(str):
    """A leaf of the parse tree: a str tagged with a token type and defects."""

    def __new__(cls, value, token_type):
        instance = super(Terminal, cls).__new__(cls, value)
        instance.token_type = token_type
        instance.defects = []
        return instance

    def __repr__(self):
        text = super(Terminal, self).__repr__()
        return "{}({})".format(self.__class__.__name__, text)

    @property
    def all_defects(self):
        """A copy of this terminal's defect list."""
        return list(self.defects)

    def _pp(self, indent=''):
        """Return a one-element list holding the pretty-printed line."""
        suffix = ' {}'.format(self.defects) if self.defects else ''
        line = "{}{}/{}({}){}".format(
            indent,
            self.__class__.__name__,
            self.token_type,
            super(Terminal, self).__repr__(),
            suffix,
        )
        return [line]

    def cte_encode(self, charset, policy):
        """Return the text unchanged if it is ASCII, else as an encoded word."""
        value = str(self)
        try:
            value.encode('us-ascii')
        except UnicodeEncodeError:
            return _ew.encode(value, charset)
        return value

    def pop_trailing_ws(self):
        # This terminates the recursion.
        return None

    def pop_leading_fws(self):
        # This terminates the recursion.
        return None

    @property
    def comments(self):
        """Terminals never hold comments."""
        return []

    def has_leading_comment(self):
        return False

    def __getnewargs__(self):
        # Arguments for __new__ when the object is reconstructed.
        return (str(self), self.token_type)
class WhiteSpaceTerminal(Terminal):
    """A terminal holding whitespace; its value is always a single space."""

    has_fws = True

    @property
    def value(self):
        return ' '

    def startswith_fws(self):
        return True
class ValueTerminal(Terminal):
    """A terminal whose value is its own text."""

    has_fws = False

    @property
    def value(self):
        return self

    def startswith_fws(self):
        return False

    def as_encoded_word(self, charset):
        """Return this terminal's text as an encoded word in *charset*."""
        text = str(self)
        return _ew.encode(text, charset)
class EWWhiteSpaceTerminal(WhiteSpaceTerminal):
    """Whitespace between two encoded words: present but rendered as ''."""

    has_fws = True

    @property
    def value(self):
        return ''

    @property
    def encoded(self):
        """The original whitespace text, as a plain slice of self."""
        return self[:]

    def __str__(self):
        return ''
# Shared terminal instances for fixed punctuation.
# XXX these need to become classes and used as instances so
# that a program can't change them in a parse tree and screw
# up other parse trees.  Maybe should have tests for that, too.
DOT = ValueTerminal('.', 'dot')
ListSeparator = ValueTerminal(',', 'list-separator')
RouteComponentMarker = ValueTerminal('@', 'route-component-marker')
- #
- # Parser
- #
- """Parse strings according to RFC822/2047/2822/5322 rules.
- This is a stateless parser. Each get_XXX function accepts a string and
- returns either a Terminal or a TokenList representing the RFC object named
- by the method and a string containing the remaining unparsed characters
- from the input. Thus a parser method consumes the next syntactic construct
- of a given type and returns a token representing the construct plus the
- unparsed remainder of the input string.
- For example, if the first element of a structured header is a 'phrase',
- then:
- phrase, value = get_phrase(value)
- returns the complete phrase from the start of the string value, plus any
- characters left in the string after the phrase is removed.
- """
# Pre-compiled helpers built from the character-set constants.  Each
# "non ... end" matcher matches a run of characters that are not in the
# corresponding set; backslash and ']' are escaped so they are taken
# literally inside the regex character class.  r'\]' is a raw string
# because '\]' in a plain literal is an invalid escape sequence.
_wsp_splitter = re.compile(r'([{}]+)'.format(''.join(WSP))).split
_non_atom_end_matcher = re.compile(r"[^{}]+".format(
    ''.join(ATOM_ENDS).replace('\\', '\\\\').replace(']', r'\]'))).match
_non_printable_finder = re.compile(r"[\x00-\x20\x7F]").findall
_non_token_end_matcher = re.compile(r"[^{}]+".format(
    ''.join(TOKEN_ENDS).replace('\\', '\\\\').replace(']', r'\]'))).match
_non_attribute_end_matcher = re.compile(r"[^{}]+".format(
    ''.join(ATTRIBUTE_ENDS).replace('\\', '\\\\').replace(']', r'\]'))).match
_non_extended_attribute_end_matcher = re.compile(r"[^{}]+".format(
    ''.join(EXTENDED_ATTRIBUTE_ENDS).replace(
        '\\', '\\\\').replace(']', r'\]'))).match
def _validate_xtext(xtext):
    """Record defects on xtext for non-printables and surrogate escapes."""
    bad_chars = _non_printable_finder(xtext)
    if bad_chars:
        xtext.defects.append(errors.NonPrintableDefect(bad_chars))
    if not utils._has_surrogates(xtext):
        return
    xtext.defects.append(errors.UndecodableBytesDefect(
        "Non-ASCII characters found in header token"))
def _get_ptext_to_endchars(value, endchars):
    """Scan printables/quoted-pairs until endchars and return unquoted ptext.

    This function turns a run of qcontent, ccontent-without-comments, or
    dtext-with-quoted-printables into a single string by unquoting any
    quoted printables.  Scanning stops at the first unescaped character in
    endchars, or at the first run of whitespace, whichever comes first.

    Returns a (ptext, remaining_value, had_qp) tuple.  had_qp is set when an
    escaped backslash is decoded.  An empty value, or one that starts with
    whitespace, yields ('', value, False).
    """
    pieces = list(_wsp_splitter(value, 1))
    fragment, remainder = pieces[0], pieces[1:]
    vchars = []
    escape = False
    had_qp = False
    # Index where scanning stopped.  It defaults to "consumed everything",
    # so an empty fragment no longer reads an unassigned loop variable.
    end = len(fragment)
    for pos, char in enumerate(fragment):
        if char == '\\':
            if escape:
                escape = False
                had_qp = True
            else:
                escape = True
                continue
        if escape:
            # The character after a backslash is taken literally.
            escape = False
        elif char in endchars:
            end = pos
            break
        vchars.append(char)
    return ''.join(vchars), ''.join([fragment[end:]] + remainder), had_qp
def _decode_ew_run(value):
    """Decode a leading run of RFC 2047 encoded words.

        _decode_ew_run(value) -> (text, value, defects)

    Whitespace-separated tokens are consumed for as long as each one looks
    like an encoded word ('=?' ... '?=').  The decoded texts are joined with
    the whitespace between them discarded.  The returned value is whatever
    was not consumed, including the whitespace in front of it, and defects
    collects every defect reported while decoding.  The input value may not
    have any leading whitespace.
    """
    decoded = []
    defects = []
    pending_ws = ''
    while value:
        pieces = _wsp_splitter(value, 1)
        if len(pieces) == 3:
            tok, ws, value = pieces
        else:
            # No whitespace left: the whole remainder is the last token.
            tok, ws, value = value, '', ''
        looks_encoded = tok.startswith('=?') and tok.endswith('?=')
        if not looks_encoded:
            return ''.join(decoded), pending_ws + tok + ws + value, defects
        text, charset, lang, new_defects = _ew.decode(tok)
        decoded.append(text)
        defects.extend(new_defects)
        pending_ws = ws
    return ''.join(decoded), pending_ws, defects
def get_fws(value):
    """FWS = 1*WSP

    This isn't the RFC definition.  We're using fws to represent tokens where
    folding can be done, but when we are parsing the *un*folding has already
    been done so we don't need to watch out for CRLF.

    Returns the leading whitespace as an 'fws' terminal plus the rest.
    """
    stripped = value.lstrip()
    ws_len = len(value) - len(stripped)
    return WhiteSpaceTerminal(value[:ws_len], 'fws'), stripped
def get_encoded_word(value):
    """ encoded-word = "=?" charset "?" encoding "?" encoded-text "?="

    Returns an EncodedWord token list (fws and vtext terminals holding the
    decoded text) and the unparsed remainder of value.  Raises
    HeaderParseError if value does not start with '=?', has no closing
    '?=', or cannot be decoded.
    """
    ew = EncodedWord()
    if not value.startswith('=?'):
        raise errors.HeaderParseError(
            "expected encoded word but found {}".format(value))
    # Split at the first '?='; remainder is a list of zero or one strings.
    _3to2list1 = list(value[2:].split('?=', 1))
    tok, remainder, = _3to2list1[:1] + [_3to2list1[1:]]
    if tok == value[2:]:
        # Nothing was split off, so there is no terminating '?='.
        raise errors.HeaderParseError(
            "expected encoded word but found {}".format(value))
    remstr = ''.join(remainder)
    if remstr[:2].isdigit():
        # Two digits right after '?=': presumably the '?=' was part of the
        # encoded text (a '?' followed by an '=XX' escape), so extend the
        # token to the next '?='.
        _3to2list3 = list(remstr.split('?=', 1))
        rest, remainder, = _3to2list3[:1] + [_3to2list3[1:]]
        tok = tok + '?=' + rest
    if len(tok.split()) > 1:
        ew.defects.append(errors.InvalidHeaderDefect(
            "whitespace inside encoded word"))
    ew.cte = value
    value = ''.join(remainder)
    try:
        text, charset, lang, defects = _ew.decode('=?' + tok + '?=')
    except ValueError:
        raise errors.HeaderParseError(
            "encoded word format invalid: '{}'".format(ew.cte))
    ew.charset = charset
    ew.lang = lang
    ew.defects.extend(defects)
    # Tokenise the decoded text into alternating fws / vtext terminals.
    while text:
        if text[0] in WSP:
            token, text = get_fws(text)
            ew.append(token)
            continue
        _3to2list5 = list(_wsp_splitter(text, 1))
        chars, remainder, = _3to2list5[:1] + [_3to2list5[1:]]
        vtext = ValueTerminal(chars, 'vtext')
        _validate_xtext(vtext)
        ew.append(vtext)
        text = ''.join(remainder)
    return ew, value
def get_unstructured(value):
    """unstructured = (*([FWS] vchar) *WSP) / obs-unstruct
       obs-unstruct = *((*LF *CR *(obs-utext) *LF *CR)) / FWS)
       obs-utext = %d0 / obs-NO-WS-CTL / LF / CR

       obs-NO-WS-CTL is control characters except WSP/CR/LF.

    So, basically, we have printable runs, plus control characters or nulls in
    the obsolete syntax, separated by whitespace.  Since RFC 2047 uses the
    obsolete syntax in its specification, but requires whitespace on either
    side of the encoded words, I can see no reason to need to separate the
    non-printable-non-whitespace from the printable runs if they occur, so we
    parse this into xtext tokens separated by WSP tokens.

    Because an 'unstructured' value must by definition constitute the entire
    value, this 'get' routine does not return a remaining value, only the
    parsed TokenList.

    """
    # XXX: but what about bare CR and LF?  They might signal the start or
    # end of an encoded word.  YAGNI for now, since our current parsers
    # will never send us strings with bare CR or LF.

    unstructured = UnstructuredTokenList()
    while value:
        if value[0] in WSP:
            token, value = get_fws(value)
            unstructured.append(token)
            continue
        if value.startswith('=?'):
            try:
                token, value = get_encoded_word(value)
            except errors.HeaderParseError:
                # Not a valid encoded word: fall through and treat the
                # text as ordinary vtext below.
                pass
            else:
                have_ws = True
                if len(unstructured) > 0:
                    if unstructured[-1].token_type != 'fws':
                        unstructured.defects.append(errors.InvalidHeaderDefect(
                            "missing whitespace before encoded word"))
                        have_ws = False
                if have_ws and len(unstructured) > 1:
                    # Whitespace between two encoded words is swapped for a
                    # terminal that renders as the empty string.
                    if unstructured[-2].token_type == 'encoded-word':
                        unstructured[-1] = EWWhiteSpaceTerminal(
                            unstructured[-1], 'fws')
                unstructured.append(token)
                continue
        _3to2list7 = list(_wsp_splitter(value, 1))
        tok, remainder, = _3to2list7[:1] + [_3to2list7[1:]]
        vtext = ValueTerminal(tok, 'vtext')
        _validate_xtext(vtext)
        unstructured.append(vtext)
        value = ''.join(remainder)
    return unstructured
def get_qp_ctext(value):
    r"""ctext = <printable ascii except \ ( )>

    This is not the RFC ctext, since we are handling nested comments in comment
    and unquoting quoted-pairs here.  We allow anything except the '()'
    characters, but if we find any ASCII other than the RFC defined printable
    ASCII a NonPrintableDefect is added to the token's defects list.  Since
    quoted pairs are converted to their unquoted values, what is returned is
    a 'ptext' token.  In this case it is a WhiteSpaceTerminal, so its value
    is ' '.

    Returns the ptext token and the unparsed remainder of value.
    """
    # The docstring is raw because '\ ' is an invalid escape sequence.
    ptext, value, _ = _get_ptext_to_endchars(value, '()')
    ptext = WhiteSpaceTerminal(ptext, 'ptext')
    _validate_xtext(ptext)
    return ptext, value
def get_qcontent(value):
    """qcontent = qtext / quoted-pair

    Everything up to the next DQUOTE (or run of whitespace) is accepted and
    quoted pairs are unquoted, so the result is a 'ptext' ValueTerminal.
    Characters outside the RFC's printable ASCII are recorded on the token
    as a NonPrintableDefect.

    Returns the ptext token and the unparsed remainder of value.
    """
    text, remainder, _had_qp = _get_ptext_to_endchars(value, '"')
    token = ValueTerminal(text, 'ptext')
    _validate_xtext(token)
    return token, remainder
def get_atext(value):
    """atext = <matches _atext_matcher>

    Any run of non-ATOM_ENDS characters is accepted as atext; defects for
    unexpected characters are recorded on the token by _validate_xtext.
    Raises HeaderParseError when value does not start with such a run.
    """
    match = _non_atom_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected atext but found '{}'".format(value))
    text = match.group()
    token = ValueTerminal(text, 'atext')
    _validate_xtext(token)
    return token, value[len(text):]
def get_bare_quoted_string(value):
    """bare-quoted-string = DQUOTE *([FWS] qcontent) [FWS] DQUOTE

    A quoted-string without the leading or trailing white space.  Its
    value is the text between the quote marks, with whitespace
    preserved and quoted pairs decoded.

    Raises HeaderParseError if value is empty or does not start with a
    DQUOTE.  A missing closing DQUOTE is recorded as a defect instead.
    """
    # Check for empty input first, so callers get HeaderParseError rather
    # than an IndexError they do not catch.
    if not value or value[0] != '"':
        raise errors.HeaderParseError(
            "expected '\"' but found '{}'".format(value))
    bare_quoted_string = BareQuotedString()
    value = value[1:]
    while value and value[0] != '"':
        if value[0] in WSP:
            token, value = get_fws(value)
        else:
            token, value = get_qcontent(value)
        bare_quoted_string.append(token)
    if not value:
        bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
            "end of header inside quoted string"))
        return bare_quoted_string, value
    return bare_quoted_string, value[1:]
def get_comment(value):
    """comment = "(" *([FWS] ccontent) [FWS] ")"
       ccontent = ctext / quoted-pair / comment

    Nested comments are parsed recursively; quoted pairs are dealt with by
    get_qp_ctext.  Running out of input before the closing parenthesis is
    recorded as a defect on the comment.
    """
    if value and value[0] != '(':
        raise errors.HeaderParseError(
            "expected '(' but found '{}'".format(value))
    comment = Comment()
    value = value[1:]
    while value and value[0] != ")":
        first = value[0]
        if first in WSP:
            piece, value = get_fws(value)
        elif first == '(':
            piece, value = get_comment(value)
        else:
            piece, value = get_qp_ctext(value)
        comment.append(piece)
    if value:
        # Drop the closing parenthesis.
        return comment, value[1:]
    comment.defects.append(errors.InvalidHeaderDefect(
        "end of header inside comment"))
    return comment, value
def get_cfws(value):
    """CFWS = (1*([FWS] comment) [FWS]) / FWS

    Collects consecutive whitespace runs and comments into one CFWSList.
    """
    cfws = CFWSList()
    while value and value[0] in CFWS_LEADER:
        parse = get_fws if value[0] in WSP else get_comment
        piece, value = parse(value)
        cfws.append(piece)
    return cfws, value
def get_quoted_string(value):
    """quoted-string = [CFWS] <bare-quoted-string> [CFWS]

    'bare-quoted-string' is an intermediate class defined by this
    parser and not by the RFC grammar.  It is the quoted string
    without any attached CFWS.
    """
    quoted_string = QuotedString()
    if value and value[0] in CFWS_LEADER:
        leading, value = get_cfws(value)
        quoted_string.append(leading)
    body, value = get_bare_quoted_string(value)
    quoted_string.append(body)
    if value and value[0] in CFWS_LEADER:
        trailing, value = get_cfws(value)
        quoted_string.append(trailing)
    return quoted_string, value
def get_atom(value):
    """atom = [CFWS] 1*atext [CFWS]

    Raises HeaderParseError when no atext follows the optional leading CFWS.
    """
    atom = Atom()
    if value and value[0] in CFWS_LEADER:
        leading, value = get_cfws(value)
        atom.append(leading)
    if value and value[0] in ATOM_ENDS:
        raise errors.HeaderParseError(
            "expected atom but found '{}'".format(value))
    text, value = get_atext(value)
    atom.append(text)
    if value and value[0] in CFWS_LEADER:
        trailing, value = get_cfws(value)
        atom.append(trailing)
    return atom, value
def get_dot_atom_text(value):
    """ dot-text = 1*atext *("." 1*atext)

    Raises HeaderParseError when the text does not start with atext or
    ends with a dot.
    """
    dot_atom_text = DotAtomText()
    if not value or value[0] in ATOM_ENDS:
        raise errors.HeaderParseError("expected atom at a start of "
            "dot-atom-text but found '{}'".format(value))
    while value and value[0] not in ATOM_ENDS:
        piece, value = get_atext(value)
        dot_atom_text.append(piece)
        if value[:1] == '.':
            dot_atom_text.append(DOT)
            value = value[1:]
    ended_on_dot = dot_atom_text[-1] is DOT
    if ended_on_dot:
        raise errors.HeaderParseError("expected atom at end of dot-atom-text "
            "but found '{}'".format('.'+value))
    return dot_atom_text, value
def get_dot_atom(value):
    """ dot-atom = [CFWS] dot-atom-text [CFWS]

    Raises HeaderParseError (from get_dot_atom_text) when no dot-atom-text
    is found, including for empty input.
    """
    dot_atom = DotAtom()
    # Guard against empty input, as get_atom does: get_dot_atom_text then
    # raises HeaderParseError instead of this line raising IndexError.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        dot_atom.append(token)
    token, value = get_dot_atom_text(value)
    dot_atom.append(token)
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        dot_atom.append(token)
    return dot_atom, value
def get_word(value):
    """word = atom / quoted-string

    Either atom or quoted-string may start with CFWS.  We have to peel off this
    CFWS first to determine which type of word to parse.  Afterward we splice
    the leading CFWS, if any, into the parsed sub-token.

    If neither an atom nor a quoted-string is found before the next special, a
    HeaderParseError is raised.  This includes the case where nothing is left
    after the leading CFWS.

    The token returned is either an Atom or a QuotedString, as appropriate.
    This means the 'word' level of the formal grammar is not represented in the
    parse tree; this is because having that extra layer when manipulating the
    parse tree is more confusing than it is helpful.

    """
    if value and value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    else:
        leader = None
    if not value:
        # Empty input, or CFWS only.  Report it as a parse error so callers
        # that catch HeaderParseError can recover, rather than letting
        # value[0] raise IndexError.
        raise errors.HeaderParseError(
            "Expected 'atom' or 'quoted-string' but found nothing.")
    if value[0]=='"':
        token, value = get_quoted_string(value)
    elif value[0] in SPECIALS:
        raise errors.HeaderParseError("Expected 'atom' or 'quoted-string' "
                                      "but found '{}'".format(value))
    else:
        token, value = get_atom(value)
    if leader is not None:
        token[:0] = [leader]
    return token, value
def get_phrase(value):
    """ phrase = 1*word / obs-phrase
        obs-phrase = word *(word / "." / CFWS)

    A phrase is taken to be any sequence of words, periods and CFWS.
    Periods and bare CFWS are accepted but recorded as ObsoleteHeaderDefect.
    A phrase that does not start with a word is accepted too and recorded
    as InvalidHeaderDefect, since even the obsolete grammar does not allow
    it.
    """
    phrase = Phrase()
    try:
        word, value = get_word(value)
    except errors.HeaderParseError:
        phrase.defects.append(errors.InvalidHeaderDefect(
            "phrase does not start with word"))
    else:
        phrase.append(word)
    while value and value[0] not in PHRASE_ENDS:
        if value[0] == '.':
            phrase.append(DOT)
            phrase.defects.append(errors.ObsoleteHeaderDefect(
                "period in 'phrase'"))
            value = value[1:]
            continue
        try:
            piece, value = get_word(value)
        except errors.HeaderParseError:
            if value[0] not in CFWS_LEADER:
                raise
            # CFWS with no word attached to it.
            piece, value = get_cfws(value)
            phrase.defects.append(errors.ObsoleteHeaderDefect(
                "comment found without atom"))
        phrase.append(piece)
    return phrase, value
def get_local_part(value):
    """ local-part = dot-atom / quoted-string / obs-local-part

    Tries dot-atom first, then word.  If input remains that could still
    belong to the local part, everything is re-parsed as obs-local-part and
    a defect is recorded.  Raises HeaderParseError when no local part is
    found, including for empty input.
    """
    local_part = LocalPart()
    leader = None
    # Guard against empty input so the 'expected local-part' error just
    # below is reached, instead of value[0] raising IndexError first.
    if value and value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    if not value:
        raise errors.HeaderParseError(
            "expected local-part but found '{}'".format(value))
    try:
        token, value = get_dot_atom(value)
    except errors.HeaderParseError:
        try:
            token, value = get_word(value)
        except errors.HeaderParseError:
            if value[0] != '\\' and value[0] in PHRASE_ENDS:
                raise
            # Leave an empty placeholder; the obs-local-part pass below
            # parses the actual content.
            token = TokenList()
    if leader is not None:
        token[:0] = [leader]
    local_part.append(token)
    if value and (value[0]=='\\' or value[0] not in PHRASE_ENDS):
        # Re-parse what was consumed so far plus the rest as obs-local-part.
        obs_local_part, value = get_obs_local_part(str(local_part) + value)
        if obs_local_part.token_type == 'invalid-obs-local-part':
            local_part.defects.append(errors.InvalidHeaderDefect(
                "local-part is not dot-atom, quoted-string, or obs-local-part"))
        else:
            local_part.defects.append(errors.ObsoleteHeaderDefect(
                "local-part is not a dot-atom (contains CFWS)"))
        local_part[0] = obs_local_part
    try:
        local_part.value.encode('ascii')
    except UnicodeEncodeError:
        local_part.defects.append(errors.NonASCIILocalPartDefect(
                "local-part contains non-ASCII characters)"))
    return local_part, value
def get_obs_local_part(value):
    """ obs-local-part = word *("." word)

    Parses leniently: repeated, leading or trailing dots, stray backslashes
    and missing dots between words are accepted and recorded as defects.
    If any defect is recorded, the returned token's token_type is changed
    to 'invalid-obs-local-part'.  Raises HeaderParseError when nothing at
    all can be parsed.
    """
    obs_local_part = ObsLocalPart()
    last_non_ws_was_dot = False
    while value and (value[0]=='\\' or value[0] not in PHRASE_ENDS):
        if value[0] == '.':
            if last_non_ws_was_dot:
                obs_local_part.defects.append(errors.InvalidHeaderDefect(
                    "invalid repeated '.'"))
            obs_local_part.append(DOT)
            last_non_ws_was_dot = True
            value = value[1:]
            continue
        elif value[0]=='\\':
            obs_local_part.append(ValueTerminal(value[0],
                                                'misplaced-special'))
            value = value[1:]
            obs_local_part.defects.append(errors.InvalidHeaderDefect(
                "'\\' character outside of quoted-string/ccontent"))
            last_non_ws_was_dot = False
            continue
        if obs_local_part and obs_local_part[-1].token_type != 'dot':
            obs_local_part.defects.append(errors.InvalidHeaderDefect(
                "missing '.' between words"))
        try:
            token, value = get_word(value)
            last_non_ws_was_dot = False
        except errors.HeaderParseError:
            if value[0] not in CFWS_LEADER:
                raise
            token, value = get_cfws(value)
        obs_local_part.append(token)
    if not obs_local_part:
        # Nothing was consumed; the index checks below would raise
        # IndexError.
        raise errors.HeaderParseError(
            "expected obs-local-part but found '{}'".format(value))
    # The length checks keep a lone CFWS token from being indexed past.
    if (obs_local_part[0].token_type == 'dot' or
            obs_local_part[0].token_type=='cfws' and
            len(obs_local_part) > 1 and
            obs_local_part[1].token_type=='dot'):
        obs_local_part.defects.append(errors.InvalidHeaderDefect(
            "Invalid leading '.' in local part"))
    if (obs_local_part[-1].token_type == 'dot' or
            obs_local_part[-1].token_type=='cfws' and
            len(obs_local_part) > 1 and
            obs_local_part[-2].token_type=='dot'):
        obs_local_part.defects.append(errors.InvalidHeaderDefect(
            "Invalid trailing '.' in local part"))
    if obs_local_part.defects:
        obs_local_part.token_type = 'invalid-obs-local-part'
    return obs_local_part, value
def get_dtext(value):
    r""" dtext = <printable ascii except \ [ ]> / obs-dtext
        obs-dtext = obs-NO-WS-CTL / quoted-pair

    We allow anything except the excluded characters, but if we find any
    ASCII other than the RFC defined printable ASCII a NonPrintableDefect is
    added to the token's defects list.  Quoted pairs are converted to their
    unquoted values, so what is returned is a ptext token, in this case a
    ValueTerminal.  If there were quoted-printables, an ObsoleteHeaderDefect is
    added to the returned token's defect list.

    Returns the ptext token and the unparsed remainder of value.
    """
    # The docstring is raw because '\ ' is an invalid escape sequence.
    ptext, value, had_qp = _get_ptext_to_endchars(value, '[]')
    ptext = ValueTerminal(ptext, 'ptext')
    if had_qp:
        ptext.defects.append(errors.ObsoleteHeaderDefect(
            "quoted printable found in domain-literal"))
    _validate_xtext(ptext)
    return ptext, value
- def _check_for_early_dl_end(value, domain_literal):
- if value:
- return False
- domain_literal.append(errors.InvalidHeaderDefect(
- "end of input inside domain-literal"))
- domain_literal.append(ValueTerminal(']', 'domain-literal-end'))
- return True
def get_domain_literal(value):
    """ domain-literal = [CFWS] "[" *([FWS] dtext) [FWS] "]" [CFWS]

    Raises HeaderParseError when value is empty (or CFWS only), does not
    start with '[', or has something other than ']' after the dtext.  Input
    that ends inside the brackets is handled by _check_for_early_dl_end and
    the partial token is returned.
    """
    domain_literal = DomainLiteral()
    # Guard against empty input so the 'expected domain-literal' error just
    # below is reached, instead of value[0] raising IndexError first.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        domain_literal.append(token)
    if not value:
        raise errors.HeaderParseError("expected domain-literal")
    if value[0] != '[':
        raise errors.HeaderParseError("expected '[' at start of domain-literal "
                "but found '{}'".format(value))
    value = value[1:]
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    domain_literal.append(ValueTerminal('[', 'domain-literal-start'))
    if value[0] in WSP:
        token, value = get_fws(value)
        domain_literal.append(token)
        # The whitespace may have been the last of the input ("[ ").
        if _check_for_early_dl_end(value, domain_literal):
            return domain_literal, value
    token, value = get_dtext(value)
    domain_literal.append(token)
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    if value[0] in WSP:
        token, value = get_fws(value)
        domain_literal.append(token)
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    if value[0] != ']':
        raise errors.HeaderParseError("expected ']' at end of domain-literal "
                "but found '{}'".format(value))
    domain_literal.append(ValueTerminal(']', 'domain-literal-end'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        domain_literal.append(token)
    return domain_literal, value
def get_domain(value):
    """domain = dot-atom / domain-literal / obs-domain
       obs-domain = atom *("." atom)

    Parse a domain from the front of *value* and return a
    (Domain, remaining_value) pair.  Any leading CFWS is spliced onto the
    front of the first sub-token.

    Raises HeaderParseError if *value* is empty or holds only CFWS.
    Errors raised by get_domain_literal and get_atom propagate.
    """
    domain = Domain()
    leader = None
    # Guard against empty input: get_addr_spec passes everything after the
    # '@', which is '' for an address such as "user@".
    if value and value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    if not value:
        raise errors.HeaderParseError(
            "expected domain but found '{}'".format(value))
    if value[0] == '[':
        token, value = get_domain_literal(value)
        if leader is not None:
            token[:0] = [leader]
        domain.append(token)
        return domain, value
    try:
        token, value = get_dot_atom(value)
    except errors.HeaderParseError:
        # Not a dot-atom; fall back to a single atom.
        token, value = get_atom(value)
    if leader is not None:
        token[:0] = [leader]
    domain.append(token)
    if value and value[0] == '.':
        # A '.' left over after the first token means this is the obsolete
        # form, with CFWS between the components.
        domain.defects.append(errors.ObsoleteHeaderDefect(
            "domain is not a dot-atom (contains CFWS)"))
        if domain[0].token_type == 'dot-atom':
            # Replace the dot-atom with its own sub-tokens so the obsolete
            # domain is a flat sequence.
            domain[:] = domain[0]
        while value and value[0] == '.':
            domain.append(DOT)
            token, value = get_atom(value[1:])
            domain.append(token)
    return domain, value
def get_addr_spec(value):
    """addr-spec = local-part "@" domain

    Parse an addr-spec from the front of *value* and return an
    (AddrSpec, remaining_value) pair.

    If the local part is not followed by '@', an InvalidHeaderDefect is
    recorded and the AddrSpec is returned holding only the local part.
    Errors raised by get_local_part and get_domain propagate.
    """
    addr_spec = AddrSpec()
    token, value = get_local_part(value)
    addr_spec.append(token)
    if not value or value[0] != '@':
        # The message previously read "add-spec"; the rule is addr-spec.
        addr_spec.defects.append(errors.InvalidHeaderDefect(
            "addr-spec local part with no domain"))
        return addr_spec, value
    addr_spec.append(ValueTerminal('@', 'address-at-symbol'))
    token, value = get_domain(value[1:])
    addr_spec.append(token)
    return addr_spec, value
def get_obs_route(value):
    """obs-route = obs-domain-list ":"
       obs-domain-list = *(CFWS / ",") "@" domain *("," [CFWS] ["@" domain])

    Returns an (ObsRoute, remaining_value) pair.  The obs-route token holds
    the sub-tokens directly; there is no obs-domain-list node in the tree.

    Raises HeaderParseError if no '@' domain is found after the leading
    CFWS/commas, if the input ends before the ':' that closes the route,
    or if something other than ':' follows the domain list.
    """
    obs_route = ObsRoute()
    # Leading run of CFWS and empty list elements.
    while value and (value[0] == ',' or value[0] in CFWS_LEADER):
        if value[0] in CFWS_LEADER:
            token, value = get_cfws(value)
            obs_route.append(token)
        elif value[0] == ',':
            obs_route.append(ListSeparator)
            value = value[1:]
    if not value or value[0] != '@':
        raise errors.HeaderParseError(
            "expected obs-route domain but found '{}'".format(value))
    obs_route.append(RouteComponentMarker)
    token, value = get_domain(value[1:])
    obs_route.append(token)
    while value and value[0] == ',':
        obs_route.append(ListSeparator)
        value = value[1:]
        if not value:
            break
        if value[0] in CFWS_LEADER:
            token, value = get_cfws(value)
            obs_route.append(token)
        # The CFWS may have been the last thing in the input.  Stop here
        # so the end-of-header error below is raised, not an IndexError.
        if not value:
            break
        if value[0] == '@':
            obs_route.append(RouteComponentMarker)
            token, value = get_domain(value[1:])
            obs_route.append(token)
    if not value:
        raise errors.HeaderParseError("end of header while parsing obs-route")
    if value[0] != ':':
        raise errors.HeaderParseError( "expected ':' marking end of "
            "obs-route but found '{}'".format(value))
    obs_route.append(ValueTerminal(':', 'end-of-obs-route-marker'))
    return obs_route, value[1:]
def get_angle_addr(value):
    """angle-addr = [CFWS] "<" addr-spec ">" [CFWS] / obs-angle-addr
       obs-angle-addr = [CFWS] "<" obs-route addr-spec ">" [CFWS]

    Returns an (AngleAddr, remaining_value) pair.

    Defects recorded rather than raised:
      * "<>" (null addr-spec)            -> InvalidHeaderDefect
      * an obs-route before the addr-spec -> ObsoleteHeaderDefect
      * missing closing '>'               -> InvalidHeaderDefect (a '>'
        terminal is still appended to the token)

    Raises HeaderParseError if the input does not start (after optional
    CFWS) with '<', if nothing follows the '<', or if what follows is
    neither an addr-spec nor an obs-route.
    """
    angle_addr = AngleAddr()
    # `value and` keeps empty input on the HeaderParseError path below.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        angle_addr.append(token)
    if not value or value[0] != '<':
        raise errors.HeaderParseError(
            "expected angle-addr but found '{}'".format(value))
    angle_addr.append(ValueTerminal('<', 'angle-addr-start'))
    value = value[1:]
    if not value:
        # A lone '<' at the end of the input: report it as a parse error
        # (which callers handle) rather than indexing an empty string.
        raise errors.HeaderParseError(
            "expected addr-spec or obs-route but found '{}'".format(value))
    # Although it is not legal per RFC5322, SMTP uses '<>' in certain
    # circumstances.
    if value[0] == '>':
        angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
        angle_addr.defects.append(errors.InvalidHeaderDefect(
            "null addr-spec in angle-addr"))
        value = value[1:]
        return angle_addr, value
    try:
        token, value = get_addr_spec(value)
    except errors.HeaderParseError:
        # Not a plain addr-spec; try the obsolete routed form.
        try:
            token, value = get_obs_route(value)
            angle_addr.defects.append(errors.ObsoleteHeaderDefect(
                "obsolete route specification in angle-addr"))
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected addr-spec or obs-route but found '{}'".format(value))
        angle_addr.append(token)
        token, value = get_addr_spec(value)
    angle_addr.append(token)
    if value and value[0] == '>':
        value = value[1:]
    else:
        angle_addr.defects.append(errors.InvalidHeaderDefect(
            "missing trailing '>' on angle-addr"))
    # The closing terminal is appended even when it was missing from the
    # input, so the token is always well formed.
    angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        angle_addr.append(token)
    return angle_addr, value
def get_display_name(value):
    """display-name = phrase

    display-name is only another name for phrase, so the phrase token is
    not nested.  Its sub-tokens and a copy of its defects list are moved
    into a DisplayName token.

    Returns a (DisplayName, remaining_value) pair.
    """
    phrase, remainder = get_phrase(value)
    display_name = DisplayName()
    display_name.extend(phrase[:])
    display_name.defects = phrase.defects[:]
    return display_name, remainder
def get_name_addr(value):
    """name-addr = [display-name] angle-addr

    Returns a (NameAddr, remaining_value) pair.

    Raises HeaderParseError if the input is only CFWS, if the display
    name would start with a PHRASE_ENDS character, or if nothing follows
    the display name.  Errors from get_angle_addr propagate.
    """
    name_addr = NameAddr()
    # Leading CFWS could belong to either the display name or the
    # angle-addr.  Hold it until we know which one comes first.
    pending_cfws = None
    if value[0] in CFWS_LEADER:
        pending_cfws, value = get_cfws(value)
        if not value:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(pending_cfws))
    if value[0] != '<':
        if value[0] in PHRASE_ENDS:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(value))
        display_name, value = get_display_name(value)
        if not value:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(display_name))
        if pending_cfws is not None:
            # Attach the CFWS to the first element of the display name.
            display_name[0][:0] = [pending_cfws]
            pending_cfws = None
        name_addr.append(display_name)
    angle_addr, value = get_angle_addr(value)
    if pending_cfws is not None:
        # No display name took the CFWS, so it leads the angle-addr.
        angle_addr[:0] = [pending_cfws]
    name_addr.append(angle_addr)
    return name_addr, value
def get_mailbox(value):
    """mailbox = name-addr / addr-spec

    Returns a (Mailbox, remaining_value) pair.  The Mailbox's token_type is
    changed to 'invalid-mailbox' if any defect of the parsed content is an
    InvalidHeaderDefect.

    Raises HeaderParseError if the input parses as neither alternative.
    """
    mailbox = Mailbox()
    # There is no cheap lookahead that tells the two forms apart, so try
    # name-addr first and fall back to addr-spec.
    try:
        content, value = get_name_addr(value)
    except errors.HeaderParseError:
        try:
            content, value = get_addr_spec(value)
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected mailbox but found '{}'".format(value))
    for defect in content.all_defects:
        if isinstance(defect, errors.InvalidHeaderDefect):
            mailbox.token_type = 'invalid-mailbox'
            break
    mailbox.append(content)
    return mailbox, value
def get_invalid_mailbox(value, endchars):
    """Consume everything up to the first character found in *endchars*.

    This is outside the formal grammar.  The text is collected into an
    InvalidMailbox token list.  Each PHRASE_ENDS character becomes a
    'misplaced-special' terminal, and everything else is parsed with
    get_phrase.

    Returns an (InvalidMailbox, remaining_value) pair.
    """
    collected = InvalidMailbox()
    while value:
        head = value[0]
        if head in endchars:
            break
        if head in PHRASE_ENDS:
            collected.append(ValueTerminal(head, 'misplaced-special'))
            value = value[1:]
            continue
        phrase, value = get_phrase(value)
        collected.append(phrase)
    return collected, value
def get_mailbox_list(value):
    """mailbox-list = (mailbox *("," mailbox)) / obs-mbox-list
       obs-mbox-list = *([CFWS] ",") mailbox *("," [mailbox / CFWS])

    This goes beyond the formal grammar to give better error recovery.
    Parsing stops only at the end of the input or at a ';' (the group
    terminator).  Entries that fail to parse become InvalidMailbox tokens
    and parsing continues with the next entry.  A list with no real
    mailboxes at all is accepted here and left for the caller to judge.

    Returns a (MailboxList, remaining_value) pair.
    """
    mailbox_list = MailboxList()
    while value and value[0] != ';':
        try:
            parsed, value = get_mailbox(value)
        except errors.HeaderParseError:
            leader = None
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
                entry_is_empty = not value or value[0] in ',;'
                if entry_is_empty:
                    # A CFWS-only entry: keep the CFWS in the tree.
                    mailbox_list.append(leader)
            else:
                entry_is_empty = value[0] == ','
            if entry_is_empty:
                mailbox_list.defects.append(errors.ObsoleteHeaderDefect(
                    "empty element in mailbox-list"))
            else:
                junk, value = get_invalid_mailbox(value, ',;')
                if leader is not None:
                    junk[:0] = [leader]
                mailbox_list.append(junk)
                mailbox_list.defects.append(errors.InvalidHeaderDefect(
                    "invalid mailbox in mailbox-list"))
        else:
            mailbox_list.append(parsed)
        if value and value[0] not in ',;':
            # Trailing junk after the entry: fold it into the last entry
            # and mark that entry invalid.  The mailbox data it already
            # holds stays available.
            last = mailbox_list[-1]
            last.token_type = 'invalid-mailbox'
            junk, value = get_invalid_mailbox(value, ',;')
            last.extend(junk)
            mailbox_list.defects.append(errors.InvalidHeaderDefect(
                "invalid mailbox in mailbox-list"))
        if value and value[0] == ',':
            mailbox_list.append(ListSeparator)
            value = value[1:]
    return mailbox_list, value
def get_group_list(value):
    """group-list = mailbox-list / CFWS / obs-group-list
       obs-group-list = 1*([CFWS] ",") [CFWS]

    Returns a (GroupList, remaining_value) pair.  Nothing is raised here;
    problems are recorded as defects on the GroupList:
      * empty input, or input that ends after CFWS -> InvalidHeaderDefect
      * a mailbox-list with no mailboxes           -> ObsoleteHeaderDefect
    """
    group_list = GroupList()
    if not value:
        group_list.defects.append(errors.InvalidHeaderDefect(
            "end of header before group-list"))
        return group_list, value
    leader = None
    if value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
        if not value:
            # This should never happen in email parsing: a CFWS-only
            # group-list is legal inside a group, and a group is the only
            # place a group-list appears.
            group_list.defects.append(errors.InvalidHeaderDefect(
                "end of header in group-list"))
            group_list.append(leader)
            return group_list, value
    if value[0] == ';':
        group_list.append(leader)
        return group_list, value
    mailboxes, value = get_mailbox_list(value)
    if len(mailboxes.all_mailboxes) != 0:
        if leader is not None:
            mailboxes[:0] = [leader]
        group_list.append(mailboxes)
    else:
        # Only separators/CFWS were found: splice those tokens in
        # directly instead of nesting an empty mailbox-list.
        if leader is not None:
            group_list.append(leader)
        group_list.extend(mailboxes)
        group_list.defects.append(errors.ObsoleteHeaderDefect(
            "group-list with empty entries"))
    return group_list, value
def get_group(value):
    """group = display-name ":" [group-list] ";" [CFWS]

    Returns a (Group, remaining_value) pair.

    If the input ends before the closing ';', an InvalidHeaderDefect is
    recorded and a ';' terminal is still appended so the token is well
    formed.

    Raises HeaderParseError if the display name is not followed by ':',
    or if something other than ';' follows the group-list.
    """
    group = Group()
    token, value = get_display_name(value)
    if not value or value[0] != ':':
        raise errors.HeaderParseError("expected ':' at end of group "
            "display name but found '{}'".format(value))
    group.append(token)
    group.append(ValueTerminal(':', 'group-display-name-terminator'))
    value = value[1:]
    if value and value[0] == ';':
        # Empty group: no group-list at all.
        group.append(ValueTerminal(';', 'group-terminator'))
        return group, value[1:]
    token, value = get_group_list(value)
    group.append(token)
    if not value:
        group.defects.append(errors.InvalidHeaderDefect(
            "end of header in group"))
    # `elif`, not `if`: with empty input the defect above is the whole
    # diagnosis, and indexing value[0] would raise IndexError.
    elif value[0] != ';':
        raise errors.HeaderParseError(
            "expected ';' at end of group but found {}".format(value))
    group.append(ValueTerminal(';', 'group-terminator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        group.append(token)
    return group, value
def get_address(value):
    """address = mailbox / group

    An address is either a single mailbox or a group, which is a list of
    mailboxes.  The Address token returned holds exactly one element: the
    mailbox token or the group token.

    Returns an (Address, remaining_value) pair.  Raises HeaderParseError
    if the input parses as neither a group nor a mailbox.
    """
    # The grammar gives little help in choosing between the alternatives.
    # Allowing for obsolete forms, mailbox and group look alike until an
    # '@', '<' or ':' is reached.  So each parser is simply tried in turn,
    # group first.  Parsing a phrase and then branching on the next
    # character could be faster, but would be premature optimization.
    address = Address()
    try:
        member, value = get_group(value)
    except errors.HeaderParseError:
        try:
            member, value = get_mailbox(value)
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected address but found '{}'".format(value))
    address.append(member)
    return address, value
def get_address_list(value):
    """address_list = (address *("," address)) / obs-addr-list
       obs-addr-list = *([CFWS] ",") address *("," [address / CFWS])

    This departs from the formal grammar by parsing to the end of the
    input, treating all of it as an address-list.  Addresses that fail to
    parse are wrapped as invalid mailboxes so that later valid addresses
    are still found.

    Returns an (AddressList, remaining_value) pair.
    """
    address_list = AddressList()
    while value:
        try:
            parsed, value = get_address(value)
        except errors.HeaderParseError:
            leader = None
            empty_entry_message = None
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
                if not value or value[0] == ',':
                    # A CFWS-only entry: keep the CFWS in the tree.
                    address_list.append(leader)
                    empty_entry_message = "address-list entry with no content"
            elif value[0] == ',':
                empty_entry_message = "empty element in address-list"
            if empty_entry_message is not None:
                address_list.defects.append(
                    errors.ObsoleteHeaderDefect(empty_entry_message))
            else:
                junk, value = get_invalid_mailbox(value, ',')
                if leader is not None:
                    junk[:0] = [leader]
                address_list.append(Address([junk]))
                address_list.defects.append(errors.InvalidHeaderDefect(
                    "invalid address in address-list"))
        else:
            address_list.append(parsed)
        if value and value[0] != ',':
            # Trailing junk after the address: fold it into the first
            # element of the last address and mark that element as an
            # invalid mailbox.  The data it already holds stays available.
            target = address_list[-1][0]
            target.token_type = 'invalid-mailbox'
            junk, value = get_invalid_mailbox(value, ',')
            target.extend(junk)
            address_list.defects.append(errors.InvalidHeaderDefect(
                "invalid address in address-list"))
        if value:
            # Whatever remains must start with a ','.
            address_list.append(ValueTerminal(',', 'list-separator'))
            value = value[1:]
    return address_list, value
#
# XXX: As I begin to add additional header parsers, I'm realizing we probably
# have two levels of parser routines: the get_XXX methods that get a token in
# the grammar, and the parse_XXX methods that parse an entire field value.  So
# get_address_list above should really be a parse_ method, as probably should
# get_unstructured.
#
def parse_mime_version(value):
    """mime-version = [CFWS] 1*digit [CFWS] "." [CFWS] 1*digit [CFWS]

    Parse a whole MIME-Version value and return a MIMEVersion token list.
    When the numeric parts are valid its `major` and `minor` attributes
    are set to ints.  Nothing is raised; every problem is recorded as a
    defect, and unparseable text is kept in the tree as 'xtext'.
    """
    # The [CFWS] is implicit in the RFC 2045 BNF.
    # XXX: This routine is a bit verbose, should factor out a get_int method.
    version = MIMEVersion()
    if not value:
        version.defects.append(errors.HeaderMissingRequiredValue(
            "Missing MIME version number (eg: 1.0)"))
        return version
    if value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        version.append(cfws)
        if not value:
            version.defects.append(errors.HeaderMissingRequiredValue(
                "Expected MIME version number but found only CFWS"))

    # Major number: everything up to the '.' or the next CFWS.
    cut = 0
    while (cut < len(value) and value[cut] != '.'
           and value[cut] not in CFWS_LEADER):
        cut += 1
    major_text, value = value[:cut], value[cut:]
    if major_text.isdigit():
        version.major = int(major_text)
        version.append(ValueTerminal(major_text, 'digits'))
    else:
        version.defects.append(errors.InvalidHeaderDefect(
            "Expected MIME major version number but found {!r}".format(
                major_text)))
        version.append(ValueTerminal(major_text, 'xtext'))
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        version.append(cfws)

    # Separator.
    if not (value and value[0] == '.'):
        if version.major is not None:
            version.defects.append(errors.InvalidHeaderDefect(
                "Incomplete MIME version; found only major number"))
        if value:
            version.append(ValueTerminal(value, 'xtext'))
        return version
    version.append(ValueTerminal('.', 'version-separator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        version.append(cfws)
    if not value:
        if version.major is not None:
            version.defects.append(errors.InvalidHeaderDefect(
                "Incomplete MIME version; found only major number"))
        return version

    # Minor number: everything up to the next CFWS.
    cut = 0
    while cut < len(value) and value[cut] not in CFWS_LEADER:
        cut += 1
    minor_text, value = value[:cut], value[cut:]
    if minor_text.isdigit():
        version.minor = int(minor_text)
        version.append(ValueTerminal(minor_text, 'digits'))
    else:
        version.defects.append(errors.InvalidHeaderDefect(
            "Expected MIME minor version number but found {!r}".format(
                minor_text)))
        version.append(ValueTerminal(minor_text, 'xtext'))
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        version.append(cfws)
    if value:
        version.defects.append(errors.InvalidHeaderDefect(
            "Excess non-CFWS text after MIME version"))
        version.append(ValueTerminal(value, 'xtext'))
    return version
def get_invalid_parameter(value):
    """Consume everything up to the next ';'.

    This is outside the formal grammar.  The text is collected into an
    InvalidParameter token list.  Each PHRASE_ENDS character becomes a
    'misplaced-special' terminal, and everything else is parsed with
    get_phrase.

    Returns an (InvalidParameter, remaining_value) pair.
    """
    collected = InvalidParameter()
    while value:
        head = value[0]
        if head == ';':
            break
        if head in PHRASE_ENDS:
            collected.append(ValueTerminal(head, 'misplaced-special'))
            value = value[1:]
            continue
        phrase, value = get_phrase(value)
        collected.append(phrase)
    return collected, value
def get_ttext(value):
    """ttext = <matches _ttext_matcher>

    Take the leading run of *value* matched by _non_token_end_matcher and
    return it as a 'ttext' ValueTerminal with the remaining input.  The
    terminal is passed through _validate_xtext.

    Raises HeaderParseError if the matcher finds nothing.
    """
    match = _non_token_end_matcher(value)
    if not match:
        raise errors.HeaderParseError(
            "expected ttext but found '{}'".format(value))
    text = match.group()
    remainder = value[len(text):]
    terminal = ValueTerminal(text, 'ttext')
    _validate_xtext(terminal)
    return terminal, remainder
def get_token(value):
    """token = [CFWS] 1*ttext [CFWS]

    The RFC implies the CFWS around a token without stating it in the
    BNF.  The ttext run itself is parsed by get_ttext.

    Returns a (Token, remaining_value) pair.  Raises HeaderParseError if
    the text after any leading CFWS starts with a TOKEN_ENDS character.
    Errors from get_ttext propagate.
    """
    result = Token()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    if value and value[0] in TOKEN_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    ttext, value = get_ttext(value)
    result.append(ttext)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    return result, value
def get_attrtext(value):
    """attrtext = 1*(any non-ATTRIBUTE_ENDS character)

    Take the leading run of *value* matched by _non_attribute_end_matcher
    and return it as an 'attrtext' ValueTerminal with the remaining
    input.  The terminal is passed through _validate_xtext.

    Raises HeaderParseError if the matcher finds nothing.
    """
    match = _non_attribute_end_matcher(value)
    if not match:
        raise errors.HeaderParseError(
            "expected attrtext but found {!r}".format(value))
    text = match.group()
    remainder = value[len(text):]
    terminal = ValueTerminal(text, 'attrtext')
    _validate_xtext(terminal)
    return terminal, remainder
def get_attribute(value):
    """attribute = [CFWS] 1*attrtext [CFWS]

    This form of the BNF makes the surrounding CFWS explicit.  The run of
    attribute characters is parsed by get_attrtext and stored as a single
    terminal.

    Returns an (Attribute, remaining_value) pair.  Raises HeaderParseError
    if the text after any leading CFWS starts with an ATTRIBUTE_ENDS
    character.  Errors from get_attrtext propagate.
    """
    result = Attribute()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    if value and value[0] in ATTRIBUTE_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    attrtext, value = get_attrtext(value)
    result.append(attrtext)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    return result, value
def get_extended_attrtext(value):
    """attrtext = 1*(any non-ATTRIBUTE_ENDS character plus '%')

    A variant of get_attrtext that uses
    _non_extended_attribute_end_matcher, so that a value containing %
    escapes is picked up as one string and can be decoded as a whole
    later.

    Returns an ('extended-attrtext' ValueTerminal, remaining_value) pair.
    Raises HeaderParseError if the matcher finds nothing.
    """
    match = _non_extended_attribute_end_matcher(value)
    if not match:
        raise errors.HeaderParseError(
            "expected extended attrtext but found {!r}".format(value))
    text = match.group()
    remainder = value[len(text):]
    terminal = ValueTerminal(text, 'extended-attrtext')
    _validate_xtext(terminal)
    return terminal, remainder
def get_extended_attribute(value):
    """extended-attribute = [CFWS] 1*extended_attrtext [CFWS]

    The same as get_attribute, except that the text run is parsed by
    get_extended_attrtext so an encoded value is kept as one string.

    Returns an (Attribute, remaining_value) pair.  Raises HeaderParseError
    if the text after any leading CFWS starts with an
    EXTENDED_ATTRIBUTE_ENDS character.  Errors from get_extended_attrtext
    propagate.
    """
    # XXX: should we have an ExtendedAttribute TokenList?
    result = Attribute()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    if value and value[0] in EXTENDED_ATTRIBUTE_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    attrtext, value = get_extended_attrtext(value)
    result.append(attrtext)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    return result, value
def get_section(value):
    """section = '*' digits

    The formal BNF is more complicated because leading zeros are not
    allowed.  Here any digit run is accepted, and a leading zero (other
    than the number 0 itself) is recorded as an InvalidHeaderDefect.  No
    CFWS is accepted between the '*' and the digits.  The caller must
    already have dealt with leading CFWS.

    Returns a (Section, remaining_value) pair with `section.number` set
    to the parsed int.  Raises HeaderParseError if *value* does not start
    with '*' followed by at least one digit.
    """
    section = Section()
    if not value or value[0] != '*':
        raise errors.HeaderParseError("Expected section but found {}".format(
            value))
    section.append(ValueTerminal('*', 'section-marker'))
    value = value[1:]
    if not value or not value[0].isdigit():
        raise errors.HeaderParseError("Expected section number but "
                                      "found {}".format(value))
    digits = ''
    while value and value[0].isdigit():
        digits += value[0]
        value = value[1:]
    if digits[0] == '0' and digits != '0':
        # The original used errors.InvalidHeaderError, which does not
        # exist.  InvalidHeaderDefect is what the rest of the parser
        # records.  The message was also missing a space.
        section.defects.append(errors.InvalidHeaderDefect(
            "section number has an invalid leading 0"))
    section.number = int(digits)
    section.append(ValueTerminal(digits, 'digits'))
    return section, value
def get_value(value):
    """value = quoted-string / attribute

    Parse a parameter value.  Input starting with a double quote is
    handed to get_quoted_string, and anything else to
    get_extended_attribute.  Leading CFWS is spliced onto the front of
    the parsed token, which is then wrapped as the only element of a
    Value token list.

    Returns a (Value, remaining_value) pair.  Raises HeaderParseError if
    the input is empty or contains only CFWS.
    """
    wrapper = Value()
    if not value:
        raise errors.HeaderParseError("Expected value but found end of string")
    cfws = None
    if value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
    if not value:
        raise errors.HeaderParseError("Expected value but found "
                                      "only {}".format(cfws))
    parse = get_quoted_string if value[0] == '"' else get_extended_attribute
    inner, value = parse(value)
    if cfws is not None:
        inner[:0] = [cfws]
    wrapper.append(inner)
    return wrapper, value
def get_parameter(value):
    """parameter = attribute [section] ["*"] [CFWS] "=" value

    The CFWS is implied by the RFC but not made explicit in the BNF.  This
    simplified BNF is brought into line with the RFC's through extra
    checks.  That makes error recovery, and working with the resulting
    parse tree, easier.

    Returns a (Parameter, remaining_value) pair.  On the returned token
    `sectioned`, `extended`, `charset` and `lang` are set when the
    corresponding syntax is found.

    Raises HeaderParseError if the parameter is incomplete, is not
    followed by '=', or has malformed RFC 2231 charset/lang delimiters.
    Errors from the get_* helpers propagate.
    """
    # It is possible CFWS would also be implicitly allowed between the section
    # and the 'extended-attribute' marker (the '*') , but we've never seen that
    # in the wild and we will therefore ignore the possibility.
    param = Parameter()
    token, value = get_attribute(value)
    param.append(token)
    if not value or value[0] == ';':
        param.defects.append(errors.InvalidHeaderDefect("Parameter contains "
            "name ({}) but no value".format(token)))
        return param, value
    if value[0] == '*':
        try:
            token, value = get_section(value)
            param.sectioned = True
            param.append(token)
        except errors.HeaderParseError:
            # No section number; the '*' may still be the extended marker.
            pass
        if not value:
            raise errors.HeaderParseError("Incomplete parameter")
        if value[0] == '*':
            param.append(ValueTerminal('*', 'extended-parameter-marker'))
            value = value[1:]
            param.extended = True
    # `not value` covers input that ends right after the '*' marker,
    # which would otherwise raise IndexError instead of a parse error.
    if not value or value[0] != '=':
        raise errors.HeaderParseError("Parameter not followed by '='")
    param.append(ValueTerminal('=', 'parameter-separator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        param.append(token)
    # `remainder` is set only while parsing the inside of a quoted
    # extended value.  It holds the input that follows the quoted string.
    remainder = None
    # `appendto` is where value tokens go: the parameter itself, or the
    # emptied bare-quoted-string when handling a quoted extended value.
    appendto = param
    if param.extended and value and value[0] == '"':
        # Now for some serious hackery to handle the common invalid case of
        # double quotes around an extended value.  We also accept (with defect)
        # a value marked as encoded that isn't really.
        qstring, remainder = get_quoted_string(value)
        inner_value = qstring.stripped_value
        semi_valid = False
        if param.section_number == 0:
            if inner_value and inner_value[0] == "'":
                semi_valid = True
            else:
                token, rest = get_attrtext(inner_value)
                if rest and rest[0] == "'":
                    semi_valid = True
        else:
            try:
                token, rest = get_extended_attrtext(inner_value)
            except errors.HeaderParseError:
                # Narrowed from a bare `except:`.  Only a parse failure
                # means "not an encoded value"; anything else is a bug.
                pass
            else:
                if not rest:
                    semi_valid = True
        if semi_valid:
            param.defects.append(errors.InvalidHeaderDefect(
                "Quoted string value for extended parameter is invalid"))
            param.append(qstring)
            for t in qstring:
                if t.token_type == 'bare-quoted-string':
                    # Empty the quoted content.  It is re-parsed below and
                    # the resulting tokens are put back in its place.
                    t[:] = []
                    appendto = t
                    break
            value = inner_value
        else:
            remainder = None
            param.defects.append(errors.InvalidHeaderDefect(
                "Parameter marked as extended but appears to have a "
                "quoted string value that is non-encoded"))
    if value and value[0] == "'":
        # Empty charset.
        token = None
    else:
        token, value = get_value(value)
    if not param.extended or param.section_number > 0:
        if not value or value[0] != "'":
            appendto.append(token)
            if remainder is not None:
                assert not value, value
                value = remainder
            return param, value
        param.defects.append(errors.InvalidHeaderDefect(
            "Apparent initial-extended-value but attribute "
            "was not marked as extended or was not initial section"))
    if not value:
        # Assume the charset/lang is missing and the token is the value.
        param.defects.append(errors.InvalidHeaderDefect(
            "Missing required charset/lang delimiters"))
        appendto.append(token)
        if remainder is None:
            return param, value
    else:
        if token is not None:
            for t in token:
                if t.token_type == 'extended-attrtext':
                    break
            # NOTE(review): the original had a bare comparison here,
            # `t.token_type == 'attrtext'`, whose result was discarded.
            # An assignment was presumably intended.  get_value wraps a
            # single sub-token, so `t` is that sub-token, not an
            # 'extended-attrtext' terminal.  The no-op is dropped rather
            # than turned into a relabelling of the wrong node.
            appendto.append(t)
            param.charset = t.value
        if value[0] != "'":
            raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
                                          "delimiter, but found {!r}".format(value))
        appendto.append(ValueTerminal("'", 'RFC2231 delimiter'))
        value = value[1:]
        if value and value[0] != "'":
            token, value = get_attrtext(value)
            appendto.append(token)
            param.lang = token.value
            if not value or value[0] != "'":
                raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
                                  "delimiter, but found {}".format(value))
        appendto.append(ValueTerminal("'", 'RFC2231 delimiter'))
        value = value[1:]
    if remainder is not None:
        # Treat the rest of value as bare quoted string content.
        v = Value()
        while value:
            if value[0] in WSP:
                token, value = get_fws(value)
            else:
                token, value = get_qcontent(value)
            v.append(token)
        token = v
    else:
        token, value = get_value(value)
    appendto.append(token)
    if remainder is not None:
        assert not value, value
        value = remainder
    return param, value
def parse_mime_parameters(value):
    """parameter *( ";" parameter )

    The BNF shows that this routine is to be called only after the
    leading ';' has been found and consumed.  The formal RFC grammar has
    no such rule, but it is convenient to keep the parameters of a header
    in one TokenList of their own.

    This is a 'parse' routine because it consumes all of the remaining
    value, but it is never used on a full header.  It is called on what
    follows the non-parameter part of a specific MIME header.

    Returns a MimeParameters token list.  Nothing is raised for malformed
    parameters: they are kept as InvalidParameter tokens and reported
    through defects.
    """
    mime_parameters = MimeParameters()
    while value:
        try:
            parsed, value = get_parameter(value)
        except errors.HeaderParseError:
            leader = None
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
            if not value:
                # Nothing but trailing CFWS was left.
                mime_parameters.append(leader)
                return mime_parameters
            if value[0] == ';':
                if leader is not None:
                    mime_parameters.append(leader)
                mime_parameters.defects.append(errors.InvalidHeaderDefect(
                    "parameter entry with no content"))
            else:
                junk, value = get_invalid_parameter(value)
                if leader:
                    junk[:0] = [leader]
                mime_parameters.append(junk)
                mime_parameters.defects.append(errors.InvalidHeaderDefect(
                    "invalid parameter {!r}".format(junk)))
        else:
            mime_parameters.append(parsed)
        if value and value[0] != ';':
            # Junk after the otherwise valid parameter.  Mark it as
            # invalid, but it will have a value.
            last = mime_parameters[-1]
            last.token_type = 'invalid-parameter'
            junk, value = get_invalid_parameter(value)
            last.extend(junk)
            mime_parameters.defects.append(errors.InvalidHeaderDefect(
                "parameter with invalid trailing text {!r}".format(junk)))
        if value:
            # Must be a ';' at this point.
            mime_parameters.append(ValueTerminal(';', 'parameter-separator'))
            value = value[1:]
    return mime_parameters
def _find_mime_parameters(tokenlist, value):
    """Do our best to find the parameters in an invalid MIME header.

    Everything before the first ';' is appended to *tokenlist*.
    PHRASE_ENDS characters become 'misplaced-special' terminals, and the
    rest is parsed with get_phrase.  If a ';' is found, a separator
    terminal is appended, followed by the parameters parsed from the text
    after it.  *tokenlist* is modified in place and nothing is returned.
    """
    while value:
        head = value[0]
        if head == ';':
            break
        if head in PHRASE_ENDS:
            tokenlist.append(ValueTerminal(head, 'misplaced-special'))
            value = value[1:]
            continue
        phrase, value = get_phrase(value)
        tokenlist.append(phrase)
    if value:
        tokenlist.append(ValueTerminal(';', 'parameter-separator'))
        tokenlist.append(parse_mime_parameters(value[1:]))
def parse_content_type_header(value):
    """content-type = maintype "/" subtype *( ";" parameter )

    The maintype and subtype are parsed as tokens.  They could in theory
    be checked against the official IANA list plus x-tokens, but that is
    not done here.

    Returns a ContentType token list.  Nothing is raised; problems are
    recorded as defects, and _find_mime_parameters is used to recover any
    parameters from a malformed value.  `maintype` and `subtype` are set
    (stripped and lower-cased) only once parsed.  Both are deleted again
    if something other than ';' follows the subtype.
    """
    ctype = ContentType()
    if not value:
        ctype.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content type specification"))
        return ctype

    # Main type.
    try:
        maintype, value = get_token(value)
    except errors.HeaderParseError:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Expected content maintype but found {!r}".format(value)))
        _find_mime_parameters(ctype, value)
        return ctype
    ctype.append(maintype)
    # XXX: If we really want to follow the formal grammar we should make
    # maintype and subtype specialized TokenLists here.  Probably not worth it.
    if not (value and value[0] == '/'):
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Invalid content type"))
        if value:
            _find_mime_parameters(ctype, value)
        return ctype
    ctype.maintype = maintype.value.strip().lower()
    ctype.append(ValueTerminal('/', 'content-type-separator'))
    value = value[1:]

    # Subtype.
    try:
        subtype, value = get_token(value)
    except errors.HeaderParseError:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Expected content subtype but found {!r}".format(value)))
        _find_mime_parameters(ctype, value)
        return ctype
    ctype.append(subtype)
    ctype.subtype = subtype.value.strip().lower()
    if not value:
        return ctype

    # Parameters.
    if value[0] == ';':
        ctype.append(ValueTerminal(';', 'parameter-separator'))
        ctype.append(parse_mime_parameters(value[1:]))
        return ctype
    ctype.defects.append(errors.InvalidHeaderDefect(
        "Only parameters are valid after content type, but "
        "found {!r}".format(value)))
    # The RFC requires that a syntactically invalid content-type be treated
    # as text/plain.  Perhaps we should postel this, but we should probably
    # only do that if we were checking the subtype value against IANA.
    del ctype.maintype, ctype.subtype
    _find_mime_parameters(ctype, value)
    return ctype
def parse_content_disposition_header(value):
    """ disposition-type *( ";" parameter )

    Returns a ContentDisposition token list.  An empty *value* records a
    HeaderMissingRequiredValue defect.  A value that does not begin with
    a token, or that has something other than ';' after the disposition
    type, records an InvalidHeaderDefect and the remaining text is passed
    to _find_mime_parameters to recover what it can.  On success the
    stripped, lower-cased disposition type is stored in
    ``content_disposition``.
    """
    disp_header = ContentDisposition()
    if not value:
        disp_header.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content disposition"))
        return disp_header
    try:
        token, value = get_token(value)
    except errors.HeaderParseError:
        # Record the defect on the header object that is being returned.
        disp_header.defects.append(errors.InvalidHeaderDefect(
            "Expected content disposition but found {!r}".format(value)))
        _find_mime_parameters(disp_header, value)
        return disp_header
    disp_header.append(token)
    disp_header.content_disposition = token.value.strip().lower()
    if not value:
        return disp_header
    if value[0] != ';':
        disp_header.defects.append(errors.InvalidHeaderDefect(
            "Only parameters are valid after content disposition, but "
            "found {!r}".format(value)))
        _find_mime_parameters(disp_header, value)
        return disp_header
    disp_header.append(ValueTerminal(';', 'parameter-separator'))
    disp_header.append(parse_mime_parameters(value[1:]))
    return disp_header
def parse_content_transfer_encoding_header(value):
    """ mechanism

    Returns a ContentTransferEncoding token list.  An empty *value*
    records a HeaderMissingRequiredValue defect.  If the value does not
    begin with a token, an InvalidHeaderDefect is recorded and ``cte`` is
    not assigned; otherwise ``cte`` is set to the stripped, lower-cased
    token.  Any text left over is still tokenized and appended, with one
    "Extra text" InvalidHeaderDefect recorded per leftover token.
    """
    # We should probably validate the values, since the list is fixed.
    cte_header = ContentTransferEncoding()
    if not value:
        cte_header.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content transfer encoding"))
        return cte_header
    try:
        token, value = get_token(value)
    except errors.HeaderParseError:
        # Record the defect on the header object that is being returned;
        # the unparsed text is then consumed by the loop below.
        cte_header.defects.append(errors.InvalidHeaderDefect(
            "Expected content transfer encoding but found {!r}".format(value)))
    else:
        cte_header.append(token)
        cte_header.cte = token.value.strip().lower()
    if not value:
        return cte_header
    while value:
        cte_header.defects.append(errors.InvalidHeaderDefect(
            "Extra text after content transfer encoding"))
        if value[0] in PHRASE_ENDS:
            cte_header.append(ValueTerminal(value[0], 'misplaced-special'))
            value = value[1:]
        else:
            token, value = get_phrase(value)
            cte_header.append(token)
    return cte_header
|