diff --git a/codebeaver.yml b/codebeaver.yml new file mode 100644 index 00000000..419e2435 --- /dev/null +++ b/codebeaver.yml @@ -0,0 +1,2 @@ +from: pytest +# This file was generated automatically by CodeBeaver based on your repository. Learn how to customize it here: https://docs.codebeaver.ai/configuration/ \ No newline at end of file diff --git a/toolz/tests/test_itertoolz.py b/toolz/tests/test_itertoolz.py index 27907b9e..94e38918 100644 --- a/toolz/tests/test_itertoolz.py +++ b/toolz/tests/test_itertoolz.py @@ -4,16 +4,17 @@ from functools import partial from random import Random from pickle import dumps, loads +import pytest from toolz.itertoolz import (remove, groupby, merge_sorted, - concat, concatv, interleave, unique, - isiterable, getter, - mapcat, isdistinct, first, second, - nth, take, tail, drop, interpose, get, - rest, last, cons, frequencies, - reduceby, iterate, accumulate, - sliding_window, count, partition, - partition_all, take_nth, pluck, join, - diff, topk, peek, peekn, random_sample) + concat, concatv, interleave, unique, + isiterable, getter, + mapcat, isdistinct, first, second, + nth, take, tail, drop, interpose, get, + rest, last, cons, frequencies, + reduceby, iterate, accumulate, + sliding_window, count, partition, + partition_all, take_nth, pluck, join, + diff, topk, peek, peekn, random_sample) from operator import add, mul @@ -54,15 +55,15 @@ def test_groupby(): def test_groupby_non_callable(): assert groupby(0, [(1, 2), (1, 3), (2, 2), (2, 4)]) == \ {1: [(1, 2), (1, 3)], - 2: [(2, 2), (2, 4)]} + 2: [(2, 2), (2, 4)]} assert groupby([0], [(1, 2), (1, 3), (2, 2), (2, 4)]) == \ {(1,): [(1, 2), (1, 3)], - (2,): [(2, 2), (2, 4)]} + (2,): [(2, 2), (2, 4)]} assert groupby([0, 0], [(1, 2), (1, 3), (2, 2), (2, 4)]) == \ {(1, 1): [(1, 2), (1, 3)], - (2, 2): [(2, 2), (2, 4)]} + (2, 2): [(2, 2), (2, 4)]} def test_merge_sorted(): @@ -70,11 +71,11 @@ def test_merge_sorted(): assert list(merge_sorted([1, 3, 5], [2, 4, 6])) == [1, 2, 3, 4, 5, 6] assert list(merge_sorted([1], [2, 4], [3], [])) == [1, 2, 3, 4] assert list(merge_sorted([5, 3, 1], [6, 4, 3], [], - key=lambda x: -x)) == [6, 5, 4, 3, 3, 1] + key=lambda x: -x)) == [6, 5, 4, 3, 3, 1] assert list(merge_sorted([2, 1, 3], [1, 2, 3], - key=lambda x: x // 3)) == [2, 1, 1, 2, 3, 3] + key=lambda x: x // 3)) == [2, 1, 1, 2, 3, 3] assert list(merge_sorted([2, 3], [1, 3], - key=lambda x: x // 3)) == [2, 1, 3, 3] + key=lambda x: x // 3)) == [2, 1, 3, 3] assert ''.join(merge_sorted('abc', 'abc', 'abc')) == 'aaabbbccc' assert ''.join(merge_sorted('abc', 'abc', 'abc', key=ord)) == 'aaabbbccc' assert ''.join(merge_sorted('cba', 'cba', 'cba', @@ -261,7 +262,7 @@ def test_frequencies(): {"cat": 2, "eel": 1, "pig": 2, "dog": 3}) assert frequencies([]) == {} assert frequencies("onomatopoeia") == {"a": 2, "e": 1, "i": 1, "m": 1, - "o": 4, "n": 1, "p": 1, "t": 1} + "o": 4, "n": 1, "p": 1, "t": 1} def test_reduceby(): @@ -355,6 +356,20 @@ def __eq__(self, other): assert list(partition_all(4, [obj]*7)) == result assert list(partition_all(4, iter([obj]*7))) == result + # Test invalid __len__: https://github.com/pytoolz/toolz/issues/602 + class ListWithBadLength(list): + def __init__(self, contents, off_by=1): + self.off_by = off_by + super().__init__(contents) + + def __len__(self): + return super().__len__() + self.off_by + + too_long_list = ListWithBadLength([1, 2], off_by=+1) + assert raises(LookupError, lambda: list(partition_all(5, too_long_list))) + too_short_list = ListWithBadLength([1, 2], off_by=-1) + assert raises(LookupError, lambda: list(partition_all(5, too_short_list))) + def test_count(): assert count((1, 2, 3)) == 3 @@ -401,8 +416,8 @@ def addpair(pair): assert result == expected result = set(starmap(add, join(first, names, second, fruit, - left_default=no_default2, - right_default=no_default2))) + left_default=no_default2, + right_default=no_default2))) assert result == expected @@ -417,7 +432,7 @@ def test_key_as_getter(): pows = [(i, i**2, i**3) for i in range(5)] assert set(join(0, squares, 0, pows)) == set(join(lambda x: x[0], squares, - lambda x: x[0], pows)) + lambda x: x[0], pows)) get = lambda x: (x[0], x[1]) assert set(join([0, 1], squares, [0, 1], pows)) == set(join(get, squares, @@ -425,7 +440,7 @@ def test_key_as_getter(): get = lambda x: (x[0],) assert set(join([0], squares, [0], pows)) == set(join(get, squares, - get, pows)) + get, pows)) def test_join_double_repeats(): @@ -473,7 +488,7 @@ def test_right_outer_join(): def test_outer_join(): result = set(join(identity, [1, 2], identity, [2, 3], - left_default=None, right_default=None)) + left_default=None, right_default=None)) expected = {(2, 2), (1, None), (None, 3)} assert result == expected @@ -496,10 +511,10 @@ def test_diff(): assert list(diff([(1, 2), (1, 3)])) == [(2, 3)] data1 = [{'cost': 1, 'currency': 'dollar'}, - {'cost': 2, 'currency': 'dollar'}] + {'cost': 2, 'currency': 'dollar'}] data2 = [{'cost': 100, 'currency': 'yen'}, - {'cost': 300, 'currency': 'yen'}] + {'cost': 300, 'currency': 'yen'}] conversions = {'dollar': 1, 'yen': 0.01} @@ -556,8 +571,8 @@ def test_random_sample(): assert list(random_sample(prob=1, seq=alist, random_state=2016)) == alist mk_rsample = lambda rs=1: list(random_sample(prob=0.1, - seq=alist, - random_state=rs)) + seq=alist, + random_state=rs)) rsample1 = mk_rsample() assert rsample1 == mk_rsample() @@ -572,3 +587,88 @@ def test_random_sample(): assert mk_rsample(b"a") == mk_rsample(u"a") assert raises(TypeError, lambda: mk_rsample([])) + +def test_accumulate_empty_iterable(): + """Test that accumulate on an empty iterable returns an empty iterator + when no initial is given and returns a one-element iterator when an initial is given.""" + from toolz.itertoolz import accumulate, no_default + # When no initial is provided, an empty iterable returns nothing. + assert list(accumulate(lambda a, b: a + b, [])) == [] + # With an initial value provided, it should return a list with only that element. + init = "start" + assert list(accumulate(lambda a, b: a + b, [], init)) == [init] + +def test_interleave_infinite(): + """Test interleave with one infinite and one finite iterator and ensure it is lazy.""" + from toolz.itertoolz import interleave + import itertools + finite = iter([1, 2, 3]) + infinite = itertools.repeat('a') + # Limit the output using islice since one iterator is infinite. + result = list(itertools.islice(interleave([finite, infinite]), 6)) + # Expected to alternate until the finite iterator is exhausted. + assert result == [1, 'a', 2, 'a', 3, 'a'] + +def test_unique_edge_cases(): + """Test that unique works correctly with an empty sequence and with all duplicates.""" + from toolz.itertoolz import unique + # For empty input, unique returns an empty iterator. + assert tuple(unique([])) == () + # For all duplicates, only one instance should be returned. + assert tuple(unique([1, 1, 1, 1])) == (1,) + +def test_get_errors(): + """Test that get raises proper errors for non-indexable inputs and missing keys.""" + from toolz.itertoolz import get, no_default + # When an unsupported index is provided for a non-indexable type, expect TypeError. + with pytest.raises(TypeError): + get({}, [1, 2, 3]) + # When the key is missing and no default is provided, expect a KeyError. + with pytest.raises(KeyError): + get('missing', {'a': 1}) + +def test_partition_all_edge(): + """Test partition_all on sequences whose length is (or is not) divisible by n.""" + from toolz.itertoolz import partition_all, no_pad + # Divisible length + assert list(partition_all(3, [1, 2, 3, 4, 5, 6])) == [(1, 2, 3), (4, 5, 6)] + # Not divisible: the final partition should be shorter. + parts = list(partition_all(4, [1, 2, 3, 4, 5])) + assert parts[-1] == (5,) + +def test_iterate_single_value(): + """Test iterate returns the correct infinite sequence starting from a given value.""" + from toolz.itertoolz import iterate + itr = iterate(lambda x: x * 2, 1) + expected = [1, 2, 4, 8, 16] + result = [next(itr) for _ in range(5)] + assert result == expected + +def test_sliding_window_short_sequence(): + """Test that sliding_window returns an empty iterator when the window size exceeds the sequence length.""" + from toolz.itertoolz import sliding_window + result = list(sliding_window(5, [1, 2, 3])) + assert result == [] + +def test_random_sample_zero_probability(): + """Test that random_sample with probability 0 returns an empty list.""" + from toolz.itertoolz import random_sample + alist = list(range(100)) + result = list(random_sample(0, alist, random_state=42)) + assert result == [] + +def test_topk_zero(): + """Test that topk with k=0 returns an empty tuple.""" + from toolz.itertoolz import topk + result = topk(0, [1, 2, 3]) + assert result == () + +def test_join_empty(): + """Test that join returns an empty list when one or both of the sequences are empty.""" + from toolz.itertoolz import join + # Left sequence empty: no join results (inner join). + result = list(join(lambda x: x, [], lambda x: x, [1, 2, 3])) + assert result == [] + # Right sequence empty. + result = list(join(lambda x: x, [1, 2, 3], lambda x: x, [])) + assert result == []