Map item catch (#365)
* use iterate_items to check that map_item returns something; if not, warn admin and user

* apparently something with spacy and typing_extensions is broken

* remove old debug print

* wrap map_item with processor.get_mapped_item as well as check for map_item compatibility with dataset

* conform iterate_mapped_items to new method

* get_columns needs to detect non-CSV and non-NDJSON files

`get_item_keys` would also raise an error on a non-CSV/NDJSON file. It will return the first item's keys even when `map_item` does not exist... but that is how it has always functioned, so I am leaving it the same.

* Check Instagram items for ads

* warn on instagram ads; add customizable warning message (see the sketch after this list)

* add healthcheck from master

* do not always warn on map_item error (for example when getting dataset keys which is done A LOT)

* ensure processor exists prior to checking map_item (processors may be removed/deprecated)

* no mapping when there is no processor

* do not warn unmappable for previews or creating CSV extract (warning is already in the log)

* only warn admins once per dataset

* fix warning when processor is None

* Count number of unmappable items
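
A minimal sketch of the pattern the bullets above describe (the ExampleSearch data source below is hypothetical, not part of this commit): a data source's map_item can now reject individual items by raising MapItemException with a custom message, which the new wrappers catch so the item is skipped or flagged and admins and users are warned. The Instagram ad check in this commit follows the same pattern.

from backend.lib.search import Search
from common.lib.exceptions import MapItemException


class ExampleSearch(Search):
    """Hypothetical data source, for illustration only"""
    type = "example-search"
    extension = "ndjson"

    @staticmethod
    def map_item(item):
        if "id" not in item:
            # customizable warning message, surfaced to users and admins
            # via DataSet.warn_unmappable_item
            raise MapItemException("item has no id; check the raw data file")
        return {"id": item["id"], "body": item.get("body", "")}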
dale-wahl authored Oct 12, 2023
1 parent 2b3e7ea commit 1101a0a
Showing 8 changed files with 173 additions and 82 deletions.
39 changes: 38 additions & 1 deletion backend/lib/processor.py
@@ -16,9 +16,10 @@
from common.lib.dataset import DataSet
from common.lib.fourcat_module import FourcatModule
from common.lib.helpers import get_software_version, remove_nuls
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, ProcessorException
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, ProcessorException, MapItemException
from common.config_manager import config, ConfigWrapper


csv.field_size_limit(1024 * 1024 * 1024)


@@ -625,6 +626,42 @@ def create_standalone(self):

return standalone

@classmethod
def map_item_method_available(cls, dataset):
"""
Checks if a map_item method exists and is compatible with the given dataset. If the dataset's extension
cannot be determined, returns False.
:param DataSet dataset: The DataSet object with which to use map_item
"""
# only run item mapper if extension of processor == extension of
# data file, for the scenario where a csv file was uploaded and
# converted to an ndjson-based data source, for example
# todo: this is kind of ugly, and a better fix may be possible
dataset_extension = dataset.get_extension()
if not dataset_extension:
# DataSet results file does not exist or has no extension, use expected extension
if hasattr(dataset, "extension"):
dataset_extension = dataset.extension
else:
# No known DataSet extension; cannot determine if map_item method compatible
return False

return hasattr(cls, "map_item") and cls.extension == dataset_extension

@classmethod
def get_mapped_item(cls, item):
"""
Get the mapped item using a processor's map_item method.
Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
"""
mapped_item = cls.map_item(item)
if not mapped_item:
raise MapItemException("Unable to map item!")
return mapped_item

@classmethod
def is_filter(cls):
"""
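A minimal sketch of the intended caller-side usage of the two new classmethods added above (the iterate_mapped helper and its arguments are placeholders, not code from this commit): check compatibility once with map_item_method_available, then wrap each get_mapped_item call in a try/except for MapItemException.

from common.lib.exceptions import MapItemException


def iterate_mapped(processor_cls, dataset):
    # processor_cls: the dataset's BasicProcessor subclass
    # dataset: the DataSet whose results file is being read
    if not processor_cls.map_item_method_available(dataset=dataset):
        # no compatible map_item method; nothing to yield
        return
    for i, raw_item in enumerate(dataset.iterate_items(bypass_map_item=True)):
        try:
            yield processor_cls.get_mapped_item(raw_item)
        except MapItemException:
            # unmappable item: note it in the dataset log and skip it
            dataset.warn_unmappable_item(i, warn_admins=False)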
33 changes: 28 additions & 5 deletions backend/lib/search.py
@@ -13,7 +13,7 @@
from common.lib.dataset import DataSet
from backend.lib.processor import BasicProcessor
from common.lib.helpers import strip_tags, dict_search_and_update, remove_nuls, HashCache
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, MapItemException


class Search(BasicProcessor, ABC):
@@ -48,6 +48,8 @@ class Search(BasicProcessor, ABC):
# Mandatory columns: ['thread_id', 'body', 'subject', 'timestamp']
return_cols = ['thread_id', 'body', 'subject', 'timestamp']

flawless = 0

def process(self):
"""
Create 4CAT dataset from a data source
@@ -112,8 +114,11 @@ def process(self):
# file exists somewhere, so we create it as an empty file
with open(query_parameters.get("copy_to"), "w") as empty_file:
empty_file.write("")

self.dataset.finish(num_rows=num_items)
if self.flawless == 0:
self.dataset.finish(num_rows=num_items)
else:
self.dataset.update_status(f"Unexpected data format for {self.flawless} items. All data can be downloaded, but will not be available to 4CAT processors; check logs for details", is_final=True)
self.dataset.finish(num_rows=num_items)

def search(self, query):
"""
@@ -174,19 +179,37 @@ def import_from_file(self, path):
if not path.exists():
return []

# Check if processor and dataset can use map_item
check_map_item = self.map_item_method_available(dataset=self.dataset)
if not check_map_item:
self.log.warning(
f"Processor {self.type} importing item without map_item method for Dataset {self.dataset.type} - {self.dataset.key}")

with path.open(encoding="utf-8") as infile:
for line in infile:
unmapped_items = False
for i, line in enumerate(infile):
if self.interrupted:
raise WorkerInterruptedException()

# remove NUL bytes here because they trip up a lot of other
# things
# also include import metadata in item
item = json.loads(line.replace("\0", ""))
yield {
new_item = {
**item["data"],
"__import_meta": {k: v for k, v in item.items() if k != "data"}
}
# Check map item here!
if check_map_item:
try:
self.get_mapped_item(new_item)
except MapItemException:
# NOTE: we still yield the unmappable item; perhaps we need to update a processor's map_item method to account for this new item
self.flawless += 1
self.dataset.warn_unmappable_item(i, processor=self, warn_admins=unmapped_items is False)
unmapped_items = True

yield new_item

path.unlink()
self.dataset.delete_parameter("file")
115 changes: 65 additions & 50 deletions common/lib/dataset.py
@@ -16,7 +16,7 @@
from common.lib.job import Job, JobNotFoundException
from common.lib.helpers import get_software_version, NullAwareTextIOWrapper, convert_to_int
from common.lib.fourcat_module import FourcatModule
from common.lib.exceptions import ProcessorInterruptedException
from common.lib.exceptions import ProcessorInterruptedException, DataSetException, MapItemException


class DataSet(FourcatModule):
@@ -254,7 +254,7 @@ def get_log_iterator(self):
logmsg = ":".join(line.split(":")[1:])
yield (logtime, logmsg)

def iterate_items(self, processor=None, bypass_map_item=False):
def iterate_items(self, processor=None, bypass_map_item=False, warn_unmappable=True):
"""
A generator that iterates through a CSV or NDJSON file
@@ -282,59 +282,65 @@ def iterate_items(self, processor=None, bypass_map_item=False):
`map_item` method of the datasource when returning items.
:return generator: A generator that yields each item as a dictionary
"""
unmapped_items = False
path = self.get_results_path()

# see if an item mapping function has been defined
# open question if 'source_dataset' shouldn't be an attribute of the dataset
# instead of the processor...
item_mapper = None

if not bypass_map_item:
own_processor = self.get_own_processor()
# only run item mapper if extension of processor == extension of
# data file, for the scenario where a csv file was uploaded and
# converted to an ndjson-based data source, for example
# todo: this is kind of ugly, and a better fix may be possible
extension_fits = hasattr(own_processor, "extension") and own_processor.extension == self.get_extension()
if hasattr(own_processor, "map_item") and extension_fits:
item_mapper = own_processor.map_item
item_mapper = False
own_processor = self.get_own_processor()
if not bypass_map_item and own_processor is not None:
if own_processor.map_item_method_available(dataset=self):
item_mapper = True

# go through items one by one, optionally mapping them
if path.suffix.lower() == ".csv":
with path.open("rb") as infile:
own_processor = self.get_own_processor()
csv_parameters = own_processor.get_csv_parameters(csv) if own_processor else {}

wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
reader = csv.DictReader(wrapped_infile, **csv_parameters)

for item in reader:
for i, item in enumerate(reader):
if hasattr(processor, "interrupted") and processor.interrupted:
raise ProcessorInterruptedException("Processor interrupted while iterating through CSV file")

if item_mapper:
item = item_mapper(item)
try:
item = own_processor.get_mapped_item(item)
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
unmapped_items = True
continue

yield item

elif path.suffix.lower() == ".ndjson":
# in this format each line in the file is a self-contained JSON
# file
with path.open(encoding="utf-8") as infile:
for line in infile:
for i, line in enumerate(infile):
if hasattr(processor, "interrupted") and processor.interrupted:
raise ProcessorInterruptedException("Processor interrupted while iterating through NDJSON file")

item = json.loads(line)
if item_mapper:
item = item_mapper(item)
try:
item = own_processor.get_mapped_item(item)
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
unmapped_items = True
continue

yield item

else:
raise NotImplementedError("Cannot iterate through %s file" % path.suffix)

def iterate_mapped_items(self, processor=None):
def iterate_mapped_items(self, processor=None, warn_unmappable=True):
"""
Wrapper for iterate_items that returns both the original item and the mapped item (or else the same identical item).
No extension check is performed here as the point is to be able to handle the original object and save as an appropriate
@@ -344,20 +350,27 @@ def iterate_mapped_items(self, processor=None):
iterating the dataset.
:return generator: A generator that yields a tuple with the unmapped item followed by the mapped item
"""
unmapped_items = False
# Collect item_mapper for use with filter
item_mapper = None
item_mapper = False
own_processor = self.get_own_processor()
if hasattr(own_processor, "map_item"):
item_mapper = own_processor.map_item
if own_processor.map_item_method_available(dataset=self):
item_mapper = True

# Loop through items
for item in self.iterate_items(processor=processor, bypass_map_item=True):
for i, item in enumerate(self.iterate_items(processor=processor, bypass_map_item=True)):
# Save original to yield
original_item = item.copy()

# Map item for filter
# Map item
if item_mapper:
mapped_item = item_mapper(item)
try:
mapped_item = own_processor.get_mapped_item(item)
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
unmapped_items = True
continue
else:
mapped_item = original_item

@@ -379,7 +392,7 @@ def get_item_keys(self, processor=None):
dataset
"""

items = self.iterate_items(processor)
items = self.iterate_items(processor, warn_unmappable=False)
try:
keys = list(items.__next__().keys())
except StopIteration:
Expand Down Expand Up @@ -813,31 +826,10 @@ def get_columns(self):
# no file to get columns from
return False

if self.get_results_path().suffix.lower() == ".csv":
with self.get_results_path().open(encoding="utf-8") as infile:
own_processor = self.get_own_processor()
csv_parameters = own_processor.get_csv_parameters(csv) if own_processor else {}

reader = csv.DictReader(infile, **csv_parameters)
try:
return list(reader.fieldnames)
except (TypeError, ValueError):
# not a valid CSV file?
return []

elif self.get_results_path().suffix.lower() == ".ndjson" and hasattr(self.get_own_processor(), "map_item"):
with self.get_results_path().open(encoding="utf-8") as infile:
first_line = infile.readline()

try:
item = json.loads(first_line)
return list(self.get_own_processor().map_item(item).keys())
except (json.JSONDecodeError, ValueError):
# not a valid NDJSON file?
return []

if (self.get_results_path().suffix.lower() == ".csv") or (self.get_results_path().suffix.lower() == ".ndjson" and self.get_own_processor() is not None and self.get_own_processor().map_item_method_available(dataset=self)):
return self.get_item_keys(processor=self.get_own_processor())
else:
# not a CSV or NDJSON file, or no map_item function available
# Filetype not CSV or an NDJSON with `map_item`
return []

def get_annotation_fields(self):
Expand Down Expand Up @@ -1523,6 +1515,29 @@ def get_result_url(self):
config.get("flask.server_name") + '/result/' + filename
return url_to_file

def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True):
"""
Log an item that is unable to be mapped and warn administrators.
:param int item_count: Item index
:param Processor processor: Processor calling function
"""
dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"

# Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
closest_dataset = processor.dataset if processor is not None and processor.dataset is not None else self
# Log error to dataset log
closest_dataset.log(dataset_error_message)

if warn_admins:
if processor is not None:
processor.log.warning(f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}.")
elif hasattr(self.db, "log"):
self.db.log.warning(f"Unable to map item all items for dataset {closest_dataset.key}.")
else:
# No other log available
raise DataSetException(f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn")

def __getattr__(self, attr):
"""
Getter so we don't have to use .data all the time
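A minimal sketch of what these DataSet changes mean for a processor consuming a dataset (the ExampleCounter processor is hypothetical, not part of this commit): iterate_items now skips items whose mapping raises MapItemException and reports them through warn_unmappable_item, warning admins only once, while callers that merely peek at the data, such as get_item_keys, pass warn_unmappable=False so the log is not flooded.

from backend.lib.processor import BasicProcessor


class ExampleCounter(BasicProcessor):
    """Hypothetical processor, for illustration only"""
    type = "example-counter"

    def process(self):
        # get_item_keys peeks at the first item and suppresses mapping warnings
        columns = self.source_dataset.get_item_keys(processor=self)
        self.dataset.log(f"Input columns: {columns}")

        # unmappable items are skipped here; a warning is written to the
        # dataset log and administrators are notified once
        posts = 0
        for item in self.source_dataset.iterate_items(processor=self):
            posts += 1

        self.dataset.finish(num_rows=posts)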
14 changes: 14 additions & 0 deletions common/lib/exceptions.py
@@ -40,6 +40,20 @@ class ProcessorException(FourcatException):
pass


class MapItemException(ProcessorException):
"""
Raise if an item cannot be mapped by a processor's map_item method
"""
pass


class DataSetException(FourcatException):
"""
Raise if dataset throws an exception
"""
pass


class JobClaimedException(QueueException):
"""
Raise if job is claimed, but is already marked as such
7 changes: 7 additions & 0 deletions datasources/instagram/search_instagram.py
@@ -8,6 +8,8 @@
import re

from backend.lib.search import Search
from common.lib.helpers import UserInput
from common.lib.exceptions import WorkerInterruptedException, MapItemException


class SearchInstagram(Search):
@@ -56,6 +58,11 @@ def map_item(item):
:param dict item: Item to map
:return: Mapped item
"""
if (item.get("product_type", "") == "ad") or \
(item.get("link", "").startswith("https://www.facebook.com/ads/ig_redirect")):
# These are ads
raise MapItemException("appears to be Instagram ad, check raw data to confirm and ensure ZeeSchuimer is up to date.")

is_graph_response = "__typename" in item

if is_graph_response:
3 changes: 2 additions & 1 deletion processors/visualisation/download_tiktok.py
@@ -14,6 +14,7 @@
from common.lib.exceptions import ProcessorInterruptedException
from common.lib.user_input import UserInput
from datasources.tiktok_urls.search_tiktok_urls import TikTokScraper
from datasources.tiktok.search_tiktok import SearchTikTok as SearchTikTokByImport
from backend.lib.processor import BasicProcessor


@@ -297,7 +298,7 @@ def process(self):
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while downloading TikTok images")

refreshed_mapped_item = self.source_dataset.get_own_processor().map_item(refreshed_item)
refreshed_mapped_item = SearchTikTokByImport.map_item(refreshed_item)
post_id = refreshed_mapped_item.get("id")
url = refreshed_mapped_item.get(url_column)

(diffs for the remaining two changed files not shown)
