Skip to content

Commit

Permalink
Use a customized version of QueryExistingArtifacts
Browse files Browse the repository at this point in the history
With this change, the stage compares only the sha256 digests of existing
and newly added artifacts. Furthermore, the old comparison had a time
complexity of O(N^2). The new stage makes two linear passes over each
batch (O(2N), i.e. O(N)).

closes pulp#289
  • Loading branch information
lubosmj committed Nov 16, 2023
1 parent 7fc7c00 commit 42cba31
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES/289.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved the performance of subsequent imports.
41 changes: 39 additions & 2 deletions pulp_ostree/app/tasks/importing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
ArtifactSaver,
ContentSaver,
DeclarativeVersion,
QueryExistingArtifacts,
QueryExistingContents,
ResolveContentFutures,
Stage,
)
from pulpcore.plugin.sync import sync_to_async_iterable

from pulp_ostree.app.models import (
OstreeCommit,
Expand Down Expand Up @@ -371,14 +371,51 @@ async def run(self):
await self.submit_metafile_object("summary", OstreeSummary())


class QueryExistingArtifactsOstree(Stage):
    """A QueryExistingArtifacts-like stage that matches artifacts solely by their sha256 digest.

    Only the first declarative artifact (``d_artifacts[0]``) of each content unit is inspected.
    """

    async def run(self):
        """Replace unsaved artifacts with stored ones that share the same sha256 digest.

        For every incoming batch, the digests of artifacts not yet saved are collected, the
        matching stored artifacts within ``self.domain`` are fetched with a single query and
        indexed by digest, and each content unit's first artifact is swapped for the stored
        match when one exists. Every content unit of the batch is then passed downstream.
        """
        async for batch in self.batches():
            # Digests of the artifacts that have not been saved to the database yet.
            unsaved_digests = [
                d_content.d_artifacts[0].artifact.sha256
                for d_content in batch
                if d_content.d_artifacts[0].artifact._state.adding
            ]

            stored_qs = Artifact.objects.filter(
                sha256__in=unsaved_digests,
                pulp_domain=self.domain,
            )
            # NOTE(review): touch() is defined outside this file; presumably it marks the
            # matched artifacts as recently used -- confirm against the Artifact queryset.
            await sync_to_async(stored_qs.touch)()

            # Index the stored artifacts by digest so each lookup below is a dictionary access.
            stored_by_digest = {
                artifact.sha256: artifact
                async for artifact in sync_to_async_iterable(stored_qs)
            }

            for d_content in batch:
                d_artifact = d_content.d_artifacts[0]
                stored = stored_by_digest.get(d_artifact.artifact.sha256)
                if not stored:
                    continue
                d_artifact.artifact = stored

            for d_content in batch:
                await self.put(d_content)


class OstreeImportDeclarativeVersion(DeclarativeVersion):
"""A customized DeclarativeVersion class that creates a pipeline for the OSTree import."""

def pipeline_stages(self, new_version):
"""Build a list of stages."""
pipeline = [
self.first_stage,
QueryExistingArtifacts(),
QueryExistingArtifactsOstree(),
ArtifactSaver(),
QueryExistingContents(),
ContentSaver(),
Expand Down

0 comments on commit 42cba31

Please sign in to comment.