diff --git a/CHANGES b/CHANGES index 848417a7..ec70b5a8 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,21 @@ +3.4.1: +Bug Fixes: +* yaml-set (and the underlying Processor class) were unable to change nodes + having a null (None) value to anything else. This changes how null/None + values are handled by the Processor during node retrieval; they are no longer + discarded, so you will receive None as the data of any retrieved NodeCoords + for appropriate null/None leaf nodes. + +Enhancements: +* The node deletion capability of the yaml-set command is now part of the + library. See Processor::delete_nodes(...) and + Processor::delete_gathered_nodes(...) for details. +* The library now supports loading YAML from String rather than only from file. + Simply pass a new `literal=True` keyword parameter to + Parsers::get_yaml_data(...) or Parsers::get_yaml_multidoc_data(...) to + indicate that `source` is literal serialized (String) YAML data rather than a + file-spec. This mode is implied when reading from STDIN (source is "-"). 
+ 3.4.0: Bug Fixes: * For the yaml-diff command-line tool, custom identity keys for specific diff --git a/tests/test_commands_yaml_set.py b/tests/test_commands_yaml_set.py index bb28be7f..1759a639 100644 --- a/tests/test_commands_yaml_set.py +++ b/tests/test_commands_yaml_set.py @@ -1218,3 +1218,27 @@ def test_assign_null(self, script_runner, tmp_path_factory): with open(yaml_file, 'r') as fhnd: filedat = fhnd.read() assert filedat == yamlout + + def test_change_null(self, script_runner, tmp_path_factory): + yamlin = """--- +ingress_key: Preceding value +concrete_key: +egress_key: Following value +""" + yamlout = """--- +ingress_key: Preceding value +concrete_key: Now not null +egress_key: Following value +""" + yaml_file = create_temp_yaml_file(tmp_path_factory, yamlin) + result = script_runner.run( + self.command, + "--change=concrete_key", + "--value=Now not null", + yaml_file + ) + assert result.success, result.stderr + + with open(yaml_file, 'r') as fhnd: + filedat = fhnd.read() + assert filedat == yamlout diff --git a/tests/test_common_parsers.py b/tests/test_common_parsers.py index d6b86891..a4776abe 100644 --- a/tests/test_common_parsers.py +++ b/tests/test_common_parsers.py @@ -9,6 +9,58 @@ class Test_common_parsers(): """Tests for the Parsers helper class.""" + ### + # get_yaml_data (literal=True) + ### + def test_get_yaml_data_literally(self, quiet_logger): + serialized_yaml = """--- +hash: + key: value + +list: + - ichi + - ni + - san +""" + yaml = Parsers.get_yaml_editor() + (data, loaded) = Parsers.get_yaml_data( + yaml, quiet_logger, serialized_yaml, + literal=True) + assert loaded == True + assert data["hash"]["key"] == "value" + assert data["list"][0] == "ichi" + assert data["list"][1] == "ni" + assert data["list"][2] == "san" + + ### + # get_yaml_multidoc_data (literal=True) + ### + def test_get_yaml_multidoc_data_literally(self, quiet_logger): + serialized_yaml = """--- +document: 1st +has: data +... 
+--- +document: 2nd +has: different data +""" + yaml = Parsers.get_yaml_editor() + doc_id = 0 + for (data, loaded) in Parsers.get_yaml_multidoc_data( + yaml, quiet_logger, serialized_yaml, + literal=True): + assert loaded == True + if doc_id == 0: + document = "1st" + has = "data" + else: + document= "2nd" + has = "different data" + doc_id = doc_id + 1 + + assert data["document"] == document + assert data["has"] == has + ### # stringify_dates ### diff --git a/tests/test_processor.py b/tests/test_processor.py index 6b311b5a..55983100 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -1,772 +1,832 @@ -import pytest -from datetime import date - -from ruamel.yaml import YAML - -from yamlpath.func import unwrap_node_coords -from yamlpath.exceptions import YAMLPathException -from yamlpath.enums import ( - PathSeperators, - PathSegmentTypes, - PathSearchMethods, - YAMLValueFormats, -) -from yamlpath.path import SearchTerms -from yamlpath import YAMLPath, Processor -from tests.conftest import quiet_logger - - -class Test_Processor(): - """Tests for the Processor class.""" - - def test_get_none_data_nodes(self, quiet_logger): - processor = Processor(quiet_logger, None) - yamlpath = YAMLPath("abc") - optional_matches = 0 - must_exist_matches = 0 - req_node_matches = 0 - traversal_matches = 0 - - for node in processor.get_nodes(yamlpath, mustexist=False): - optional_matches += 1 - for node in processor.get_nodes(yamlpath, mustexist=True): - must_exist_matches += 1 - for node in processor._get_required_nodes(None, yamlpath): - req_node_matches += 1 - for node in processor._get_nodes_by_traversal(None, yamlpath, 0): - traversal_matches += 1 - - assert optional_matches == 0 - assert must_exist_matches == 0 - assert req_node_matches == 0 - assert traversal_matches == 1 # A None node traverses into null - - @pytest.mark.parametrize("yamlpath,results,mustexist,default", [ - ("aliases[&aliasAnchorOne]", ["Anchored Scalar Value"], True, None), - 
("aliases[&newAlias]", ["Not in the original data"], False, "Not in the original data"), - ("aliases[0]", ["Anchored Scalar Value"], True, None), - ("aliases.0", ["Anchored Scalar Value"], True, None), - ("(array_of_hashes.name)+(rollback_hashes.on_condition.failure.name)", [["one", "two", "three", "four"]], True, None), - ("/array_of_hashes/name", ["one", "two"], True, None), - ("aliases[1:2]", [["Hey, Number Two!"]], True, None), - ("aliases[1:1]", [["Hey, Number Two!"]], True, None), - ("squads[bravo:charlie]", [2.2, 3.3], True, None), - ("/&arrayOfHashes/1/step", [2], True, None), - ("&arrayOfHashes[step=1].name", ["one"], True, None), - ("squads[.!=""][.=1.1]", [1.1], True, None), - ("squads[.!=""][.>1.1][.<3.3]", [2.2], True, None), - ("aliases[.^Hey]", ["Hey, Number Two!"], True, None), - ("aliases[.$Value]", ["Anchored Scalar Value"], True, None), - ("aliases[.%Value]", ["Anchored Scalar Value"], True, None), - ("&arrayOfHashes[step>1].name", ["two"], True, None), - ("&arrayOfHashes[step<2].name", ["one"], True, None), - ("squads[.>charlie]", [4.4], True, None), - ("squads[.>=charlie]", [3.3, 4.4], True, None), - ("squads[.4F]", True), - ("/ints/[.<4F]", True), - ("/ints/[.>=4F]", True), - ("/ints/[.<=4F]", True), - ("/floats/[.=4.F]", True), - ("/floats/[.>4.F]", True), - ("/floats/[.<4.F]", True), - ("/floats/[.>=4.F]", True), - ("/floats/[.<=4.F]", True), - ("abc.**", True), - ]) - def test_get_impossible_nodes_error(self, quiet_logger, yamlpath, mustexist): - yamldata = """--- - ints: - - 1 - - 2 - - 3 - - 4 - - 5 - floats: - - 1.1 - - 2.2 - - 3.3 - """ - yaml = YAML() - processor = Processor(quiet_logger, yaml.load(yamldata)) - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes(yamlpath, mustexist=mustexist)) - assert -1 < str(ex.value).find("does not match any nodes") - - def test_illegal_traversal_recursion(self, quiet_logger): - yamldata = """--- - any: data - """ - yaml = YAML() - processor = Processor(quiet_logger, 
yaml.load(yamldata)) - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes("**.**")) - assert -1 < str(ex.value).find("Repeating traversals are not allowed") - - def test_set_value_in_empty_data(self, capsys, quiet_logger): - import sys - yamldata = "" - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - processor.set_value("abc", "void") - yaml.dump(data, sys.stdout) - assert -1 == capsys.readouterr().out.find("abc") - - def test_set_value_in_none_data(self, capsys, quiet_logger): - import sys - yaml = YAML() - data = None - processor = Processor(quiet_logger, data) - processor._update_node(None, None, None, YAMLValueFormats.DEFAULT) - yaml.dump(data, sys.stdout) - assert -1 == capsys.readouterr().out.find("abc") - - @pytest.mark.parametrize("yamlpath,value,tally,mustexist,vformat,pathsep", [ - ("aliases[&testAnchor]", "Updated Value", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), - (YAMLPath("top_scalar"), "New top-level value", 1, False, YAMLValueFormats.DEFAULT, PathSeperators.DOT), - ("/top_array/2", 42, 1, False, YAMLValueFormats.INT, PathSeperators.FSLASH), - ("/top_hash/positive_float", 0.009, 1, True, YAMLValueFormats.FLOAT, PathSeperators.FSLASH), - ("/top_hash/negative_float", -0.009, 1, True, YAMLValueFormats.FLOAT, PathSeperators.FSLASH), - ("/top_hash/positive_float", -2.71828, 1, True, YAMLValueFormats.FLOAT, PathSeperators.FSLASH), - ("/top_hash/negative_float", 5283.4, 1, True, YAMLValueFormats.FLOAT, PathSeperators.FSLASH), - ]) - def test_set_value(self, quiet_logger, yamlpath, value, tally, mustexist, vformat, pathsep): - yamldata = """--- - aliases: - - &testAnchor Initial Value - top_array: - # Comment 1 - - 1 - # Comment 2 - - 2 - # Comment N - top_scalar: Top-level plain scalar string - top_hash: - positive_float: 3.14159265358 - negative_float: -11.034 - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - 
processor.set_value(yamlpath, value, mustexist=mustexist, value_format=vformat, pathsep=pathsep) - matchtally = 0 - for node in processor.get_nodes(yamlpath, mustexist=mustexist): - assert unwrap_node_coords(node) == value - matchtally += 1 - assert matchtally == tally - - def test_cannot_set_nonexistent_required_node_error(self, quiet_logger): - yamldata = """--- - key: value - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - with pytest.raises(YAMLPathException) as ex: - processor.set_value("abc", "void", mustexist=True) - assert -1 < str(ex.value).find("No nodes matched") - - def test_none_data_to_get_nodes_by_path_segment(self, capsys, quiet_logger): - import sys - yamldata = "" - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - nodes = list(processor._get_nodes_by_path_segment(data, YAMLPath("abc"), 0)) - yaml.dump(data, sys.stdout) - assert -1 == capsys.readouterr().out.find("abc") - - def test_bad_segment_index_for_get_nodes_by_path_segment(self, capsys, quiet_logger): - import sys - yamldata = """--- - key: value - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - nodes = list(processor._get_nodes_by_path_segment(data, YAMLPath("abc"), 10)) - yaml.dump(data, sys.stdout) - assert -1 == capsys.readouterr().out.find("abc") - - def test_get_nodes_by_unknown_path_segment_error(self, quiet_logger): - from collections import deque - from enum import Enum - from yamlpath.enums import PathSegmentTypes - names = [m.name for m in PathSegmentTypes] + ['DNF'] - PathSegmentTypes = Enum('PathSegmentTypes', names) - - yamldata = """--- - key: value - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - path = YAMLPath("abc") - stringified = str(path) # Force Path to parse - path._escaped = deque([ - (PathSegmentTypes.DNF, "abc"), - ]) - - with pytest.raises(NotImplementedError): - nodes = 
list(processor._get_nodes_by_path_segment(data, path, 0)) - - def test_non_int_slice_error(self, quiet_logger): - yamldata = """--- - - step: 1 - - step: 2 - - step: 3 - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - with pytest.raises(YAMLPathException) as ex: - processor.set_value("[1:4F]", "") - assert -1 < str(ex.value).find("is not an integer array slice") - - def test_non_int_array_index_error(self, quiet_logger): - from collections import deque - yamldata = """--- - - 1 - """ - yaml = YAML() - data = yaml.load(yamldata) - path = YAMLPath("[0]") - processor = Processor(quiet_logger, data) - strp = str(path) - - path._escaped = deque([ - (PathSegmentTypes.INDEX, "0F"), - ]) - path._unescaped = deque([ - (PathSegmentTypes.INDEX, "0F"), - ]) - - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor._get_nodes_by_index(data, path, 0)) - assert -1 < str(ex.value).find("is not an integer array index") - - def test_nonexistant_path_search_method_error(self, quiet_logger): - from enum import Enum - from yamlpath.enums import PathSearchMethods - names = [m.name for m in PathSearchMethods] + ['DNF'] - PathSearchMethods = Enum('PathSearchMethods', names) - - yamldata = """--- - top_scalar: value - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - with pytest.raises(NotImplementedError): - nodes = list(processor._get_nodes_by_search( - data, - SearchTerms(True, PathSearchMethods.DNF, ".", "top_scalar") - )) - - def test_adjoined_collectors_error(self, quiet_logger): - yamldata = """--- - key: value - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes("(&arrayOfHashes.step)(disabled_steps)")) - assert -1 < str(ex.value).find("has no meaning") - - def test_no_attrs_to_arrays_error(self, quiet_logger): - yamldata = """--- - array: - - one 
- - two - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes("array.attr")) - assert -1 < str(ex.value).find("Cannot add") - - def test_no_index_to_hashes_error(self, quiet_logger): - # Using [#] syntax is a disambiguated INDEX ELEMENT NUMBER. In - # DICTIONARY context, this would create an ambiguous request to access - # either the #th value or a value whose key is the literal #. As such, - # an error is deliberately generated when [#] syntax is used against - # dictionaries. When you actually want a DICTIONARY KEY that happens - # to be an integer, omit the square braces, []. - yamldata = """--- - hash: - key: value - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes("hash[6]")) - assert -1 < str(ex.value).find("Cannot add") - - def test_get_nodes_array_impossible_type_error(self, quiet_logger): - yamldata = """--- - array: - - 1 - - 2 - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes(r"/array/(.=~/^.{3,4}$/)", default_value="New value")) - assert -1 < str(ex.value).find("Cannot add") - - def test_no_attrs_to_scalars_errors(self, quiet_logger): - yamldata = """--- - scalar: value - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes("scalar[6]")) - assert -1 < str(ex.value).find("Cannot add") - - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes("scalar.key")) - assert -1 < str(ex.value).find("Cannot add") - - @pytest.mark.parametrize("yamlpath,value,tally,mustexist,vformat,pathsep", [ - ("/anchorKeys[&keyOne]", "Set self-destruct", 1, 
True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), - ("/hash[&keyTwo]", "Confirm", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), - ("/anchorKeys[&recursiveAnchorKey]", "Recurse more", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), - ("/hash[&recursiveAnchorKey]", "Recurse even more", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), - ]) - def test_key_anchor_changes(self, quiet_logger, yamlpath, value, tally, mustexist, vformat, pathsep): - yamldata = """--- - anchorKeys: - &keyOne aliasOne: 11A1 - &keyTwo aliasTwo: 22B2 - &recursiveAnchorKey subjectKey: *recursiveAnchorKey - - hash: - *keyOne : - subval: 1.1 - *keyTwo : - subval: 2.2 - *recursiveAnchorKey : - subval: 3.3 - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - yamlpath = YAMLPath(yamlpath) - processor.set_value(yamlpath, value, mustexist=mustexist, value_format=vformat, pathsep=pathsep) - matchtally = 0 - for node in processor.get_nodes(yamlpath): - assert unwrap_node_coords(node) == value - matchtally += 1 - assert matchtally == tally - - def test_key_anchor_children(self, quiet_logger): - yamldata = """--- - anchorKeys: - &keyOne aliasOne: 1 1 Alpha 1 - &keyTwo aliasTwo: 2 2 Beta 2 - - hash: - *keyOne : - subval: 1.1 - *keyTwo : - subval: 2.2 - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - - yamlpath = YAMLPath("hash[&keyTwo].subval") - newvalue = "Mute audibles" - processor.set_value(yamlpath, newvalue, mustexist=True) - matchtally = 0 - for node in processor.get_nodes(yamlpath): - assert unwrap_node_coords(node) == newvalue - matchtally += 1 - assert matchtally == 1 - - def test_cannot_add_novel_alias_keys(self, quiet_logger): - yamldata = """--- - anchorKeys: - &keyOne aliasOne: 1 1 Alpha 1 - &keyTwo aliasTwo: 2 2 Beta 2 - - hash: - *keyOne : - subval: 1.1 - *keyTwo : - subval: 2.2 - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, 
data) - - yamlpath = YAMLPath("hash[&keyThree].subval") - newvalue = "Abort" - with pytest.raises(YAMLPathException) as ex: - nodes = list(processor.get_nodes(yamlpath)) - assert -1 < str(ex.value).find("Cannot add") - - @pytest.mark.parametrize("yamlpath,value,verifications", [ - ("number", 5280, [ - ("aliases[&alias_number]", 1), - ("number", 5280), - ("alias_number", 1), - ("hash.number", 1), - ("hash.alias_number", 1), - ("complex.hash.number", 1), - ("complex.hash.alias_number", 1), - ]), - ("aliases[&alias_number]", 5280, [ - ("aliases[&alias_number]", 5280), - ("number", 1), - ("alias_number", 5280), - ("hash.number", 1), - ("hash.alias_number", 5280), - ("complex.hash.number", 1), - ("complex.hash.alias_number", 5280), - ]), - ("bool", False, [ - ("aliases[&alias_bool]", True), - ("bool", False), - ("alias_bool", True), - ("hash.bool", True), - ("hash.alias_bool", True), - ("complex.hash.bool", True), - ("complex.hash.alias_bool", True), - ]), - ("aliases[&alias_bool]", False, [ - ("aliases[&alias_bool]", False), - ("bool", True), - ("alias_bool", False), - ("hash.bool", True), - ("hash.alias_bool", False), - ("complex.hash.bool", True), - ("complex.hash.alias_bool", False), - ]), - ]) - def test_set_nonunique_values(self, quiet_logger, yamlpath, value, verifications): - yamldata = """--- - aliases: - - &alias_number 1 - - &alias_bool true - number: 1 - bool: true - alias_number: *alias_number - alias_bool: *alias_bool - hash: - number: 1 - bool: true - alias_number: *alias_number - alias_bool: *alias_bool - complex: - hash: - number: 1 - bool: true - alias_number: *alias_number - alias_bool: *alias_bool - """ - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - processor.set_value(yamlpath, value) - for verification in verifications: - for verify_node_coord in processor.get_nodes(verification[0]): - assert unwrap_node_coords(verify_node_coord) == verification[1] - - @pytest.mark.parametrize("yamlpath,results", [ - 
("(temps[. >= 100]) - (temps[. > 110])", [[110, 100]]), - ("(temps[. < 32]) - (temps[. >= 114])", [[0]]), - ("(temps[. < 32]) + (temps[. > 110])", [[0, 114]]), - ("(temps[. <= 32]) + (temps[. > 110])", [[32, 0, 114]]), - ("(temps[. < 32]) + (temps[. >= 110])", [[0, 110, 114]]), - ("(temps[. <= 32]) + (temps[. >= 110])", [[32, 0, 110, 114]]), - ("(temps[. < 0]) + (temps[. >= 114])", [[114]]), - ]) - def test_get_singular_collectors(self, quiet_logger, yamlpath, results): - yamldata = """--- - temps: - - 32 - - 0 - - 110 - - 100 - - 72 - - 68 - - 114 - - 34 - - 36 - """ - yaml = YAML() - processor = Processor(quiet_logger, yaml.load(yamldata)) - matchidx = 0 - # Note that Collectors deal with virtual DOMs, so mustexist must always - # be set True. Otherwise, ephemeral virtual nodes would be created and - # discarded. Is this desirable? Maybe, but not today. For now, using - # Collectors without setting mustexist=True will be undefined behavior. - for node in processor.get_nodes(yamlpath, mustexist=True): - assert unwrap_node_coords(node) == results[matchidx] - matchidx += 1 - assert len(results) == matchidx - - @pytest.mark.parametrize("yamlpath,results", [ - ("(/list1) + (/list2)", [[1, 2, 3, 4, 5, 6]]), - ("(/list1) - (/exclude)", [[1, 2]]), - ("(/list2) - (/exclude)", [[5, 6]]), - ("(/list1) + (/list2) - (/exclude)", [[1, 2, 5, 6]]), - ("((/list1) + (/list2)) - (/exclude)", [[1, 2, 5, 6]]), - ("(/list1) + ((/list2) - (/exclude))", [[1, 2, 3, 5, 6]]), - ("((/list1) - (/exclude)) + ((/list2) - (/exclude))", [[1, 2, 5, 6]]), - ("((/list1) - (/exclude)) + ((/list2) - (/exclude))*", [1, 2, 5, 6]), - ("(((/list1) - (/exclude)) + ((/list2) - (/exclude)))[2]", [5]), - ]) - def test_scalar_collectors(self, quiet_logger, yamlpath, results): - yamldata = """--- - list1: - - 1 - - 2 - - 3 - list2: - - 4 - - 5 - - 6 - exclude: - - 3 - - 4 - """ - yaml = YAML() - processor = Processor(quiet_logger, yaml.load(yamldata)) - matchidx = 0 - # Note that Collectors deal with virtual 
DOMs, so mustexist must always - # be set True. Otherwise, ephemeral virtual nodes would be created and - # discarded. Is this desirable? Maybe, but not today. For now, using - # Collectors without setting mustexist=True will be undefined behavior. - for node in processor.get_nodes(yamlpath, mustexist=True): - assert unwrap_node_coords(node) == results[matchidx] - matchidx += 1 - assert len(results) == matchidx - - def test_get_every_data_type(self, quiet_logger): - # Contributed by https://github.com/AndydeCleyre - yamldata = """--- -intthing: 6 -floatthing: 6.8 -yesthing: yes -nothing: no -truething: true -falsething: false -nullthing: null -nothingthing: -emptystring: "" -nullstring: "null" - """ - - # Note that Python/pytest is translating nothingthing into a string, "null". - # This is NOT yamlpath doing this. In fact, the yaml-get command-line tool - # actually translates true nulls into "\x00" (hexadecimal NULL control-characters). - results = [6, 6.8, "yes", "no", True, False, "", "null", "", "null"] - - yaml = YAML() - data = yaml.load(yamldata) - processor = Processor(quiet_logger, data) - yamlpath = YAMLPath("*") - - match_index = 0 - for node in processor.get_nodes(yamlpath): - assert unwrap_node_coords(node) == results[match_index] - match_index += 1 +import pytest +from datetime import date +from types import SimpleNamespace + +from ruamel.yaml import YAML + +from yamlpath.func import unwrap_node_coords +from yamlpath.exceptions import YAMLPathException +from yamlpath.enums import ( + PathSeperators, + PathSegmentTypes, + PathSearchMethods, + YAMLValueFormats, +) +from yamlpath.path import SearchTerms +from yamlpath.wrappers import ConsolePrinter +from yamlpath import YAMLPath, Processor +from tests.conftest import quiet_logger + + +class Test_Processor(): + """Tests for the Processor class.""" + + def test_get_none_data_nodes(self, quiet_logger): + processor = Processor(quiet_logger, None) + yamlpath = YAMLPath("abc") + optional_matches = 0 + 
must_exist_matches = 0 + req_node_matches = 0 + traversal_matches = 0 + + for node in processor.get_nodes(yamlpath, mustexist=False): + optional_matches += 1 + for node in processor.get_nodes(yamlpath, mustexist=True): + must_exist_matches += 1 + for node in processor._get_required_nodes(None, yamlpath): + req_node_matches += 1 + for node in processor._get_nodes_by_traversal(None, yamlpath, 0): + traversal_matches += 1 + + assert optional_matches == 0 + assert must_exist_matches == 0 + assert req_node_matches == 0 + assert traversal_matches == 1 # A None node traverses into null + + @pytest.mark.parametrize("yamlpath,results,mustexist,default", [ + ("aliases[&aliasAnchorOne]", ["Anchored Scalar Value"], True, None), + ("aliases[&newAlias]", ["Not in the original data"], False, "Not in the original data"), + ("aliases[0]", ["Anchored Scalar Value"], True, None), + ("aliases.0", ["Anchored Scalar Value"], True, None), + ("(array_of_hashes.name)+(rollback_hashes.on_condition.failure.name)", [["one", "two", "three", "four"]], True, None), + ("/array_of_hashes/name", ["one", "two"], True, None), + ("aliases[1:2]", [["Hey, Number Two!"]], True, None), + ("aliases[1:1]", [["Hey, Number Two!"]], True, None), + ("squads[bravo:charlie]", [2.2, 3.3], True, None), + ("/&arrayOfHashes/1/step", [2], True, None), + ("&arrayOfHashes[step=1].name", ["one"], True, None), + ("squads[.!=""][.=1.1]", [1.1], True, None), + ("squads[.!=""][.>1.1][.<3.3]", [2.2], True, None), + ("aliases[.^Hey]", ["Hey, Number Two!"], True, None), + ("aliases[.$Value]", ["Anchored Scalar Value"], True, None), + ("aliases[.%Value]", ["Anchored Scalar Value"], True, None), + ("&arrayOfHashes[step>1].name", ["two"], True, None), + ("&arrayOfHashes[step<2].name", ["one"], True, None), + ("squads[.>charlie]", [4.4], True, None), + ("squads[.>=charlie]", [3.3, 4.4], True, None), + ("squads[.4F]", True), + ("/ints/[.<4F]", True), + ("/ints/[.>=4F]", True), + ("/ints/[.<=4F]", True), + ("/floats/[.=4.F]", True), 
+ ("/floats/[.>4.F]", True), + ("/floats/[.<4.F]", True), + ("/floats/[.>=4.F]", True), + ("/floats/[.<=4.F]", True), + ("abc.**", True), + ]) + def test_get_impossible_nodes_error(self, quiet_logger, yamlpath, mustexist): + yamldata = """--- + ints: + - 1 + - 2 + - 3 + - 4 + - 5 + floats: + - 1.1 + - 2.2 + - 3.3 + """ + yaml = YAML() + processor = Processor(quiet_logger, yaml.load(yamldata)) + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes(yamlpath, mustexist=mustexist)) + assert -1 < str(ex.value).find("does not match any nodes") + + def test_illegal_traversal_recursion(self, quiet_logger): + yamldata = """--- + any: data + """ + yaml = YAML() + processor = Processor(quiet_logger, yaml.load(yamldata)) + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes("**.**")) + assert -1 < str(ex.value).find("Repeating traversals are not allowed") + + def test_set_value_in_empty_data(self, capsys, quiet_logger): + import sys + yamldata = "" + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + processor.set_value("abc", "void") + yaml.dump(data, sys.stdout) + assert -1 == capsys.readouterr().out.find("abc") + + def test_set_value_in_none_data(self, capsys, quiet_logger): + import sys + yaml = YAML() + data = None + processor = Processor(quiet_logger, data) + processor._update_node(None, None, None, YAMLValueFormats.DEFAULT) + yaml.dump(data, sys.stdout) + assert -1 == capsys.readouterr().out.find("abc") + + @pytest.mark.parametrize("yamlpath,value,tally,mustexist,vformat,pathsep", [ + ("aliases[&testAnchor]", "Updated Value", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), + (YAMLPath("top_scalar"), "New top-level value", 1, False, YAMLValueFormats.DEFAULT, PathSeperators.DOT), + ("/top_array/2", 42, 1, False, YAMLValueFormats.INT, PathSeperators.FSLASH), + ("/top_hash/positive_float", 0.009, 1, True, YAMLValueFormats.FLOAT, PathSeperators.FSLASH), + 
("/top_hash/negative_float", -0.009, 1, True, YAMLValueFormats.FLOAT, PathSeperators.FSLASH), + ("/top_hash/positive_float", -2.71828, 1, True, YAMLValueFormats.FLOAT, PathSeperators.FSLASH), + ("/top_hash/negative_float", 5283.4, 1, True, YAMLValueFormats.FLOAT, PathSeperators.FSLASH), + ("/null_value", "No longer null", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.FSLASH), + ]) + def test_set_value(self, quiet_logger, yamlpath, value, tally, mustexist, vformat, pathsep): + yamldata = """--- +aliases: + - &testAnchor Initial Value +top_array: + # Comment 1 + - 1 + # Comment 2 + - 2 +# Comment N +top_scalar: Top-level plain scalar string +top_hash: + positive_float: 3.14159265358 + negative_float: -11.034 +null_value: + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + processor.set_value(yamlpath, value, mustexist=mustexist, value_format=vformat, pathsep=pathsep) + matchtally = 0 + for node in processor.get_nodes(yamlpath, mustexist=mustexist): + assert unwrap_node_coords(node) == value + matchtally += 1 + assert matchtally == tally + + def test_cannot_set_nonexistent_required_node_error(self, quiet_logger): + yamldata = """--- + key: value + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + with pytest.raises(YAMLPathException) as ex: + processor.set_value("abc", "void", mustexist=True) + assert -1 < str(ex.value).find("No nodes matched") + + def test_none_data_to_get_nodes_by_path_segment(self, capsys, quiet_logger): + import sys + yamldata = "" + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + nodes = list(processor._get_nodes_by_path_segment(data, YAMLPath("abc"), 0)) + yaml.dump(data, sys.stdout) + assert -1 == capsys.readouterr().out.find("abc") + + def test_bad_segment_index_for_get_nodes_by_path_segment(self, capsys, quiet_logger): + import sys + yamldata = """--- + key: value + """ + yaml = YAML() + data = 
yaml.load(yamldata) + processor = Processor(quiet_logger, data) + nodes = list(processor._get_nodes_by_path_segment(data, YAMLPath("abc"), 10)) + yaml.dump(data, sys.stdout) + assert -1 == capsys.readouterr().out.find("abc") + + def test_get_nodes_by_unknown_path_segment_error(self, quiet_logger): + from collections import deque + from enum import Enum + from yamlpath.enums import PathSegmentTypes + names = [m.name for m in PathSegmentTypes] + ['DNF'] + PathSegmentTypes = Enum('PathSegmentTypes', names) + + yamldata = """--- + key: value + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + path = YAMLPath("abc") + stringified = str(path) # Force Path to parse + path._escaped = deque([ + (PathSegmentTypes.DNF, "abc"), + ]) + + with pytest.raises(NotImplementedError): + nodes = list(processor._get_nodes_by_path_segment(data, path, 0)) + + def test_non_int_slice_error(self, quiet_logger): + yamldata = """--- + - step: 1 + - step: 2 + - step: 3 + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + with pytest.raises(YAMLPathException) as ex: + processor.set_value("[1:4F]", "") + assert -1 < str(ex.value).find("is not an integer array slice") + + def test_non_int_array_index_error(self, quiet_logger): + from collections import deque + yamldata = """--- + - 1 + """ + yaml = YAML() + data = yaml.load(yamldata) + path = YAMLPath("[0]") + processor = Processor(quiet_logger, data) + strp = str(path) + + path._escaped = deque([ + (PathSegmentTypes.INDEX, "0F"), + ]) + path._unescaped = deque([ + (PathSegmentTypes.INDEX, "0F"), + ]) + + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor._get_nodes_by_index(data, path, 0)) + assert -1 < str(ex.value).find("is not an integer array index") + + def test_nonexistant_path_search_method_error(self, quiet_logger): + from enum import Enum + from yamlpath.enums import PathSearchMethods + names = [m.name for m in PathSearchMethods] 
+ ['DNF'] + PathSearchMethods = Enum('PathSearchMethods', names) + + yamldata = """--- + top_scalar: value + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + with pytest.raises(NotImplementedError): + nodes = list(processor._get_nodes_by_search( + data, + SearchTerms(True, PathSearchMethods.DNF, ".", "top_scalar") + )) + + def test_adjoined_collectors_error(self, quiet_logger): + yamldata = """--- + key: value + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes("(&arrayOfHashes.step)(disabled_steps)")) + assert -1 < str(ex.value).find("has no meaning") + + def test_no_attrs_to_arrays_error(self, quiet_logger): + yamldata = """--- + array: + - one + - two + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes("array.attr")) + assert -1 < str(ex.value).find("Cannot add") + + def test_no_index_to_hashes_error(self, quiet_logger): + # Using [#] syntax is a disambiguated INDEX ELEMENT NUMBER. In + # DICTIONARY context, this would create an ambiguous request to access + # either the #th value or a value whose key is the literal #. As such, + # an error is deliberately generated when [#] syntax is used against + # dictionaries. When you actually want a DICTIONARY KEY that happens + # to be an integer, omit the square braces, []. 
+ yamldata = """--- + hash: + key: value + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes("hash[6]")) + assert -1 < str(ex.value).find("Cannot add") + + def test_get_nodes_array_impossible_type_error(self, quiet_logger): + yamldata = """--- + array: + - 1 + - 2 + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes(r"/array/(.=~/^.{3,4}$/)", default_value="New value")) + assert -1 < str(ex.value).find("Cannot add") + + def test_no_attrs_to_scalars_errors(self, quiet_logger): + yamldata = """--- + scalar: value + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes("scalar[6]")) + assert -1 < str(ex.value).find("Cannot add") + + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes("scalar.key")) + assert -1 < str(ex.value).find("Cannot add") + + @pytest.mark.parametrize("yamlpath,value,tally,mustexist,vformat,pathsep", [ + ("/anchorKeys[&keyOne]", "Set self-destruct", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), + ("/hash[&keyTwo]", "Confirm", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), + ("/anchorKeys[&recursiveAnchorKey]", "Recurse more", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), + ("/hash[&recursiveAnchorKey]", "Recurse even more", 1, True, YAMLValueFormats.DEFAULT, PathSeperators.AUTO), + ]) + def test_key_anchor_changes(self, quiet_logger, yamlpath, value, tally, mustexist, vformat, pathsep): + yamldata = """--- + anchorKeys: + &keyOne aliasOne: 11A1 + &keyTwo aliasTwo: 22B2 + &recursiveAnchorKey subjectKey: *recursiveAnchorKey + + hash: + *keyOne : + subval: 1.1 + *keyTwo : + subval: 2.2 + *recursiveAnchorKey : + subval: 
3.3 + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + yamlpath = YAMLPath(yamlpath) + processor.set_value(yamlpath, value, mustexist=mustexist, value_format=vformat, pathsep=pathsep) + matchtally = 0 + for node in processor.get_nodes(yamlpath): + assert unwrap_node_coords(node) == value + matchtally += 1 + assert matchtally == tally + + def test_key_anchor_children(self, quiet_logger): + yamldata = """--- + anchorKeys: + &keyOne aliasOne: 1 1 Alpha 1 + &keyTwo aliasTwo: 2 2 Beta 2 + + hash: + *keyOne : + subval: 1.1 + *keyTwo : + subval: 2.2 + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + yamlpath = YAMLPath("hash[&keyTwo].subval") + newvalue = "Mute audibles" + processor.set_value(yamlpath, newvalue, mustexist=True) + matchtally = 0 + for node in processor.get_nodes(yamlpath): + assert unwrap_node_coords(node) == newvalue + matchtally += 1 + assert matchtally == 1 + + def test_cannot_add_novel_alias_keys(self, quiet_logger): + yamldata = """--- + anchorKeys: + &keyOne aliasOne: 1 1 Alpha 1 + &keyTwo aliasTwo: 2 2 Beta 2 + + hash: + *keyOne : + subval: 1.1 + *keyTwo : + subval: 2.2 + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + yamlpath = YAMLPath("hash[&keyThree].subval") + newvalue = "Abort" + with pytest.raises(YAMLPathException) as ex: + nodes = list(processor.get_nodes(yamlpath)) + assert -1 < str(ex.value).find("Cannot add") + + @pytest.mark.parametrize("yamlpath,value,verifications", [ + ("number", 5280, [ + ("aliases[&alias_number]", 1), + ("number", 5280), + ("alias_number", 1), + ("hash.number", 1), + ("hash.alias_number", 1), + ("complex.hash.number", 1), + ("complex.hash.alias_number", 1), + ]), + ("aliases[&alias_number]", 5280, [ + ("aliases[&alias_number]", 5280), + ("number", 1), + ("alias_number", 5280), + ("hash.number", 1), + ("hash.alias_number", 5280), + ("complex.hash.number", 1), + 
("complex.hash.alias_number", 5280), + ]), + ("bool", False, [ + ("aliases[&alias_bool]", True), + ("bool", False), + ("alias_bool", True), + ("hash.bool", True), + ("hash.alias_bool", True), + ("complex.hash.bool", True), + ("complex.hash.alias_bool", True), + ]), + ("aliases[&alias_bool]", False, [ + ("aliases[&alias_bool]", False), + ("bool", True), + ("alias_bool", False), + ("hash.bool", True), + ("hash.alias_bool", False), + ("complex.hash.bool", True), + ("complex.hash.alias_bool", False), + ]), + ]) + def test_set_nonunique_values(self, quiet_logger, yamlpath, value, verifications): + yamldata = """--- + aliases: + - &alias_number 1 + - &alias_bool true + number: 1 + bool: true + alias_number: *alias_number + alias_bool: *alias_bool + hash: + number: 1 + bool: true + alias_number: *alias_number + alias_bool: *alias_bool + complex: + hash: + number: 1 + bool: true + alias_number: *alias_number + alias_bool: *alias_bool + """ + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + processor.set_value(yamlpath, value) + for verification in verifications: + for verify_node_coord in processor.get_nodes(verification[0]): + assert unwrap_node_coords(verify_node_coord) == verification[1] + + @pytest.mark.parametrize("yamlpath,results", [ + ("(temps[. >= 100]) - (temps[. > 110])", [[110, 100]]), + ("(temps[. < 32]) - (temps[. >= 114])", [[0]]), + ("(temps[. < 32]) + (temps[. > 110])", [[0, 114]]), + ("(temps[. <= 32]) + (temps[. > 110])", [[32, 0, 114]]), + ("(temps[. < 32]) + (temps[. >= 110])", [[0, 110, 114]]), + ("(temps[. <= 32]) + (temps[. >= 110])", [[32, 0, 110, 114]]), + ("(temps[. < 0]) + (temps[. 
>= 114])", [[114]]), + ]) + def test_get_singular_collectors(self, quiet_logger, yamlpath, results): + yamldata = """--- + temps: + - 32 + - 0 + - 110 + - 100 + - 72 + - 68 + - 114 + - 34 + - 36 + """ + yaml = YAML() + processor = Processor(quiet_logger, yaml.load(yamldata)) + matchidx = 0 + # Note that Collectors deal with virtual DOMs, so mustexist must always + # be set True. Otherwise, ephemeral virtual nodes would be created and + # discarded. Is this desirable? Maybe, but not today. For now, using + # Collectors without setting mustexist=True will be undefined behavior. + for node in processor.get_nodes(yamlpath, mustexist=True): + assert unwrap_node_coords(node) == results[matchidx] + matchidx += 1 + assert len(results) == matchidx + + @pytest.mark.parametrize("yamlpath,results", [ + ("(/list1) + (/list2)", [[1, 2, 3, 4, 5, 6]]), + ("(/list1) - (/exclude)", [[1, 2]]), + ("(/list2) - (/exclude)", [[5, 6]]), + ("(/list1) + (/list2) - (/exclude)", [[1, 2, 5, 6]]), + ("((/list1) + (/list2)) - (/exclude)", [[1, 2, 5, 6]]), + ("(/list1) + ((/list2) - (/exclude))", [[1, 2, 3, 5, 6]]), + ("((/list1) - (/exclude)) + ((/list2) - (/exclude))", [[1, 2, 5, 6]]), + ("((/list1) - (/exclude)) + ((/list2) - (/exclude))*", [1, 2, 5, 6]), + ("(((/list1) - (/exclude)) + ((/list2) - (/exclude)))[2]", [5]), + ]) + def test_scalar_collectors(self, quiet_logger, yamlpath, results): + yamldata = """--- + list1: + - 1 + - 2 + - 3 + list2: + - 4 + - 5 + - 6 + exclude: + - 3 + - 4 + """ + yaml = YAML() + processor = Processor(quiet_logger, yaml.load(yamldata)) + matchidx = 0 + # Note that Collectors deal with virtual DOMs, so mustexist must always + # be set True. Otherwise, ephemeral virtual nodes would be created and + # discarded. Is this desirable? Maybe, but not today. For now, using + # Collectors without setting mustexist=True will be undefined behavior. 
+ for node in processor.get_nodes(yamlpath, mustexist=True): + assert unwrap_node_coords(node) == results[matchidx] + matchidx += 1 + assert len(results) == matchidx + + def test_get_every_data_type(self, quiet_logger): + # Contributed by https://github.com/AndydeCleyre + yamldata = """--- +intthing: 6 +floatthing: 6.8 +yesthing: yes +nothing: no +truething: true +falsething: false +nullthing: null +nothingthing: +emptystring: "" +nullstring: "null" + """ + + results = [6, 6.8, "yes", "no", True, False, None, None, "", "null"] + + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + yamlpath = YAMLPath("*") + + match_index = 0 + for node in processor.get_nodes(yamlpath): + assert unwrap_node_coords(node) == results[match_index] + match_index += 1 + + @pytest.mark.parametrize("delete_yamlpath,pathseperator,old_deleted_nodes,new_flat_data", [ + (YAMLPath("/**[&alias_number]"), PathSeperators.FSLASH, [1, 1, 1], [1,1,True,1,1,True,1,1,True,1,"ABC",123,"BCD",987,"CDE","8B8"]), + ("records[1]", PathSeperators.AUTO, ["ABC",123,"BCD",987], [1,1,1,True,1,1,1,True,1,1,1,True,1,1,"CDE","8B8"]), + ]) + def test_delete_nodes(self, quiet_logger, delete_yamlpath, pathseperator, old_deleted_nodes, new_flat_data): + yamldata = """--- +aliases: + - &alias_number 1 + - &alias_bool true +number: 1 +bool: true +alias_number: *alias_number +alias_bool: *alias_bool +hash: + number: 1 + bool: true + alias_number: *alias_number + alias_bool: *alias_bool +complex: + hash: + number: 1 + bool: true + alias_number: *alias_number + alias_bool: *alias_bool +records: + - id: ABC + data: 123 + - id: BCD + data: 987 + - id: CDE + data: 8B8 +""" + yaml = YAML() + data = yaml.load(yamldata) + processor = Processor(quiet_logger, data) + + # The return set must be received lest no nodes will be deleted + deleted_nodes = [] + for nc in processor.delete_nodes(delete_yamlpath, pathsep=pathseperator): + deleted_nodes.append(nc) + + for (test_value, verify_node_coord) in 
zip(old_deleted_nodes, deleted_nodes): + assert test_value, unwrap_node_coords(verify_node_coord) + + for (test_value, verify_node_coord) in zip(new_flat_data, processor.get_nodes("**")): + assert test_value, unwrap_node_coords(verify_node_coord) + + def test_null_docs_have_nothing_to_delete(self, capsys): + args = SimpleNamespace(verbose=False, quiet=False, debug=True) + logger = ConsolePrinter(args) + processor = Processor(logger, None) + + deleted_nodes = [] + for nc in processor.delete_nodes("**"): + deleted_nodes.append(nc) + + console = capsys.readouterr() + assert "Refusing to delete nodes from a null document" in console.out diff --git a/yamlpath/__init__.py b/yamlpath/__init__.py index 5eb8baec..5962188e 100644 --- a/yamlpath/__init__.py +++ b/yamlpath/__init__.py @@ -1,6 +1,6 @@ """Core YAML Path classes.""" # Establish the version number common to all components -__version__ = "3.4.0" +__version__ = "3.4.1" from yamlpath.yamlpath import YAMLPath from yamlpath.processor import Processor diff --git a/yamlpath/commands/yaml_set.py b/yamlpath/commands/yaml_set.py index 2b577f95..c13ac71a 100644 --- a/yamlpath/commands/yaml_set.py +++ b/yamlpath/commands/yaml_set.py @@ -376,34 +376,12 @@ def _try_load_input_file(args, log, yaml, change_path, new_value): yaml_data = Nodes.build_next_node(change_path, 0, new_value) return yaml_data -def _delete_nodes(log, delete_nodes) -> None: +def _delete_nodes(log, processor, delete_nodes) -> None: """Recursively delete specified nodes.""" - for delete_nc in reversed(delete_nodes): - node = delete_nc.node - parent = delete_nc.parent - parentref = delete_nc.parentref - log.debug( - "Deleting node:", - prefix="yaml_set::delete_nodes: ", - data_header="!" * 80, - footer="!" 
* 80, - data=delete_nc) - - # Ensure the reference exists before attempting to delete it - if isinstance(node, list) and isinstance(node[0], NodeCoords): - _delete_nodes(log, node) - elif isinstance(node, NodeCoords): - _delete_nodes(log, [node]) - elif isinstance(parent, dict): - if parentref in parent: - del parent[parentref] - elif isinstance(parent, list): - if len(parent) > parentref: - del parent[parentref] - else: - # Edge-case: Attempt to delete from a document which is - # entirely one Scalar value OR user is deleting the entire - # document. + try: + processor.delete_gathered_nodes(delete_nodes) + except YAMLPathException as ex: + if "delete the entire document" in ex.user_message: log.critical( "Refusing to delete the entire document! Ensure the source" " document is YAML, JSON, or compatible and --change|-g is" @@ -634,7 +612,7 @@ def main(): # Destroy the collected nodes (from their parents) in the reverse order # they were discovered. This is necessary lest Array elements be # improperly handled, leading to unwanted data loss. - _delete_nodes(log, change_node_coordinates) + _delete_nodes(log, processor, change_node_coordinates) elif args.aliasof: # Assign the change nodes as Aliases of whatever --aliasof points to _alias_nodes( diff --git a/yamlpath/common/parsers.py b/yamlpath/common/parsers.py index b1bc0e13..e45b8fdc 100644 --- a/yamlpath/common/parsers.py +++ b/yamlpath/common/parsers.py @@ -64,9 +64,9 @@ def get_yaml_editor(**kwargs: Any) -> Any: return yaml @staticmethod - # pylint: disable=too-many-branches,too-many-statements + # pylint: disable=too-many-branches,too-many-statements,too-many-locals def get_yaml_data( - parser: Any, logger: ConsolePrinter, source: str + parser: Any, logger: ConsolePrinter, source: str, **kwargs ) -> Tuple[Any, bool]: """ Parse YAML/Compatible data and return the ruamel.yaml object result. @@ -76,8 +76,12 @@ def get_yaml_data( Parameters: 1. parser (ruamel.yaml.YAML) The YAML data parser 2. 
logger (ConsolePrinter) The logging facility - 3. source (str) The source file to load; can be - for reading from - STDIN + 3. source (str) The source file or serialized literal to load; can be - + for reading from STDIN (implies literal=True) + + Keyword Parameters: + * literal (bool) `source` is literal serialized YAML data rather than a + file-spec, so load it directly Returns: Tuple[Any, bool] A tuple containing the document and its success/fail state. The first field is the parsed document; will be @@ -85,6 +89,7 @@ def get_yaml_data( The second field will be True when there were no errors during parsing and False, otherwise. """ + literal = kwargs.pop("literal", False) yaml_data = None data_available = True @@ -98,8 +103,11 @@ def get_yaml_data( if source == "-": yaml_data = parser.load(stdin.read()) else: - with open(source, 'r') as fhnd: - yaml_data = parser.load(fhnd) + if literal: + yaml_data = parser.load(source) + else: + with open(source, 'r') as fhnd: + yaml_data = parser.load(fhnd) except KeyboardInterrupt: logger.error("Aborting data load due to keyboard interrupt!") data_available = False @@ -156,7 +164,7 @@ def get_yaml_data( @staticmethod # pylint: disable=too-many-branches,too-many-statements,too-many-locals def get_yaml_multidoc_data( - parser: Any, logger: ConsolePrinter, source: str + parser: Any, logger: ConsolePrinter, source: str, **kwargs ) -> Generator[Tuple[Any, bool], None, None]: """ Parse YAML/Compatible multi-docs and yield each ruamel.yaml object. @@ -169,12 +177,18 @@ def get_yaml_multidoc_data( 3. source (str) The source file to load; can be - for reading from STDIN + Keyword Parameters: + * literal (bool) `source` is literal serialized YAML data rather than a + file-spec, so load it directly + Returns: Generator[Tuple[Any, bool], None, None] A tuple for each document as it is parsed. The first field is the parsed document; will be None for empty documents and for documents which could not be read. 
The second field will be True when there were no errors during parsing and False, otherwise. """ + literal = kwargs.pop("literal", False) + # This code traps errors and warnings from ruamel.yaml, substituting # lengthy stack-dumps with specific, meaningful feedback. Further, # some warnings are treated as errors by ruamel.yaml, so these are also @@ -196,13 +210,18 @@ def get_yaml_multidoc_data( if not doc_yielded: yield ("", True) else: - with open(source, 'r') as fhnd: - for document in parser.load_all(fhnd): - logger.debug( - "Yielding document from {}:".format(source), - prefix="get_yaml_multidoc_data: ", - data=document) + if literal: + for document in parser.load_all(source): yield (document, True) + else: + with open(source, 'r') as fhnd: + for document in parser.load_all(fhnd): + logger.debug( + "Yielding document from {}:" + .format(source), + prefix="get_yaml_multidoc_data: ", + data=document) + yield (document, True) except KeyboardInterrupt: has_error = True logger.error("Aborting data load due to keyboard interrupt!") diff --git a/yamlpath/processor.py b/yamlpath/processor.py index 42d41edb..c0f3db06 100644 --- a/yamlpath/processor.py +++ b/yamlpath/processor.py @@ -191,6 +191,104 @@ def set_value(self, yaml_path: Union[YAMLPath, str], .format(value, value_format, str(vex)) , str(yaml_path)) from vex + def delete_nodes(self, yaml_path: Union[YAMLPath, str], + **kwargs: Any) -> Generator[NodeCoords, None, None]: + """ + Delete nodes at YAML Path in data. + + Parameters: + 1. 
yaml_path (Union[YAMLPath, str]) The YAML Path to evaluate + + Keyword Parameters: + * pathsep (PathSeperators) Forced YAML Path segment seperator; set + only when automatic inference fails; + default = PathSeperators.AUTO + + Returns: (Generator) Affected NodeCoords before they are deleted + + Raises: + - `YAMLPathException` when YAML Path is invalid + """ + pathsep: PathSeperators = kwargs.pop("pathsep", PathSeperators.AUTO) + + if self.data is None: + self.logger.debug( + "Refusing to delete nodes from a null document!", + prefix="Processor::delete_nodes: ", data=self.data) + return + + if isinstance(yaml_path, str): + yaml_path = YAMLPath(yaml_path, pathsep) + elif pathsep is not PathSeperators.AUTO: + yaml_path.seperator = pathsep + + # Nodes must be processed in reverse order while deleting them to avoid + # corrupting list element indecies, thereby deleting the wrong nodes. + # As such, the intended nodes must be first gathered into a list. + gathered_nodes: List[NodeCoords] = [] + for node_coords in self._get_required_nodes(self.data, yaml_path): + self.logger.debug( + "Gathered node for deletion:", + prefix="Processor::delete_nodes: ", data=node_coords) + gathered_nodes.append(node_coords) + yield node_coords + + if len(gathered_nodes) > 0: + self._delete_nodes(gathered_nodes) + + def delete_gathered_nodes(self, gathered_nodes: List[NodeCoords]) -> None: + """ + Recursively delete pre-gathered nodes. + + Parameters: + 1. gathered_nodes (List[NodeCoords]) The pre-gathered nodes to delete. + """ + self._delete_nodes(gathered_nodes) + + def _delete_nodes(self, delete_nodes: List[NodeCoords]) -> None: + """ + Recursively delete specified nodes. + + Parameters: + 1. delete_nodes (List[NodeCoords]) The nodes to delete. 
+ + Raises: + - `YAMLPathException` when the operation would destroy the entire + document + """ + for delete_nc in reversed(delete_nodes): + node = delete_nc.node + parent = delete_nc.parent + parentref = delete_nc.parentref + self.logger.debug( + "Deleting node:", + prefix="Processor::_delete_nodes: ", + data_header="!" * 80, + footer="!" * 80, + data=delete_nc) + + # Ensure the reference exists before attempting to delete it + if isinstance(node, list) and isinstance(node[0], NodeCoords): + self._delete_nodes(node) + elif isinstance(node, NodeCoords): + self._delete_nodes([node]) + elif isinstance(parent, dict): + if parentref in parent: + del parent[parentref] + elif isinstance(parent, list): + if len(parent) > parentref: + del parent[parentref] + else: + # Edge-case: Attempt to delete from a document which is + # entirely one Scalar value OR user is deleting the entire + # document. + raise YAMLPathException( + "Refusing to delete the entire document! Ensure the" + " source document is YAML, JSON, or compatible and the" + " target nodes do not include the document root.", + str(delete_nc.path) + ) + # pylint: disable=locally-disabled,too-many-branches,too-many-locals def _get_nodes_by_path_segment(self, data: Any, yaml_path: YAMLPath, segment_index: int, @@ -333,6 +431,10 @@ def _get_nodes_by_key( YAMLPath.escape_path_section( str_stripped, translated_path.seperator)) if stripped_attrs in data: + self.logger.debug( + "Processor::_get_nodes_by_key: FOUND key node by name at" + " {}." + .format(str_stripped)) yield NodeCoords( data[stripped_attrs], data, stripped_attrs, next_translated_path) @@ -350,6 +452,10 @@ def _get_nodes_by_key( # Try using the ref as a bare Array index idx = int(str_stripped) if len(data) > idx: + self.logger.debug( + "Processor::_get_nodes_by_key: FOUND key node as a" + " bare Array index at [{}]."
+ .format(str_stripped)) yield NodeCoords( data[idx], data, idx, translated_path + "[{}]".format(idx)) @@ -369,6 +475,10 @@ def _get_nodes_by_key( element, yaml_path, segment_index, parent=data, parentref=eleidx, traverse_lists=traverse_lists, translated_path=next_translated_path): + self.logger.debug( + "Processor::_get_nodes_by_key: FOUND key node " + " via pass-through Array-of-Hashes search at {}." + .format(next_translated_path)) yield node_coord # pylint: disable=locally-disabled,too-many-locals @@ -1017,14 +1127,8 @@ def _get_optional_nodes( parent = kwargs.pop("parent", None) parentref = kwargs.pop("parentref", None) translated_path = kwargs.pop("translated_path", YAMLPath("")) - if data is None: - self.logger.debug( - "Bailing out on None data at parentref, {}, of parent:" - .format(parentref), - prefix="Processor::_get_optional_nodes: ", data=parent) - return - segments = yaml_path.escaped + # pylint: disable=locally-disabled,too-many-nested-blocks if segments and len(segments) > depth: (segment_type, unstripped_attrs) = yaml_path.unescaped[depth]