Skip to content

Commit

Permalink
Merge pull request #16 from glaciation-heu/HHT-669-core-ingestion-pro…
Browse files Browse the repository at this point in the history
…cessing

HHT-669: knowledge graph exporting based on k8s resources and metrics
  • Loading branch information
ktatarnikovhiro authored Jul 12, 2024
2 parents 1d5f9c9 + deb605c commit 93c7e0f
Show file tree
Hide file tree
Showing 180 changed files with 10,486 additions and 4,217 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ RUN pip install --no-cache-dir poetry \
&& poetry install --no-root --without dev,test \
&& rm -rf $(poetry config cache-dir)/{cache,artifacts}

COPY ./etc/config.yaml /code
COPY ./app /code/app
WORKDIR /code

CMD ["python", "-m", "app.kg_exporter", "--incluster"]
CMD ["python", "-m", "app.main", "--config", "./config.yaml"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ poetry run pytest

5. Launch the service:
```bash
poetry run python app/kg_exporter.py
poetry run python -m app.main --config ./etc/config.yaml
```

## Integration tests
Expand Down
File renamed without changes.
153 changes: 153 additions & 0 deletions app/clients/k8s/k8s_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from typing import Any, Dict, List, Optional, Set

import asyncio
from dataclasses import dataclass, field


@dataclass
class ResourceSnapshot:
    """Point-in-time collection of Kubernetes resources, grouped by kind.

    Every resource is kept as a plain dictionary. The lookup helpers expect
    each resource to carry its name under ``resource["metadata"]["name"]``.
    """

    cluster: Dict[str, Any] = field(default_factory=dict)
    versions_info: Dict[str, Any] = field(default_factory=dict)
    pods: List[Dict[str, Any]] = field(default_factory=list)
    nodes: List[Dict[str, Any]] = field(default_factory=list)
    deployments: List[Dict[str, Any]] = field(default_factory=list)
    jobs: List[Dict[str, Any]] = field(default_factory=list)
    statefullsets: List[Dict[str, Any]] = field(default_factory=list)
    daemonsets: List[Dict[str, Any]] = field(default_factory=list)
    replicasets: List[Dict[str, Any]] = field(default_factory=list)

    def _resource_lists_by_kind(self) -> Dict[str, List[Dict[str, Any]]]:
        """Map each supported kind name to the live list that stores it.

        This is the single source of truth for the kind -> field mapping; the
        returned lists are the snapshot's own lists, not copies.
        """
        return {
            "Pod": self.pods,
            "Node": self.nodes,
            "Deployment": self.deployments,
            "Job": self.jobs,
            "StatefulSet": self.statefullsets,
            "DaemonSet": self.daemonsets,
            "ReplicaSet": self.replicasets,
        }

    def get_resource_names(self) -> Set[str]:
        """Return the names of all resources of every supported kind.

        Names are collected into one set, so resources of different kinds
        that share a name contribute a single entry.

        Raises:
            KeyError: if a stored resource has no ``metadata.name``.
        """
        return {
            self.get_resource_name(resource)
            for resources in self._resource_lists_by_kind().values()
            for resource in resources
        }

    def find_resources_by_kind_and_identity(
        self, kind: str, identity: str
    ) -> List[Dict[str, Any]]:
        """Return the resources of ``kind`` whose name equals ``identity``.

        An unsupported kind, or a kind with no stored resources, yields an
        empty list.
        """
        resources = self.get_resources_by_kind(kind) or []
        return [
            resource
            for resource in resources
            if self.get_resource_name(resource) == identity
        ]

    def get_resource_name(self, node: Dict[str, Any]) -> str:
        """Return ``node["metadata"]["name"]``.

        Raises:
            KeyError: if either key is missing.
        """
        return node["metadata"]["name"]  # type: ignore

    def get_resources_by_kind(self, kind: str) -> Optional[List[Dict[str, Any]]]:
        """Return the live list holding resources of ``kind``.

        Returns ``None`` when ``kind`` is not one of the supported kinds
        (Pod, Node, Deployment, Job, StatefulSet, DaemonSet, ReplicaSet).
        """
        return self._resource_lists_by_kind().get(kind)

    def add_resources_by_kind(self, kind: str, resources: List[Dict[str, Any]]) -> None:
        """Append ``resources`` to the list that stores ``kind``.

        Resources of an unsupported kind are silently ignored.
        """
        target = self._resource_lists_by_kind().get(kind)
        # Compare against None explicitly: an empty list is a valid target.
        if target is not None:
            target.extend(resources)


class K8SClient:
    """Base interface for reading Kubernetes state.

    Subclasses implement the individual ``get_*`` coroutines; this class only
    provides ``fetch_snapshot``, which combines their results.
    """

    async def fetch_snapshot(self) -> ResourceSnapshot:
        """Fetch every supported resource kind plus cluster metadata.

        All nine ``get_*`` coroutines are started together so the requests
        overlap. They are grouped into two inner gathers because the resource
        listings and the metadata calls return different types.

        Returns:
            A ``ResourceSnapshot`` populated from the ``get_*`` results.

        Raises:
            Whatever the first failing ``get_*`` coroutine raises.
        """
        resources, general_info = await asyncio.gather(
            asyncio.gather(
                self.get_nodes(),
                self.get_pods(),
                self.get_deployments(),
                self.get_jobs(),
                self.get_statefullsets(),
                self.get_daemonsets(),
                self.get_replicasets(),
            ),
            asyncio.gather(self.get_cluster_info(), self.get_api_versions()),
        )
        # gather preserves argument order, so unpack in the order used above.
        (
            nodes,
            pods,
            deployments,
            jobs,
            statefullsets,
            daemonsets,
            replicasets,
        ) = resources
        (cluster_info, versions_info) = general_info
        return ResourceSnapshot(
            cluster=cluster_info,
            versions_info=versions_info,
            pods=pods,
            nodes=nodes,
            deployments=deployments,
            jobs=jobs,
            statefullsets=statefullsets,
            daemonsets=daemonsets,
            replicasets=replicasets,
        )

    async def get_nodes(self) -> List[Dict[str, Any]]:
        """Return the Node resources; must be implemented by subclasses."""
        raise NotImplementedError

    async def get_pods(self) -> List[Dict[str, Any]]:
        """Return the Pod resources; must be implemented by subclasses."""
        raise NotImplementedError

    async def get_replicasets(self) -> List[Dict[str, Any]]:
        """Return the ReplicaSet resources; must be implemented by subclasses."""
        raise NotImplementedError

    async def get_deployments(self) -> List[Dict[str, Any]]:
        """Return the Deployment resources; must be implemented by subclasses."""
        raise NotImplementedError

    async def get_daemonsets(self) -> List[Dict[str, Any]]:
        """Return the DaemonSet resources; must be implemented by subclasses."""
        raise NotImplementedError

    async def get_statefullsets(self) -> List[Dict[str, Any]]:
        """Return the StatefulSet resources; must be implemented by subclasses."""
        raise NotImplementedError

    async def get_jobs(self) -> List[Dict[str, Any]]:
        """Return the Job resources; must be implemented by subclasses."""
        raise NotImplementedError

    async def get_cluster_info(self) -> Dict[str, Any]:
        """Return cluster-level information; must be implemented by subclasses."""
        raise NotImplementedError

    async def get_api_versions(self) -> Dict[str, Any]:
        """Return API version information; must be implemented by subclasses."""
        raise NotImplementedError
93 changes: 93 additions & 0 deletions app/clients/k8s/k8s_client_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import Any, Callable, Coroutine, Dict, List, Optional, TypeVar

from kubernetes_asyncio import config
from kubernetes_asyncio.client import ApiClient
from kubernetes_asyncio.client.api.core_api import CoreApi
from kubernetes_asyncio.client.configuration import Configuration
from kubernetes_asyncio.dynamic import DynamicClient

from app.clients.k8s.k8s_client import K8SClient
from app.clients.k8s.k8s_settings import K8SSettings

T = TypeVar("T")


class K8SClientImpl(K8SClient):
    """``K8SClient`` backed by the ``kubernetes_asyncio`` dynamic client.

    The client configuration is loaded on first use and reused afterwards.
    """

    settings: K8SSettings
    configuration: Optional[Configuration]

    def __init__(self, settings: K8SSettings):
        self.settings = settings
        # Loaded lazily by ``execute``; stays None until a load succeeds.
        self.configuration = None

    async def get_nodes(self) -> List[Dict[str, Any]]:
        """Return all resources of kind ``Node`` as dictionaries."""
        return await self.get_resource("Node")

    async def get_pods(self) -> List[Dict[str, Any]]:
        """Return all resources of kind ``Pod`` as dictionaries."""
        return await self.get_resource("Pod")

    async def get_deployments(self) -> List[Dict[str, Any]]:
        """Return all resources of kind ``Deployment`` as dictionaries."""
        return await self.get_resource("Deployment")

    async def get_replicasets(self) -> List[Dict[str, Any]]:
        """Return all resources of kind ``ReplicaSet`` as dictionaries."""
        return await self.get_resource("ReplicaSet")

    async def get_daemonsets(self) -> List[Dict[str, Any]]:
        """Return all resources of kind ``DaemonSet`` as dictionaries."""
        return await self.get_resource("DaemonSet")

    async def get_statefullsets(self) -> List[Dict[str, Any]]:
        """Return all resources of kind ``StatefulSet`` as dictionaries."""
        return await self.get_resource("StatefulSet")

    async def get_jobs(self) -> List[Dict[str, Any]]:
        """Return all resources of kind ``Job`` as dictionaries."""
        return await self.get_resource("Job")

    async def get_cluster_info(self) -> Dict[str, Any]:
        """Return the ``kubeadm-config`` ConfigMap from ``kube-system``.

        Returns an empty dictionary when the lookup yields a falsy result.
        """

        async def get_cluster_info_internal(
            dyn_client: DynamicClient,
        ) -> Dict[str, Any]:
            configmap_api = await dyn_client.resources.get(
                api_version="v1", kind="ConfigMap"
            )
            results = await configmap_api.get(
                namespace="kube-system", name="kubeadm-config"
            )
            if results:
                return results.to_dict()  # type: ignore
            else:
                return {}

        return await self.execute(get_cluster_info_internal)

    async def get_api_versions(self) -> Dict[str, Any]:
        """Return the result of ``CoreApi.get_api_versions`` as a dictionary.

        Returns an empty dictionary when the call yields a falsy result.
        """

        async def get_api_versions_internal(
            dyn_client: DynamicClient,
        ) -> Dict[str, Any]:
            versions = await CoreApi(dyn_client.client).get_api_versions()
            if versions:
                return versions.to_dict()  # type: ignore
            else:
                return {}

        return await self.execute(get_api_versions_internal)

    async def get_resource(self, kind: str) -> List[Dict[str, Any]]:
        """List every resource of ``kind`` and convert each item to a dict.

        The resource API is resolved through the dynamic client with
        ``api_version="v1"`` and the given ``kind``.
        """

        async def get_resource_internal(client: DynamicClient) -> List[Dict[str, Any]]:
            api = await client.resources.get(api_version="v1", kind=kind)
            result = await api.get()
            return [item.to_dict() for item in result.items]

        return await self.execute(get_resource_internal)

    async def execute(
        self, func: Callable[[DynamicClient], Coroutine[Any, Any, T]]
    ) -> T:
        """Run ``func`` with a freshly opened ``DynamicClient``.

        On first use the client configuration is loaded, either from the
        in-cluster environment or from the kube config, depending on
        ``settings.in_cluster``, and cached for later calls.

        Args:
            func: Coroutine function that receives the dynamic client.

        Returns:
            Whatever ``func`` returns.
        """
        configuration = self.configuration
        if configuration is None:
            configuration = Configuration()
            if self.settings.in_cluster:
                config.load_incluster_config(client_configuration=configuration)
            else:
                await config.load_kube_config(client_configuration=configuration)
            # Cache only after loading has finished. Publishing the object
            # earlier would let a concurrent caller use it half-initialised
            # during the await above, and would keep an unloaded
            # configuration cached forever if loading raised.
            self.configuration = configuration

        async with ApiClient(configuration=configuration) as client_api:
            async with DynamicClient(client_api) as dynamic_api:
                return await func(dynamic_api)
5 changes: 5 additions & 0 deletions app/clients/k8s/k8s_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from pydantic_settings import BaseSettings


class K8SSettings(BaseSettings):
    """Settings that control how the Kubernetes client is configured."""

    # Required flag with no default.
    # NOTE(review): presumably selects in-cluster configuration over a local
    # kube config - confirm against the client implementation.
    in_cluster: bool
80 changes: 80 additions & 0 deletions app/clients/k8s/mock_k8s_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Any, Dict, List

from app.clients.k8s.k8s_client import K8SClient


class MockK8SClient(K8SClient):
    """In-memory ``K8SClient`` for tests.

    Each ``mock_<name>`` method stores a canned value, and the matching
    ``get_<name>`` coroutine returns that same object without any I/O.
    """

    nodes: List[Dict[str, Any]]
    pods: List[Dict[str, Any]]
    replicasets: List[Dict[str, Any]]
    deployments: List[Dict[str, Any]]
    daemonsets: List[Dict[str, Any]]
    statefullsets: List[Dict[str, Any]]
    jobs: List[Dict[str, Any]]
    cluster: Dict[str, Any]
    api_versions: Dict[str, Any]

    def __init__(self):
        # Cluster-level metadata starts out empty.
        self.cluster = {}
        self.api_versions = {}
        # So does every resource listing.
        self.nodes = []
        self.pods = []
        self.deployments = []
        self.replicasets = []
        self.daemonsets = []
        self.statefullsets = []
        self.jobs = []

    # --- cluster-level metadata -------------------------------------------

    def mock_cluster(self, cluster: Dict[str, Any]) -> None:
        """Set the value returned by ``get_cluster_info``."""
        self.cluster = cluster

    async def get_cluster_info(self) -> Dict[str, Any]:
        """Return the stored cluster information."""
        return self.cluster

    def mock_api_versions(self, api_versions: Dict[str, Any]) -> None:
        """Set the value returned by ``get_api_versions``."""
        self.api_versions = api_versions

    async def get_api_versions(self) -> Dict[str, Any]:
        """Return the stored API versions."""
        return self.api_versions

    # --- resource listings ------------------------------------------------

    def mock_nodes(self, nodes: List[Dict[str, Any]]) -> None:
        """Set the value returned by ``get_nodes``."""
        self.nodes = nodes

    async def get_nodes(self) -> List[Dict[str, Any]]:
        """Return the stored nodes."""
        return self.nodes

    def mock_pods(self, pods: List[Dict[str, Any]]) -> None:
        """Set the value returned by ``get_pods``."""
        self.pods = pods

    async def get_pods(self) -> List[Dict[str, Any]]:
        """Return the stored pods."""
        return self.pods

    def mock_deployments(self, deployments: List[Dict[str, Any]]) -> None:
        """Set the value returned by ``get_deployments``."""
        self.deployments = deployments

    async def get_deployments(self) -> List[Dict[str, Any]]:
        """Return the stored deployments."""
        return self.deployments

    def mock_replicasets(self, replicasets: List[Dict[str, Any]]) -> None:
        """Set the value returned by ``get_replicasets``."""
        self.replicasets = replicasets

    async def get_replicasets(self) -> List[Dict[str, Any]]:
        """Return the stored replica sets."""
        return self.replicasets

    def mock_daemonsets(self, daemonsets: List[Dict[str, Any]]) -> None:
        """Set the value returned by ``get_daemonsets``."""
        self.daemonsets = daemonsets

    async def get_daemonsets(self) -> List[Dict[str, Any]]:
        """Return the stored daemon sets."""
        return self.daemonsets

    def mock_statefullsets(self, statefullsets: List[Dict[str, Any]]) -> None:
        """Set the value returned by ``get_statefullsets``."""
        self.statefullsets = statefullsets

    async def get_statefullsets(self) -> List[Dict[str, Any]]:
        """Return the stored stateful sets."""
        return self.statefullsets

    def mock_jobs(self, jobs: List[Dict[str, Any]]) -> None:
        """Set the value returned by ``get_jobs``."""
        self.jobs = jobs

    async def get_jobs(self) -> List[Dict[str, Any]]:
        """Return the stored jobs."""
        return self.jobs
29 changes: 29 additions & 0 deletions app/clients/k8s/test_k8s_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio
from unittest import TestCase

from app.clients.k8s.k8s_client import ResourceSnapshot
from app.clients.k8s.mock_k8s_client import MockK8SClient


class K8SClientTest(TestCase):
    """Checks that ``fetch_snapshot`` routes each getter into its own field."""

    def test_get_resource_snapshot(self):
        client = MockK8SClient()
        # Every source gets a distinct payload so that any two results being
        # swapped while the snapshot is assembled makes the comparison fail.
        client.mock_cluster({"cluster": "fake"})
        client.mock_api_versions({"api_versions": "fake"})
        client.mock_daemonsets([{"daemonsets": "fake"}])
        client.mock_deployments([{"deployments": "fake"}])
        client.mock_jobs([{"jobs": "fake"}])
        client.mock_nodes([{"nodes": "fake"}])
        client.mock_pods([{"pods": "fake"}])
        client.mock_replicasets([{"replicasets": "fake"}])
        client.mock_statefullsets([{"statefullsets": "fake"}])
        actual = asyncio.run(client.fetch_snapshot())

        expected = ResourceSnapshot(
            cluster={"cluster": "fake"},
            versions_info={"api_versions": "fake"},
            pods=[{"pods": "fake"}],
            nodes=[{"nodes": "fake"}],
            deployments=[{"deployments": "fake"}],
            jobs=[{"jobs": "fake"}],
            statefullsets=[{"statefullsets": "fake"}],
            daemonsets=[{"daemonsets": "fake"}],
            replicasets=[{"replicasets": "fake"}],
        )
        self.assertEqual(expected, actual)
Loading

0 comments on commit 93c7e0f

Please sign in to comment.