diff --git a/client/ayon_deadline/plugins/publish/nuke/submit_nuke_deadline.py b/client/ayon_deadline/plugins/publish/nuke/submit_nuke_deadline.py
index e5bf8a071e..45817d60a4 100644
--- a/client/ayon_deadline/plugins/publish/nuke/submit_nuke_deadline.py
+++ b/client/ayon_deadline/plugins/publish/nuke/submit_nuke_deadline.py
@@ -1,25 +1,32 @@
 import os
 import re
-import json
-import getpass
-from datetime import datetime
+
+import attr
 
 import pyblish.api
 
 from ayon_core.pipeline.publish import (
     AYONPyblishPluginMixin
 )
-from ayon_core.lib import (
-    is_in_tests,
-    BoolDef,
-    NumberDef
-)
-from ayon_deadline.abstract_submit_deadline import requests_post
-from ayon_deadline.lib import get_instance_job_envs, get_ayon_render_job_envs
+from ayon_deadline import abstract_submit_deadline
+
+
+@attr.s
+class NukePluginInfo:
+    SceneFile: str = attr.ib(default=None)  # Input
+    Version: str = attr.ib(default=None)  # Mandatory for Deadline
+    # Mandatory for Deadline
+    ProjectPath: str = attr.ib(default=None)
+    OutputFilePath: str = attr.ib(default=None)
+    # Use GPU
+    UseGpu: bool = attr.ib(default=True)
+    WriteNode: str = attr.ib(default=None)
 
