diff --git a/countryfier.py b/countryfier.py
new file mode 100644
index 0000000..cb2f239
--- /dev/null
+++ b/countryfier.py
@@ -0,0 +1,59 @@
+import os
+import json
+import multiprocessing
+
+
+def filter_info(old):
+    new = {
+        'created_at': old['created_at'],
+        'id': old['id'],
+        'text': old['text'],
+        'source': old['source'],
+        'country_code': old['place']['country_code']
+    }
+    return new
+
+
+def file_processor(q, country_tweets, lock):
+    while True:
+        file_path = q.get()
+        with open(file_path) as f:
+            # geo_filter.py writes each processed file as one JSON array.
+            for raw_tweet in json.load(f):
+                tweet = filter_info(raw_tweet)
+                country = tweet['country_code']
+                if country == '':
+                    continue
+                # A Manager dict hands out copies of its values, so an
+                # in-place append would be silently lost; rebuild the list
+                # and reassign it, holding a lock so concurrent workers do
+                # not race on the read-modify-write.
+                with lock:
+                    country_tweets[country] = country_tweets.get(country, []) + [tweet]
+        q.task_done()
+
+
+if __name__ == '__main__':
+    q = multiprocessing.JoinableQueue()
+    manager = multiprocessing.Manager()
+    country_tweets = manager.dict()
+    lock = manager.Lock()
+    for i in range(os.cpu_count() - 1):
+        p = multiprocessing.Process(target=file_processor,
+                                    args=(q, country_tweets, lock), daemon=True)
+        p.start()
+    for path, dirs, files in os.walk('processed'):
+        for file in files:
+            file_path = os.path.join(path, file)
+            if file.endswith('.json'):
+                q.put(file_path)
+
+    q.join()
+    os.makedirs('countries', exist_ok=True)
+    for country, tweets in country_tweets.items():
+        # 'w' rather than 'a': appending a second JSON array to an
+        # existing file would leave it unparseable.
+        with open(os.path.join('countries', country + '.json'), 'w') as f:
+            f.write(json.dumps(tweets))
+
+    print('done, safe to ctrl-c if it does not exit automatically')
diff --git a/geo_filter.py b/geo_filter.py
index 1440f96..7290701 100644
--- a/geo_filter.py
+++ b/geo_filter.py
@@ -7,14 +7,14 @@
 import pickle
 
 
-def worker(q, geo_filtered_dict):
+def worker(q):
     while True:
         path, file_name = q.get()
-        process_files(path, file_name, geo_filtered_dict)
+        process_files(path, file_name)
         q.task_done()
 
 
-def process_files(path, file_name, geo_filtered_dict):
+def process_files(path, file_name):
     file_name_no_extension, file_extension = os.path.splitext(file_name)
     out_file_name = os.path.join('processed', os.path.relpath(path, start='untarred'), file_name_no_extension)
     os.makedirs(os.path.dirname(out_file_name), exist_ok=True)
@@ -34,39 +34,19 @@ def process_files(path, file_name):
                 pass
         out_file.write(json.dumps(tweets))
     logging.info('finished ' + os.path.join(path, file_name))
-    geo_filtered_dict[os.path.join(path, file_name)] = True
 
 
 if __name__ == '__main__':
-    try:
-        with open('geo_filtered.pkl', 'rb') as f:
-            geo_filtered = pickle.load(f)
-    except FileNotFoundError:
-        geo_filtered = set()
     coloredlogs.install()
     logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
     q = multiprocessing.JoinableQueue()
-    manager = multiprocessing.Manager()
-    geo_filtered_dict = manager.dict()
-    for key in geo_filtered:
-        geo_filtered_dict[key] = True
-    try:
-        os.makedirs('processed')
-    except FileExistsError:
-        pass
-    processes = []
+    os.makedirs('processed', exist_ok=True)
     for i in range(os.cpu_count() - 1):
-        p = multiprocessing.Process(target=worker, args=(q, geo_filtered_dict))
-        processes.append(p)
+        p = multiprocessing.Process(target=worker, args=(q,), daemon=True)
         p.start()
     for path, dirs, files in os.walk('untarred'):
         for file_name in files:
-            if not os.path.join(path, file_name) in geo_filtered_dict and file_name.endswith('.json.bz2'):
+            if file_name.endswith('.json.bz2'):
                 q.put((path, file_name))
     q.join()
-    geo_filtered = set(geo_filtered_dict.keys())
-    with open('geo_filtered.pkl', 'wb') as f:
-        pickle.dump(geo_filtered, f)
-    print('done, safe to ctrl-c if it does not exit automatically')
-    for p in processes:
-        p.terminate()
+    print('done, safe to ctrl-c if it does not exit automatically')
\ No newline at end of file
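
For reference, a minimal standalone sketch (variable names illustrative, not from the patch) of the Manager dict behaviour that countryfier.py's reassign-under-lock pattern works around: values fetched from a manager.dict() proxy are copies, so mutating them in place never reaches the shared state.

    import multiprocessing

    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        shared = manager.dict()
        shared['xs'] = []

        # Mutates a local copy returned by the proxy; the change is lost.
        shared['xs'].append(1)
        print(shared['xs'])  # []

        # Rebuild the value and reassign through the proxy; this persists.
        shared['xs'] = shared['xs'] + [1]
        print(shared['xs'])  # [1]

The lock in countryfier.py is needed on top of this because the read-rebuild-reassign cycle is not atomic; without it, two workers handling the same country could overwrite each other's updates.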