| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- # Natural Language Toolkit:
- #
- # Copyright (C) 2001-2020 NLTK Project
- # Author: Piotr Kasprzyk <p.j.kasprzyk@gmail.com>
- # URL: <http://nltk.org/>
- # For license information, see LICENSE.TXT
- from nltk.corpus.reader.api import *
- from nltk.corpus.reader.xmldocs import XMLCorpusReader
# Optional attribute list of an opening tag: a space followed by anything
# up to (but not including) the closing ">".
_ATTRS = r"(?: [^>]*){0,1}"

# Template for pulling the quoted value of a named attribute out of a tag.
_ATTR_VALUE = '%s="(.*?)"'

# Content of a <p>...</p> element (attributes on the opening tag allowed).
PARA = re.compile(r"<p" + _ATTRS + r">(.*?)</p>")

# Content of an <s>...</s> element.
SENT = re.compile(r"<s" + _ATTRS + r">(.*?)</s>")

# For each <w>/<c> element: (opening tag text after "<", element content).
TAGGEDWORD = re.compile(r"<([wc]" + _ATTRS + r">)(.*?)</[wc]>")

# Content only of each <w>/<c> element.
WORD = re.compile(r"<[wc]" + _ATTRS + r">(.*?)</[wc]>")

# Values of the type="...", ana="..." and text id="..." attributes.
TYPE = re.compile(_ATTR_VALUE % "type")
ANA = re.compile(_ATTR_VALUE % "ana")
TEXTID = re.compile(_ATTR_VALUE % "text id")
class TEICorpusView(StreamBackedCorpusView):
    """Stream-backed view that extracts tokens from TEI-style markup.

    Each block read from the stream is scanned with the module-level
    regular expressions: ``<p>`` elements are treated as paragraphs,
    ``<s>`` elements as sentences and ``<w>``/``<c>`` elements as tokens.

    :param corpus_file: file to read; passed on to
        ``StreamBackedCorpusView``.
    :param tagged: if true, tokens are ``(word, tag)`` tuples, otherwise
        plain strings.
    :param group_by_sent: if true, the tokens of each sentence are kept
        together in their own list.
    :param group_by_para: if true, the content of each paragraph is kept
        together in its own list.
    :param tagset: accepted for interface compatibility; not used by this
        class.
    :param head_len: stream position at which reading starts (used to
        skip the file header).
    :param textids: optional collection of text ids to keep. When it is
        non-empty, every ``<text id="...">`` element whose id is not in
        the collection is dropped before parsing.
    """

    # Size hint handed to ``stream.readlines`` for the initial read of
    # each block.
    _pagesize = 4096

    def __init__(
        self,
        corpus_file,
        tagged,
        group_by_sent,
        group_by_para,
        tagset=None,
        head_len=0,
        textids=None,
    ):
        self._tagged = tagged
        self._textids = textids
        self._group_by_sent = group_by_sent
        self._group_by_para = group_by_para
        # WARNING -- skip header
        StreamBackedCorpusView.__init__(self, corpus_file, startpos=head_len)

    def read_block(self, stream):
        """Read one block of complete ``<text>`` elements and parse it.

        Lines are read until the block holds at least one ``<text id``
        opener and no opener is left without its ``</text>`` (or until the
        stream is exhausted).

        :param stream: the stream to read from.
        :return: list of tokens, sentences or paragraphs, depending on
            the grouping flags given to the constructor.
        """
        block = concat(stream.readlines(self._pagesize))

        # Keep running marker counts instead of re-counting the whole
        # (growing) block for every extra line. Neither marker contains a
        # newline, so a marker can never straddle two lines.
        opened = block.count("<text id")
        closed = block.count("</text>")
        extra = []
        while opened > closed or opened == 0:
            line = stream.readline()
            if not line:
                break
            extra.append(line)
            opened += line.count("<text id")
            closed += line.count("</text>")

        # The element regexes use "." (which does not match newlines), so
        # the markup is flattened to a single line before matching.
        block = (block + "".join(extra)).replace("\n", "")

        if self._textids:
            for tid in TEXTID.findall(block):
                if tid not in self._textids:
                    block = self._drop_text(block, tid)

        output = []
        for para_str in PARA.findall(block):
            para = []
            for sent_str in SENT.findall(para_str):
                if self._tagged:
                    sent = [
                        self._parse_tag(pair)
                        for pair in TAGGEDWORD.findall(sent_str)
                    ]
                else:
                    sent = WORD.findall(sent_str)
                if self._group_by_sent:
                    para.append(sent)
                else:
                    para.extend(sent)
            if self._group_by_para:
                output.append(para)
            else:
                output.extend(para)
        return output

    @staticmethod
    def _drop_text(block, tid):
        """Return *block* without the ``<text id="tid">...</text>`` element.

        The element is located through its full ``text id="..."``
        attribute rather than the bare id, so an id that also occurs
        elsewhere in the block (inside another id or in running text)
        cannot cause the wrong span to be removed. If the closing
        ``</text>`` is missing, everything from the opener to the end of
        the block is dropped.
        """
        marker = 'text id="%s"' % tid
        pos = block.find(marker)
        if pos < 0:
            # Already removed together with an earlier element.
            return block
        # Include the "<" that opens the tag when it directly precedes.
        start = pos - 1 if pos > 0 and block[pos - 1] == "<" else pos
        close = block.find("</text>", pos)
        stop = len(block) if close < 0 else close + len("</text>")
        return block[:start] + block[stop:]

    def _parse_tag(self, tag_word_tuple):
        """Turn a ``TAGGEDWORD`` match into a ``(word, tag)`` pair.

        For ``<w>`` elements the tag is the value of the ``ana``
        attribute; for ``<c>`` elements it is the value of ``type``. If
        the attribute is absent the tag is ``None``.
        """
        tag, word = tag_word_tuple
        pattern = ANA if tag.startswith("w") else TYPE  # else: starts with 'c'
        match = pattern.search(tag)
        return word, (match.group(1) if match else None)
