Skip to content

Commit

Permalink
Merge branch 'omap-config'
Browse files Browse the repository at this point in the history
  • Loading branch information
sskaur committed Mar 4, 2022
2 parents 9c1ccb2 + ceebee4 commit 63abedb
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 228 deletions.
4 changes: 2 additions & 2 deletions nvme_gw.config
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
enable_auth = False
gateway_addr = [::]
gateway_port = 5500
gateway_group = nvme_gw_group1
gateway_group =
grpc_server_max_workers = 10

[ceph]
Expand All @@ -30,7 +30,7 @@ client_cert = ./client.crt
[spdk]

spdk_path = /path/to/spdk
spdk_tgt = spdk/build/bin/nvmf_tgt
tgt_path = spdk/build/bin/nvmf_tgt
rpc_socket = /var/tmp/spdk.sock
timeout = 60.0
log_level = ERROR
Expand Down
3 changes: 2 additions & 1 deletion nvme_gw_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def connect(self, nvme_config):
@cli.cmd([
argument("-i", "--image", help="RBD image name", required=True),
argument("-p", "--pool", help="Ceph pool name", required=True),
argument("-b", "--bdev", help="Bdev name"),
argument("-b", "--bdev", help="Bdev name", required=True),
argument("-u", "--user", help="User ID"),
argument("-s", "--block-size", help="Block size", default=4096),
])
Expand All @@ -147,6 +147,7 @@ def create_bdev(self, args):
ceph_pool_name=args.pool,
rbd_name=args.image,
block_size=int(args.block_size),
bdev_name=args.bdev,
)
ret = self.stub.bdev_rbd_create(create_req)
self.logger.info(f"Created bdev: {ret.bdev_name}")
Expand Down
175 changes: 68 additions & 107 deletions nvme_gw_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import rados
from typing import Dict, Optional
from abc import ABC, abstractmethod
import nvme_gw_pb2 as pb2


class PersistentConfig(ABC):
Expand Down Expand Up @@ -77,7 +78,7 @@ def delete_config(self):
pass

@abstractmethod
def restore(self):
def restore(self, callbacks):
pass


Expand Down Expand Up @@ -116,7 +117,7 @@ def __init__(self, nvme_config):
self.logger = nvme_config.logger

gateway_group = self.nvme_config.get("config", "gateway_group")
self.omap_name = gateway_group + "_config"
self.omap_name = f"nvme.{gateway_group}.config" if gateway_group else "nvme.config"

ceph_pool = self.nvme_config.get("ceph", "ceph_pool")
ceph_conf = self.nvme_config.get("ceph", "ceph_config_file")
Expand All @@ -135,11 +136,7 @@ def __init__(self, nvme_config):
self.logger.info(
f"First gateway: created object {self.omap_name}")
except rados.ObjectExists:
self.logger.info(f"{self.omap_name} already exists.")

omap_version = self._read_key(self.OMAP_VERSION_KEY)
self.logger.info(
f"omap_version: {omap_version}, local version: {self.version}.")
self.logger.info(f"{self.omap_name} omap object already exists.")

def _write_key(self, key: str, val: str):
"""Writes key and value to the persistent config."""
Expand All @@ -148,13 +145,14 @@ def _write_key(self, key: str, val: str):
version_update = int(self.version) + 1
with rados.WriteOpCtx() as write_op:
# Compare operation failure will cause write failure
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), 1)
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version),
rados.LIBRADOS_CMPXATTR_OP_EQ)
self.ioctx.set_omap(write_op, (key,), (val,))
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(version_update),))
self.ioctx.operate_write_op(write_op, self.omap_name)
self.version = version_update
self.logger.info(f"omap_key generated: {key}")
self.logger.debug(f"omap_key generated: {key}")
except Exception as ex:
self.logger.error(f"Unable to write to omap: {ex}. Exiting!")
raise
Expand All @@ -165,13 +163,14 @@ def _delete_key(self, key: str):
version_update = int(self.version) + 1
with rados.WriteOpCtx() as write_op:
# Compare operation failure will cause delete failure
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), 1)
self.ioctx.delete_omap_keys(write_op, (key,))
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version),
rados.LIBRADOS_CMPXATTR_OP_EQ)
self.ioctx.remove_omap_keys(write_op, (key,))
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(version_update),))
self.ioctx.operate_write_op(write_op, self.omap_name)
self.version = version_update
self.logger.info(f"omap_key deleted: {key}")
self.logger.debug(f"omap_key deleted: {key}")

