From 52ff56d6169dc4ad682fccf93c15b130b19984dd Mon Sep 17 00:00:00 2001 From: Sandy Kaur Date: Mon, 14 Feb 2022 02:58:57 -0600 Subject: [PATCH 1/2] Add target save and restore functionality --- nvme_gw.config | 10 +- nvme_gw_persistence.py | 443 +++++++++++++++++++++++++++++++++++++++++ nvme_gw_server.py | 324 +++++++++++++++++++++--------- 3 files changed, 679 insertions(+), 98 deletions(-) create mode 100644 nvme_gw_persistence.py diff --git a/nvme_gw.config b/nvme_gw.config index b67cdf3c3..2db9eaac5 100644 --- a/nvme_gw.config +++ b/nvme_gw.config @@ -12,10 +12,14 @@ enable_auth = False gateway_addr = [::] gateway_port = 5500 -spdk_path = /path/to/spdk -spdk_tgt = spdk/build/bin/nvmf_tgt +gateway_group = nvme_gw_group1 grpc_server_max_workers = 10 +[ceph] + +ceph_pool = rbd +ceph_config_file = /etc/ceph/ceph.conf + [mtls] server_key = ./server.key @@ -25,6 +29,8 @@ client_cert = ./client.crt [spdk] +spdk_path = /path/to/spdk +spdk_tgt = spdk/build/bin/nvmf_tgt rpc_socket = /var/tmp/spdk.sock timeout = 60.0 log_level = ERROR diff --git a/nvme_gw_persistence.py b/nvme_gw_persistence.py new file mode 100644 index 000000000..e83933df6 --- /dev/null +++ b/nvme_gw_persistence.py @@ -0,0 +1,443 @@ +# +# Copyright (c) 2021 International Business Machines +# All rights reserved. +# +# SPDX-License-Identifier: LGPL-3.0-or-later +# +# Authors: anita.shekar@ibm.com, sandy.kaur@ibm.com +# + +import rados +from typing import Dict, Optional +from abc import ABC, abstractmethod + + +class PersistentConfig(ABC): + """Persists gateway NVMeoF target configuration.""" + + @abstractmethod + def add_bdev(self, bdev_name: str, val: str): + pass + + @abstractmethod + def delete_bdev(self, bdev_name: str): + pass + + @abstractmethod + def add_namespace(self, subsystem_nqn: str, bdev_name: str, val: str): + pass + + @abstractmethod + def delete_namespace(self, subsystem_nqn: str, bdev_name: str): + pass + + @abstractmethod + def add_subsystem(self, subsystem_nqn: str, val: str): + pass + + @abstractmethod + def delete_subsystem(self, subsystem_nqn: str): + pass + + @abstractmethod + def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): + pass + + @abstractmethod + def delete_host(self, subsystem_nqn: str, host_nqn: str): + pass + + @abstractmethod + def add_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str, + val: str): + pass + + @abstractmethod + def delete_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str): + pass + + @abstractmethod + def set_allow_all(self, subsystem_nqn: str, val: str): + pass + + @abstractmethod + def delete_allow_all(self, subsystem_nqn: str): + pass + + @abstractmethod + def set_transport(self, trtype: str, val: str): + pass + + @abstractmethod + def delete_transport(self, trtype: str): + pass + + @abstractmethod + def delete_config(self): + pass + + @abstractmethod + def restore(self): + pass + + +class OmapPersistentConfig(PersistentConfig): + """Persists NVMeoF target configuration to an OMAP object. + + Handles reads/writes of persistent NVMeoF target configuration data in + key/value format within an OMAP object. + + Class attributes: + X_KEY: OMAP key name for "X" + X_PREFIX: OMAP key prefix for key of type "X" + + Instance attributes: + version: Local gateway NVMeoF target configuration version + nvme_config: Basic gateway parameters + logger: Logger instance to track OMAP access events + spdk_rpc: Module methods for SPDK + spdk_rpc_client: Client of SPDK RPC server + omap_name: OMAP object name + ioctx: I/O context which allows OMAP access + """ + + OMAP_VERSION_KEY = "omap_version" + BDEV_PREFIX = "bdev_" + NAMESPACE_PREFIX = "namespace_" + SUBSYSTEM_PREFIX = "subsystem_" + HOST_PREFIX = "subsystem_host_" + ALLOW_ALL_PREFIX = "allow_all_" + TRANSPORT_PREFIX = "transport_" + LISTENER_PREFIX = "listener_" + + def __init__(self, nvme_config): + self.version = 1 + self.nvme_config = nvme_config + self.logger = nvme_config.logger + + gateway_group = self.nvme_config.get("config", "gateway_group") + self.omap_name = gateway_group + "_config" + + ceph_pool = self.nvme_config.get("ceph", "ceph_pool") + ceph_conf = self.nvme_config.get("ceph", "ceph_config_file") + conn = rados.Rados(conffile=ceph_conf) + conn.connect() + self.ioctx = conn.open_ioctx(ceph_pool) + + try: + # Create a new gateway persistance OMAP object + with rados.WriteOpCtx() as write_op: + # Set exclusive parameter to fail write_op if object exists + write_op.new(rados.LIBRADOS_CREATE_EXCLUSIVE) + self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), + (str(self.version),)) + self.ioctx.operate_write_op(write_op, self.omap_name) + self.logger.info( + f"First gateway: created object {self.omap_name}") + except rados.ObjectExists: + self.logger.info(f"{self.omap_name} already exists.") + + omap_version = self._read_key(self.OMAP_VERSION_KEY) + self.logger.info( + f"omap_version: {omap_version}, local version: {self.version}.") + + def _write_key(self, key: str, val: str): + """Writes key and value to the persistent config.""" + + try: + version_update = int(self.version) + 1 + with rados.WriteOpCtx() as write_op: + # Compare operation failure will cause write failure + write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), 1) + self.ioctx.set_omap(write_op, (key,), (val,)) + self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), + (str(version_update),)) + self.ioctx.operate_write_op(write_op, self.omap_name) + self.version = version_update + self.logger.info(f"omap_key generated: {key}") + except Exception as ex: + self.logger.error(f"Unable to write to omap: {ex}. Exiting!") + raise + + def _delete_key(self, key: str): + """Deletes key from omap persistent config.""" + + version_update = int(self.version) + 1 + with rados.WriteOpCtx() as write_op: + # Compare operation failure will cause delete failure + write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), 1) + self.ioctx.delete_omap_keys(write_op, (key,)) + self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), + (str(version_update),)) + self.ioctx.operate_write_op(write_op, self.omap_name) + self.version = version_update + self.logger.info(f"omap_key deleted: {key}") + + def add_bdev(self, bdev_name: str, val: str): + """Adds a bdev to the persistent config.""" + key = self.BDEV_PREFIX + bdev_name + self._write_key(key, val) + + def delete_bdev(self, bdev_name: str): + """Deletes a bdev from the persistent config.""" + key = self.BDEV_PREFIX + bdev_name + self._delete_key(key) + + def _restore_bdevs(self, omap_dict): + """Restores a bdev from the persistent config.""" + + for (key, val) in omap_dict.items(): + if key.startswith(self.BDEV_PREFIX): + args = self._clean_args(str(val, 'utf-8')) + self.logger.info(f"Restoring bdev: {args}") + try: + return_string = self.spdk_rpc.bdev.bdev_rbd_create( + self.spdk_rpc_client, args["ceph_pool_name"], + args["rbd_name"], int(args["block_size"])) + self.logger.info(f"Created bdev: {return_string}") + except Exception as ex: + self.logger.error(f"bdev_rbd_create failed with: \n {ex}") + raise + + def add_namespace(self, subsystem_nqn: str, bdev_name: str, val: str): + """Adds a namespace to the persistent config.""" + key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + bdev_name + self._write_key(key, val) + + def delete_namespace(self, subsystem_nqn: str, bdev_name: str): + """Deletes a namespace from the persistent config.""" + key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + bdev_name + self._delete_key(key) + + def _restore_namespaces(self, omap_dict): + """Restores a namespace from the persistent config.""" + + for (key, val) in omap_dict.items(): + if key.startswith(self.NAMESPACE_PREFIX): + args = self._clean_args(str(val, 'utf-8')) + self.logger.info(f"Restoring namespace: {args}") + try: + return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_ns( + self.spdk_rpc_client, + args["subsystem_nqn"], + args["bdev_name"], + ) + self.logger.info(f"Added NSID to subystem: {return_string}") + except Exception as ex: + self.logger.error( + f"nvmf_subsystem_add_ns failed with: \n {ex}") + raise + + def add_subsystem(self, subsystem_nqn: str, val: str): + """Adds a subsystem to the persistent config.""" + key = self.SUBSYSTEM_PREFIX + subsystem_nqn + self._write_key(key, val) + + def delete_subsystem(self, subsystem_nqn: str): + """Deletes a subsystem from the persistent config.""" + key = self.SUBSYSTEM_PREFIX + subsystem_nqn + self._delete_key(key) + + def _restore_subsystems(self, omap_dict): + """Restores subsystems from the persistent config.""" + + for (key, val) in omap_dict.items(): + if key.startswith(self.SUBSYSTEM_PREFIX): + args = self._clean_args(str(val, 'utf-8')) + self.logger.info(f"Restoring subsystem: {args}") + try: + return_string = self.spdk_rpc.nvmf.nvmf_create_subsystem( + self.spdk_rpc_client, args["subsystem_nqn"], + args["serial_number"]) + self.logger.info(f"Returned with status: {return_string}") + except Exception as ex: + self.logger.error( + f"nvmf_create_subsystem failed with: \n {ex}") + raise + + def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): + """Adds a host to the persistent config.""" + key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) + self._write_key(key, val) + + def delete_host(self, subsystem_nqn: str, host_nqn: str): + """Deletes a host from the persistent config.""" + key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) + self._delete_key(key) + + def _restore_hosts(self, omap_dict): + """Restore hosts from the persistent config.""" + + for (key, val) in omap_dict.items(): + if key.startswith(self.HOST_PREFIX): + args = self._clean_args(str(val, 'utf-8')) + self.logger.info(f"Restoring host: {args}") + try: + self.spdk_rpc.nvmf.nvmf_subsystem_add_host( + self.spdk_rpc_client, args["nqn"], args["host_nqn"]) + self.logger.info( + f"Added host {args['host_nqn']} to {args['nqn']}") + except Exception as ex: + self.logger.error( + f"nvmf_subsystem_add_host failed with: \n {ex}") + raise + + def add_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str, + val: str): + """Adds a listener to the persistent config.""" + key = "{}{}_{}_{}".format(self.LISTENER_PREFIX, subsystem_nqn, traddr, + trsvcid) + self._write_key(key, val) + + def delete_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str): + """Deletes a listener from the persistent config.""" + key = "{}{}_{}_{}".format(self.LISTENER_PREFIX, subsystem_nqn, traddr, + trsvcid) + self._delete_key(key) + + def _restore_listeners(self, omap_dict): + """Restores listeners from the persistent config.""" + + for (key, val) in omap_dict.items(): + if key.startswith(self.LISTENER_PREFIX): + args = self._clean_args(str(val, 'utf-8')) + self.logger.info(f"Restoring listener: {args}") + try: + self.spdk_rpc.nvmf.nvmf_subsystem_add_listener( + self.spdk_rpc_client, args["nqn"], args["trtype"], + args["traddr"], args["trsvcid"], args["adrfam"]) + self.logger.info(f"Added listener: {args['traddr']}") + except Exception as ex: + self.logger.error( + f"nvmf_subsystem_add_listener failed with: \n {ex}") + raise + + def set_allow_all(self, subsystem_nqn: str, val: str): + """Sets open access to a subsystem in the persistent config.""" + key = self.ALLOW_ALL_PREFIX + subsystem_nqn + self._write_key(key, val) + + def delete_allow_all(self, subsystem_nqn: str): + """Deletes open access to a subsystem in the persistent config.""" + key = self.ALLOW_ALL_PREFIX + subsystem_nqn + self._delete_key(key) + + def _restore_allow_all(self, omap_dict): + """Restores allow_all_hosts from the persistent config.""" + + for (key, val) in omap_dict.items(): + if key.startswith(self.ALLOW_ALL_PREFIX): + args = self._clean_args(str(val, 'utf-8')) + self.logger.info(f"Restoring allow_all: {args}") + try: + self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, args["subsystem_nqn"], False) + self.logger.info( + f"Set allow_all for {args['subsystem_nqn']}") + except Exception as ex: + self.logger.error( + f"nvmf_subsystem_allow_any_host failed with: \n {ex}") + raise + + def set_transport(self, trtype: str, val: str): + """Sets transport type in the persistent config.""" + key = self.TRANSPORT_PREFIX + trtype + self._write_key(key, val) + + def delete_transport(self, trtype: str): + """Delete transport type in the persistent config.""" + key = self.TRANSPORT_PREFIX + trtype + self._delete_key(key) + + def get_transport(self, trtype: str): + """Read existing transport type from the persistent config.""" + key = self.TRANSPORT_PREFIX + trtype + return self._read_key(key) + + def _restore_transports(self, omap_dict): + """Restores a transport from the persistent config.""" + + for (key, val) in omap_dict.items(): + if key.startswith(self.TRANSPORT_PREFIX): + args = self._clean_args(str(val, 'utf-8')) + self.logger.info(f"Restoring transport: {args}") + try: + return_string = self.spdk_rpc.nvmf.nvmf_create_transport( + self.spdk_rpc_client, args["trtype"]) + self.logger.info( + f"Created transport type {args['trtype']}: {return_string}" + ) + except Exception as ex: + self.logger.error( + f"nvmf_create_transport failed with: \n {ex}") + raise + + def _read_key(self, key) -> Optional[str]: + """Reads single key from persistent config and returns its value.""" + + with rados.ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, (key,)) + if ret != 0: + raise Exception("Omap read operation failed.") + self.ioctx.operate_read_op(read_op, self.omap_name) + value_list = list(dict(iter).values()) + if len(value_list) == 1: + val = str(value_list[0], "utf-8") + self.logger.info(f"Read key: {key} -> {val}") + return val + return None + + def _read_all(self) -> Dict[str, str]: + """Reads persistent config and returns dict of all keys and values.""" + + with rados.ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals(read_op, "", "", -1) + if ret != 0: + raise Exception("Omap read operation failed.") + self.ioctx.operate_read_op(read_op, self.omap_name) + omap_dict = dict(iter) + self.logger.info(f"Omap Persistent Config:\n{omap_dict}") + return omap_dict + + def delete_config(self): + """Deletes OMAP object.""" + + try: + self.ioctx.delete_object(self.omap_name) + self.logger.info(f"Object {self.omap_name} deleted.") + except rados.ObjectNotFound: + self.logger.info(f"Object {self.omap_name} not found.") + + def _clean_args(self, data: str) -> Dict[str, str]: + """Transforms configuration details from gRPC request format to a + dictionary. Requires data string in key:value form.""" + + param_dict = {} + for arg in data.split('\n'): + if arg != "": + key, value = arg.split(':', 1) + param_dict[key.strip(' \"')] = value.strip(' \"') + return param_dict + + def restore(self, spdk_rpc, spdk_rpc_client): + """Restores gateway config to persistent config specifications.""" + + self.spdk_rpc = spdk_rpc + self.spdk_rpc_client = spdk_rpc_client + omap_version = self._read_key(self.OMAP_VERSION_KEY) + if omap_version == "1": + self.logger.info("This omap was just created. Nothing to restore") + else: + omap_dict = self._read_all() + self._restore_bdevs(omap_dict) + self._restore_subsystems(omap_dict) + self._restore_namespaces(omap_dict) + self._restore_hosts(omap_dict) + self._restore_allow_all(omap_dict) + self._restore_transports(omap_dict) + self._restore_listeners(omap_dict) + self.logger.info("Restore complete.") + + # Update local version number to persistent config version + self.version = omap_version + return \ No newline at end of file diff --git a/nvme_gw_server.py b/nvme_gw_server.py index 93e2af6b9..550751789 100644 --- a/nvme_gw_server.py +++ b/nvme_gw_server.py @@ -19,36 +19,143 @@ import nvme_gw_pb2_grpc as pb2_grpc import nvme_gw_pb2 as pb2 import nvme_gw_config +from nvme_gw_persistence import OmapPersistentConfig import argparse import json libc = ctypes.CDLL(ctypes.util.find_library("c")) PR_SET_PDEATHSIG = 1 -def set_pdeathsig(sig = signal.SIGTERM): +def set_pdeathsig(sig=signal.SIGTERM): def callable(): return libc.prctl(PR_SET_PDEATHSIG, sig) return callable + +class trtype: + def __init__(self): + self.trtype = "tcp" + + class GWService(pb2_grpc.NVMEGatewayServicer): + """Implements gateway service interface. + + Handles configuration of the SPDK NVMEoF target according to client requests. + + Instance attributes: + nvme_config: Basic gateway parameters + logger: Logger instance to track server events + server: gRPC server instance to receive gateway client requests + persistent_config: Methods for target configuration persistence + spdk_rpc: Module methods for SPDK + spdk_rpc_client: Client of SPDK RPC server + spdk_process: Subprocess running SPDK NVMEoF target application + """ + def __init__(self, nvme_config): self.logger = nvme_config.logger self.nvme_config = nvme_config + self.persistent_config = OmapPersistentConfig(nvme_config) + self.spdk_process = None + self.server = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Cleans up SPDK and server instances.""" + + if self.spdk_process is not None: + self.logger.info("Terminating SPDK...") + self.spdk_process.terminate() + try: + timeout = self.nvme_config.getfloat("spdk", "timeout") + self.spdk_process.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + self.spdk_process.kill() + + if self.server is not None: + self.logger.info("Stopping the server...") + self.server.stop(None) + + self.logger.info("Exiting the gateway process.") + return True + + def serve(self): + """Starts gateway server.""" + + enable_auth = self.nvme_config.getboolean("config", "enable_auth") + gateway_addr = self.nvme_config.get("config", "gateway_addr") + gateway_port = self.nvme_config.get("config", "gateway_port") + grpc_max_workers = self.nvme_config.getint("config", + "grpc_server_max_workers") + + if enable_auth: + # Read in key and certificates for authentication + server_key = self.nvme_config.get("mtls", "server_key") + server_cert = self.nvme_config.get("mtls", "server_cert") + client_cert = self.nvme_config.get("mtls", "client_cert") + with open(server_key, "rb") as f: + private_key = f.read() + with open(server_cert, "rb") as f: + server_crt = f.read() + with open(client_cert, "rb") as f: + client_crt = f.read() + + # Create appropriate server credentials + server_credentials = grpc.ssl_server_credentials( + private_key_certificate_chain_pairs=[(private_key, server_crt)], + root_certificates=client_crt, + require_client_auth=True, + ) + + # Create server and check for existing NVMeoF target configuration + self.server = grpc.server( + futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) + self.start_spdk() + self.persistent_config.restore(self.spdk_rpc, self.spdk_rpc_client) + pb2_grpc.add_NVMEGatewayServicer_to_server(self, self.server) + + # Add secure port using crendentials + self.server.add_secure_port( + "{}:{}".format(gateway_addr, gateway_port), server_credentials) + else: + # Authentication is not enabled + self.server = grpc.server( + futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) + self.start_spdk() + self.persistent_config.restore(self.spdk_rpc, self.spdk_rpc_client) + pb2_grpc.add_NVMEGatewayServicer_to_server(self, self.server) + self.server.add_insecure_port("{}:{}".format( + gateway_addr, gateway_port)) + + # Start server + self.server.start() + self.server.wait_for_termination() + + while True: + timedout = self.server.wait_for_termination(timeout=1) + if not timedout: + break + alive = gw_service.ping() + if not alive: + break def start_spdk(self): + """Starts SPDK process.""" - spdk_path = self.nvme_config.get("config", "spdk_path") + spdk_path = self.nvme_config.get("spdk", "spdk_path") sys.path.append(spdk_path) self.logger.info(f"SPDK PATH: {spdk_path}") import spdk.scripts.rpc as spdk_rpc self.spdk_rpc = spdk_rpc - spdk_tgt = self.nvme_config.get("config", "spdk_tgt") + spdk_tgt = self.nvme_config.get("spdk", "spdk_tgt") spdk_cmd = os.path.join(spdk_path, spdk_tgt) spdk_rpc_socket = self.nvme_config.get("spdk", "rpc_socket") spdk_tgt_cmd_extra_args = self.nvme_config.get("spdk", - "tgt_cmd_extra_args") + "tgt_cmd_extra_args") cmd = [spdk_cmd, "-u", "-r", spdk_rpc_socket] if spdk_tgt_cmd_extra_args: @@ -56,10 +163,11 @@ def start_spdk(self): self.logger.info(f"Starting {' '.join(cmd)}") try: - subprocess.Popen(cmd, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - preexec_fn = set_pdeathsig(signal.SIGTERM)) + self.spdk_process = subprocess.Popen(cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + preexec_fn=set_pdeathsig( + signal.SIGTERM)) except Exception as ex: self.logger.error(f"Unable to start SPDK: \n {ex}") @@ -74,7 +182,7 @@ def start_spdk(self): ) try: - self.client = self.spdk_rpc.client.JSONRPCClient( + self.spdk_rpc_client = self.spdk_rpc.client.JSONRPCClient( spdk_rpc_socket, None, timeout, @@ -87,16 +195,16 @@ def start_spdk(self): return def bdev_rbd_create(self, request, context): - # Create bdev from a given RBD image + """Creates bdev from a given RBD image.""" self.logger.info({ f"Received: {request.ceph_pool_name}, {request.rbd_name}, {request.block_size}", }) try: bdev_name = self.spdk_rpc.bdev.bdev_rbd_create( - self.client, + self.spdk_rpc_client, request.ceph_pool_name, request.rbd_name, - request.block_size, + int(request.block_size), ) self.logger.info(f"Created bdev {bdev_name}") @@ -106,16 +214,25 @@ def bdev_rbd_create(self, request, context): context.set_details(f"{ex}") return pb2.bdev_info() + # Update persistent configuration + try: + self.persistent_config.add_bdev(bdev_name, str(request)) + except Exception: + # Stop server manually - cannot raise exception due to gRPC thread + # hanging during server deallocation + self.server.stop(None) + sys.exit(1) + return pb2.bdev_info(bdev_name=bdev_name) def bdev_rbd_delete(self, request, context): - # Delete bdev + """Deletes bdev.""" self.logger.info({ f"Received request to delete bdev: {request.bdev_name}", }) try: return_string = self.spdk_rpc.bdev.bdev_rbd_delete( - self.client, + self.spdk_rpc_client, request.bdev_name, ) self.logger.info(f"Deleted bdev {request.bdev_name}") @@ -126,17 +243,24 @@ def bdev_rbd_delete(self, request, context): context.set_details(f"{ex}") return pb2.req_status() + # Update persistent configuration + try: + self.persistent_config.delete_bdev(request.bdev_name) + except Exception: + self.server.stop(None) + sys.exit(1) + return pb2.req_status(status=return_string) def nvmf_create_subsystem(self, request, context): - # Create an NVMe Subsystem + """Creates an NVMe subsystem.""" self.logger.info({ f"Received request to create: {request.subsystem_nqn}", }) try: return_string = self.spdk_rpc.nvmf.nvmf_create_subsystem( - self.client, + self.spdk_rpc_client, nqn=request.subsystem_nqn, serial_number=request.serial_number, max_namespaces=request.max_namespaces, @@ -149,18 +273,26 @@ def nvmf_create_subsystem(self, request, context): context.set_details(f"{ex}") return pb2.subsystem_info() + # Update persistent configuration + try: + self.persistent_config.add_subsystem(request.subsystem_nqn, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) + return pb2.subsystem_info(subsystem_nqn=request.subsystem_nqn, created=return_status) def nvmf_delete_subsystem(self, request, context): - # Delete an NVMe Subsystem + """Deletes an NVMe subsystem.""" self.logger.info({ f"Received request to delete: {request.subsystem_nqn}", }) try: return_string = self.spdk_rpc.nvmf.nvmf_delete_subsystem( - self.client, + self.spdk_rpc_client, nqn=request.subsystem_nqn, ) self.logger.info(f"returned with status: {return_string}") @@ -170,17 +302,24 @@ def nvmf_delete_subsystem(self, request, context): context.set_details(f"{ex}") return pb2.req_status() + # Update persistent configuration + try: + self.persistent_config.delete_subsystem(request.subsystem_nqn) + except Exception: + self.server.stop(None) + sys.exit(1) + return pb2.req_status(status=return_string) def nvmf_subsystem_add_ns(self, request, context): - # Add given NS to a given subsystem + """Adds a given namespace to a given subsystem.""" self.logger.info({ f"Received request to add: {request.bdev_name} to {request.subsystem_nqn}", }) try: return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_ns( - self.client, request.subsystem_nqn, request.bdev_name) + self.spdk_rpc_client, request.subsystem_nqn, request.bdev_name) self.logger.info(f"returned with nsid: {return_string}") except Exception as ex: self.logger.error(f"Add NS returned with error: \n {ex}") @@ -188,17 +327,26 @@ def nvmf_subsystem_add_ns(self, request, context): context.set_details(f"{ex}") return pb2.nsid() + # Update persistent configuration + try: + self.persistent_config.add_namespace(request.subsystem_nqn, + request.bdev_name, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) + return pb2.nsid(nsid=return_string) def nvmf_subsystem_add_host(self, request, context): - # grant host access to a given subsystem + """Grants a given host access to a given subsystem.""" self.logger.info({ f"Received request to add: {request.host_nqn} to {request.subsystem_nqn}", }) try: return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_host( - self.client, request.subsystem_nqn, request.host_nqn) + self.spdk_rpc_client, request.subsystem_nqn, request.host_nqn) self.logger.info(f"Status of add host: {return_string}") except Exception as ex: @@ -207,17 +355,25 @@ def nvmf_subsystem_add_host(self, request, context): context.set_details(f"{ex}") return pb2.req_status() + # Update persistent configuration + try: + self.persistent_config.add_host(request.subsystem_nqn, + request.host_nqn, str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) + return pb2.req_status(status=return_string) def nvmf_subsystem_allow_any_host(self, request, context): - # grant host access to a given subsystem + """Grants any host access to a given subsystem.""" self.logger.info({ f"Set allow all hosts to {request.subsystem_nqn} to: {request.disable}", }) try: return_string = self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host( - self.client, request.subsystem_nqn, request.disable) + self.spdk_rpc_client, request.subsystem_nqn, request.disable) self.logger.info( f"Status of allow all host request: {return_string}") except Exception as ex: @@ -228,14 +384,30 @@ def nvmf_subsystem_allow_any_host(self, request, context): context.set_details(f"{ex}") return pb2.req_status() + # Update persistent configuration + try: + self.persistent_config.set_allow_all(request.subsystem_nqn, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) + return pb2.req_status(status=return_string) def nvmf_create_transport(self, request, context): - # set transport type for device access + """Sets a transport type for device access.""" self.logger.info({f"Setting transport type to: {request.trtype}"}) + persist_val = "trtype: " + '"' + request.trtype + '"' + + # Check if transport type has already been created + trtype = self.persistent_config.get_transport(request.trtype) + if trtype is not None: + self.logger.info(f"Create Transport {trtype} already created.\n") + return (True) + try: return_string = self.spdk_rpc.nvmf.nvmf_create_transport( - self.client, request.trtype) + self.spdk_rpc_client, request.trtype) except Exception as ex: self.logger.error( f"Create Transport {request.trtype} returned with error: \n {ex}" @@ -244,16 +416,29 @@ def nvmf_create_transport(self, request, context): context.set_details(f"{ex}") return pb2.req_status() + # Update persistent configuration + try: + self.persistent_config.set_transport(request.trtype, persist_val) + except Exception: + self.server.stop(None) + sys.exit(1) + return pb2.req_status(status=return_string) def nvmf_subsystem_add_listener(self, request, context): - # Add a istener at the specified tcp-ip address for the subsystem specified + """Adds a listener at the given TCP/IP address for the given subsystem.""" self.logger.info({ f"Adding listener at {request.traddr} : {request.trsvcid} for {request.nqn}" }) + + # Create transport if needed + tr_req = trtype() + tr_req.trtype = request.trtype + self.nvmf_create_transport(tr_req, context) + try: return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_listener( - self.client, + self.spdk_rpc_client, request.nqn, request.trtype, request.traddr, @@ -267,18 +452,24 @@ def nvmf_subsystem_add_listener(self, request, context): context.set_details(f"{ex}") return pb2.req_status() + # Update persistent configuration + try: + self.persistent_config.add_listener(request.nqn, request.traddr, + request.trsvcid, str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) + return pb2.req_status(status=return_string) def nvmf_get_subsystems(self, request, context): - # Get NVMe Subsystems + """Gets NVMe subsystems.""" self.logger.info({ f"Received request to get subsystems", }) try: - ret = self.spdk_rpc.nvmf.nvmf_get_subsystems( - self.client, - ) + ret = self.spdk_rpc.nvmf.nvmf_get_subsystems(self.spdk_rpc_client,) self.logger.info(f"returned with: {ret}") except Exception as ex: self.logger.error(f"get_subsystems failed with: \n {ex}") @@ -289,76 +480,15 @@ def nvmf_get_subsystems(self, request, context): return pb2.subsystems_info(subsystems=json.dumps(ret)) def ping(self): + """Confirms communication with SPDK process.""" try: - ret = self.spdk_rpc.spdk_get_version(self.client) + ret = self.spdk_rpc.spdk_get_version(self.spdk_rpc_client) return True except Exception as ex: self.logger.error(f"spdk_get_version failed with: \n {ex}") return False -def serve(gw_config_filename): - - nvme_config = nvme_gw_config.NVMeGWConfig(gw_config_filename) - - enable_auth = nvme_config.getboolean("config", "enable_auth") - gateway_addr = nvme_config.get("config", "gateway_addr") - gateway_port = nvme_config.get("config", "gateway_port") - - server_key = nvme_config.get("mtls", "server_key") - server_cert = nvme_config.get("mtls", "server_cert") - client_cert = nvme_config.get("mtls", "client_cert") - - grpc_max_workers = nvme_config.getint("config", "grpc_server_max_workers") - - if enable_auth: - - # read in key and certificate - with open(server_key, "rb") as f: - private_key = f.read() - with open(server_cert, "rb") as f: - server_crt = f.read() - with open(client_cert, "rb") as f: - client_crt = f.read() - - # create server credentials & set client root certificate & set require_client_auth to True - server_credentials = grpc.ssl_server_credentials( - private_key_certificate_chain_pairs=[(private_key, server_crt)], - root_certificates=client_crt, - require_client_auth=True, - ) - - # create server - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) - gw_service = GWService(nvme_config) - gw_service.start_spdk() - pb2_grpc.add_NVMEGatewayServicer_to_server(gw_service, server) - - # add secure port using crendentials - server.add_secure_port("{}:{}".format(gateway_addr, gateway_port), - server_credentials) - else: - - # Authentication is not enabled - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) - gw_service = GWService(nvme_config) - gw_service.start_spdk() - pb2_grpc.add_NVMEGatewayServicer_to_server(gw_service, server) - server.add_insecure_port("{}:{}".format(gateway_addr, gateway_port)) - - server.start() - - while True: - timedout = server.wait_for_termination(timeout=1) - if not timedout: - break - alive = gw_service.ping() - if not alive: - break - - if __name__ == "__main__": parser = argparse.ArgumentParser(prog="python3 ./nvme_gw_server", @@ -372,4 +502,6 @@ def serve(gw_config_filename): ) args = parser.parse_args() - serve(args.config) + nvme_config = nvme_gw_config.NVMeGWConfig(args.config) + with GWService(nvme_config) as gw_service: + gw_service.serve() From ceebee42bae0be623a7201fcb8023494e360bef3 Mon Sep 17 00:00:00 2001 From: Sandy Kaur Date: Thu, 3 Mar 2022 16:25:21 -0600 Subject: [PATCH 2/2] Remove RPC code from OmapPersistentConfig --- nvme_gw.config | 4 +- nvme_gw_cli.py | 3 +- nvme_gw_persistence.py | 175 ++++++++++---------------- nvme_gw_server.py | 278 +++++++++++++++++++++++------------------ proto/nvme_gw.proto | 2 +- 5 files changed, 228 insertions(+), 234 deletions(-) diff --git a/nvme_gw.config b/nvme_gw.config index 2db9eaac5..8320cd7a2 100644 --- a/nvme_gw.config +++ b/nvme_gw.config @@ -12,7 +12,7 @@ enable_auth = False gateway_addr = [::] gateway_port = 5500 -gateway_group = nvme_gw_group1 +gateway_group = grpc_server_max_workers = 10 [ceph] @@ -30,7 +30,7 @@ client_cert = ./client.crt [spdk] spdk_path = /path/to/spdk -spdk_tgt = spdk/build/bin/nvmf_tgt +tgt_path = spdk/build/bin/nvmf_tgt rpc_socket = /var/tmp/spdk.sock timeout = 60.0 log_level = ERROR diff --git a/nvme_gw_cli.py b/nvme_gw_cli.py index 5b3d2b755..1224ac18c 100644 --- a/nvme_gw_cli.py +++ b/nvme_gw_cli.py @@ -135,7 +135,7 @@ def connect(self, nvme_config): @cli.cmd([ argument("-i", "--image", help="RBD image name", required=True), argument("-p", "--pool", help="Ceph pool name", required=True), - argument("-b", "--bdev", help="Bdev name"), + argument("-b", "--bdev", help="Bdev name", required=True), argument("-u", "--user", help="User ID"), argument("-s", "--block-size", help="Block size", default=4096), ]) @@ -147,6 +147,7 @@ def create_bdev(self, args): ceph_pool_name=args.pool, rbd_name=args.image, block_size=int(args.block_size), + bdev_name=args.bdev, ) ret = self.stub.bdev_rbd_create(create_req) self.logger.info(f"Created bdev: {ret.bdev_name}") diff --git a/nvme_gw_persistence.py b/nvme_gw_persistence.py index e83933df6..6ed6a7561 100644 --- a/nvme_gw_persistence.py +++ b/nvme_gw_persistence.py @@ -10,6 +10,7 @@ import rados from typing import Dict, Optional from abc import ABC, abstractmethod +import nvme_gw_pb2 as pb2 class PersistentConfig(ABC): @@ -77,7 +78,7 @@ def delete_config(self): pass @abstractmethod - def restore(self): + def restore(self, callbacks): pass @@ -116,7 +117,7 @@ def __init__(self, nvme_config): self.logger = nvme_config.logger gateway_group = self.nvme_config.get("config", "gateway_group") - self.omap_name = gateway_group + "_config" + self.omap_name = f"nvme.{gateway_group}.config" if gateway_group else "nvme.config" ceph_pool = self.nvme_config.get("ceph", "ceph_pool") ceph_conf = self.nvme_config.get("ceph", "ceph_config_file") @@ -135,11 +136,7 @@ def __init__(self, nvme_config): self.logger.info( f"First gateway: created object {self.omap_name}") except rados.ObjectExists: - self.logger.info(f"{self.omap_name} already exists.") - - omap_version = self._read_key(self.OMAP_VERSION_KEY) - self.logger.info( - f"omap_version: {omap_version}, local version: {self.version}.") + self.logger.info(f"{self.omap_name} omap object already exists.") def _write_key(self, key: str, val: str): """Writes key and value to the persistent config.""" @@ -148,13 +145,14 @@ def _write_key(self, key: str, val: str): version_update = int(self.version) + 1 with rados.WriteOpCtx() as write_op: # Compare operation failure will cause write failure - write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), 1) + write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), + rados.LIBRADOS_CMPXATTR_OP_EQ) self.ioctx.set_omap(write_op, (key,), (val,)) self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), (str(version_update),)) self.ioctx.operate_write_op(write_op, self.omap_name) self.version = version_update - self.logger.info(f"omap_key generated: {key}") + self.logger.debug(f"omap_key generated: {key}") except Exception as ex: self.logger.error(f"Unable to write to omap: {ex}. Exiting!") raise @@ -165,13 +163,14 @@ def _delete_key(self, key: str): version_update = int(self.version) + 1 with rados.WriteOpCtx() as write_op: # Compare operation failure will cause delete failure - write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), 1) - self.ioctx.delete_omap_keys(write_op, (key,)) + write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), + rados.LIBRADOS_CMPXATTR_OP_EQ) + self.ioctx.remove_omap_keys(write_op, (key,)) self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), (str(version_update),)) self.ioctx.operate_write_op(write_op, self.omap_name) self.version = version_update - self.logger.info(f"omap_key deleted: {key}") + self.logger.debug(f"omap_key deleted: {key}") def add_bdev(self, bdev_name: str, val: str): """Adds a bdev to the persistent config.""" @@ -183,21 +182,19 @@ def delete_bdev(self, bdev_name: str): key = self.BDEV_PREFIX + bdev_name self._delete_key(key) - def _restore_bdevs(self, omap_dict): + def _restore_bdevs(self, omap_dict, callback): """Restores a bdev from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.BDEV_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring bdev: {args}") - try: - return_string = self.spdk_rpc.bdev.bdev_rbd_create( - self.spdk_rpc_client, args["ceph_pool_name"], - args["rbd_name"], int(args["block_size"])) - self.logger.info(f"Created bdev: {return_string}") - except Exception as ex: - self.logger.error(f"bdev_rbd_create failed with: \n {ex}") - raise + req = pb2.bdev_create_req( + bdev_name=args["bdev_name"], + ceph_pool_name=args["ceph_pool_name"], + rbd_name=args["rbd_name"], + block_size=int(args["block_size"]), + ) + ret = callback(req) def add_namespace(self, subsystem_nqn: str, bdev_name: str, val: str): """Adds a namespace to the persistent config.""" @@ -209,24 +206,17 @@ def delete_namespace(self, subsystem_nqn: str, bdev_name: str): key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + bdev_name self._delete_key(key) - def _restore_namespaces(self, omap_dict): + def _restore_namespaces(self, omap_dict, callback): """Restores a namespace from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.NAMESPACE_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring namespace: {args}") - try: - return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, - args["subsystem_nqn"], - args["bdev_name"], - ) - self.logger.info(f"Added NSID to subystem: {return_string}") - except Exception as ex: - self.logger.error( - f"nvmf_subsystem_add_ns failed with: \n {ex}") - raise + req = pb2.subsystem_add_ns_req( + subsystem_nqn=args["subsystem_nqn"], + bdev_name=args["bdev_name"], + ) + ret = callback(req) def add_subsystem(self, subsystem_nqn: str, val: str): """Adds a subsystem to the persistent config.""" @@ -238,22 +228,17 @@ def delete_subsystem(self, subsystem_nqn: str): key = self.SUBSYSTEM_PREFIX + subsystem_nqn self._delete_key(key) - def _restore_subsystems(self, omap_dict): + def _restore_subsystems(self, omap_dict, callback): """Restores subsystems from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.SUBSYSTEM_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring subsystem: {args}") - try: - return_string = self.spdk_rpc.nvmf.nvmf_create_subsystem( - self.spdk_rpc_client, args["subsystem_nqn"], - args["serial_number"]) - self.logger.info(f"Returned with status: {return_string}") - except Exception as ex: - self.logger.error( - f"nvmf_create_subsystem failed with: \n {ex}") - raise + req = pb2.subsystem_create_req( + subsystem_nqn=args["subsystem_nqn"], + serial_number=args["serial_number"], + ) + ret = callback(req) def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): """Adds a host to the persistent config.""" @@ -265,22 +250,17 @@ def delete_host(self, subsystem_nqn: str, host_nqn: str): key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) self._delete_key(key) - def _restore_hosts(self, omap_dict): + def _restore_hosts(self, omap_dict, callback): """Restore hosts from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.HOST_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring host: {args}") - try: - self.spdk_rpc.nvmf.nvmf_subsystem_add_host( - self.spdk_rpc_client, args["nqn"], args["host_nqn"]) - self.logger.info( - f"Added host {args['host_nqn']} to {args['nqn']}") - except Exception as ex: - self.logger.error( - f"nvmf_subsystem_add_host failed with: \n {ex}") - raise + req = pb2.subsystem_add_host_req( + subsystem_nqn=args["nqn"], + host_nqn=args["host_nqn"], + ) + ret = callback(req) def add_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str, val: str): @@ -295,22 +275,20 @@ def delete_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str): trsvcid) self._delete_key(key) - def _restore_listeners(self, omap_dict): + def _restore_listeners(self, omap_dict, callback): """Restores listeners from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.LISTENER_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring listener: {args}") - try: - self.spdk_rpc.nvmf.nvmf_subsystem_add_listener( - self.spdk_rpc_client, args["nqn"], args["trtype"], - args["traddr"], args["trsvcid"], args["adrfam"]) - self.logger.info(f"Added listener: {args['traddr']}") - except Exception as ex: - self.logger.error( - f"nvmf_subsystem_add_listener failed with: \n {ex}") - raise + req = pb2.subsystem_add_listener_req( + nqn=args["nqn"], + trtype=args["trtype"], + adrfam=args["adrfam"], + traddr=args["traddr"], + trsvcid=args["trsvcid"], + ) + ret = callback(req) def set_allow_all(self, subsystem_nqn: str, val: str): """Sets open access to a subsystem in the persistent config.""" @@ -322,22 +300,15 @@ def delete_allow_all(self, subsystem_nqn: str): key = self.ALLOW_ALL_PREFIX + subsystem_nqn self._delete_key(key) - def _restore_allow_all(self, omap_dict): + def _restore_allow_all(self, omap_dict, callback): """Restores allow_all_hosts from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.ALLOW_ALL_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring allow_all: {args}") - try: - self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, args["subsystem_nqn"], False) - self.logger.info( - f"Set allow_all for {args['subsystem_nqn']}") - except Exception as ex: - self.logger.error( - f"nvmf_subsystem_allow_any_host failed with: \n {ex}") - raise + req = pb2.subsystem_allow_any_host_req( + subsystem_nqn=args["subsystem_nqn"], disable=0) + ret = callback(req) def set_transport(self, trtype: str, val: str): """Sets transport type in the persistent config.""" @@ -354,23 +325,14 @@ def get_transport(self, trtype: str): key = self.TRANSPORT_PREFIX + trtype return self._read_key(key) - def _restore_transports(self, omap_dict): + def _restore_transports(self, omap_dict, callback): """Restores a transport from the persistent config.""" for (key, val) in omap_dict.items(): if key.startswith(self.TRANSPORT_PREFIX): args = self._clean_args(str(val, 'utf-8')) - self.logger.info(f"Restoring transport: {args}") - try: - return_string = self.spdk_rpc.nvmf.nvmf_create_transport( - self.spdk_rpc_client, args["trtype"]) - self.logger.info( - f"Created transport type {args['trtype']}: {return_string}" - ) - except Exception as ex: - self.logger.error( - f"nvmf_create_transport failed with: \n {ex}") - raise + req = pb2.create_transport_req(trtype=args["trtype"]) + ret = callback(req) def _read_key(self, key) -> Optional[str]: """Reads single key from persistent config and returns its value.""" @@ -383,7 +345,7 @@ def _read_key(self, key) -> Optional[str]: value_list = list(dict(iter).values()) if len(value_list) == 1: val = str(value_list[0], "utf-8") - self.logger.info(f"Read key: {key} -> {val}") + self.logger.debug(f"Read key: {key} -> {val}") return val return None @@ -396,14 +358,14 @@ def _read_all(self) -> Dict[str, str]: raise Exception("Omap read operation failed.") self.ioctx.operate_read_op(read_op, self.omap_name) omap_dict = dict(iter) - self.logger.info(f"Omap Persistent Config:\n{omap_dict}") + self.logger.debug(f"Omap Persistent Config:\n{omap_dict}") return omap_dict def delete_config(self): """Deletes OMAP object.""" try: - self.ioctx.delete_object(self.omap_name) + self.ioctx.remove_object(self.omap_name) self.logger.info(f"Object {self.omap_name} deleted.") except rados.ObjectNotFound: self.logger.info(f"Object {self.omap_name} not found.") @@ -419,25 +381,24 @@ def _clean_args(self, data: str) -> Dict[str, str]: param_dict[key.strip(' \"')] = value.strip(' \"') return param_dict - def restore(self, spdk_rpc, spdk_rpc_client): + def restore(self, callbacks): """Restores gateway config to persistent config specifications.""" - self.spdk_rpc = spdk_rpc - self.spdk_rpc_client = spdk_rpc_client omap_version = self._read_key(self.OMAP_VERSION_KEY) if omap_version == "1": self.logger.info("This omap was just created. Nothing to restore") else: omap_dict = self._read_all() - self._restore_bdevs(omap_dict) - self._restore_subsystems(omap_dict) - self._restore_namespaces(omap_dict) - self._restore_hosts(omap_dict) - self._restore_allow_all(omap_dict) - self._restore_transports(omap_dict) - self._restore_listeners(omap_dict) - self.logger.info("Restore complete.") - - # Update local version number to persistent config version + self._restore_bdevs(omap_dict, callbacks[self.BDEV_PREFIX]) + self._restore_subsystems(omap_dict, + callbacks[self.SUBSYSTEM_PREFIX]) + self._restore_namespaces(omap_dict, + callbacks[self.NAMESPACE_PREFIX]) + self._restore_hosts(omap_dict, callbacks[self.HOST_PREFIX]) + self._restore_allow_all(omap_dict, callbacks[self.ALLOW_ALL_PREFIX]) + self._restore_transports(omap_dict, + callbacks[self.TRANSPORT_PREFIX]) + self._restore_listeners(omap_dict, callbacks[self.LISTENER_PREFIX]) self.version = omap_version + self.logger.info("Restore complete.") return \ No newline at end of file diff --git a/nvme_gw_server.py b/nvme_gw_server.py index 550751789..b6190785c 100644 --- a/nvme_gw_server.py +++ b/nvme_gw_server.py @@ -30,12 +30,6 @@ def callable(): return libc.prctl(PR_SET_PDEATHSIG, sig) return callable - -class trtype: - def __init__(self): - self.trtype = "tcp" - - class GWService(pb2_grpc.NVMEGatewayServicer): """Implements gateway service interface. @@ -90,6 +84,13 @@ def serve(self): grpc_max_workers = self.nvme_config.getint("config", "grpc_server_max_workers") + # Create server and check for existing NVMeoF target configuration + self.server = grpc.server( + futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) + self.start_spdk() + self.restore_config() + pb2_grpc.add_NVMEGatewayServicer_to_server(self, self.server) + if enable_auth: # Read in key and certificates for authentication server_key = self.nvme_config.get("mtls", "server_key") @@ -109,23 +110,11 @@ def serve(self): require_client_auth=True, ) - # Create server and check for existing NVMeoF target configuration - self.server = grpc.server( - futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) - self.start_spdk() - self.persistent_config.restore(self.spdk_rpc, self.spdk_rpc_client) - pb2_grpc.add_NVMEGatewayServicer_to_server(self, self.server) - # Add secure port using crendentials self.server.add_secure_port( "{}:{}".format(gateway_addr, gateway_port), server_credentials) else: # Authentication is not enabled - self.server = grpc.server( - futures.ThreadPoolExecutor(max_workers=grpc_max_workers)) - self.start_spdk() - self.persistent_config.restore(self.spdk_rpc, self.spdk_rpc_client) - pb2_grpc.add_NVMEGatewayServicer_to_server(self, self.server) self.server.add_insecure_port("{}:{}".format( gateway_addr, gateway_port)) @@ -151,8 +140,8 @@ def start_spdk(self): import spdk.scripts.rpc as spdk_rpc self.spdk_rpc = spdk_rpc - spdk_tgt = self.nvme_config.get("spdk", "spdk_tgt") - spdk_cmd = os.path.join(spdk_path, spdk_tgt) + tgt_path = self.nvme_config.get("spdk", "tgt_path") + spdk_cmd = os.path.join(spdk_path, tgt_path) spdk_rpc_socket = self.nvme_config.get("spdk", "rpc_socket") spdk_tgt_cmd_extra_args = self.nvme_config.get("spdk", "tgt_cmd_extra_args") @@ -178,8 +167,8 @@ def start_spdk(self): conn_retries = self.nvme_config.getint("spdk", "conn_retries") self.logger.info( - f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket}, conn_retries: {conn_retries}, timeout: {timeout}" - ) + f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket}," + + f" conn_retries: {conn_retries}, timeout: {timeout}",) try: self.spdk_rpc_client = self.spdk_rpc.client.JSONRPCClient( @@ -194,38 +183,59 @@ def start_spdk(self): raise return - def bdev_rbd_create(self, request, context): + def restore_config(self): + callbacks = {} + callbacks[self.persistent_config.BDEV_PREFIX] = self.bdev_rbd_create + callbacks[self.persistent_config. + SUBSYSTEM_PREFIX] = self.nvmf_create_subsystem + callbacks[self.persistent_config. + NAMESPACE_PREFIX] = self.nvmf_subsystem_add_ns + callbacks[ + self.persistent_config.HOST_PREFIX] = self.nvmf_subsystem_add_host + callbacks[self.persistent_config. + ALLOW_ALL_PREFIX] = self.nvmf_subsystem_allow_any_host + callbacks[self.persistent_config. + TRANSPORT_PREFIX] = self.nvmf_create_transport + callbacks[self.persistent_config. + LISTENER_PREFIX] = self.nvmf_subsystem_add_listener + self.persistent_config.restore(callbacks) + + def bdev_rbd_create(self, request, context=None): """Creates bdev from a given RBD image.""" self.logger.info({ - f"Received: {request.ceph_pool_name}, {request.rbd_name}, {request.block_size}", + f"Received request to create bdev {request.bdev_name} from" + + f" {request.ceph_pool_name}/{request.rbd_name}", }) try: bdev_name = self.spdk_rpc.bdev.bdev_rbd_create( self.spdk_rpc_client, - request.ceph_pool_name, - request.rbd_name, - int(request.block_size), + name=request.bdev_name, + pool_name=request.ceph_pool_name, + rbd_name=request.rbd_name, + block_size=int(request.block_size), ) self.logger.info(f"Created bdev {bdev_name}") except Exception as ex: self.logger.error(f"bdev create failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.bdev_info() - # Update persistent configuration - try: - self.persistent_config.add_bdev(bdev_name, str(request)) - except Exception: - # Stop server manually - cannot raise exception due to gRPC thread - # hanging during server deallocation - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_bdev(bdev_name, str(request)) + except Exception: + # Stop server manually - cannot raise exception due to gRPC thread + # hanging during server deallocation + self.server.stop(None) + sys.exit(1) return pb2.bdev_info(bdev_name=bdev_name) - def bdev_rbd_delete(self, request, context): + def bdev_rbd_delete(self, request, context=None): """Deletes bdev.""" self.logger.info({ f"Received request to delete bdev: {request.bdev_name}", @@ -239,20 +249,22 @@ def bdev_rbd_delete(self, request, context): except Exception as ex: self.logger.error(f"bdev delete failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.delete_bdev(request.bdev_name) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.delete_bdev(request.bdev_name) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_create_subsystem(self, request, context): + def nvmf_create_subsystem(self, request, context=None): """Creates an NVMe subsystem.""" self.logger.info({ f"Received request to create: {request.subsystem_nqn}", @@ -269,22 +281,24 @@ def nvmf_create_subsystem(self, request, context): return_status = return_string != "none" except Exception as ex: self.logger.error(f"create_subsystem failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.subsystem_info() - # Update persistent configuration - try: - self.persistent_config.add_subsystem(request.subsystem_nqn, - str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_subsystem(request.subsystem_nqn, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.subsystem_info(subsystem_nqn=request.subsystem_nqn, created=return_status) - def nvmf_delete_subsystem(self, request, context): + def nvmf_delete_subsystem(self, request, context=None): """Deletes an NVMe subsystem.""" self.logger.info({ f"Received request to delete: {request.subsystem_nqn}", @@ -298,20 +312,22 @@ def nvmf_delete_subsystem(self, request, context): self.logger.info(f"returned with status: {return_string}") except Exception as ex: self.logger.error(f"delete_subsystem failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.delete_subsystem(request.subsystem_nqn) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.delete_subsystem(request.subsystem_nqn) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_subsystem_add_ns(self, request, context): + def nvmf_subsystem_add_ns(self, request, context=None): """Adds a given namespace to a given subsystem.""" self.logger.info({ f"Received request to add: {request.bdev_name} to {request.subsystem_nqn}", @@ -319,26 +335,30 @@ def nvmf_subsystem_add_ns(self, request, context): try: return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, request.subsystem_nqn, request.bdev_name) + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + bdev_name=request.bdev_name) self.logger.info(f"returned with nsid: {return_string}") except Exception as ex: self.logger.error(f"Add NS returned with error: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.nsid() - # Update persistent configuration - try: - self.persistent_config.add_namespace(request.subsystem_nqn, - request.bdev_name, - str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_namespace(request.subsystem_nqn, + request.bdev_name, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.nsid(nsid=return_string) - def nvmf_subsystem_add_host(self, request, context): + def nvmf_subsystem_add_host(self, request, context=None): """Grants a given host access to a given subsystem.""" self.logger.info({ f"Received request to add: {request.host_nqn} to {request.subsystem_nqn}", @@ -351,21 +371,23 @@ def nvmf_subsystem_add_host(self, request, context): except Exception as ex: self.logger.error(f"Add Host returned with error: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.add_host(request.subsystem_nqn, - request.host_nqn, str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_host(request.subsystem_nqn, + request.host_nqn, str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_subsystem_allow_any_host(self, request, context): + def nvmf_subsystem_allow_any_host(self, request, context=None): """Grants any host access to a given subsystem.""" self.logger.info({ f"Set allow all hosts to {request.subsystem_nqn} to: {request.disable}", @@ -380,30 +402,34 @@ def nvmf_subsystem_allow_any_host(self, request, context): self.logger.error( f"Allow any host set to {request.disable} returned error: \n {ex}" ) - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.set_allow_all(request.subsystem_nqn, - str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.set_allow_all(request.subsystem_nqn, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_create_transport(self, request, context): + def nvmf_create_transport(self, request, context=None): """Sets a transport type for device access.""" self.logger.info({f"Setting transport type to: {request.trtype}"}) persist_val = "trtype: " + '"' + request.trtype + '"' # Check if transport type has already been created - trtype = self.persistent_config.get_transport(request.trtype) - if trtype is not None: - self.logger.info(f"Create Transport {trtype} already created.\n") - return (True) + if context: + trtype = self.persistent_config.get_transport(request.trtype) + if trtype is not None: + self.logger.info( + f"Create Transport {trtype} already created.\n") + return (True) try: return_string = self.spdk_rpc.nvmf.nvmf_create_transport( @@ -412,29 +438,32 @@ def nvmf_create_transport(self, request, context): self.logger.error( f"Create Transport {request.trtype} returned with error: \n {ex}" ) - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.set_transport(request.trtype, persist_val) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.set_transport(request.trtype, + persist_val) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) - def nvmf_subsystem_add_listener(self, request, context): + def nvmf_subsystem_add_listener(self, request, context=None): """Adds a listener at the given TCP/IP address for the given subsystem.""" self.logger.info({ f"Adding listener at {request.traddr} : {request.trsvcid} for {request.nqn}" }) # Create transport if needed - tr_req = trtype() - tr_req.trtype = request.trtype - self.nvmf_create_transport(tr_req, context) + if context: + self.nvmf_create_transport(pb2.create_transport_req(trtype='TCP'), + context) try: return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_listener( @@ -448,17 +477,20 @@ def nvmf_subsystem_add_listener(self, request, context): self.logger.info(f"Status of add listener: {return_string}") except Exception as ex: self.logger.error(f"Add Listener returned with error: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") return pb2.req_status() - # Update persistent configuration - try: - self.persistent_config.add_listener(request.nqn, request.traddr, - request.trsvcid, str(request)) - except Exception: - self.server.stop(None) - sys.exit(1) + if context: + # Update persistent configuration + try: + self.persistent_config.add_listener(request.nqn, request.traddr, + request.trsvcid, + str(request)) + except Exception: + self.server.stop(None) + sys.exit(1) return pb2.req_status(status=return_string) diff --git a/proto/nvme_gw.proto b/proto/nvme_gw.proto index 8e6c7ecf4..e2ac0e3dc 100644 --- a/proto/nvme_gw.proto +++ b/proto/nvme_gw.proto @@ -55,7 +55,7 @@ message spdk_status { message bdev_create_req { - optional string bdev_name = 1; + string bdev_name = 1; // required optional string user_id = 2; string ceph_pool_name = 3; //required string rbd_name = 4; //required