Source code for orangecontrib.text.corpus

import os
from copy import copy
from numbers import Integral
from itertools import chain

import nltk
import numpy as np
import scipy.sparse as sp
from gensim import corpora

from Orange.data import Table, Domain, ContinuousVariable
from orangecontrib.text.vectorization import BowVectorizer


def get_sample_corpora_dir():
    path = os.path.dirname(__file__)
    directory = os.path.join(path, 'datasets')
    return os.path.abspath(directory)


def _check_arrays(*arrays):
    for a in arrays:
        if not (a is None or isinstance(a, np.ndarray) or sp.issparse(a)):
            raise TypeError('Argument {} should be of type np.array, sparse or None.'.format(a))

    lengths = set(a.shape[0] for a in arrays if a is not None)
    if len(lengths) > 1:
        raise ValueError('Leading dimension mismatch')

    return lengths.pop() if len(lengths) else 0


[docs]class Corpus(Table): """Internal class for storing a corpus."""
[docs] def __new__(cls, *args, **kwargs): """Bypass Table.__new__.""" return object.__new__(cls)
[docs] def __init__(self, X=None, Y=None, metas=None, domain=None, text_features=None): """ Args: X (numpy.ndarray): attributes Y (numpy.ndarray): class variables metas (numpy.ndarray): meta attributes; e.g. text domain (Orange.data.Domain): the domain for this Corpus text_features (list): meta attributes that are used for text mining. Infer them if None. """ n_doc = _check_arrays(X, Y, metas) self.X = X if X is not None else np.zeros((n_doc, 0)) self.Y = Y if Y is not None else np.zeros((n_doc, 0)) self.metas = metas if metas is not None else np.zeros((n_doc, 0)) self.W = np.zeros((n_doc, 0)) self.domain = domain self.text_features = None # list of text features for mining self._tokens = None self._dictionary = None self._ngrams_corpus = None self.ngram_range = (1, 1) self.attributes = {} self.pos_tags = None if domain is not None and text_features is None: self._infer_text_features() elif domain is not None: self.set_text_features(text_features) Table._init_ids(self)
[docs] def set_text_features(self, feats): """ Select which meta-attributes to include when mining text. Args: feats (list): list of text features to include. """ for f in feats: if f not in chain(self.domain.variables, self.domain.metas): raise ValueError('Feature "{}" not found.'.format(f)) if len(set(feats)) != len(feats): raise ValueError('Text features must be unique.') self.text_features = feats self._tokens = None # invalidate tokens
def _infer_text_features(self): """ Infer which text features to use. If nothing was provided in the file header, use the first text feature. """ include_feats = [] first = None for attr in self.domain.metas: if attr.is_string: if first is None: first = attr if attr.attributes.get('include', 'False') == 'True': include_feats.append(attr) if len(include_feats) == 0 and first: include_feats.append(first) self.set_text_features(include_feats)
[docs] def extend_corpus(self, metadata, Y): """ Append documents to corpus. Args: metadata (numpy.ndarray): Meta data Y (numpy.ndarray): Class variables """ self.metas = np.vstack((self.metas, metadata)) cv = self.domain.class_var for val in set(Y): if val not in cv.values: cv.add_value(val) new_Y = np.array([cv.to_val(i) for i in Y])[:, None] self._Y = np.vstack((self._Y, new_Y)) self.X = self.W = np.zeros((len(self), 0)) Table._init_ids(self) self._tokens = None # invalidate tokens
[docs] def extend_attributes(self, X, feature_names, var_attrs=None): """ Append features to corpus. Args: X (numpy.ndarray or scipy.sparse.csr_matrix): Features to append feature_names (list): List of string containing feature names var_attrs (dict): Additional attributes appended to variable.attributes. """ if self.X.size == 0: self.X = X elif sp.issparse(self.X) or sp.issparse(X): self.X = sp.hstack((self.X, X)).tocsr() else: self.X = np.hstack((self.X, X)) new_attr = self.domain.attributes for f in feature_names: var = ContinuousVariable.make(f) if isinstance(var_attrs, dict): var.attributes.update(var_attrs) new_attr += (var, ) new_domain = Domain( attributes=new_attr, class_vars=self.domain.class_vars, metas=self.domain.metas ) self.domain = new_domain
@property def documents(self): """ Returns: a list of strings representing documents — created by joining selected text features. """ return self.documents_from_features(self.text_features)
[docs] def documents_from_features(self, feats): """ Args: feats (list): A list fo features to join. Returns: a list of strings constructed by joining feats. """ # create a Table where feats are in metas data = Table(Domain([], [], [i.name for i in feats], source=self.domain), self) # When we use only features coming from sparse X data.metas is sparse. # Transform it to dense. if sp.issparse(data.metas): data.metas = data.metas.toarray() return [' '.join(f.str_val(val) for f, val in zip(data.domain.metas, row)) for row in data.metas]
[docs] def store_tokens(self, tokens, dictionary=None): """ Args: tokens (list): List of lists containing tokens. """ self._tokens = np.array(tokens) self._dictionary = dictionary or corpora.Dictionary(self.tokens)
@property def tokens(self): """ np.ndarray: A list of lists containing tokens. If tokens are not yet present, run default preprocessor and save tokens. """ if self._tokens is None: self._apply_base_preprocessor() return self._tokens
[docs] def has_tokens(self): """ Return whether corpus is preprocessed or not. """ return self._tokens is not None
def _apply_base_preprocessor(self): from orangecontrib.text.preprocess import base_preprocessor corpus = base_preprocessor(self) self.store_tokens(corpus.tokens, corpus.dictionary) @property def dictionary(self): """ corpora.Dictionary: A token to id mapper. """ if self._dictionary is None: self._apply_base_preprocessor() return self._dictionary @classmethod def from_table(cls, domain, source, row_indices=...): t = super().from_table(domain, source, row_indices) return Corpus(t.X, t.Y, t.metas, t.domain, None) @classmethod def from_corpus(cls, domain, source, row_indices=...): c = cls.from_table(domain, source, row_indices) c.text_features = source.text_features return c @classmethod def from_file(cls, filename): if not os.path.exists(filename): # check the default location abs_path = os.path.join(get_sample_corpora_dir(), filename) if not abs_path.endswith('.tab'): abs_path += '.tab' if not os.path.exists(abs_path): raise FileNotFoundError('File "{}" not found.'.format(filename)) else: filename = abs_path table = Table.from_file(filename) return cls(table.X, table.Y, table.metas, table.domain, None) def ngrams_iterator(self, join_with=' ', include_postags=False): if self.pos_tags is None: include_postags = False if include_postags: data = zip(self.tokens, self.pos_tags) else: data = self.tokens if join_with is None: processor = lambda doc, n: nltk.ngrams(doc, n) elif include_postags: processor = lambda doc, n: (join_with.join(token + '_' + tag for token, tag in ngram) for ngram in nltk.ngrams(zip(*doc), n)) else: processor = lambda doc, n: (join_with.join(ngram) for ngram in nltk.ngrams(doc, n)) return (list(chain(*(processor(doc, n) for n in range(self.ngram_range[0], self.ngram_range[1]+1)))) for doc in data) @property def ngrams_corpus(self): if self._ngrams_corpus is None: return BowVectorizer().transform(self).ngrams_corpus return self._ngrams_corpus @ngrams_corpus.setter def ngrams_corpus(self, value): self._ngrams_corpus = value @property def ngrams(self): """generator: Ngram representations of documents.""" return self.ngrams_iterator(join_with=' ')
[docs] def copy(self): """Return a copy of the table.""" c = self.__class__(self.X.copy(), self.Y.copy(), self.metas.copy(), self.domain, copy(self.text_features)) # since tokens and dictionary are considered immutable copies are not needed c._tokens = self._tokens c._dictionary = self._dictionary c.ngram_range = self.ngram_range c.pos_tags = self.pos_tags return c
def __getitem__(self, key): c = super().__getitem__(key) if self._tokens is not None: # retain preprocessing if isinstance(key, tuple): # get row selection key = key[0] if isinstance(key, Integral): c._tokens = np.array([self._tokens[key]]) c.pos_tags = None if self.pos_tags is None else np.array([self.pos_tags[key]]) elif isinstance(key, list) or isinstance(key, np.ndarray) or isinstance(key, slice): c._tokens = self._tokens[key] c.pos_tags = None if self.pos_tags is None else self.pos_tags[key] else: raise TypeError('Indexing by type {} not supported.'.format(type(key))) c._dictionary = self._dictionary c.text_features = self.text_features c.ngram_range = self.ngram_range c.attributes = self.attributes return c def __len__(self): return len(self.metas) def __eq__(self, other): return (self.text_features == other.text_features and self._dictionary == other._dictionary and np.array_equal(self._tokens, other._tokens) and np.array_equal(self.X, other.X) and np.array_equal(self.Y, other.Y) and np.array_equal(self.metas, other.metas) and np.array_equal(self.pos_tags, other.pos_tags) and self.domain == other.domain and self.ngram_range == other.ngram_range)