Skip to content

Commit

Permalink
clickhouse wip driver
Browse files Browse the repository at this point in the history
  • Loading branch information
phenobarbital committed Sep 12, 2024
1 parent be79592 commit fcb7642
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 34 deletions.
96 changes: 65 additions & 31 deletions asyncdb/drivers/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
from typing import Union, Any, Optional
from collections.abc import Iterable, Sequence
from collections.abc import Iterable, Sequence, Awaitable
from pathlib import Path
from aiochclient import ChClient
from aiohttp import ClientSession
from .sql import SQLDriver
from ..exceptions import DriverError

Expand All @@ -14,7 +16,7 @@ class clickhouse(SQLDriver):
Attributes:
-----------
_provider : str
Name of the database provider (e.g., 'duckdb').
Name of the database provider (e.g., 'clickhouse').
_syntax : str
SQL syntax specific to the database provider (e.g., 'sql').
_dsn : str
Expand All @@ -25,9 +27,10 @@ class clickhouse(SQLDriver):
Indicates if the driver is currently connected to the database.
"""

_provider: str = "duckdb"
_provider: str = "clickhouse"
_syntax: str = "sql"
_dsn: str = "{database}"
_test_query: str = "SELECT version()"

def __init__(
self,
Expand All @@ -51,6 +54,7 @@ def __init__(
kwargs : dict
Additional keyword arguments to pass to the base SQLDriver.
"""
self._session: Awaitable = None
SQLDriver.__init__(self, dsn, loop, params, **kwargs)

async def connection(self, **kwargs):
Expand All @@ -72,6 +76,18 @@ async def connection(self, **kwargs):
"""
self._connection = None
self._connected = False
if not self._session:
self._session = ClientSession()
try:
self._connection = ChClient(self._session, **self.params)
print(self._connection, await self._connection.is_alive())
if await self._connection.is_alive():
self._connected = True
return self
except Exception as exc:
raise DriverError(
f"clickhouse Error: {exc}"
) from exc

connect = connection

Expand All @@ -88,8 +104,41 @@ async def close(self, timeout: int = 5) -> None:
--------
None
"""
self._connection = None
self._connected = False
try:
if self._session:
await self._session.close()
finally:
self._connection = None
self._connected = False
self._session = None

async def __aenter__(self) -> Any:
"""
Asynchronous context manager entry.
Establishes a connection when entering the context.
Returns:
--------
self : clickhouse
Returns the instance of the driver itself.
Raises:
-------
DriverError
If an error occurs during connection establishment.
"""
try:
if not self._session:
self._session = ClientSession()
if not self._connection:
await self.connection()
except Exception as err:
error = f"Error on Cursor Fetch: {err}"
raise DriverError(message=error) from err
return self

async def __aexit__(self, exc_type, exc, tb):
await self.close()

async def query(self, sentence: Any, *args, **kwargs) -> Iterable[Any]:
"""
Expand All @@ -112,6 +161,12 @@ async def query(self, sentence: Any, *args, **kwargs) -> Iterable[Any]:
error = None
self._result = None
await self.valid_operation(sentence)
try:
result = await self._connection.fetch(sentence)
if result:
self._result = result
except Exception as exc:
error = exc
return await self._serializer(self._result, error)

async def queryrow(self, sentence: Any = None) -> Iterable[Any]:
Expand Down Expand Up @@ -251,27 +306,6 @@ async def execute_many(self, sentence: Union[str, list], args: list) -> Optional

executemany = execute_many

async def __aenter__(self) -> Any:
"""
Asynchronous context manager entry. Establishes a connection when entering the context.
Returns:
--------
self : clickhouse
Returns the instance of the driver itself.
Raises:
-------
DriverError
If an error occurs during connection establishment.
"""
try:
await self.connection()
except Exception as err:
error = f"Error on Cursor Fetch: {err}"
raise DriverError(message=error) from err
return self

async def copy_to(self, sentence: Union[str, Path], destination: str, **kwargs) -> bool:
"""
Copies the result of a query to a file asynchronously.
Expand Down Expand Up @@ -333,7 +367,7 @@ async def prepare(self, sentence: Union[str, list]) -> Any:
"""
Prepares a SQL sentence for execution.
Currently not implemented for DuckDB and raises NotImplementedError.
Currently not implemented for ClickHouse and raises NotImplementedError.
Parameters:
-----------
Expand All @@ -348,7 +382,7 @@ async def prepare(self, sentence: Union[str, list]) -> Any:
Raises:
-------
NotImplementedError
Raised when called, as DuckDB does not support prepared statements in this implementation.
Raised when called, as ClickHouse does not support prepared statements in this implementation.
"""
raise NotImplementedError() # pragma: no cover

Expand Down Expand Up @@ -402,7 +436,7 @@ async def use(self, database: str):
"""
Switches the default database to the specified one.
Currently not implemented for DuckDB and raises NotImplementedError.
Currently not implemented for ClickHouse and raises NotImplementedError.
Parameters:
-----------
Expand All @@ -412,6 +446,6 @@ async def use(self, database: str):
Raises:
-------
NotImplementedError
Raised when called, as DuckDB does not support switching databases.
Raised when called, as ClickHouse does not support switching databases.
"""
raise NotImplementedError("DuckDB Error: There is no Database in DuckDB")
raise NotImplementedError("ClickHouse Error: There is no Database in ClickHouse")
10 changes: 7 additions & 3 deletions asyncdb/interfaces/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ def __init__(
self._loop = asyncio.get_event_loop()
asyncio.set_event_loop(self._loop)
except RuntimeError:
raise RuntimeError(
"No Event Loop is running. Please, run this driver inside an asyncio loop."
)
try:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
except RuntimeError:
raise RuntimeError(
"No Event Loop is running. Please, run this driver inside an asyncio loop."
)
if self._loop.is_closed():
self._loop = asyncio.get_running_loop()
asyncio.set_event_loop(self._loop)
Expand Down
22 changes: 22 additions & 0 deletions examples/test_clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio
from asyncdb.drivers.clickhouse import clickhouse
from pprint import pprint


async def connect(db):
async with await db.connection() as conn:
print('HERE >>')
pprint(await conn.test_connection())
print('END >>')

if __name__ == "__main__":
loop = asyncio.get_event_loop()
params = {
"url": "http://localhost:8123/",
"user": "default",
"password": "u69ebsZQ",
"database": "default"
}
driver = clickhouse(params=params, loop=loop)
print('DRV > ', driver)
loop.run_until_complete(connect(driver))

0 comments on commit fcb7642

Please sign in to comment.