Map item catch #365

Merged · 18 commits · Oct 12, 2023
39 changes: 38 additions & 1 deletion backend/lib/processor.py
@@ -16,9 +16,10 @@
from common.lib.dataset import DataSet
from common.lib.fourcat_module import FourcatModule
from common.lib.helpers import get_software_version, remove_nuls
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, ProcessorException
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, ProcessorException, MapItemException
from common.config_manager import config, ConfigWrapper


csv.field_size_limit(1024 * 1024 * 1024)


@@ -625,6 +626,42 @@ def create_standalone(self):

return standalone

@classmethod
def map_item_method_available(cls, dataset):
"""
Check whether this processor's map_item method exists and is compatible with a dataset. If the dataset
does not have a known extension, returns False.
:param DataSet dataset: The DataSet object with which to use map_item
:return bool: True if map_item exists and the extensions match, False otherwise
"""
# only run item mapper if extension of processor == extension of
# data file, for the scenario where a csv file was uploaded and
# converted to an ndjson-based data source, for example
# todo: this is kind of ugly, and a better fix may be possible
dataset_extension = dataset.get_extension()
if not dataset_extension:
# DataSet results file does not exist or has no extension, use expected extension
if hasattr(dataset, "extension"):
dataset_extension = dataset.extension
else:
# No known DataSet extension; cannot determine if map_item method compatible
return False

return hasattr(cls, "map_item") and cls.extension == dataset_extension

@classmethod
def get_mapped_item(cls, item):
"""
Get the mapped item using a processor's map_item method.
Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
"""
mapped_item = cls.map_item(item)
if not mapped_item:
raise MapItemException("Unable to map item!")
return mapped_item

@classmethod
def is_filter(cls):
"""
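
A minimal usage sketch of the two new classmethods added above (the processor class, dataset and item here are placeholders, not part of this diff): callers check compatibility once, then map items and catch mapping failures.

from common.lib.exceptions import MapItemException

def map_or_fall_back(processor_cls, dataset, item):
    """Map an item with processor_cls, falling back to the raw item on failure."""
    # skip mapping entirely if no compatible map_item method exists for this dataset
    if not processor_cls.map_item_method_available(dataset=dataset):
        return item
    try:
        return processor_cls.get_mapped_item(item)
    except MapItemException:
        # item could not be mapped; the caller decides whether to warn, skip, or keep it raw
        return item
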
33 changes: 28 additions & 5 deletions backend/lib/search.py
@@ -13,7 +13,7 @@
from common.lib.dataset import DataSet
from backend.lib.processor import BasicProcessor
from common.lib.helpers import strip_tags, dict_search_and_update, remove_nuls, HashCache
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, MapItemException


class Search(BasicProcessor, ABC):
@@ -48,6 +48,8 @@ class Search(BasicProcessor, ABC):
# Mandatory columns: ['thread_id', 'body', 'subject', 'timestamp']
return_cols = ['thread_id', 'body', 'subject', 'timestamp']

flawless = 0

def process(self):
"""
Create 4CAT dataset from a data source
@@ -112,8 +114,11 @@ def process(self):
# file exists somewhere, so we create it as an empty file
with open(query_parameters.get("copy_to"), "w") as empty_file:
empty_file.write("")

self.dataset.finish(num_rows=num_items)
if self.flawless == 0:
self.dataset.finish(num_rows=num_items)
else:
self.dataset.update_status(f"Unexpected data format for {self.flawless} items. All data can be downloaded, but will not be available to 4CAT processors; check logs for details", is_final=True)
self.dataset.finish(num_rows=num_items)

def search(self, query):
"""
@@ -174,19 +179,37 @@ def import_from_file(self, path):
if not path.exists():
return []

# Check if processor and dataset can use map_item
check_map_item = self.map_item_method_available(dataset=self.dataset)
if not check_map_item:
self.log.warning(
f"Processor {self.type} importing items without a map_item method for dataset {self.dataset.type} - {self.dataset.key}")

with path.open(encoding="utf-8") as infile:
for line in infile:
unmapped_items = False
for i, line in enumerate(infile):
if self.interrupted:
raise WorkerInterruptedException()

# remove NUL bytes here because they trip up a lot of other
# things
# also include import metadata in item
item = json.loads(line.replace("\0", ""))
yield {
new_item = {
**item["data"],
"__import_meta": {k: v for k, v in item.items() if k != "data"}
}
# Check map item here!
if check_map_item:
try:
self.get_mapped_item(new_item)
except MapItemException:
# NOTE: we still yield the unmappable item; perhaps we need to update a processor's map_item method to account for this new item
self.flawless += 1
self.dataset.warn_unmappable_item(i, processor=self, warn_admins=unmapped_items is False)
unmapped_items = True

yield new_item

path.unlink()
self.dataset.delete_parameter("file")
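
A hedged sketch of the data-source side of the new handling above (hypothetical class and field names): a map_item that raises MapItemException for records it does not recognise. import_from_file then counts these via self.flawless and calls warn_unmappable_item, but still yields the raw item so the data remains downloadable.

from common.lib.exceptions import MapItemException

class ExampleDataSource:  # hypothetical data source, for illustration only
    @staticmethod
    def map_item(item):
        # reject records that do not look like posts; the importer logs them as
        # unmappable, but the raw NDJSON line is still written to the results file
        if "id" not in item:
            raise MapItemException("Item has no id; cannot be mapped")
        return {"id": item["id"], "body": item.get("body", "")}
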
115 changes: 65 additions & 50 deletions common/lib/dataset.py
@@ -16,7 +16,7 @@
from common.lib.job import Job, JobNotFoundException
from common.lib.helpers import get_software_version, NullAwareTextIOWrapper, convert_to_int
from common.lib.fourcat_module import FourcatModule
from common.lib.exceptions import ProcessorInterruptedException
from common.lib.exceptions import ProcessorInterruptedException, DataSetException, MapItemException


class DataSet(FourcatModule):
@@ -252,7 +252,7 @@ def get_log_iterator(self):
logmsg = ":".join(line.split(":")[1:])
yield (logtime, logmsg)

def iterate_items(self, processor=None, bypass_map_item=False):
def iterate_items(self, processor=None, bypass_map_item=False, warn_unmappable=True):
"""
A generator that iterates through a CSV or NDJSON file

@@ -280,59 +280,65 @@ def iterate_items(self, processor=None, bypass_map_item=False):
`map_item` method of the datasource when returning items.
:return generator: A generator that yields each item as a dictionary
"""
unmapped_items = False
path = self.get_results_path()

# see if an item mapping function has been defined
# open question if 'source_dataset' shouldn't be an attribute of the dataset
# instead of the processor...
item_mapper = None

if not bypass_map_item:
own_processor = self.get_own_processor()
# only run item mapper if extension of processor == extension of
# data file, for the scenario where a csv file was uploaded and
# converted to an ndjson-based data source, for example
# todo: this is kind of ugly, and a better fix may be possible
extension_fits = hasattr(own_processor, "extension") and own_processor.extension == self.get_extension()
if hasattr(own_processor, "map_item") and extension_fits:
item_mapper = own_processor.map_item
item_mapper = False
own_processor = self.get_own_processor()
if not bypass_map_item and own_processor is not None:
if own_processor.map_item_method_available(dataset=self):
item_mapper = True

# go through items one by one, optionally mapping them
if path.suffix.lower() == ".csv":
with path.open("rb") as infile:
own_processor = self.get_own_processor()
csv_parameters = own_processor.get_csv_parameters(csv) if own_processor else {}

wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
reader = csv.DictReader(wrapped_infile, **csv_parameters)

for item in reader:
for i, item in enumerate(reader):
if hasattr(processor, "interrupted") and processor.interrupted:
raise ProcessorInterruptedException("Processor interrupted while iterating through CSV file")

if item_mapper:
item = item_mapper(item)
try:
item = own_processor.get_mapped_item(item)
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
unmapped_items = True
continue

yield item

elif path.suffix.lower() == ".ndjson":
# in this format each line in the file is a self-contained JSON
# file
with path.open(encoding="utf-8") as infile:
for line in infile:
for i, line in enumerate(infile):
if hasattr(processor, "interrupted") and processor.interrupted:
raise ProcessorInterruptedException("Processor interrupted while iterating through NDJSON file")

item = json.loads(line)
if item_mapper:
item = item_mapper(item)
try:
item = own_processor.get_mapped_item(item)
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
unmapped_items = True
continue

yield item

else:
raise NotImplementedError("Cannot iterate through %s file" % path.suffix)

def iterate_mapped_items(self, processor=None):
def iterate_mapped_items(self, processor=None, warn_unmappable=True):
"""
Wrapper for iterate_items that returns both the original item and the mapped item (or else the same identical item).
No extension check is performed here as the point is to be able to handle the original object and save as an appropriate
@@ -342,20 +348,27 @@ def iterate_mapped_items(self, processor=None):
iterating the dataset.
:return generator: A generator that yields a tuple with the unmapped item followed by the mapped item
"""
unmapped_items = False
# Collect item_mapper for use with filter
item_mapper = None
item_mapper = False
own_processor = self.get_own_processor()
if hasattr(own_processor, "map_item"):
item_mapper = own_processor.map_item
if own_processor.map_item_method_available(dataset=self):
item_mapper = True

# Loop through items
for item in self.iterate_items(processor=processor, bypass_map_item=True):
for i, item in enumerate(self.iterate_items(processor=processor, bypass_map_item=True)):
# Save original to yield
original_item = item.copy()

# Map item for filter
# Map item
if item_mapper:
mapped_item = item_mapper(item)
try:
mapped_item = own_processor.get_mapped_item(item)
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
unmapped_items = True
continue
else:
mapped_item = original_item

@@ -377,7 +390,7 @@ def get_item_keys(self, processor=None):
dataset
"""

items = self.iterate_items(processor)
items = self.iterate_items(processor, warn_unmappable=False)
try:
keys = list(items.__next__().keys())
except StopIteration:
@@ -811,31 +824,10 @@ def get_columns(self):
# no file to get columns from
return False

if self.get_results_path().suffix.lower() == ".csv":
with self.get_results_path().open(encoding="utf-8") as infile:
own_processor = self.get_own_processor()
csv_parameters = own_processor.get_csv_parameters(csv) if own_processor else {}

reader = csv.DictReader(infile, **csv_parameters)
try:
return list(reader.fieldnames)
except (TypeError, ValueError):
# not a valid CSV file?
return []

elif self.get_results_path().suffix.lower() == ".ndjson" and hasattr(self.get_own_processor(), "map_item"):
with self.get_results_path().open(encoding="utf-8") as infile:
first_line = infile.readline()

try:
item = json.loads(first_line)
return list(self.get_own_processor().map_item(item).keys())
except (json.JSONDecodeError, ValueError):
# not a valid NDJSON file?
return []

if (self.get_results_path().suffix.lower() == ".csv") or (self.get_results_path().suffix.lower() == ".ndjson" and self.get_own_processor() is not None and self.get_own_processor().map_item_method_available(dataset=self)):
Member
Are the extension checks necessary here? I think if there is a map_item there will be columns, and if not then not, regardless of extension.

Member
(I know they were already there, but I think they might be a left-over from when we were less extension-agnostic.)

Member Author
I vaguely remember an issue here with something in the frontend: get_item_keys would fail because get_own_processor would return None. But perhaps the extension checks are redundant. I can test this.

Member Author
CSV datasets do not have map_item, but do have columns we wish to return. NDJSON datasets should only return columns if they can be mapped (else keys are not consistent enough to be used as "columns"). Some datasets do not have processors (rare; from what I can tell only deprecated datasets), and a check here avoids an error being raised by iterate_items.

Potentially we could combine the two DataSet methods get_item_keys and get_columns; they are almost the same thing. get_item_keys is used in various processors, while get_columns is used by the frontend, most often via get_options. Currently, get_item_keys takes advantage of iterate_items and grabs the first item's keys; get_columns was a copy of the code from iterate_items, which is the code that I'm modifying in this PR.

When attempting to consolidate the code, I realized that get_columns could not be completely replaced by get_item_keys because of the previously mentioned instances where we do not wish to return columns. Possibly we do not wish to return item keys either in these instances via get_item_keys, but I did not explore all usages of that function as no errors were occurring. Most likely, get_columns returns False via the frontend, and so the backend never runs into a misuse of get_item_keys.

return self.get_item_keys(processor=self.get_own_processor())
else:
# not a CSV or NDJSON file, or no map_item function available
# Filetype not CSV or an NDJSON with `map_item`
return []

def get_annotation_fields(self):
@@ -1498,6 +1490,29 @@ def get_result_url(self):
config.get("flask.server_name") + '/result/' + filename
return url_to_file

def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True):
"""
Log an item that is unable to be mapped and warn administrators.

:param int item_count: Index of the item that could not be mapped
:param Processor processor: Processor calling this method
:param Exception error_message: Exception or message explaining why the item could not be mapped
:param bool warn_admins: Whether to warn 4CAT administrators via the logger
"""
dataset_error_message = f"MapItemException (item {item_count}): {'Unable to map item! Check raw datafile.' if error_message is None else error_message}"

# Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
closest_dataset = processor.dataset if processor is not None and processor.dataset is not None else self
# Log error to dataset log
closest_dataset.log(dataset_error_message)

if warn_admins:
if processor is not None:
processor.log.warning(f"Processor {processor.type} unable to map one or more items for dataset {closest_dataset.key}.")
elif hasattr(self.db, "log"):
self.db.log.warning(f"Unable to map one or more items for dataset {closest_dataset.key}.")
else:
# No other log available
raise DataSetException(f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn")

def __getattr__(self, attr):
"""
Getter so we don't have to use .data all the time
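
Regarding the review discussion of get_columns above, a simplified sketch of the decision logic as merged (a standalone helper written for illustration, not the actual method): a CSV header always yields usable columns, while NDJSON keys only count as columns when a compatible map_item is available.

def columns_for(dataset):
    extension = dataset.get_results_path().suffix.lower()
    own_processor = dataset.get_own_processor()
    if extension == ".csv":
        # CSV datasets have no map_item, but the header row gives usable columns
        return dataset.get_item_keys(processor=own_processor)
    if extension == ".ndjson" and own_processor is not None \
            and own_processor.map_item_method_available(dataset=dataset):
        # mapped NDJSON items have consistent keys that can serve as columns
        return dataset.get_item_keys(processor=own_processor)
    # anything else has no consistent columns to report
    return []
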
14 changes: 14 additions & 0 deletions common/lib/exceptions.py
@@ -40,6 +40,20 @@ class ProcessorException(FourcatException):
pass


class MapItemException(ProcessorException):
"""
Raise if an item cannot be mapped by a processor's map_item method
"""
pass


class DataSetException(FourcatException):
"""
Raise if an error occurs while handling a dataset
"""
pass


class JobClaimedException(QueueException):
"""
Raise if job is claimed, but is already marked as such
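
Since MapItemException subclasses ProcessorException, existing handlers that catch ProcessorException still see mapping failures; only code that wants to treat them specially needs the narrower except clause. A brief illustration:

from common.lib.exceptions import MapItemException, ProcessorException

try:
    raise MapItemException("Unable to map item!")
except MapItemException:
    pass  # skip or log just this unmappable item
except ProcessorException:
    pass  # any other processor-level failure
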
7 changes: 7 additions & 0 deletions datasources/instagram/search_instagram.py
@@ -8,6 +8,8 @@
import re

from backend.lib.search import Search
from common.lib.helpers import UserInput
from common.lib.exceptions import WorkerInterruptedException, MapItemException


class SearchInstagram(Search):
@@ -56,6 +58,11 @@ def map_item(item):
:param dict item: Item to map
:return: Mapped item
"""
if (item.get("product_type", "") == "ad") or \
(item.get("link", "").startswith("https://www.facebook.com/ads/ig_redirect")):
# These are ads
raise MapItemException("appears to be Instagram ad, check raw data to confirm and ensure ZeeSchuimer is up to date.")

is_graph_response = "__typename" in item

if is_graph_response:
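
A hedged example of how the new ad check surfaces to callers, assuming map_item is a static method (as is conventional for 4CAT data sources) and using a trimmed, hypothetical item:

from datasources.instagram.search_instagram import SearchInstagram
from common.lib.exceptions import MapItemException

ad_item = {"product_type": "ad", "link": "https://www.facebook.com/ads/ig_redirect"}
try:
    SearchInstagram.map_item(ad_item)
except MapItemException as reason:
    # iterate_items() and import_from_file() catch this, log it via
    # warn_unmappable_item(), and move on to the next item
    print(reason)
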
3 changes: 2 additions & 1 deletion processors/visualisation/download_tiktok.py
@@ -14,6 +14,7 @@
from common.lib.exceptions import ProcessorInterruptedException
from common.lib.user_input import UserInput
from datasources.tiktok_urls.search_tiktok_urls import TikTokScraper
from datasources.tiktok.search_tiktok import SearchTikTok as SearchTikTokByImport
from backend.lib.processor import BasicProcessor


@@ -297,7 +298,7 @@ def process(self):
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while downloading TikTok images")

refreshed_mapped_item = self.source_dataset.get_own_processor().map_item(refreshed_item)
refreshed_mapped_item = SearchTikTokByImport.map_item(refreshed_item)
post_id = refreshed_mapped_item.get("id")
url = refreshed_mapped_item.get(url_column)