def add_bdev(self, bdev_name: str, val: str):
"""Adds a bdev to the persistent config."""
Expand All @@ -183,21 +182,19 @@ def delete_bdev(self, bdev_name: str):
key = self.BDEV_PREFIX + bdev_name
self._delete_key(key)

def _restore_bdevs(self, omap_dict):
def _restore_bdevs(self, omap_dict, callback):
"""Restores a bdev from the persistent config."""

for (key, val) in omap_dict.items():
if key.startswith(self.BDEV_PREFIX):
args = self._clean_args(str(val, 'utf-8'))
self.logger.info(f"Restoring bdev: {args}")
try:
return_string = self.spdk_rpc.bdev.bdev_rbd_create(
self.spdk_rpc_client, args["ceph_pool_name"],
args["rbd_name"], int(args["block_size"]))
self.logger.info(f"Created bdev: {return_string}")
except Exception as ex:
self.logger.error(f"bdev_rbd_create failed with: \n {ex}")
raise
req = pb2.bdev_create_req(
bdev_name=args["bdev_name"],
ceph_pool_name=args["ceph_pool_name"],
rbd_name=args["rbd_name"],
block_size=int(args["block_size"]),
)
ret = callback(req)

def add_namespace(self, subsystem_nqn: str, bdev_name: str, val: str):
"""Adds a namespace to the persistent config."""
Expand All @@ -209,24 +206,17 @@ def delete_namespace(self, subsystem_nqn: str, bdev_name: str):
key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + bdev_name
self._delete_key(key)

def _restore_namespaces(self, omap_dict):
def _restore_namespaces(self, omap_dict, callback):
"""Restores a namespace from the persistent config."""

for (key, val) in omap_dict.items():
if key.startswith(self.NAMESPACE_PREFIX):
args = self._clean_args(str(val, 'utf-8'))
self.logger.info(f"Restoring namespace: {args}")
try:
return_string = self.spdk_rpc.nvmf.nvmf_subsystem_add_ns(
self.spdk_rpc_client,
args["subsystem_nqn"],
args["bdev_name"],
)
self.logger.info(f"Added NSID to subystem: {return_string}")
except Exception as ex:
self.logger.error(
f"nvmf_subsystem_add_ns failed with: \n {ex}")
raise
req = pb2.subsystem_add_ns_req(
subsystem_nqn=args["subsystem_nqn"],
bdev_name=args["bdev_name"],
)
ret = callback(req)

def add_subsystem(self, subsystem_nqn: str, val: str):
"""Adds a subsystem to the persistent config."""
Expand All @@ -238,22 +228,17 @@ def delete_subsystem(self, subsystem_nqn: str):
key = self.SUBSYSTEM_PREFIX + subsystem_nqn
self._delete_key(key)

def _restore_subsystems(self, omap_dict):
def _restore_subsystems(self, omap_dict, callback):
"""Restores subsystems from the persistent config."""

for (key, val) in omap_dict.items():
if key.startswith(self.SUBSYSTEM_PREFIX):
args = self._clean_args(str(val, 'utf-8'))
self.logger.info(f"Restoring subsystem: {args}")
try:
return_string = self.spdk_rpc.nvmf.nvmf_create_subsystem(
self.spdk_rpc_client, args["subsystem_nqn"],
args["serial_number"])
self.logger.info(f"Returned with status: {return_string}")
except Exception as ex:
self.logger.error(
f"nvmf_create_subsystem failed with: \n {ex}")
raise
req = pb2.subsystem_create_req(
subsystem_nqn=args["subsystem_nqn"],
serial_number=args["serial_number"],
)
ret = callback(req)

