# Natural Language Toolkit: IPI PAN Corpus Reader
#
# Copyright (C) 2001-2020 NLTK Project
# Author: Konrad Goluchowski <kodie@mimuw.edu.pl>
# URL: <http://nltk.org/>
# For license information, see LICENSE.TXT

import functools

from nltk.corpus.reader.util import StreamBackedCorpusView, concat
from nltk.corpus.reader.api import CorpusReader


def _parse_args(fun):
    @functools.wraps(fun)
    def decorator(self, fileids=None, **kwargs):
        kwargs.pop("tags", None)
        if not fileids:
            fileids = self.fileids()
        return fun(self, fileids, **kwargs)

    return decorator


class IPIPANCorpusReader(CorpusReader):
    """
    Corpus reader designed to work with the corpus created by IPI PAN.
    See http://korpus.pl/en/ for more details about the IPI PAN corpus.

    The corpus includes information about text domain, channel and categories.
    You can access the possible values using ``domains()``, ``channels()`` and
    ``categories()``. You can also use this metadata to filter files, e.g.:
    ``fileids(channels='prasa')``, ``fileids(categories='publicystyczny')``.

    The reader supports the methods words, sents, paras and their tagged
    versions. You can get the part of speech instead of the full tag by
    passing ``simplify_tags=True``, e.g.: ``tagged_sents(simplify_tags=True)``.

    You can also get all disambiguated tags (instead of a single one) by
    passing ``one_tag=False``, e.g.: ``tagged_paras(one_tag=False)``.

    You can get all tags that were assigned by the morphological analyzer by
    passing ``disamb_only=False``, e.g.: ``tagged_words(disamb_only=False)``.

    The IPI PAN corpus contains markers indicating that there is no space
    between two tokens. To add special "no space" markers, pass
    ``append_no_space=True``, e.g.: ``tagged_words(append_no_space=True)``.
    Wherever two tokens are not separated by a space, the pair
    ``('', 'no-space')`` is inserted for tagged data, and just ``''`` for
    methods without tags.

    The corpus reader can also try to append spaces between words. To enable
    this option, pass ``append_space=True``, e.g.: ``words(append_space=True)``.
    As a result, either ``' '`` or ``(' ', 'space')`` is inserted between
    tokens.

    By default, XML entities like ``&quot;`` and ``&amp;`` are replaced by the
    corresponding characters. You can turn this off by passing
    ``replace_xmlentities=False``, e.g.: ``words(replace_xmlentities=False)``.

    A short usage sketch is given in the comments below this docstring.
    """
    def __init__(self, root, fileids):
        CorpusReader.__init__(self, root, fileids, None, None)

    def raw(self, fileids=None):
        if not fileids:
            fileids = self.fileids()

        filecontents = []
        for fileid in self._list_morph_files(fileids):
            with open(fileid, "r") as infile:
                filecontents.append(infile.read())
        return "".join(filecontents)

    def channels(self, fileids=None):
        if not fileids:
            fileids = self.fileids()
        return self._parse_header(fileids, "channel")

    def domains(self, fileids=None):
        if not fileids:
            fileids = self.fileids()
        return self._parse_header(fileids, "domain")

    def categories(self, fileids=None):
        if not fileids:
            fileids = self.fileids()
        return [
            self._map_category(cat) for cat in self._parse_header(fileids, "keyTerm")
        ]
    def fileids(self, channels=None, domains=None, categories=None):
        if channels is not None and domains is not None and categories is not None:
            raise ValueError(
                "You can specify only one of the channels, domains "
                "and categories parameters at once"
            )
        if channels is None and domains is None and categories is None:
            return CorpusReader.fileids(self)
        if isinstance(channels, str):
            channels = [channels]
        if isinstance(domains, str):
            domains = [domains]
        if isinstance(categories, str):
            categories = [categories]
        if channels:
            return self._list_morph_files_by("channel", channels)
        elif domains:
            return self._list_morph_files_by("domain", domains)
        else:
            return self._list_morph_files_by(
                "keyTerm", categories, map=self._map_category
            )
    @_parse_args
    def sents(self, fileids=None, **kwargs):
        return concat(
            [
                self._view(
                    fileid, mode=IPIPANCorpusView.SENTS_MODE, tags=False, **kwargs
                )
                for fileid in self._list_morph_files(fileids)
            ]
        )

    @_parse_args
    def paras(self, fileids=None, **kwargs):
        return concat(
            [
                self._view(
                    fileid, mode=IPIPANCorpusView.PARAS_MODE, tags=False, **kwargs
                )
                for fileid in self._list_morph_files(fileids)
            ]
        )

    @_parse_args
    def words(self, fileids=None, **kwargs):
        return concat(
            [
                self._view(fileid, tags=False, **kwargs)
                for fileid in self._list_morph_files(fileids)
            ]
        )

    @_parse_args
    def tagged_sents(self, fileids=None, **kwargs):
        return concat(
            [
                self._view(fileid, mode=IPIPANCorpusView.SENTS_MODE, **kwargs)
                for fileid in self._list_morph_files(fileids)
            ]
        )

    @_parse_args
    def tagged_paras(self, fileids=None, **kwargs):
        return concat(
            [
                self._view(fileid, mode=IPIPANCorpusView.PARAS_MODE, **kwargs)
                for fileid in self._list_morph_files(fileids)
            ]
        )

    @_parse_args
    def tagged_words(self, fileids=None, **kwargs):
        return concat(
            [self._view(fileid, **kwargs) for fileid in self._list_morph_files(fileids)]
        )
    def _list_morph_files(self, fileids):
        return [f for f in self.abspaths(fileids)]

    def _list_header_files(self, fileids):
        return [
            f.replace("morph.xml", "header.xml")
            for f in self._list_morph_files(fileids)
        ]

    def _parse_header(self, fileids, tag):
        values = set()
        for f in self._list_header_files(fileids):
            values_list = self._get_tag(f, tag)
            for v in values_list:
                values.add(v)
        return list(values)

    def _list_morph_files_by(self, tag, values, map=None):
        fileids = self.fileids()
        ret_fileids = set()
        for f in fileids:
            fp = self.abspath(f).replace("morph.xml", "header.xml")
            values_list = self._get_tag(fp, tag)
            for value in values_list:
                if map is not None:
                    value = map(value)
                if value in values:
                    ret_fileids.add(f)
        return list(ret_fileids)
    def _get_tag(self, f, tag):
        # Scan the header file with plain string search (no XML parsing) and
        # collect the text content of every <tag>...</tag> element.
        tags = []
        with open(f, "r") as infile:
            header = infile.read()
        tag_end = 0
        while True:
            tag_pos = header.find("<" + tag, tag_end)
            if tag_pos < 0:
                return tags
            tag_end = header.find("</" + tag + ">", tag_pos)
            tags.append(header[tag_pos + len(tag) + 2 : tag_end])

    def _map_category(self, cat):
        # Strip everything up to and including the first '>' from a keyTerm
        # value; values without '>' are returned unchanged.
        pos = cat.find(">")
        if pos == -1:
            return cat
        else:
            return cat[pos + 1 :]
    def _view(self, filename, **kwargs):
        tags = kwargs.pop("tags", True)
        mode = kwargs.pop("mode", 0)
        simplify_tags = kwargs.pop("simplify_tags", False)
        one_tag = kwargs.pop("one_tag", True)
        disamb_only = kwargs.pop("disamb_only", True)
        append_no_space = kwargs.pop("append_no_space", False)
        append_space = kwargs.pop("append_space", False)
        replace_xmlentities = kwargs.pop("replace_xmlentities", True)

        if len(kwargs) > 0:
            raise ValueError("Unexpected arguments: %s" % kwargs.keys())
        if not one_tag and not disamb_only:
            raise ValueError(
                "You cannot specify both one_tag=False and disamb_only=False"
            )
        if not tags and (simplify_tags or not one_tag or not disamb_only):
            raise ValueError(
                "You cannot specify simplify_tags, one_tag or "
                "disamb_only with functions other than tagged_*"
            )

        return IPIPANCorpusView(
            filename,
            tags=tags,
            mode=mode,
            simplify_tags=simplify_tags,
            one_tag=one_tag,
            disamb_only=disamb_only,
            append_no_space=append_no_space,
            append_space=append_space,
            replace_xmlentities=replace_xmlentities,
        )


