Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor user code status #9286

Merged
merged 20 commits into from
Sep 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
filter out errored items in migrartion
  • Loading branch information
abyesilyurt committed Sep 19, 2024
commit 5b362bad9e9fab624868ec0600c865b799f9e179
77 changes: 37 additions & 40 deletions packages/syft/src/syft/service/migration/migration_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from ...store.db.db import DBManager
from ...store.db.stash import ObjectStash
from ...store.document_store_errors import NotFoundException
from ...store.document_store_errors import UniqueConstraintException
from ...types.blob_storage import BlobStorageEntry
from ...types.errors import SyftException
from ...types.result import as_result
Expand Down Expand Up @@ -255,34 +254,38 @@ def create_migrated_objects(
def _create_migrated_objects(
self,
context: AuthedServiceContext,
migrated_objects: list[SyftObject],
migrated_objects: dict[type[SyftObject], list[SyftObject]],
ignore_existing: bool = True,
skip_check_type: bool = False,
) -> SyftSuccess:
for migrated_object in migrated_objects:
stash = self._search_stash_for_klass(
context, type(migrated_object)
).unwrap()
) -> dict[type[SyftObject], list[SyftObject]]:
created_objects: dict[type[SyftObject], list[SyftObject]] = {}
for key, objects in migrated_objects.items():
created_objects[key] = []
for migrated_object in objects:
stash = self._search_stash_for_klass(
context, type(migrated_object)
).unwrap()

result = stash.set(
context.credentials,
obj=migrated_object,
skip_check_type=skip_check_type,
)
# Exception from the new Error Handling pattern, no need to change
if result.is_err():
# TODO: subclass a DuplicationKeyError
if ignore_existing and (
"Duplication Key Error" in result.err()._private_message # type: ignore
or "Duplication Key Error" in result.err().public_message # type: ignore
):
print(
f"{type(migrated_object)} #{migrated_object.id} already exists"
)
continue
else:
result.unwrap() # this will raise the exception inside the wrapper
return SyftSuccess(message="Created migrate objects!")
result = stash.set(
context.credentials,
obj=migrated_object,
skip_check_type=skip_check_type,
)
# Exception from the new Error Handling pattern, no need to change
if result.is_err():
# TODO: subclass a DuplicationKeyError
if ignore_existing and (
"Duplication Key Error" in result.err()._private_message # type: ignore
or "Duplication Key Error" in result.err().public_message # type: ignore
):
print(
f"{type(migrated_object)} #{migrated_object.id} already exists"
)
continue
else:
result.unwrap() # this will raise the exception inside the wrapper
created_objects[key].append(result.unwrap())
return created_objects

@as_result(SyftException)
def _update_migrated_objects(
Expand All @@ -293,13 +296,10 @@ def _update_migrated_objects(
context, type(migrated_object)
).unwrap()

try:
stash.update(
context.credentials,
obj=migrated_object,
).unwrap()
except UniqueConstraintException as e:
print(f"Failed to update {migrated_object}: {e}")
stash.update(
context.credentials,
obj=migrated_object,
).unwrap()

return SyftSuccess(message="Updated migration objects!")

Expand Down Expand Up @@ -447,17 +447,14 @@ def apply_migration_data(
"please use 'client.load_migration_data' instead."
)

pre_objects = [
o for objects in migration_data.store_objects.values() for o in objects
]
self._create_migrated_objects(
context, pre_objects, skip_check_type=True
created_objects = self._create_migrated_objects(
context, migration_data.store_objects, skip_check_type=True
).unwrap()
print("UPDATING")

# migrate + apply store objects
migrated_objects = self._migrate_objects(
context, migration_data.store_objects
context,
created_objects,
).unwrap()
self._update_migrated_objects(context, migrated_objects).unwrap()

Expand Down
Loading