Commit

updated downloader so concurrent
DE0CH committed Jun 20, 2020
1 parent 5daadd8 commit d687b80
Showing 4 changed files with 63 additions and 43 deletions.
76 changes: 45 additions & 31 deletions download_manager.py
@@ -6,37 +6,51 @@
 import coloredlogs
 import logging
 import os
+import multiprocessing
+
+
+def worker(queue):
+    while True:
+        url = queue.get()
+        download_file(url)
+        queue.task_done()
+
+
+def download_file(url):
+    url = url.strip()
+    if not url:
+        return
+    f_name = url.rsplit('/', 1)[-1]
+    f_path = path.join('downloads', f_name)
+    if os.path.isfile(f_path):
+        logging.debug('file already downloaded: ' + url)
+        return
+    print(url)
+    logging.info('started downloading: ' + url)
+    # below is downloading content
+
+    with open(f_path, 'wb') as f:
+        response = requests.get(url, allow_redirects=True, stream=True)
+        total_length = response.headers.get('content-length')
+
+        if total_length is None:  # no content length header
+            f.write(response.content)
+        else:
+            for data in response.iter_content(chunk_size=4096):
+                f.write(data)
+    logging.info('finished downloading: ' + url)


 if __name__ == '__main__':
     coloredlogs.install()
-    logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
-    with open('archive_links.txt') as links_f:
+    logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG)
+    queue = multiprocessing.JoinableQueue()
+    for i in range(50):
+        multiprocessing.Process(target=worker, args=(queue,), daemon=True).start()
+
+    with open('archive_links_raw.txt') as links_f:
         for url in links_f:
-            url = url.strip()
-            if not url:
-                continue
-            f_name = url.rsplit('/', 1)[-1]
-            f_path = path.join('downloads', f_name)
-            if os.path.isfile(f_path):
-                logging.info('file already downloaded: ' + url)
-                continue
-            logging.info('started downloading: ' + url)
-            # below is downloading content
-
-            with open(f_path, 'wb') as f:
-                response = requests.get(url, allow_redirects=True, stream=True)
-                total_length = response.headers.get('content-length')
-
-                if total_length is None:  # no content length header
-                    f.write(response.content)
-                else:
-                    total_length = int(total_length)
-                    with progressbar.ProgressBar(max_value=total_length) as bar:
-                        dl = 0
-                        for data in response.iter_content(chunk_size=4096):
-                            dl += len(data)
-                            f.write(data)
-                            # noinspection PyBroadException
-                            try:
-                                bar.update(dl)
-                            except:
-                                pass
+            queue.put(url)

+    queue.join()
     print('done')
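
Note on the change above: the rewritten download_manager.py replaces the sequential download loop with daemon worker processes fed by a multiprocessing.JoinableQueue. Below is a minimal, self-contained sketch of that producer/consumer pattern; the file names, URLs, helper name download_one, and the worker count of 8 are illustrative, not the values used in the commit.

import logging
import multiprocessing
import os

import requests


def worker(queue):
    # Each daemon process blocks on the queue and downloads URLs as they arrive.
    while True:
        url = queue.get()
        try:
            download_one(url)
        except Exception:
            logging.exception('download failed: ' + url)
        finally:
            queue.task_done()  # always acknowledge, so queue.join() can return


def download_one(url):
    # Hypothetical helper: stream a URL to downloads/<basename>, skipping existing files.
    f_path = os.path.join('downloads', url.rsplit('/', 1)[-1])
    if os.path.isfile(f_path):
        return
    with requests.get(url, allow_redirects=True, stream=True) as response:
        with open(f_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=4096):
                f.write(chunk)


if __name__ == '__main__':
    os.makedirs('downloads', exist_ok=True)
    queue = multiprocessing.JoinableQueue()
    for _ in range(8):  # worker count is arbitrary in this sketch
        multiprocessing.Process(target=worker, args=(queue,), daemon=True).start()
    for url in ['https://example.com/a.json.bz2', 'https://example.com/b.json.bz2']:
        queue.put(url)
    queue.join()  # blocks until every queued URL has been acknowledged via task_done()
    print('done')

queue.join() only returns once each queued item has been matched by a task_done() call, which is why the sketch puts the acknowledgement in a finally block; daemon processes are then cleaned up when the main process exits.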
24 changes: 15 additions & 9 deletions geo_filter.py
@@ -2,22 +2,23 @@
 import json
 import os
 import bz2
-import threading
+import multiprocessing
 import coloredlogs, logging
-import queue
 import pickle
+import googletrans

-def worker():
+
+def worker(q, geo_filtered_dict):
+    translator = googletrans.Translator()
     while True:
         path, dirs, files = q.get()
-        process_files(path, dirs, files)
+        process_files(path, dirs, files, geo_filtered_dict, translator)
         q.task_done()


-def process_files(path, dirs, files):
+def process_files(path, dirs, files, geo_filtered_dict, translator):
     for file_name in files:
-        if os.path.join(path, file_name) in geo_filtered:
+        if os.path.join(path, file_name) in geo_filtered_dict:
             continue
         file_name_no_extension, file_extension = os.path.splitext(file_name)
         out_file_name = os.path.join('processed', os.path.relpath(path, start='untarred'), file_name_no_extension)
@@ -49,7 +50,7 @@ def process_files(path, dirs, files):
                 except Exception:
                     logging.exception('failed to process tweet')
         logging.info('finished ' + os.path.join(path, file_name))
-        geo_filtered.add(os.path.join(path, file_name))
+        geo_filtered_dict[os.path.join(path, file_name)] = True


 if __name__ == '__main__':
@@ -60,13 +61,17 @@ def process_files(path, dirs, files):
         geo_filtered = set()
     coloredlogs.install()
     logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
-    q = queue.Queue()
+    q = multiprocessing.JoinableQueue()
+    manager = multiprocessing.Manager()
+    geo_filtered_dict = manager.dict()
+    for key in geo_filtered:
+        geo_filtered_dict[key] = True
     try:
         os.makedirs('processed')
     except FileExistsError:
         pass
     for i in range(10):
-        t = threading.Thread(target=worker, daemon=True).start()
+        multiprocessing.Process(target=worker, args=(q, geo_filtered_dict)).start()
     total_num = 0
     filtered_num = 0
     os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'twitter-sentiment-analysis-f22ce784b0a8.json'
@@ -79,6 +84,7 @@ def process_files(path, dirs, files):
             q.put((path, dirs, files))

     q.join()
+
     with open('geo_filtered.pkl', 'wb') as f:
         pickle.dump(geo_filtered, f)
     print('done')
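
Note on the change above: moving geo_filter.py from threads to processes means a module-level set like geo_filtered is no longer visible to the workers, so the commit passes a multiprocessing.Manager().dict() into each worker instead; reads and writes on that proxy go through a manager process that all workers share. Below is a minimal sketch of that sharing pattern under hypothetical names (the real worker does the bz2 reading and translation work).

import multiprocessing


def worker(q, seen):
    # 'seen' is a manager-backed dict proxy, visible to every process.
    while True:
        item = q.get()
        if item not in seen:
            # ... real processing of the item would happen here ...
            seen[item] = True  # the write goes through the manager process
        q.task_done()


if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()
    seen = manager.dict()  # unlike a plain dict or set, this crosses the process boundary
    for _ in range(4):
        multiprocessing.Process(target=worker, args=(q, seen), daemon=True).start()
    for item in ['a.json.bz2', 'b.json.bz2', 'a.json.bz2']:
        q.put(item)
    q.join()
    print(sorted(seen.keys()))  # the parent sees updates made inside the workers

Each access to a manager dict is a round trip to the manager process, so it is slower than local state, but it is the simplest way for the parent to collect the workers' results after q.join().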
2 changes: 0 additions & 2 deletions requirements.txt
@@ -1,9 +1,7 @@
ipython
requests
googletrans
googlemaps
boto3
progressbar2
google-cloud-translate==2.0.1
coloredlogs
pycountry
4 changes: 3 additions & 1 deletion untarpper.py
@@ -22,6 +22,7 @@ def untar_file():
             logging.info(f'message: {file_path} -- {out}')
         elif err:
             logging.error(f'failed: {file_path} -- {err}')
+            failed_files.write(file_path + '\n')
         else:
             logging.info(f'untarred: {file_path}')
             untarred.add(file_path)
@@ -32,6 +33,7 @@ def untar_file():
     coloredlogs.install()
     logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
     q = queue.Queue()
+    failed_files = open('untarred_failed.txt', 'w')
     for i in range(10):
         threading.Thread(target=untar_file, daemon=True).start()
     try:
@@ -51,6 +53,6 @@ def untar_file():
         pickle.dump(untarred, f)

     q.join()
-    print('done')
+    failed_files.close()

