optimised geo_filter.py
DE0CH committed Jun 22, 2020
1 parent 0f16b5d commit 1a40272
Showing 2 changed files with 60 additions and 27 deletions.
53 changes: 53 additions & 0 deletions countryfier.py
@@ -0,0 +1,53 @@
import os
import json
import multiprocessing


def filter_info(old):
    new = {
        'created_at': old['created_at'],
        'id': old['id'],
        'text': old['text'],
        'source': old['source'],
        'country_code': old['place']['country_code']
    }
    return new


def file_processor(q, country_tweets):
    while True:
        file_path = q.get()
        with open(file_path) as f:
            for line in f:
                tweet = filter_info(json.loads(line))
                country = tweet['country_code']
                if country == '':
                    continue
                # a Manager dict proxy does not see in-place mutation of the
                # lists it holds, so append and then write the list back
                tweets = country_tweets.get(country, [])
                tweets.append(tweet)
                country_tweets[country] = tweets
        q.task_done()


if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()
    country_tweets = manager.dict()
    # daemon workers die with the main process, so no explicit terminate()
    for i in range(os.cpu_count() - 1):
        p = multiprocessing.Process(target=file_processor, args=(q, country_tweets), daemon=True)
        p.start()
    for path, dirs, files in os.walk('processed'):
        for file in files:
            file_path = os.path.join(path, file)
            if file.endswith('.json'):
                q.put(file_path)

    q.join()
    # the output directory has to exist before the per-country files are opened
    os.makedirs('countries', exist_ok=True)
    for country, tweets in country_tweets.items():
        with open(os.path.join('countries', country + '.json'), 'a') as f:
            f.write(json.dumps(tweets))

    print('done, safe to ctrl-c if it does not exit automatically')
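
One caveat with the shared dict above: even with the explicit write-back, the get/append/set sequence is not atomic across workers, so two processes that hit the same country at the same time can each read the same list and one append can be lost. A minimal sketch of one way to close that race, assuming a manager.Lock() shared with the workers the same way as the queue (the lock argument and the append_tweet helper are illustrative, not part of this commit):

def append_tweet(country_tweets, lock, country, tweet):
    # hypothetical helper: hold the shared lock across the whole
    # read-modify-write so concurrent workers cannot interleave
    with lock:
        tweets = country_tweets.get(country, [])
        tweets.append(tweet)
        country_tweets[country] = tweets

The lock would be created once in the main block with lock = manager.Lock() and passed to every file_processor alongside q and country_tweets.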
34 changes: 7 additions & 27 deletions geo_filter.py
@@ -7,14 +7,14 @@
 import pickle


-def worker(q, geo_filtered_dict):
+def worker(q, ):
     while True:
         path, file_name = q.get()
-        process_files(path, file_name, geo_filtered_dict)
+        process_files(path, file_name)
         q.task_done()


-def process_files(path, file_name, geo_filtered_dict):
+def process_files(path, file_name):
     file_name_no_extension, file_extension = os.path.splitext(file_name)
     out_file_name = os.path.join('processed', os.path.relpath(path, start='untarred'), file_name_no_extension)
     os.makedirs(os.path.dirname(out_file_name), exist_ok=True)
@@ -34,39 +34,19 @@ def process_files(path, file_name, geo_filtered_dict):
             pass
         out_file.write(json.dumps(tweets))
     logging.info('finished ' + os.path.join(path, file_name))
-    geo_filtered_dict[os.path.join(path, file_name)] = True


 if __name__ == '__main__':
-    try:
-        with open('geo_filtered.pkl', 'rb') as f:
-            geo_filtered = pickle.load(f)
-    except FileNotFoundError:
-        geo_filtered = set()
     coloredlogs.install()
     logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
     q = multiprocessing.JoinableQueue()
-    manager = multiprocessing.Manager()
-    geo_filtered_dict = manager.dict()
-    for key in geo_filtered:
-        geo_filtered_dict[key] = True
-    try:
-        os.makedirs('processed')
-    except FileExistsError:
-        pass
-    processes = []
+    os.makedirs('processed', exist_ok=True)
     for i in range(os.cpu_count() - 1):
-        p = multiprocessing.Process(target=worker, args=(q, geo_filtered_dict))
-        processes.append(p)
+        p = multiprocessing.Process(target=worker, args=(q, ), daemon=True)
         p.start()
     for path, dirs, files in os.walk('untarred'):
         for file_name in files:
-            if not os.path.join(path, file_name) in geo_filtered_dict and file_name.endswith('.json.bz2'):
+            if file_name.endswith('.json.bz2'):
                 q.put((path, file_name))
     q.join()
-    geo_filtered = set(geo_filtered_dict.keys())
-    with open('geo_filtered.pkl', 'wb') as f:
-        pickle.dump(geo_filtered, f)
-    print('done, safe to ctrl-c if it does not exit automatically')
-    for p in processes:
-        p.terminate()
+    print('done, safe to ctrl-c if it does not exit automatically')
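
For reference, the worker pattern this commit settles on (daemon processes draining a JoinableQueue, with q.join() as the only synchronisation) reduces to a short self-contained sketch; the worker count and dummy items below are illustrative. Daemonic workers are killed automatically when the main process exits, which is what lets the commit drop the p.terminate() loop; the geo_filtered.pkl resume checkpoint goes with it, trading resumability for a simpler run:

import multiprocessing

def worker(q):
    while True:
        item = q.get()  # blocks until work is available
        # ... process item here ...
        q.task_done()   # lets q.join() account for this item

if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    for _ in range(max(1, multiprocessing.cpu_count() - 1)):
        multiprocessing.Process(target=worker, args=(q,), daemon=True).start()
    for item in range(100):
        q.put(item)
    q.join()  # returns once every queued item has been task_done()
    print('all items processed; daemon workers die with the main process')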
