diff --git a/backend/lib/manager.py b/backend/lib/manager.py index b4857e7f4..012b9ac45 100644 --- a/backend/lib/manager.py +++ b/backend/lib/manager.py @@ -101,6 +101,8 @@ def delegate(self): except JobClaimedException: # it's fine pass + else: + self.log.error("Unknown job type: %s" % jobtype) time.sleep(1) diff --git a/backend/lib/scraper.py b/backend/lib/scraper.py index 364190f84..930290e72 100644 --- a/backend/lib/scraper.py +++ b/backend/lib/scraper.py @@ -23,6 +23,7 @@ class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta): log_level = "warning" _logger_method = None + category = "Collector" def __init__(self, job, logger=None, manager=None, modules=None): """ @@ -30,6 +31,13 @@ def __init__(self, job, logger=None, manager=None, modules=None): """ super().__init__(logger=logger, manager=manager, job=job, modules=modules) self.prefix = self.type.split("-")[0] + # Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database + # TODO: update database.sql names and create migrate script, then remove this + self.prefix = { + "fourchan": "4chan", + "eightkun": "8kun", + "eightchan": "8chan", + }[self.prefix] if not hasattr(logger, self.log_level): self.log_level = "warning" @@ -67,7 +75,7 @@ def work(self): try: # see if any proxies were configured that would work for this URL protocol = url.split(":")[0] - if protocol in config.get('SCRAPE_PROXIES') and config.get('SCRAPE_PROXIES')[protocol]: + if protocol in config.get('SCRAPE_PROXIES', []) and config.get('SCRAPE_PROXIES')[protocol]: proxies = {protocol: random.choice(config.get('SCRAPE_PROXIES')[protocol])} else: proxies = None diff --git a/backend/workers/datasource_metrics.py b/backend/workers/datasource_metrics.py index 7fd318ca8..3f20c8676 100644 --- a/backend/workers/datasource_metrics.py +++ b/backend/workers/datasource_metrics.py @@ -58,6 +58,9 @@ def work(self): if not datasource: continue + # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan") + database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id + is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False @@ -85,7 +88,7 @@ def work(self): # ------------------------- # Get the name of the posts table for this datasource - posts_table = datasource_id if "posts_" + datasource_id not in all_tables else "posts_" + datasource_id + posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id # Count and update for every board individually for board in boards: @@ -104,8 +107,7 @@ def work(self): # If the datasource is dynamic, we also only update days # that haven't been added yet - these are heavy queries. if not is_static: - - days_added = self.db.fetchall("SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (datasource_id, board)) + days_added = self.db.fetchall("SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (database_db_id, board)) if days_added: @@ -130,8 +132,7 @@ def work(self): FROM %s WHERE %s AND %s GROUP BY metric, datasource, board, date; - """ % (datasource_id, posts_table, board_sql, time_sql) - + """ % (database_db_id, posts_table, board_sql, time_sql) # Add to metrics table rows = [dict(row) for row in self.db.fetchall(query)] diff --git a/common/lib/config_definition.py b/common/lib/config_definition.py index 7418665ec..a2930d077 100644 --- a/common/lib/config_definition.py +++ b/common/lib/config_definition.py @@ -213,6 +213,13 @@ "tooltip": "When enabled, users can request a 4CAT account via the login page if they do not have one, " "provided e-mail settings are configured." }, + "4cat.sphinx_host": { + "type": UserInput.OPTION_TEXT, + "default": "localhost", + "help": "Sphinx host", + "tooltip": "Sphinx is used for full-text search for collected datasources (e.g., 4chan, 8kun, 8chan) and requires additional setup (see 4CAT wiki on GitHub).", + "global": True + }, "logging.slack.level": { "type": UserInput.OPTION_CHOICE, "default": "WARNING", diff --git a/common/lib/dataset.py b/common/lib/dataset.py index 7a1d7ac07..c18289935 100644 --- a/common/lib/dataset.py +++ b/common/lib/dataset.py @@ -174,6 +174,16 @@ def get_results_path(self): """ return self.folder.joinpath(self.data["result_file"]) + def get_results_folder_path(self): + """ + Get path to folder containing accompanying results + + Returns a path that may not yet be created + + :return Path: A path to the results file + """ + return self.folder.joinpath("folder_" + self.key) + def get_log_path(self): """ Get path to dataset log file @@ -537,6 +547,8 @@ def delete(self, commit=True): self.get_results_path().unlink() if self.get_results_path().with_suffix(".log").exists(): self.get_results_path().with_suffix(".log").unlink() + if self.get_results_folder_path().exists(): + shutil.rmtree(self.get_results_folder_path()) except FileNotFoundError: # already deleted, apparently pass diff --git a/common/lib/dmi_service_manager.py b/common/lib/dmi_service_manager.py index 5b52ce1f8..11ee84ce3 100644 --- a/common/lib/dmi_service_manager.py +++ b/common/lib/dmi_service_manager.py @@ -22,6 +22,12 @@ class DmiServiceManagerException(Exception): """ pass +class DsmOutOfMemory(DmiServiceManagerException): + """ + Raised when there is a problem with the configuration settings. + """ + pass + class DmiServiceManager: """ @@ -42,6 +48,21 @@ def __init__(self, processor): self.path_to_files = None self.path_to_results = None + def check_gpu_memory_available(self, service_endpoint): + """ + Returns tuple with True if server has some memory available and False otherwise as well as the JSON response + from server containing the memory information. + """ + api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint + resp = requests.get(api_endpoint, timeout=30) + if resp.status_code == 200: + return True, resp.json() + elif resp.status_code in [400, 404, 500, 503]: + return False, resp.json() + else: + self.processor.log.warning("Unknown response from DMI Service Manager: %s" % resp.text) + return False, None + def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name): """ Process files according to DMI Service Manager local or remote settings @@ -71,7 +92,7 @@ def process_files(self, input_file_dir, filenames, output_file_dir, server_file_ def check_progress(self): if self.local_or_remote == "local": - current_completed = self.count_local_files(self.path_to_results) + current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results)) elif self.local_or_remote == "remote": existing_files = self.request_folder_files(self.server_file_collection_name) current_completed = len(existing_files.get(self.server_results_folder_name, [])) @@ -80,13 +101,16 @@ def check_progress(self): if current_completed != self.processed_files: self.processor.dataset.update_status( - f"Collected text from {current_completed} of {self.num_files_to_process} files") + f"Processed {current_completed} of {self.num_files_to_process} files") self.processor.dataset.update_progress(current_completed / self.num_files_to_process) self.processed_files = current_completed - def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60): + def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True): """ Send request and wait for results to be ready. + + Check process assumes a one to one ratio of input files to output files. If this is not the case, set to False. + If counts the number of files in the output folder and compares it to the number of input files. """ if self.local_or_remote == "local": service_endpoint += "_local" @@ -103,7 +127,11 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period= else: try: resp_json = resp.json() - raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}") + if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists": + # Request already exists + results_url = api_endpoint + "?key=" + resp_json['key'] + else: + raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}") except JSONDecodeError: # Unexpected Error raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}") @@ -125,7 +153,8 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period= if (time.time() - check_time) > wait_period: check_time = time.time() # Update progress - self.check_progress() + if check_process: + self.check_progress() result = requests.get(results_url, timeout=30) if 'status' in result.json().keys() and result.json()['status'] == 'running': @@ -136,6 +165,17 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period= self.processor.dataset.update_status(f"Completed {service_endpoint}!") success = True break + + elif 'returncode' in result.json().keys() and int(result.json()['returncode']) == 1: + # Error + if 'error' in result.json().keys(): + error = result.json()['error'] + if "CUDA error: out of memory" in error: + raise DmiServiceManagerException("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.") + else: + raise DmiServiceManagerException(f"Error {service_endpoint}: " + error) + else: + raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json())) else: # Something botched raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json())) @@ -147,22 +187,32 @@ def process_results(self, local_output_dir): # Output files are already in local directory pass elif self.local_or_remote == "remote": - # Update list of result files - existing_files = self.request_folder_files(self.server_file_collection_name) - result_files = existing_files.get(self.server_results_folder_name, []) - - self.download_results(result_files, self.server_file_collection_name, self.server_results_folder_name, local_output_dir) + results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name) + self.processor.dataset.log(f"Downloading results from {results_path}...") + # Collect result filenames from server + result_files = self.request_folder_files(results_path) + for path, files in result_files.items(): + if path == '.': + self.download_results(files, results_path, local_output_dir) + else: + Path(os.path.join(local_output_dir, path)).mkdir(exist_ok=True, parents=True) + self.download_results(files, os.path.join(results_path, path), local_output_dir.joinpath(path)) def request_folder_files(self, folder_name): """ Request files from a folder on the DMI Service Manager server. """ - filename_url = f"{self.server_address}list_filenames?folder_name={folder_name}" + filename_url = f"{self.server_address}list_filenames/{folder_name}" filename_response = requests.get(filename_url, timeout=30) # Check if 4CAT has access to this PixPlot server if filename_response.status_code == 403: raise DmiServiceManagerException("403: 4CAT does not have permission to use the DMI Service Manager server") + elif filename_response.status_code in [400, 405]: + raise DmiServiceManagerException(f"400: DMI Service Manager server {filename_response.json()['reason']}") + elif filename_response.status_code == 404: + # Folder not found; no files + return {} return filename_response.json() @@ -187,7 +237,7 @@ def send_files(self, file_collection_name, results_name, files_to_upload, dir_wi # Check if files have already been sent self.processor.dataset.update_status("Connecting to DMI Service Manager...") existing_files = self.request_folder_files(file_collection_name) - uploaded_files = existing_files.get('files', []) + uploaded_files = existing_files.get('4cat_uploads', []) if len(uploaded_files) > 0: self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files))) @@ -205,7 +255,7 @@ def send_files(self, file_collection_name, results_name, files_to_upload, dir_wi self.processor.dataset.update_status(f"Uploading {len(to_upload_filenames)} files") response = requests.post(api_upload_endpoint, - files=[('files', open(dir_with_files.joinpath(file), 'rb')) for file in + files=[('4cat_uploads', open(dir_with_files.joinpath(file), 'rb')) for file in to_upload_filenames] + [ (results_name, open(dir_with_files.joinpath(empty_placeholder), 'rb'))], data=data, timeout=120) @@ -219,12 +269,12 @@ def send_files(self, file_collection_name, results_name, files_to_upload, dir_wi else: self.processor.dataset.update_status(f"Unable to upload {len(to_upload_filenames)} files!") - server_path_to_files = Path(file_collection_name).joinpath("files") + server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads") server_path_to_results = Path(file_collection_name).joinpath(results_name) return server_path_to_files, server_path_to_results - def download_results(self, filenames_to_download, file_collection_name, folder_name, local_output_dir): + def download_results(self, filenames_to_download, folder_name, local_output_dir): """ Download results from the DMI Service Manager server. @@ -235,10 +285,11 @@ def download_results(self, filenames_to_download, file_collection_name, folder_n :param Dataset dataset: Dataset object for status updates """ # Download the result files - api_upload_endpoint = f"{self.server_address}uploads/" + api_upload_endpoint = f"{self.server_address}download/" + self.processor.dataset.update_status(f"Downloading {len(filenames_to_download)} from {folder_name}...") for filename in filenames_to_download: - file_response = requests.get(api_upload_endpoint + f"{file_collection_name}/{folder_name}/{filename}", timeout=30) - self.processor.dataset.update_status(f"Downloading {filename}...") + file_response = requests.get(api_upload_endpoint + f"{folder_name}/{filename}", timeout=30) + with open(local_output_dir.joinpath(filename), 'wb') as file: file.write(file_response.content) diff --git a/common/lib/module_loader.py b/common/lib/module_loader.py index e0debdaab..24edb233a 100644 --- a/common/lib/module_loader.py +++ b/common/lib/module_loader.py @@ -73,10 +73,14 @@ def is_4cat_class(object, only_processors=False): # but that requires importing the classes themselves, which leads to # circular imports # todo: fix this because this sucks + # agreed - Dale parent_classes = {"BasicWorker", "BasicProcessor", "Search", "SearchWithScope", "Search4Chan", - "ProcessorPreset", "TwitterStatsBase", "BaseFilter", "TwitterAggregatedStats", "ColumnFilter"} + "ProcessorPreset", "TwitterStatsBase", "BaseFilter", "TwitterAggregatedStats", "ColumnFilter", + "BasicJSONScraper", "BoardScraper4chan", "ThreadScraper4chan"} if only_processors: - parent_classes.remove("BasicWorker") + # only allow processors + for worker_only_class in ["BasicWorker", "BasicJSONScraper", "BoardScraper4chan", "ThreadScraper4chan"]: + parent_classes.remove(worker_only_class) return inspect.isclass(object) and \ parent_classes & set([f.__name__ for f in object.__bases__]) and \ diff --git a/datasources/eightchan/README.md b/datasources/eightchan/README.md index 71a85e1b3..e2521a8c4 100644 --- a/datasources/eightchan/README.md +++ b/datasources/eightchan/README.md @@ -1,4 +1,4 @@ # 8chan data source for 4CAT The 8chan data source works much the same as the 4chan data source. Please -refer to `/datasources/fourchan/README.md` for more information. \ No newline at end of file +refer to the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources) and the `/datasources/fourchan/README.md` for more information. \ No newline at end of file diff --git a/datasources/eightchan/search_8chan.py b/datasources/eightchan/search_8chan.py index f6349ad60..b3d6702b8 100644 --- a/datasources/eightchan/search_8chan.py +++ b/datasources/eightchan/search_8chan.py @@ -79,11 +79,18 @@ class Search8Chan(Search4Chan): } config = { + "eightchan-search.autoscrape": { + "type": UserInput.OPTION_TOGGLE, + "default": False, + "help": "Enable collecting", + "tooltip": "Toggle to automatically collect new boards and threads", + "global": True + }, "eightchan-search.boards": { "type": UserInput.OPTION_TEXT_JSON, "help": "Boards to index", "tooltip": "These boards will be scraped and made available for searching. Provide as a JSON-formatted " - "list of strings, e.g. ['pol', 'v'].", + "list of strings, e.g. [\"pol\", \"v\"].", "default": [""], "global": True }, diff --git a/datasources/eightkun/README.md b/datasources/eightkun/README.md index a627f01ad..68306e7de 100644 --- a/datasources/eightkun/README.md +++ b/datasources/eightkun/README.md @@ -1,9 +1,9 @@ # 8kun data source for 4CAT The 8kun data source works much the same as the 4chan data source. Please -refer to `/datasources/fourchan/README.md` for more information. +refer to the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources) and `/datasources/fourchan/README.md` for more information. -It is virtually identical to the 8chan data source also provided. However, +This data source virtually identical to the 8chan data source. However, since 8kun is distinct from 8chan and has a new owner, it serves as a separate data source to allow for changes to the platform without impacting existing 8chan archives. \ No newline at end of file diff --git a/datasources/eightkun/search_8kun.py b/datasources/eightkun/search_8kun.py index bbf08bb1b..e54e69d3f 100644 --- a/datasources/eightkun/search_8kun.py +++ b/datasources/eightkun/search_8kun.py @@ -82,11 +82,18 @@ class Search8Kun(Search4Chan): } config = { + "eightkun-search.autoscrape": { + "type": UserInput.OPTION_TOGGLE, + "default": False, + "help": "Enable collecting", + "tooltip": "Toggle to automatically collect new boards and threads", + "global": True + }, "eightkun-search.boards": { "type": UserInput.OPTION_TEXT_JSON, "help": "Boards to index", "tooltip": "These boards will be scraped and made available for searching. Provide as a JSON-formatted " - "list of strings, e.g. ['pol', 'v'].", + "list of strings, e.g. [\"pol\", \"v\"].", "default": [""], "global": True }, diff --git a/datasources/fourchan/README.md b/datasources/fourchan/README.md index eb3741c91..26ad2f77d 100644 --- a/datasources/fourchan/README.md +++ b/datasources/fourchan/README.md @@ -1,11 +1,10 @@ # 4chan data source for 4CAT This data source can be used to allow 4CAT users to interface with 4chan data. -Since 4chan has no API that is useful for 4CAT's purposes, this data source -includes a scraper to locally store 4chan data for subsetting and manipulation. +Since 4chan's data is ephemeral, this data source includes a scraper to locally +store 4chan data. -As such, it requires its own database tables. Run `database.sql` with 4CAT's -PostgreSQL user before enabling this dataset. +Please follow the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources) on the 4CAT GitHub to enable this data source. ## Scraping data The scraper requires very little configuration; you only need to set the boards @@ -13,46 +12,19 @@ to scrape. This can be done in the 4CAT settings panel. ## Full-text search This data source also requires a full-text search engine to allow for keyword -search. 4CAT is currently compatible with the [Sphinx](https://sphinxsearch.com) -full-text search engine. We recommend using version 3.3.1 downloadable -[here](sphinxsearch.com/downloads/current). You should make sure this Sphinx instance -is running locally before enabling this data source. -Installing and running Sphinx: -1. [Download the Sphinx 3.3.1 source code](sphinxsearch.com/downloads/current). -2. Create a sphinx directory somewhere, e.g. in the directory of your 4CAT instance -`4cat/sphinx/`. In it, paste all the unzipped contents of the sphinx-3.3.1.zip file -you just downloaded (so that it's filled with the directories `api`, `bin`, etc.). -In the Sphinx directory, also create a folder called `data`, and in this `data` -directory, one called `binlog`. -3. Add a configuration file. You can generate one by running the `generate_sphinx_config.py` -script in the folder `helper-scripts.py`. After running it, a file called `sphinx.conf` -will appear in the `helper-scripts` directory. Copy-paste this file to the `bin` folder -in your Sphinx directory (in the case of the example above: `4cat/sphinx/bin/sphinx.conf`). -4. Generate indexes for the posts that you already collected (if you haven't run any -scrape yet, you can do this later). Generating indexes means Sphinx will create fast -lookup tables so words can be searched quickly. In your command line interface, navigate -to the `bin` directory of your Sphinx installation and run the command `indexer.exe --all`. -This should generate the indexes. -5. Finally, before executing any searches, make sure Sphinx is active by running -`searchd.exe` in your command line interface (once again within the `bin` folder). - -On Windows, you might encounter the error `The code execution cannot proceed because - ssleay32.dll was not found` ([see also this page](https://www.sqlshack.com/getting-started-with-sphinx-search-engine/)). - This can be solved by downloading Sphinx version 3.1.1. and copy-pasting the following - files from the 3.1.1. `bin` directory to your 3.3.1 `bin` directory: -- libeay32.dll -- msvcr120.dll -- ssleay32.dll - +search. 4CAT is currently compatible with the [Sphinx](https://sphinxsearch.com) +full-text search engine. See the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources). ## Importing 4chan data from elsewhere If you want to import 4chan data from elsewhere rather than (or in addition to) -scraping it yourself, two helper scripts are included in `/helper-scripts`: +scraping it yourself, various scripts in `/helper-scripts` allow to import external data: -* `scrape_fuuka.py` can be used to scrape posts from any FoolFuuka-based 4chan - archive. The resulting JSON files can then be imported into the database with +* `scrape_fuuka.py` scrapes posts from any FoolFuuka-based 4chan + archive, like 4plebs. The resulting JSON files can then be imported into the database with `import_json_folder`. -* `import_4plebs.py` can be used to import a data dump from +* `import_4plebs.py` imports data dumps from [4plebs](http://4plebs.org), a 4chan archive that publishes semi-annual data dumps for a number of large boards. -* `import_dump.py` can be used to import csv [files dumped by the 4chan archive archived.moe](https://archive.org/details/archivedmoe_db_201908). \ No newline at end of file +* `import_dump.py` imports [csv files dumped by the 4chan archive archived.moe](https://archive.org/details/archivedmoe_db_201908). +* `import_sqlite_dump.py` imports [4archived data](https://archive.org/download/4archive/4archive_dump-sqlite.7z). +* `import_4chan_csv.py` import data exported from another 4CAT instance. \ No newline at end of file diff --git a/datasources/fourchan/scrapers/scrape_boards.py b/datasources/fourchan/scrapers/scrape_boards.py index 8b238b016..0c794bf7d 100644 --- a/datasources/fourchan/scrapers/scrape_boards.py +++ b/datasources/fourchan/scrapers/scrape_boards.py @@ -147,7 +147,7 @@ def update_unindexed_threads(self, index_thread_ids): # which also updates its deleted/archived status try: # Add a new thread job if it isn't in the jobs table anymore - jobtype = self.prefix + "-thread" + jobtype = self.type.replace("-board", "-thread") query = "SELECT remote_id FROM jobs WHERE remote_id = '%s' AND details = '%s';" % (str(thread["id"]), json.dumps({"board": board_id})) remote_id = self.db.fetchone(query) diff --git a/datasources/fourchan/search_4chan.py b/datasources/fourchan/search_4chan.py index b5da7d44b..17694badc 100644 --- a/datasources/fourchan/search_4chan.py +++ b/datasources/fourchan/search_4chan.py @@ -22,7 +22,7 @@ class Search4Chan(SearchWithScope): """ type = "fourchan-search" # job ID title = "4chan search" - sphinx_index = "4chan" # prefix for sphinx indexes for this data source. Should usually match sphinx.conf + sphinx_index = "4chan" # sphinx index name; this should match the index name in sphinx.conf prefix = "4chan" # table identifier for this datasource; see below for usage is_local = True # Whether this datasource is locally scraped is_static = False # Whether this datasource is still updated @@ -44,7 +44,7 @@ class Search4Chan(SearchWithScope): "intro": { "type": UserInput.OPTION_INFO, "help": "Results are limited to 5 million items maximum. Be sure to read the [query " - "syntax](/data-overview/4chan#query-syntax) for local data sources first - your query design will " + "syntax](/data-overview/fourchan#query-syntax) for local data sources first - your query design will " "significantly impact the results. Note that large queries can take a long time to complete!" }, "board": { @@ -400,11 +400,18 @@ class Search4Chan(SearchWithScope): } config = { + "fourchan-search.autoscrape": { + "type": UserInput.OPTION_TOGGLE, + "default": False, + "help": "Enable collecting", + "tooltip": "Toggle to automatically collect new boards and threads", + "global": True + }, "fourchan-search.boards": { "type": UserInput.OPTION_TEXT_JSON, "help": "Boards to index", "tooltip": "These boards will be scraped and made available for searching. Provide as a JSON-formatted " - "list of strings, e.g. ['pol', 'v'].", + "list of strings, e.g. [\"pol\", \"v\"].", "default": [""], "global": True }, @@ -435,7 +442,7 @@ class Search4Chan(SearchWithScope): "help": "Can query without keyword", "default": False, "tooltip": "Allows users to query the 4chan data without specifying a keyword. This can lead to HUGE datasets!" - } + }, } def get_items_simple(self, query): @@ -706,7 +713,7 @@ def fetch_posts(self, post_ids, join="", where=None, replacements=None): if self.interrupted: raise ProcessorInterruptedException("Interrupted while fetching post data") - query = "SELECT " + columns + " FROM posts_" + self.prefix + " " + join + " WHERE " + " AND ".join( + query = "SELECT " + columns + " FROM posts_" + self.sphinx_index + " " + join + " WHERE " + " AND ".join( where) + " ORDER BY id ASC" return self.db.fetchall_interruptable(self.queue, query, replacements) @@ -801,7 +808,7 @@ def get_sphinx_handler(self): :return MySQLDatabase: """ return MySQLDatabase( - host="localhost", + host=config.get("4cat.sphinx_host"), user=config.get('DB_USER'), password=config.get('DB_PASSWORD'), port=9306, diff --git a/datasources/instagram/explorer/instagram-explorer.css b/datasources/instagram/explorer/instagram-explorer.css new file mode 100644 index 000000000..63bc05fb7 --- /dev/null +++ b/datasources/instagram/explorer/instagram-explorer.css @@ -0,0 +1,34 @@ +* { + color: black; +} + +h1 span { + color: white; +} + +body { + background-color: white; +} + +.posts li.post { + max-width: 225px; + background-color: white; + font-family: "Segoe UI", Roboto, Helvetica, Arial, sans-serif; + font-size: 14px; + border-bottom: 1px solid grey; +} + +.posts header { + border: none; +} + +.posts .alt, .posts .alt time { + color: grey; +} + +.posts .post-image { + max-width: 200px; + margin: 0 auto; + margin-top: 30px; + margin-bottom: 30px; +} \ No newline at end of file diff --git a/datasources/instagram/explorer/instagram-explorer.json b/datasources/instagram/explorer/instagram-explorer.json new file mode 100644 index 000000000..9e5935297 --- /dev/null +++ b/datasources/instagram/explorer/instagram-explorer.json @@ -0,0 +1,33 @@ +{ + "ndjson": { + "author": "{{ user.full_name }}", + "body": "{{ caption.text }}", + "image": "retrieve:{{ image_versions2.candidates.url }}", + "likes": "{{ like_count }} likes", + "comments": "{{ comment_count }} comments", + "date": "{{ taken_at | datetime }}", + "external_url": "https://instagram.com/p/{{ code }}", + "type": "{{ product_type }}", + "sort_options": [ + { + "key": "taken_at", + "label": "Old to new" + }, + { + "key": "taken_at", + "label": "New to old", + "descending": true + }, + { + "key": "like_count", + "label": "Likes", + "descending": true + }, + { + "key": "stats.commentCount", + "label": "Comments", + "descending": true + } + ] + } +} \ No newline at end of file diff --git a/datasources/tiktok_urls/search_tiktok_urls.py b/datasources/tiktok_urls/search_tiktok_urls.py index f17af2e44..5fe60b816 100644 --- a/datasources/tiktok_urls/search_tiktok_urls.py +++ b/datasources/tiktok_urls/search_tiktok_urls.py @@ -353,8 +353,13 @@ async def request_metadata(self, urls): try: if sigil.text: metadata = json.loads(sigil.text) - else: + elif sigil.contents and len(sigil.contents) > 0: metadata = json.loads(sigil.contents[0]) + else: + failed += 1 + self.processor.dataset.log( + "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url) + continue except json.JSONDecodeError: failed += 1 self.processor.dataset.log( diff --git a/datasources/usenet/README.md b/datasources/usenet/README.md index 6f372df35..e0bb3afa7 100644 --- a/datasources/usenet/README.md +++ b/datasources/usenet/README.md @@ -2,6 +2,8 @@ This data source allows importing and searching archived Usenet messages. +To enable this data source, please follow the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources). + One way of acquiring data is available out of the box - with the script `import_usenet_posts.py` in `helper-scripts` in the 4CAT root folder you can import any message databases created with diff --git a/docker-compose.yml b/docker-compose.yml index 7dfd4981d..039f82a14 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,11 @@ services: - POSTGRES_HOST_AUTH_METHOD=${POSTGRES_HOST_AUTH_METHOD} volumes: - 4cat_db:/var/lib/postgresql/data/ + healthcheck: + test: [ "CMD-SHELL", "pg_isready -U postgres" ] + interval: 5s + timeout: 5s + retries: 5 backend: image: digitalmethodsinitiative/4cat:${DOCKER_TAG} @@ -20,7 +25,8 @@ services: env_file: - .env depends_on: - - db + db: + condition: service_healthy ports: - ${PUBLIC_API_PORT}:4444 volumes: diff --git a/docker-compose_build.yml b/docker-compose_build.yml index 4c29ab8a4..dcb3896c9 100644 --- a/docker-compose_build.yml +++ b/docker-compose_build.yml @@ -11,6 +11,11 @@ services: - POSTGRES_HOST_AUTH_METHOD=${POSTGRES_HOST_AUTH_METHOD} volumes: - 4cat_db:/var/lib/postgresql/data/ + healthcheck: + test: [ "CMD-SHELL", "pg_isready -U postgres" ] + interval: 5s + timeout: 5s + retries: 5 backend: image: 4cat @@ -21,7 +26,8 @@ services: env_file: - .env depends_on: - - db + db: + condition: service_healthy ports: - ${PUBLIC_API_PORT}:4444 volumes: diff --git a/docker-compose_public_ip.yml b/docker-compose_public_ip.yml index 2791416c9..174473ed9 100644 --- a/docker-compose_public_ip.yml +++ b/docker-compose_public_ip.yml @@ -20,6 +20,11 @@ services: - POSTGRES_HOST_AUTH_METHOD=${POSTGRES_HOST_AUTH_METHOD} volumes: - 4cat_db:/var/lib/postgresql/data/ + healthcheck: + test: [ "CMD-SHELL", "pg_isready -U postgres" ] + interval: 5s + timeout: 5s + retries: 5 backend: image: digitalmethodsinitiative/4cat:${DOCKER_TAG} @@ -29,7 +34,8 @@ services: env_file: - .env depends_on: - - db + db: + condition: service_healthy ports: - ${PUBLIC_API_PORT}:4444 volumes: diff --git a/helper-scripts/generate_sphinx_config.py b/helper-scripts/generate_sphinx_config.py index 79cd01339..19e27c528 100644 --- a/helper-scripts/generate_sphinx_config.py +++ b/helper-scripts/generate_sphinx_config.py @@ -76,12 +76,17 @@ defined_sources = regex_source.findall(confsrc) # parse found sources into index definitions - prefix = "" + # this is to ensure index names conform given the change to datasource names + prefixes = {"fourchan": "4chan", "eightchan": "8chan", "eightkun": "8kun"} + if datasource in prefixes: + prefix = prefixes[datasource] + else: + prefix = datasource for source in defined_sources: print("...adding one Sphinx source for data source %s" % datasource_id) sources.append("source %s : 4cat {%s}" % source) name = source[0] - index_name = datasource + "_posts" if "posts" in name else datasource + "_threads" if "threads" in name else False + index_name = prefix + "_posts" if "posts" in name else prefix + "_threads" if "threads" in name else False if not index_name: # we only know how to deal with post and thread sources print("Unrecognized data source %s. Skipping." % name) diff --git a/processors/audio/whisper_speech_to_text.py b/processors/audio/whisper_speech_to_text.py index 72803c0eb..c03af0927 100644 --- a/processors/audio/whisper_speech_to_text.py +++ b/processors/audio/whisper_speech_to_text.py @@ -3,12 +3,9 @@ """ import os import json -import time -import requests -from json import JSONDecodeError from backend.lib.processor import BasicProcessor -from common.lib.dmi_service_manager import DmiServiceManager, DmiServiceManagerException +from common.lib.dmi_service_manager import DmiServiceManager, DmiServiceManagerException, DsmOutOfMemory from common.lib.exceptions import ProcessorException, ProcessorInterruptedException from common.lib.user_input import UserInput from common.config_manager import config @@ -161,6 +158,15 @@ def process(self): # Initialize DMI Service Manager dmi_service_manager = DmiServiceManager(processor=self) + # Check GPU memory available + gpu_memory, info = dmi_service_manager.check_gpu_memory_available("whisper") + if not gpu_memory: + if info.get("reason") == "GPU not enabled on this instance of DMI Service Manager": + self.dataset.update_status("DMI Service Manager GPU not enabled; using CPU") + elif int(info.get("memory", {}).get("gpu_free_mem", 0)) < 1000000: + self.dataset.finish_with_error("DMI Service Manager currently busy; no GPU memory available. Please try again later.") + return + # Provide audio files to DMI Service Manager # Results should be unique to this dataset results_folder_name = f"texts_{self.dataset.key}" @@ -193,6 +199,10 @@ def process(self): self.dataset.update_status(f"Requesting service from DMI Service Manager...") try: dmi_service_manager.send_request_and_wait_for_results(whisper_endpoint, data, wait_period=30) + except DsmOutOfMemory: + self.dataset.finish_with_error( + "DMI Service Manager ran out of memory; Try decreasing the number of audio files or try again or try again later.") + return except DmiServiceManagerException as e: self.dataset.finish_with_error(str(e)) return diff --git a/processors/conversion/text_from_image.py b/processors/conversion/text_from_image.py index e0b81c961..126a7f3f9 100644 --- a/processors/conversion/text_from_image.py +++ b/processors/conversion/text_from_image.py @@ -3,16 +3,14 @@ The DMI OCR Server can be downloaded seperately here: https://github.com/digitalmethodsinitiative/ocr_server#readme - -Note: if using a Docker hosted OCR Server, the setting in 4CAT Settings for -URL to the OCR server should be "http://host.docker.internal:4000" (or whatever -port you chose). +and is run using the DMI Service Manager """ import requests import json import os from common.config_manager import config +from common.lib.dmi_service_manager import DmiServiceManager, DsmOutOfMemory, DmiServiceManagerException from common.lib.helpers import UserInput, convert_to_int from backend.lib.processor import BasicProcessor from common.lib.exceptions import ProcessorInterruptedException, ProcessorException @@ -49,19 +47,23 @@ class ImageTextDetector(BasicProcessor): ] config = { - "text-from-images.server_url": { - "type": UserInput.OPTION_TEXT, - "default": "", - "help": 'URL to the OCR server', - "tooltip": "URL to the API endpoint of a version of the DMI OCR server (more info at https://github.com/digitalmethodsinitiative/ocr_server)", - } + "dmi-service-manager.ea_ocr-intro-1": { + "type": UserInput.OPTION_INFO, + "help": "OCR (optical character recognition) allows text in images to be identified and extracted. Use our [prebuilt OCR image](https://github.com/digitalmethodsinitiative/ocr_server) with different available models.", + }, + "dmi-service-manager.eb_ocr_enabled": { + "type": UserInput.OPTION_TOGGLE, + "default": False, + "help": "Enable OCR processor", + }, } options = { "amount": { "type": UserInput.OPTION_TEXT, "help": "Images to process (0 = all)", - "default": 0 + "default": 0, + "coerce_type": int, }, "model_type": { "type": UserInput.OPTION_CHOICE, @@ -87,7 +89,9 @@ def is_compatible_with(cls, module=None, user=None): :param module: Module to determine compatibility with """ - return module.type.startswith("image-downloader") and config.get('text-from-images.server_url', False, user=user) + return config.get('dmi-service-manager.eb_ocr_enabled', False, user=user) and \ + config.get("dmi-service-manager.ab_server_address", False, user=user) and \ + module.type.startswith("image-downloader") def process(self): """ @@ -95,68 +99,115 @@ def process(self): following structure: """ - max_images = convert_to_int(self.parameters.get("amount", 0), 100) - total = self.source_dataset.num_rows if not max_images else min(max_images, self.source_dataset.num_rows) - done = 0 + if self.source_dataset.num_rows == 0: + self.dataset.finish_with_error("No images available.") + return + + # Unpack the images into a staging_area + self.dataset.update_status("Unzipping images") + staging_area = self.unpack_archive_contents(self.source_file) + + # Collect filenames (skip .json metadata files) + image_filenames = [filename for filename in os.listdir(staging_area) if + filename.split('.')[-1] not in ["json", "log"]] + if int(self.parameters.get("amount", 100)) != 0: + image_filenames = image_filenames[:int(self.parameters.get("amount", 100))] + total_image_files = len(image_filenames) + + # Make output dir + output_dir = self.dataset.get_staging_area() + + # Initialize DMI Service Manager + dmi_service_manager = DmiServiceManager(processor=self) + + # Results should be unique to this dataset + server_results_folder_name = f"4cat_results_{self.dataset.key}" + # Files can be based on the parent dataset (to avoid uploading the same files multiple times) + file_collection_name = dmi_service_manager.get_folder_name(self.source_dataset) + + # Process the image files (upload to server if needed) + path_to_files, path_to_results = dmi_service_manager.process_files(input_file_dir=staging_area, + filenames=image_filenames, + output_file_dir=output_dir, + server_file_collection_name=file_collection_name, + server_results_folder_name=server_results_folder_name) + + # Arguments for the OCR server + data = {'args': ['--model', self.parameters.get("model_type"), + '--output_dir', f"data/{path_to_results}", + '--images']} + data["args"].extend([f"data/{path_to_files.joinpath(filename)}" for filename in image_filenames]) + + # Send request to DMI Service Manager + self.dataset.update_status(f"Requesting service from DMI Service Manager...") + api_endpoint = "ocr" + try: + dmi_service_manager.send_request_and_wait_for_results(api_endpoint, data, wait_period=30, + check_process=True) + except DsmOutOfMemory: + self.dataset.finish_with_error( + "DMI Service Manager ran out of memory; Try decreasing the number of images or try again or try again later.") + return + except DmiServiceManagerException as e: + self.dataset.finish_with_error(str(e)) + return + + self.dataset.update_status("Processing OCR results...") + # Download the result files if necessary + dmi_service_manager.process_results(output_dir) + + # Load the metadata from the archive + image_metadata = {} + with open(os.path.join(staging_area, '.metadata.json')) as file: + image_data = json.load(file) + for url, data in image_data.items(): + if data.get('success'): + data.update({"url": url}) + image_metadata[data['filename']] = data # Check if we need to collect data for updating the original dataset update_original = self.parameters.get("update_original", False) if update_original: - # We need to unpack the archive to get the metadata - # If we use the file from iterate_archive_contents() we may not have the metadata for the first few files - staging_area = self.unpack_archive_contents(self.source_file) - # Load the metadata from the archive - with open(os.path.join(staging_area, '.metadata.json')) as file: - image_data = json.load(file) - filename_to_post_id = {} - for url, data in image_data.items(): - if data.get('success'): - filename_to_post_id[data.get('filename')] = data.get('post_ids') - del image_data - - # And something to store the results + filename_to_post_id = {} + for url, data in image_data.items(): + if data.get('success'): + filename_to_post_id[data.get('filename')] = data.get('post_ids') post_id_to_results = {} - else: - staging_area = None - - for image_file in self.iterate_archive_contents(self.source_file, staging_area=staging_area): - if self.interrupted: - raise ProcessorInterruptedException("Interrupted while fetching data from Google Vision API") - - if image_file.name == '.metadata.json': - continue - - done += 1 - self.dataset.update_status("Annotating image %i/%i" % (done, total)) - self.dataset.update_progress(done / total) - - annotations = self.annotate_image(image_file) - - if not annotations: - continue - - annotations = {"file_name": image_file.name, **annotations} - - # Collect annotations for updating the original dataset - if update_original: - # Need to include filename as there may be many images to a single post - detected_text = '%s:"""%s"""' % (image_file.name, annotations.get('simplified_text', {}).get('raw_text', '')) - post_ids = filename_to_post_id[image_file.name] - for post_id in post_ids: - # Posts can have multiple images - if post_id in post_id_to_results.keys(): - post_id_to_results[post_id].append(detected_text) - else: - post_id_to_results[post_id] = [detected_text] - - with self.dataset.get_results_path().open("a", encoding="utf-8") as outfile: - outfile.write(json.dumps(annotations) + "\n") - - if max_images and done >= max_images: - break - - self.dataset.update_status("Annotations retrieved for %i images" % done) + # Save files as NDJSON, then use map_item for 4CAT to interact + processed = 0 + with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as outfile: + for result_filename in os.listdir(output_dir): + if self.interrupted: + raise ProcessorInterruptedException("Interrupted while writing results to file") + + self.dataset.log(f"Writing {result_filename}...") + with open(output_dir.joinpath(result_filename), "r") as result_file: + result_data = json.loads(''.join(result_file)) + image_name = result_data.get("filename") + + # Collect annotations for updating the original dataset + if update_original: + # Need to include filename as there may be many images to a single post + detected_text = '%s:"""%s"""' % (image_name, result_data.get('simplified_text', {}).get('raw_text', '')) + + post_ids = filename_to_post_id[image_name] + for post_id in post_ids: + # Posts can have multiple images + if post_id in post_id_to_results.keys(): + post_id_to_results[post_id].append(detected_text) + else: + post_id_to_results[post_id] = [detected_text] + + data = { + "id": image_name, + **result_data, + "image_metadata": image_metadata.get(image_name, {}) if image_metadata else {}, + } + outfile.write(json.dumps(data) + "\n") + + processed += 1 + self.dataset.update_status("Annotations retrieved for %i images" % processed) # Update the original dataset with the detected text if requested if update_original: @@ -168,55 +219,18 @@ def process(self): detected_text_column.append('\n'.join(post_id_to_results.get(post.get('id'), []))) try: - self.add_field_to_parent(field_name='detexted_text', + self.add_field_to_parent(field_name='4CAT_detexted_text', new_data=detected_text_column, which_parent=self.dataset.top_parent()) except ProcessorException as e: self.dataset.update_status("Error updating parent dataset: %s" % e) - self.dataset.finish(done) - - def annotate_image(self, image_file): - """ - Get annotations from the DMI OCR server - - :param Path image_file: Path to file to annotate - :return dict: Lists of detected features, one key for each feature - """ - server = self.config.get('text-from-images.server_url', '') - - # Get model_type if available - parameters = {} - model_type = self.parameters.get("model_type") - if model_type: - parameters['model_type'] = model_type - - if not server: - raise ProcessorException('DMI OCR server not configured') - - with image_file.open("rb") as infile: - try: - api_request = requests.post(server.rstrip('/') + '/api/detect_text', files={'image': infile}, data=parameters, timeout=30) - except requests.exceptions.ConnectionError as e: - message = f"Unable to establish connection to OCR server {e}. 4CAT admins notified; your processor will continue when issue is resolved." - self.dataset.update_status(message) - raise ProcessorException(message) - - if api_request.status_code != 200: - self.dataset.update_status("Got response code %i from DMI OCR server for image %s: %s" % (api_request.status_code, image_file.name, api_request.content)) - return None - - try: - response = api_request.json() - except (json.JSONDecodeError, KeyError): - self.dataset.update_status("Got an improperly formatted response from DMI OCR server for image %s, skipping" % image_file.name) - return None - - return response + self.dataset.update_status(f"Detected speech in {processed} of {total_image_files} images") + self.dataset.finish(processed) @staticmethod def map_item(item): """ For preview frontend """ - return {'filename': item.get('filename'), 'text':item.get('simplified_text').get('raw_text')} + return {"filename": item.get("filename"), "model_type": item.get("model_type"), "text": item.get("simplified_text", {}).get("raw_text"), "post_ids": ", ".join([str(post_id) for post_id in item.get("image_metadata", {}).get("post_ids", [])]), "image_url": item.get("image_metadata", {}).get("url")} diff --git a/processors/conversion/view_metadata.py b/processors/conversion/view_metadata.py index b5f3f6e18..ab3d53f95 100644 --- a/processors/conversion/view_metadata.py +++ b/processors/conversion/view_metadata.py @@ -43,7 +43,7 @@ def is_compatible_with(cls, module=None, user=None): :param module: Module to determine compatibility with """ - return module.type.startswith("video-downloader") + return module.type.startswith("video-downloader") or module.type.startswith("image-downloader") def process(self): """ diff --git a/processors/presets/neologisms.py b/processors/presets/neologisms.py index 6f75b3655..2c106152c 100644 --- a/processors/presets/neologisms.py +++ b/processors/presets/neologisms.py @@ -19,14 +19,34 @@ class NeologismExtractor(ProcessorPreset): references = ["Van Soest, Jeroen. 2019. 'Language Innovation Tracker: Detecting language innovation in online discussion fora.' (MA thesis), Beuls, K. (Promotor), Van Eecke, P. (Advisor).'"] - options = { - "timeframe": { - "type": UserInput.OPTION_CHOICE, - "default": "month", - "options": {"all": "Overall", "year": "Year", "month": "Month", "week": "Week", "day": "Day"}, - "help": "Extract neologisms per" + @classmethod + def get_options(cls, parent_dataset=None, user=None): + """ + Get processor options + """ + options = { + "timeframe": { + "type": UserInput.OPTION_CHOICE, + "default": "month", + "options": {"all": "Overall", "year": "Year", "month": "Month", "week": "Week", "day": "Day"}, + "help": "Extract neologisms per" + }, + "columns": { + "type": UserInput.OPTION_TEXT, + "help": "Column(s) from which to extract neologisms", + "tooltip": "Each enabled column will be treated as a separate item to tokenise. Columns must contain text." + }, } - } + if parent_dataset and parent_dataset.get_columns(): + columns = parent_dataset.get_columns() + options["columns"]["type"] = UserInput.OPTION_MULTI + options["columns"]["inline"] = True + options["columns"]["options"] = {v: v for v in columns} + default_options = [default for default in ["body", "text", "subject"] if default in columns] + if default_options: + options["columns"]["default"] = default_options.pop(0) + + return options def get_processor_pipeline(self): """ @@ -35,6 +55,7 @@ def get_processor_pipeline(self): ranking is used as the result of this processor, once available. """ timeframe = self.parameters.get("timeframe") + columns = self.parameters.get("columns") pipeline = [ # first, tokenise the posts, excluding all common words @@ -45,6 +66,7 @@ def get_processor_pipeline(self): "strip_symbols": True, "lemmatise": False, "docs_per": timeframe, + "columns": columns, "filter": ["wordlist-googlebooks-english", "stopwords-iso-all"] } }, diff --git a/processors/text-analysis/tokenise.py b/processors/text-analysis/tokenise.py index 8883567d7..15e0386cd 100644 --- a/processors/text-analysis/tokenise.py +++ b/processors/text-analysis/tokenise.py @@ -170,7 +170,7 @@ def get_options(cls, parent_dataset=None, user=None): options["columns"]["options"] = {v: v for v in columns} default_options = [default for default in ["body", "text", "subject"] if default in columns] if default_options: - options["columns"]["default"] = default_options.pop() + options["columns"]["default"] = default_options.pop(0) return options diff --git a/processors/visualisation/clip_categorize_images.py b/processors/visualisation/clip_categorize_images.py index 372f4318b..dcf967eab 100644 --- a/processors/visualisation/clip_categorize_images.py +++ b/processors/visualisation/clip_categorize_images.py @@ -11,7 +11,7 @@ from backend.lib.processor import BasicProcessor -from common.lib.dmi_service_manager import DmiServiceManager, DmiServiceManagerException +from common.lib.dmi_service_manager import DmiServiceManager, DmiServiceManagerException, DsmOutOfMemory from common.lib.exceptions import ProcessorException, ProcessorInterruptedException from common.lib.user_input import UserInput from common.config_manager import config @@ -150,6 +150,16 @@ def process(self): # Initialize DMI Service Manager dmi_service_manager = DmiServiceManager(processor=self) + # Check GPU memory available + gpu_memory, info = dmi_service_manager.check_gpu_memory_available("clip") + if not gpu_memory: + if info.get("reason") == "GPU not enabled on this instance of DMI Service Manager": + self.dataset.update_status("DMI Service Manager GPU not enabled; using CPU") + elif int(info.get("memory", {}).get("gpu_free_mem", 0)) < 1000000: + self.dataset.finish_with_error( + "DMI Service Manager currently busy; no GPU memory available. Please try again later.") + return + # Results should be unique to this dataset results_folder_name = f"texts_{self.dataset.key}" # Files can be based on the parent dataset (to avoid uploading the same files multiple times) @@ -173,16 +183,25 @@ def process(self): api_endpoint = "clip" try: dmi_service_manager.send_request_and_wait_for_results(api_endpoint, data, wait_period=30) + except DsmOutOfMemory: + self.dataset.finish_with_error( + "DMI Service Manager ran out of memory; Try decreasing the number of images or try again or try again later.") + return except DmiServiceManagerException as e: self.dataset.finish_with_error(str(e)) return # Load the video metadata if available - image_metadata = None + image_metadata = {} if staging_area.joinpath(".metadata.json").is_file(): with open(staging_area.joinpath(".metadata.json")) as file: - image_metadata = json.load(file) + image_data = json.load(file) self.dataset.log("Found and loaded image metadata") + for url, data in image_data.items(): + if data.get('success'): + data.update({"url": url}) + # using the filename without extension as the key; since that is how the results form their filename + image_metadata[".".join(data['filename'].split(".")[:-1])] = data self.dataset.update_status("Processing CLIP results...") # Download the result files @@ -202,7 +221,6 @@ def process(self): data = { "id": image_name, "categories": result_data, - # TODO: need to pass along filename/videoname/postid/SOMETHING consistent "image_metadata": image_metadata.get(image_name, {}) if image_metadata else {}, } outfile.write(json.dumps(data) + "\n") @@ -232,7 +250,7 @@ def map_item(item): "top_categories": ", ".join([f"{cat[0]}: {100* cat[1]:.2f}%" for cat in top_cats]), "original_url": image_metadata.get("url", ""), "image_filename": image_metadata.get("filename", ""), - "post_ids": ", ".join(image_metadata.get("post_ids", [])), + "post_ids": ", ".join([str(post_id) for post_id in image_metadata.get("post_ids", [])]), "from_dataset": image_metadata.get("from_dataset", ""), **all_cats } diff --git a/processors/visualisation/download-telegram-images.py b/processors/visualisation/download-telegram-images.py index d825ab675..8ad504712 100644 --- a/processors/visualisation/download-telegram-images.py +++ b/processors/visualisation/download-telegram-images.py @@ -221,4 +221,22 @@ def cancel_start(): raise a RuntimeError. This will be caught and the user will be told they need to re-authenticate via 4CAT. """ - raise RuntimeError("Connection cancelled") \ No newline at end of file + raise RuntimeError("Connection cancelled") + + @staticmethod + def map_metadata(filename, data): + """ + Iterator to yield modified metadata for CSV + + :param str url: string that may contain URLs + :param dict data: dictionary with metadata collected previously + :yield dict: iterator containing reformated metadata + """ + row = { + "number_of_posts_with_image": len(data.get("post_ids", [])), + "post_ids": ", ".join(data.get("post_ids", [])), + "filename": filename, + "download_successful": data.get('success', "") + } + + yield row diff --git a/processors/visualisation/download_images.py b/processors/visualisation/download_images.py index 0ddad601c..caa867925 100644 --- a/processors/visualisation/download_images.py +++ b/processors/visualisation/download_images.py @@ -347,7 +347,7 @@ def process(self): metadata = { url: { "filename": url_file_map.get(url), - "success": not url_file_map.get(url) is None and url not in failures, # skipped and fails are NOT success + "success": not url_file_map.get(url) is None and url not in failures, # skipped and fails are NOT success "from_dataset": self.source_dataset.key, "post_ids": urls[url] } for url in urls @@ -575,3 +575,22 @@ def request_get_w_error_handling(self, url, retries=0, **kwargs): raise FileNotFoundError() return response + + @staticmethod + def map_metadata(url, data): + """ + Iterator to yield modified metadata for CSV + + :param str url: string that may contain URLs + :param dict data: dictionary with metadata collected previously + :yield dict: iterator containing reformated metadata + """ + row = { + "url": url, + "number_of_posts_with_url": len(data.get("post_ids", [])), + "post_ids": ", ".join(data.get("post_ids", [])), + "filename": data.get("filename"), + "download_successful": data.get('success', "") + } + + yield row diff --git a/processors/visualisation/download_tiktok.py b/processors/visualisation/download_tiktok.py index 163aad771..e9c9a933c 100644 --- a/processors/visualisation/download_tiktok.py +++ b/processors/visualisation/download_tiktok.py @@ -392,3 +392,22 @@ def collect_image(url, user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_ extension = response.headers["Content-Type"].split("/")[-1] return picture, extension + + @staticmethod + def map_metadata(url, data): + """ + Iterator to yield modified metadata for CSV + + :param str url: string that may contain URLs + :param dict data: dictionary with metadata collected previously + :yield dict: iterator containing reformated metadata + """ + row = { + "url": url, + "number_of_posts_with_url": len(data.get("post_ids", [])), + "post_ids": ", ".join(data.get("post_ids", [])), + "filename": data.get("filename"), + "download_successful": data.get('success', "") + } + + yield row diff --git a/processors/visualisation/image_category_wall.py b/processors/visualisation/image_category_wall.py index 1aa5f92af..43e216397 100644 --- a/processors/visualisation/image_category_wall.py +++ b/processors/visualisation/image_category_wall.py @@ -94,7 +94,7 @@ def get_options(cls, parent_dataset=None, user=None): } default_options = [default for default in ["top_categories", "impression_count", "category", "type"] if default in parent_columns] if default_options: - options["category"]["default"] = default_options.pop() + options["category"]["default"] = default_options.pop(0) return options @@ -196,7 +196,7 @@ def process(self): if category_type == str: post_category = post.get(category_column) - if post_category is "": + if post_category == "": post_category = "None" if post_category not in categories: categories[post_category] = [{"id": post.get("id")}] @@ -259,7 +259,14 @@ def process(self): # Drop categories with no images (ranges may have no images) categories = {cat: images for cat, images in categories.items() if images} self.dataset.log(f"Found {len(categories)} categories") + # TODO: this is semi arbitrary; max_images is only ever hit if each category is evenly sized + # If we break, when max_images is hit, categories are not representative and the last categories will be empty + # Instead, we could calculate each category's proportional size and then use that to determine how many images + # to take from each category while remaining under max_images images_per_category = max(max_images // len(categories), 1) + # Could do something like this, but it also appears to cut smaller categories off uncessarily + # total_images = sum([len(images) for images in categories.values()]) + # max_images_per_categories = {cat: max(math.ceil((len(images)/total_images) * max_images), 1) for cat, images in categories.items()} # Create SVG with categories and images base_height = self.parameters.get("height", 100) @@ -280,7 +287,18 @@ def process(self): offset_w = 0 for i, image in enumerate(images): - if i > images_per_category: + if i >= images_per_category: + remaining = f"+ {len(images) - images_per_category} more images" + footersize = (fontsize * (len(remaining) + 2) * 0.5925, fontsize * 2) + footer_shape = SVG(insert=(offset_w, base_height/2 - footersize[1]), size=footersize) + footer_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000")) + label_element = Text(insert=("50%", "50%"), text=remaining, dominant_baseline="middle", + text_anchor="middle", fill="#FFF", style="font-size:%ipx" % fontsize) + footer_shape.add(label_element) + category_image.add(footer_shape) + offset_w += footersize[0] + + category_widths[category] += footersize[0] break image_filename = filename_map.get(image.get("id")) diff --git a/processors/visualisation/pix-plot.py b/processors/visualisation/pix-plot.py index 1b53f0f51..c436191e3 100644 --- a/processors/visualisation/pix-plot.py +++ b/processors/visualisation/pix-plot.py @@ -1,14 +1,7 @@ """ Create an PixPlot of downloaded images - -Use http://host.docker.internal:4000 to connect to docker hosted PixPlot on -same server (assuming that container is exposing port 4000). """ import shutil -from json import JSONDecodeError - -import requests -import time import json from datetime import datetime import csv @@ -17,7 +10,7 @@ from werkzeug.utils import secure_filename from common.config_manager import config -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.dmi_service_manager import DmiServiceManager, DsmOutOfMemory, DmiServiceManagerException from common.lib.helpers import UserInput, convert_to_int from backend.lib.processor import BasicProcessor @@ -50,24 +43,25 @@ class PixPlotGenerator(BasicProcessor): min_photos_needed = 12 config = { - # If you host a version of https://github.com/digitalmethodsinitiative/dmi_pix_plot, you can use a processor to publish - # downloaded images into a PixPlot there - 'pix-plot.server_url': { - 'type': UserInput.OPTION_TEXT, - 'default': "", - 'help': 'PixPlot Server Address/URL', - 'tooltip': "", + "dmi-service-manager.da_pixplot-intro-1": { + "type": UserInput.OPTION_INFO, + "help": "Explore images with [Yale Digital Humanities Lab Team's PixPlot](https://github.com/digitalmethodsinitiative/dmi_pix_plot).", + }, + "dmi-service-manager.db_pixplot_enabled": { + "type": UserInput.OPTION_TOGGLE, + "default": False, + "help": "Enable PixPlot Image Viewer", }, - "pix-plot.max_images": { + "dmi-service-manager.dc_pixplot_num_files": { "type": UserInput.OPTION_TEXT, "coerce_type": int, - "default": 10000, - "help": "Max images to upload", - "tooltip": "Only allow uploading up to this many images per plot. Increasing this can easily lead to " - "very long-running processors and large datasets. 0 allows as many images as available." - } + "default": 0, + "help": "PixPlot max number of images", + "tooltip": "Use '0' to allow unlimited number" + }, } + @classmethod def get_options(cls, parent_dataset=None, user=None): # Update the amount max and help from config @@ -125,7 +119,7 @@ def get_options(cls, parent_dataset=None, user=None): }, } - max_number_images = int(config.get("pix-plot.max_images", 10000, user=user)) + max_number_images = int(config.get("dmi-service-manager.dc_pixplot_num_files", 10000, user=user)) if max_number_images == 0: options["amount"]["help"] = options["amount"]["help"] + " (max: all available)" options["amount"]["min"] = 0 @@ -145,7 +139,9 @@ def is_compatible_with(cls, module=None, user=None): :param module: Dataset or processor to determine compatibility with """ - return module.type.startswith("image-downloader") and config.get('pix-plot.server_url') + return config.get("dmi-service-manager.db_pixplot_enabled", False, user=user) and \ + config.get("dmi-service-manager.ab_server_address", False, user=user) and \ + module.type.startswith("image-downloader") def process(self): """ @@ -160,50 +156,19 @@ def process(self): self.dataset.finish(0) return - # 0 = use as many images as in the archive, up to the max - max_images = convert_to_int(self.parameters.get("amount"), 1000) - if max_images == 0: - max_images = None - - # Get labels to send PixPlot server - date = datetime.now().strftime("%Y-%m-%d-%H%M%S") - top_dataset = self.dataset.top_parent() - label_formated = ''.join(e if e.isalnum() else '_' for e in top_dataset.get_label()) - image_label = datetime.fromtimestamp(self.source_dataset.timestamp).strftime("%Y-%m-%d-%H%M%S") + '-' + label_formated + '-' + str(top_dataset.key) - plot_label = date + '-' + label_formated + '-' + str(self.dataset.key) - pixplot_server = self.config.get('pix-plot.server_url').rstrip("/") - - # Folder name is PixPlot identifier and set at dataset key - data = {'folder_name': image_label} - - # Check if images have already been sent - filename_url = pixplot_server + '/api/list_filenames?folder_name=' + image_label - filename_response = requests.get(filename_url, timeout=30) - - # Check if 4CAT has access to this PixPlot server - if filename_response.status_code == 403: - self.dataset.update_status("403: 4CAT does not have permission to use this PixPlot server", is_final=True) - self.dataset.finish(0) - return - - uploaded_files = filename_response.json().get('filenames', []) - if len(uploaded_files) > 0: - self.dataset.update_status("Found %i images previously uploaded" % (len(uploaded_files))) - - # Images # Unpack the images into a staging_area self.dataset.update_status("Unzipping images") staging_area = self.unpack_archive_contents(self.source_file) - self.log.info('PixPlot image staging area created: ' + str(staging_area)) - filenames = os.listdir(staging_area) - # Compare photos with upload images - filenames = [filename for filename in filenames if - filename not in uploaded_files + ['.metadata.json', 'metadata.csv']] - total_images = len(filenames) + len(uploaded_files) + # Collect filenames (skip .json metadata files) + image_filenames = [filename for filename in os.listdir(staging_area) if + filename.split('.')[-1] not in ["json", "log"]] + if self.parameters.get("amount", 100) != 0: + image_filenames = image_filenames[:self.parameters.get("amount", 100)] + total_image_files = len(image_filenames) # Check to ensure enough photos will be uploaded to create a PixPlot - if total_images < self.min_photos_needed: + if total_image_files < self.min_photos_needed: self.dataset.update_status( "Minimum of %i photos needed for a PixPlot to be created" % self.min_photos_needed, is_final=True) self.dataset.finish(0) @@ -212,116 +177,56 @@ def process(self): # Gather metadata self.dataset.update_status("Collecting metadata") metadata_file_path = self.format_metadata(staging_area) - # Metadata - upload_url = pixplot_server + '/api/send_metadata' - metadata_response = requests.post(upload_url, files={'metadata': open(metadata_file_path, 'rb')}, data=data, timeout=120) - - # Now send photos to PixPlot - self.dataset.update_status("Uploading images to PixPlot") - # Configure upload photo url - upload_url = pixplot_server + '/api/send_photo' - images_uploaded = 0 - estimated_num_images = len(filenames) - self.dataset.update_status("Uploading %i images" % (estimated_num_images)) - # Begin looping through photos - for i, filename in enumerate(filenames): - if self.interrupted: - raise ProcessorInterruptedException("Interrupted while downloading images.") - - if max_images is not None and i > max_images: - break - with open(os.path.join(staging_area, filename), 'rb') as image: - response = requests.post(upload_url, files={'image': image}, data=data, timeout=120) - - if response.status_code == 200: - image_response = response - images_uploaded += 1 - if images_uploaded % 100 == 0: - self.dataset.update_status("Images uploaded: %i of %i" % (i, estimated_num_images)) - else: - self.dataset.update_status( - "Error with image %s: %i - %s" % (filename, response.status_code, response.reason)) - - self.dataset.update_progress(i / self.source_dataset.num_rows) - - # Request PixPlot server create PixPlot - self.dataset.update_status("Sending create PixPlot request") - create_plot_url = pixplot_server + '/api/pixplot' - # Gather info from PixPlot server response - create_pixplot_post_info = metadata_response.json()['create_pixplot_post_info'] + + # Make output dir + output_dir = self.dataset.get_results_folder_path() + output_dir.mkdir(exist_ok=True) + + # Initialize DMI Service Manager + dmi_service_manager = DmiServiceManager(processor=self) + + # Results should be unique to this dataset + server_results_folder_name = f"4cat_results_{self.dataset.key}" + # Files can be based on the parent dataset (to avoid uploading the same files multiple times) + file_collection_name = dmi_service_manager.get_folder_name(self.source_dataset) + + path_to_files, path_to_results = dmi_service_manager.process_files(staging_area, image_filenames + [metadata_file_path], output_dir, + file_collection_name, server_results_folder_name) + + # PixPlot # Create json package for creation request - json_data = {'args': ['--images', create_pixplot_post_info.get('images_folder') + "/*", - '--out_dir', create_pixplot_post_info.get('plot_folder_root') + '/' + plot_label, - '--metadata', create_pixplot_post_info.get('metadata_filepath')]} + data = {'args': ['--images', f"data/{path_to_files}/*", + '--out_dir', f"data/{path_to_results}", + '--metadata', f"data/{path_to_files}/{metadata_file_path.name}"]} # Additional options for PixPlot cell_size = self.parameters.get('image_size') n_neighbors = self.parameters.get('n_neighbors') min_dist = self.parameters.get('min_dist') - json_data['args'] += ['--cell_size', str(cell_size), '--n_neighbors', str(n_neighbors), '--min_dist', - str(min_dist)] + data['args'] += ['--cell_size', str(cell_size), '--n_neighbors', str(n_neighbors), '--min_dist', str(min_dist)] # Increase timeout (default is 3600 seconds) - json_data['timeout'] = 21600 - - # Send; receives response that process has started - resp = requests.post(create_plot_url, json=json_data, timeout=30) - if resp.status_code == 202: - # new request - new_request = True - results_url = self.config.get('pix-plot.server_url').rstrip('/') + '/api/pixplot?key=' + resp.json()['key'] - else: - try: - resp_json = resp.json() - except JSONDecodeError as e: - # Unexpected Error - self.log.error('PixPlot create response: ' + str(resp.status_code) + ': ' + str(resp.text)) - if staging_area: - shutil.rmtree(staging_area) - raise RuntimeError("PixPlot unable to process request") - - if resp.status_code == 202: - # new request - new_request = True - results_url = pixplot_server + '/api/pixplot?key=' + resp.json()['key'] - elif 'already exists' in resp.json()['error']: - # repeat request - new_request = False - else: - self.log.error('PixPlot create response: ' + str(resp.status_code) + ': ' + str(resp.text)) - if staging_area: - shutil.rmtree(staging_area) - raise RuntimeError("PixPlot unable to process request") - - # Wait for PixPlot to complete - self.dataset.update_status("PixPlot generating results") - start_time = time.time() - while new_request: - time.sleep(1) - # If interrupted is called, attempt to finish dataset while PixPlot server still running - if self.interrupted: - break - - # Send request to check status every 60 seconds - if int(time.time() - start_time) % 60 == 0: - result = requests.get(results_url, timeout=30) - self.log.debug(str(result.json())) - if 'status' in result.json().keys() and result.json()['status'] == 'running': - # Still running - continue - elif 'report' in result.json().keys() and result.json()['report'][-6:-1] == 'Done!': - # Complete without error - self.dataset.update_status("PixPlot Completed!") - self.log.info('PixPlot saved on : ' + pixplot_server) - break - else: - # Something botched - self.dataset.finish_with_error("PixPlot Error on creation") - self.log.error("PixPlot Error: " + str(result.json())) - return - - # Create HTML file - plot_url = pixplot_server + '/plots/' + plot_label + '/index.html' + data['timeout'] = 21600 + + # Send request to DMI Service Manager + self.dataset.update_status(f"Requesting service from DMI Service Manager...") + api_endpoint = "pixplot" + try: + dmi_service_manager.send_request_and_wait_for_results(api_endpoint, data, wait_period=30, check_process=False) + except DsmOutOfMemory: + self.dataset.finish_with_error( + "DMI Service Manager ran out of memory; Try decreasing the number of images or try again or try again later.") + return + except DmiServiceManagerException as e: + self.dataset.finish_with_error(str(e)) + return + + self.dataset.update_status("Processing PixPlot results...") + # Download the result files + dmi_service_manager.process_results(output_dir) + + # Results HTML file redirects to output_dir/index.html + plot_url = ('https://' if config.get("flask.https") else 'http://') + config.get("flask.server_name") + '/result/' + f"{os.path.relpath(self.dataset.get_results_folder_path(), self.dataset.folder)}/index.html" html_file = self.get_html_page(plot_url) # Write HTML file @@ -382,6 +287,10 @@ def format_metadata(self, temp_path): ids = data.get('post_ids') # dmi_pix_plot API uses sercure_filename while pixplot.py (in PixPlot library) uses clean_filename # Ensure our metadata filenames match results + if data.get('filename') is None: + # Bad metadata; file was not actually downloaded, fixed in 9b603cd1ecdf97fd92c3e1c6200e4b6700dc1e37 + continue + filename = self.clean_filename(secure_filename(data.get('filename'))) for post_id in ids: # Add to key diff --git a/webtool/templates/explorer/post.html b/webtool/templates/explorer/post.html index a083b47f9..ac6827fc9 100644 --- a/webtool/templates/explorer/post.html +++ b/webtool/templates/explorer/post.html @@ -33,7 +33,7 @@ {% if 'thread_id' in post %} {% if is_local %}{{ post.thread_id }}{% else %}{{ post.thread_id }}{% endif %} {% endif %} - {{ post.id }} + {% if 'timestamp' in post %} {% if post.timestamp is integer %} {{ post.timestamp|datetime('%Y-%m-%d %H:%M')|safe }} diff --git a/webtool/views/api_explorer.py b/webtool/views/api_explorer.py index c19519754..1665929d1 100644 --- a/webtool/views/api_explorer.py +++ b/webtool/views/api_explorer.py @@ -666,9 +666,6 @@ def get_custom_fields(datasource, filetype=None): datasource_dir = datasource.replace("4", "four") elif datasource.startswith("8"): datasource_dir = datasource.replace("8", "eight") - elif "facebook" in datasource or "instagram" in datasource: - datasource_dir = "import-from-tool" - datasource = "import-from-tool" elif datasource == "twitter": datasource_dir = "twitter-import" datasource = "twitter-import" diff --git a/webtool/views/views_dataset.py b/webtool/views/views_dataset.py index 06f42c268..f31366b11 100644 --- a/webtool/views/views_dataset.py +++ b/webtool/views/views_dataset.py @@ -154,7 +154,7 @@ def show_results(page): """ Downloading results """ -@app.route('/result/') +@app.route('/result/') def get_result(query_file): """ Get dataset result file @@ -163,8 +163,8 @@ def get_result(query_file): :return: Result file :rmime: text/csv """ - directory = str(config.get('PATH_ROOT').joinpath(config.get('PATH_DATA'))) - return send_from_directory(directory=directory, path=query_file) + path = config.get('PATH_ROOT').joinpath(config.get('PATH_DATA')).joinpath(query_file) + return send_from_directory(directory=path.parent, path=path.name) @app.route('/mapped-result//') diff --git a/webtool/views/views_misc.py b/webtool/views/views_misc.py index 251fff63e..1160e6ab0 100644 --- a/webtool/views/views_misc.py +++ b/webtool/views/views_misc.py @@ -133,6 +133,8 @@ def data_overview(datasource=None): datasource_id = datasource worker_class = backend.all_modules.workers.get(datasource_id + "-search") + # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan") + database_db_id = worker_class.prefix if hasattr(worker_class, "prefix") else datasource_id # Get description description_path = Path(datasources[datasource_id].get("path"), "DESCRIPTION.md") @@ -156,7 +158,7 @@ def data_overview(datasource=None): # Get daily post counts for local datasource to display in a graph if is_local == "local": - total_counts = db.fetchall("SELECT board, SUM(count) AS post_count FROM metrics WHERE metric = 'posts_per_day' AND datasource = %s GROUP BY board", (datasource_id,)) + total_counts = db.fetchall("SELECT board, SUM(count) AS post_count FROM metrics WHERE metric = 'posts_per_day' AND datasource = %s GROUP BY board", (database_db_id,)) if total_counts: @@ -165,7 +167,7 @@ def data_overview(datasource=None): boards = set(total_counts.keys()) # Fetch date counts per board from the database - db_counts = db.fetchall("SELECT board, date, count FROM metrics WHERE metric = 'posts_per_day' AND datasource = %s", (datasource_id,)) + db_counts = db.fetchall("SELECT board, date, count FROM metrics WHERE metric = 'posts_per_day' AND datasource = %s", (database_db_id,)) # Get the first and last days for padding all_dates = [datetime.strptime(row["date"], "%Y-%m-%d").timestamp() for row in db_counts]