# Natural Language Toolkit: Concordance Application
#
# Copyright (C) 2001-2020 NLTK Project
# Author: Sumukh Ghodke <sghodke@csse.unimelb.edu.au>
# URL: <http://nltk.org/>
# For license information, see LICENSE.TXT

import queue as q
import re
import threading
from tkinter import (
    Tk,
    Button,
    END,
    Entry,
    Frame,
    IntVar,
    LEFT,
    Label,
    Menu,
    OptionMenu,
    SUNKEN,
    Scrollbar,
    StringVar,
    Text,
)
from tkinter.font import Font

from nltk.corpus import (
    cess_cat,
    brown,
    nps_chat,
    treebank,
    sinica_treebank,
    alpino,
    indian,
    floresta,
    mac_morpho,
    cess_esp,
)
from nltk.draw.util import ShowText
from nltk.util import in_idle

# Regex fragment matching one word or one tag: a run of characters that are
# neither "/" nor a space.  Sentences are searched in "word/tag word/tag" form.
WORD_OR_TAG = "[^/ ]+"
# Regex word-boundary assertion wrapped around every query term.
BOUNDARY = r"\b"

# Event names the model's worker threads put on the shared queue; the view
# dispatches on them when it polls the queue.
CORPUS_LOADED_EVENT = "<<CL_EVENT>>"
SEARCH_TERMINATED_EVENT = "<<ST_EVENT>>"
SEARCH_ERROR_EVENT = "<<SE_EVENT>>"
ERROR_LOADING_CORPUS_EVENT = "<<ELC_EVENT>>"

# Delay handed to Tk's ``after`` between two polls of the event queue.
POLL_INTERVAL = 50

# NB All corpora must be specified in a lambda expression so as not to be
# loaded when the module is imported.

# Corpus selected when the application starts; must be a key of _CORPORA.
_DEFAULT = "English: Brown Corpus (Humor, simplified)"
# Display name -> zero-argument callable returning the tagged sentences.
_CORPORA = {
    "Catalan: CESS-CAT Corpus (simplified)": lambda: cess_cat.tagged_sents(
        tagset="universal"
    ),
    "English: Brown Corpus": lambda: brown.tagged_sents(),
    "English: Brown Corpus (simplified)": lambda: brown.tagged_sents(
        tagset="universal"
    ),
    "English: Brown Corpus (Press, simplified)": lambda: brown.tagged_sents(
        categories=["news", "editorial", "reviews"], tagset="universal"
    ),
    "English: Brown Corpus (Religion, simplified)": lambda: brown.tagged_sents(
        categories="religion", tagset="universal"
    ),
    "English: Brown Corpus (Learned, simplified)": lambda: brown.tagged_sents(
        categories="learned", tagset="universal"
    ),
    "English: Brown Corpus (Science Fiction, simplified)": lambda: brown.tagged_sents(
        categories="science_fiction", tagset="universal"
    ),
    "English: Brown Corpus (Romance, simplified)": lambda: brown.tagged_sents(
        categories="romance", tagset="universal"
    ),
    "English: Brown Corpus (Humor, simplified)": lambda: brown.tagged_sents(
        categories="humor", tagset="universal"
    ),
    "English: NPS Chat Corpus": lambda: nps_chat.tagged_posts(),
    "English: NPS Chat Corpus (simplified)": lambda: nps_chat.tagged_posts(
        tagset="universal"
    ),
    "English: Wall Street Journal Corpus": lambda: treebank.tagged_sents(),
    "English: Wall Street Journal Corpus (simplified)": lambda: treebank.tagged_sents(
        tagset="universal"
    ),
    "Chinese: Sinica Corpus": lambda: sinica_treebank.tagged_sents(),
    "Chinese: Sinica Corpus (simplified)": lambda: sinica_treebank.tagged_sents(
        tagset="universal"
    ),
    "Dutch: Alpino Corpus": lambda: alpino.tagged_sents(),
    "Dutch: Alpino Corpus (simplified)": lambda: alpino.tagged_sents(
        tagset="universal"
    ),
    "Hindi: Indian Languages Corpus": lambda: indian.tagged_sents(files="hindi.pos"),
    "Hindi: Indian Languages Corpus (simplified)": lambda: indian.tagged_sents(
        files="hindi.pos", tagset="universal"
    ),
    "Portuguese: Floresta Corpus (Portugal)": lambda: floresta.tagged_sents(),
    "Portuguese: Floresta Corpus (Portugal, simplified)": lambda: floresta.tagged_sents(
        tagset="universal"
    ),
    "Portuguese: MAC-MORPHO Corpus (Brazil)": lambda: mac_morpho.tagged_sents(),
    "Portuguese: MAC-MORPHO Corpus (Brazil, simplified)": lambda: mac_morpho.tagged_sents(
        tagset="universal"
    ),
    "Spanish: CESS-ESP Corpus (simplified)": lambda: cess_esp.tagged_sents(
        tagset="universal"
    ),
}

