From ec1900cea15de394f478ff91e108a760b0f1eb6d Mon Sep 17 00:00:00 2001 From: Giuseppe Tribulato Date: Mon, 30 Dec 2024 23:46:15 +0100 Subject: [PATCH] Working example --- lightkube/core/async_client.py | 6 ++-- lightkube/core/generic_client.py | 58 ++++++++++++++++++++++++++------ tests/test_async_client.py | 6 ++-- tests/test_client.py | 6 ++-- 4 files changed, 56 insertions(+), 20 deletions(-) diff --git a/lightkube/core/async_client.py b/lightkube/core/async_client.py index 5059bb9..856dac4 100644 --- a/lightkube/core/async_client.py +++ b/lightkube/core/async_client.py @@ -12,7 +12,7 @@ from ..config.kubeconfig import SingleConfig, KubeConfig from ..core import resource as r -from .generic_client import GenericAsyncClient +from .generic_client import GenericAsyncClient, GenericAsyncIterator from ..core.exceptions import ConditionError, ObjectDeleted from ..types import OnErrorHandler, PatchType, CascadeType, on_error_raise from .internal_resources import core_v1 @@ -234,7 +234,7 @@ def list( chunk_size: int = None, labels: LabelSelector = None, fields: FieldSelector = None, - ) -> AsyncIterable[GlobalResource]: ... + ) -> GenericAsyncIterator[GlobalResource]: ... @overload def list( @@ -245,7 +245,7 @@ def list( chunk_size: int = None, labels: LabelSelector = None, fields: FieldSelector = None, - ) -> AsyncIterable[NamespacedResource]: ... + ) -> GenericAsyncIterator[NamespacedResource]: ... def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None): """Return an iterator of objects matching the selection criteria. diff --git a/lightkube/core/generic_client.py b/lightkube/core/generic_client.py index 3157312..97789b1 100644 --- a/lightkube/core/generic_client.py +++ b/lightkube/core/generic_client.py @@ -1,5 +1,6 @@ import time -from typing import Type, Any, Dict, Union +from functools import partial +from typing import Type, Any, Dict, Union, Generic, AsyncIterator, TypeVar import dataclasses from dataclasses import dataclass import json @@ -260,7 +261,8 @@ def handle_response(self, method, resp, br): br.params["continue"] = data["metadata"]["continue"] else: cont = False - return cont, (self.convert_to_resource(res, obj) for obj in data["items"]) + rv = data["metadata"].get("resourceVersion") + return cont, rv, (self.convert_to_resource(res, obj) for obj in data["items"]) else: if res is not None: return self.convert_to_resource(res, data) @@ -315,10 +317,45 @@ def list(self, br: BasicRequest) -> Any: while cont: req = self.build_adapter_request(br) resp = self.send(req) - cont, chunk = self.handle_response("list", resp, br) + cont, rv, chunk = self.handle_response("list", resp, br) yield from chunk +T = TypeVar('T') + +class GenericAsyncIterator(Generic[T]): + + def __init__(self, next_page): + self._resource_version = None + self._next_page = next_page + self._continue = True + self._chunk = None + + async def resourceVersion(self): + if self._chunk is None: + await self._read_chunk() + return self._resource_version + + async def _read_chunk(self): + if self._continue: + self._continue, self._resource_version, chunk = await self._next_page() + self._chunk = iter(chunk) + + async def __anext__(self): + if self._chunk: + item = next(self._chunk, None) + if item: + return item + await self._read_chunk() + item = next(self._chunk, None) + if item: + return item + raise StopAsyncIteration + + def __aiter__(self) -> AsyncIterator[T]: + return self + + class GenericAsyncClient(GenericClient): AdapterClient = staticmethod(client_adapter.AsyncClient) @@ -367,14 +404,13 @@ async def request( resp = await self.send(req) return self.handle_response(method, resp, br) - async def list(self, br: BasicRequest) -> Any: - cont = True - while cont: - req = self.build_adapter_request(br) - resp = await self.send(req) - cont, chunk = self.handle_response("list", resp, br) - for item in chunk: - yield item + async def next_page(self, br: BasicRequest) -> Any: + req = self.build_adapter_request(br) + resp = await self.send(req) + return self.handle_response("list", resp, br) + + def list(self, br: BasicRequest) -> Any: + return GenericAsyncIterator(partial(self.next_page, br)) async def close(self): await self._client.aclose() diff --git a/tests/test_async_client.py b/tests/test_async_client.py index 69113d1..81cea25 100644 --- a/tests/test_async_client.py +++ b/tests/test_async_client.py @@ -96,7 +96,7 @@ async def test_get_namespaced(client: lightkube.AsyncClient): @respx.mock @pytest.mark.asyncio async def test_list_global(client: lightkube.AsyncClient): - resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}]} + resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], "metadata": {}} respx.get("https://localhost:9443/api/v1/nodes").respond(json=resp) nodes = client.list(Node) assert [node.metadata.name async for node in nodes] == ['xx', 'yy'] @@ -114,7 +114,7 @@ async def test_list_global(client: lightkube.AsyncClient): @respx.mock @pytest.mark.asyncio async def test_list_namespaced(client: lightkube.AsyncClient): - resp = {'items':[{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}]} + resp = {'items':[{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], "metadata": {}} respx.get("https://localhost:9443/api/v1/namespaces/default/pods").respond(json=resp) pods = [pod async for pod in client.list(Pod)] for pod, expected in zip(pods, resp["items"]): @@ -133,7 +133,7 @@ async def test_list_namespaced(client: lightkube.AsyncClient): async def test_list_chunk_size(client: lightkube.AsyncClient): resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], 'metadata': {'continue': 'yes'}} respx.get("https://localhost:9443/api/v1/namespaces/default/pods?limit=3").respond(json=resp) - resp = {'items': [{'metadata': {'name': 'zz'}}]} + resp = {'items': [{'metadata': {'name': 'zz'}}], 'metadata': {}} respx.get("https://localhost:9443/api/v1/namespaces/default/pods?limit=3&continue=yes").respond(json=resp) pods = client.list(Pod, chunk_size=3) assert [pod.metadata.name async for pod in pods] == ['xx', 'yy', 'zz'] diff --git a/tests/test_client.py b/tests/test_client.py index 9c0cfa4..bf7cdda 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -144,7 +144,7 @@ def test_get_global(client: lightkube.Client): @respx.mock def test_list_namespaced(client: lightkube.Client): - resp = {'items':[{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}]} + resp = {'items':[{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], "metadata": {}} respx.get("https://localhost:9443/api/v1/namespaces/default/pods").respond(json=resp) pods = client.list(Pod) for pod, expected in zip(pods, resp["items"]): @@ -169,7 +169,7 @@ def test_list_crd(client: lightkube.Client): @respx.mock def test_list_global(client: lightkube.Client): - resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}]} + resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], "metadata": {}} respx.get("https://localhost:9443/api/v1/nodes").respond(json=resp) nodes = client.list(Node) assert [node.metadata.name for node in nodes] == ['xx', 'yy'] @@ -187,7 +187,7 @@ def test_list_global(client: lightkube.Client): def test_list_chunk_size(client: lightkube.Client): resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], 'metadata': {'continue': 'yes'}} respx.get("https://localhost:9443/api/v1/namespaces/default/pods?limit=3").respond(json=resp) - resp = {'items': [{'metadata': {'name': 'zz'}}]} + resp = {'items': [{'metadata': {'name': 'zz'}}], 'metadata': {}} respx.get("https://localhost:9443/api/v1/namespaces/default/pods?limit=3&continue=yes").respond(json=resp) pods = client.list(Pod, chunk_size=3) assert [pod.metadata.name for pod in pods] == ['xx', 'yy', 'zz']