From 29b2c1051d4a331a590934ed7eb768e2e1b6f3a3 Mon Sep 17 00:00:00 2001
From: Jesus Lara
Date: Thu, 12 Sep 2024 13:36:27 +0200
Subject: [PATCH 1/2] fix some syntax issues

---
 asyncdb/drivers/delta.py       |  2 +-
 asyncdb/drivers/mysqlclient.py |  2 +-
 asyncdb/drivers/oracle.py      | 11 +++++--
 asyncdb/drivers/redis.py       | 10 ++++--
 asyncdb/drivers/sa.py          | 57 +++++++++++++++++-----------------
 asyncdb/interfaces/abstract.py |  6 ++--
 examples/test_mysql.py         |  1 -
 tests/test_mysql.py            |  2 +-
 8 files changed, 51 insertions(+), 40 deletions(-)

diff --git a/asyncdb/drivers/delta.py b/asyncdb/drivers/delta.py
index b54ea5aa..2c8ca5bb 100644
--- a/asyncdb/drivers/delta.py
+++ b/asyncdb/drivers/delta.py
@@ -445,7 +445,7 @@ async def file_to_parquet(
             pq.write_table(atable, parquet, compression="snappy")
         except Exception as exc:
             raise DriverError(
-                f"Query Error: {exc}"
+                f"Delta File To Parquet Error: {exc}"
             ) from exc
 
     async def write(
diff --git a/asyncdb/drivers/mysqlclient.py b/asyncdb/drivers/mysqlclient.py
index 02932290..f0347df4 100644
--- a/asyncdb/drivers/mysqlclient.py
+++ b/asyncdb/drivers/mysqlclient.py
@@ -144,7 +144,7 @@ async def close(self):
             await self._thread_func(self._pool.close)
             self._connected = False
         self._logger.debug(
-            f"MySQL Connection Closed."
+            "MySQL Connection Closed."
         )
 
     disconnect = close
diff --git a/asyncdb/drivers/oracle.py b/asyncdb/drivers/oracle.py
index 2ff7af0a..5872470e 100644
--- a/asyncdb/drivers/oracle.py
+++ b/asyncdb/drivers/oracle.py
@@ -29,9 +29,16 @@ def __init__(self, dsn: str = "", loop: asyncio.AbstractEventLoop = None, params
         except KeyError:
             self._lib_dir = None
         try:
-            super(oracle, self).__init__(dsn=dsn, loop=loop, params=params, **kwargs)
+            super(oracle, self).__init__(
+                dsn=dsn,
+                loop=loop,
+                params=params,
+                **kwargs
+            )
             _generated = datetime.now() - _starttime
-            print(f"Oracle Started in: {_generated}")
+            print(
+                f"Oracle Started in: {_generated}"
+            )
         except Exception as err:
             raise DriverError(f"Oracle Error: {err}") from err
         # set the JSON encoder:
diff --git a/asyncdb/drivers/redis.py b/asyncdb/drivers/redis.py
index fb17aad1..a5802dc0 100644
--- a/asyncdb/drivers/redis.py
+++ b/asyncdb/drivers/redis.py
@@ -241,8 +241,14 @@ async def execute(self, sentence, *args, **kwargs) -> Any:
         if self._connection:
             try:
                 return await self._connection.execute_command(sentence, *args)
-            except (RedisError,) as err:
-                raise DriverError(f"Connection Error: {err}") from err
+            except RedisError as err:
+                raise DriverError(
+                    f"Connection Error: {err}"
+                ) from err
+            except Exception as err:
+                raise DriverError(
+                    f"Unknown Redis Error: {err}"
+                ) from err
 
     execute_many = execute
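A note on the sa.py diff that follows: since PEP 479, a StopIteration that escapes a coroutine is converted into a RuntimeError, so __anext__ has to translate it into StopAsyncIteration; chaining with "from e" preserves the original traceback. A minimal sketch of the pattern (the AsyncCursor name is illustrative, not the actual class in sa.py):

    class AsyncCursor:
        # Wraps a synchronous iterator as an async iterator (illustrative).
        def __init__(self, rows):
            self._result_iter = iter(rows)

        def __aiter__(self):
            return self

        async def __anext__(self):
            try:
                return next(self._result_iter)
            except StopIteration as e:
                # PEP 479: never let StopIteration escape a coroutine.
                raise StopAsyncIteration from e
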
diff --git a/asyncdb/drivers/sa.py b/asyncdb/drivers/sa.py
index 0457f7f7..fcc44bb1 100644
--- a/asyncdb/drivers/sa.py
+++ b/asyncdb/drivers/sa.py
@@ -42,8 +42,8 @@ async def __anext__(self):
         # Retrieve the next item in the result set
         try:
             return next(self._result_iter)
-        except StopIteration:
-            raise StopAsyncIteration
+        except StopIteration as e:
+            raise StopAsyncIteration from e
 
 
 class sa(SQLDriver, DBCursorBackend):
     _provider = "sa"
@@ -84,8 +84,7 @@ def __init__(
         if params:
             self._driver = params.get('driver', "postgresql+asyncpg")
         else:
-            params = {}
-            params["driver"] = "postgresql+asyncpg"
+            params = {"driver": "postgresql+asyncpg"}
         SQLDriver.__init__(
             self, dsn=dsn, loop=loop,
@@ -226,7 +225,7 @@ async def query(
         self,
         sentence: Any,
         params: List = None,
-        format: str = None
+        query_format: str = None
     ):
         """
         Running Query.
@@ -234,8 +233,8 @@ async def query(
         self._result = None
         error = None
         await self.valid_operation(sentence)
-        if not format:
-            format = self._row_format
+        if not query_format:
+            query_format = self._row_format
         try:
             self.start_timing()
             async with self._connection.connect() as conn:
@@ -245,11 +244,11 @@ async def query(
                 rows = result.fetchall()
                 # Get the column names from the result metadata
                 column_names = result.keys()
-                if format in ("dict", "iterable"):
+                if query_format in ("dict", "iterable"):
                     self._result = [
                         dict(zip(column_names, row)) for row in rows
                     ]
-                elif format == "record":
+                elif query_format == "record":
                     self._result = [
                         self._construct_record(row, column_names) for row in rows
                     ]
@@ -275,7 +274,7 @@ async def queryrow(
         self,
         sentence: Any,
         params: Any = None,
-        format: Optional[str] = None
+        query_format: Optional[str] = None
     ):
         """
         Running Query and return only one row.
@@ -284,8 +283,8 @@ async def queryrow(
         error = None
         await self.valid_operation(sentence)
         try:
-            if not format:
-                format = self._row_format
+            if not query_format:
+                query_format = self._row_format
             result = None
             async with self._connection.connect() as conn:
                 if isinstance(sentence, str):
@@ -293,9 +292,9 @@ async def queryrow(
                 result = await conn.execute(sentence, params)
                 column_names = result.keys()
                 row = result.fetchone()
-                if format in ("dict", 'iterable'):
+                if query_format in ("dict", 'iterable'):
                     self._result = dict(zip(column_names, row))
-                elif format == "record":
+                elif query_format == "record":
                     self._result = self._construct_record(row, column_names)
                 else:
                     self._result = row
@@ -317,7 +316,7 @@ async def fetch_all(
         self,
         sentence: Any,
         params: List = None,
-        format: Optional[str] = None
+        query_format: Optional[str] = None
     ):
         """
         Fetch All Rows in a Query.
@@ -325,8 +324,8 @@ async def fetch_all(
         result = None
         await self.valid_operation(sentence)
         try:
-            if not format:
-                format = self._row_format
+            if not query_format:
+                query_format = self._row_format
             async with self._connection.connect() as conn:
                 if isinstance(sentence, str):
                     sentence = text(sentence)
@@ -335,11 +334,11 @@ async def fetch_all(
                 rows = rst.fetchall()
                 if rows is None:
                     return None
-                if format in ("dict", 'iterable'):
+                if query_format in ("dict", 'iterable'):
                     result = [
                         dict(zip(column_names, row)) for row in rows
                     ]
-                elif format == "record":
+                elif query_format == "record":
                     result = [
                         self._construct_record(row, column_names) for row in rows
                     ]
@@ -362,7 +361,7 @@ async def fetch_many(
         sentence: Any,
         size: int = 1,
         params: List = None,
-        format: Optional[str] = None
+        query_format: Optional[str] = None
     ):
         """
         Fetch Many Rows from a Query as requested.
@@ -370,8 +369,8 @@ async def fetch_many(
         result = None
         await self.valid_operation(sentence)
         try:
-            if not format:
-                format = self._row_format
+            if not query_format:
+                query_format = self._row_format
             async with self._connection.connect() as conn:
                 if isinstance(sentence, str):
                     sentence = text(sentence)
@@ -380,11 +379,11 @@ async def fetch_many(
                 rows = rst.fetchmany(size)
                 if rows is None:
                     return None
-                if format in ("dict", 'iterable'):
+                if query_format in ("dict", 'iterable'):
                     result = [
                         dict(zip(column_names, row)) for row in rows
                     ]
-                elif format == "record":
+                elif query_format == "record":
                     result = [
                         self._construct_record(row, column_names) for row in rows
                     ]
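The hunks above and below rename the driver's "format" keyword to "query_format", so the parameter no longer shadows Python's built-in format(). A hedged sketch of the intended call pattern (connection details and table name are placeholders):

    # "dict" yields a list of dicts; "record" yields asyncdb Record objects.
    async def fetch_users(db):
        async with await db.connection() as conn:
            result, error = await conn.query(
                "SELECT * FROM users", query_format="dict"
            )
            if error:
                raise error
            return result
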
@@ -408,7 +407,7 @@ async def fetch_one(
         self,
         sentence: Any,
         params: List = None,
-        format: Optional[str] = None
+        query_format: Optional[str] = None
     ):
         """
         Running Query and return only one row.
@@ -416,8 +415,8 @@ async def fetch_one(
         result = None
         await self.valid_operation(sentence)
         try:
-            if not format:
-                format = self._row_format
+            if not query_format:
+                query_format = self._row_format
             async with self._connection.connect() as conn:
                 if isinstance(sentence, str):
                     sentence = text(sentence)
@@ -426,9 +425,9 @@ async def fetch_one(
                 row = rst.fetchone()
                 if row is None:
                     return None
-                if format in ("dict", 'iterable'):
+                if query_format in ("dict", 'iterable'):
                     result = dict(zip(column_names, row))
-                elif format == "record":
+                elif query_format == "record":
                     result = Record(
                         dict(zip(column_names, row)),
                         column_names
diff --git a/asyncdb/interfaces/abstract.py b/asyncdb/interfaces/abstract.py
index 9667c4a9..7e0cfd10 100644
--- a/asyncdb/interfaces/abstract.py
+++ b/asyncdb/interfaces/abstract.py
@@ -84,10 +84,10 @@ def __init__(
         try:
             self._loop = asyncio.new_event_loop()
             asyncio.set_event_loop(self._loop)
-        except RuntimeError:
+        except RuntimeError as e:
             raise RuntimeError(
                 "No Event Loop is running. Please, run this driver inside an asyncio loop."
-            )
+            ) from e
         if self._loop.is_closed():
             self._loop = asyncio.get_running_loop()
             asyncio.set_event_loop(self._loop)
@@ -100,7 +100,7 @@ def get_loop(self):
     event_loop = get_loop
 
     def event_loop_is_closed(self):
-        return True if not self._loop else bool(self._loop.is_closed())
+        return bool(self._loop.is_closed()) if self._loop else True
 
 
 class PoolContextManager(Awaitable, AbstractAsyncContextManager):
diff --git a/examples/test_mysql.py b/examples/test_mysql.py
index e9530ee3..e57da2a5 100644
--- a/examples/test_mysql.py
+++ b/examples/test_mysql.py
@@ -23,7 +23,6 @@ async def pooler(loop):
     async with await pool.acquire() as conn:
         # execute a sentence
         result, error = await conn.test_connection()
-        print(result, 'Error: ', error)
     print('Is closed: ', {db.is_connected()})
     await pool.close()
 
diff --git a/tests/test_mysql.py b/tests/test_mysql.py
index 1374616f..7c9082a6 100644
--- a/tests/test_mysql.py
+++ b/tests/test_mysql.py
@@ -46,7 +46,7 @@ async def test_connect(driver, event_loop):
     await db.connection()
     pytest.assume(db.is_connected() is True)
     result, error = await db.test_connection()
-    pytest.assume(type(result) == list)
+    pytest.assume(isinstance(result, list) and len(result) > 0)
    pytest.assume(error is None)
     await db.close()
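One note on the tests/test_mysql.py change above: isinstance() is preferred over type(result) == list because it also accepts list subclasses (a driver may hand back a list-like result container), and the added length check guards against a silently empty result. A minimal illustration (the Rows class is hypothetical):

    class Rows(list):
        # Hypothetical list subclass standing in for a driver result container.
        pass

    rows = Rows([1, 2, 3])
    assert isinstance(rows, list)   # True, also for subclasses
    assert not type(rows) == list   # an exact type() check rejects subclasses
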
From 43938dd46d947e1938affba56351cd618ea6ae92 Mon Sep 17 00:00:00 2001
From: Jesus Lara
Date: Mon, 16 Sep 2024 01:07:11 +0200
Subject: [PATCH 2/2] new drivers: elasticsearch, aioch, clickhouse

---
 asyncdb/drivers/aioch.py      | 457 ++++++++++++++++++++++++++++++++++
 asyncdb/drivers/clickhouse.py | 267 ++++++++++++++------
 asyncdb/drivers/elastic.py    | 392 +++++++++++++++++++++++++++++
 examples/test_aioch.py        |  22 ++
 examples/test_clickhouse.py   |  52 +++-
 examples/test_delta.py        |   3 +
 setup.py                      |   1 +
 start_docker.sh               |   6 +
 8 files changed, 1123 insertions(+), 77 deletions(-)
 create mode 100644 asyncdb/drivers/aioch.py
 create mode 100644 asyncdb/drivers/elastic.py
 create mode 100644 examples/test_aioch.py
 create mode 100755 start_docker.sh

diff --git a/asyncdb/drivers/aioch.py b/asyncdb/drivers/aioch.py
new file mode 100644
index 00000000..043c2e9a
--- /dev/null
+++ b/asyncdb/drivers/aioch.py
@@ -0,0 +1,457 @@
+import asyncio
+from typing import Union, Any, Optional
+from collections.abc import Iterable, Sequence, Awaitable
+from pathlib import Path
+from aiochclient import ChClient
+from aiohttp import ClientSession
+from .sql import SQLDriver
+from ..exceptions import DriverError
+
+
+class aioch(SQLDriver):
+    """
+    Async version of the ClickHouse driver, for connecting to a ClickHouse
+    cluster. This class provides a consistent interface using aiochclient.
+
+    Attributes:
+    -----------
+    _provider : str
+        Name of the database provider (e.g., 'clickhouse').
+    _syntax : str
+        SQL syntax specific to the database provider (e.g., 'sql').
+    _dsn : str
+        Data Source Name (DSN) template for connecting to the database, if required.
+    _connection : Any
+        Holds the active connection to the database.
+    _connected : bool
+        Indicates if the driver is currently connected to the database.
+    """
+
+    _provider: str = "clickhouse"
+    _syntax: str = "sql"
+    _dsn: str = "{database}"
+    _test_query: str = "SELECT version()"
+
+    def __init__(
+        self,
+        dsn: str = "",
+        loop: asyncio.AbstractEventLoop = None,
+        params: dict = None,
+        **kwargs
+    ) -> None:
+        """
+        Initializes the aioch driver with the given DSN,
+        event loop, and optional parameters.
+
+        Parameters:
+        -----------
+        dsn : str, optional
+            The Data Source Name for the database connection. Defaults to an empty string.
+        loop : asyncio.AbstractEventLoop, optional
+            The event loop to use for asynchronous operations. Defaults to None, which uses the current event loop.
+        params : dict, optional
+            Additional connection parameters as a dictionary. Defaults to None.
+        kwargs : dict
+            Additional keyword arguments to pass to the base SQLDriver.
+        """
+        self._session: Awaitable = None
+        SQLDriver.__init__(
+            self,
+            dsn=dsn,
+            loop=loop,
+            params=params,
+            **kwargs
+        )
+
+    async def connection(self, **kwargs):
+        """
+        Establishes a connection to ClickHouse asynchronously.
+
+        Creates an aiohttp ClientSession and a ChClient bound to it, then
+        verifies that the server is alive before marking the driver as
+        connected.
+
+        Parameters:
+        -----------
+        kwargs : dict
+            Additional arguments to be used when establishing the connection.
+
+        Returns:
+        --------
+        self : aioch
+            Returns the instance of the driver itself after the connection is established.
+        """
+        self._connection = None
+        self._connected = False
+        if not self._session:
+            self._session = ClientSession()
+        try:
+            self._connection = ChClient(self._session, **self.params)
+            if await self._connection.is_alive():
+                self._connected = True
+            return self
+        except Exception as exc:
+            raise DriverError(
+                f"ClickHouse Error: {exc}"
+            ) from exc
+
+    connect = connection
+
+    async def close(self, timeout: int = 5) -> None:
+        """
+        Closes the active connection to the database asynchronously.
+
+        Parameters:
+        -----------
+        timeout : int, optional
+            The time in seconds to wait before forcefully closing the connection. Defaults to 5 seconds.
+
+        Returns:
+        --------
+        None
+        """
+        try:
+            if self._session:
+                await self._session.close()
+        finally:
+            self._connection = None
+            self._connected = False
+            self._session = None
+
+    async def __aenter__(self) -> Any:
+        """
+        Asynchronous context manager entry.
+        Establishes a connection when entering the context.
+
+        Returns:
+        --------
+        self : aioch
+            Returns the instance of the driver itself.
+
+        Raises:
+        -------
+        DriverError
+            If an error occurs during connection establishment.
+        """
+        try:
+            if not self._session:
+                self._session = ClientSession()
+            if not self._connection:
+                await self.connection()
+        except Exception as err:
+            error = f"Error on Connection: {err}"
+            raise DriverError(message=error) from err
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        await self.close()
+
+    async def query(self, sentence: Any, *args, **kwargs) -> Iterable[Any]:
+        """
+        Executes a query to retrieve data from the database asynchronously.
+
+        Parameters:
+        -----------
+        sentence : Any
+            The SQL query or command to execute.
+        args : tuple
+            Additional positional arguments to be passed to the query.
+        kwargs : dict
+            Additional keyword arguments to be passed to the query.
+
+        Returns:
+        --------
+        Iterable[Any]
+            An iterable containing the rows returned by the query.
+        """
+        error = None
+        self._result = None
+        await self.valid_operation(sentence)
+        try:
+            result = await self._connection.fetch(sentence)
+            if result:
+                self._result = result
+        except Exception as exc:
+            error = exc
+        return await self._serializer(self._result, error)
+
+    async def queryrow(self, sentence: Any = None) -> Iterable[Any]:
+        """
+        Executes a query to retrieve a single row of data from the database asynchronously.
+
+        Parameters:
+        -----------
+        sentence : Any, optional
+            The SQL query or command to execute. Defaults to None.
+
+        Returns:
+        --------
+        Iterable[Any]
+            An iterable containing the single row returned by the query.
+        """
+        error = None
+        self._result = None
+        await self.valid_operation(sentence)
+        try:
+            self._result = await self._connection.fetchrow(sentence)
+        except Exception as exc:
+            error = exc
+        return await self._serializer(self._result, error)
+
+    async def fetch_all(self, sentence: str, *args, **kwargs) -> Sequence:
+        """
+        Executes a query to fetch all rows of data without returning errors.
+
+        This method is an alias for `query` but does not return any error information.
+
+        Parameters:
+        -----------
+        sentence : str
+            The SQL query or command to execute.
+        args : tuple
+            Additional positional arguments to be passed to the query.
+        kwargs : dict
+            Additional keyword arguments to be passed to the query.
+
+        Returns:
+        --------
+        Sequence
+            A sequence of rows returned by the query.
+        """
+        await self.valid_operation(sentence)
+        return await self._connection.fetch(sentence)
+
+    # alias to be compatible with aiosqlite methods.
+    fetchall = fetch_all
+
+    async def fetch_many(self, sentence: str, size: int = None):
+        """
+        Executes a query to fetch a specified number of rows without returning errors.
+
+        This method is an alias for `query`, but without returning any error information.
+
+        Parameters:
+        -----------
+        sentence : str
+            The SQL query or command to execute.
+        size : int, optional
+            The number of rows to fetch. Defaults to None, which fetches all rows.
+
+        Returns:
+        --------
+        Iterable[Any]
+            An iterable containing the specified number of rows returned by the query.
+        """
+        await self.valid_operation(sentence)
+        result = []
+        # aiochclient exposes an async row iterator; collect up to `size` rows.
+        async for row in self._connection.iterate(sentence):
+            result.append(row)
+            if size and len(result) >= size:
+                break
+        return result
+
+    fetchmany = fetch_many
+
+    async def fetch_one(self, sentence: str, *args, **kwargs) -> Optional[dict]:
+        """
+        Executes a query to fetch a single row of data without returning errors.
+
+        This method is an alias for `queryrow`, but without returning any error information.
+
+        Parameters:
+        -----------
+        sentence : str
+            The SQL query or command to execute.
+        args : tuple
+            Additional positional arguments to be passed to the query.
+        kwargs : dict
+            Additional keyword arguments to be passed to the query.
+
+        Returns:
+        --------
+        Optional[dict]
+            A dictionary representing the single row returned by the query, or None if no rows are returned.
+        """
+        await self.valid_operation(sentence)
+        return await self._connection.fetchrow(sentence)
+
+    fetchone = fetch_one
+    fetchrow = fetch_one
+
+    async def execute(self, sentence: Any, **kwargs) -> Optional[Any]:
+        """
+        Executes a transaction or command that does not necessarily return a result asynchronously.
+
+        Parameters:
+        -----------
+        sentence : Any
+            The SQL command or transaction to execute.
+        kwargs : dict
+            Additional keyword arguments to be passed to the execution.
+
+        Returns:
+        --------
+        Optional[Any]
+            The result of the execution, if any.
+        """
+        error = None
+        result = None
+        await self.valid_operation(sentence)
+        try:
+            result = await self._connection.execute(sentence, **kwargs)
+        except Exception as exc:
+            error = exc
+        return (result, error)
+
+    async def execute_many(self, sentence: Union[str, list], args: list) -> Optional[Any]:
+        """
+        Executes multiple transactions or commands asynchronously.
+
+        This method is similar to `execute`, but accepts multiple commands to be executed.
+
+        Parameters:
+        -----------
+        sentence : Union[str, list]
+            A single SQL command or a list of commands to execute.
+        args : list
+            A list of arguments to pass to each command.
+
+        Returns:
+        --------
+        Optional[Any]
+            The result of the executions, if any.
+        """
+        error = None
+        result = None
+        try:
+            if isinstance(sentence, str):
+                await self.valid_operation(sentence)
+                # aiochclient batches INSERT ... VALUES through positional args.
+                result = await self._connection.execute(sentence, *args)
+            else:
+                for query in sentence:
+                    await self.valid_operation(query)
+                    result = await self._connection.execute(query, *args)
+        except Exception as exc:
+            error = exc
+        return (result, error)
+
+    executemany = execute_many
+
+    async def copy_to(self, sentence: Union[str, Path], destination: str, **kwargs) -> bool:
+        """
+        Copies the result of a query to a file asynchronously.
+
+        Parameters:
+        -----------
+        sentence : Union[str, Path]
+            The SQL query or the path to a file containing the data to be copied.
+        destination : str
+            The destination path where the data will be saved.
+        kwargs : dict
+            Additional keyword arguments to customize the copying process.
+
+        Returns:
+        --------
+        bool
+            Returns True if the copy operation is successful, otherwise False.
+        """
+        pass
+
+    async def write(
+        self,
+        data,
+        table_id: str = None,
+        dataset_id: str = None,
+        use_streams: bool = False,
+        use_pandas: bool = True,
+        if_exists: str = "append",
+        **kwargs,
+    ) -> bool:
+        """
+        Writes data to a table asynchronously, optionally using streams or pandas DataFrame.
+
+        Parameters:
+        -----------
+        data : Any
+            The data to be written, which can be a CSV file, stream, or pandas DataFrame.
+        table_id : str, optional
+            The ID of the table where the data will be written. Defaults to None.
+        dataset_id : str, optional
+            The ID of the dataset where the table resides. Defaults to None.
+        use_streams : bool, optional
+            If True, uses streaming to write the data. Defaults to False.
+        use_pandas : bool, optional
+            If True, uses pandas DataFrame to write the data. Defaults to True.
+        if_exists : str, optional
+            Specifies what to do if the table already exists. Defaults to "append".
+        kwargs : dict
+            Additional keyword arguments to customize the writing process.
+
+        Returns:
+        --------
+        bool
+            Returns True if the write operation is successful, otherwise False.
+        """
+        pass
+
+    async def prepare(self, sentence: Union[str, list]) -> Any:
+        """
+        Prepares a SQL sentence for execution.
+
+        Currently not implemented for ClickHouse and raises NotImplementedError.
+
+        Parameters:
+        -----------
+        sentence : Union[str, list]
+            The SQL command(s) to prepare.
+
+        Returns:
+        --------
+        Any
+            Typically, this would return a prepared statement object, but this implementation raises NotImplementedError.
+
+        Raises:
+        -------
+        NotImplementedError
+            Raised when called, as ClickHouse does not support prepared statements in this implementation.
+        """
+        raise NotImplementedError()  # pragma: no cover
+
+    def tables(self, schema: str = "") -> Iterable[Any]:
+        """
+        Retrieves a list of tables in the specified schema.
+
+        Currently not implemented and raises NotImplementedError.
+
+        Parameters:
+        -----------
+        schema : str, optional
+            The name of the schema to query. Defaults to an empty string.
+
+        Returns:
+        --------
+        Iterable[Any]
+            An iterable of table names.
+
+        Raises:
+        -------
+        NotImplementedError
+            Raised when called, as this implementation does not support table listing.
+        """
+        raise NotImplementedError()  # pragma: no cover
+
+    def table(self, tablename: str = "") -> Iterable[Any]:
+        """
+        Retrieves information about a specific table.
+
+        Currently not implemented and raises NotImplementedError.
+
+        Parameters:
+        -----------
+        tablename : str, optional
+            The name of the table to query. Defaults to an empty string.
+
+        Returns:
+        --------
+        Iterable[Any]
+            An iterable of table information.
+
+        Raises:
+        -------
+        NotImplementedError
+            Raised when called, as this implementation does not support detailed table information.
+        """
+        raise NotImplementedError()  # pragma: no cover
+
+    async def use(self, database: str):
+        """
+        Switches the default database to the specified one.
+
+        Currently not implemented for ClickHouse and raises NotImplementedError.
+
+        Parameters:
+        -----------
+        database : str
+            The name of the database to switch to.
+
+        Raises:
+        -------
+        NotImplementedError
+            Raised when called; this driver binds the database at connection time instead.
+        """
+        raise NotImplementedError("ClickHouse Error: There is no Database in ClickHouse")
""" - self._session: Awaitable = None - SQLDriver.__init__(self, dsn, loop, params, **kwargs) + server_args = { + "secure": False, + "verify": False, + "compression": True + } + SQLDriver.__init__( + self, + dsn=dsn, + loop=loop, + params=params, + **kwargs + ) + self.params.update(server_args) + async def connection(self, **kwargs): """ @@ -76,12 +89,19 @@ async def connection(self, **kwargs): """ self._connection = None self._connected = False - if not self._session: - self._session = ClientSession() + self._executor = self.get_executor( + executor="thread", max_workers=10 + ) try: - self._connection = ChClient(self._session, **self.params) - print(self._connection, await self._connection.is_alive()) - if await self._connection.is_alive(): + if self._dsn: + self._connection = await self._thread_func( + Client.from_url, self._dsn, executor=self._executor + ) + else: + self._connection = await self._thread_func( + Client, **self.params, executor=self._executor + ) + if self._connection.connection: self._connected = True return self except Exception as exc: @@ -104,13 +124,9 @@ async def close(self, timeout: int = 5) -> None: -------- None """ - try: - if self._session: - await self._session.close() - finally: - self._connection = None - self._connected = False - self._session = None + self._connection = None # Clickhouse does not have a close method. + self._connected = False + self._session = None async def __aenter__(self) -> Any: """ @@ -128,8 +144,6 @@ async def __aenter__(self) -> Any: If an error occurs during connection establishment. """ try: - if not self._session: - self._session = ClientSession() if not self._connection: await self.connection() except Exception as err: @@ -140,7 +154,102 @@ async def __aenter__(self) -> Any: async def __aexit__(self, exc_type, exc, tb): await self.close() - async def query(self, sentence: Any, *args, **kwargs) -> Iterable[Any]: + async def execute( + self, + sentence: Any, + params: Optional[Iterable] = None, + **kwargs + ) -> Optional[Any]: + """ + Executes a transaction or command that does not necessarily + return a result asynchronously. + + Parameters: + ----------- + sentence : Any + The SQL command or transaction to execute. + kwargs : dict + Additional keyword arguments to be passed to the execution. + + Returns: + -------- + Optional[Any] + The result of the execution, if any. + """ + error = None + result = None + await self.valid_operation(sentence) + try: + if not self._executor: + self._executor = self.get_executor( + executor="thread", max_workers=2 + ) + new_args = { + "with_column_types": True, + "columnar": False, + "params": params, + **kwargs + } + if params: + new_args['params'] = params + result = await self._thread_func( + self._connection.execute, + sentence, + **new_args, + executor=self._executor + ) + except Exception as exc: + error = exc + finally: + return [result, error] + + async def execute_many( + self, + sentence: Union[str, list], + params: Optional[Iterable] = None + ) -> Optional[Any]: + """ + Executes multiple transactions or commands asynchronously. + + This method is similar to `execute`, but accepts multiple commands to be executed. + + Parameters: + ----------- + sentence : Union[str, list] + A single SQL command or a list of commands to execute. + params : iterable + A list of arguments to pass to each command. + + Returns: + -------- + Optional[Any] + The result of the executions, if any. 
+ """ + error = None + result = None + if isinstance(sentence, str): + sentences = [sentence] + else: + sentences = sentence + results = [] + for sentence in sentences: + await self.valid_operation(sentence) + result = await self.execute(sentence, params=params) + results.append(result) + return (result, error) + + executemany = execute_many + + def _construct_record(self, row, column_names): + return Record(dict(zip(column_names, row)), column_names) + + async def query( + self, + sentence: Any, + *args, + row_format: str = None, + **kwargs + ) -> Iterable[Any]: """ Executes a query to retrieve data from the database asynchronously. @@ -161,15 +270,43 @@ async def query(self, sentence: Any, *args, **kwargs) -> Iterable[Any]: error = None self._result = None await self.valid_operation(sentence) + if not row_format: + row_format = self._row_format try: - result = await self._connection.fetch(sentence) + if not self._executor: + self._executor = self.get_executor( + executor="thread", max_workers=2 + ) + new_args = { + "with_column_types": True, + "columnar": False, + **kwargs + } + result, columns_info = await self._thread_func( + self._connection.execute, + sentence, *args, **new_args, + executor=self._executor + ) if result: - self._result = result + if row_format == 'record': + self._result = result + elif row_format in ('dict', 'iterable'): + # Get the column names from the executed query + columns = [col[0] for col in columns_info] + self._result = [dict(zip(columns, row)) for row in result] + else: + self._result = result except Exception as exc: error = exc return await self._serializer(self._result, error) - async def queryrow(self, sentence: Any = None) -> Iterable[Any]: + async def queryrow( + self, + sentence: Any, + *args, + params: Optional[Iterable] = None, + **kwargs + ) -> Iterable[Any]: """ Executes a query to retrieve a single row of data from the database asynchronously. @@ -186,6 +323,37 @@ async def queryrow(self, sentence: Any = None) -> Iterable[Any]: error = None self._result = None await self.valid_operation(sentence) + try: + if not self._executor: + self._executor = self.get_executor( + executor="thread", max_workers=2 + ) + new_args = { + "with_column_types": True, + "settings": { + 'max_block_size': 100000 + }, + "chunk_size": 1, + **kwargs + } + rows_gen = await self._thread_func( + self._connection.execute_iter, + sentence, *args, **new_args, + executor=self._executor + ) + # Extract the first element (column info) using next() + column_info = next(rows_gen) + # Extract column names + column_names = [col[0] for col in column_info] + print(rows_gen, column_names) + result = [] + for row in rows_gen: + + row_dict = dict(zip(column_names, row)) + result.append(row_dict) + self._result = result + except Exception as exc: + error = exc return await self._serializer(self._result, error) async def fetch_all(self, sentence: str, *args, **kwargs) -> Sequence: @@ -261,51 +429,6 @@ async def fetch_one(self, sentence: str, *args, **kwargs) -> Optional[dict]: fetchone = fetch_one fetchrow = fetch_one - async def execute(self, sentence: Any, **kwargs) -> Optional[Any]: - """ - Executes a transaction or command that does not necessarily return a result asynchronously. - - Parameters: - ----------- - sentence : Any - The SQL command or transaction to execute. - kwargs : dict - Additional keyword arguments to be passed to the execution. - - Returns: - -------- - Optional[Any] - The result of the execution, if any. 
- """ - error = None - result = None - return (result, error) - - async def execute_many(self, sentence: Union[str, list], args: list) -> Optional[Any]: - """ - Executes multiple transactions or commands asynchronously. - - This method is similar to `execute`, but accepts multiple commands to be executed. - - Parameters: - ----------- - sentence : Union[str, list] - A single SQL command or a list of commands to execute. - args : list - A list of arguments to pass to each command. - - Returns: - -------- - Optional[Any] - The result of the executions, if any. - """ - error = None - result = None - await self.valid_operation(sentence) - return (result, error) - - executemany = execute_many - async def copy_to(self, sentence: Union[str, Path], destination: str, **kwargs) -> bool: """ Copies the result of a query to a file asynchronously. diff --git a/asyncdb/drivers/elastic.py b/asyncdb/drivers/elastic.py new file mode 100644 index 00000000..002be169 --- /dev/null +++ b/asyncdb/drivers/elastic.py @@ -0,0 +1,392 @@ +""" ElasticSearch async Provider. +Notes on Elastic Provider +-------------------- +This provider implements a few subset of funcionalities from elasticsearch. +TODO: + - use jsonpath to query json-objects + - implements lists and hash datatypes +""" +import asyncio +import time +from typing import Any, Union +from dataclasses import dataclass, is_dataclass +from elasticsearch import AsyncElasticsearch +from ..exceptions import ConnectionTimeout, DriverError +from .base import BaseDriver +from ..utils.types import SafeDict + + +@dataclass +class ElasticConfig: + host: str + port: int = 9200 + user: str + password: str + db: str + protocol: str = 'http' + + def get_dsn(self) -> str: + return f"{self.protocol}://{self.host}:{self.port}/" + + +class elastic(BaseDriver): + _provider = "elasticsearch" + _syntax = "json" + + def __init__( + self, + dsn: str = None, + loop=None, + params: Union[dict, ElasticConfig] = None, + **kwargs + ): + # self._dsn = "{protocol}://{user}:{password}@{host}:{port}/{database}" + if isinstance(params, ElasticConfig): + self._database = params.database + else: + self._database = params.pop('db', 'default') + self._dsn = "{protocol}://{host}:{port}/" + super(elastic, self).__init__( + dsn=dsn, + loop=loop, + params=params, + **kwargs + ) + + def create_dsn(self, params: Union[dict, dataclass]): + if is_dataclass(params): + self._dsn = params.get_dsn() + else: + try: + return self._dsn.format_map( + SafeDict(**params) + ) if params else None + except TypeError as err: + self._logger.error(err) + raise DriverError( + f"Error creating DSN connection: {err}" + ) from err + + async def connection(self, timeout: int = 10, **kwargs): + """ + Asynchronously establish a connection to Elasticsearch + with a connection timeout. + + Args: + timeout (int): The maximum time in seconds to wait for the connection + to be established. Defaults to 10 seconds. + **kwargs: Additional keyword arguments to pass to the Elasticsearch connection. + + Returns: + self: The current instance of the elastic class after establishing the connection. + + Raises: + ConnectionTimeout: If the connection attempt exceeds the specified timeout. + DriverError: If any other error occurs while attempting to connect to Elasticsearch. 
+ """ + args = { + "timeout": self._timeout, + **self.kwargs + } + try: + # Use asyncio.wait_for to apply a timeout to the connection attempt + self._connection = await asyncio.wait_for( + AsyncElasticsearch( + hosts=self._dsn, + **args + ), + timeout=timeout + ) + self._connected = True + return self + except asyncio.TimeoutError: + raise ConnectionTimeout( + f"Elasticsearch connection timed out after {timeout} seconds" + ) + except Exception as exc: + raise DriverError( + f"Elasticsearch Connection Error: {exc}" + ) from exc + + def is_closed(self) -> bool: + return self._connection is None + + async def ping(self, msg: str = None) -> bool: + try: + return await self._connection.ping() + except Exception as exc: + self._logger.error(f"Ping failed: {exc}") + return False + + async def close(self, timeout: int = 10): + try: + # Close the Elasticsearch connection + await asyncio.wait_for( + self._connection.close(), + timeout=timeout + ) + except Exception as e: + self._logger.warning( + f"Elasticsearch closing connection: {e}" + ) + + async def test_connection( + self, + key: str = "test-index", + id: int = 1 + ) -> bool: + try: + # Perform a simple operation to check the connection + await self._connection.index( + index=key, + id=id, + document={'test_field': 'test_value'} + ) + await self._connection.delete(index=key, id=id) + return True + except Exception as exc: + self._logger.error( + f"Test connection failed: {exc}" + ) + return False + + async def use(self, database: int): + self._database = database + + async def prepare(self, sentence: Union[str, list]) -> Any: + raise NotImplementedError() # pragma: no-cover + + async def get(self, key: str): + """ + Get a document by its ID. + """ + try: + response = await self._connection.get( + index=self._database, + id=key + ) + return response['_source'] + except Exception as exc: + self._logger.error(f"Error getting document with ID {key}: {exc}") + raise DriverError(f"Error getting document with ID {key}: {exc}") from exc + + async def set(self, key: str, value: dict, **kwargs): + """ + Index or update a document in Elasticsearch. + """ + try: + await self._connection.index( + index=self._database, + id=key, + document=value, + **kwargs + ) + except Exception as exc: + self._logger.error( + f"Error setting document with ID {key}: {exc}" + ) + raise DriverError( + f"Error setting document with ID {key}: {exc}" + ) from exc + + async def exists(self, key: str, *keys) -> bool: + """ + Check if a document exists by its ID. + """ + try: + exists = await self._connection.exists( + index=self._database, + id=key + ) + return exists + except Exception as exc: + self._logger.error( + f"Error checking existence of document with ID {key}: {exc}" + ) + raise DriverError( + f"Error checking existence of document with ID {key}: {exc}" + ) from exc + + async def delete(self, key: str, *keys): + """ + Delete a document by its ID. + """ + try: + await self._connection.delete( + index=self._database, + id=key + ) + except Exception as exc: + self._logger.error( + f"Error deleting document with ID {key}: {exc}" + ) + raise DriverError( + f"Error deleting document with ID {key}: {exc}" + ) from exc + + async def query(self, sentence: str, *args, **kwargs) -> Any: + """ + Execute a search query on the Elasticsearch index. + + Args: + sentence (str): The query body to be executed on the Elasticsearch index, + typically written in Elasticsearch Query DSL format. + *args: Additional positional arguments. 
+            **kwargs: Additional keyword arguments to pass to the search method.
+
+        Returns:
+            List[dict]: A list of documents (hits) matching the query. Each document is a dictionary
+                containing the document's source and metadata.
+
+        Raises:
+            DriverError: If an error occurs while executing the query.
+
+        Example:
+            response = await elastic_instance.query('{"query": {"match_all": {}}}')
+            # Example response:
+            # [
+            #     {
+            #         "_index": "my-index",
+            #         "_type": "_doc",
+            #         "_id": "1",
+            #         "_score": 1.0,
+            #         "_source": {
+            #             "field1": "value1",
+            #             "field2": "value2",
+            #             ...
+            #         }
+            #     },
+            #     ...
+            # ]
+        """
+        result = None
+        error = None
+        try:
+            response = await self._connection.search(
+                index=self._database,
+                body=sentence,
+                **kwargs
+            )
+            result = response['hits']['hits']
+        except Exception as exc:
+            error = exc
+        return await self._serializer(result, error)
+
+    async def queryrow(self, sentence: str, *args, **kwargs) -> Any:
+        """
+        Execute a search query on the Elasticsearch index and return a single document.
+
+        Args:
+            sentence (str): The query body to be executed on the Elasticsearch index,
+                typically written in Elasticsearch Query DSL format.
+            *args: Additional positional arguments.
+            **kwargs: Additional keyword arguments to pass to the search method.
+
+        Returns:
+            dict or None: A dictionary containing the first document (hit) matching the query,
+                including the document's source and metadata. Returns None if no documents match.
+                The result is processed through the custom serializer.
+
+        Raises:
+            DriverError: If an error occurs while executing the query.
+
+        Example:
+            response = await elastic_instance.queryrow('{"query": {"match": {"field1": "value"}}}')
+            # Example response:
+            # {
+            #     "_index": "my-index",
+            #     "_type": "_doc",
+            #     "_id": "1",
+            #     "_score": 1.0,
+            #     "_source": {
+            #         "field1": "value",
+            #         "field2": "value2",
+            #         ...
+            #     }
+            # }
+        """
+        result = None
+        error = None
+        try:
+            response = await self._connection.search(
+                index=self._database,
+                body=sentence,
+                size=1,
+                **kwargs
+            )
+            hits = response['hits']['hits']
+            result = hits[0] if hits else None
+        except Exception as exc:
+            error = exc
+        return await self._serializer(result, error)
+
+    async def execute(self, sentence: str, *args, **kwargs) -> None:
+        """
+        Execute an Elasticsearch operation that doesn't return a result.
+        For example, creating an index or updating settings.
+        """
+        try:
+            # Assuming `sentence` is an action, like creating an index
+            if sentence == 'create_index':
+                index_name = kwargs.get('index_name')
+                body = kwargs.get('body', {})
+                await self._connection.indices.create(
+                    index=index_name,
+                    body=body
+                )
+            # Add other operations as needed
+            else:
+                self._logger.warning(f"Unsupported operation: {sentence}")
+        except Exception as exc:
+            self._logger.error(f"Error executing operation {sentence}: {exc}")
+            raise DriverError(f"Error executing operation {sentence}: {exc}") from exc
+ """ + try: + actions = [] + for sentence in sentences: + # Assuming each sentence is a dict representing an action + # For example: {'_op_type': 'index', '_index': 'my-index', '_id': '1', '_source': {...}} + actions.append(sentence) + if actions: + await self._connection.bulk(body=actions) + except Exception as exc: + self._logger.error(f"Error executing bulk operations: {exc}") + raise DriverError(f"Error executing bulk operations: {exc}") from exc + + async def fetchall(self, sentence: str, *args, **kwargs) -> Any: + try: + response = await self._connection.search( + index=self._database, + body=sentence, + **kwargs + ) + return response['hits']['hits'] + except Exception as exc: + raise DriverError( + f"Error executing query: {exc}" + ) from exc + + fetch_all = fetchall + + async def fetchone(self, sentence: str, *args, **kwargs) -> Any: + try: + response = await self._connection.search( + index=self._database, + body=sentence, + size=1, + **kwargs + ) + hits = response['hits']['hits'] + return hits[0] if hits else None + except Exception as exc: + raise DriverError( + f"Error executing queryrow: {exc}" + ) from exc + + fetch_one = fetchone diff --git a/examples/test_aioch.py b/examples/test_aioch.py new file mode 100644 index 00000000..7a43ce22 --- /dev/null +++ b/examples/test_aioch.py @@ -0,0 +1,22 @@ +import asyncio +from asyncdb.drivers.aioch import aioch +from pprint import pprint + + +async def connect(db): + async with await db.connection() as conn: + print('HERE >>') + pprint(await conn.test_connection()) + print('END >>') + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + params = { + "url": "http://localhost:8123/", + "user": "default", + "password": "u69ebsZQ", + "database": "default" + } + driver = aioch(params=params, loop=loop) + print('DRV > ', driver) + loop.run_until_complete(connect(driver)) diff --git a/examples/test_clickhouse.py b/examples/test_clickhouse.py index b1236753..3d3075b6 100644 --- a/examples/test_clickhouse.py +++ b/examples/test_clickhouse.py @@ -3,19 +3,61 @@ from pprint import pprint +table = """ +CREATE TABLE iris ( + sepal_length Decimal32(2), + sepal_width Decimal32(2), + petal_length Decimal32(2), + petal_width Decimal32(2), + species String +) ENGINE = MergeTree +PARTITION BY species +ORDER BY (species) +SETTINGS index_granularity = 8192; +""" + async def connect(db): async with await db.connection() as conn: - print('HERE >>') - pprint(await conn.test_connection()) - print('END >>') + result, error = await conn.test_connection() + print('RESULT > ', result) + await conn.execute( + "DROP TABLE IF EXISTS iris;" + ) + # Create A Table: + await conn.execute( + table + ) + # Insert Data: + insert = "INSERT INTO iris (sepal_length, sepal_width, petal_length, petal_width, species) VALUES" + data = [ + (5.1, 3.7, 1.5, 0.4, 'Iris-setosa'), + (4.6, 3.6, 1.0, 0.2, 'Iris-setosa') + ] + result, error = await conn.execute( + insert, params=data + ) + print('INSERTED > ', result, 'Error: ', error) + # getting one single row: + row, error = await conn.queryrow( + "SELECT * FROM iris" + ) + print(row) + # at the end, drop the table: + await conn.execute( + "DROP TABLE iris;" + ) + if __name__ == "__main__": loop = asyncio.get_event_loop() params = { - "url": "http://localhost:8123/", + "host": "localhost", + "port": 9000, "user": "default", "password": "u69ebsZQ", - "database": "default" + "database": "default", + "client_name": "ASYNCDB", + "secure": False } driver = clickhouse(params=params, loop=loop) print('DRV > ', driver) diff --git 
diff --git a/examples/test_aioch.py b/examples/test_aioch.py
new file mode 100644
index 00000000..7a43ce22
--- /dev/null
+++ b/examples/test_aioch.py
@@ -0,0 +1,22 @@
+import asyncio
+from asyncdb.drivers.aioch import aioch
+from pprint import pprint
+
+
+async def connect(db):
+    async with await db.connection() as conn:
+        print('HERE >>')
+        pprint(await conn.test_connection())
+        print('END >>')
+
+if __name__ == "__main__":
+    loop = asyncio.get_event_loop()
+    params = {
+        "url": "http://localhost:8123/",
+        "user": "default",
+        "password": "u69ebsZQ",
+        "database": "default"
+    }
+    driver = aioch(params=params, loop=loop)
+    print('DRV > ', driver)
+    loop.run_until_complete(connect(driver))
diff --git a/examples/test_clickhouse.py b/examples/test_clickhouse.py
index b1236753..3d3075b6 100644
--- a/examples/test_clickhouse.py
+++ b/examples/test_clickhouse.py
@@ -3,19 +3,61 @@
 from pprint import pprint
 
 
+table = """
+CREATE TABLE iris (
+    sepal_length Decimal32(2),
+    sepal_width Decimal32(2),
+    petal_length Decimal32(2),
+    petal_width Decimal32(2),
+    species String
+) ENGINE = MergeTree
+PARTITION BY species
+ORDER BY (species)
+SETTINGS index_granularity = 8192;
+"""
+
 async def connect(db):
     async with await db.connection() as conn:
-        print('HERE >>')
-        pprint(await conn.test_connection())
-        print('END >>')
+        result, error = await conn.test_connection()
+        print('RESULT > ', result)
+        await conn.execute(
+            "DROP TABLE IF EXISTS iris;"
+        )
+        # Create A Table:
+        await conn.execute(
+            table
+        )
+        # Insert Data:
+        insert = "INSERT INTO iris (sepal_length, sepal_width, petal_length, petal_width, species) VALUES"
+        data = [
+            (5.1, 3.7, 1.5, 0.4, 'Iris-setosa'),
+            (4.6, 3.6, 1.0, 0.2, 'Iris-setosa')
+        ]
+        result, error = await conn.execute(
+            insert, params=data
+        )
+        print('INSERTED > ', result, 'Error: ', error)
+        # getting one single row:
+        row, error = await conn.queryrow(
+            "SELECT * FROM iris"
+        )
+        print(row)
+        # at the end, drop the table:
+        await conn.execute(
+            "DROP TABLE iris;"
+        )
 
 
 if __name__ == "__main__":
     loop = asyncio.get_event_loop()
     params = {
-        "url": "http://localhost:8123/",
+        "host": "localhost",
+        "port": 9000,
         "user": "default",
         "password": "u69ebsZQ",
-        "database": "default"
+        "database": "default",
+        "client_name": "ASYNCDB",
+        "secure": False
     }
     driver = clickhouse(params=params, loop=loop)
     print('DRV > ', driver)
diff --git a/examples/test_delta.py b/examples/test_delta.py
index 0dab1077..a9474c45 100644
--- a/examples/test_delta.py
+++ b/examples/test_delta.py
@@ -144,6 +144,9 @@ async def test_epson(evt: asyncio.AbstractEventLoop):
     # loop.run_until_complete(
     #     test_data(evt=loop)
     # )
+    loop.run_until_complete(
+        test_create_epson(evt=loop)
+    )
     loop.run_until_complete(
         test_epson(evt=loop)
     )
diff --git a/setup.py b/setup.py
index 9473f965..a3fe484c 100644
--- a/setup.py
+++ b/setup.py
@@ -245,6 +245,7 @@ def readme():
     ],
     "clickhouse": [
         "clickhouse-driver==0.2.9",
+        "clickhouse-cityhash==1.0.2.4",
         "aiochclient[httpx-speedups]==2.6.0",
         "clickhouse-connect==0.7.19"  # Support for SuperSet
     ],
diff --git a/start_docker.sh b/start_docker.sh
new file mode 100755
index 00000000..8b12d0f2
--- /dev/null
+++ b/start_docker.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+# Start Clickhouse
+docker start clickhouse-server clickhouse-zookeeper
+
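Note: start_docker.sh only restarts existing containers; it assumes containers named clickhouse-server and clickhouse-zookeeper were created beforehand (for example from the clickhouse/clickhouse-server and zookeeper images), with ports 8123 and 9000 published for the HTTP and native drivers used by the examples above.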