class IPIPANCorpusView(StreamBackedCorpusView):
    WORDS_MODE = 0
    SENTS_MODE = 1
    PARAS_MODE = 2

    def __init__(self, filename, startpos=0, **kwargs):
        StreamBackedCorpusView.__init__(self, filename, None, startpos, None)
        self.in_sentence = False
        self.position = 0

        self.show_tags = kwargs.pop("tags", True)
        self.disamb_only = kwargs.pop("disamb_only", True)
        self.mode = kwargs.pop("mode", IPIPANCorpusView.WORDS_MODE)
        self.simplify_tags = kwargs.pop("simplify_tags", False)
        self.one_tag = kwargs.pop("one_tag", True)
        self.append_no_space = kwargs.pop("append_no_space", False)
        self.append_space = kwargs.pop("append_space", False)
        self.replace_xmlentities = kwargs.pop("replace_xmlentities", True)
    def read_block(self, stream):
        sentence = []
        sentences = []
        space = False
        no_space = False

        tags = set()

        lines = self._read_data(stream)

        while True:

            # we may have only part of last line
            if len(lines) <= 1:
                self._seek(stream)
                lines = self._read_data(stream)

            if lines == [""]:
                assert not sentences
                return []

            line = lines.pop()
            self.position += len(line) + 1

            if line.startswith('<chunk type="s"'):
                self.in_sentence = True
            elif line.startswith('<chunk type="p"'):
                pass
            elif line.startswith("<tok"):
                if self.append_space and space and not no_space:
                    self._append_space(sentence)
                space = True
                no_space = False
                orth = ""
                tags = set()
            elif line.startswith("</chunk"):
                if self.in_sentence:
                    self.in_sentence = False
                    self._seek(stream)
                    if self.mode == self.SENTS_MODE:
                        return [sentence]
                    elif self.mode == self.WORDS_MODE:
                        if self.append_space:
                            self._append_space(sentence)
                        return sentence
                    else:
                        sentences.append(sentence)
                elif self.mode == self.PARAS_MODE:
                    self._seek(stream)
                    return [sentences]