def add_host(self, subsystem_nqn: str, host_nqn: str, val: str):
"""Adds a host to the persistent config."""
Expand All @@ -265,22 +250,17 @@ def delete_host(self, subsystem_nqn: str, host_nqn: str):
key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn)
self._delete_key(key)

def _restore_hosts(self, omap_dict):
def _restore_hosts(self, omap_dict, callback):
"""Restore hosts from the persistent config."""

for (key, val) in omap_dict.items():
if key.startswith(self.HOST_PREFIX):
args = self._clean_args(str(val, 'utf-8'))
self.logger.info(f"Restoring host: {args}")
try:
self.spdk_rpc.nvmf.nvmf_subsystem_add_host(
self.spdk_rpc_client, args["nqn"], args["host_nqn"])
self.logger.info(
f"Added host {args['host_nqn']} to {args['nqn']}")
except Exception as ex:
self.logger.error(
f"nvmf_subsystem_add_host failed with: \n {ex}")
raise
req = pb2.subsystem_add_host_req(
subsystem_nqn=args["nqn"],
host_nqn=args["host_nqn"],
)
ret = callback(req)

def add_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str,
val: str):
Expand All @@ -295,22 +275,20 @@ def delete_listener(self, subsystem_nqn: str, traddr: str, trsvcid: str):
trsvcid)
self._delete_key(key)

def _restore_listeners(self, omap_dict):
def _restore_listeners(self, omap_dict, callback):
"""Restores listeners from the persistent config."""

for (key, val) in omap_dict.items():
if key.startswith(self.LISTENER_PREFIX):
args = self._clean_args(str(val, 'utf-8'))
self.logger.info(f"Restoring listener: {args}")
try:
self.spdk_rpc.nvmf.nvmf_subsystem_add_listener(
self.spdk_rpc_client, args["nqn"], args["trtype"],
args["traddr"], args["trsvcid"], args["adrfam"])
self.logger.info(f"Added listener: {args['traddr']}")
except Exception as ex:
self.logger.error(
f"nvmf_subsystem_add_listener failed with: \n {ex}")
raise
req = pb2.subsystem_add_listener_req(
nqn=args["nqn"],
trtype=args["trtype"],
adrfam=args["adrfam"],
traddr=args["traddr"],
trsvcid=args["trsvcid"],
)
ret = callback(req)

def set_allow_all(self, subsystem_nqn: str, val: str):
"""Sets open access to a subsystem in the persistent config."""
Expand All @@ -322,22 +300,15 @@ def delete_allow_all(self, subsystem_nqn: str):
key = self.ALLOW_ALL_PREFIX + subsystem_nqn
self._delete_key(key)

def _restore_allow_all(self, omap_dict):
def _restore_allow_all(self, omap_dict, callback):
"""Restores allow_all_hosts from the persistent config."""

for (key, val) in omap_dict.items():
if key.startswith(self.ALLOW_ALL_PREFIX):
args = self._clean_args(str(val, 'utf-8'))
self.logger.info(f"Restoring allow_all: {args}")
try:
self.spdk_rpc.nvmf.nvmf_subsystem_allow_any_host(
self.spdk_rpc_client, args["subsystem_nqn"], False)
self.logger.info(
f"Set allow_all for {args['subsystem_nqn']}")
except Exception as ex:
self.logger.error(
f"nvmf_subsystem_allow_any_host failed with: \n {ex}")
raise
req = pb2.subsystem_allow_any_host_req(
subsystem_nqn=args["subsystem_nqn"], disable=0)
ret = callback(req)

