Skip to content

Commit

Permalink
Merge pull request #2 from stanupab/development-v2
Browse files Browse the repository at this point in the history
Development v2
  • Loading branch information
stanupab authored Apr 18, 2018
2 parents bb41144 + bfeb1af commit a251247
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 35 deletions.
6 changes: 2 additions & 4 deletions app/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,7 @@ def daemonize(self):
# Exit first parent
sys.exit(0)
except OSError, e:
sys.stderr.write(
"fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)

# Decouple from parent environment
Expand All @@ -367,8 +366,7 @@ def daemonize(self):
# Exit from second parent
sys.exit(0)
except OSError, e:
sys.stderr.write(
"fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)

sys.stdout.flush()
Expand Down
17 changes: 15 additions & 2 deletions app/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
import config
from datetime import datetime
from bson.objectid import ObjectId
from pymongo import MongoClient
from werkzeug import check_password_hash
from app import app


class DB(object):
"""
A STACK wrapper to handle recurring interactions with MongoDB
Expand All @@ -13,6 +15,10 @@ def __init__(self):
# Class instance connection to Mongo
self.connection = MongoClient()

if config.AUTH:
self.connection.admin.authenticate(config.USERNAME, config.PASSWORD)


# App-wide config file for project info access
self.config_db = self.connection.config
self.stack_config = self.config_db.config
Expand Down Expand Up @@ -45,6 +51,8 @@ def create(self, project_name, password, hashed_password, description=None,
'project_name': project_name,
'password': hashed_password,
'description': description,
'created_date': (datetime.now()).isoformat(),
'last_updated': (datetime.now()).isoformat(),
'configdb': None,
'collectors': None,
'admin': 1
Expand All @@ -57,6 +65,8 @@ def create(self, project_name, password, hashed_password, description=None,
'project_name': project_name,
'password': hashed_password,
'description': description,
'created_date': (datetime.now()).isoformat(),
'last_updated': (datetime.now()).isoformat(),
'email': email,
'collectors': [],
'configdb': configdb,
Expand Down Expand Up @@ -364,7 +374,7 @@ def set_collector_detail(self, project_id, collector_name, network, collection_t
resp = coll.find_one({'collector_name': collector_name})
collector_id = str(resp['_id'])

self.stack_config.update({'_id': ObjectId(project_id)}, {'$push': {'collectors': {
self.stack_config.update({'_id': ObjectId(project_id)}, {'$set': {'last_updated': (datetime.now()).isoformat()}, '$push': {'collectors': {
'name': collector_name, 'collector_id': collector_id, 'active': 0}}})
status = 1
message = 'Collector created successfully!'
Expand Down Expand Up @@ -448,9 +458,12 @@ def update_collector_detail(self, project_id, collector_id, **kwargs):
}]
update_doc['terms_list'] = kwargs['terms_list']

# Finally, updated the collector
# Finally, update the collector and project details
try:
coll.update({'_id': ObjectId(collector_id)}, {'$set': update_doc})
self.stack_config.update({'_id': ObjectId(project_id)}, {'$set': {'last_updated': datetime.date(datetime.now()).isoformat()}})
status = 1
message = 'Collector created successfully!'

status = 1
message = 'Collector updated successfully.'
Expand Down
8 changes: 5 additions & 3 deletions app/twitter/ThreadedCollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def on_data(self, data):
if not os.path.isfile(JSONfileName):
self.logger.info('Creating new file: %s' % JSONfileName)
myFile = open(JSONfileName,'a')
message['tweet_type'] = self.collection_type
myFile.write(json.dumps(message).encode('utf-8'))
myFile.write('\n')
myFile.close()
Expand Down Expand Up @@ -196,10 +197,11 @@ def on_limit(self, message):
"""
Handles limit notices
"""
self.logger.warning('COLLECTION LISTENER: Stream rate limiting caused us to miss %s tweets' % (message['limit'].get('track')))
print 'Stream rate limiting caused us to miss %s tweets' % (message['limit'].get('track'))

message['limit']['time'] = time.strftime('%Y-%m-%dT%H:%M:%S')
message['collector'] = self.collector['collector_name']

self.logger.warning('COLLECTION LISTENER: Stream rate limiting caused us to miss %s tweets at %s' % (message['limit'].get('track'), message['limit']['time']))
print 'Stream rate limiting caused us to miss %s tweets at %s' % (message['limit'].get('track'), message['limit']['time'])

time_str = time.strftime(self.tweetsOutFileDateFrmt)
JSON_file_name = self.tweetsOutFilePath + time_str + '-' + self.collector['collector_name'] + '-streamlimits-' + self.project_id + '-' + self.collector_id + '-' + self.tweetsOutFile
Expand Down
11 changes: 10 additions & 1 deletion app/twitter/mongoBatchInsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import time
import glob
import simplejson
from pymongo import Connection
from email.utils import parsedate_tz
from collections import defaultdict
import sys
Expand Down Expand Up @@ -77,6 +76,13 @@ def insert_tweet_list(mongoCollection, tweets_list, line_number, processedTweets
print traceback.format_exc()
pass

except pymongo.errors.DuplicateKeyError, e:
print "Exception during mongo insert"
logger.warning("Duplicate error during mongo insert at or before file line number %d (%s)" % (line_number, processedTweetsFile))
logging.exception(e)
print traceback.format_exc()
pass

return inserted_ids_list

# Parse Twitter created_at datestring and turn it into
Expand Down Expand Up @@ -170,6 +176,9 @@ def go(project_id, rawdir, insertdir, logdir):

tweet = simplejson.loads(line)

# use tweet id as mongo id
#tweet['_id'] = tweet['id']

# now, when we did the process tweet step we already worked with
# these dates. If they failed before, they shouldn't fail now, but
# if they do we are going to skip this tweet and go on to the next one
Expand Down
191 changes: 168 additions & 23 deletions app/twitter/tweetprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,40 +93,185 @@ def process_tweet(line, track_list, expand_url=False):
tweet['mentions'].sort()

tweet['text_hash'] = hashlib.md5(tweet['text'].encode("utf-8")).hexdigest()

tweet["track_kw"] = {"org_tweet" : {}, "rt_tweet" : {}, "qt_tweet" : {}}

# Check to see if we have a retweet
if tweet.has_key("retweeted_status") and tweet['retweeted_status']['truncated']== True:

# Check to see if we have a retweet
if tweet.has_key("retweeted_status") and tweet['truncated']== True:
# Track rule matches
tweet['track_kw'] = {}
rt_hashtags = []
rt_mentions = []
rt_urls = []

for index in range(len(tweet['retweeted_status']['extended_tweet']['entities']['hashtags'])):
rt_hashtags.append(tweet['retweeted_status']['extended_tweet']['entities']['hashtags'][index]['text'].lower())
for index in range(len(tweet['retweeted_status']['extended_tweet']['entities']['user_mentions'])):
rt_mentions.append(tweet['retweeted_status']['extended_tweet']['entities']['user_mentions'][index]['screen_name'].lower())
for index in range(len(tweet['retweeted_status']['extended_tweet']['entities']['urls'])):
rt_urls.append(tweet['retweeted_status']['extended_tweet']['entities']['urls'][index]['expanded_url'].lower())

if track_set:
rt_hashtags = set([x.lower() for x in rt_hashtags])
rt_mentions = set([x.lower() for x in rt_mentions])
track_set = set([x.lower() for x in track_set])
tweet["track_kw"]["rt_tweet"]["hashtags"] = list(set(rt_hashtags).intersection(track_set))
tweet["track_kw"]["rt_tweet"]["mentions"] = list(set(rt_mentions).intersection(track_set))
rt_text = re.sub('[%s]' % punct, ' ', tweet['retweeted_status']['extended_tweet']['full_text'])
rt_text = rt_text.lower().split()
tweet["track_kw"]["rt_tweet"]["text"] = list(set(rt_text).intersection(track_set))
tmpURLs = []
for url in rt_urls:
for x in track_set:
if x in url:
tmpURLs.append(url)
tweet["track_kw"]["rt_tweet"]["urls"] = list(tmpURLs)

# Check if this is a retweet and it is not truncated
elif tweet.has_key("retweeted_status") and tweet['retweeted_status']['truncated']== False:
rt_hashtags = []
rt_mentions = []
rt_urls = []

for index in range(len(tweet['retweeted_status']['entities']['hashtags'])):
rt_hashtags.append(tweet['retweeted_status']['entities']['hashtags'][index]['text'].lower())
rt_hashtags.append(tweet['retweeted_status']['entities']['hashtags'][index]['text'].lower())
for index in range(len(tweet['retweeted_status']['entities']['user_mentions'])):
rt_mentions.append(tweet['retweeted_status']['entities']['user_mentions'][index]['screen_name'].lower())
untion_hashtags = set(tweet['hashtags']).union(set(rt_hashtags))
untion_mentions = set(tweet['mentions']).union(set(rt_hashtags))
rt_mentions.append(tweet['retweeted_status']['entities']['user_mentions'][index]['screen_name'].lower())
for index in range(len(tweet['retweeted_status']['entities']['urls'])):
rt_urls.append(tweet['retweeted_status']['entities']['urls'][index]['expanded_url'].lower())

if track_set:
tweet['track_kw']['hashtags'] = list(untion_hashtags.intersection(track_set))
tweet['track_kw']['mentions'] = list(untion_mentions.intersection(track_set))
tweet_text = re.sub('[%s]' % punct, ' ', tweet['text'])
rt_hashtags = set([x.lower() for x in rt_hashtags])
rt_mentions = set([x.lower() for x in rt_mentions])
track_set = set([x.lower() for x in track_set])
tweet["track_kw"]["rt_tweet"]["hashtags"] = list(set(rt_hashtags).intersection(track_set)) #list(rt_hashtags.intersection(track_set))
tweet["track_kw"]["rt_tweet"]["mentions"] = list(set(rt_mentions).intersection(track_set)) #list(rt_mentions.intersection(track_set))
rt_text = re.sub('[%s]' % punct, ' ', tweet['retweeted_status']['text'])
tweet_text = tweet_text.lower().split()
rt_text = rt_text.lower().split()
union_text = set(rt_text).union(set(tweet_text))
tweet['track_kw']['text'] = list(union_text.intersection(track_set))

elif track_set:
# Track rule matches
tweet['track_kw'] = {}
tweet['track_kw']['hashtags'] = list(set(tweet['hashtags']).intersection(track_set))
tweet['track_kw']['mentions'] = list(set(tweet['mentions']).intersection(track_set))
tweet_text = re.sub('[%s]' % punct, ' ', tweet['text'])
tweet_text = tweet_text.lower().split()
tweet['track_kw']['text'] = list(set(tweet_text).intersection(track_set))
tweet["track_kw"]["rt_tweet"]["text"] = list(set(rt_text).intersection(track_set))
tmpURLs = []
for url in rt_urls:
for x in track_set:
if x in url:
tmpURLs.append(url)
tweet["track_kw"]["rt_tweet"]["urls"] = list(tmpURLs)


#check if we have a quoted tweet and if it is truncated
if tweet.has_key("quoted_status") and tweet['quoted_status']['truncated']== True :

qt_hashtags = []
qt_mentions = []
qt_urls = []

for index in range(len(tweet['quoted_status']['extended_tweet']['entities']['hashtags'])):
qt_hashtags.append(tweet['quoted_status']['extended_tweet']['entities']['hashtags'][index]['text'].lower())
for index in range(len(tweet['quoted_status']['extended_tweet']['entities']['user_mentions'])):
qt_mentions.append(tweet['quoted_status']['extended_tweet']['entities']['user_mentions'][index]['screen_name'].lower())
for index in range(len(tweet['quoted_status']['extended_tweet']['entities']['urls'])):
qt_urls.append(tweet['quoted_status']['extended_tweet']['entities']['urls'][index]['expanded_url'].lower())


if track_set:
qt_hashtags = set([x.lower() for x in qt_hashtags])
qt_mentions = set([x.lower() for x in qt_mentions])
track_set = set([x.lower() for x in track_set])
tweet["track_kw"]["qt_tweet"]["hashtags"] = list(set(qt_hashtags).intersection(track_set))
tweet["track_kw"]["qt_tweet"]["mentions"] = list(set(qt_mentions).intersection(track_set))
qt_text = re.sub('[%s]' % punct, ' ', tweet['quoted_status']['extended_tweet']['full_text'])
qt_text = qt_text.lower().split()
tweet["track_kw"]["qt_tweet"]["text"] = list(set(qt_text).intersection(track_set))
tmpURLs = []
for url in qt_urls:
for x in track_set:
if x in url:
tmpURLs.append(url)
tweet["track_kw"]["qt_tweet"]["urls"] = list(tmpURLs)

#Check if we have a quoted tweet and it is not truncated
elif tweet.has_key("quoted_status") and tweet['quoted_status']['truncated']== False :

qt_hashtags = []
qt_mentions = []
qt_urls = []

for index in range(len(tweet['quoted_status']['entities']['hashtags'])):
qt_hashtags.append(tweet['quoted_status']['entities']['hashtags'][index]['text'].lower())
for index in range(len(tweet['quoted_status']['entities']['user_mentions'])):
qt_mentions.append(tweet['quoted_status']['entities']['user_mentions'][index]['screen_name'].lower())
for index in range(len(tweet['quoted_status']['entities']['urls'])):
qt_urls.append(tweet['quoted_status']['entities']['urls'][index]['expanded_url'].lower())


if track_set:
qt_hashtags = set([x.lower() for x in qt_hashtags])
qt_mentions = set([x.lower() for x in qt_mentions])
track_set = set([x.lower() for x in track_set])
tweet["track_kw"]["qt_tweet"]["hashtags"] = list(set(qt_hashtags).intersection(track_set))
tweet["track_kw"]["qt_tweet"]["mentions"] = list(set(qt_mentions).intersection(track_set))
qt_text = re.sub('[%s]' % punct, ' ', tweet['quoted_status']['text'])
qt_text = qt_text.lower().split()
tweet["track_kw"]["qt_tweet"]["text"] = list(set(qt_text).intersection(track_set))

tmpURLs = []
for url in qt_urls:
for x in track_set:
if x in url:
tmpURLs.append(url)
tweet["track_kw"]["qt_tweet"]["urls"] = list(tmpURLs)

#Check Original tweets
if track_set and tweet['truncated'] == False :

myURLs = []
for index in range(len(tweet['entities']['urls'])):
myURLs.append(tweet['entities']['urls'][index]['expanded_url'].lower())

hashTags_set = set([x.lower() for x in tweet['hashtags']])
mentions_set = set([x.lower() for x in tweet['mentions']])
track_set = set([x.lower() for x in track_set])
tweet["track_kw"]["org_tweet"]["hashtags"] = list(set(hashTags_set).intersection(track_set))
tweet["track_kw"]["org_tweet"]["mentions"] = list(set(mentions_set).intersection(track_set))

tweet_text = re.sub('[%s]' % punct, ' ', tweet['text'])
tweet_text = tweet_text.lower().split()
tweet["track_kw"]["org_tweet"]["text"] = list(set(tweet_text).intersection(track_set))
tmpURLs = []
for url in myURLs:
for x in track_set:
if x in url:
tmpURLs.append(url)
tweet["track_kw"]["org_tweet"]["urls"] = list(tmpURLs)

elif track_set and tweet['truncated'] == True :
ext_hashtags = []
ext_mentions = []
ext_urls = []

for index in range(len(tweet['extended_tweet']['entities']['hashtags'])):
ext_hashtags.append(tweet['extended_tweet']['entities']['hashtags'][index]['text'].lower())
for index in range(len(tweet['extended_tweet']['entities']['user_mentions'])):
ext_mentions.append(tweet['extended_tweet']['entities']['user_mentions'][index]['screen_name'].lower())
for index in range(len(tweet['extended_tweet']['entities']['urls'])):
ext_urls.append(tweet['extended_tweet']['entities']['urls'][index]['expanded_url'].lower())

hashTags_set = set([x.lower() for x in ext_hashtags])
mentions_set = set([x.lower() for x in ext_mentions])
track_set = set([x.lower() for x in track_set])
tweet["track_kw"]["org_tweet"]["hashtags"] = list(set(hashTags_set).intersection(track_set))
tweet["track_kw"]["org_tweet"]["mentions"] = list(set(mentions_set).intersection(track_set))
#tweet['track_kw']['hashtags'] = list(set(hashTags_set).intersection(track_set))
#tweet['track_kw']['mentions'] = list(set(mentions_set).intersection(track_set))
#---------------------------------End new code by Dani-------------------------------------------------
tweet_text = re.sub('[%s]' % punct, ' ', tweet['extended_tweet']['full_text'])
tweet_text = tweet_text.lower().split()
tweet["track_kw"]["org_tweet"]["text"] = list(set(tweet_text).intersection(track_set))
tmpURLs = []
for url in ext_urls:
for x in track_set:
if x in url:
tmpURLs.append(url)
tweet["track_kw"]["org_tweet"]["urls"] = list(tmpURLs)


# Convert dates 2012-09-22 00:10:46
# Note that we convert these to a datetime object and then convert back to string
Expand Down
4 changes: 4 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import os

# MONGODB CONFIG
AUTH = True
USERNAME = 'USERNAME'
PASSWORD = 'PASSWORD'
# Directory structure config vars
BASEDIR = os.path.abspath(os.path.dirname(__file__))
LOGDIR = BASEDIR + '/out'
Expand Down
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
amqp==1.4.6
anyjson==0.3.3
celery==4.0.2
billiard==3.3.0.19
celery==3.1.17
configparser==3.3.0r2
DateTime==4.0.1
Flask==0.10.1
Flask-WTF==0.11
itsdangerous==0.24
Jinja2==2.7.3
kombu==3.0.24
kombu==3.0.30
logging==0.4.9.6
MarkupSafe==0.23
pymongo==2.7.2
Expand Down

0 comments on commit a251247

Please sign in to comment.