class ConcordanceSearchView:
    """Tkinter window for searching a tagged corpus and paging through hits.

    The view owns a ``ConcordanceSearchModel`` and the queue it shares with
    that model.  The model posts event names on the queue; ``_poll`` is
    scheduled on the Tk event loop and dispatches each event to the matching
    ``handle_*`` method, which updates the widgets.
    """

    _BACKGROUND_COLOUR = "#FFF"  # white

    # Colour of highlighted results
    _HIGHLIGHT_WORD_COLOUR = "#F00"  # red
    _HIGHLIGHT_WORD_TAG = "HL_WRD_TAG"

    _HIGHLIGHT_LABEL_COLOUR = "#C0C0C0"  # silver / light grey
    _HIGHLIGHT_LABEL_TAG = "HL_LBL_TAG"

    # Fraction of the text that is scrolled off to the left after a search
    _FRACTION_LEFT_TEXT = 0.30

    def __init__(self):
        """Build the window, start loading the default corpus, start polling."""
        self.queue = q.Queue()
        self.model = ConcordanceSearchModel(self.queue)
        self.top = Tk()
        self._init_top(self.top)
        # The menubar must come first: invoking its default radio entries
        # sets the result count and the context widths used later on.
        self._init_menubar()
        self._init_widgets(self.top)
        self.load_corpus(self.model.DEFAULT_CORPUS)
        self.after = self.top.after(POLL_INTERVAL, self._poll)

    def _init_top(self, top):
        """Configure geometry, title and close bindings of the root window."""
        top.geometry("950x680+50+50")
        top.title("NLTK Concordance Search")
        top.bind("<Control-q>", self.destroy)
        top.protocol("WM_DELETE_WINDOW", self.destroy)
        top.minsize(950, 680)

    def _init_widgets(self, parent):
        """Create the main frame and every widget group inside it."""
        self.main_frame = Frame(
            parent, background=self._BACKGROUND_COLOUR, padx=1, pady=1, border=1
        )
        self._init_corpus_select(self.main_frame)
        self._init_query_box(self.main_frame)
        self._init_results_box(self.main_frame)
        self._init_paging(self.main_frame)
        self._init_status(self.main_frame)
        self.main_frame.pack(fill="both", expand=True)

    @staticmethod
    def _add_radio_choices(menu, variable, command, values, label_format="{}"):
        """Add one radiobutton per value to *menu*, then invoke the second one."""
        for value in values:
            menu.add_radiobutton(
                label=label_format.format(value),
                variable=variable,
                underline=0,
                value=value,
                command=command,
            )
        # Index 1 is the second entry; invoking it runs *command* once so the
        # corresponding setting is initialised.
        menu.invoke(1)

    def _init_menubar(self):
        """Create the File and Edit menus and their backing Tk variables."""
        self._result_size = IntVar(self.top)
        self._cntx_bf_len = IntVar(self.top)
        self._cntx_af_len = IntVar(self.top)
        menubar = Menu(self.top)

        filemenu = Menu(menubar, tearoff=0, borderwidth=0)
        filemenu.add_command(
            label="Exit", underline=1, command=self.destroy, accelerator="Ctrl-q"
        )
        menubar.add_cascade(label="File", underline=0, menu=filemenu)

        editmenu = Menu(menubar, tearoff=0)
        rescntmenu = Menu(editmenu, tearoff=0)
        self._add_radio_choices(
            rescntmenu, self._result_size, self.set_result_size, (20, 50, 100)
        )
        editmenu.add_cascade(label="Result Count", underline=0, menu=rescntmenu)

        cntxmenu = Menu(editmenu, tearoff=0)
        cntxbfmenu = Menu(cntxmenu, tearoff=0)
        self._add_radio_choices(
            cntxbfmenu,
            self._cntx_bf_len,
            self.set_cntx_bf_len,
            (60, 80, 100),
            "{} characters",
        )
        cntxmenu.add_cascade(label="Before", underline=0, menu=cntxbfmenu)

        cntxafmenu = Menu(cntxmenu, tearoff=0)
        self._add_radio_choices(
            cntxafmenu,
            self._cntx_af_len,
            self.set_cntx_af_len,
            (70, 90, 110),
            "{} characters",
        )
        cntxmenu.add_cascade(label="After", underline=0, menu=cntxafmenu)

        editmenu.add_cascade(label="Context", underline=0, menu=cntxmenu)
        menubar.add_cascade(label="Edit", underline=0, menu=editmenu)
        self.top.config(menu=menubar)

    def set_result_size(self, **kwargs):
        """Copy the selected results-per-page value into the model."""
        self.model.result_count = self._result_size.get()

    def set_cntx_af_len(self, **kwargs):
        """Store how many characters are shown after the start of a hit."""
        self._char_after = self._cntx_af_len.get()

    def set_cntx_bf_len(self, **kwargs):
        """Store how many characters are shown before the start of a hit."""
        self._char_before = self._cntx_bf_len.get()

    def _init_corpus_select(self, parent):
        """Create the "Corpus:" label and the corpus drop-down."""
        innerframe = Frame(parent, background=self._BACKGROUND_COLOUR)
        self.var = StringVar(innerframe)
        self.var.set(self.model.DEFAULT_CORPUS)
        Label(
            innerframe,
            justify=LEFT,
            text=" Corpus: ",
            background=self._BACKGROUND_COLOUR,
            padx=2,
            pady=1,
            border=0,
        ).pack(side="left")

        om = OptionMenu(
            innerframe,
            self.var,
            self.model.DEFAULT_CORPUS,
            *self.model.non_default_corpora(),
            command=self.corpus_selected
        )
        om["borderwidth"] = 0
        om["highlightthickness"] = 1
        om.pack(side="left")
        innerframe.pack(side="top", fill="x", anchor="n")

    def _init_status(self, parent):
        """Create the status label shown at the bottom of the main frame."""
        self.status = Label(
            parent,
            justify=LEFT,
            relief=SUNKEN,
            background=self._BACKGROUND_COLOUR,
            border=0,
            padx=1,
            pady=0,
        )
        self.status.pack(side="top", anchor="sw")

    def _init_query_box(self, parent):
        """Create the query entry and the Search button."""
        innerframe = Frame(parent, background=self._BACKGROUND_COLOUR)
        another = Frame(innerframe, background=self._BACKGROUND_COLOUR)
        self.query_box = Entry(another, width=60)
        self.query_box.pack(side="left", fill="x", pady=25, anchor="center")
        self.search_button = Button(
            another,
            text="Search",
            command=self.search,
            borderwidth=1,
            highlightthickness=1,
        )
        self.search_button.pack(side="left", fill="x", pady=25, anchor="center")
        self.query_box.bind("<KeyPress-Return>", self.search_enter_keypress_handler)
        another.pack()
        innerframe.pack(side="top", fill="x", anchor="n")

    def search_enter_keypress_handler(self, *event):
        """Run a search when Return is pressed in the query entry."""
        self.search()

    def _init_results_box(self, parent):
        """Create the read-only results text widget with both scrollbars."""
        innerframe = Frame(parent)
        i1 = Frame(innerframe)
        i2 = Frame(innerframe)
        vscrollbar = Scrollbar(i1, borderwidth=1)
        hscrollbar = Scrollbar(i2, borderwidth=1, orient="horiz")
        self.results_box = Text(
            i1,
            font=Font(family="courier", size="16"),
            state="disabled",
            borderwidth=1,
            yscrollcommand=vscrollbar.set,
            xscrollcommand=hscrollbar.set,
            wrap="none",
            width="40",
            height="20",
            exportselection=1,
        )
        self.results_box.pack(side="left", fill="both", expand=True)
        self.results_box.tag_config(
            self._HIGHLIGHT_WORD_TAG, foreground=self._HIGHLIGHT_WORD_COLOUR
        )
        self.results_box.tag_config(
            self._HIGHLIGHT_LABEL_TAG, foreground=self._HIGHLIGHT_LABEL_COLOUR
        )
        vscrollbar.pack(side="left", fill="y", anchor="e")
        vscrollbar.config(command=self.results_box.yview)
        hscrollbar.pack(side="left", fill="x", expand=True, anchor="w")
        hscrollbar.config(command=self.results_box.xview)
        # A spacer label is the only way to keep the two scrollbars from
        # overlapping while using the pack layout manager.
        Label(i2, text=" ", background=self._BACKGROUND_COLOUR).pack(
            side="left", anchor="e"
        )
        i1.pack(side="top", fill="both", expand=True, anchor="n")
        i2.pack(side="bottom", fill="x", anchor="s")
        innerframe.pack(side="top", fill="both", expand=True)

    def _init_paging(self, parent):
        """Create the Previous/Next buttons (initially disabled)."""
        innerframe = Frame(parent, background=self._BACKGROUND_COLOUR)
        self.prev = Button(
            innerframe,
            text="Previous",
            command=self.previous,
            width="10",
            borderwidth=1,
            highlightthickness=1,
            state="disabled",
        )
        self.prev.pack(side="left", anchor="center")
        self.next = Button(
            innerframe,
            text="Next",
            command=self.__next__,
            width="10",
            borderwidth=1,
            highlightthickness=1,
            state="disabled",
        )
        self.next.pack(side="right", anchor="center")
        innerframe.pack(side="top", fill="y")
        self.current_page = 0

    def previous(self):
        """Ask the model for the page before the current one."""
        self.clear_results_box()
        self.freeze_editable()
        self.model.prev(self.current_page - 1)

    def __next__(self):
        """Ask the model for the page after the current one."""
        self.clear_results_box()
        self.freeze_editable()
        self.model.next(self.current_page + 1)

    def about(self, *e):
        """Show the "About" text, falling back to ``ShowText`` on failure."""
        ABOUT = "NLTK Concordance Search Demo\n"
        TITLE = "About: NLTK Concordance Search Demo"
        try:
            from tkinter.messagebox import Message

            Message(message=ABOUT, title=TITLE, parent=self.main_frame).show()
        except Exception:
            ShowText(self.top, TITLE, ABOUT)

    def _bind_event_handlers(self):
        """Bind the model's event names as Tk virtual events on the root."""
        self.top.bind(CORPUS_LOADED_EVENT, self.handle_corpus_loaded)
        self.top.bind(SEARCH_TERMINATED_EVENT, self.handle_search_terminated)
        self.top.bind(SEARCH_ERROR_EVENT, self.handle_search_error)
        self.top.bind(ERROR_LOADING_CORPUS_EVENT, self.handle_error_loading_corpus)

    def _poll(self):
        """Dispatch at most one queued model event, then re-arm the timer."""
        try:
            try:
                event = self.queue.get(block=False)
            except q.Empty:
                return
            if event == CORPUS_LOADED_EVENT:
                self.handle_corpus_loaded(event)
            elif event == SEARCH_TERMINATED_EVENT:
                self.handle_search_terminated(event)
            elif event == SEARCH_ERROR_EVENT:
                self.handle_search_error(event)
            elif event == ERROR_LOADING_CORPUS_EVENT:
                self.handle_error_loading_corpus(event)
        finally:
            # Re-arm even when a handler raised; otherwise no later event
            # would ever be delivered and the widgets would stay frozen.
            self.after = self.top.after(POLL_INTERVAL, self._poll)

    def handle_error_loading_corpus(self, event):
        """Report a failed corpus load and leave the inputs disabled."""
        self.status["text"] = "Error in loading " + self.var.get()
        # The entry has to be editable for clear_all() to empty it.
        self.unfreeze_editable()
        self.clear_all()
        self.freeze_editable()

    def handle_corpus_loaded(self, event):
        """Report a finished corpus load and enable the inputs."""
        self.status["text"] = self.var.get() + " is loaded"
        self.unfreeze_editable()
        self.clear_all()
        self.query_box.focus_set()

    def handle_search_terminated(self, event):
        """Display the page the model just produced."""
        # todo: refactor the model such that it is less state sensitive
        results = self.model.get_results()
        self.write_results(results)
        self.status["text"] = ""
        if not results:
            self.status["text"] = "No results found for " + self.model.query
        else:
            self.current_page = self.model.last_requested_page
        self.unfreeze_editable()
        self.results_box.xview_moveto(self._FRACTION_LEFT_TEXT)

    def handle_search_error(self, event):
        """Report a query the model could not process."""
        self.status["text"] = "Error in query " + self.model.query
        self.unfreeze_editable()

    def corpus_selected(self, *args):
        """Load the corpus currently chosen in the drop-down."""
        new_selection = self.var.get()
        self.load_corpus(new_selection)

    def load_corpus(self, selection):
        """Start loading *selection* unless it is already the selected corpus."""
        if self.model.selected_corpus != selection:
            self.status["text"] = "Loading " + selection + "..."
            self.freeze_editable()
            self.model.load_corpus(selection)

    def search(self):
        """Clear previous results and start a search for the entered query."""
        self.current_page = 0
        self.clear_results_box()
        self.model.reset_results()
        query = self.query_box.get()
        if not query.strip():
            return
        self.status["text"] = "Searching for " + query
        self.freeze_editable()
        self.model.search(query, self.current_page + 1)

    def _tag_spans(self, tag, row, spans):
        """Apply text tag *tag* to every (start, end) column span on *row*."""
        for start, end in spans:
            self.results_box.tag_add(
                tag, str(row) + "." + str(start), str(row) + "." + str(end)
            )

    def write_results(self, results):
        """Write one line per hit into the results box and highlight it.

        :param results: sequence of ``(sentence, start, end)`` triples where
            ``start``/``end`` delimit the hit inside ``sentence``.
        """
        self.results_box["state"] = "normal"
        try:
            row = 1
            for each in results:
                sent, pos1, pos2 = each[0].strip(), each[1], each[2]
                if len(sent) != 0:
                    if pos1 < self._char_before:
                        sent, pos1, pos2 = self.pad(sent, pos1, pos2)
                    # The hit always starts at column _char_before of the line.
                    sentence = sent[pos1 - self._char_before : pos1 + self._char_after]
                    if not row == len(results):
                        sentence += "\n"
                    self.results_box.insert(str(row) + ".0", sentence)
                    word_markers, label_markers = self.words_and_labels(
                        sent, pos1, pos2
                    )
                    self._tag_spans(self._HIGHLIGHT_WORD_TAG, row, word_markers)
                    self._tag_spans(self._HIGHLIGHT_LABEL_TAG, row, label_markers)
                    row += 1
        finally:
            # Never leave the box editable, even if writing a row failed.
            self.results_box["state"] = "disabled"

    def words_and_labels(self, sentence, pos1, pos2):
        """Return display-column spans of the words and tags inside a hit.

        ``sentence[pos1:pos2]`` is split on single spaces into ``word/tag``
        tokens.  Columns are relative to the displayed line, in which the hit
        starts at column ``self._char_before``.

        The tag is taken to be the part after the *last* slash, so a word
        that itself contains a slash (``1/2/CD``) is handled; a token without
        any slash yields a word span and no tag span.

        :return: ``(word_spans, label_spans)``, two lists of ``(start, end)``.
        """
        search_exp = sentence[pos1:pos2]
        words, labels = [], []
        index = 0
        for token in search_exp.split(" "):
            if token == "":
                # Consecutive spaces: only the separator itself is consumed.
                index += 1
                continue
            word, sep, label = token.rpartition("/")
            if not sep:
                word, label = token, ""
            start = self._char_before + index
            words.append((start, start + len(word)))
            index += len(word) + len(sep)
            if sep:
                start = self._char_before + index
                labels.append((start, start + len(label)))
                index += len(label)
            index += 1  # the space that separated this token from the next
        return words, labels

    def pad(self, sent, hstart, hend):
        """Left-pad *sent* with spaces so the hit starts at ``_char_before``.

        :return: ``(sentence, start, end)`` with the offsets shifted by the
            amount of padding; the arguments unchanged if none is needed.
        """
        if hstart >= self._char_before:
            return sent, hstart, hend
        d = self._char_before - hstart
        return " " * d + sent, hstart + d, hend + d

    def destroy(self, *e):
        """Cancel the pending poll and destroy the window (idempotent)."""
        if self.top is None:
            return
        self.top.after_cancel(self.after)
        self.top.destroy()
        self.top = None

    def clear_all(self):
        """Clear the query entry, the model's query and the results box."""
        self.query_box.delete(0, END)
        self.model.reset_query()
        self.clear_results_box()

    def clear_results_box(self):
        """Remove all text from the results box."""
        self.results_box["state"] = "normal"
        self.results_box.delete("1.0", END)
        self.results_box["state"] = "disabled"

    def freeze_editable(self):
        """Disable the query entry, the Search button and both paging buttons."""
        self.query_box["state"] = "disabled"
        self.search_button["state"] = "disabled"
        self.prev["state"] = "disabled"
        self.next["state"] = "disabled"

    def unfreeze_editable(self):
        """Enable the query entry and Search button; refresh paging buttons."""
        self.query_box["state"] = "normal"
        self.search_button["state"] = "normal"
        self.set_paging_button_states()

    def set_paging_button_states(self):
        """Enable Previous/Next according to the current page and the model."""
        on_first_page = self.current_page == 0 or self.current_page == 1
        self.prev["state"] = "disabled" if on_first_page else "normal"
        if self.model.has_more_pages(self.current_page):
            self.next["state"] = "normal"
        else:
            self.next["state"] = "disabled"

    def fire_event(self, event):
        """Queue *event* as a Tk virtual event on the root window."""
        # Firing an event so that rendering of widgets happen in the mainloop thread
        self.top.event_generate(event, when="tail")

    def mainloop(self, *args, **kwargs):
        """Run the Tk main loop, unless running inside IDLE."""
        if in_idle():
            return
        self.top.mainloop(*args, **kwargs)

