diff --git a/cffi/cdefs.h b/cffi/cdefs.h index 78e500a4..b9d3b773 100644 --- a/cffi/cdefs.h +++ b/cffi/cdefs.h @@ -1042,5 +1042,56 @@ LY_ERR lyd_new_implicit_all(struct lyd_node **, const struct ly_ctx *, uint32_t, LY_ERR lyd_new_meta(const struct ly_ctx *, struct lyd_node *, const struct lys_module *, const char *, const char *, ly_bool, struct lyd_meta **); +struct ly_opaq_name { + const char *name; + const char *prefix; + + union { + const char *module_ns; + const char *module_name; + }; +}; + +struct lyd_node_opaq { + union { + struct lyd_node node; + + struct { + uint32_t hash; + uint32_t flags; + const struct lysc_node *schema; + struct lyd_node_inner *parent; + struct lyd_node *next; + struct lyd_node *prev; + struct lyd_meta *meta; + void *priv; + }; + }; + + struct lyd_node *child; + + struct ly_opaq_name name; + const char *value; + uint32_t hints; + LY_VALUE_FORMAT format; + void *val_prefix_data; + + struct lyd_attr *attr; + const struct ly_ctx *ctx; +}; + +struct lyd_attr { + struct lyd_node_opaq *parent; + struct lyd_attr *next; + struct ly_opaq_name name; + const char *value; + uint32_t hints; + LY_VALUE_FORMAT format; + void *val_prefix_data; +}; + +LY_ERR lyd_new_attr(struct lyd_node *, const char *, const char *, const char *, struct lyd_attr **); +void lyd_free_attr_single(const struct ly_ctx *ctx, struct lyd_attr *attr); + /* from libc, needed to free allocated strings */ void free(void *); diff --git a/libyang/__init__.py b/libyang/__init__.py index f9dfeccd..5e3854ad 100644 --- a/libyang/__init__.py +++ b/libyang/__init__.py @@ -13,6 +13,7 @@ DLeafList, DList, DNode, + DNodeAttrs, DNotif, DRpc, ) diff --git a/libyang/data.py b/libyang/data.py index abc66cd6..f0caf240 100644 --- a/libyang/data.py +++ b/libyang/data.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: MIT import logging -from typing import IO, Any, Dict, Iterator, Optional, Union +from typing import IO, Any, Dict, Iterator, Optional, Tuple, Union from _libyang import ffi, lib from .keyed_list import KeyedList @@ -190,13 +190,69 @@ def diff_flags(with_defaults: bool = False) -> int: return flags +# ------------------------------------------------------------------------------------- +class DNodeAttrs: + __slots__ = ("context", "parent", "cdata", "__dict__") + + def __init__(self, context: "libyang.Context", parent: "libyang.DNode"): + self.context = context + self.parent = parent + self.cdata = [] # C type: "struct lyd_attr *" + + def get(self, name: str) -> Optional[str]: + for attr_name, attr_value in self: + if attr_name == name: + return attr_value + return None + + def set(self, name: str, value: str): + attrs = ffi.new("struct lyd_attr **") + ret = lib.lyd_new_attr( + self.parent.cdata, + ffi.NULL, + str2c(name), + str2c(value), + attrs, + ) + if ret != lib.LY_SUCCESS: + raise self.context.error("cannot create attr") + self.cdata.append(attrs[0]) + + def remove(self, name: str): + for attr in self.cdata: + if self._get_attr_name(attr) == name: + lib.lyd_free_attr_single(self.context.cdata, attr) + self.cdata.remove(attr) + break + + def __contains__(self, name: str) -> bool: + for attr_name, _ in self: + if attr_name == name: + return True + return False + + def __iter__(self) -> Iterator[Tuple[str, str]]: + for attr in self.cdata: + name = self._get_attr_name(attr) + yield (name, c2str(attr.value)) + + def __len__(self) -> int: + return len(self.cdata) + + @staticmethod + def _get_attr_name(cdata) -> str: + if cdata.name.prefix != ffi.NULL: + return f"{c2str(cdata.name.prefix)}:{c2str(cdata.name.name)}" + return c2str(cdata.name.name) + + # ------------------------------------------------------------------------------------- class DNode: """ Data tree node. """ - __slots__ = ("context", "cdata", "free_func", "__dict__") + __slots__ = ("context", "cdata", "attributes", "free_func", "__dict__") def __init__(self, context: "libyang.Context", cdata): """ @@ -207,6 +263,7 @@ def __init__(self, context: "libyang.Context", cdata): """ self.context = context self.cdata = cdata # C type: "struct lyd_node *" + self.attributes = None self.free_func = None # type: Callable[DNode] def meta(self): @@ -254,6 +311,11 @@ def new_meta(self, name: str, value: str, clear_dflt: bool = False): if ret != lib.LY_SUCCESS: raise self.context.error("cannot create meta") + def attrs(self) -> DNodeAttrs: + if not self.attributes: + self.attributes = DNodeAttrs(self.context, self) + return self.attributes + def add_defaults( self, no_config: bool = False, diff --git a/tests/test_data.py b/tests/test_data.py index 6a160359..10d9045f 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -16,6 +16,7 @@ DLeaf, DList, DNode, + DNodeAttrs, DNotif, DRpc, IOType, @@ -950,13 +951,86 @@ def test_dnode_insert_sibling(self): self.assertIsInstance(sibling, DLeaf) self.assertEqual(sibling.cdata, dnode2.cdata) - def test_dnode_new_opaq_find_one(self): + def _create_opaq_hostname(self): root = self.ctx.create_data_path(path="/yolo-system:conf") root.new_path( "hostname", None, opt_opaq=True, ) - dnode = root.find_one("/yolo-system:conf/hostname") + return root.find_one("/yolo-system:conf/hostname") + + def test_dnode_new_opaq_find_one(self): + dnode = self._create_opaq_hostname() self.assertIsInstance(dnode, DLeaf) + + def test_dnode_attrs(self): + dnode = self._create_opaq_hostname() + attrs = dnode.attrs() + + self.assertIsInstance(attrs, DNodeAttrs) + + def test_dnode_attrs_set(self): + dnode = self._create_opaq_hostname() + attrs = dnode.attrs() + + self.assertEqual(len(attrs.cdata), 0) + attrs.set("ietf-netconf:operation", "remove") + + self.assertEqual(len(attrs.cdata), 1) + + def test_dnode_attrs_get(self): + dnode = self._create_opaq_hostname() + attrs = dnode.attrs() + + attrs.set("ietf-netconf:operation", "remove") + + value = attrs.get("ietf-netconf:operation") + self.assertEqual(value, "remove") + + def test_dnode_attrs__len(self): + dnode = self._create_opaq_hostname() + attrs = dnode.attrs() + + self.assertEqual(len(attrs), 0) + attrs.set("ietf-netconf:operation", "remove") + + self.assertEqual(len(attrs), 1) + + def test_dnode_attrs__contains(self): + dnode = self._create_opaq_hostname() + attrs = dnode.attrs() + + attrs.set("ietf-netconf:operation", "remove") + + self.assertTrue("ietf-netconf:operation" in attrs) + + def test_dnode_attrs_remove(self): + dnode = self._create_opaq_hostname() + attrs = dnode.attrs() + + attrs.set("ietf-netconf:operation", "remove") + attrs.remove("ietf-netconf:operation") + + self.assertEqual(len(attrs), 0) + + def test_dnode_attrs_set_and_remove_multiple(self): + dnode = self._create_opaq_hostname() + attrs = dnode.attrs() + + attrs.set("ietf-netconf:operation", "remove") + attrs.set("something:else", "test") + attrs.set("no_prefix", "test") + self.assertEqual(len(attrs), 3) + + attrs.remove("something:else") + self.assertEqual(len(attrs), 2) + self.assertIn("no_prefix", attrs) + self.assertIn("ietf-netconf:operation", attrs) + + attrs.remove("no_prefix") + self.assertEqual(len(attrs), 1) + + attrs.remove("ietf-netconf:operation") + self.assertEqual(len(attrs), 0)