Map item catch #365

Merged
merged 18 commits into from Oct 12, 2023

Changes from 5 commits (of 18 commits)
38 changes: 37 additions & 1 deletion backend/abstract/processor.py
@@ -17,7 +17,7 @@
from common.lib.dataset import DataSet
from common.lib.fourcat_module import FourcatModule
from common.lib.helpers import get_software_version, remove_nuls
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, ProcessorException
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, ProcessorException, MapItemException

csv.field_size_limit(1024 * 1024 * 1024)

@@ -609,6 +609,42 @@ def create_standalone(self):

return standalone

@classmethod
def map_item_method_available(cls, dataset):
"""
Check whether a map_item method exists and is compatible with the given dataset. Returns
False if the dataset's extension cannot be determined.

:param DataSet dataset: The DataSet object with which to use map_item
"""
# only run item mapper if extension of processor == extension of
# data file, for the scenario where a csv file was uploaded and
# converted to an ndjson-based data source, for example
Member:
@dale-wahl is there any datasource that does this at the moment?

Member Author:
That's some old voodoo (I think it is yours, to be honest!)... I extracted that code and comment into a new method to use as a check for whether map_item should be run, but I did not modify the code. I believe it has to do with custom imports of, say, a Twitter dataset: you could convert the dataset type from custom to a Twitter datasource type, but it would not be an NDJSON as expected. That would also presumably apply to any ZeeSchuimer datasource (a custom Douyin/Instagram CSV upload would not be able to use map_item). In practice, I am not exactly sure how often we ran into this problem, since users cannot change datatypes, only admins can (am I wrong about that?).

# todo: this is kind of ugly, and a better fix may be possible
dataset_extension = dataset.get_extension()
if not dataset_extension:
# DataSet results file does not exist or has no extension, use expected extension
if hasattr(dataset, "extension"):
dataset_extension = dataset.extension
else:
# No known DataSet extension; cannot determine whether the map_item method is compatible
return False

return hasattr(cls, "map_item") and cls.extension == dataset_extension

@classmethod
def get_mapped_item(cls, item):
"""
Get the mapped item using a processor's map_item method.

Ensure that the map_item method is compatible with a dataset by checking map_item_method_available first.
"""
mapped_item = cls.map_item(item)
if not mapped_item:
raise MapItemException("Unable to map item!")
return mapped_item

@classmethod
def is_filter(cls):
"""
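
For orientation, a minimal usage sketch of the two new classmethods follows; it is not part of the diff, and SomeNdjsonProcessor, dataset, raw_items and handle() are hypothetical stand-ins:

    from common.lib.exceptions import MapItemException

    # Sketch only: map raw items when the processor's map_item is known to be
    # compatible with the dataset's file format, otherwise fall back to raw items.
    if SomeNdjsonProcessor.map_item_method_available(dataset=dataset):
        for raw_item in raw_items:
            try:
                handle(SomeNdjsonProcessor.get_mapped_item(raw_item))
            except MapItemException:
                # map_item returned nothing usable for this item; skip it
                continue
    else:
        # e.g. a csv upload behind an ndjson-based processor: use items as-is
        for raw_item in raw_items:
            handle(raw_item)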
31 changes: 26 additions & 5 deletions backend/abstract/search.py
@@ -13,7 +13,7 @@
from common.lib.dataset import DataSet
from backend.abstract.processor import BasicProcessor
from common.lib.helpers import strip_tags, dict_search_and_update, remove_nuls, HashCache
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, MapItemException


class Search(BasicProcessor, ABC):
@@ -48,6 +48,8 @@ class Search(BasicProcessor, ABC):
# Mandatory columns: ['thread_id', 'body', 'subject', 'timestamp']
return_cols = ['thread_id', 'body', 'subject', 'timestamp']

flawless = True

def process(self):
"""
Create 4CAT dataset from a data source
@@ -112,8 +114,11 @@ def process(self):
# file exists somewhere, so we create it as an empty file
with open(query_parameters.get("copy_to"), "w") as empty_file:
empty_file.write("")

self.dataset.finish(num_rows=num_items)
if self.flawless:
self.dataset.finish(num_rows=num_items)
else:
self.dataset.update_status("Some items did not map correctly. Data is maintained, but will not be available in 4CAT; check logs for details", is_final=True)
self.dataset.finish(num_rows=num_items)

def search(self, query):
"""
@@ -174,19 +179,35 @@ def import_from_file(self, path):
if not path.exists():
return []

# Check if processor and dataset can use map_item
check_map_item = self.map_item_method_available(dataset=self.dataset)
if not check_map_item:
self.log.warning(
f"Processor {self.type} importing item without map_item method for Dataset {self.dataset.type} - {self.dataset.key}")

with path.open() as infile:
for line in infile:
for i, line in enumerate(infile):
if self.interrupted:
raise WorkerInterruptedException()

# remove NUL bytes here because they trip up a lot of other
# things
# also include import metadata in item
item = json.loads(line.replace("\0", ""))
yield {
new_item = {
**item["data"],
"__import_meta": {k: v for k, v in item.items() if k != "data"}
}
# Check map item here!
if check_map_item:
try:
self.get_mapped_item(new_item)
except MapItemException:
# NOTE: we still yield the unmappable item; perhaps we need to update a processor's map_item method to account for this new item
self.flawless = False
self.dataset.warn_unmappable_item(i, processor=self)

yield new_item

path.unlink()

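
For context on the new_item construction above: each line of the import file is a JSON object whose "data" key holds the original item, while every other key ends up under "__import_meta". A small, self-contained illustration (the field names are invented; only the structure mirrors the code above):

    import json

    line = '{"data": {"id": "123", "body": "hello"}, "source_platform": "example"}'
    item = json.loads(line.replace("\0", ""))
    new_item = {
        **item["data"],
        "__import_meta": {k: v for k, v in item.items() if k != "data"}
    }
    # new_item == {"id": "123", "body": "hello",
    #              "__import_meta": {"source_platform": "example"}}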
96 changes: 48 additions & 48 deletions common/lib/dataset.py
@@ -15,7 +15,7 @@
from common.lib.job import Job, JobNotFoundException
from common.lib.helpers import get_software_version, NullAwareTextIOWrapper
from common.lib.fourcat_module import FourcatModule
from common.lib.exceptions import ProcessorInterruptedException
from common.lib.exceptions import ProcessorInterruptedException, DataSetException, MapItemException


class DataSet(FourcatModule):
@@ -270,47 +270,48 @@ def iterate_items(self, processor=None, bypass_map_item=False):
# see if an item mapping function has been defined
# open question if 'source_dataset' shouldn't be an attribute of the dataset
# instead of the processor...
item_mapper = None

item_mapper = False
own_processor = self.get_own_processor()
if not bypass_map_item:
own_processor = self.get_own_processor()
# only run item mapper if extension of processor == extension of
# data file, for the scenario where a csv file was uploaded and
# converted to an ndjson-based data source, for example
# todo: this is kind of ugly, and a better fix may be possible
extension_fits = hasattr(own_processor, "extension") and own_processor.extension == self.get_extension()
if hasattr(own_processor, "map_item") and extension_fits:
item_mapper = own_processor.map_item
if own_processor.map_item_method_available(dataset=self):
item_mapper = True

# go through items one by one, optionally mapping them
if path.suffix.lower() == ".csv":
with path.open("rb") as infile:
own_processor = self.get_own_processor()
csv_parameters = own_processor.get_csv_parameters(csv) if own_processor else {}

wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
reader = csv.DictReader(wrapped_infile, **csv_parameters)

for item in reader:
for i, item in enumerate(reader):
if hasattr(processor, "interrupted") and processor.interrupted:
raise ProcessorInterruptedException("Processor interrupted while iterating through CSV file")

if item_mapper:
item = item_mapper(item)
try:
item = own_processor.get_mapped_item(item)
except MapItemException:
self.warn_unmappable_item(i, processor)
continue

yield item

elif path.suffix.lower() == ".ndjson":
# in this format each line in the file is a self-contained JSON
# file
with path.open(encoding="utf-8") as infile:
for line in infile:
for i, line in enumerate(infile):
if hasattr(processor, "interrupted") and processor.interrupted:
raise ProcessorInterruptedException("Processor interrupted while iterating through NDJSON file")

item = json.loads(line)
if item_mapper:
item = item_mapper(item)
try:
item = own_processor.get_mapped_item(item)
except MapItemException:
self.warn_unmappable_item(i, processor)
continue

yield item

@@ -328,19 +329,23 @@ def iterate_mapped_items(self, processor=None):
:return generator: A generator that yields a tuple with the unmapped item followed by the mapped item
"""
# Collect item_mapper for use with filter
item_mapper = None
item_mapper = False
own_processor = self.get_own_processor()
if hasattr(own_processor, "map_item"):
item_mapper = own_processor.map_item
if own_processor.map_item_method_available(dataset=self):
item_mapper = True

# Loop through items
for item in self.iterate_items(processor=processor, bypass_map_item=True):
for i, item in enumerate(self.iterate_items(processor=processor, bypass_map_item=True)):
# Save original to yield
original_item = item.copy()

# Map item for filter
# Map item
if item_mapper:
mapped_item = item_mapper(item)
try:
mapped_item = own_processor.get_mapped_item(item)
except MapItemException:
self.warn_unmappable_item(i, processor)
continue
else:
mapped_item = original_item

@@ -614,32 +619,7 @@ def get_columns(self):
# no file to get columns from
return False

if self.get_results_path().suffix.lower() == ".csv":
with self.get_results_path().open(encoding="utf-8") as infile:
own_processor = self.get_own_processor()
csv_parameters = own_processor.get_csv_parameters(csv) if own_processor else {}

reader = csv.DictReader(infile, **csv_parameters)
try:
return list(reader.fieldnames)
except (TypeError, ValueError):
# not a valid CSV file?
return []

elif self.get_results_path().suffix.lower() == ".ndjson" and hasattr(self.get_own_processor(), "map_item"):
with self.get_results_path().open(encoding="utf-8") as infile:
first_line = infile.readline()

try:
item = json.loads(first_line)
return list(self.get_own_processor().map_item(item).keys())
except (json.JSONDecodeError, ValueError):
# not a valid NDJSON file?
return []

else:
# not a CSV or NDJSON file, or no map_item function available
return []
return self.get_item_keys(processor=self.get_own_processor())

def get_annotation_fields(self):
"""
@@ -1230,6 +1210,26 @@ def get_result_url(self):
config.get("flask.server_name") + '/result/' + filename
return url_to_file

def warn_unmappable_item(self, item_count, processor=None):
"""
Log an item that is unable to be mapped and warn administrators.

:param int item_count: Item index
:param Processor processor: Processor calling function
"""
if processor is not None:
# Log to dataset that is using map_item
processor.dataset.log(f"Item {item_count} is unable to be mapped! Check raw datafile.")
else:
# Log to this dataset
self.log(f"Item {item_count} is unable to be mapped! Check raw datafile.")

if hasattr(self.db, "log"):
self.db.log.warning(f"Processor {processor.type if processor is not None else self.get_own_processor().type} unable to map item {item_count} for dataset {self.key}.")
else:
# No other log available
raise DataSetException(f"Unable to map item {item_count} and properly warn")

def __getattr__(self, attr):
"""
Getter so we don't have to use .data all the time
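
From a consuming processor's perspective, the practical effect of these changes is that iterate_items (and iterate_mapped_items) now skips items whose map_item call fails, logging them via warn_unmappable_item instead of raising. A rough sketch of a downstream loop under that assumption, inside a hypothetical processor's process() method (the "body" field and the word count are illustrative only):

    # Sketch only: unmappable items never reach this loop; they are logged on
    # the dataset via warn_unmappable_item and skipped by iterate_items.
    total_words = 0
    for item in self.source_dataset.iterate_items(processor=self):
        total_words += len(item.get("body", "").split())
    self.dataset.update_status(f"Counted {total_words} words across mappable items")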
14 changes: 14 additions & 0 deletions common/lib/exceptions.py
@@ -40,6 +40,20 @@ class ProcessorException(FourcatException):
pass


class MapItemException(ProcessorException):
"""
Raise if an item cannot be mapped by a processor's map_item method
"""
pass


class DataSetException(FourcatException):
"""
Raise if a dataset-related error occurs
"""
pass


class JobClaimedException(QueueException):
"""
Raise if job is claimed, but is already marked as such
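
Because MapItemException subclasses ProcessorException, existing handlers that catch ProcessorException will also catch mapping failures; only code that wants to treat them differently needs the narrower clause. A brief, hypothetical sketch:

    from common.lib.exceptions import MapItemException, ProcessorException

    try:
        mapped = SomeProcessor.get_mapped_item(raw_item)  # hypothetical call site
    except MapItemException:
        mapped = None  # a mapping failure, handled specifically
    except ProcessorException:
        raise  # any other processor error keeps its existing handling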
1 change: 0 additions & 1 deletion datasources/twitter-import/search_twitter.py
@@ -67,7 +67,6 @@ def map_item_modern(tweet):
# sometimes this is one level deeper, sometimes not...
quote_tweet["result"] = quote_tweet["result"]["tweet"]

print(json.dumps(quote_tweet))
return {
"id": tweet["rest_id"],
"thread_id": tweet["legacy"]["conversation_id_str"],
3 changes: 2 additions & 1 deletion processors/visualisation/download_tiktok.py
@@ -14,6 +14,7 @@
from common.lib.exceptions import ProcessorInterruptedException
from common.lib.user_input import UserInput
from datasources.tiktok_urls.search_tiktok_urls import TikTokScraper
from datasources.tiktok.search_tiktok import SearchTikTok as SearchTikTokByImport
from backend.abstract.processor import BasicProcessor


@@ -291,7 +292,7 @@ def process(self):
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while downloading TikTok images")

refreshed_mapped_item = self.source_dataset.get_own_processor().map_item(refreshed_item)
refreshed_mapped_item = SearchTikTokByImport.map_item(refreshed_item)
post_id = refreshed_mapped_item.get("id")
url = refreshed_mapped_item.get(url_column)

1 change: 1 addition & 0 deletions setup.py
@@ -50,6 +50,7 @@
"shapely",
"spacy==3.4.3",
"svgwrite~=1.4.0",
"typing_extensions<4.6.0",
"Telethon~=1.25.2",
"ural~=0.33",
"unidecode~=1.3",
22 changes: 9 additions & 13 deletions webtool/views/views_dataset.py
@@ -192,8 +192,6 @@ def get_mapped_result(key):
# cannot map without a mapping method
return error(404, error="File not found.")

mapper = dataset.get_own_processor().map_item

def map_response():
"""
Yield a CSV file line by line
@@ -204,21 +202,19 @@ def map_response():
"""
writer = None
buffer = io.StringIO()
with dataset.get_results_path().open() as infile:
for line in infile:
mapped_item = mapper(json.loads(line))
if not writer:
writer = csv.DictWriter(buffer, fieldnames=tuple(mapped_item.keys()))
writer.writeheader()
yield buffer.getvalue()
buffer.truncate(0)
buffer.seek(0)

writer.writerow(mapped_item)
for mapped_item in dataset.iterate_items(processor=dataset.get_own_processor()):
if not writer:
writer = csv.DictWriter(buffer, fieldnames=tuple(mapped_item.keys()))
writer.writeheader()
yield buffer.getvalue()
buffer.truncate(0)
buffer.seek(0)

writer.writerow(mapped_item)
yield buffer.getvalue()
buffer.truncate(0)
buffer.seek(0)

disposition = 'attachment; filename="%s"' % dataset.get_results_path().with_suffix(".csv").name
return app.response_class(stream_with_context(map_response()), mimetype="text/csv",
headers={"Content-Disposition": disposition})