From d687b8014789d9809b2e564be57eef4eae14124e Mon Sep 17 00:00:00 2001
From: DE0CH
Date: Sun, 21 Jun 2020 00:38:35 +0800
Subject: [PATCH] Update downloader to run concurrently

---
 download_manager.py | 76 +++++++++++++++++++++++++++------------------
 geo_filter.py       | 24 ++++++++------
 requirements.txt    |  2 --
 untarpper.py        |  4 ++-
 4 files changed, 63 insertions(+), 43 deletions(-)

diff --git a/download_manager.py b/download_manager.py
index 4e36dcd..0ee73ec 100644
--- a/download_manager.py
+++ b/download_manager.py
@@ -6,37 +6,51 @@
 import coloredlogs
 import logging
 import os
+import multiprocessing
+
+
+def worker(queue):
+    while True:
+        url = queue.get()
+        download_file(url)
+        queue.task_done()
+
+
+def download_file(url):
+    url = url.strip()
+    if not url:
+        return
+    f_name = url.rsplit('/', 1)[-1]
+    f_path = path.join('downloads', f_name)
+    if os.path.isfile(f_path):
+        logging.debug('file already downloaded: ' + url)
+        return
+    print(url)
+    logging.info('started downloading: ' + url)
+    # below is downloading content
+
+    with open(f_path, 'wb') as f:
+        response = requests.get(url, allow_redirects=True, stream=True)
+        total_length = response.headers.get('content-length')
+
+        if total_length is None:  # no content length header
+            f.write(response.content)
+        else:
+            for data in response.iter_content(chunk_size=4096):
+                f.write(data)
+    logging.info('finished downloading: ' + url)
+
+
 if __name__ == '__main__':
     coloredlogs.install()
-    logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
-    with open('archive_links.txt') as links_f:
+    logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG)
+    queue = multiprocessing.JoinableQueue()
+    for i in range(50):
+        multiprocessing.Process(target=worker, args=(queue,), daemon=True).start()
+
+    with open('archive_links_raw.txt') as links_f:
         for url in links_f:
-            url = url.strip()
-            if not url:
-                continue
-            f_name = url.rsplit('/', 1)[-1]
-            f_path = path.join('downloads', f_name)
-            if os.path.isfile(f_path):
-                logging.info('file already downloaded: ' + url)
-                continue
-            logging.info('started downloading: ' + url)
-            # below is downloading content
-
-            with open(f_path, 'wb') as f:
-                response = requests.get(url, allow_redirects=True, stream=True)
-                total_length = response.headers.get('content-length')
-
-                if total_length is None:  # no content length header
-                    f.write(response.content)
-                else:
-                    total_length = int(total_length)
-                    with progressbar.ProgressBar(max_value=total_length) as bar:
-                        dl = 0
-                        for data in response.iter_content(chunk_size=4096):
-                            dl += len(data)
-                            f.write(data)
-                            # noinspection PyBroadException
-                            try:
-                                bar.update(dl)
-                            except:
-                                pass
+            queue.put(url)
+
+    queue.join()
+    print('done')
diff --git a/geo_filter.py b/geo_filter.py
index 420ee50..409fd84 100644
--- a/geo_filter.py
+++ b/geo_filter.py
@@ -2,22 +2,23 @@ import json
 import os
 import bz2
-import threading
+import multiprocessing
 import coloredlogs, logging
-import queue
 import pickle
 import googletrans
 
 
-def worker():
+
+def worker(q, geo_filtered_dict):
+    translator = googletrans.Translator()
     while True:
         path, dirs, files = q.get()
-        process_files(path, dirs, files)
+        process_files(path, dirs, files, geo_filtered_dict, translator)
         q.task_done()
 
 
-def process_files(path, dirs, files):
+def process_files(path, dirs, files, geo_filtered_dict, translator):
     for file_name in files:
-        if os.path.join(path, file_name) in geo_filtered:
+        if os.path.join(path, file_name) in geo_filtered_dict:
             continue
         file_name_no_extension, file_extension = os.path.splitext(file_name)
         out_file_name = os.path.join('processed', os.path.relpath(path, start='untarred'), file_name_no_extension)
@@ -49,7 +50,7 @@ def process_files(path, dirs, files):
             except Exception:
                 logging.exception('failed to process tweet')
         logging.info('finished ' + os.path.join(path, file_name))
-        geo_filtered.add(os.path.join(path, file_name))
+        geo_filtered_dict[os.path.join(path, file_name)] = True
 
 
 if __name__ == '__main__':
@@ -60,13 +61,17 @@ def process_files(path, dirs, files):
         geo_filtered = set()
     coloredlogs.install()
     logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
-    q = queue.Queue()
+    q = multiprocessing.JoinableQueue()
+    manager = multiprocessing.Manager()
+    geo_filtered_dict = manager.dict()
+    for key in geo_filtered:
+        geo_filtered_dict[key] = True
     try:
         os.makedirs('processed')
     except FileExistsError:
         pass
     for i in range(10):
-        t = threading.Thread(target=worker, daemon=True).start()
+        multiprocessing.Process(target=worker, args=(q, geo_filtered_dict)).start()
     total_num = 0
     filtered_num = 0
     os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'twitter-sentiment-analysis-f22ce784b0a8.json'
@@ -79,6 +84,7 @@ def process_files(path, dirs, files):
             q.put((path, dirs, files))
 
     q.join()
+
     with open('geo_filtered.pkl', 'wb') as f:
         pickle.dump(geo_filtered, f)
     print('done')
diff --git a/requirements.txt b/requirements.txt
index 7c82c0b..c9dca7a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,7 @@
 ipython
 requests
 googletrans
-googlemaps
 boto3
 progressbar2
-google-cloud-translate==2.0.1
 coloredlogs
 pycountry
diff --git a/untarpper.py b/untarpper.py
index b87029d..5db264a 100644
--- a/untarpper.py
+++ b/untarpper.py
@@ -22,6 +22,7 @@ def untar_file():
             logging.info(f'message: {file_path} -- {out}')
         elif err:
             logging.error(f'failed: {file_path} -- {err}')
+            failed_files.write(file_path + '\n')
         else:
             logging.info(f'untarred: {file_path}')
         untarred.add(file_path)
@@ -32,6 +33,7 @@ def untar_file():
     coloredlogs.install()
     logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
     q = queue.Queue()
+    failed_files = open('untarred_failed.txt', 'w')
    for i in range(10):
         threading.Thread(target=untar_file, daemon=True).start()
     try:
@@ -51,6 +53,6 @@ def untar_file():
         pickle.dump(untarred, f)
 
     q.join()
-    print('done')
+    failed_files.close()
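
Note on the concurrency pattern: both download_manager.py and geo_filter.py now use the multiprocessing.JoinableQueue worker-pool idiom. Below is a minimal, runnable sketch of that idiom for reference; the names and the sleep stand-in for the real work are illustrative, not taken from this repo. Two details matter in practice: task_done() should run even when a work item fails, or queue.join() blocks forever, and workers should be started with daemon=True so they exit with the main process (the geo_filter.py hunk above starts its processes without daemon=True, so they outlive q.join()).

    import multiprocessing
    import time


    def worker(queue):
        # Loop forever; daemon workers are reaped when the main process exits,
        # so queue.join() in the parent is what signals completion.
        while True:
            item = queue.get()
            try:
                time.sleep(0.1)  # stand-in for the real per-item work, e.g. a download
            finally:
                queue.task_done()  # must run even on failure, or join() hangs


    if __name__ == '__main__':
        queue = multiprocessing.JoinableQueue()
        for _ in range(4):
            multiprocessing.Process(target=worker, args=(queue,), daemon=True).start()
        for item in range(10):
            queue.put(item)
        queue.join()  # returns once task_done() has been called for every put()
        print('done')

One related caveat visible in the hunks above: geo_filter.py's workers record progress in the manager dict geo_filtered_dict, but the script still pickles the original geo_filtered set at the end, so the workers' progress is not saved; pickling dict(geo_filtered_dict) instead would preserve it across runs.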