From 54fcc531bc4aec03e6d3781a2a8168901c54dd53 Mon Sep 17 00:00:00 2001 From: Caleb Hattingh Date: Mon, 8 Jun 2020 00:00:35 +1000 Subject: [PATCH] Added some methods for reading and writing files --- README.rst | 212 ++++++++++++++++++++++++++++++++++++- excitertools.py | 272 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 459 insertions(+), 25 deletions(-) diff --git a/README.rst b/README.rst index d457625..a96cf9c 100644 --- a/README.rst +++ b/README.rst @@ -10,11 +10,14 @@ :target: https://pypi.python.org/pypi/excitertools .. image:: https://img.shields.io/github/tag/cjrh/excitertools.svg - :target: https://img.shields.io/github/tag/cjrh/excitertools.svg + :target: https://github.com/cjrh/excitertools .. image:: https://img.shields.io/badge/install-pip%20install%20excitertools-ff69b4.svg :target: https://img.shields.io/badge/install-pip%20install%20excitertools-ff69b4.svg +.. image:: https://img.shields.io/badge/dependencies-more--itertools-4488ff.svg + :target: https://more-itertools.readthedocs.io/en/stable/ + .. image:: https://img.shields.io/pypi/v/excitertools.svg :target: https://img.shields.io/pypi/v/excitertools.svg @@ -992,6 +995,207 @@ only reading is supported. +.. _Iter.readbytes_backwards: + + +|source| ``@classmethod Iter.readbytes_backwards(cls, filename: str, buffering=-1, chunksize=8192) -> "Iter[int]"`` +=================================================================================================================== + + + +Read a file as bytes *backwards* and send chunks on the chain. Note +that this returns an Iter_ of int: each byte is expressed as an +integer. Grouping functions can be used to group sections of +integers and convert those back into bytes sequences as necessary. + +This method appears crude, but it can easily be used to build up more +exciting things. For example, we can use it as a building block +for backwards iterating over some lines of a file. + +.. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... filename = td + 'bytes.bin' + ... with open(filename, 'w', newline='\n') as f: + ... f.writelines(['abc\n', 'def\n', 'ghi\n']) + ... + ... ( + ... Iter + ... .readbytes_backwards(filename, chunksize=5) + ... .split_before(lambda byte: byte == 10) + ... .map(reversed) + ... .map(bytes) + ... .map(lambda x: x.decode()) + ... .take(2) + ... .collect() + ... ) + ['ghi\n', 'def\n'] + +The only real gotcha in this example is that the call to +Iter.split_before_ does the comparison against an integer, 10. +Comparison against ``"\n"`` won't work, because when the bytes +get split up into individual bytes they become integers. + + + +.. _Iter.readbytes: + + +|source| ``@classmethod Iter.readbytes(cls, filename: str, buffering=-1, chunksize=8192) -> "Iter[str]"`` +========================================================================================================= + + + +Read a file as bytes and send chunks on the chain. + +.. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... # Put some random text into a temporary file + ... with open(td + 'bytes.bin', 'w', newline='\n') as f: + ... f.writelines(['abc\n', 'def\n', 'ghi\n']) + ... + ... # Open the file, filter some lines, collect the result + ... Iter.readbytes(td + 'bytes.bin', chunksize=5).collect() + [b'abc\nd', b'ef\ngh', b'i\n'] + + + +.. _Iter.writebytes: + + +|cool| |sink| ``Iter.writebytes(self, filename: str, mode: str = "wb", ) -> None`` +================================================================================== + + + +Write data on the chain to disk. + +There are several ways to deal with newlines. There's a parameter +to append newlines to every line written: + +.. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... ( + ... Iter([b'abc', b'xyz']) + ... .writebytes(td + 'bytes.bin') + ... ) + ... + ... with open(td + 'bytes.bin', 'rb') as f: + ... data = f.read() + ... print(data) + b'abcxyz' + + + +.. _Iter.readlines: + + +|source| ``@classmethod Iter.readlines(cls, filename: str, encoding=None) -> "Iter[str]"`` +========================================================================================== + + + +Simpler method than Iter.open_. + +.. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... # Put some random text into a temporary file + ... with open(td + 'text.txt', 'w') as f: + ... f.writelines(['abc\n', 'def\n', 'ghi\n']) + ... + ... # Open the file, filter some lines, collect the result + ... Iter.readlines(td + 'text.txt').filter(lambda line: 'def' in line).collect() + ['def\n'] + + + +.. _Iter.writelines: + + +|cool| |sink| ``Iter.writelines(self, filename: str, mode: str = "w", encoding=None, errors=None, append_newlines: bool = False, ) -> None`` +============================================================================================================================================ + + + +Write data on the chain to disk. + +There are several ways to deal with newlines. There's a parameter +to append newlines to every line written: + +.. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... ( + ... Iter + ... .range(5) + ... .map(str) + ... .writelines(td + 'text.txt', append_newlines=True) + ... ) + ... + ... with open(td + 'text.txt', 'r') as f: + ... data = f.read() + ... print(data) + 0 + 1 + 2 + 3 + 4 + +Or, you can use Iter.intersperse_: + +.. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... ( + ... Iter + ... .range(5) + ... .map(str) + ... .intersperse("\n") + ... .writelines(td + 'text.txt') + ... ) + ... + ... with open(td + 'text.txt', 'r') as f: + ... data = f.read() + ... print(data) + 0 + 1 + 2 + 3 + 4 + +Finally, the newline can be added in the call to Iter.map_: + +.. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... ( + ... Iter + ... .range(5) + ... .map(lambda line: str(line) + "\n") + ... .writelines(td + 'text.txt') + ... ) + ... + ... with open(td + 'text.txt', 'r') as f: + ... data = f.read() + ... print(data) + 0 + 1 + 2 + 3 + 4 + + + .. _Iter.range: @@ -1617,8 +1821,8 @@ Docstring TBD .. _Iter.split_at: -``Iter.split_at(self, pred)`` -============================= +``Iter.split_at(self, pred, maxsplit=-1, keep_separator=False)`` +================================================================ Docstring TBD @@ -2811,7 +3015,7 @@ Reference: `more_itertools.nth_combination 'Iter'`` +``@classmethod Iter.always_iterable(cls, obj, base_type=(str, bytes)) -> "Iter"`` ================================================================================= Reference: `more_itertools.always_iterable `_ diff --git a/excitertools.py b/excitertools.py index fb0aac1..8063e6e 100644 --- a/excitertools.py +++ b/excitertools.py @@ -109,7 +109,7 @@ """ # This cannot be enabled because we still support 3.6 and pypy -#from __future__ import annotations +# from __future__ import annotations import sys import string import itertools @@ -167,7 +167,6 @@ "concat", "from_queue", "IterDict", - "finditer_regex", "splititer_regex", ] @@ -640,9 +639,7 @@ def zip_longest(*iterables, fillvalue=None): def finditer_regex( - pat: "re.Pattern[AnyStr]", - s: AnyStr, - flags: Union[int, re.RegexFlag] = 0 + pat: "re.Pattern[AnyStr]", s: AnyStr, flags: Union[int, re.RegexFlag] = 0 ) -> "Iter[AnyStr]": """ Wrapper for ``re.finditer``. Returns an instance of Iter_ to allow @@ -667,9 +664,7 @@ def finditer_regex( def splititer_regex( - pat: "re.Pattern[AnyStr]", - s: AnyStr, - flags: Union[int, re.RegexFlag] = 0 + pat: "re.Pattern[AnyStr]", s: AnyStr, flags: Union[int, re.RegexFlag] = 0 ) -> "Iter[AnyStr]": """ Lazy string splitting using regular expressions. @@ -721,10 +716,11 @@ def splititer_regex( ['aaa'] """ + def inner(): prev = 0 for m in re.finditer(pat, s, flags): - yield s[prev:m.start()] + yield s[prev : m.start()] prev = m.end() yield s[prev:] @@ -1002,6 +998,7 @@ def register(cls, *func): """ for f in func: + def inner(self, *args, **kwargs): return Iter(f(self, *args, **kwargs)) @@ -1133,6 +1130,234 @@ def inner(): return cls(inner()) + @classmethod + def readbytes_backwards(cls, filename: str, buffering=-1, chunksize=8192) -> "Iter[int]": + """ + |source| + + Read a file as bytes *backwards* and send chunks on the chain. Note + that this returns an Iter_ of int: each byte is expressed as an + integer. Grouping functions can be used to group sections of + integers and convert those back into bytes sequences as necessary. + + This method appears crude, but it can easily be used to build up more + exciting things. For example, we can use it as a building block + for backwards iterating over some lines of a file. + + .. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... filename = td + 'bytes.bin' + ... with open(filename, 'w', newline='\\n') as f: + ... f.writelines(['abc\\n', 'def\\n', 'ghi\\n']) + ... + ... ( + ... Iter + ... .readbytes_backwards(filename, chunksize=5) + ... .split_before(lambda byte: byte == 10) + ... .map(reversed) + ... .map(bytes) + ... .map(lambda x: x.decode()) + ... .take(2) + ... .collect() + ... ) + ['ghi\\n', 'def\\n'] + + The only real gotcha in this example is that the call to + Iter.split_before_ does the comparison against an integer, 10. + Comparison against ``"\\n"`` won't work, because when the bytes + get split up into individual bytes they become integers. + + """ + + def inner(): + with open(file=filename, mode="rb", buffering=buffering) as f: + import os + f.seek(0, os.SEEK_END) + file_size = remaining_size = f.tell() + while remaining_size: + posn = max(0, remaining_size - chunksize) + correction = posn - (remaining_size - chunksize) + f.seek(posn) + # yield bytes(reversed(f.read(chunksize))) + yield from reversed(f.read(chunksize - correction)) + remaining_size = posn + + return cls(inner()) + + @classmethod + def readbytes(cls, filename: str, buffering=-1, chunksize=8192) -> "Iter[str]": + """ + |source| + + Read a file as bytes and send chunks on the chain. + + .. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... # Put some random text into a temporary file + ... with open(td + 'bytes.bin', 'w', newline='\\n') as f: + ... f.writelines(['abc\\n', 'def\\n', 'ghi\\n']) + ... + ... # Open the file, filter some lines, collect the result + ... Iter.readbytes(td + 'bytes.bin', chunksize=5).collect() + [b'abc\\nd', b'ef\\ngh', b'i\\n'] + + """ + + def inner(): + with open(file=filename, mode="rb", buffering=buffering) as f: + data = f.read(chunksize) + while data: + yield data + data = f.read(chunksize) + + return cls(inner()) + + def writebytes( + self, + filename: str, + mode: str = "wb", + ) -> None: + """ + |cool| + |sink| + Write data on the chain to disk. + + There are several ways to deal with newlines. There's a parameter + to append newlines to every line written: + + .. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... ( + ... Iter([b'abc', b'xyz']) + ... .writebytes(td + 'bytes.bin') + ... ) + ... + ... with open(td + 'bytes.bin', 'rb') as f: + ... data = f.read() + ... print(data) + b'abcxyz' + + """ + with open(filename, mode=mode) as f: + for bites in self: + f.write(bites) + + @classmethod + def readlines(cls, filename: str, encoding=None) -> "Iter[str]": + """ + |source| + + Simpler method than Iter.open_. + + .. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... # Put some random text into a temporary file + ... with open(td + 'text.txt', 'w') as f: + ... f.writelines(['abc\\n', 'def\\n', 'ghi\\n']) + ... + ... # Open the file, filter some lines, collect the result + ... Iter.readlines(td + 'text.txt').filter(lambda line: 'def' in line).collect() + ['def\\n'] + + """ + return cls.open(file=filename, encoding=encoding) + + def writelines( + self, + filename: str, + mode: str = "w", + encoding=None, + errors=None, + append_newlines: bool = False, + ) -> None: + """ + |cool| + |sink| + Write data on the chain to disk. + + There are several ways to deal with newlines. There's a parameter + to append newlines to every line written: + + .. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... ( + ... Iter + ... .range(5) + ... .map(str) + ... .writelines(td + 'text.txt', append_newlines=True) + ... ) + ... + ... with open(td + 'text.txt', 'r') as f: + ... data = f.read() + ... print(data) + 0 + 1 + 2 + 3 + 4 + + Or, you can use Iter.intersperse_: + + .. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... ( + ... Iter + ... .range(5) + ... .map(str) + ... .intersperse("\\n") + ... .writelines(td + 'text.txt') + ... ) + ... + ... with open(td + 'text.txt', 'r') as f: + ... data = f.read() + ... print(data) + 0 + 1 + 2 + 3 + 4 + + Finally, the newline can be added in the call to Iter.map_: + + .. code-block:: python + + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as td: + ... ( + ... Iter + ... .range(5) + ... .map(lambda line: str(line) + "\\n") + ... .writelines(td + 'text.txt') + ... ) + ... + ... with open(td + 'text.txt', 'r') as f: + ... data = f.read() + ... print(data) + 0 + 1 + 2 + 3 + 4 + + """ + with open(filename, mode=mode, newline="\n") as f: + for line in self: + f.write(line) + if append_newlines: + f.write("\n") + # Standard utilities @classmethod @@ -1346,7 +1571,9 @@ def filter(self, function: "Optional[Callable[[T], bool]]" = None) -> "Iter[T]": """ return Iter(_filter(function, self)) - def starfilter(self, function: "Optional[Callable[[T, ...], bool]]" = None) -> "Iter[T]": + def starfilter( + self, function: "Optional[Callable[[T, ...], bool]]" = None + ) -> "Iter[T]": """ |cool| Like Iter.filter_, but arg unpacking in lambdas will work. @@ -1576,8 +1803,8 @@ def accumulate(self, func=None, *, initial=None): if initial: raise RuntimeError( f'The "initial" kwarg was added in Python 3.8 and is not' - f'available on this version of Python which is ' - f'{sys.version_info} ' + f"available on this version of Python which is " + f"{sys.version_info} " ) return Iter(itertools.accumulate(self.x, func)) @@ -1678,9 +1905,9 @@ def divide(self, n: int) -> "Iter": """ Docstring TBD """ return Iter(Iter(x) for x in more_itertools.divide(n, self.x)) - def split_at(self, pred): + def split_at(self, pred, maxsplit=-1, keep_separator=False): """ Docstring TBD """ - return Iter(more_itertools.split_at(self.x, pred)) + return Iter(more_itertools.split_at(self.x, pred, maxsplit=maxsplit, keep_separator=keep_separator)) def split_before(self, pred): """ Docstring TBD """ @@ -2169,7 +2396,9 @@ def flatten(self) -> "Iter[T]": return Iter(more_itertools.flatten(self)) @class_or_instancemethod - def roundrobin(self_or_cls: Union[Type[T], T], *iterables: C) -> "Iter[Union[T, C]]": + def roundrobin( + self_or_cls: Union[Type[T], T], *iterables: C + ) -> "Iter[Union[T, C]]": """ Reference: `more_itertools.roundrobin `_ @@ -2703,7 +2932,9 @@ def random_product(self_or_cls, *args, repeat=1): if isinstance(self_or_cls, type): return Iter(more_itertools.random_product(*args, repeat=repeat)) else: - return Iter(more_itertools.random_product(self_or_cls, *args, repeat=repeat)) + return Iter( + more_itertools.random_product(self_or_cls, *args, repeat=repeat) + ) def random_permutation(self, r=None): """ @@ -2739,9 +2970,7 @@ def random_combination_with_replacement(self, r): [] """ - return Iter(more_itertools.random_combination_with_replacement( - self, r - )) + return Iter(more_itertools.random_combination_with_replacement(self, r)) def nth_combination(self, r, index): """ @@ -2764,7 +2993,7 @@ def nth_combination(self, r, index): # Wrapping @classmethod - def always_iterable(cls, obj, base_type=(str, bytes)) -> 'Iter': + def always_iterable(cls, obj, base_type=(str, bytes)) -> "Iter": """ Reference: `more_itertools.always_iterable `_ @@ -3353,6 +3582,7 @@ def send_also(self, collector: Generator) -> "Iter": live at least as long as the iterator feeding it. """ + def func(v): try: collector.send(v) @@ -3463,7 +3693,7 @@ def concat(iterable: Iterable[AnyStr], glue: AnyStr) -> "AnyStr": # TODO: this seems like a really inefficient way to do this. # it might be better to use interleave/intersperse and just # call bytes() on the result - int.to_bytes(v, 1, 'little') if isinstance(v, int) else v + int.to_bytes(v, 1, "little") if isinstance(v, int) else v for v in iterable )