Skip to content

Commit

Permalink
Add size and flag parsing. (#14)
Browse files Browse the repository at this point in the history
* Add size (flag) parsing:
* `ParseByteSize`:             Parse a size (defaults to byte sizes).
* `ActionByteSize`:            Argument parsing `Action` for size values (default for bytes).

* The `ActioByteSize` does not allow an empty string. That should lead to `argument flag: value cannot be empty.` covered in test: 'Unit only is invalid.'.'

* Even if the unit is `0` it is not allowed.
  • Loading branch information
helly25 authored Sep 6, 2024
1 parent b920db9 commit eea62d4
Show file tree
Hide file tree
Showing 3 changed files with 457 additions and 10 deletions.
4 changes: 2 additions & 2 deletions mbo/app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def Main(self):

from argparse_formatter import ParagraphFormatter

from mbo.app.flags import EnumAction
from mbo.app.flags import ActionEnum

if TYPE_CHECKING:
from _typeshed import OpenTextMode
Expand Down Expand Up @@ -335,7 +335,7 @@ def Run(argv: list[str] = sys.argv):
"--help-output-mode",
dest="help_output_mode",
type=HelpOutputMode,
action=EnumAction,
action=ActionEnum,
help="Output mode for help.",
)
command.Prepare(argv)
Expand Down
162 changes: 156 additions & 6 deletions mbo/app/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""A flag support library."""
"""A flag support library.
* `ActionEnum`: Argument parsing `Action` for enum values.
* `ParseEnumList`: Parses lists of enum values.
* `ActionEnumList`: Argument parsing `Action` for enum lists.
* `ParseDateTimeOrTimeDelta`: Parses `datetime.datetime` and time delta values.
* `ActionDateTimeOrTimeDelta`: Argument parsing `Action` for `datetime` and time deltas.
* `ParseByteSize`: Parse a size (defaults to byte sizes).
* `ActionByteSize`: Argument parsing `Action` for size values (default for bytes).
"""

import argparse
import collections
import re
from datetime import datetime, time, timedelta, timezone, tzinfo
from enum import Enum
from typing import Any, Callable, Iterable, Optional, cast

from pytimeparse.timeparse import timeparse


class EnumAction(argparse.Action):
class ActionEnum(argparse.Action):
"""Argparse action that handles single Enum values."""

def __init__(self, **kwargs):
Expand All @@ -33,15 +43,15 @@ def __init__(self, **kwargs):
raise ValueError(f"Type must be an Enum, provided type is '{enum_type}'.")

kwargs.setdefault("choices", tuple(e.value for e in enum_type))
super(EnumAction, self).__init__(**kwargs)
super(ActionEnum, self).__init__(**kwargs)
self._enum = enum_type

def __call__(self, parser, namespace, values, option_string=None):
value = self._enum(values)
setattr(namespace, self.dest, value)


def EnumListParser(enum_type: type[Enum]) -> Callable[[str], list[Enum]]:
def ParseEnumList(enum_type: type[Enum]) -> Callable[[str], list[Enum]]:
"""Implements flags comma separate lists of enum values.
In the argument definition default values can be specified as a list of the actual enum values.
Expand All @@ -54,7 +64,7 @@ def EnumListParser(enum_type: type[Enum]) -> Callable[[str], list[Enum]]:
parser.add_argument(
"--myenum",
default=[MyEnum.MY_DEFAULT],
type=EnumListParser(enum_type=MyEnum),
type=ParseEnumList(enum_type=MyEnum),
help="Comma separated list of MyEnum {}.".format(set(MyEnum.__members__.keys())),
)
args=parser.parse_args(["--nyenum", "my_default,my_other"])
Expand Down Expand Up @@ -394,8 +404,10 @@ def _parse(self, value: str) -> datetime:
raise argparse.ArgumentError(self, f"{error}")

def __call__(self, parser, namespace, values, option_string=None) -> None:
if values is None:
return setattr(namespace, self.dest, None)
result: Any
if values is list:
if isinstance(values, list):
result = [self._parse(v) for v in values]
elif isinstance(values, str):
result = self._parse(values)
Expand All @@ -407,3 +419,141 @@ def __call__(self, parser, namespace, values, option_string=None) -> None:
setattr(namespace, self.dest, str(values))
else:
setattr(namespace, self.dest, result)


