
Commit

stijn-uva committed Aug 21, 2023
2 parents 88fd2d3 + 225b57a commit 844006e
Showing 37 changed files with 609 additions and 388 deletions.
2 changes: 2 additions & 0 deletions backend/lib/manager.py
@@ -101,6 +101,8 @@ def delegate(self):
except JobClaimedException:
# it's fine
pass
else:
self.log.error("Unknown job type: %s" % jobtype)

time.sleep(1)

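The hunk above only shows the tail of the branch it modifies; a minimal, self-contained sketch of the control flow it implies follows. The job-type registry, job structure and worker names are assumptions for illustration, not taken from this diff:

```python
import logging
import time

class JobClaimedException(Exception):
    """Stand-in for 4CAT's exception of the same name (assumed semantics)."""

log = logging.getLogger("delegate-sketch")
known_worker_types = {"fourchan-thread", "fourchan-board"}  # hypothetical job types

def delegate_once(jobs):
    """Sketch of the control flow the hunk implies; not the actual manager code."""
    for job in jobs:
        jobtype = job["jobtype"]
        if jobtype in known_worker_types:
            try:
                pass  # claim the job and start the matching worker here
            except JobClaimedException:
                # it's fine
                pass
        else:
            # new in this commit: unknown job types are logged instead of silently skipped
            log.error("Unknown job type: %s" % jobtype)
    time.sleep(1)

delegate_once([{"jobtype": "fourchan-thread"}, {"jobtype": "mystery-type"}])
```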
10 changes: 9 additions & 1 deletion backend/lib/scraper.py
@@ -23,13 +23,21 @@ class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta):

log_level = "warning"
_logger_method = None
category = "Collector"

def __init__(self, job, logger=None, manager=None, modules=None):
"""
Set up database connection - we need one to store the thread data
"""
super().__init__(logger=logger, manager=manager, job=job, modules=modules)
self.prefix = self.type.split("-")[0]
# Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database
# TODO: update database.sql names and create migrate script, then remove this
self.prefix = {
"fourchan": "4chan",
"eightkun": "8kun",
"eightchan": "8chan",
}[self.prefix]

if not hasattr(logger, self.log_level):
self.log_level = "warning"
@@ -67,7 +75,7 @@ def work(self):
try:
# see if any proxies were configured that would work for this URL
protocol = url.split(":")[0]
if protocol in config.get('SCRAPE_PROXIES') and config.get('SCRAPE_PROXIES')[protocol]:
if protocol in config.get('SCRAPE_PROXIES', []) and config.get('SCRAPE_PROXIES')[protocol]:
proxies = {protocol: random.choice(config.get('SCRAPE_PROXIES')[protocol])}
else:
proxies = None
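The proxy lookup change above adds a default to `config.get('SCRAPE_PROXIES', [])`, presumably so that an unset setting (which would otherwise return `None`) no longer breaks the `protocol in …` membership test. A small standalone sketch of the resulting behaviour, with hypothetical proxy values:

```python
import random

# Stand-in for config.get("SCRAPE_PROXIES", []); the proxy URLs are hypothetical.
# Without the default, an unset setting could return None and `protocol in None`
# would raise a TypeError before any request is made.
scrape_proxies = {"http": ["http://proxy-a:3128", "http://proxy-b:3128"]}

url = "http://example.com/thread.json"
protocol = url.split(":")[0]

if protocol in scrape_proxies and scrape_proxies[protocol]:
    proxies = {protocol: random.choice(scrape_proxies[protocol])}
else:
    proxies = None

print(proxies)  # e.g. {'http': 'http://proxy-b:3128'}
```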
11 changes: 6 additions & 5 deletions backend/workers/datasource_metrics.py
@@ -58,6 +58,9 @@ def work(self):
if not datasource:
continue

# Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan")
database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id

is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False
is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False

@@ -85,7 +88,7 @@ def work(self):
# -------------------------

# Get the name of the posts table for this datasource
posts_table = datasource_id if "posts_" + datasource_id not in all_tables else "posts_" + datasource_id
posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

# Count and update for every board individually
for board in boards:
@@ -104,8 +107,7 @@ def work(self):
# If the datasource is dynamic, we also only update days
# that haven't been added yet - these are heavy queries.
if not is_static:

days_added = self.db.fetchall("SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (datasource_id, board))
days_added = self.db.fetchall("SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (database_db_id, board))

if days_added:

@@ -130,8 +132,7 @@ def work(self):
FROM %s
WHERE %s AND %s
GROUP BY metric, datasource, board, date;
""" % (datasource_id, posts_table, board_sql, time_sql)

""" % (database_db_id, posts_table, board_sql, time_sql)
# Add to metrics table
rows = [dict(row) for row in self.db.fetchall(query)]

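The `database_db_id` changes above separate the datasource ID from the (older) database prefix. A worked example of the table lookup, using hypothetical names for the 4chan datasource:

```python
# Hypothetical values for illustration only
datasource_id = "fourchan"   # datasource ID after the rename
database_db_id = "4chan"     # database prefix kept for backwards compatibility
all_tables = ["posts_4chan", "threads_4chan", "metrics"]  # assumed table names

# Same expression as in the hunk above
posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id
print(posts_table)  # -> "posts_4chan"; falls back to "fourchan" if no posts_* table exists
```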
7 changes: 7 additions & 0 deletions common/lib/config_definition.py
@@ -213,6 +213,13 @@
"tooltip": "When enabled, users can request a 4CAT account via the login page if they do not have one, "
"provided e-mail settings are configured."
},
"4cat.sphinx_host": {
"type": UserInput.OPTION_TEXT,
"default": "localhost",
"help": "Sphinx host",
"tooltip": "Sphinx is used for full-text search for collected datasources (e.g., 4chan, 8kun, 8chan) and requires additional setup (see 4CAT wiki on GitHub).",
"global": True
},
"logging.slack.level": {
"type": UserInput.OPTION_CHOICE,
"default": "WARNING",
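The new `4cat.sphinx_host` setting would be read through the usual config accessor, along the lines of this sketch (the import path and the SphinxQL port are assumptions, not part of this diff):

```python
# Import path assumed from 4CAT's layout; not shown in this commit
from common.config_manager import config

# "localhost" mirrors the default declared in config_definition.py
sphinx_host = config.get("4cat.sphinx_host", "localhost")
print("Full-text search via Sphinx at %s:9306" % sphinx_host)  # 9306 is the usual SphinxQL port, assumed here
```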
12 changes: 12 additions & 0 deletions common/lib/dataset.py
@@ -174,6 +174,16 @@ def get_results_path(self):
"""
return self.folder.joinpath(self.data["result_file"])

def get_results_folder_path(self):
"""
Get path to folder containing accompanying results
Returns a path that may not yet be created
:return Path: A path to the results folder
"""
return self.folder.joinpath("folder_" + self.key)

def get_log_path(self):
"""
Get path to dataset log file
@@ -537,6 +547,8 @@ def delete(self, commit=True):
self.get_results_path().unlink()
if self.get_results_path().with_suffix(".log").exists():
self.get_results_path().with_suffix(".log").unlink()
if self.get_results_folder_path().exists():
shutil.rmtree(self.get_results_folder_path())
except FileNotFoundError:
# already deleted, apparently
pass
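A standalone sketch of what the new `get_results_folder_path()` enables: a per-dataset folder for accompanying files, which `delete()` now removes with `shutil.rmtree()`. Note that `shutil` must be importable in `dataset.py` for that call; the import is not visible in this hunk. The paths and key below are hypothetical:

```python
import shutil
from pathlib import Path

def results_folder(base: Path, key: str) -> Path:
    # Mirrors get_results_folder_path(): "folder_" + dataset key inside the data folder
    return base.joinpath("folder_" + key)

folder = results_folder(Path("/tmp/4cat-data"), "abcdef123456")  # hypothetical data path and dataset key
folder.mkdir(parents=True, exist_ok=True)  # the method returns a path that may not exist yet

(folder / "frame_001.png").write_bytes(b"")  # illustrative accompanying file

# delete() now does the equivalent of this for the dataset's own folder
shutil.rmtree(folder)
```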
87 changes: 69 additions & 18 deletions common/lib/dmi_service_manager.py
@@ -22,6 +22,12 @@ class DmiServiceManagerException(Exception):
"""
pass

class DsmOutOfMemory(DmiServiceManagerException):
"""
Raised when the DMI Service Manager server runs out of (GPU) memory.
"""
pass


class DmiServiceManager:
"""
@@ -42,6 +48,21 @@ def __init__(self, processor):
self.path_to_files = None
self.path_to_results = None

def check_gpu_memory_available(self, service_endpoint):
"""
Returns a tuple: True if the server has GPU memory available (False otherwise), along with the JSON response from the server containing the memory information.
"""
api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint
resp = requests.get(api_endpoint, timeout=30)
if resp.status_code == 200:
return True, resp.json()
elif resp.status_code in [400, 404, 500, 503]:
return False, resp.json()
else:
self.processor.log.warning("Unknown response from DMI Service Manager: %s" % resp.text)
return False, None

def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name):
"""
Process files according to DMI Service Manager local or remote settings
@@ -71,7 +92,7 @@ def process_files(self, input_file_dir, filenames, output_file_dir, server_file_

def check_progress(self):
if self.local_or_remote == "local":
current_completed = self.count_local_files(self.path_to_results)
current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results))
elif self.local_or_remote == "remote":
existing_files = self.request_folder_files(self.server_file_collection_name)
current_completed = len(existing_files.get(self.server_results_folder_name, []))
@@ -80,13 +101,16 @@

if current_completed != self.processed_files:
self.processor.dataset.update_status(
f"Collected text from {current_completed} of {self.num_files_to_process} files")
f"Processed {current_completed} of {self.num_files_to_process} files")
self.processor.dataset.update_progress(current_completed / self.num_files_to_process)
self.processed_files = current_completed

def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60):
def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True):
"""
Send request and wait for results to be ready.
check_process assumes a one-to-one ratio of input files to output files; if this is not the case, set it to False.
It counts the number of files in the output folder and compares it to the number of input files.
"""
if self.local_or_remote == "local":
service_endpoint += "_local"
@@ -103,7 +127,11 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=
else:
try:
resp_json = resp.json()
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")
if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists":
# Request already exists
results_url = api_endpoint + "?key=" + resp_json['key']
else:
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")
except JSONDecodeError:
# Unexpected Error
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}")
@@ -125,7 +153,8 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=
if (time.time() - check_time) > wait_period:
check_time = time.time()
# Update progress
self.check_progress()
if check_process:
self.check_progress()

result = requests.get(results_url, timeout=30)
if 'status' in result.json().keys() and result.json()['status'] == 'running':
@@ -136,6 +165,17 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=
self.processor.dataset.update_status(f"Completed {service_endpoint}!")
success = True
break

elif 'returncode' in result.json().keys() and int(result.json()['returncode']) == 1:
# Error
if 'error' in result.json().keys():
error = result.json()['error']
if "CUDA error: out of memory" in error:
raise DmiServiceManagerException("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.")
else:
raise DmiServiceManagerException(f"Error {service_endpoint}: " + error)
else:
raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json()))
else:
# Something botched
raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json()))
@@ -147,22 +187,32 @@ def process_results(self, local_output_dir):
# Output files are already in local directory
pass
elif self.local_or_remote == "remote":
# Update list of result files
existing_files = self.request_folder_files(self.server_file_collection_name)
result_files = existing_files.get(self.server_results_folder_name, [])

self.download_results(result_files, self.server_file_collection_name, self.server_results_folder_name, local_output_dir)
results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name)
self.processor.dataset.log(f"Downloading results from {results_path}...")
# Collect result filenames from server
result_files = self.request_folder_files(results_path)
for path, files in result_files.items():
if path == '.':
self.download_results(files, results_path, local_output_dir)
else:
Path(os.path.join(local_output_dir, path)).mkdir(exist_ok=True, parents=True)
self.download_results(files, os.path.join(results_path, path), local_output_dir.joinpath(path))

def request_folder_files(self, folder_name):
"""
Request files from a folder on the DMI Service Manager server.
"""
filename_url = f"{self.server_address}list_filenames?folder_name={folder_name}"
filename_url = f"{self.server_address}list_filenames/{folder_name}"
filename_response = requests.get(filename_url, timeout=30)

# Check if 4CAT has access to this PixPlot server
if filename_response.status_code == 403:
raise DmiServiceManagerException("403: 4CAT does not have permission to use the DMI Service Manager server")
elif filename_response.status_code in [400, 405]:
raise DmiServiceManagerException(f"400: DMI Service Manager server {filename_response.json()['reason']}")
elif filename_response.status_code == 404:
# Folder not found; no files
return {}

return filename_response.json()

@@ -187,7 +237,7 @@ def send_files(self, file_collection_name, results_name, files_to_upload, dir_wi
# Check if files have already been sent
self.processor.dataset.update_status("Connecting to DMI Service Manager...")
existing_files = self.request_folder_files(file_collection_name)
uploaded_files = existing_files.get('files', [])
uploaded_files = existing_files.get('4cat_uploads', [])
if len(uploaded_files) > 0:
self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files)))

@@ -205,7 +255,7 @@ def send_files(self, file_collection_name, results_name, files_to_upload, dir_wi

self.processor.dataset.update_status(f"Uploading {len(to_upload_filenames)} files")
response = requests.post(api_upload_endpoint,
files=[('files', open(dir_with_files.joinpath(file), 'rb')) for file in
files=[('4cat_uploads', open(dir_with_files.joinpath(file), 'rb')) for file in
to_upload_filenames] + [
(results_name, open(dir_with_files.joinpath(empty_placeholder), 'rb'))],
data=data, timeout=120)
@@ -219,12 +269,12 @@ def send_files(self, file_collection_name, results_name, files_to_upload, dir_wi
else:
self.processor.dataset.update_status(f"Unable to upload {len(to_upload_filenames)} files!")

server_path_to_files = Path(file_collection_name).joinpath("files")
server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads")
server_path_to_results = Path(file_collection_name).joinpath(results_name)

return server_path_to_files, server_path_to_results

def download_results(self, filenames_to_download, file_collection_name, folder_name, local_output_dir):
def download_results(self, filenames_to_download, folder_name, local_output_dir):
"""
Download results from the DMI Service Manager server.
@@ -235,10 +285,11 @@ def download_results(self, filenames_to_download, file_collection_name, folder_n
:param Dataset dataset: Dataset object for status updates
"""
# Download the result files
api_upload_endpoint = f"{self.server_address}uploads/"
api_upload_endpoint = f"{self.server_address}download/"
self.processor.dataset.update_status(f"Downloading {len(filenames_to_download)} files from {folder_name}...")
for filename in filenames_to_download:
file_response = requests.get(api_upload_endpoint + f"{file_collection_name}/{folder_name}/{filename}", timeout=30)
self.processor.dataset.update_status(f"Downloading {filename}...")
file_response = requests.get(api_upload_endpoint + f"{folder_name}/{filename}", timeout=30)

with open(local_output_dir.joinpath(filename), 'wb') as file:
file.write(file_response.content)

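Taken together, the new `DsmOutOfMemory` exception, `check_gpu_memory_available()` and the `check_process` flag suggest a calling pattern like the sketch below. The endpoint name, payload and the decision to raise `DsmOutOfMemory` (which this diff defines but never raises) are assumptions for illustration:

```python
from common.lib.dmi_service_manager import DmiServiceManager, DsmOutOfMemory  # path mirrors this file's location

def run_gpu_service(dsm: DmiServiceManager):
    """Hedged sketch of a processor driving a GPU-backed service via the manager."""
    gpu_ok, gpu_info = dsm.check_gpu_memory_available("whisper")  # "whisper" is a hypothetical endpoint
    if not gpu_ok:
        raise DsmOutOfMemory("DMI Service Manager reports no free GPU memory: %s" % gpu_info)

    # check_process=False skips the one-output-per-input progress count, e.g. when
    # the service writes a single combined results file instead of one file per input
    dsm.send_request_and_wait_for_results(
        "whisper",
        data={"args": ["--output_dir", str(dsm.path_to_results)]},  # illustrative arguments
        wait_period=30,
        check_process=False,
    )
```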
8 changes: 6 additions & 2 deletions common/lib/module_loader.py
@@ -73,10 +73,14 @@ def is_4cat_class(object, only_processors=False):
# but that requires importing the classes themselves, which leads to
# circular imports
# todo: fix this because this sucks
# agreed - Dale
parent_classes = {"BasicWorker", "BasicProcessor", "Search", "SearchWithScope", "Search4Chan",
"ProcessorPreset", "TwitterStatsBase", "BaseFilter", "TwitterAggregatedStats", "ColumnFilter"}
"ProcessorPreset", "TwitterStatsBase", "BaseFilter", "TwitterAggregatedStats", "ColumnFilter",
"BasicJSONScraper", "BoardScraper4chan", "ThreadScraper4chan"}
if only_processors:
parent_classes.remove("BasicWorker")
# only allow processors
for worker_only_class in ["BasicWorker", "BasicJSONScraper", "BoardScraper4chan", "ThreadScraper4chan"]:
parent_classes.remove(worker_only_class)

return inspect.isclass(object) and \
parent_classes & set([f.__name__ for f in object.__bases__]) and \
2 changes: 1 addition & 1 deletion datasources/eightchan/README.md
@@ -1,4 +1,4 @@
# 8chan data source for 4CAT

The 8chan data source works much the same as the 4chan data source. Please
refer to `/datasources/fourchan/README.md` for more information.
refer to the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources) and the `/datasources/fourchan/README.md` for more information.
9 changes: 8 additions & 1 deletion datasources/eightchan/search_8chan.py
@@ -79,11 +79,18 @@ class Search8Chan(Search4Chan):
}

config = {
"eightchan-search.autoscrape": {
"type": UserInput.OPTION_TOGGLE,
"default": False,
"help": "Enable collecting",
"tooltip": "Toggle to automatically collect new boards and threads",
"global": True
},
"eightchan-search.boards": {
"type": UserInput.OPTION_TEXT_JSON,
"help": "Boards to index",
"tooltip": "These boards will be scraped and made available for searching. Provide as a JSON-formatted "
"list of strings, e.g. ['pol', 'v'].",
"list of strings, e.g. [\"pol\", \"v\"].",
"default": [""],
"global": True
},
4 changes: 2 additions & 2 deletions datasources/eightkun/README.md
@@ -1,9 +1,9 @@
# 8kun data source for 4CAT

The 8kun data source works much the same as the 4chan data source. Please
refer to `/datasources/fourchan/README.md` for more information.
refer to the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources) and `/datasources/fourchan/README.md` for more information.

It is virtually identical to the 8chan data source also provided. However,
This data source is virtually identical to the 8chan data source. However,
since 8kun is distinct from 8chan and has a new owner, it serves as a
separate data source to allow for changes to the platform without impacting
existing 8chan archives.
9 changes: 8 additions & 1 deletion datasources/eightkun/search_8kun.py
@@ -82,11 +82,18 @@ class Search8Kun(Search4Chan):
}

config = {
"eightkun-search.autoscrape": {
"type": UserInput.OPTION_TOGGLE,
"default": False,
"help": "Enable collecting",
"tooltip": "Toggle to automatically collect new boards and threads",
"global": True
},
"eightkun-search.boards": {
"type": UserInput.OPTION_TEXT_JSON,
"help": "Boards to index",
"tooltip": "These boards will be scraped and made available for searching. Provide as a JSON-formatted "
"list of strings, e.g. ['pol', 'v'].",
"list of strings, e.g. [\"pol\", \"v\"].",
"default": [""],
"global": True
},