-class NukeSubmitDeadline(pyblish.api.InstancePlugin,
-                         AYONPyblishPluginMixin):
+
+class NukeSubmitDeadline(
+    abstract_submit_deadline.AbstractSubmitDeadline,
+    AYONPyblishPluginMixin
+):
     """Submit write to Deadline
 
     Renders are submitted to a Deadline Web Service as
@@ -33,436 +40,150 @@ class NukeSubmitDeadline(pyblish.api.InstancePlugin,
     families = ["render", "prerender"]
     optional = True
     targets = ["local"]
-    settings_category = "deadline"
-
-    # presets
-    priority = 50
-    chunk_size = 1
-    concurrent_tasks = 1
-    group = ""
-    department = ""
-    limit_groups = []
-    use_gpu = False
-    env_allowed_keys = []
-    env_search_replace_values = []
-    workfile_dependency = True
-    use_published_workfile = True
-
-    @classmethod
-    def get_attribute_defs(cls):
-        return [
-            NumberDef(
-                "priority",
-                label="Priority",
-                default=cls.priority,
-                decimals=0
-            ),
-            NumberDef(
-                "chunk",
-                label="Frames Per Task",
-                default=cls.chunk_size,
-                decimals=0,
-                minimum=1,
-                maximum=1000
-            ),
-            NumberDef(
-                "concurrency",
-                label="Concurrency",
-                default=cls.concurrent_tasks,
-                decimals=0,
-                minimum=1,
-                maximum=10
-            ),
-            BoolDef(
-                "use_gpu",
-                default=cls.use_gpu,
-                label="Use GPU"
-            ),
-            BoolDef(
-                "workfile_dependency",
-                default=cls.workfile_dependency,
-                label="Workfile Dependency"
-            ),
-            BoolDef(
-                "use_published_workfile",
-                default=cls.use_published_workfile,
-                label="Use Published Workfile"
-            )
-        ]
 
     def process(self, instance):
-        if not instance.data.get("farm"):
-            self.log.debug("Skipping local instance.")
-            return
-        instance.data["attributeValues"] = self.get_attr_values_from_data(
-            instance.data)
+        """Plugin entry point."""
+        self._instance = instance
 
-        families = instance.data["families"]
-
-        node = instance.data["transientData"]["node"]
         context = instance.context
+        self._deadline_url = instance.data["deadline"]["url"]
+        assert self._deadline_url, "Requires Deadline Webservice URL"
 
-        deadline_url = instance.data["deadline"]["url"]
-        assert deadline_url, "Requires Deadline Webservice URL"
+        # adding expected files to instance.data
+        write_node = instance.data["transientData"]["node"]
+        render_path = instance.data["path"]
+        start_frame = int(instance.data["frameStartHandle"])
+        end_frame = int(instance.data["frameEndHandle"])
+        self._expected_files(
+            instance,
+            render_path,
+            start_frame,
+            end_frame
+        )
 
-        self.deadline_url = "{}/api/jobs".format(deadline_url)
-        self._comment = context.data.get("comment", "")
-        self._ver = re.search(r"\d+\.\d+", context.data.get("hostVersion"))
-        self._deadline_user = context.data.get(
-            "deadlineUser", getpass.getuser())
-        submit_frame_start = int(instance.data["frameStartHandle"])
-        submit_frame_end = int(instance.data["frameEndHandle"])
+        job_info = self.get_generic_job_info(instance)
+        self.job_info = self.get_job_info(job_info=job_info)
 
-        # get output path
-        render_path = instance.data['path']
-        script_path = context.data["currentFile"]
+        self._set_scene_path(
+            context.data["currentFile"], job_info.UsePublished)
 
-        use_published_workfile = instance.data["attributeValues"].get(
-            "use_published_workfile", self.use_published_workfile
+        self.plugin_info = self.get_plugin_info(
+            scene_path=self.scene_path,
+            render_path=render_path,
+            write_node_name=write_node.name()
         )
-        if use_published_workfile:
-            script_path = self._get_published_workfile_path(context)
 
-        # only add main rendering job if target is not frames_farm
-        r_job_response_json = None
+        self.aux_files = self.get_aux_files()
+
+        plugin_info_data = instance.data["deadline"]["plugin_info_data"]
+        if plugin_info_data:
+            self.apply_additional_plugin_info(plugin_info_data)
+
         if instance.data["render_target"] != "frames_farm":
-            r_job_response = self.payload_submit(
-                instance,
-                script_path,
-                render_path,
-                node.name(),
-                submit_frame_start,
-                submit_frame_end
-            )
-            r_job_response_json = r_job_response.json()
-            instance.data["deadlineSubmissionJob"] = r_job_response_json
-
-            # Store output dir for unified publisher (filesequence)
+            job_id = self.process_submission()
+            self.log.info("Submitted job to Deadline: {}.".format(job_id))
+
+            render_path = instance.data["path"]
             instance.data["outputDir"] = os.path.dirname(
                 render_path).replace("\\", "/")
             instance.data["publishJobState"] = "Suspended"
 
         if instance.data.get("bakingNukeScripts"):
             for baking_script in instance.data["bakingNukeScripts"]:
-                render_path = baking_script["bakeRenderPath"]
-                script_path = baking_script["bakeScriptPath"]
-                exe_node_name = baking_script["bakeWriteNodeName"]
-
-                b_job_response = self.payload_submit(
-                    instance,
-                    script_path,
-                    render_path,
-                    exe_node_name,
-                    submit_frame_start,
-                    submit_frame_end,
-                    r_job_response_json,
-                    baking_submission=True
-                )
+                self.job_info.JobType = "Normal"
+                self.job_info.ChunkSize = 99999999
 
-                # Store output dir for unified publisher (filesequence)
-                instance.data["deadlineSubmissionJob"] = b_job_response.json()
+                response_data = instance.data["deadlineSubmissionJob"]
+                if response_data.get("_id"):
+                    self.job_info.BatchName = response_data["Props"]["Batch"]
+                    self.job_info.JobDependency0 = response_data["_id"]
 
-                instance.data["publishJobState"] = "Suspended"
+                render_path = baking_script["bakeRenderPath"]
+                scene_path = baking_script["bakeScriptPath"]
+                write_node_name = baking_script["bakeWriteNodeName"]
+
+                self.plugin_info = self.get_plugin_info(
+                    scene_path=scene_path,
+                    render_path=render_path,
+                    write_node_name=write_node_name
+                )
+                job_id = self.process_submission()
+                self.log.info(
+                    "Submitted baking job to Deadline: {}.".format(job_id))
 
                 # add to list of job Id
                 if not instance.data.get("bakingSubmissionJobs"):
                     instance.data["bakingSubmissionJobs"] = []
 
-                instance.data["bakingSubmissionJobs"].append(
-                    b_job_response.json()["_id"])
-
-        # redefinition of families
-        if "render" in instance.data["productType"]:
-            instance.data["family"] = "write"
-            instance.data["productType"] = "write"
-            families.insert(0, "render2d")
-        elif "prerender" in instance.data["productType"]:
-            instance.data["family"] = "write"
-            instance.data["productType"] = "write"
-            families.insert(0, "prerender")
-        instance.data["families"] = families
-
-    def _get_published_workfile_path(self, context):
-        """This method is temporary while the class is not inherited from
-        AbstractSubmitDeadline"""
-        anatomy = context.data["anatomy"]
-        # WARNING Hardcoded template name 'default' > may not be used
-        publish_template = anatomy.get_template_item(
-            "publish", "default", "path"
-        )
-        for instance in context:
-            if (
-                instance.data["productType"] != "workfile"
-                # Disabled instances won't be integrated
-                or instance.data("publish") is False
-            ):
-                continue
-            template_data = instance.data["anatomyData"]
-            # Expect workfile instance has only one representation
-            representation = instance.data["representations"][0]
-            # Get workfile extension
-            repre_file = representation["files"]
-            self.log.info(repre_file)
-            ext = os.path.splitext(repre_file)[1].lstrip(".")
-
-            # Fill template data
-            template_data["representation"] = representation["name"]
-            template_data["ext"] = ext
-            template_data["comment"] = None
-
-            template_filled = publish_template.format(template_data)
-            script_path = os.path.normpath(template_filled)
-            self.log.info(
-                "Using published scene for render {}".format(
-                    script_path
-                )
-            )
-            return script_path
+                instance.data["bakingSubmissionJobs"].append(job_id)
 
-        return None
+    def get_job_info(self, job_info=None, **kwargs):
+        instance = self._instance
 
-    def payload_submit(
-        self,
-        instance,
-        script_path,
-        render_path,
-        exe_node_name,
-        start_frame,
-        end_frame,
-        response_data=None,
-        baking_submission=False,
-    ):
-        """Submit payload to Deadline
-
-        Args:
-            instance (pyblish.api.Instance): pyblish instance
-            script_path (str): path to nuke script
-            render_path (str): path to rendered images
-            exe_node_name (str): name of the node to render
-            start_frame (int): start frame
-            end_frame (int): end frame
-            response_data Optional[dict]: response data from
-                previous submission
-            baking_submission Optional[bool]: if it's baking submission
-
-        Returns:
-            requests.Response
-        """
-        render_dir = os.path.normpath(os.path.dirname(render_path))
-
-        # batch name
-        src_filepath = instance.context.data["currentFile"]
-        batch_name = os.path.basename(src_filepath)
-        job_name = os.path.basename(render_path)
-
-        if is_in_tests():
-            batch_name += datetime.now().strftime("%d%m%Y%H%M%S")
-
-        output_filename_0 = self.preview_fname(render_path)
-
-        if not response_data:
-            response_data = {}
-
-        try:
-            # Ensure render folder exists
-            os.makedirs(render_dir)
-        except OSError:
-            pass
-
-        # resolve any limit groups
-        limit_groups = self.get_limit_groups()
-        self.log.debug("Limit groups: `{}`".format(limit_groups))
-
-        payload = {
-            "JobInfo": {
-                # Top-level group name
-                "BatchName": batch_name,
-
-                # Job name, as seen in Monitor
-                "Name": job_name,
-
-                # Arbitrary username, for visualisation in Monitor
-                "UserName": self._deadline_user,
-
-                "Priority": instance.data["attributeValues"].get(
-                    "priority", self.priority),
-                "ChunkSize": instance.data["attributeValues"].get(
-                    "chunk", self.chunk_size),
-                "ConcurrentTasks": instance.data["attributeValues"].get(
-                    "concurrency",
-                    self.concurrent_tasks
-                ),
-
-                "Department": self.department,
-
-                "Pool": instance.data.get("primaryPool"),
-                "SecondaryPool": instance.data.get("secondaryPool"),
-                "Group": self.group,
-
-                "Plugin": "Nuke",
-                "Frames": "{start}-{end}".format(
-                    start=start_frame,
-                    end=end_frame
-                ),
-                "Comment": self._comment,
-
-                # Optional, enable double-click to preview rendered
-                # frames from Deadline Monitor
-                "OutputFilename0": output_filename_0.replace("\\", "/"),
-
-                # limiting groups
-                "LimitGroups": ",".join(limit_groups)
-
-            },
-            "PluginInfo": {
-                # Input
-                "SceneFile": script_path,
-
-                # Output directory and filename
-                "OutputFilePath": render_dir.replace("\\", "/"),
-                # "OutputFilePrefix": render_variables["filename_prefix"],
-
-                # Mandatory for Deadline
-                "Version": self._ver.group(),
-
-                # Resolve relative references
-                "ProjectPath": script_path,
-                "AWSAssetFile0": render_path,
-
-                # using GPU by default
-                "UseGpu": instance.data["attributeValues"].get(
-                    "use_gpu", self.use_gpu),
-
-                # Only the specific write node is rendered.
-                "WriteNode": exe_node_name
-            },
-
-            # Mandatory for Deadline, may be empty
-            "AuxFiles": []
-        }
-
-        # Add workfile dependency.
-        workfile_dependency = instance.data["attributeValues"].get(
-            "workfile_dependency", self.workfile_dependency
-        )
-        if workfile_dependency:
-            payload["JobInfo"].update({"AssetDependency0": script_path})
-
-        # TODO: rewrite for baking with sequences
-        if baking_submission:
-            payload["JobInfo"].update({
-                "JobType": "Normal",
-                "ChunkSize": 99999999
-            })
-
-        if response_data.get("_id"):
-            payload["JobInfo"].update({
-                "BatchName": response_data["Props"]["Batch"],
-                "JobDependency0": response_data["_id"],
-            })
-
-        # Include critical environment variables with submission
-        keys = [
-            "NUKE_PATH",
-            "FOUNDRY_LICENSE"
-        ]
-
-        # add allowed keys from preset if any
-        if self.env_allowed_keys:
-            keys += self.env_allowed_keys
-
-        nuke_specific_env = {
-            key: os.environ[key]
-            for key in keys
-            if key in os.environ
-        }
-
-        # Set job environment variables
-        environment = get_instance_job_envs(instance)
-        environment.update(get_ayon_render_job_envs())
-        environment.update(nuke_specific_env)
-
-        # finally search replace in values of any key
-        if self.env_search_replace_values:
-            for key, value in environment.items():
-                for item in self.env_search_replace_values:
-                    environment[key] = value.replace(
-                        item["name"], item["value"]
-                    )
-
-        payload["JobInfo"].update({
-            "EnvironmentKeyValue%d" % index: "{key}={value}".format(
-                key=key,
-                value=environment[key]
-            ) for index, key in enumerate(environment)
-        })
-
-        plugin = payload["JobInfo"]["Plugin"]
-        self.log.debug("using render plugin : {}".format(plugin))
-
-        self.log.debug("Submitting..")
-        self.log.debug(json.dumps(payload, indent=4, sort_keys=True))
+        job_info.Plugin = "Nuke"
 
-        # adding expected files to instance.data
-        self.expected_files(
-            instance,
-            render_path,
-            start_frame,
-            end_frame
+        start_frame = int(instance.data["frameStartHandle"])
+        end_frame = int(instance.data["frameEndHandle"])
+        job_info.Frames = "{start}-{end}".format(
+            start=start_frame,
+            end=end_frame
        )
-        self.log.debug("__ expectedFiles: `{}`".format(
-            instance.data["expectedFiles"]))
 
-        auth = instance.data["deadline"]["auth"]
-        verify = instance.data["deadline"]["verify"]
-        response = requests_post(self.deadline_url,
-                                 json=payload,
-                                 timeout=10,
-                                 auth=auth,
-                                 verify=verify)
-
-        if not response.ok:
-            raise Exception(response.text)
+        limit_groups = self._get_limit_groups(job_info.LimitGroups or [])
+        job_info.LimitGroups = limit_groups
 
-        return response
+        return job_info
 
-    def preflight_check(self, instance):
-        """Ensure the startFrame, endFrame and byFrameStep are integers"""
-
-        for key in ("frameStart", "frameEnd"):
-            value = instance.data[key]
-
-            if int(value) == value:
-                continue
-
-            self.log.warning(
-                "%f=%d was rounded off to nearest integer"
-                % (value, int(value))
-            )
+    def get_plugin_info(
+        self, scene_path=None, render_path=None, write_node_name=None):
+        instance = self._instance
+        context = instance.context
+        version = re.search(r"\d+\.\d+", context.data.get("hostVersion"))
+
+        render_dir = os.path.dirname(render_path)
+        plugin_info = NukePluginInfo(
+            SceneFile=scene_path,
+            Version=version.group(),
+            OutputFilePath=render_dir.replace("\\", "/"),
+            ProjectPath=scene_path,
+            UseGpu=True,
+            WriteNode=write_node_name
+        )
 
-    def preview_fname(self, path):
-        """Return output file path with #### for padding.
+        plugin_payload: dict = attr.asdict(plugin_info)
+        return plugin_payload
 