def ParseByteSize(
value: str,
*,
suffix_case_sensitive: bool | None = None,
unit: str = "B",
unit_case_sensitive: bool = False,
unit_required: bool = False,
) -> int:
"""Parses size values (default bytes).
Args:
* value: The value string to parse.
* suffix_case_sensitive: Whether the suffix (e.g. `Ki`) is case sensitive (defaults to `unit_case_sensitive`).
* unit_case_sensitive: Whether the unit is case-sensitive (default is False).
* unit_required: Whether the unit is required, this defaults to `True` if `unit` is specified.
* unit: The expected unit (defaults to `B`).
"""
if not value:
raise ValueError("value must not be empty.")
original = value
value = str(value) # If not mypy.
if not unit_case_sensitive and value.lower().endswith(unit.lower()):
value = value[: -len(unit)]
elif value.endswith(unit):
value = value.removesuffix(unit)
elif unit_required:
if value.lower().endswith(unit.lower()):
lower_case = " (found via case insensitive search)"
else:
lower_case = ""
raise ValueError(f"value does not have required unit `{unit}`{lower_case}.")
_SUFFIXES: dict[str, int] = {
"": 1,
"K": 1000**1, # kilo
"M": 1000**2, # mega
"G": 1000**3, # giga
"T": 1000**4, # tera
"P": 1000**5, # peta
"E": 1000**6, # exa
"Z": 1000**7, # zetta
"Y": 1000**8, # yotta
"X": 1000**9, # xona"
"Ki": 1024**1,
"Mi": 1024**2,
"Gi": 1024**3,
"Ti": 1024**4,
"Pi": 1024**5,
"Ei": 1024**6,
"Zi": 1024**7,
"Yi": 1024**8,
"Xi": 1024**9,
}
_REGEX = re.compile(f"([0-9]*[.]?[0-9]*)[ ]?({'|'.join(_SUFFIXES.keys())})?")
value_str = value
if value.endswith(("i", "I")):
value_nc_str = value_str[:-1].upper() + "i"
else:
value_nc_str = value_str.upper()
if value_str and not suffix_case_sensitive:
value_str = value_nc_str
match = _REGEX.fullmatch(value_str)
if not match:
if _REGEX.fullmatch(value_nc_str) or value_str.endswith(("i", "I")):
raise ValueError(f"value has bad suffix case, got '{original}'.")
raise ValueError(
f"value does not match pattern (not a valid byte size), got '{original}'."
)
number = match.group(1)
if number == "":
raise ValueError("value cannot be empty.")
if number == ".":
raise ValueError("value cannot be '.'.")
result = float(number) if number.find(".") > -1 else int(number)
if match.group(2):
factor = _SUFFIXES.get(match.group(2) or "", 0)
if not factor:
raise ValueError(
f"value has unsupported suffix ({match.group(2)}), got '{original}'."
)
else:
factor = 1
result *= factor
return int(result)


class ActionByteSize(argparse.Action):
"""Parses arguments as bytes, supporting KB, MB, GB, TB, PB, EB as well as KiB etc. suffixes.
This action has the additional config:
* default: Can be a number of type `int` or `float`.
* unit: The expected unit (defaults to `B`).
* unit_required: Whether the unit is required, this defaults to `True` if `unit` is specified.
* unit_case_sensitive: Whether the unit is case-sensitive (default is False).
* suffix_case_sensitive: Whether the suffix (e.g. `Ki`) is case sensitive (defaults to `unit_case_sensitive`).
"""

def __init__(self, **kwargs) -> None:
self.unit = kwargs.pop("unit", "B")
self.unit_required = bool(kwargs.pop("unit_required", self.unit))
self.unit_case_sensitive = bool(kwargs.pop("unit_case_sensitive", False))
self.suffix_case_sensitive = bool(
kwargs.pop("suffix_case_sensitive", self.unit_case_sensitive)
)
default: Any = kwargs.pop("default", None)
if default is None or default == "":
default = None
elif isinstance(default, int) or isinstance(default, float):
default = f"{default}{self.unit}"
super(ActionByteSize, self).__init__(default=default, **kwargs)
if default is None:
self.default = None

def _parse(self, value: Any) -> int | None:
if value is int:
return value
else:
try:
return ParseByteSize(
value=str(value),
suffix_case_sensitive=self.suffix_case_sensitive,
unit=self.unit,
unit_case_sensitive=self.unit_case_sensitive,
unit_required=self.unit_required,
)
except ValueError as error:
raise argparse.ArgumentError(self, str(error))
except Exception as error:
raise error

def __call__(self, parser, namespace, values, option_string=None) -> None:
if values is None:
setattr(namespace, self.dest, None)
elif isinstance(values, list):
setattr(namespace, self.dest, [self._parse(str(v)) for v in values])
else:
setattr(namespace, self.dest, self._parse(str(values)))
Loading

0 comments on commit eea62d4

Please sign in to comment.