- elif line.startswith("<orth"):
- orth = line[6:-7]
- if self.replace_xmlentities:
- orth = orth.replace(""", '"').replace("&", "&")
- elif line.startswith("<lex"):
- if not self.disamb_only or line.find("disamb=") != -1:
- tag = line[line.index("<ctag") + 6 : line.index("</ctag")]
- tags.add(tag)
- elif line.startswith("</tok"):
- if self.show_tags:
- if self.simplify_tags:
- tags = [t.split(":")[0] for t in tags]
- if not self.one_tag or not self.disamb_only:
- sentence.append((orth, tuple(tags)))
- else:
- sentence.append((orth, tags.pop()))
- else:
- sentence.append(orth)
- elif line.startswith("<ns/>"):
- if self.append_space:
- no_space = True
- if self.append_no_space:
- if self.show_tags:
- sentence.append(("", "no-space"))
- else:
- sentence.append("")
- elif line.startswith("</cesAna"):
- pass
    def _read_data(self, stream):
        # Read the next 4 KB chunk and return its lines in reverse order so
        # that pop() yields them in file order. self.position is reset to the
        # chunk start and advanced line by line in read_block, so _seek() can
        # rewind the stream to just after the last fully processed line.
        self.position = stream.tell()
        buff = stream.read(4096)
        lines = buff.split("\n")
        lines.reverse()
        return lines

    def _seek(self, stream):
        stream.seek(self.position)

    def _append_space(self, sentence):
        if self.show_tags:
            sentence.append((" ", "space"))
        else:
            sentence.append(" ")