diff --git a/asyncdb/drivers/clickhouse.py b/asyncdb/drivers/clickhouse.py index 8d3f44e5..477f8a9a 100644 --- a/asyncdb/drivers/clickhouse.py +++ b/asyncdb/drivers/clickhouse.py @@ -1,7 +1,9 @@ import asyncio from typing import Union, Any, Optional -from collections.abc import Iterable, Sequence +from collections.abc import Iterable, Sequence, Awaitable from pathlib import Path +from aiochclient import ChClient +from aiohttp import ClientSession from .sql import SQLDriver from ..exceptions import DriverError @@ -14,7 +16,7 @@ class clickhouse(SQLDriver): Attributes: ----------- _provider : str - Name of the database provider (e.g., 'duckdb'). + Name of the database provider (e.g., 'clickhouse'). _syntax : str SQL syntax specific to the database provider (e.g., 'sql'). _dsn : str @@ -25,9 +27,10 @@ class clickhouse(SQLDriver): Indicates if the driver is currently connected to the database. """ - _provider: str = "duckdb" + _provider: str = "clickhouse" _syntax: str = "sql" _dsn: str = "{database}" + _test_query: str = "SELECT version()" def __init__( self, @@ -51,6 +54,7 @@ def __init__( kwargs : dict Additional keyword arguments to pass to the base SQLDriver. """ + self._session: Awaitable = None SQLDriver.__init__(self, dsn, loop, params, **kwargs) async def connection(self, **kwargs): @@ -72,6 +76,18 @@ async def connection(self, **kwargs): """ self._connection = None self._connected = False + if not self._session: + self._session = ClientSession() + try: + self._connection = ChClient(self._session, **self.params) + print(self._connection, await self._connection.is_alive()) + if await self._connection.is_alive(): + self._connected = True + return self + except Exception as exc: + raise DriverError( + f"clickhouse Error: {exc}" + ) from exc connect = connection @@ -88,8 +104,41 @@ async def close(self, timeout: int = 5) -> None: -------- None """ - self._connection = None - self._connected = False + try: + if self._session: + await self._session.close() + finally: + self._connection = None + self._connected = False + self._session = None + + async def __aenter__(self) -> Any: + """ + Asynchronous context manager entry. + Establishes a connection when entering the context. + + Returns: + -------- + self : clickhouse + Returns the instance of the driver itself. + + Raises: + ------- + DriverError + If an error occurs during connection establishment. + """ + try: + if not self._session: + self._session = ClientSession() + if not self._connection: + await self.connection() + except Exception as err: + error = f"Error on Cursor Fetch: {err}" + raise DriverError(message=error) from err + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.close() async def query(self, sentence: Any, *args, **kwargs) -> Iterable[Any]: """ @@ -112,6 +161,12 @@ async def query(self, sentence: Any, *args, **kwargs) -> Iterable[Any]: error = None self._result = None await self.valid_operation(sentence) + try: + result = await self._connection.fetch(sentence) + if result: + self._result = result + except Exception as exc: + error = exc return await self._serializer(self._result, error) async def queryrow(self, sentence: Any = None) -> Iterable[Any]: @@ -251,27 +306,6 @@ async def execute_many(self, sentence: Union[str, list], args: list) -> Optional executemany = execute_many - async def __aenter__(self) -> Any: - """ - Asynchronous context manager entry. Establishes a connection when entering the context. - - Returns: - -------- - self : clickhouse - Returns the instance of the driver itself. - - Raises: - ------- - DriverError - If an error occurs during connection establishment. - """ - try: - await self.connection() - except Exception as err: - error = f"Error on Cursor Fetch: {err}" - raise DriverError(message=error) from err - return self - async def copy_to(self, sentence: Union[str, Path], destination: str, **kwargs) -> bool: """ Copies the result of a query to a file asynchronously. @@ -333,7 +367,7 @@ async def prepare(self, sentence: Union[str, list]) -> Any: """ Prepares a SQL sentence for execution. - Currently not implemented for DuckDB and raises NotImplementedError. + Currently not implemented for ClickHouse and raises NotImplementedError. Parameters: ----------- @@ -348,7 +382,7 @@ async def prepare(self, sentence: Union[str, list]) -> Any: Raises: ------- NotImplementedError - Raised when called, as DuckDB does not support prepared statements in this implementation. + Raised when called, as ClickHouse does not support prepared statements in this implementation. """ raise NotImplementedError() # pragma: no cover @@ -402,7 +436,7 @@ async def use(self, database: str): """ Switches the default database to the specified one. - Currently not implemented for DuckDB and raises NotImplementedError. + Currently not implemented for ClickHouse and raises NotImplementedError. Parameters: ----------- @@ -412,6 +446,6 @@ async def use(self, database: str): Raises: ------- NotImplementedError - Raised when called, as DuckDB does not support switching databases. + Raised when called, as ClickHouse does not support switching databases. """ - raise NotImplementedError("DuckDB Error: There is no Database in DuckDB") + raise NotImplementedError("ClickHouse Error: There is no Database in ClickHouse") diff --git a/asyncdb/interfaces/abstract.py b/asyncdb/interfaces/abstract.py index ef4c4e22..9667c4a9 100644 --- a/asyncdb/interfaces/abstract.py +++ b/asyncdb/interfaces/abstract.py @@ -81,9 +81,13 @@ def __init__( self._loop = asyncio.get_event_loop() asyncio.set_event_loop(self._loop) except RuntimeError: - raise RuntimeError( - "No Event Loop is running. Please, run this driver inside an asyncio loop." - ) + try: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + except RuntimeError: + raise RuntimeError( + "No Event Loop is running. Please, run this driver inside an asyncio loop." + ) if self._loop.is_closed(): self._loop = asyncio.get_running_loop() asyncio.set_event_loop(self._loop) diff --git a/examples/test_clickhouse.py b/examples/test_clickhouse.py new file mode 100644 index 00000000..b1236753 --- /dev/null +++ b/examples/test_clickhouse.py @@ -0,0 +1,22 @@ +import asyncio +from asyncdb.drivers.clickhouse import clickhouse +from pprint import pprint + + +async def connect(db): + async with await db.connection() as conn: + print('HERE >>') + pprint(await conn.test_connection()) + print('END >>') + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + params = { + "url": "http://localhost:8123/", + "user": "default", + "password": "u69ebsZQ", + "database": "default" + } + driver = clickhouse(params=params, loop=loop) + print('DRV > ', driver) + loop.run_until_complete(connect(driver))