From 285f548adbf72f2317964c222a0d524b3ee26f7d Mon Sep 17 00:00:00 2001 From: Stefan Gula Date: Thu, 8 Feb 2024 13:38:35 +0100 Subject: [PATCH] data: add DOpaq class This patches introduces new class DOpaq, which allows creation of DNode without any schema --- cffi/cdefs.h | 1 + libyang/__init__.py | 1 + libyang/data.py | 92 ++++++++++++++++++++++++++++++++++++++++----- tests/test_data.py | 52 +++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 9 deletions(-) diff --git a/cffi/cdefs.h b/cffi/cdefs.h index 304ae55d..397e7e93 100644 --- a/cffi/cdefs.h +++ b/cffi/cdefs.h @@ -708,6 +708,7 @@ char* lyd_path(const struct lyd_node *, LYD_PATH_TYPE, char *, size_t); LY_ERR lyd_new_inner(struct lyd_node *, const struct lys_module *, const char *, ly_bool, struct lyd_node **); LY_ERR lyd_new_list(struct lyd_node *, const struct lys_module *, const char *, ly_bool, struct lyd_node **, ...); LY_ERR lyd_new_list2(struct lyd_node *, const struct lys_module *, const char *, const char *, ly_bool, struct lyd_node **); +LY_ERR lyd_new_opaq(struct lyd_node *, const struct ly_ctx *, const char *, const char *, const char *, const char *, struct lyd_node **); struct lyd_node_inner { union { diff --git a/libyang/__init__.py b/libyang/__init__.py index bc79e2f0..8bc89952 100644 --- a/libyang/__init__.py +++ b/libyang/__init__.py @@ -15,6 +15,7 @@ DNode, DNodeAttrs, DNotif, + DOpaq, DRpc, ) from .diff import ( diff --git a/libyang/data.py b/libyang/data.py index f0caf240..71ac95ec 100644 --- a/libyang/data.py +++ b/libyang/data.py @@ -22,6 +22,7 @@ LOG = logging.getLogger(__name__) +opaque_dnodes = [] # ------------------------------------------------------------------------------------- @@ -818,13 +819,22 @@ def print_dict( name_cache = {} def _node_name(node): - name = name_cache.get(node.schema) + if node.schema == ffi.NULL: + # opaq node + opaq_cdata = ffi.cast("struct lyd_node_opaq *", node) + if strip_prefixes: + name = c2str(opaq_cdata.name.name) + else: + mod_name = c2str(opaq_cdata.name.module_name) + name = "%s:%s" % (mod_name, c2str(opaq_cdata.name.name)) + else: + name = name_cache.get(node.schema) if name is None: if strip_prefixes: name = c2str(node.schema.name) else: - mod = node.schema.module - name = "%s:%s" % (c2str(mod.name), c2str(node.schema.name)) + mod_name = node.module().name() + name = "%s:%s" % (mod_name, c2str(node.schema.name)) name_cache[node.schema] = name return name @@ -861,7 +871,19 @@ def _to_dict(node, parent_dic): if not lib.lyd_node_should_print(node, flags): return name = _node_name(node) - if node.schema.nodetype == SNode.LIST: + if node.schema == ffi.NULL: + # opaq node + child = lib.lyd_child(node) + if child == ffi.NULL: + opaq_cdata = ffi.cast("struct lyd_node_opaq *", node) + parent_dic[name] = c2str(opaq_cdata.value) + else: + container = {} + while child: + _to_dict(child, container) + child = child.next + parent_dic[name] = container + elif node.schema.nodetype == SNode.LIST: list_element = {} child = lib.lyd_child(node) @@ -981,11 +1003,14 @@ def _decorator(nodeclass): @classmethod def new(cls, context: "libyang.Context", cdata) -> "DNode": cdata = ffi.cast("struct lyd_node *", cdata) - if not cdata.schema: - schemas = list(context.find_path(cls._get_path(cdata))) - if len(schemas) != 1: - raise LibyangError("Unable to determine schema") - nodecls = cls.NODETYPE_CLASS.get(schemas[0].nodetype(), None) + if cdata.schema == ffi.NULL: + if cdata in opaque_dnodes: + nodecls = DOpaq + else: + schemas = list(context.find_path(cls._get_path(cdata))) + if len(schemas) != 1: + raise LibyangError("Unable to determine schema") + nodecls = cls.NODETYPE_CLASS.get(schemas[0].nodetype(), None) else: nodecls = cls.NODETYPE_CLASS.get(cdata.schema.nodetype, None) if nodecls is None: @@ -1020,6 +1045,9 @@ def children(self, no_keys=False) -> Iterator[DNode]: while child: if child.schema != ffi.NULL: yield DNode.new(self.context, child) + else: + # opaq node + yield DOpaq(self.context, child) child = child.next def __iter__(self): @@ -1381,3 +1409,49 @@ def _to_dnode(_dic, _schema, _parent=ffi.NULL, in_rpc_output=False): raise return result + + +# ------------------------------------------------------------------------------------- +class DOpaq(DNode): + def __init__(self, context: "libyang.Context", cdata) -> None: + super().__init__(context, cdata) + if cdata not in opaque_dnodes: + opaque_dnodes.append(cdata) + self.cdata_opaq = ffi.cast("struct lyd_node_opaq *", cdata) + + @staticmethod + def create_opaq( + parent: Optional[DNode], + context: "libyang.Context", + name: str, + module_name: str, + value: Optional[str] = None, + prefix: Optional[str] = None, + ) -> "DOpaq": + n = ffi.new("struct lyd_node **") + ret = lib.lyd_new_opaq( + parent.cdata if parent is not None else ffi.NULL, + context.cdata, + str2c(name), + str2c(value) if value is not None else ffi.NULL, + str2c(prefix) if prefix is not None else ffi.NULL, + str2c(module_name), + n, + ) + if ret != lib.LY_SUCCESS: + raise context.error("Cannot create opaque data node") + return DOpaq(context, n[0]) + + def name(self) -> str: + return c2str(self.cdata_opaq.name.name) + + def module(self) -> Module: + module = self.context.get_module(c2str(self.cdata_opaq.name.module_name)) + if module is None: + raise self.context.error( + f"Unable to get module '{c2str(self.cdata_opaq.name.module_name)}'" + ) + return module + + def schema(self) -> SNode: + raise self.context.error("Opaque data node doesn't have any schema") diff --git a/tests/test_data.py b/tests/test_data.py index 10d9045f..3052fddc 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -18,9 +18,11 @@ DNode, DNodeAttrs, DNotif, + DOpaq, DRpc, IOType, LibyangError, + Module, ) from libyang.data import dict_to_dnode @@ -1034,3 +1036,53 @@ def test_dnode_attrs_set_and_remove_multiple(self): attrs.remove("ietf-netconf:operation") self.assertEqual(len(attrs), 0) + + def test_dnode_opaq(self): + module = self.ctx.get_module("yolo-nodetypes") + # basic node operations + dnode = DOpaq.create_opaq(None, self.ctx, "test1", "yolo-nodetypes", "val") + self.assertIsInstance(dnode, DOpaq) + self.assertEqual(dnode.name(), "test1") + with self.assertRaises(Exception) as cm: + dnode.schema() + self.assertEqual( + str(cm.exception), "Opaque data node doesn't have any schema" + ) + + # valid module check + module2 = dnode.module() + self.assertIsInstance(module2, Module) + self.assertEqual(module.cdata, module2.cdata) + + # invalid module check + dnode = DOpaq.create_opaq(None, self.ctx, "test1", "invalid-module", "val") + with self.assertRaises(Exception) as cm: + dnode.module() + self.assertEqual(str(cm.exception), "Unable to get module 'invalid-module'") + dnode.free() + + def test_dnode_opaq_within_print_dict(self): + dnode = DOpaq.create_opaq(None, self.ctx, "test1", "invalid-module", "val") + dic = dnode.print_dict() + self.assertEqual(dic, {"test1": "val"}) + dnode2 = DOpaq.create_opaq( + dnode, self.ctx, "test1-child", "invalid-module", "val2" + ) + self.assertIsInstance(dnode2, DOpaq) + dic = dnode.print_dict(strip_prefixes=False) + self.assertEqual( + dic, {"invalid-module:test1": {"invalid-module:test1-child": "val2"}} + ) + parent = dnode2.parent() + self.assertIsInstance(parent, DOpaq) + self.assertEqual(parent.cdata, dnode.cdata) + dnode.free() + + def test_dnode_opaq_within_container(self): + MAIN = {"yolo-nodetypes:conf": {"percentage": "20.2"}} + module = self.ctx.get_module("yolo-nodetypes") + dnode1 = dict_to_dnode(MAIN, module, None, validate=False) + dnode2 = DOpaq.create_opaq(dnode1, self.ctx, "ratios", "yolo-nodetypes", "val") + children = [c.cdata for c in dnode1.children()] + self.assertTrue(dnode2.cdata in children) + dnode1.free()