Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT: Example where support is added for resourceVersion (async only) #86

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lightkube/core/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from ..config.kubeconfig import SingleConfig, KubeConfig
from ..core import resource as r
from .generic_client import GenericAsyncClient
from .generic_client import GenericAsyncClient, GenericAsyncIterator
from ..core.exceptions import ConditionError, ObjectDeleted
from ..types import OnErrorHandler, PatchType, CascadeType, on_error_raise
from .internal_resources import core_v1
Expand Down Expand Up @@ -234,7 +234,7 @@ def list(
chunk_size: int = None,
labels: LabelSelector = None,
fields: FieldSelector = None,
) -> AsyncIterable[GlobalResource]: ...
) -> GenericAsyncIterator[GlobalResource]: ...

@overload
def list(
Expand All @@ -245,7 +245,7 @@ def list(
chunk_size: int = None,
labels: LabelSelector = None,
fields: FieldSelector = None,
) -> AsyncIterable[NamespacedResource]: ...
) -> GenericAsyncIterator[NamespacedResource]: ...

def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None):
"""Return an iterator of objects matching the selection criteria.
Expand Down
58 changes: 47 additions & 11 deletions lightkube/core/generic_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from typing import Type, Any, Dict, Union
from functools import partial
from typing import Type, Any, Dict, Union, Generic, AsyncIterator, TypeVar
import dataclasses
from dataclasses import dataclass
import json
Expand Down Expand Up @@ -260,7 +261,8 @@ def handle_response(self, method, resp, br):
br.params["continue"] = data["metadata"]["continue"]
else:
cont = False
return cont, (self.convert_to_resource(res, obj) for obj in data["items"])
rv = data["metadata"].get("resourceVersion")
return cont, rv, (self.convert_to_resource(res, obj) for obj in data["items"])
else:
if res is not None:
return self.convert_to_resource(res, data)
Expand Down Expand Up @@ -315,10 +317,45 @@ def list(self, br: BasicRequest) -> Any:
while cont:
req = self.build_adapter_request(br)
resp = self.send(req)
cont, chunk = self.handle_response("list", resp, br)
cont, rv, chunk = self.handle_response("list", resp, br)
yield from chunk


T = TypeVar('T')

class GenericAsyncIterator(Generic[T]):

def __init__(self, next_page):
self._resource_version = None
self._next_page = next_page
self._continue = True
self._chunk = None

async def resourceVersion(self):
if self._chunk is None:
await self._read_chunk()
return self._resource_version

async def _read_chunk(self):
if self._continue:
self._continue, self._resource_version, chunk = await self._next_page()
self._chunk = iter(chunk)

async def __anext__(self):
if self._chunk:
item = next(self._chunk, None)
if item:
return item
await self._read_chunk()
item = next(self._chunk, None)
if item:
return item
raise StopAsyncIteration

def __aiter__(self) -> AsyncIterator[T]:
return self


class GenericAsyncClient(GenericClient):
AdapterClient = staticmethod(client_adapter.AsyncClient)

Expand Down Expand Up @@ -367,14 +404,13 @@ async def request(
resp = await self.send(req)
return self.handle_response(method, resp, br)

async def list(self, br: BasicRequest) -> Any:
cont = True
while cont:
req = self.build_adapter_request(br)
resp = await self.send(req)
cont, chunk = self.handle_response("list", resp, br)
for item in chunk:
yield item
async def next_page(self, br: BasicRequest) -> Any:
req = self.build_adapter_request(br)
resp = await self.send(req)
return self.handle_response("list", resp, br)

def list(self, br: BasicRequest) -> Any:
return GenericAsyncIterator(partial(self.next_page, br))

async def close(self):
await self._client.aclose()
6 changes: 3 additions & 3 deletions tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async def test_get_namespaced(client: lightkube.AsyncClient):
@respx.mock
@pytest.mark.asyncio
async def test_list_global(client: lightkube.AsyncClient):
resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}]}
resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], "metadata": {}}
respx.get("https://localhost:9443/api/v1/nodes").respond(json=resp)
nodes = client.list(Node)
assert [node.metadata.name async for node in nodes] == ['xx', 'yy']
Expand All @@ -114,7 +114,7 @@ async def test_list_global(client: lightkube.AsyncClient):
@respx.mock
@pytest.mark.asyncio
async def test_list_namespaced(client: lightkube.AsyncClient):
resp = {'items':[{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}]}
resp = {'items':[{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], "metadata": {}}
respx.get("https://localhost:9443/api/v1/namespaces/default/pods").respond(json=resp)
pods = [pod async for pod in client.list(Pod)]
for pod, expected in zip(pods, resp["items"]):
Expand All @@ -133,7 +133,7 @@ async def test_list_namespaced(client: lightkube.AsyncClient):
async def test_list_chunk_size(client: lightkube.AsyncClient):
resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], 'metadata': {'continue': 'yes'}}
respx.get("https://localhost:9443/api/v1/namespaces/default/pods?limit=3").respond(json=resp)
resp = {'items': [{'metadata': {'name': 'zz'}}]}
resp = {'items': [{'metadata': {'name': 'zz'}}], 'metadata': {}}
respx.get("https://localhost:9443/api/v1/namespaces/default/pods?limit=3&continue=yes").respond(json=resp)
pods = client.list(Pod, chunk_size=3)
assert [pod.metadata.name async for pod in pods] == ['xx', 'yy', 'zz']
Expand Down
6 changes: 3 additions & 3 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_get_global(client: lightkube.Client):

@respx.mock
def test_list_namespaced(client: lightkube.Client):
resp = {'items':[{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}]}
resp = {'items':[{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], "metadata": {}}
respx.get("https://localhost:9443/api/v1/namespaces/default/pods").respond(json=resp)
pods = client.list(Pod)
for pod, expected in zip(pods, resp["items"]):
Expand All @@ -169,7 +169,7 @@ def test_list_crd(client: lightkube.Client):

@respx.mock
def test_list_global(client: lightkube.Client):
resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}]}
resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], "metadata": {}}
respx.get("https://localhost:9443/api/v1/nodes").respond(json=resp)
nodes = client.list(Node)
assert [node.metadata.name for node in nodes] == ['xx', 'yy']
Expand All @@ -187,7 +187,7 @@ def test_list_global(client: lightkube.Client):
def test_list_chunk_size(client: lightkube.Client):
resp = {'items': [{'metadata': {'name': 'xx'}}, {'metadata': {'name': 'yy'}}], 'metadata': {'continue': 'yes'}}
respx.get("https://localhost:9443/api/v1/namespaces/default/pods?limit=3").respond(json=resp)
resp = {'items': [{'metadata': {'name': 'zz'}}]}
resp = {'items': [{'metadata': {'name': 'zz'}}], 'metadata': {}}
respx.get("https://localhost:9443/api/v1/namespaces/default/pods?limit=3&continue=yes").respond(json=resp)
pods = client.list(Pod, chunk_size=3)
assert [pod.metadata.name for pod in pods] == ['xx', 'yy', 'zz']
Expand Down