diff --git a/README.md b/README.md index bc3a1978..c98e5f47 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ This daemon runs as root. It provides the ability to export existing RBD images 2. Modify the config file (default ceph-nvmeof.conf) to reflect the IP/ Port where the server can be reached: - gateway_addr = - gateway_port = + addr = + port = 3. To [enable mTLS](#mtls-configuration-for-testing-purposes) using self signed certificates, edit the config file to set: @@ -123,16 +123,16 @@ Indicate the location of the keys and certificates in the config file: 2. Run the CLI (ensure a ceph pool 'rbd' with an rbdimage 'mytestdevimage' is created prior to this step): $ python3 -m control.cli create_bdev -i mytestdevimage -p rbd -b Ceph0 - INFO:root:Created bdev: Ceph0 + INFO:root:Created bdev Ceph0: True $ python3 -m control.cli create_subsystem -n nqn.2016-06.io.spdk:cnode1 -s SPDK00000000000001 - INFO:root:Created subsystem: nqn.2016-06.io.spdk:cnode1 + INFO:root:Created subsystem nqn.2016-06.io.spdk:cnode1: True $ python3 -m control.cli add_namespace -n nqn.2016-06.io.spdk:cnode1 -b Ceph0 - INFO:root:Added namespace 1 to nqn.2016-06.io.spdk:cnode1 + INFO:root:Added namespace 1 to nqn.2016-06.io.spdk:cnode1: True $ python3 -m control.cli add_host -n nqn.2016-06.io.spdk:cnode1 -t * - INFO:root:Allow open host access to nqn.2016-06.io.spdk:cnode1: True + INFO:root:Allowed open host access to nqn.2016-06.io.spdk:cnode1: True $ python3 -m control.cli create_listener -n nqn.2016-06.io.spdk:cnode1 -s 5001 INFO:root:Created nqn.2016-06.io.spdk:cnode1 listener: True diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index 0f5f997b..af96fd1e 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -7,12 +7,12 @@ # Authors: anita.shekar@ibm.com, sandy.kaur@ibm.com # -[config] +[gateway] -gateway_name = -gateway_group = -gateway_addr = 127.0.0.1 -gateway_port = 5500 +name = +group = +addr = 127.0.0.1 +port = 5500 enable_auth = False [ceph] diff --git a/control/__main__.py b/control/__main__.py index fda00520..aa8e8f05 100644 --- a/control/__main__.py +++ b/control/__main__.py @@ -11,7 +11,7 @@ import logging import argparse from .server import GatewayServer -from .config import NVMeGWConfig +from .config import GatewayConfig if __name__ == '__main__': parser = argparse.ArgumentParser(prog="python3 -m control", @@ -30,7 +30,7 @@ if not os.path.isfile(args.config): logger.error(f"Config file {args.config} not found.") raise FileNotFoundError - - config = NVMeGWConfig(args.config) + + config = GatewayConfig(args.config) with GatewayServer(config) as gateway: gateway.serve() diff --git a/control/cli.py b/control/cli.py index b083685f..17d3848d 100644 --- a/control/cli.py +++ b/control/cli.py @@ -13,7 +13,7 @@ import logging from .proto import gateway_pb2_grpc as pb2_grpc from .proto import gateway_pb2 as pb2 -from .config import NVMeGWConfig +from .config import GatewayConfig def argument(*name_or_flags, **kwargs): @@ -93,23 +93,23 @@ def stub(self): raise AttributeError("stub is None. Set with connect method.") return self._stub - def connect(self, nvme_config): + def connect(self, config): """Connects to server and sets stub.""" # Read in configuration parameters - host = nvme_config.get("config", "gateway_addr") - port = nvme_config.get("config", "gateway_port") - enable_auth = nvme_config.getboolean("config", "enable_auth") + host = config.get("gateway", "addr") + port = config.get("gateway", "port") + enable_auth = config.getboolean("gateway", "enable_auth") server = "{}:{}".format(host, port) if enable_auth: # Create credentials for mutual TLS and a secure channel - with open(nvme_config.get("mtls", "client_cert"), "rb") as f: + with open(config.get("mtls", "client_cert"), "rb") as f: client_cert = f.read() - with open(nvme_config.get("mtls", "client_key"), "rb") as f: + with open(config.get("mtls", "client_key"), "rb") as f: client_key = f.read() - with open(nvme_config.get("mtls", "server_cert"), "rb") as f: + with open(config.get("mtls", "server_cert"), "rb") as f: server_cert = f.read() credentials = grpc.ssl_channel_credentials( @@ -119,12 +119,11 @@ def connect(self, nvme_config): ) channel = grpc.secure_channel(server, credentials) else: - # Instantiate a channel without credentials channel = grpc.insecure_channel(server) # Bind the client and the server - self._stub = pb2_grpc.NVMEGatewayStub(channel) + self._stub = pb2_grpc.GatewayStub(channel) @cli.cmd([ argument("-i", "--image", help="RBD image name", required=True), @@ -140,14 +139,14 @@ def create_bdev(self, args): """Creates a bdev from an RBD image.""" try: - create_req = pb2.create_bdev_req( + req = pb2.create_bdev_req( ceph_pool_name=args.pool, rbd_name=args.image, block_size=args.block_size, bdev_name=args.bdev, ) - ret = self.stub.create_bdev(create_req) - self.logger.info(f"Created bdev: {ret.status}") + ret = self.stub.create_bdev(req) + self.logger.info(f"Created bdev {args.bdev}: {ret.status}") except Exception as error: self.logger.error(f"Failed to create bdev: \n {error}") @@ -158,9 +157,9 @@ def delete_bdev(self, args): """Deletes a bdev.""" try: - delete_req = pb2.delete_bdev_req(bdev_name=args.bdev) - ret = self.stub.delete_bdev(delete_req) - self.logger.info(f"Deleted bdev: {delete_req.bdev_name}") + req = pb2.delete_bdev_req(bdev_name=args.bdev) + ret = self.stub.delete_bdev(req) + self.logger.info(f"Deleted bdev {args.bdev}: {ret.status}") except Exception as error: self.logger.error(f"Failed to delete bdev: \n {error}") @@ -172,10 +171,10 @@ def create_subsystem(self, args): """Creates a subsystem.""" try: - create_req = pb2.create_subsystem_req(subsystem_nqn=args.subnqn, - serial_number=args.serial) - ret = self.stub.create_subsystem(create_req) - self.logger.info(f"Created subsystem: {ret.status}") + req = pb2.create_subsystem_req(subsystem_nqn=args.subnqn, + serial_number=args.serial) + ret = self.stub.create_subsystem(req) + self.logger.info(f"Created subsystem {args.subnqn}: {ret.status}") except Exception as error: self.logger.error(f"Failed to create subsystem: \n {error}") @@ -186,9 +185,9 @@ def delete_subsystem(self, args): """Deletes a subsystem.""" try: - delete_req = pb2.delete_subsystem_req(subsystem_nqn=args.subnqn) - ret = self.stub.delete_subsystem(delete_req) - self.logger.info(f"Deleted subsystem: {delete_req.subsystem_nqn}") + req = pb2.delete_subsystem_req(subsystem_nqn=args.subnqn) + ret = self.stub.delete_subsystem(req) + self.logger.info(f"Deleted subsystem {args.subnqn}: {ret.status}") except Exception as error: self.logger.error(f"Failed to delete subsystem: \n {error}") @@ -200,10 +199,11 @@ def add_namespace(self, args): """Adds a namespace to a subsystem.""" try: - create_req = pb2.add_namespace_req(subsystem_nqn=args.subnqn, - bdev_name=args.bdev) - ret = self.stub.add_namespace(create_req) - self.logger.info(f"Added namespace {ret.nsid} to {args.subnqn}") + req = pb2.add_namespace_req(subsystem_nqn=args.subnqn, + bdev_name=args.bdev) + ret = self.stub.add_namespace(req) + self.logger.info( + f"Added namespace {ret.nsid} to {args.subnqn}: {ret.status}") except Exception as error: self.logger.error(f"Failed to add namespace: \n {error}") @@ -215,10 +215,12 @@ def remove_namespace(self, args): """Removes a namespace from a subsystem.""" try: - delete_req = pb2.remove_namespace_req(subsystem_nqn=args.subnqn, - nsid=args.nsid) - ret = self.stub.remove_namespace(delete_req) - self.logger.info(f"Deleted namespace {delete_req.nsid}: {ret}") + req = pb2.remove_namespace_req(subsystem_nqn=args.subnqn, + nsid=args.nsid) + ret = self.stub.remove_namespace(req) + self.logger.info( + f"Removed namespace {args.nsid} from {args.subnqn}:" + + f" {ret.status}") except Exception as error: self.logger.error(f"Failed to remove namespace: \n {error}") @@ -230,16 +232,16 @@ def add_host(self, args): """Adds a host to a subsystem.""" try: - create_req = pb2.add_host_req(subsystem_nqn=args.subnqn, - host_nqn=args.host) - ret = self.stub.add_host(create_req) + req = pb2.add_host_req(subsystem_nqn=args.subnqn, + host_nqn=args.host) + ret = self.stub.add_host(req) if args.host == "*": self.logger.info( f"Allowed open host access to {args.subnqn}: {ret.status}") else: self.logger.info( - f"Added host {args.host} access to {args.subnqn}: {ret.status}" - ) + f"Added host {args.host} access to {args.subnqn}:" + + f" {ret.status}") except Exception as error: self.logger.error(f"Failed to add host: \n {error}") @@ -251,16 +253,16 @@ def remove_host(self, args): """Removes a host from a subsystem.""" try: - delete_req = pb2.remove_host_req(subsystem_nqn=args.subnqn, - host_nqn=args.host) - ret = self.stub.remove_host(delete_req) + req = pb2.remove_host_req(subsystem_nqn=args.subnqn, + host_nqn=args.host) + ret = self.stub.remove_host(req) if args.host == "*": self.logger.info( f"Disabled open host access to {args.subnqn}: {ret.status}") else: self.logger.info( - f"Removed host {args.host} access from {args.subnqn}: {ret.status}" - ) + f"Removed host {args.host} access from {args.subnqn}:" + + f" {ret.status}") except Exception as error: self.logger.error(f"Failed to remove host: \n {error}") @@ -273,10 +275,10 @@ def remove_host(self, args): argument("-s", "--trsvcid", help="Port number", required=True), ]) def create_listener(self, args): - """Creates a listener for a subsystem at a given TCP/IP address.""" + """Creates a listener for a subsystem at a given IP/Port.""" try: - create_req = pb2.create_listener_req( + req = pb2.create_listener_req( nqn=args.subnqn, gateway_name=args.gateway_name, trtype=args.trtype, @@ -284,7 +286,7 @@ def create_listener(self, args): traddr=args.traddr, trsvcid=args.trsvcid, ) - ret = self.stub.create_listener(create_req) + ret = self.stub.create_listener(req) self.logger.info(f"Created {args.subnqn} listener: {ret.status}") except Exception as error: self.logger.error(f"Failed to create listener: \n {error}") @@ -298,10 +300,10 @@ def create_listener(self, args): argument("-s", "--trsvcid", help="Port number", required=True), ]) def delete_listener(self, args): - """Deletes a listener from a subsystem at a given TCP/IP address.""" + """Deletes a listener from a subsystem at a given IP/Port.""" try: - delete_req = pb2.delete_listener_req( + req = pb2.delete_listener_req( nqn=args.subnqn, gateway_name=args.gateway_name, trtype=args.trtype, @@ -309,18 +311,19 @@ def delete_listener(self, args): traddr=args.traddr, trsvcid=args.trsvcid, ) - ret = self.stub.delete_listener(delete_req) - self.logger.info(f"Deleted {args.traddr} from {args.subnqn}: {ret.status}") + ret = self.stub.delete_listener(req) + self.logger.info( + f"Deleted {args.traddr} from {args.subnqn}: {ret.status}") except Exception as error: self.logger.error(f"Failed to delete listener: \n {error}") @cli.cmd() def get_subsystems(self, args): - """Get subsystems.""" + """Gets subsystems.""" try: - get_req = pb2.get_subsystems_req() - ret = self.stub.get_subsystems(get_req) + req = pb2.get_subsystems_req() + ret = self.stub.get_subsystems(req) subsystems = json.loads(ret.subsystems) formatted_subsystems = json.dumps(subsystems, indent=4) self.logger.info(f"Get subsystems:\n{formatted_subsystems}") @@ -331,8 +334,8 @@ def get_subsystems(self, args): def main(args=None): client = GatewayClient() parsed_args = client.cli.parser.parse_args(args) - nvme_config = NVMeGWConfig(parsed_args.config) - client.connect(nvme_config) + config = GatewayConfig(parsed_args.config) + client.connect(config) if parsed_args.subcommand is None: client.cli.parser.print_help() else: diff --git a/control/config.py b/control/config.py index ac237565..4c301e1c 100644 --- a/control/config.py +++ b/control/config.py @@ -10,31 +10,36 @@ import configparser -class NVMeGWConfig: - def __init__(self, gw_config_filename): - self.nvme_gw_config = configparser.ConfigParser() - self.nvme_gw_config.read(gw_config_filename) +class GatewayConfig: + """Loads and returns config file settings. + + Instance attributes: + config: Config parser object + """ + def __init__(self, conffile): + self.config = configparser.ConfigParser() + self.config.read(conffile) def get(self, section, param): - return self.nvme_gw_config.get(section, param) + return self.config.get(section, param) def getboolean(self, section, param): - return self.nvme_gw_config.getboolean(section, param) + return self.config.getboolean(section, param) def getint(self, section, param): - return self.nvme_gw_config.getint(section, param) + return self.config.getint(section, param) def getfloat(self, section, param): - return self.nvme_gw_config.getfloat(section, param) + return self.config.getfloat(section, param) def get_with_default(self, section, param, value): - return self.nvme_gw_config.get(section, param, fallback=value) + return self.config.get(section, param, fallback=value) def getboolean_with_default(self, section, param, value): - return self.nvme_gw_config.getboolean(section, param, fallback=value) + return self.config.getboolean(section, param, fallback=value) def getint_with_default(self, section, param, value): - return self.nvme_gw_config.getint(section, param, fallback=value) + return self.config.getint(section, param, fallback=value) def getfloat_with_default(self, section, param, value): - return self.nvme_gw_config.getfloat(section, param, fallback=value) + return self.config.getfloat(section, param, fallback=value) diff --git a/control/grpc.py b/control/grpc.py index 8c1e8742..e0ca7d16 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -16,13 +16,13 @@ from .proto import gateway_pb2_grpc as pb2_grpc -class GatewayService(pb2_grpc.NVMEGatewayServicer): +class GatewayService(pb2_grpc.GatewayServicer): """Implements gateway service interface. Handles configuration of the SPDK NVMEoF target according to client requests. Instance attributes: - nvme_config: Basic gateway parameters + config: Basic gateway parameters logger: Logger instance to track server events gateway_name: Gateway identifier gateway_state: Methods for target state persistence @@ -30,25 +30,25 @@ class GatewayService(pb2_grpc.NVMEGatewayServicer): spdk_rpc_client: Client of SPDK RPC server """ - def __init__(self, nvme_config, gateway_state, spdk_rpc, spdk_rpc_client): + def __init__(self, config, gateway_state, spdk_rpc, spdk_rpc_client): self.logger = logging.getLogger(__name__) - self.nvme_config = nvme_config + self.config = config self.gateway_state = gateway_state self.spdk_rpc = spdk_rpc self.spdk_rpc_client = spdk_rpc_client - self.gateway_name = self.nvme_config.get("config", "gateway_name") + self.gateway_name = self.config.get("gateway", "name") if not self.gateway_name: self.gateway_name = socket.gethostname() def create_bdev(self, request, context=None): """Creates a bdev from an RBD image.""" - self.logger.info({ - f"Received request to create bdev {request.bdev_name} from", - f" {request.ceph_pool_name}/{request.rbd_name}", - f" with block size {request.block_size}", - }) + + self.logger.info( + f"Received request to create bdev {request.bdev_name} from" + + f" {request.ceph_pool_name}/{request.rbd_name}" + + f" with block size {request.block_size}") try: bdev_name = self.spdk_rpc.bdev.bdev_rbd_create( self.spdk_rpc_client, @@ -57,10 +57,9 @@ def create_bdev(self, request, context=None): rbd_name=request.rbd_name, block_size=request.block_size, ) - self.logger.info(f"Created bdev {bdev_name}") - + self.logger.info(f"create_bdev: {bdev_name}") except Exception as ex: - self.logger.error(f"bdev create failed with: \n {ex}") + self.logger.error(f"create_bdev failed with: \n {ex}") if context: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{ex}") @@ -81,18 +80,17 @@ def create_bdev(self, request, context=None): def delete_bdev(self, request, context=None): """Deletes a bdev.""" - self.logger.info({ - f"Received request to delete bdev: {request.bdev_name}", - }) + + self.logger.info( + f"Received request to delete bdev {request.bdev_name}") try: - return_string = self.spdk_rpc.bdev.bdev_rbd_delete( + ret = self.spdk_rpc.bdev.bdev_rbd_delete( self.spdk_rpc_client, request.bdev_name, ) - self.logger.info(f"Deleted bdev {request.bdev_name}") - + self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") except Exception as ex: - self.logger.error(f"bdev delete failed with: \n {ex}") + self.logger.error(f"delete_bdev failed with: \n {ex}") if context: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{ex}") @@ -101,28 +99,26 @@ def delete_bdev(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.delete_bdev(request.bdev_name) + self.gateway_state.remove_bdev(request.bdev_name) except Exception as ex: self.logger.error( f"Error persisting delete_bdev {request.bdev_name}: {ex}") raise - return pb2.req_status(status=return_string) + return pb2.req_status(status=ret) def create_subsystem(self, request, context=None): """Creates a subsystem.""" - self.logger.info({ - f"Received request to create: {request.subsystem_nqn}", - }) + self.logger.info( + f"Received request to create subsystem {request.subsystem_nqn}") try: - return_string = self.spdk_rpc.nvmf.nvmf_create_subsystem( + ret = self.spdk_rpc.nvmf.nvmf_create_subsystem( self.spdk_rpc_client, nqn=request.subsystem_nqn, serial_number=request.serial_number, ) - self.logger.info(f"returned with status: {return_string}") - return_status = return_string != "none" + self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: self.logger.error(f"create_subsystem failed with: \n {ex}") if context: @@ -142,20 +138,19 @@ def create_subsystem(self, request, context=None): f" {request.subsystem_nqn}: {ex}") raise - return pb2.req_status(status=return_status) + return pb2.req_status(status=ret) def delete_subsystem(self, request, context=None): """Deletes a subsystem.""" - self.logger.info({ - f"Received request to delete: {request.subsystem_nqn}", - }) + self.logger.info( + f"Received request to delete {request.subsystem_nqn}") try: - return_string = self.spdk_rpc.nvmf.nvmf_delete_subsystem( + ret = self.spdk_rpc.nvmf.nvmf_delete_subsystem( self.spdk_rpc_client, nqn=request.subsystem_nqn, ) - self.logger.info(f"returned with status: {return_string}") + self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: self.logger.error(f"delete_subsystem failed with: \n {ex}") if context: @@ -166,29 +161,28 @@ def delete_subsystem(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.delete_subsystem(request.subsystem_nqn) + self.gateway_state.remove_subsystem(request.subsystem_nqn) except Exception as ex: self.logger.error(f"Error persisting delete_subsystem" + f" {request.subsystem_nqn}: {ex}") raise - return pb2.req_status(status=return_string) + return pb2.req_status(status=ret) def add_namespace(self, request, context=None): """Adds a namespace to a subsystem.""" - self.logger.info({ - f"Received request to add: {request.bdev_name} to {request.subsystem_nqn}", - }) + self.logger.info(f"Received request to add {request.bdev_name} to" + + f" {request.subsystem_nqn}") try: nsid = self.spdk_rpc.nvmf.nvmf_subsystem_add_ns( self.spdk_rpc_client, nqn=request.subsystem_nqn, bdev_name=request.bdev_name ) - self.logger.info(f"returned with nsid: {nsid}") + self.logger.info(f"add_namespace: {nsid}") except Exception as ex: - self.logger.error(f"Add NS returned with error: \n {ex}") + self.logger.error(f"add_namespace failed with: \n {ex}") if context: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{ex}") @@ -210,18 +204,18 @@ def add_namespace(self, request, context=None): def remove_namespace(self, request, context=None): """Removes a namespace from a subsystem.""" - self.logger.info({ - f"Received request to remove: {request.nsid} from {request.subsystem_nqn}", - }) + self.logger.info(f"Received request to remove {request.nsid} from" + + f" {request.subsystem_nqn}") try: - status = self.spdk_rpc.nvmf.nvmf_subsystem_remove_ns( + ret = self.spdk_rpc.nvmf.nvmf_subsystem_remove_ns( self.spdk_rpc_client, nqn=request.subsystem_nqn, - nsid=request.nsid) - self.logger.info(f"Returned with status: {status}") + nsid=request.nsid, + ) + self.logger.info(f"remove_namespace {request.nsid}: {ret}") except Exception as ex: - self.logger.error(f"Remove namespace returned with error: \n {ex}") + self.logger.error(f"remove_namespace failed with: \n {ex}") if context: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{ex}") @@ -230,39 +224,40 @@ def remove_namespace(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.delete_namespace(request.subsystem_nqn, + self.gateway_state.remove_namespace(request.subsystem_nqn, str(request.nsid)) except Exception as ex: self.logger.error( f"Error persisting remove_namespace {request.nsid}: {ex}") raise - return pb2.req_status(status=status) + return pb2.req_status(status=ret) def add_host(self, request, context=None): """Adds a host to a subsystem.""" try: if request.host_nqn == "*": # Allow any host access to subsystem - self.logger.info({ - f"Received request: allow any host to {request.subsystem_nqn}", - }) - return_string = self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host( + self.logger.info(f"Received request to allow any host to" + + f" {request.subsystem_nqn}") + ret = self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host( self.spdk_rpc_client, nqn=request.subsystem_nqn, disable=False, ) + self.logger.info(f"add_host *: {ret}") else: # Allow single host access to subsystem - self.logger.info({ - f"Received request: add host {request.host_nqn} to {request.subsystem_nqn}", - }) - return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_host( + self.logger.info( + f"Received request to add host {request.host_nqn} to" + + f" {request.subsystem_nqn}") + ret = self.spdk_rpc.nvmf.nvmf_subsystem_add_host( self.spdk_rpc_client, nqn=request.subsystem_nqn, host=request.host_nqn, ) + self.logger.info(f"add_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Add host access returned with error: \n {ex}") + self.logger.error(f"add_host failed with: \n {ex}") if context: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{ex}") @@ -281,35 +276,34 @@ def add_host(self, request, context=None): f"Error persisting add_host {request.host_nqn}: {ex}") raise - return pb2.req_status(status=return_string) + return pb2.req_status(status=ret) def remove_host(self, request, context=None): """Removes a host from a subsystem.""" try: if request.host_nqn == "*": # Disable allow any host access - self.logger.info({ - f"Received request: disable any host access to ", - f"{request.subsystem_nqn}", - }) - return_string = self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host( + self.logger.info( + f"Received request to disable any host access to" + + f" {request.subsystem_nqn}") + ret = self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host( self.spdk_rpc_client, nqn=request.subsystem_nqn, disable=True, ) + self.logger.info(f"remove_host *: {ret}") else: # Remove single host access to subsystem - self.logger.info({ - f"Received request: remove host {request.host_nqn} from ", - f"{request.subsystem_nqn}", - }) - return_string = self.spdk_rpc.nvmf.nvmf_subsystem_remove_host( + self.logger.info( + f"Received request to remove host_{request.host_nqn} from" + + f" {request.subsystem_nqn}") + ret = self.spdk_rpc.nvmf.nvmf_subsystem_remove_host( self.spdk_rpc_client, nqn=request.subsystem_nqn, host=request.host_nqn, ) + self.logger.info(f"remove_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error( - f"Remove host access returned with error: \n {ex}") + self.logger.error(f"remove_host failed with: \n {ex}") if context: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{ex}") @@ -318,20 +312,21 @@ def remove_host(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.delete_host(request.subsystem_nqn, + self.gateway_state.remove_host(request.subsystem_nqn, request.host_nqn) except Exception as ex: self.logger.error(f"Error persisting remove_host: {ex}") raise - return pb2.req_status(status=return_string) + return pb2.req_status(status=ret) def create_listener(self, request, context=None): - """Creates a listener for a subsystem at a given TCP/IP address.""" - self.logger.info({ - f"Adding {request.gateway_name} {request.trtype} listener at {request.traddr}:{request.trsvcid} for {request.nqn}" - }) + """Creates a listener for a subsystem at a given IP/Port.""" + self.logger.info( + f"Received request to create {request.gateway_name}" + + f" {request.trtype} listener for {request.nqn} at" + + f" {request.traddr}:{request.trsvcid}.") try: if (request.gateway_name and not request.traddr) or \ (not request.gateway_name and request.traddr): @@ -341,13 +336,13 @@ def create_listener(self, request, context=None): if not request.gateway_name or \ request.gateway_name == self.gateway_name: if not request.traddr: - traddr = self.nvme_config.get("config", "gateway_addr") + traddr = self.config.get("gateway", "addr") if not traddr: raise Exception("config.gateway_addr option is not set") else: traddr = request.traddr - return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_listener( + ret = self.spdk_rpc.nvmf.nvmf_subsystem_add_listener( self.spdk_rpc_client, nqn=request.nqn, trtype=request.trtype, @@ -355,9 +350,9 @@ def create_listener(self, request, context=None): trsvcid=request.trsvcid, adrfam=request.adrfam, ) - self.logger.info(f"Status of add listener: {return_string}") + self.logger.info(f"create_listener: {ret}") except Exception as ex: - self.logger.error(f"Add Listener failed: \n {ex}") + self.logger.error(f"create_listener failed with: \n {ex}") if context: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{ex}") @@ -379,13 +374,15 @@ def create_listener(self, request, context=None): f"Error persisting add_listener {request.trsvcid}: {ex}") raise - return pb2.req_status(status=return_string) + return pb2.req_status(status=ret) def delete_listener(self, request, context=None): - """Deletes a listener from a subsystem at a given TCP/IP address.""" - self.logger.info( - {f"Removing {request.gateway_name} {request.trtype} listener at {request.traddr}:{request.trsvcid} for {request.nqn}"}) + """Deletes a listener from a subsystem at a given IP/Port.""" + self.logger.info( + f"Received request to delete {request.gateway_name}" + + f" {request.trtype} listener for {request.nqn} at" + + f" {request.traddr}:{request.trsvcid}.") try: if (request.gateway_name and not request.traddr) or \ (not request.gateway_name and request.traddr): @@ -395,13 +392,13 @@ def delete_listener(self, request, context=None): if not request.gateway_name or \ request.gateway_name == self.gateway_name: if not request.traddr: - traddr = self.nvme_config.get("config", "gateway_addr") + traddr = self.config.get("gateway", "addr") if not traddr: raise Exception("config.gateway_addr option is not set") else: traddr = request.traddr - return_string = self.spdk_rpc.nvmf.nvmf_subsystem_remove_listener( + ret = self.spdk_rpc.nvmf.nvmf_subsystem_remove_listener( self.spdk_rpc_client, nqn=request.nqn, trtype=request.trtype, @@ -409,9 +406,9 @@ def delete_listener(self, request, context=None): trsvcid=request.trsvcid, adrfam=request.adrfam, ) - self.logger.info(f"Status of remove listener: {return_string}") + self.logger.info(f"delete_listener: {ret}") except Exception as ex: - self.logger.error(f"Remove listener returned with error: \n {ex}") + self.logger.error(f"delete_listener failed with: \n {ex}") if context: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"{ex}") @@ -420,27 +417,25 @@ def delete_listener(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.delete_listener(request.nqn, + self.gateway_state.remove_listener(request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid) except Exception as ex: self.logger.error( - f"Error persisting remove_listener {request.trsvcid}: {ex}") + f"Error persisting delete_listener {request.trsvcid}: {ex}") raise - return pb2.req_status(status=return_string) + return pb2.req_status(status=ret) def get_subsystems(self, request, context): """Gets subsystems.""" - self.logger.info({ - f"Received request to get subsystems", - }) + self.logger.info(f"Received request to get subsystems") try: ret = self.spdk_rpc.nvmf.nvmf_get_subsystems(self.spdk_rpc_client) - self.logger.info(f"returned with: {ret}") + self.logger.info(f"get_subsystems: {ret}") except Exception as ex: self.logger.error(f"get_subsystems failed with: \n {ex}") context.set_code(grpc.StatusCode.INTERNAL) diff --git a/control/proto/gateway.proto b/control/proto/gateway.proto index 91dc55c4..c63d8d59 100644 --- a/control/proto/gateway.proto +++ b/control/proto/gateway.proto @@ -10,7 +10,7 @@ syntax = "proto3"; -service NVMEGateway { +service Gateway { // Creates a bdev from an RBD image rpc create_bdev(create_bdev_req) returns (req_status) {} @@ -35,10 +35,10 @@ service NVMEGateway { // Removes a host from a subsystem rpc remove_host(remove_host_req) returns (req_status) {} - // Creates a listener for a subsystem at a given TCP/IP address + // Creates a listener for a subsystem at a given IP/Port rpc create_listener(create_listener_req) returns(req_status) {} - // Deletes a listener from a subsystem at a given TCP/IP address + // Deletes a listener from a subsystem at a given IP/Port rpc delete_listener(delete_listener_req) returns(req_status) {} // Gets subsystems diff --git a/control/server.py b/control/server.py index 5170e39c..a2c01d21 100644 --- a/control/server.py +++ b/control/server.py @@ -39,7 +39,7 @@ class GatewayServer: """Runs SPDK and receives client requests for the gateway service. Instance attributes: - nvme_config: Basic gateway parameters + config: Basic gateway parameters logger: Logger instance to track server events gateway_state: Methods for target state persistence gateway_rpc: GatewayService object on which to make RPC calls directly @@ -50,21 +50,21 @@ class GatewayServer: spdk_process: Subprocess running SPDK NVMEoF target application """ - def __init__(self, nvme_config): + def __init__(self, config): self.logger = logging.getLogger(__name__) - self.nvme_config = nvme_config + self.config = config self.spdk_process = None self.server = None - gateway_name = self.nvme_config.get("config", "gateway_name") + gateway_name = self.config.get("gateway", "name") if not gateway_name: gateway_name = socket.gethostname() self.logger.info(f"Starting gateway {gateway_name}") self._start_spdk() - self.gateway_state = OmapGatewayState(self.nvme_config) - self.gateway_rpc = GatewayService(self.nvme_config, self.gateway_state, + self.gateway_state = OmapGatewayState(self.config) + self.gateway_rpc = GatewayService(self.config, self.gateway_state, self.spdk_rpc, self.spdk_rpc_client) def __enter__(self): @@ -77,7 +77,7 @@ def __exit__(self, exc_type, exc_value, traceback): self.logger.info("Terminating SPDK...") self.spdk_process.terminate() try: - timeout = self.nvme_config.getfloat("spdk", "timeout") + timeout = self.config.getfloat("spdk", "timeout") self.spdk_process.communicate(timeout=timeout) except subprocess.TimeoutExpired: self.spdk_process.kill() @@ -94,17 +94,17 @@ def serve(self): # Register service implementation with server self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) - pb2_grpc.add_NVMEGatewayServicer_to_server(self.gateway_rpc, self.server) + pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) - enable_auth = self.nvme_config.getboolean("config", "enable_auth") - gateway_addr = self.nvme_config.get("config", "gateway_addr") - gateway_port = self.nvme_config.get("config", "gateway_port") + enable_auth = self.config.getboolean("gateway", "enable_auth") + gateway_addr = self.config.get("gateway", "addr") + gateway_port = self.config.get("gateway", "port") if enable_auth: # Read in key and certificates for authentication - server_key = self.nvme_config.get("mtls", "server_key") - server_cert = self.nvme_config.get("mtls", "server_cert") - client_cert = self.nvme_config.get("mtls", "client_cert") + server_key = self.config.get("mtls", "server_key") + server_cert = self.config.get("mtls", "server_cert") + client_cert = self.config.get("mtls", "client_cert") with open(server_key, "rb") as f: private_key = f.read() with open(server_cert, "rb") as f: @@ -144,17 +144,17 @@ def _start_spdk(self): """Starts SPDK process.""" # Get path and import SPDK's RPC modules - spdk_path = self.nvme_config.get("spdk", "spdk_path") + spdk_path = self.config.get("spdk", "spdk_path") sys.path.append(spdk_path) self.logger.info(f"SPDK PATH: {spdk_path}") import spdk.scripts.rpc as spdk_rpc self.spdk_rpc = spdk_rpc # Start target - tgt_path = self.nvme_config.get("spdk", "tgt_path") - spdk_rpc_socket = self.nvme_config.get("spdk", "rpc_socket") - spdk_tgt_cmd_extra_args = self.nvme_config.get("spdk", - "tgt_cmd_extra_args") + tgt_path = self.config.get("spdk", "tgt_path") + spdk_rpc_socket = self.config.get("spdk", "rpc_socket") + spdk_tgt_cmd_extra_args = self.config.get("spdk", + "tgt_cmd_extra_args") spdk_cmd = os.path.join(spdk_path, tgt_path) cmd = [spdk_cmd, "-u", "-r", spdk_rpc_socket] if spdk_tgt_cmd_extra_args: @@ -169,9 +169,9 @@ def _start_spdk(self): raise # Initialization - timeout = self.nvme_config.getfloat("spdk", "timeout") - log_level = self.nvme_config.get("spdk", "log_level") - conn_retries = self.nvme_config.getint("spdk", "conn_retries") + timeout = self.config.getfloat("spdk", "timeout") + log_level = self.config.get("spdk", "log_level") + conn_retries = self.config.getint("spdk", "conn_retries") self.logger.info({ f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket},", f" conn_retries: {conn_retries}, timeout: {timeout}", @@ -196,7 +196,7 @@ def _start_spdk(self): raise # Implicitly create transports - spdk_transports = self.nvme_config.get_with_default( + spdk_transports = self.config.get_with_default( "spdk", "transports", "tcp") for trtype in spdk_transports.split(): self._create_transport(trtype.lower()) @@ -205,7 +205,7 @@ def _create_transport(self, trtype): """Initializes a transport type.""" args = {'trtype': trtype} name = "transport_" + trtype + "_options" - options = self.nvme_config.get_with_default("spdk", name, "") + options = self.config.get_with_default("spdk", name, "") self.logger.debug(f"create_transport: {trtype} options: {options}") diff --git a/control/state.py b/control/state.py index 1142ebd0..49e8c8e1 100644 --- a/control/state.py +++ b/control/state.py @@ -23,7 +23,7 @@ def add_bdev(self, bdev_name: str, val: str): pass @abstractmethod - def delete_bdev(self, bdev_name: str): + def remove_bdev(self, bdev_name: str): pass @abstractmethod @@ -31,7 +31,7 @@ def add_namespace(self, subsystem_nqn: str, bdev_name: str, val: str): pass @abstractmethod - def delete_namespace(self, subsystem_nqn: str, bdev_name: str): + def remove_namespace(self, subsystem_nqn: str, bdev_name: str): pass @abstractmethod @@ -39,7 +39,7 @@ def add_subsystem(self, subsystem_nqn: str, val: str): pass @abstractmethod - def delete_subsystem(self, subsystem_nqn: str): + def remove_subsystem(self, subsystem_nqn: str): pass @abstractmethod @@ -47,7 +47,7 @@ def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): pass @abstractmethod - def delete_host(self, subsystem_nqn: str, host_nqn: str): + def remove_host(self, subsystem_nqn: str, host_nqn: str): pass @abstractmethod @@ -56,7 +56,7 @@ def add_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str, pass @abstractmethod - def delete_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str): + def remove_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str): pass @abstractmethod @@ -80,7 +80,7 @@ class OmapGatewayState(GatewayState): Instance attributes: version: Local gateway NVMeoF target state version - nvme_config: Basic gateway parameters + config: Basic gateway parameters logger: Logger instance to track OMAP access events spdk_rpc: Module methods for SPDK spdk_rpc_client: Client of SPDK RPC server @@ -95,16 +95,16 @@ class OmapGatewayState(GatewayState): HOST_PREFIX = "host_" LISTENER_PREFIX = "listener_" - def __init__(self, nvme_config): + def __init__(self, config): self.version = 1 - self.nvme_config = nvme_config + self.config = config self.logger = logging.getLogger(__name__) - gateway_group = self.nvme_config.get("config", "gateway_group") + gateway_group = self.config.get("gateway", "group") self.omap_name = f"nvme.{gateway_group}.config" if gateway_group else "nvme.config" - ceph_pool = self.nvme_config.get("ceph", "pool") - ceph_conf = self.nvme_config.get("ceph", "config_file") + ceph_pool = self.config.get("ceph", "pool") + ceph_conf = self.config.get("ceph", "config_file") conn = rados.Rados(conffile=ceph_conf) conn.connect() self.ioctx = conn.open_ioctx(ceph_pool) @@ -122,11 +122,11 @@ def __init__(self, nvme_config): except rados.ObjectExists: self.logger.info(f"{self.omap_name} omap object already exists.") except Exception as ex: - self.logger.error(f"Unable to write to omap: {ex}. Exiting!") + self.logger.error(f"Unable to create omap: {ex}. Exiting!") raise - def _write_key(self, key: str, val: str): - """Writes key and value to the OMAP.""" + def _add_key(self, key: str, val: str): + """Adds key and value to the OMAP.""" try: version_update = self.version + 1 @@ -141,37 +141,37 @@ def _write_key(self, key: str, val: str): self.version = version_update self.logger.debug(f"omap_key generated: {key}") except Exception as ex: - self.logger.error(f"Unable to write to omap: {ex}. Exiting!") + self.logger.error(f"Unable to add key to omap: {ex}. Exiting!") raise - def _delete_key(self, key: str): - """Deletes key from the OMAP.""" + def _remove_key(self, key: str): + """Removes key from the OMAP.""" try: version_update = self.version + 1 with rados.WriteOpCtx() as write_op: - # Compare operation failure will cause delete failure + # Compare operation failure will cause remove failure write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), - rados.LIBRADOS_CMPXATTR_OP_EQ) + rados.LIBRADOS_CMPXATTR_OP_EQ) self.ioctx.remove_omap_keys(write_op, (key,)) self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), (str(version_update),)) self.ioctx.operate_write_op(write_op, self.omap_name) self.version = version_update - self.logger.debug(f"omap_key deleted: {key}") + self.logger.debug(f"omap_key removed: {key}") except Exception as ex: - self.logger.error(f"Unable to delete from omap: {ex}. Exiting!") + self.logger.error(f"Unable to remove key from omap: {ex}. Exiting!") raise def add_bdev(self, bdev_name: str, val: str): """Adds a bdev to the OMAP.""" key = self.BDEV_PREFIX + bdev_name - self._write_key(key, val) + self._add_key(key, val) - def delete_bdev(self, bdev_name: str): - """Deletes a bdev from the OMAP.""" + def remove_bdev(self, bdev_name: str): + """Removes a bdev from the OMAP.""" key = self.BDEV_PREFIX + bdev_name - self._delete_key(key) + self._remove_key(key) def _restore_bdevs(self, omap_dict, callback): """Restores a bdev from the OMAP.""" @@ -184,12 +184,12 @@ def _restore_bdevs(self, omap_dict, callback): def add_namespace(self, subsystem_nqn: str, nsid: str, val: str): """Adds a namespace to the OMAP.""" key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + nsid - self._write_key(key, val) + self._add_key(key, val) - def delete_namespace(self, subsystem_nqn: str, nsid: str): - """Deletes a namespace from the OMAP.""" + def remove_namespace(self, subsystem_nqn: str, nsid: str): + """Removes a namespace from the OMAP.""" key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + nsid - self._delete_key(key) + self._remove_key(key) def _restore_namespaces(self, omap_dict, callback): """Restores a namespace from the OMAP.""" @@ -205,12 +205,12 @@ def _restore_namespaces(self, omap_dict, callback): def add_subsystem(self, subsystem_nqn: str, val: str): """Adds a subsystem to the OMAP.""" key = self.SUBSYSTEM_PREFIX + subsystem_nqn - self._write_key(key, val) + self._add_key(key, val) - def delete_subsystem(self, subsystem_nqn: str): - """Deletes a subsystem from the OMAP.""" + def remove_subsystem(self, subsystem_nqn: str): + """Removes a subsystem from the OMAP.""" key = self.SUBSYSTEM_PREFIX + subsystem_nqn - self._delete_key(key) + self._remove_key(key) # Delete all keys related to subsystem omap_dict = self._read_all() @@ -218,7 +218,7 @@ def delete_subsystem(self, subsystem_nqn: str): if (key.startswith(self.NAMESPACE_PREFIX + subsystem_nqn) or key.startswith(self.HOST_PREFIX + subsystem_nqn) or key.startswith(self.LISTENER_PREFIX + subsystem_nqn)): - self._delete_key(key) + self._remove_key(key) def _restore_subsystems(self, omap_dict, callback): """Restores subsystems from the OMAP.""" @@ -231,12 +231,12 @@ def _restore_subsystems(self, omap_dict, callback): def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): """Adds a host to the OMAP.""" key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) - self._write_key(key, val) + self._add_key(key, val) - def delete_host(self, subsystem_nqn: str, host_nqn: str): - """Deletes a host from the OMAP.""" + def remove_host(self, subsystem_nqn: str, host_nqn: str): + """Removes a host from the OMAP.""" key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) - self._delete_key(key) + self._remove_key(key) def _restore_hosts(self, omap_dict, callback): """Restore hosts from the OMAP.""" @@ -251,14 +251,14 @@ def add_listener(self, subsystem_nqn: str, gateway: str, trtype: str, """Adds a listener to the OMAP.""" key = "{}{}_{}_{}_{}_{}".format(self.LISTENER_PREFIX, gateway, subsystem_nqn, trtype, traddr, trsvcid) - self._write_key(key, val) + self._add_key(key, val) - def delete_listener(self, subsystem_nqn: str, gateway: str, trtype: str, + def remove_listener(self, subsystem_nqn: str, gateway: str, trtype: str, traddr: str, trsvcid: str): - """Deletes a listener from the OMAP.""" + """Removes a listener from the OMAP.""" key = "{}{}_{}_{}_{}_{}".format(self.LISTENER_PREFIX, gateway, subsystem_nqn, trtype, traddr, trsvcid) - self._delete_key(key) + self._remove_key(key) def _restore_listeners(self, omap_dict, callback): """Restores listeners from the OMAP.""" diff --git a/tests/test_cli.py b/tests/test_cli.py index e4dfcb57..6a9588b9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -15,11 +15,13 @@ listener_list = [["-g", gateway_name, "-a", addr, "-s", "5001"], ["-s", "5002"]] config = "ceph-nvmeof.conf" + class TestGet: def test_get_subsystems(self, caplog): cli(["-c", config, "get_subsystems"]) assert "Failed to get" not in caplog.text + class TestCreate: def test_create_bdev(self, caplog): cli(["-c", config, "create_bdev", "-i", image, "-p", pool, "-b", bdev]) @@ -43,6 +45,7 @@ def test_create_listener(self, caplog, listener): cli(["-c", config, "create_listener", "-n", subsystem] + listener) assert "Failed to create" not in caplog.text + class TestDelete: @pytest.mark.parametrize("host", host_list) def test_remove_host(self, caplog, host):