class ConcordanceSearchModel:
    """Corpus loading, regex search and result paging for the concordance view.

    Loading and searching run in worker threads (``LoadCorpus`` and
    ``SearchCorpus``).  Each worker reports completion by putting one of the
    module's event names on ``queue``.

    :param queue: object with a ``put`` method that receives the event names.
    """

    def __init__(self, queue):
        self.queue = queue
        self.CORPORA = _CORPORA
        self.DEFAULT_CORPUS = _DEFAULT
        self.selected_corpus = None
        # Sentences of the loaded corpus as "word/tag word/tag ..." strings.
        self.tagged_sents = []
        self.reset_query()
        self.reset_results()
        # Hits per page; the view sets it before the first search.
        self.result_count = None
        self.last_sent_searched = 0

    def non_default_corpora(self):
        """Return the sorted names of every corpus except the default one."""
        return sorted(name for name in self.CORPORA if name != self.DEFAULT_CORPUS)

    def load_corpus(self, name):
        """Start loading corpus *name* in a background thread."""
        self.selected_corpus = name
        self.tagged_sents = []
        runner_thread = self.LoadCorpus(name, self)
        runner_thread.start()

    def search(self, query, page):
        """Start a background search for *query* that fills result *page*."""
        self.query = query
        self.last_requested_page = page
        self.SearchCorpus(self, page, self.result_count).start()

    def next(self, page):
        """Request *page*: search for it if it has not been computed yet."""
        self.last_requested_page = page
        if len(self.results) < page:
            self.search(self.query, page)
        else:
            self.queue.put(SEARCH_TERMINATED_EVENT)

    def prev(self, page):
        """Request an already computed *page*."""
        self.last_requested_page = page
        self.queue.put(SEARCH_TERMINATED_EVENT)

    def reset_results(self):
        """Forget all computed pages and restart scanning at sentence 0."""
        self.last_sent_searched = 0
        self.results = []
        self.last_page = None

    def reset_query(self):
        """Forget the current query."""
        self.query = None

    def set_results(self, page, resultset):
        """Store *resultset* as the hits of 1-based *page*."""
        self.results.insert(page - 1, resultset)

    def get_results(self):
        """Return the hits of the most recently requested page."""
        return self.results[self.last_requested_page - 1]

    def has_more_pages(self, page):
        """Return whether a page after *page* may exist."""
        if not self.results or not self.results[0]:
            return False
        if self.last_page is None:
            # The end of the corpus has not been reached yet.
            return True
        return page < self.last_page

    class LoadCorpus(threading.Thread):
        """Worker thread that loads one corpus into ``model.tagged_sents``."""

        def __init__(self, name, model):
            super().__init__()
            # Assigned after Thread.__init__, so the corpus name also becomes
            # the thread's name.
            self.model, self.name = model, name

        def run(self):
            try:
                ts = self.model.CORPORA[self.name]()
                self.model.tagged_sents = [
                    " ".join(w + "/" + t for (w, t) in sent) for sent in ts
                ]
                self.model.queue.put(CORPUS_LOADED_EVENT)
            except Exception as e:
                # Any failure is reported to the view instead of killing the
                # thread silently.
                print(e)
                self.model.queue.put(ERROR_LOADING_CORPUS_EVENT)

    class SearchCorpus(threading.Thread):
        """Worker thread that computes one page of hits for ``model.query``."""

        def __init__(self, model, page, count):
            self.model, self.count, self.page = model, count, page
            super().__init__()

        def run(self):
            """Scan forward from ``model.last_sent_searched`` for one page.

            Up to ``count`` matching sentences are stored as
            ``(sentence, match_start, match_end)``.  Scanning stops at the
            first match beyond ``count``; that sentence is where the next
            page starts.  If the corpus is exhausted first, this page is
            recorded as the last one.
            """
            model = self.model
            try:
                pattern = re.compile(self.processed_query())
            except re.error:
                model.reset_results()
                model.queue.put(SEARCH_ERROR_EVENT)
                return

            hits = []
            consumed = 0  # sentences that never need to be scanned again
            more_available = False
            for sent in model.tagged_sents[model.last_sent_searched :]:
                m = pattern.search(sent)
                if m:
                    if len(hits) == self.count:
                        # One hit too many: leave it for the next page.
                        more_available = True
                        break
                    hits.append((sent, m.start(), m.end()))
                consumed += 1

            model.last_sent_searched += consumed
            if not more_available:
                model.last_page = self.page
            model.set_results(self.page, hits)
            model.queue.put(SEARCH_TERMINATED_EVENT)

        def processed_query(self):
            """Translate the user's query into a regex over word/tag text.

            Each whitespace-separated term has every ``.`` replaced by "one
            character of a word or tag".  An all-capitals term is matched
            against tags, a term containing ``/`` is used as a word/tag pair,
            and any other term is matched against words.
            """
            new = []
            for term in self.model.query.split():
                term = term.replace(".", "[^/ ]")
                if re.match("[A-Z]+$", term):
                    new.append(BOUNDARY + WORD_OR_TAG + "/" + term + BOUNDARY)
                elif "/" in term:
                    new.append(BOUNDARY + term + BOUNDARY)
                else:
                    new.append(BOUNDARY + term + "/" + WORD_OR_TAG + BOUNDARY)
            return " ".join(new)

def app():
    """Open the concordance search window and run its main loop."""
    ConcordanceSearchView().mainloop()

# Launch the GUI when the module is executed as a script.
if __name__ == "__main__":
    app()

# Public API of this module.
__all__ = ["app"]