Start using a process pool
matthiask committed Jun 26, 2024
1 parent 3bb1e7f commit 11fbdab
Showing 2 changed files with 41 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
@@ -7,6 +7,8 @@ Next version
 ~~~~~~~~~~~~

 - Updated the pre-commit configuration, switched to biomejs.
+- Started using a process pool to process images in parallel in
+  ``process_imagefields``.


 0.18 (2023-12-07)
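For readers skimming the changelog entry above: the parallelism comes from the standard library's concurrent.futures.ProcessPoolExecutor, as the diff below shows. A minimal, self-contained sketch of that general pattern follows; render_thumbnail and paths are hypothetical stand-ins, not names from this project.

# Sketch only: fan CPU-bound per-image work out to a pool of worker processes.
from concurrent.futures import ProcessPoolExecutor


def render_thumbnail(path):
    # Placeholder for the CPU-bound image work (decoding, resizing, saving).
    return f"processed {path}"


if __name__ == "__main__":
    paths = [f"image-{i}.jpg" for i in range(10)]
    with ProcessPoolExecutor() as executor:
        # map() runs render_thumbnail in separate worker processes and
        # yields the results in input order.
        for result in executor.map(render_thumbnail, paths):
            print(result)

The real command maps its worker over queryset.iterator(chunk_size=100) instead of a list, as the second file in this commit shows.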
74 changes: 39 additions & 35 deletions imagefield/management/commands/process_imagefields.py
@@ -1,20 +1,13 @@
 import sys
+from concurrent.futures import ProcessPoolExecutor
 from fnmatch import fnmatch
+from functools import partial

 from django.core.management.base import BaseCommand, CommandError

 from imagefield.fields import IMAGEFIELDS


-def iterator(queryset):
-    # Relatively low chunk_size to avoid slowness when having to load
-    # width and height for images when instantiating models.
-    try:
-        return queryset.iterator(chunk_size=100)
-    except TypeError:  # Older versions of Django
-        return queryset.iterator()
-
-
 class Command(BaseCommand):
     def add_arguments(self, parser):
         parser.add_argument(
@@ -94,33 +87,44 @@ def _process_field(self, field, options):
             force=options.get("force"),
         )

-        for index, instance in enumerate(iterator(queryset)):
-            self._process_instance(
-                instance,
-                field,
-                housekeep=options.get("housekeep"),
-                force=options.get("force"),
-            )
-            progress = "*" * (50 * index // count)
-            self.stdout.write(
-                f"\r|{progress.ljust(50)}| {index + 1}/{count}", ending=""
-            )
+        fn = partial(
+            _process_instance,
+            field=field,
+            housekeep=options.get("housekeep"),
+            force=options.get("force"),
+        )

-            # Save instance once for good measure; fills in width/height
-            # if not done already
-            instance._skip_generate_files = True
-            instance.save()
+        with ProcessPoolExecutor() as executor:
+            for index, (instance, errors) in enumerate(
+                executor.map(fn, queryset.iterator(chunk_size=100))
+            ):
+                if errors:
+                    self.stderr.write("\n".join(errors))
+
+                progress = "*" * (50 * index // count)
+                self.stdout.write(
+                    f"\r|{progress.ljust(50)}| {index + 1}/{count}", ending=""
+                )
+
+                # Save instance once for good measure; fills in width/height
+                # if not done already
+                instance._skip_generate_files = True
+                instance.save()

         self.stdout.write("\r|{}| {}/{}".format("*" * 50, count, count))

-    def _process_instance(self, instance, field, housekeep, **kwargs):
-        fieldfile = getattr(instance, field.name)
-        for key in field.formats:
-            try:
-                fieldfile.process(key, **kwargs)
-            except Exception as exc:
-                self.stderr.write(
-                    f"Error while processing {fieldfile.name} ({field.field_label}, #{instance.pk}):\n{exc}\n"
-                )
-                if housekeep == "blank-on-failure":
-                    field.save_form_data(instance, "")
+
+def _process_instance(instance, field, housekeep, **kwargs):
+    fieldfile = getattr(instance, field.name)
+    for key in field.formats:
+        try:
+            fieldfile.process(key, **kwargs)
+        except Exception as exc:
+            if housekeep == "blank-on-failure":
+                field.save_form_data(instance, "")
+
+            return instance, [
+                f"Error while processing {fieldfile.name} ({field.field_label}, #{instance.pk}):\n{exc}\n"
+            ]
+
+    return instance, None
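Two aspects of the new code are worth noting. _process_instance is now a module-level function rather than a method, presumably because ProcessPoolExecutor pickles the callable and its arguments to ship them to worker processes, and a bound method would drag the whole Command instance along. Errors are also no longer written to stderr inside the worker; they come back as part of the (instance, errors) return value so the parent process does all the writing (and the final instance.save() likewise happens in the parent, on the instance sent back from the worker). A small sketch of that contract, using hypothetical names (work, strict) that are not part of this project:

# Sketch only: module-level worker, extra arguments bound with functools.partial,
# and errors reported through the return value instead of direct stderr writes.
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def work(item, *, strict=False):
    # Mirrors the (result, errors) shape returned by _process_instance.
    try:
        if strict and item % 3 == 0:
            raise ValueError(f"cannot handle {item}")
        return item * item, None
    except Exception as exc:
        return item, [f"Error while processing {item}: {exc}"]


if __name__ == "__main__":
    fn = partial(work, strict=True)
    with ProcessPoolExecutor() as executor:
        for result, errors in executor.map(fn, range(10)):
            if errors:
                print("\n".join(errors))
            else:
                print(result)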

2 comments on commit 11fbdab

@matthiask
(Owner, Author)

@fabiangermann Unfortunately probably too late now :)

@fabiangermann
(Contributor)

Yes, it has finished now. But I'll let it run over everything again tonight; somehow not everything seems to have been processed yet (at least it still appears to be doing something when I start it again). We'll see how long it takes then :).