-        Deadline requires the path to be formatted with # in place of numbers.
-        For example `/path/to/render.####.png`
+    def _get_limit_groups(self, limit_groups):
+        """Search for limit group nodes and return group names.
+
+        Limit groups are defined as pairs in the Nuke Deadline submitter
+        presets, where the key is the limit group name and the value is a
+        list of Nuke node class names. When the script contains an enabled
+        node of one of those classes, the limit group name is added to the
+        payload JobInfo attributes.
+
+        Returns:
+            list: captured groups list
+        """
+        # Not all hosts can import this module.
+        import nuke
 
-        Args:
-            path (str): path to rendered images
+        captured_groups = []
+        for limit_group in limit_groups:
+            lg_name = limit_group["name"]
 
-        Returns:
-            str
+            for node_class in limit_group["value"]:
+                for node in nuke.allNodes(recurseGroups=True):
+                    # ignore all nodes not member of defined class
+                    if node.Class() not in node_class:
+                        continue
+                    # ignore all disabled nodes
+                    if node["disable"].value():
+                        continue
+                    # add group name if not already added
+                    if lg_name not in captured_groups:
+                        captured_groups.append(lg_name)
+        return captured_groups
 
-        """
-        self.log.debug("_ path: `{}`".format(path))
-        if "%" in path:
-            search_results = re.search(r"(%0)(\d)(d.)", path).groups()
-            self.log.debug("_ search_results: `{}`".format(search_results))
-            return int(search_results[1])
-        if "#" in path:
-            self.log.debug("_ path: `{}`".format(path))
-        return path
-
-    def expected_files(
+    def _expected_files(
         self,
         instance,
         filepath,
@@ -515,33 +236,3 @@ def expected_files(
         for i in range(start_frame, (end_frame + 1)):
             instance.data["expectedFiles"].append(
                 os.path.join(dirname, (file % i)).replace("\\", "/"))
-
-    def get_limit_groups(self):
-        """Search for limit group nodes and return group name.
-        Limit groups will be defined as pairs in Nuke deadline submitter
-        presents where the key will be name of limit group and value will be
-        a list of plugin's node class names. Thus, when a plugin uses more
-        than one node, these will be captured and the triggered process
-        will add the appropriate limit group to the payload jobinfo attributes.
-        Returning:
-            list: captured groups list
-        """
-        # Not all hosts can import this module.
-        import nuke
-
-        captured_groups = []
-        for limit_group in self.limit_groups:
-            lg_name = limit_group["name"]
-
-            for node_class in limit_group["value"]:
-                for node in nuke.allNodes(recurseGroups=True):
-                    # ignore all nodes not member of defined class
-                    if node.Class() not in node_class:
-                        continue
-                    # ignore all disabled nodes
-                    if node["disable"].value():
-                        continue
-                    # add group name if not already added
-                    if lg_name not in captured_groups:
-                        captured_groups.append(lg_name)
-        return captured_groups