From 4b3d6bdc0d8cac0d6f9c9680436af2d2e1b14f81 Mon Sep 17 00:00:00 2001 From: "robin@ynput.io" Date: Tue, 12 Nov 2024 16:59:36 -0500 Subject: [PATCH] Allow CSV ingest to create new shots. --- .../plugins/create/create_csv_ingest.py | 93 +++++++++++++++++-- 1 file changed, 87 insertions(+), 6 deletions(-) diff --git a/client/ayon_traypublisher/plugins/create/create_csv_ingest.py b/client/ayon_traypublisher/plugins/create/create_csv_ingest.py index 55e77ca..068b041 100644 --- a/client/ayon_traypublisher/plugins/create/create_csv_ingest.py +++ b/client/ayon_traypublisher/plugins/create/create_csv_ingest.py @@ -181,6 +181,8 @@ def __init__( self.variant = variant self.product_type = product_type self.repre_items: List[RepreItem] = [] + self.has_promised_context = False + self.parents = None self._unique_name = None self._pre_product_name = None @@ -388,6 +390,54 @@ def _resolve_repre_path( return filepath + @staticmethod + def _validate_parents(project_name: str, product_item: ProductItem) -> list: + """ Ensure parent exists for provided product_item.folder_path + + Args: + project_name (str): The project name. + product_item (ProductItem): The product item to inspect. + + Returns: + list. The parent list if any + + Raise: + ValueError: When provided folder_path parent do not exist. + """ + parent_folder_path, folder_name = product_item.folder_path.rsplit("/", 1) + if not parent_folder_path: + return [] + + folder_data = ayon_api.get_folder_by_path( + project_name, + parent_folder_path, + fields=("folderType",) + ) + + if folder_data is None: + raise ValueError(f"Unknown parent folder {parent_folder_path}") + + parent_data = [] + inspected = [] + + for key in parent_folder_path.split("/"): + if not key: + continue + + inspected.append(key) + folder_data = ayon_api.get_folder_by_path( + project_name, + f"/{'/'.join(inspected)}", + fields=("folderType",) + ) + parent_data.append({ + "folder_type": folder_data["folderType"], + "entity_name": key + }) + + return parent_data + + def _get_data_from_csv( self, csv_dir: str, filename: str ) -> Dict[str, ProductItem]: @@ -458,12 +508,6 @@ def _get_data_from_csv( ) } missing_paths: Set[str] = folder_paths - set(folder_ids_by_path.keys()) - if missing_paths: - ending = "" if len(missing_paths) == 1 else "s" - joined_paths = "\n".join(sorted(missing_paths)) - raise CreatorError( - f"Folder{ending} not found.\n{joined_paths}" - ) task_names: Set[str] = { product_item.task_name @@ -482,6 +526,22 @@ def _get_data_from_csv( missing_tasks: Set[str] = set() for product_item in product_items_by_name.values(): folder_path = product_item.folder_path + + if folder_path in missing_paths: + product_item.has_promised_context = True + product_item.task_type = None + try: + product_item.parents = self._validate_parents( + project_name, + product_item + ) + except ValueError: + raise CreatorError( + f"Parent context must exists for new shots: {folder_path}" + ) + + continue + task_name = product_item.task_name folder_id = folder_ids_by_path[folder_path] task_entities = task_entities_by_folder_id[folder_id] @@ -835,6 +895,23 @@ def _create_instances_from_csv_data(self, csv_dir: str, filename: str): "prepared_data_for_repres": [] } + if product_item.has_promised_context: + hierarchy, _ = folder_path.rsplit("/",1) + families.append("csv_ingest_shot") + instance_data.update( + { + "newHierarchyIntegration": True, + "hierarchy": hierarchy, + "parents": product_item.parents + } + ) + # TODO create new task from provided task name +# if product_item.task_name: +# instance_data["tasks"] = { +# "name": product_item.task_name, +# "type": "Generic" +# } + # create new instance new_instance: CreatedInstance = CreatedInstance( product_item.product_type, @@ -843,6 +920,10 @@ def _create_instances_from_csv_data(self, csv_dir: str, filename: str): self ) self._prepare_representations(product_item, new_instance) + + if product_item.has_promised_context: + new_instance.transient_data["has_promised_context"] = True + instances.append(new_instance) return instances