def set_transport(self, trtype: str, val: str):
"""Sets transport type in the persistent config."""
Expand All @@ -354,23 +325,14 @@ def get_transport(self, trtype: str):
key = self.TRANSPORT_PREFIX + trtype
return self._read_key(key)

def _restore_transports(self, omap_dict):
def _restore_transports(self, omap_dict, callback):
"""Restores a transport from the persistent config."""

for (key, val) in omap_dict.items():
if key.startswith(self.TRANSPORT_PREFIX):
args = self._clean_args(str(val, 'utf-8'))
self.logger.info(f"Restoring transport: {args}")
try:
return_string = self.spdk_rpc.nvmf.nvmf_create_transport(
self.spdk_rpc_client, args["trtype"])
self.logger.info(
f"Created transport type {args['trtype']}: {return_string}"
)
except Exception as ex:
self.logger.error(
f"nvmf_create_transport failed with: \n {ex}")
raise
req = pb2.create_transport_req(trtype=args["trtype"])
ret = callback(req)

def _read_key(self, key) -> Optional[str]:
"""Reads single key from persistent config and returns its value."""
Expand All @@ -383,7 +345,7 @@ def _read_key(self, key) -> Optional[str]:
value_list = list(dict(iter).values())
if len(value_list) == 1:
val = str(value_list[0], "utf-8")
self.logger.info(f"Read key: {key} -> {val}")
self.logger.debug(f"Read key: {key} -> {val}")
return val
return None

Expand All @@ -396,14 +358,14 @@ def _read_all(self) -> Dict[str, str]:
raise Exception("Omap read operation failed.")
self.ioctx.operate_read_op(read_op, self.omap_name)
omap_dict = dict(iter)
self.logger.info(f"Omap Persistent Config:\n{omap_dict}")
self.logger.debug(f"Omap Persistent Config:\n{omap_dict}")
return omap_dict

def delete_config(self):
"""Deletes OMAP object."""

try:
self.ioctx.delete_object(self.omap_name)
self.ioctx.remove_object(self.omap_name)
self.logger.info(f"Object {self.omap_name} deleted.")
except rados.ObjectNotFound:
self.logger.info(f"Object {self.omap_name} not found.")
Expand All @@ -419,25 +381,24 @@ def _clean_args(self, data: str) -> Dict[str, str]:
param_dict[key.strip(' \"')] = value.strip(' \"')
return param_dict

def restore(self, spdk_rpc, spdk_rpc_client):
def restore(self, callbacks):
"""Restores gateway config to persistent config specifications."""

self.spdk_rpc = spdk_rpc
self.spdk_rpc_client = spdk_rpc_client
omap_version = self._read_key(self.OMAP_VERSION_KEY)
if omap_version == "1":
self.logger.info("This omap was just created. Nothing to restore")
else:
omap_dict = self._read_all()
self._restore_bdevs(omap_dict)
self._restore_subsystems(omap_dict)
self._restore_namespaces(omap_dict)
self._restore_hosts(omap_dict)
self._restore_allow_all(omap_dict)
self._restore_transports(omap_dict)
self._restore_listeners(omap_dict)
self.logger.info("Restore complete.")

# Update local version number to persistent config version
self._restore_bdevs(omap_dict, callbacks[self.BDEV_PREFIX])
self._restore_subsystems(omap_dict,
callbacks[self.SUBSYSTEM_PREFIX])
self._restore_namespaces(omap_dict,
callbacks[self.NAMESPACE_PREFIX])
self._restore_hosts(omap_dict, callbacks[self.HOST_PREFIX])
self._restore_allow_all(omap_dict, callbacks[self.ALLOW_ALL_PREFIX])
self._restore_transports(omap_dict,
callbacks[self.TRANSPORT_PREFIX])
self._restore_listeners(omap_dict, callbacks[self.LISTENER_PREFIX])
self.version = omap_version
self.logger.info("Restore complete.")
return
Loading

0 comments on commit 63abedb

Please sign in to comment.