diff --git a/nvme_gw.config b/nvme_gw.config index 2db9eaac5..8320cd7a2 100644 --- a/nvme_gw.config +++ b/nvme_gw.config @@ -12,7 +12,7 @@ enable_auth = False gateway_addr = [::] gateway_port = 5500 -gateway_group = nvme_gw_group1 +gateway_group = grpc_server_max_workers = 10 [ceph] @@ -30,7 +30,7 @@ client_cert = ./client.crt [spdk] spdk_path = /path/to/spdk -spdk_tgt = spdk/build/bin/nvmf_tgt +tgt_path = spdk/build/bin/nvmf_tgt rpc_socket = /var/tmp/spdk.sock timeout = 60.0 log_level = ERROR diff --git a/nvme_gw_cli.py b/nvme_gw_cli.py index 5b3d2b755..1224ac18c 100644 --- a/nvme_gw_cli.py +++ b/nvme_gw_cli.py @@ -135,7 +135,7 @@ def connect(self, nvme_config): @cli.cmd([ argument("-i", "--image", help="RBD image name", required=True), argument("-p", "--pool", help="Ceph pool name", required=True), - argument("-b", "--bdev", help="Bdev name"), + argument("-b", "--bdev", help="Bdev name", required=True), argument("-u", "--user", help="User ID"), argument("-s", "--block-size", help="Block size", default=4096), ]) @@ -147,6 +147,7 @@ def create_bdev(self, args): ceph_pool_name=args.pool, rbd_name=args.image, block_size=int(args.block_size), + bdev_name=args.bdev, ) ret = self.stub.bdev_rbd_create(create_req) self.logger.info(f"Created bdev: {ret.bdev_name}") diff --git a/nvme_gw_persistence.py b/nvme_gw_persistence.py index e83933df6..6ed6a7561 100644 --- a/nvme_gw_persistence.py +++ b/nvme_gw_persistence.py @@ -10,6 +10,7 @@ import rados from typing import Dict, Optional from abc import ABC, abstractmethod +import nvme_gw_pb2 as pb2 class PersistentConfig(ABC): @@ -77,7 +78,7 @@ def delete_config(self): pass @abstractmethod - def restore(self): + def restore(self, callbacks): pass @@ -116,7 +117,7 @@ def __init__(self, nvme_config): self.logger = nvme_config.logger gateway_group = self.nvme_config.get("config", "gateway_group") - self.omap_name = gateway_group + "_config" + self.omap_name = f"nvme.{gateway_group}.config" if gateway_group else "nvme.config" ceph_pool = self.nvme_config.get("ceph", "ceph_pool") ceph_conf = self.nvme_config.get("ceph", "ceph_config_file") @@ -135,11 +136,7 @@ def __init__(self, nvme_config): self.logger.info( f"First gateway: created object {self.omap_name}") except rados.ObjectExists: - self.logger.info(f"{self.omap_name} already exists.") - - omap_version = self._read_key(self.OMAP_VERSION_KEY) - self.logger.info( - f"omap_version: {omap_version}, local version: {self.version}.") + self.logger.info(f"{self.omap_name} omap object already exists.") def _write_key(self, key: str, val: str): """Writes key and value to the persistent config.""" @@ -148,13 +145,14 @@ def _write_key(self, key: str, val: str): version_update = int(self.version) + 1 with rados.WriteOpCtx() as write_op: # Compare operation failure will cause write failure - write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), 1) + write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), + rados.LIBRADOS_CMPXATTR_OP_EQ) self.ioctx.set_omap(write_op, (key,), (val,)) self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), (str(version_update),)) self.ioctx.operate_write_op(write_op, self.omap_name) self.version = version_update - self.logger.info(f"omap_key generated: {key}") + self.logger.debug(f"omap_key generated: {key}") except Exception as ex: self.logger.error(f"Unable to write to omap: {ex}. Exiting!") raise @@ -165,13 +163,14 @@ def _delete_key(self, key: str): version_update = int(self.version) + 1 with rados.WriteOpCtx() as write_op: # Compare operation failure will cause delete failure - write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), 1) - self.ioctx.delete_omap_keys(write_op, (key,)) + write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), + rados.LIBRADOS_CMPXATTR_OP_EQ) + self.ioctx.remove_omap_keys(write_op, (key,)) self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), (str(version_update),)) self.ioctx.operate_write_op(write_op, self.omap_name) self.version = version_update - self.logger.info(f"omap_key deleted: {key}") + self.logger.debug(f"omap_key deleted: {key}") def add_bdev(self, bdev_name: str, val: str): """Adds a bdev to the persistent config.""" @@ -183,21 +182,19 @@ def delete_bdev(self, bdev_name: str): key = self.BDEV_PREFIX + bdev_name self._delete_key(key) - def _restore_bdevs(self, omap_dict): + def _restore_bdevs(self, omap_dict, callback): """Restores a bdev from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.BDEV_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring bdev: {args}") - try: - return_string = self.spdk_rpc.bdev.bdev_rbd_create( - self.spdk_rpc_client, args["ceph_pool_name"], - args["rbd_name"], int(args["block_size"])) - self.logger.info(f"Created bdev: {return_string}") - except Exception as ex: - self.logger.error(f"bdev_rbd_create failed with: \n {ex}") - raise + req = pb2.bdev_create_req( + bdev_name=args["bdev_name"], + ceph_pool_name=args["ceph_pool_name"], + rbd_name=args["rbd_name"], + block_size=int(args["block_size"]), + ) + ret = callback(req) def add_namespace(self, subsystem_nqn: str, bdev_name: str, val: str): """Adds a namespace to the persistent config.""" @@ -209,24 +206,17 @@ def delete_namespace(self, subsystem_nqn: str, bdev_name: str): key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + bdev_name self._delete_key(key) - def _restore_namespaces(self, omap_dict): + def _restore_namespaces(self, omap_dict, callback): """Restores a namespace from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.NAMESPACE_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring namespace: {args}") - try: - return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, - args["subsystem_nqn"], - args["bdev_name"], - ) - self.logger.info(f"Added NSID to subystem: {return_string}") - except Exception as ex: - self.logger.error( - f"nvmf_subsystem_add_ns failed with: \n {ex}") - raise + req = pb2.subsystem_add_ns_req( + subsystem_nqn=args["subsystem_nqn"], + bdev_name=args["bdev_name"], + ) + ret = callback(req) def add_subsystem(self, subsystem_nqn: str, val: str): """Adds a subsystem to the persistent config.""" @@ -238,22 +228,17 @@ def delete_subsystem(self, subsystem_nqn: str): key = self.SUBSYSTEM_PREFIX + subsystem_nqn self._delete_key(key) - def _restore_subsystems(self, omap_dict): + def _restore_subsystems(self, omap_dict, callback): """Restores subsystems from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.SUBSYSTEM_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring subsystem: {args}") - try: - return_string = self.spdk_rpc.nvmf.nvmf_create_subsystem( - self.spdk_rpc_client, args["subsystem_nqn"], - args["serial_number"]) - self.logger.info(f"Returned with status: {return_string}") - except Exception as ex: - self.logger.error( - f"nvmf_create_subsystem failed with: \n {ex}") - raise + req = pb2.subsystem_create_req( + subsystem_nqn=args["subsystem_nqn"], + serial_number=args["serial_number"], + ) + ret = callback(req) def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): """Adds a host to the persistent config.""" @@ -265,22 +250,17 @@ def delete_host(self, subsystem_nqn: str, host_nqn: str): key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) self._delete_key(key) - def _restore_hosts(self, omap_dict): + def _restore_hosts(self, omap_dict, callback): """Restore hosts from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.HOST_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring host: {args}") - try: - self.spdk_rpc.nvmf.nvmf_subsystem_add_host( - self.spdk_rpc_client, args["nqn"], args["host_nqn"]) - self.logger.info( - f"Added host {args['host_nqn']} to {args['nqn']}") - except Exception as ex: - self.logger.error( - f"nvmf_subsystem_add_host failed with: \n {ex}") - raise + req = pb2.subsystem_add_host_req( + subsystem_nqn=args["nqn"], + host_nqn=args["host_nqn"], + ) + ret = callback(req) def add_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str, val: str): @@ -295,22 +275,20 @@ def delete_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str): trsvcid) self._delete_key(key) - def _restore_listeners(self, omap_dict): + def _restore_listeners(self, omap_dict, callback): """Restores listeners from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.LISTENER_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring listener: {args}") - try: - self.spdk_rpc.nvmf.nvmf_subsystem_add_listener( - self.spdk_rpc_client, args["nqn"], args["trtype"], - args["traddr"], args["trsvcid"], args["adrfam"]) - self.logger.info(f"Added listener: {args['traddr']}") - except Exception as ex: - self.logger.error( - f"nvmf_subsystem_add_listener failed with: \n {ex}") - raise + req = pb2.subsystem_add_listener_req( + nqn=args["nqn"], + trtype=args["trtype"], + adrfam=args["adrfam"], + traddr=args["traddr"], + trsvcid=args["trsvcid"], + ) + ret = callback(req) def set_allow_all(self, subsystem_nqn: str, val: str): """Sets open access to a subsystem in the persistent config.""" @@ -322,22 +300,15 @@ def delete_allow_all(self, subsystem_nqn: str): key = self.ALLOW_ALL_PREFIX + subsystem_nqn self._delete_key(key) - def _restore_allow_all(self, omap_dict): + def _restore_allow_all(self, omap_dict, callback): """Restores allow_all_hosts from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.ALLOW_ALL_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring allow_all: {args}") - try: - self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, args["subsystem_nqn"], False) - self.logger.info( - f"Set allow_all for {args['subsystem_nqn']}") - except Exception as ex: - self.logger.error( - f"nvmf_subsystem_allow_any_host failed with: \n {ex}") - raise + req = pb2.subsystem_allow_any_host_req( + subsystem_nqn=args["subsystem_nqn"], disable=0) + ret = callback(req) def set_transport(self, trtype: str, val: str): """Sets transport type in the persistent config.""" @@ -354,23 +325,14 @@ def get_transport(self, trtype: str): key = self.TRANSPORT_PREFIX + trtype return self._read_key(key) - def _restore_transports(self, omap_dict): + def _restore_transports(self, omap_dict, callback): """Restores a transport from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.TRANSPORT_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring transport: {args}") - try: - return_string = self.spdk_rpc.nvmf.nvmf_create_transport( - self.spdk_rpc_client, args["trtype"]) - self.logger.info( - f"Created transport type {args['trtype']}: {return_string}" - ) - except Exception as ex: - self.logger.error( - f"nvmf_create_transport failed with: \n {ex}") - raise + req = pb2.create_transport_req(trtype=args["trtype"]) + ret = callback(req) def _read_key(self, key) -> Optional[str]: """Reads single key from persistent config and returns its value.""" @@ -383,7 +345,7 @@ def _read_key(self, key) -> Optional[str]: value_list = list(dict(iter).values()) if len(value_list) == 1: val = str(value_list[0], "utf-8") - self.logger.info(f"Read key: {key} -> {val}") + self.logger.debug(f"Read key: {key} -> {val}") return val return None @@ -396,14 +358,14 @@ def _read_all(self) -> Dict[str, str]: raise Exception("Omap read operation failed.") self.ioctx.operate_read_op(read_op, self.omap_name) omap_dict = dict(iter) - self.logger.info(f"Omap Persistent Config:\n{omap_dict}") + self.logger.debug(f"Omap Persistent Config:\n{omap_dict}") return omap_dict def delete_config(self): """Deletes OMAP object.""" try: - self.ioctx.delete_object(self.omap_name) + self.ioctx.remove_object(self.omap_name) self.logger.info(f"Object {self.omap_name} deleted.") except rados.ObjectNotFound: self.logger.info(f"Object {self.omap_name} not found.") @@ -419,25 +381,24 @@ def _clean_args(self, data: str) -> Dict[str, str]: param_dict[key.strip(' \"')] = value.strip(' \"') return param_dict - def restore(self, spdk_rpc, spdk_rpc_client): + def restore(self, callbacks): """Restores gateway config to persistent config specifications.""" - self.spdk_rpc = spdk_rpc - self.spdk_rpc_client = spdk_rpc_client omap_version = self._read_key(self.OMAP_VERSION_KEY) if omap_version == "1": self.logger.info("This omap was just created. Nothing to restore") else: omap_dict = self._read_all() - self._restore_bdevs(omap_dict) - self._restore_subsystems(omap_dict) - self._restore_namespaces(omap_dict) - self._restore_hosts(omap_dict) - self._restore_allow_all(omap_dict) - self._restore_transports(omap_dict) - self._restore_listeners(omap_dict) - self.logger.info("Restore complete.") - - # Update local version number to persistent config version + self._restore_bdevs(omap_dict, callbacks[self.BDEV_PREFIX]) + self._restore_subsystems(omap_dict, + callbacks[self.SUBSYSTEM_PREFIX]) + self._restore_namespaces(omap_dict, + callbacks[self.NAMESPACE_PREFIX]) + self._restore_hosts(omap_dict, callbacks[self.HOST_PREFIX]) + self._restore_allow_all(omap_dict, callbacks[self.ALLOW_ALL_PREFIX]) + self._restore_transports(omap_dict, + callbacks[self.TRANSPORT_PREFIX]) + self._restore_listeners(omap_dict, callbacks[self.LISTENER_PREFIX]) self.version = omap_version + self.logger.info("Restore complete.") return \ No newline at end of file diff --git a/nvme_gw_server.py b/nvme_gw_server.py index 550751789..7f77c961b 100644 --- a/nvme_gw_server.py +++ b/nvme_gw_server.py @@ -90,6 +90,13 @@ def serve(self): grpc_max_workers = self.nvme_config.getint("config", "grpc_server_max_workers") + # Create server and check for existing NVMeoF target configuration + self.server = grpc.server( + futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) + self.start_spdk() + self.restore_config() + pb2_grpc.add_NVMEGatewayServicer_to_server(self, self.server) + if enable_auth: # Read in key and certificates for authentication server_key = self.nvme_config.get("mtls", "server_key") @@ -109,23 +116,11 @@ def serve(self): require_client_auth=True, ) - # Create server and check for existing NVMeoF target configuration - self.server = grpc.server( - futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) - self.start_spdk() - self.persistent_config.restore(self.spdk_rpc, self.spdk_rpc_client) - pb2_grpc.add_NVMEGatewayServicer_to_server(self, self.server) - # Add secure port using crendentials self.server.add_secure_port( "{}:{}".format(gateway_addr, gateway_port), server_credentials) else: # Authentication is not enabled - self.server = grpc.server( - futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) - self.start_spdk() - self.persistent_config.restore(self.spdk_rpc, self.spdk_rpc_client) - pb2_grpc.add_NVMEGatewayServicer_to_server(self, self.server) self.server.add_insecure_port("{}:{}".format( gateway_addr, gateway_port)) @@ -151,8 +146,8 @@ def start_spdk(self): import spdk.scripts.rpc as spdk_rpc self.spdk_rpc = spdk_rpc - spdk_tgt = self.nvme_config.get("spdk", "spdk_tgt") - spdk_cmd = os.path.join(spdk_path, spdk_tgt) + tgt_path = self.nvme_config.get("spdk", "tgt_path") + spdk_cmd = os.path.join(spdk_path, tgt_path) spdk_rpc_socket = self.nvme_config.get("spdk", "rpc_socket") spdk_tgt_cmd_extra_args = self.nvme_config.get("spdk", "tgt_cmd_extra_args") @@ -178,8 +173,8 @@ def start_spdk(self): conn_retries = self.nvme_config.getint("spdk", "conn_retries") self.logger.info( - f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket}, conn_retries: {conn_retries}, timeout: {timeout}" - ) + f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket}," + + f" conn_retries: {conn_retries}, timeout: {timeout}",) try: self.spdk_rpc_client = self.spdk_rpc.client.JSONRPCClient( @@ -194,38 +189,59 @@ def start_spdk(self): raise return - def bdev_rbd_create(self, request, context): + def restore_config(self): + callbacks = {} + callbacks[self.persistent_config.BDEV_PREFIX] = self.bdev_rbd_create + callbacks[self.persistent_config. + SUBSYSTEM_PREFIX] = self.nvmf_create_subsystem + callbacks[self.persistent_config. + NAMESPACE_PREFIX] = self.nvmf_subsystem_add_ns + callbacks[ + self.persistent_config.HOST_PREFIX] = self.nvmf_subsystem_add_host + callbacks[self.persistent_config. + ALLOW_ALL_PREFIX] = self.nvmf_subsystem_allow_any_host + callbacks[self.persistent_config. + TRANSPORT_PREFIX] = self.nvmf_create_transport + callbacks[self.persistent_config. + LISTENER_PREFIX] = self.nvmf_subsystem_add_listener + self.persistent_config.restore(callbacks) + + def bdev_rbd_create(self, request, context=None): """Creates bdev from a given RBD image.""" self.logger.info({ - f"Received: {request.ceph_pool_name}, {request.rbd_name}, {request.block_size}", + f"Received request to create bdev {request.bdev_name} from" + + f" {request.ceph_pool_name}/{request.rbd_name}", }) try: bdev_name = self.spdk_rpc.bdev.bdev_rbd_create( self.spdk_rpc_client, - request.ceph_pool_name, - request.rbd_name, - int(request.block_size), + name=request.bdev_name, + pool_name=request.ceph_pool_name, + rbd_name=request.rbd_name, + block_size=int(request.block_size), ) self.logger.info(f"Created bdev {bdev_name}") except Exception as ex: self.logger.error(f"bdev create failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.bdev_info() - # Update persistent configuration - try: - self.persistent_config.add_bdev(bdev_name, str(request)) - except Exception: - # Stop server manually - cannot raise exception due to gRPC thread - # hanging during server deallocation - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_bdev(bdev_name, str(request)) + except Exception: + # Stop server manually - cannot raise exception due to gRPC thread + # hanging during server deallocation + self.server.stop(None) + sys.exit(1) return pb2.bdev_info(bdev_name=bdev_name) - def bdev_rbd_delete(self, request, context): + def bdev_rbd_delete(self, request, context=None): """Deletes bdev.""" self.logger.info({ f"Received request to delete bdev: {request.bdev_name}", @@ -239,20 +255,22 @@ def bdev_rbd_delete(self, request, context): except Exception as ex: self.logger.error(f"bdev delete failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.delete_bdev(request.bdev_name) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.delete_bdev(request.bdev_name) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_create_subsystem(self, request, context): + def nvmf_create_subsystem(self, request, context=None): """Creates an NVMe subsystem.""" self.logger.info({ f"Received request to create: {request.subsystem_nqn}", @@ -269,22 +287,24 @@ def nvmf_create_subsystem(self, request, context): return_status = return_string != "none" except Exception as ex: self.logger.error(f"create_subsystem failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.subsystem_info() - # Update persistent configuration - try: - self.persistent_config.add_subsystem(request.subsystem_nqn, - str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_subsystem(request.subsystem_nqn, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.subsystem_info(subsystem_nqn=request.subsystem_nqn, created=return_status) - def nvmf_delete_subsystem(self, request, context): + def nvmf_delete_subsystem(self, request, context=None): """Deletes an NVMe subsystem.""" self.logger.info({ f"Received request to delete: {request.subsystem_nqn}", @@ -298,20 +318,22 @@ def nvmf_delete_subsystem(self, request, context): self.logger.info(f"returned with status: {return_string}") except Exception as ex: self.logger.error(f"delete_subsystem failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.delete_subsystem(request.subsystem_nqn) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.delete_subsystem(request.subsystem_nqn) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_subsystem_add_ns(self, request, context): + def nvmf_subsystem_add_ns(self, request, context=None): """Adds a given namespace to a given subsystem.""" self.logger.info({ f"Received request to add: {request.bdev_name} to {request.subsystem_nqn}", @@ -319,26 +341,30 @@ def nvmf_subsystem_add_ns(self, request, context): try: return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, request.subsystem_nqn, request.bdev_name) + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + bdev_name=request.bdev_name) self.logger.info(f"returned with nsid: {return_string}") except Exception as ex: self.logger.error(f"Add NS returned with error: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.nsid() - # Update persistent configuration - try: - self.persistent_config.add_namespace(request.subsystem_nqn, - request.bdev_name, - str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_namespace(request.subsystem_nqn, + request.bdev_name, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.nsid(nsid=return_string) - def nvmf_subsystem_add_host(self, request, context): + def nvmf_subsystem_add_host(self, request, context=None): """Grants a given host access to a given subsystem.""" self.logger.info({ f"Received request to add: {request.host_nqn} to {request.subsystem_nqn}", @@ -351,21 +377,23 @@ def nvmf_subsystem_add_host(self, request, context): except Exception as ex: self.logger.error(f"Add Host returned with error: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.add_host(request.subsystem_nqn, - request.host_nqn, str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_host(request.subsystem_nqn, + request.host_nqn, str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_subsystem_allow_any_host(self, request, context): + def nvmf_subsystem_allow_any_host(self, request, context=None): """Grants any host access to a given subsystem.""" self.logger.info({ f"Set allow all hosts to {request.subsystem_nqn} to: {request.disable}", @@ -380,30 +408,34 @@ def nvmf_subsystem_allow_any_host(self, request, context): self.logger.error( f"Allow any host set to {request.disable} returned error: \n {ex}" ) - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.set_allow_all(request.subsystem_nqn, - str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.set_allow_all(request.subsystem_nqn, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_create_transport(self, request, context): + def nvmf_create_transport(self, request, context=None): """Sets a transport type for device access.""" self.logger.info({f"Setting transport type to: {request.trtype}"}) persist_val = "trtype: " + '"' + request.trtype + '"' # Check if transport type has already been created - trtype = self.persistent_config.get_transport(request.trtype) - if trtype is not None: - self.logger.info(f"Create Transport {trtype} already created.\n") - return (True) + if context: + trtype = self.persistent_config.get_transport(request.trtype) + if trtype is not None: + self.logger.info( + f"Create Transport {trtype} already created.\n") + return (True) try: return_string = self.spdk_rpc.nvmf.nvmf_create_transport( @@ -412,29 +444,32 @@ def nvmf_create_transport(self, request, context): self.logger.error( f"Create Transport {request.trtype} returned with error: \n {ex}" ) - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.set_transport(request.trtype, persist_val) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.set_transport(request.trtype, + persist_val) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_subsystem_add_listener(self, request, context): + def nvmf_subsystem_add_listener(self, request, context=None): """Adds a listener at the given TCP/IP address for the given subsystem.""" self.logger.info({ f"Adding listener at {request.traddr} : {request.trsvcid} for {request.nqn}" }) # Create transport if needed - tr_req = trtype() - tr_req.trtype = request.trtype - self.nvmf_create_transport(tr_req, context) + if context: + self.nvmf_create_transport(pb2.create_transport_req(trtype='TCP'), + context) try: return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_listener( @@ -448,17 +483,20 @@ def nvmf_subsystem_add_listener(self, request, context): self.logger.info(f"Status of add listener: {return_string}") except Exception as ex: self.logger.error(f"Add Listener returned with error: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.add_listener(request.nqn, request.traddr, - request.trsvcid, str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_listener(request.nqn, request.traddr, + request.trsvcid, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) diff --git a/proto/nvme_gw.proto b/proto/nvme_gw.proto index 8e6c7ecf4..e2ac0e3dc 100644 --- a/proto/nvme_gw.proto +++ b/proto/nvme_gw.proto @@ -55,7 +55,7 @@ message spdk_status { message bdev_create_req { - optional string bdev_name = 1; + string bdev_name = 1; // required optional string user_id = 2; string ceph_pool_name = 3; //required string rbd_name = 4; //required