Skip to content
52 changes: 52 additions & 0 deletions monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf -*-
"""
monitor.py

Simple script to monitor one or more output files from track_keywords.py

Based on:
https://github.com/seb-m/pyinotify/blob/master/python2/examples/transient_file.py

Kevin Driscoll, 2014
"""

import colors
import pyinotify
import sys
import time

global nextcolor

class ProcessTransientFile(pyinotify.ProcessEvent):

def __init__(self, n):
global nextcolor
self.fg = nextcolor
self.lastupdate = time.time()
self.freq = 0

def process_IN_MODIFY(self, event):
now = time.time()
report = "{0}s".format(str(round(now-self.lastupdate, 2)))
self.lastupdate = now
self.freq += 1
if not self.freq % 50:
print colors.color(' '.join((event.name, report)), fg=self.fg)

def process_default(self, event):
print 'default: ', event.maskname

if __name__=="__main__":

filenames = sys.argv[1:]
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)

print "Tracking..."
for n, fn in enumerate(filenames):
print fn
nextcolor = (n % 6) + 1
wm.watch_transient_file(fn, pyinotify.IN_MODIFY, ProcessTransientFile)
print

notifier.loop()
111 changes: 95 additions & 16 deletions track_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
"""
Track keywords on Twitter using the public Streaming API

TODO
Add hooks for extracting info about incoming tweets
e.g., identifying new users
Better monitoring
E-mail admins on error

Limitations:
Max 400 keywords, 5000 users
https://dev.twitter.com/discussions/4120
2013

"""
Expand All @@ -11,8 +20,11 @@
import json
import time
import datetime
import math
import pymongo
import requests
import socket
import time
import webbrowser
from urlparse import parse_qs
from requests_oauthlib import OAuth1, OAuth1Session
Expand Down Expand Up @@ -92,6 +104,32 @@ def authorize(consumer_key, consumer_secret):
# API wrappers
#

def track_sample(twitter):
"""Yields tweets one dict at a time from "sample" endpoint
See: https://dev.twitter.com/docs/api/1.1/get/statuses/sample

twitter: OAuth1Session object authenticated already
"""

# Prepare for GET request
sample_url = "https://stream.twitter.com/1.1/statuses/sample.json"

# Create Request.get object
r = twitter.get(url=sample_url, params={}, stream=True)

# Iterate over the request
for line in r.iter_lines():
if line :
try:
# TODO
# Sometimes it returns a "disconnect" obj
# before closing the stream
tweet = json.loads(line)
yield tweet
except ValueError:
# Couldn't construct a valid tweet
pass

def track(twitter, keywords=[], user_ids=[]):
"""Iterator that yields tweets as dicts one at a time

Expand Down Expand Up @@ -181,8 +219,8 @@ def dump_to_mongo(tracker, collection):
# Insert each json as an entry in the mongodb collection
entry = collection.insert(tweet)

def dump_to_stdout(tracker, encoding='utf-16', tracer=0):
""" Loop over tweets in tracker and print them to stdout
def process(tracker, encoding='utf-16', tracer=0):
""" Yield tweets from tracker stream
If tracer a non-zero integer, then the text of
every tracer-th tweet will be printed to stderr
"""
Expand All @@ -194,7 +232,12 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0):

for n, tweet in enumerate(tracker):
j = json.dumps(tweet, encoding=encoding)
print j
# Check that this is a tweet
if not u'delete' in j:
# Any other pre-processing can happen here
# For example, removing unwanted keys to shrink the dict
yield j

minuteCounter += 1

# Print tracer to stderr
Expand Down Expand Up @@ -246,6 +289,10 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0):
type=str,
default=u"",
help="Access Token Secret")
parser.add_argument('--sample',
action='store_true',
default=False,
help="Use Twitter's sample endpoint")
parser.add_argument('--keywords',
type=str,
default='',
Expand All @@ -254,6 +301,10 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0):
type=str,
default='',
help='Path to file with user IDs, one per line')
parser.add_argument('--retries',
type=int,
default=0,
help="Maximum retries upon error")
parser.add_argument('--tracer',
type=int,
default=0,
Expand All @@ -266,8 +317,8 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0):
access_token = args.resourcekey
access_token_secret = args.resourcesecret

if not args.keywords and not args.userids:
sys.stderr.write("Nothing to track! Please supply keywords or user IDs.\n")
if not args.keywords and not args.userids and not args.sample:
sys.stderr.write("Nothing to track! Please supply keywords or user IDs or use the Twitter sample.\n")
sys.exit(1)

keywords = []
Expand All @@ -289,18 +340,46 @@ def dump_to_stdout(tracker, encoding='utf-16', tracer=0):
for line in f:
uid = line.strip()
if uid:
keywords.append(uid)
user_ids.append(uid)
sys.stderr.write('\t')
sys.stderr.write(uid)
sys.stderr.write('\n')

sys.stderr.write('\nAuthorizing tracker with Twitter...')
sesh = get_session(consumer_key,
consumer_secret,
access_token,
access_token_secret)
stream = track(sesh, keywords, user_ids)
sys.stderr.write('done!\n')

sys.stderr.write('\nStarting tracker...\n')
dump_to_stdout(stream, tracer=args.tracer)
# The purpose of this loop is to restart the tracker
# when an error comes down the stream.
# It's pretty dumb about which errors it catches
# which is why there is a maximum number of retries
retries = 0
while retries <= args.retries:
if retries:
naptime = int(round(math.log(retries)) * 10)
sys.stderr.write('Sleeping for {0} seconds...\n'.format(naptime))
time.sleep(naptime)
sys.stderr.write('\nAuthorizing tracker with Twitter...')
sesh = get_session(consumer_key,
consumer_secret,
access_token,
access_token_secret)
if args.sample:
sys.stderr.write('\nReading from the Twitter sample endpoint...')
stream = track_sample(sesh)
else:
sys.stderr.write('\nReading from the Twitter filter endpoint...')
stream = track(sesh, keywords, user_ids)
sys.stderr.write('done!\n')

sys.stderr.write('\nStarting tracker...\n')
try:
for cleantweet in process(stream, tracer=args.tracer):
print cleantweet
except socket.error, (value, message):
sys.stderr.write(value)
sys.stderr.write(message)
sys.stderr.write('\n')
except KeyboardInterrupt:
sys.exit(1)
except:
sys.stderr.write('Unknown exception.\n')
retries += 1
sys.stderr.write('Trying to restart tracker ({0})...\n'.format(retries))
sys.stderr.write('Nope. Maximum retries reached.\n')