class Pl196xCorpusReader(CategorizedCorpusReader, XMLCorpusReader):
    """Categorized corpus reader for the pl196x corpus.

    Besides the usual ``fileids``/``categories`` selectors, the token
    accessors accept ``textids``: identifiers of individual ``<text>``
    elements inside the corpus files. The mapping between files and text
    ids is loaded from the file given by the ``textid_file`` keyword
    argument; without it the mapping is empty.
    """

    # Start position handed to every TEICorpusView so that it skips the
    # beginning of each file (presumably the TEI header -- the value is
    # fixed and not derived from the file itself).
    head_len = 2770

    def __init__(self, *args, **kwargs):
        """Create the reader.

        :param args: positional arguments for ``XMLCorpusReader``.
        :param kwargs: keyword arguments for ``CategorizedCorpusReader``;
            ``textid_file`` optionally names the file-to-text-id mapping
            file.
        """
        self._textids = kwargs.get("textid_file")

        XMLCorpusReader.__init__(self, *args)
        CategorizedCorpusReader.__init__(self, kwargs)

        self._init_textids()

    def _init_textids(self):
        """Load the file id <-> text id mapping from the mapping file.

        Each non-blank line holds a file id, a space, and the text ids of
        that file separated by ``self._delimiter``.

        :raises ValueError: if a line names a file that is not part of
            the corpus.
        """
        self._f2t = defaultdict(list)
        self._t2f = defaultdict(list)
        if self._textids is None:
            return

        # Computed once: membership is tested for every line of the file.
        known_files = set(self.fileids())
        with open(self._textids) as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines
                file_id, text_ids = line.split(" ", 1)
                if file_id not in known_files:
                    raise ValueError(
                        "In text_id mapping file %s: %s not found"
                        % (self._textids, file_id)
                    )
                for text_id in text_ids.split(self._delimiter):
                    self._add_textids(file_id, text_id)

    def _add_textids(self, file_id, text_id):
        """Record that *text_id* is contained in *file_id*."""
        self._f2t[file_id].append(text_id)
        self._t2f[text_id].append(file_id)

    def _resolve(self, fileids, categories, textids=None):
        """Translate the selector arguments into files and text ids.

        At most one of the three selectors may be given.

        :return: a pair ``(fileids, textids)``. ``fileids`` is ``None``
            when no selector was given (callers then fall back to all
            files). ``textids`` is ``None`` unless the *textids* selector
            was used, in which case it maps each selected file to the set
            of requested text ids it contains.
        :raises ValueError: if more than one selector is given.
        """
        given = sum(
            selector is not None for selector in (fileids, categories, textids)
        )
        if given > 1:
            raise ValueError(
                "Specify at most one of: fileids, categories or textids"
            )

        if fileids is not None:
            return fileids, None

        if categories is not None:
            return self.fileids(categories), None

        if textids is not None:
            if isinstance(textids, str):
                textids = [textids]
            else:
                textids = list(textids)
            wanted = set(textids)
            # ``.get`` keeps unknown ids from being inserted into the
            # defaultdict; ``dict.fromkeys`` drops duplicate files (several
            # requested texts may share a file) while keeping their order.
            files = list(
                dict.fromkeys(
                    f for t in textids for f in self._t2f.get(t, ())
                )
            )
            tdict = {f: set(self._f2t[f]) & wanted for f in files}
            return files, tdict

        return None, None

    def _views(
        self, fileids, categories, textids, tagged, group_by_sent, group_by_para
    ):
        """Build the concatenated TEICorpusView for the selected files."""
        fileids, textids = self._resolve(fileids, categories, textids)
        if fileids is None:
            fileids = self._fileids
        elif isinstance(fileids, str):
            fileids = [fileids]
        return concat(
            [
                TEICorpusView(
                    self.abspath(fileid),
                    tagged,
                    group_by_sent,
                    group_by_para,
                    head_len=self.head_len,
                    textids=textids[fileid] if textids else None,
                )
                for fileid in fileids
            ]
        )

    def decode_tag(self, tag):
        """Return *tag* unchanged."""
        # to be implemented
        return tag

    def textids(self, fileids=None, categories=None):
        """Return the sorted text ids of the selected files.

        In the pl196x corpus each category is stored in a single file, so
        selecting by file and selecting by category are equivalent. To
        allow finer granularity, this non-standard ``textids()`` method
        lists the ids of the individual texts; the main accessors accept
        a list of such ids, giving the user much more control.

        :param fileids: a file id or a list of file ids.
        :param categories: categories whose files are used.
        :return: sorted list of text ids; all known ids when neither
            selector is given.
        """
        fileids, _ = self._resolve(fileids, categories)
        if fileids is None:
            return sorted(self._t2f)

        if isinstance(fileids, str):
            fileids = [fileids]
        return sorted(t for f in fileids for t in self._f2t.get(f, ()))

    def words(self, fileids=None, categories=None, textids=None):
        """Return the selected texts as a flat sequence of words."""
        return self._views(fileids, categories, textids, False, False, False)

    def sents(self, fileids=None, categories=None, textids=None):
        """Return the selected texts as a sequence of sentences (word lists)."""
        return self._views(fileids, categories, textids, False, True, False)

    def paras(self, fileids=None, categories=None, textids=None):
        """Return the selected texts as a sequence of paragraphs.

        Each paragraph is a list of sentences, each a list of words.
        """
        return self._views(fileids, categories, textids, False, True, True)

    def tagged_words(self, fileids=None, categories=None, textids=None):
        """Return the selected texts as a flat sequence of ``(word, tag)``."""
        return self._views(fileids, categories, textids, True, False, False)

    def tagged_sents(self, fileids=None, categories=None, textids=None):
        """Return the selected texts as sentences of ``(word, tag)`` pairs."""
        return self._views(fileids, categories, textids, True, True, False)

    def tagged_paras(self, fileids=None, categories=None, textids=None):
        """Return the selected texts as paragraphs of tagged sentences."""
        return self._views(fileids, categories, textids, True, True, True)

    def xml(self, fileids=None, categories=None):
        """Return the parsed XML of the single selected file.

        :raises TypeError: if the selection does not consist of exactly
            one file.
        """
        fileids, _ = self._resolve(fileids, categories)
        if fileids is None:
            fileids = self._fileids
        elif isinstance(fileids, str):
            # A bare file id is one file, not a sequence of characters.
            fileids = [fileids]
        if len(fileids) == 1:
            return XMLCorpusReader.xml(self, fileids[0])
        raise TypeError("Expected a single file")

    def raw(self, fileids=None, categories=None):
        """Return the concatenated raw content of the selected files."""
        fileids, _ = self._resolve(fileids, categories)
        if fileids is None:
            fileids = self._fileids
        elif isinstance(fileids, str):
            fileids = [fileids]
        contents = []
        for fileid in fileids:
            stream = self.open(fileid)
            try:
                contents.append(stream.read())
            finally:
                # Close explicitly instead of leaving it to the GC.
                stream.close()
        return concat(contents)
|