Source code for orangecontrib.text.twitter

""" A module for fetching data from `The Twitter Search API <https://dev.twitter.com/rest/public/search>`_. """
import threading
from collections import OrderedDict

import numpy as np
import tweepy

from Orange import data
from orangecontrib.text.corpus import Corpus
from orangecontrib.text.language_codes import code2lang

__all__ = ['Credentials', 'TwitterAPI']


def coordinates_geoJSON(json):
    if json:
        return json.get('coordinates', [None, None])
    return [None, None]


[docs]class Credentials: """ Twitter API credentials. """ def __init__(self, consumer_key, consumer_secret): self.consumer_key = consumer_key self.consumer_secret = consumer_secret self.auth = tweepy.OAuthHandler(self.consumer_key, self.consumer_secret) self._valid = None @property def valid(self): """bool: Indicates whether it's a valid credentials. """ if self._valid is None: self.check() return self._valid def check(self): try: self.auth.get_authorization_url() self._valid = True except tweepy.TweepError: self._valid = False return self._valid def __getstate__(self): odict = self.__dict__.copy() odict['_valid'] = None odict.pop('auth') return odict def __setstate__(self, odict): self.__dict__.update(odict) self.auth = tweepy.OAuthHandler(self.consumer_key, self.consumer_secret) def __eq__(self, other): return isinstance(other, Credentials) \ and self.consumer_key == other.consumer_key \ and self.consumer_secret == other.consumer_secret
[docs]class TwitterAPI: """ Fetch tweets from the Tweeter API. Notes: Every search accumulates downloaded tweets. To remove the stored tweets call `reset` method. """ metas = [ (data.StringVariable('text'), lambda st: st.text), (data.StringVariable('author_description'), lambda st: st.author.description), (data.StringVariable('place'), lambda st: getattr(st.place, 'country_code', None)), ] text_features = [metas[0][0]] attributes = [ (data.DiscreteVariable('id'), lambda st: st.id_str), (data.DiscreteVariable('in_reply_to_user_id'), lambda st: st.in_reply_to_user_id), (data.ContinuousVariable('favorite_count'), lambda st: st.favorite_count), (data.ContinuousVariable('retweet_count'), lambda st: st.retweet_count), (data.TimeVariable('created_at'), lambda st: st.created_at.timestamp()), (data.DiscreteVariable('lang'), lambda st: st.lang), (data.DiscreteVariable('author_id'), lambda st: st.author.id_str), (data.DiscreteVariable('author_name'), lambda st: st.author.name), (data.DiscreteVariable('author_screen_name'), lambda st: st.author.screen_name), (data.ContinuousVariable('author_statuses_count'), lambda st: st.author.statuses_count), (data.ContinuousVariable('author_favourites_count'), lambda st: st.author.favourites_count), (data.ContinuousVariable('author_friends_count'), lambda st: st.author.friends_count), (data.ContinuousVariable('author_followers_count'), lambda st: st.author.followers_count), (data.ContinuousVariable('author_listed_count'), lambda st: st.author.listed_count), (data.DiscreteVariable('author_verified'), lambda st: str(st.author.verified)), (data.ContinuousVariable('coordinates_longitude'), lambda st: coordinates_geoJSON(st.coordinates)[0]), (data.ContinuousVariable('coordinates_latitude'), lambda st: coordinates_geoJSON(st.coordinates)[1]), ] supported_fields = metas + attributes def __init__(self, credentials, on_start=None, on_progress=None, on_error=None, on_finish=None): self.key = credentials self.api = tweepy.API(credentials.auth) self.statuses_lock = threading.Lock() self.task = None # Callbacks: self.on_progress = on_progress self.on_error = on_error self.on_finish = on_finish self.on_start = on_start self.container = OrderedDict() self.history = [] @property def tweets(self): """ Iterator over the downloaded documents. """ return self.container.values() @staticmethod def build_query(word_list=None, authors=None, since=None, until=None, allow_retweets=True): if authors is None: authors = [] if word_list is None: word_list = [] if not word_list and not authors: # allows empty queries query = "from: " else: query = " OR ".join(['"{}"'.format(q) for q in word_list] + ['from:{}'.format(user) for user in authors]) if since: query += ' since:' + since.strftime('%Y-%m-%d') if until: query += ' until:' + until.strftime('%Y-%m-%d') if not allow_retweets: query += ' -filter:retweets' return query
[docs] def search(self, *, word_list=None, authors=None, max_tweets=None, lang=None, since=None, until=None, allow_retweets=True): """ Performs search for tweets. All the parameters optional. Args: max_tweets (int): If present limits the number of downloaded tweets. word_list (list of str): A list of key words to search for. authors (list of str): A list of tweets' author. lang (str): A language's code (either ISO 639-1 or ISO 639-3 formats). since (str): Fetch tweets only from this date. until (str): Fetch tweets only to this date. allow_retweets(bool): Whether to download retweets. """ query = self.build_query(word_list=word_list, authors=authors, since=since, until=until, allow_retweets=allow_retweets) self.task = SearchTask(self, q=query, lang=lang, max_tweets=max_tweets) self.history.append(self.task) self.task.start()
def disconnect(self): if self.task: self.task.disconnect() @property def running(self): """bool: Indicates whether there is an active task. """ return self.task is not None and self.task.running def join(self, *args): if self.task: self.task.join(*args) def add_status(self, status): status_record = {attr.name: getter(status) for attr, getter in self.supported_fields} self.statuses_lock.acquire() self.container[status.id] = status_record self.statuses_lock.release()
[docs] def create_corpus(self, included_attributes=None): """ Creates a corpus with collected tweets. Args: included_attributes(Optional[List of string]): A list of tweets' attributes to be included in the result. """ if included_attributes: attributes = [(attr, _) for attr, _ in self.attributes if attr.name in included_attributes] metas = [(attr, _) for attr, _ in self.metas if attr.name in included_attributes] text_features = [attr for attr in self.text_features if attr.name in included_attributes] else: attributes = self.attributes metas = self.metas text_features = self.text_features domain = data.Domain(attributes=[attr for attr, _ in attributes], metas=[attr for attr, _ in metas]) self.statuses_lock.acquire() for attr in domain.attributes: if isinstance(attr, data.DiscreteVariable): attr.values = [] def to_val(attr, val): if isinstance(attr, data.DiscreteVariable) and val not in attr.values: attr.add_value(val) return attr.to_val(val) X = np.array([ [to_val(attr, record[attr.name]) for attr, _ in attributes] for record in self.tweets ]) metas = np.array([ [record[attr.name] for attr, _ in metas] for record in self.tweets ], dtype=object) self.statuses_lock.release() corpus = Corpus(X=X, metas=metas, domain=domain, text_features=text_features) corpus.name = 'Twitter' return corpus
[docs] def reset(self): """ Removes all downloaded tweets. """ if self.task: self.task.disconnect() self.task.join() self.history = [] self.container = OrderedDict()
class SearchTask(threading.Thread): def __init__(self, master, q, lang=None, max_tweets=None, **kwargs): super().__init__() self.master = master self.q = q self.lang = lang self.running = False self.max_tweets = max_tweets self.kwargs = kwargs def disconnect(self): self.running = False def start(self): self.running = True self.progress = 0 if self.master.on_start: self.master.on_start() super().start() def run(self): try: for status in tweepy.Cursor(self.master.api.search, q=self.q, lang=self.lang, **self.kwargs).items(self.max_tweets): self.master.add_status(status) self.progress += 1 if self.master.on_progress: self.master.on_progress(self.progress) if not self.running: break except tweepy.TweepError as e: if self.master.on_error: self.master.on_error(str(e)) self.finish() def finish(self): self.running = False if self.master.on_finish: self.master.on_finish() def report(self): return (('Query', self.q), ('Language', code2lang.get(self.lang, 'Any')), ('Tweets count', self.progress))