diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a5be027..ec929b0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -34,7 +34,7 @@ jobs: strategy: fail-fast: false matrix: - python: ["3.8", "3.10"] + python: ["3.9", "3.10"] task: - name: Test run: | diff --git a/.gitignore b/.gitignore index 7b279e4..e325a48 100644 --- a/.gitignore +++ b/.gitignore @@ -33,8 +33,13 @@ doc/_build/ *.pem local_test/ local_test.py + +# local test files _* +# keep __init__.py files +!__* + # python *.pyc diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2eec3f8..a66045f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,13 +6,13 @@ repos: types: [python] exclude: ^tests/ - - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.9.0 - hooks: - - id: mypy - types: [python] - additional_dependencies: [ dataclasses-json>=0.6.4, websocket-client>=1.2.1] - exclude: '^tests/' + # - repo: https://github.com/pre-commit/mirrors-mypy + # rev: v1.9.0 + # hooks: + # - id: mypy + # types: [python] + # additional_dependencies: [ dataclasses-json>=0.6.4, websocket-client>=1.2.1] + # exclude: '^tests/' - repo: https://github.com/ambv/black rev: 24.4.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c07797..273dee4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
## Unreleased +- Add PEP 249 support +- add pep249abc dependency ## [v0.1.5](https://github.com/Mapepire-IBMi/mapepire-python/releases/tag/v0.1.5) - 2024-08-30 --- add cl tests - - add query manager +- add cl tests +- add query manager ## TP1 - Add query manager diff --git a/README.md b/README.md index 8c1f22c..f3226d8 100644 --- a/README.md +++ b/README.md @@ -17,10 +17,14 @@ - [Setup](#setup) - [Install with `pip`](#install-with-pip) - [Server Component Setup](#server-component-setup) - - [Example usage](#example-usage) +- [Usage](#usage) + - [1. Using the `SQLJob` object to run queries synchronously](#1-using-the-sqljob-object-to-run-queries-synchronously) - [Query and run](#query-and-run) - - [Asynchronous Query Execution](#asynchronous-query-execution) - - [Pooling (beta)](#pooling-beta) + - [2. Using the `PoolJob` object to run queries asynchronously](#2-using-the-pooljob-object-to-run-queries-asynchronously) + - [3. Using the `Pool` object to run queries "concurrently"](#3-using-the-pool-object-to-run-queries-concurrently) + - [4. Using PEP 249 Implementation](#4-using-pep-249-implementation) + - [`fetchmany()` and `fetchall()` methods](#fetchmany-and-fetchall-methods) + - [PEP 249 Asynchronous Implementation](#pep-249-asynchronous-implementation) - [Development Setup](#development-setup) - [Setup python virtual environment with pip and venv](#setup-python-virtual-environment-with-pip-and-venv) - [Create a new virtual environment](#create-a-new-virtual-environment) @@ -69,9 +73,15 @@ pip install mapepire-python ### Server Component Setup To use mapire-python, you will need to have the Mapepire Server Component running on your IBM i server. Follow these instructions to set up the server component: [Mapepire Server Installation](https://mapepire-ibmi.github.io/guides/sysadmin/) -## Example usage +# Usage -Setup the server credentials used to connect to the server. 
One way to do this is to create a `mapepire.ini` file in the root of your project with the following content: +There are four main ways to run queries using `mapepire-python`: +1. Using the `SQLJob` object to run queries synchronously +2. Using the `PoolJob` object to run queries asynchronously +3. Using the `Pool` object to run queries "concurrently" +4. Using PEP 249 Implementation + +All of these methods require the `DaemonServer` object for connecting to the server. One way to set up the `DaemonServer` is to create a `mapepire.ini` file in the root of your project with the following content: ```ini [mapepire] @@ -81,7 +91,8 @@ USER="USER" PASSWORD="PASSWORD" ``` -The following script sets up a `DaemonServer` object that will be used to connect with the Server Component. Then a single `SQLJob` is created to facilitate the connection from the client side. + +## 1. Using the `SQLJob` object to run queries synchronously ```python import configparser @@ -262,7 +273,7 @@ with SQLJob(creds) as sql_job: print(result) ``` -### Asynchronous Query Execution +## 2. Using the `PoolJob` object to run queries asynchronously The `PoolJob` object can be used to create and run queries asynchronously: @@ -323,7 +334,7 @@ if __name__ == '__main__': ``` -## Pooling (beta) +## 3. Using the `Pool` object to run queries "concurrently" The `Pool` object can be used to create a pool of `PoolJob` objects to run queries concurrently. @@ -375,6 +386,71 @@ This script will create a pool of 3 `PoolJob` objects and run the query `values ['004460/QUSER/QZDASOINIT', '005096/QUSER/QZDASOINIT', '005319/QUSER/QZDASOINIT'] ``` +## 4. Using PEP 249 Implementation + +PEP 249 is the Python Database API Specification v2.0. The `mapepire-python` client provides a PEP 249 implementation that allows you to use the `Connection` and `Cursor` objects to interact with the Mapepire server. The same `DaemonServer` object is used by the PEP 249 implementation to connect to the server. 
+ +```python +import configparser +from mapepire_python import connect +from mapepire_python.data_types import DaemonServer + +config = configparser.ConfigParser() +config.read('mapepire.ini') + +creds = DaemonServer( + host=config['mapepire']['SERVER'], + port=config['mapepire']['PORT'], + user=config['mapepire']['USER'], + password=config['mapepire']['PASSWORD'], + ignoreUnauthorized=True +) + +with connect(creds) as conn: + with conn.execute("select * from sample.employee") as cursor: + result = cursor.fetchone() + print(result) +``` + +### `fetchmany()` and `fetchall()` methods + +The `Cursor` object provides the `fetchmany()` and `fetchall()` methods to fetch multiple rows from the result set: + +```python + +with connect(creds) as conn: + with conn.execute("select * from sample.employee") as cursor: + results = cursor.fetchmany(size=2) + print(results) +``` +--- + +```python + +with connect(creds) as conn: + with conn.execute("select * from sample.employee") as cursor: + results = cursor.fetchall() + print(results) +``` + +## PEP 249 Asynchronous Implementation + +The PEP 249 implementation also provides an asynchronous interface for running queries. 
The `connect` function returns an asynchronous context manager that can be used with the `async with` statement: + +```python +import asyncio +from mapepire_python.asyncio import connect + +async def main(): + async with connect(creds) as conn: + async with await conn.execute("select * from sample.employee") as cursor: + result = await cursor.fetchone() + print(result) + +if __name__ == '__main__': + asyncio.run(main()) +``` + # Development Setup diff --git a/environment-dev.yml b/environment-dev.yml index d204d6b..44cd696 100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -18,3 +18,4 @@ dependencies: - websocket-client>=1.2.1 - pyee - websockets + - pep249abc diff --git a/mapepire_python/__init__.py b/mapepire_python/__init__.py new file mode 100644 index 0000000..473f3aa --- /dev/null +++ b/mapepire_python/__init__.py @@ -0,0 +1,46 @@ +from typing import Any, Dict, Union + +from .core import ( + Connection, + Cursor, + DatabaseError, + DataError, + Error, + IntegrityError, + InterfaceError, + InternalError, + NotSupportedError, + OperationalError, + ProgrammingError, +) +from .data_types import DaemonServer + +__all__ = [ + "apilevel", + "threadsafety", + "paramstyle", + "DatabaseError", + "DataError", + "Error", + "InterfaceError", + "IntegrityError", + "InternalError", + "NotSupportedError", + "OperationalError", + "ProgrammingError", + "CONNECTION_CLOSED", + "convert_runtime_errors", + "connect", + "Connection", + "Cursor", +] + +# pylint: disable=invalid-name +apilevel = "2.0" +threadsafety = 1 +paramstyle = "qmark" + + +def connect(connection_details: Union[DaemonServer, dict], opts: Dict[str, Any] = {}) -> Connection: + """Connect to a Mapepire Server, returning a connection.""" + return Connection(connection_details, opts) diff --git a/mapepire_python/asyncio/__init__.py b/mapepire_python/asyncio/__init__.py new file mode 100644 index 0000000..87a5bda --- /dev/null +++ b/mapepire_python/asyncio/__init__.py @@ -0,0 +1,46 @@ +from typing import
Any, Dict, Union + +from ..asyncio.connection import AsyncConnection +from ..core.exceptions import ( + DatabaseError, + DataError, + Error, + IntegrityError, + InterfaceError, + InternalError, + NotSupportedError, + OperationalError, + ProgrammingError, +) +from ..data_types import DaemonServer +from .cursor import AsyncCursor + +__all__ = [ + "apilevel", + "threadsafety", + "paramstyle", + "connect", + "AsyncConnection", + "AsyncCursor", + "Error", + "InterfaceError", + "DatabaseError", + "DataError", + "IntegrityError", + "InternalError", + "NotSupportedError", + "OperationalError", + "ProgrammingError", +] + +# pylint: disable=invalid-name +apilevel = "2.0" +threadsafety = 1 +paramstyle = "qmark" + + +def connect( + connection_details: Union[DaemonServer, dict], opts: Dict[str, Any] = {} +) -> AsyncConnection: + """Connect to a Mapepire Server, returning a connection.""" + return AsyncConnection(connection_details, opts) diff --git a/mapepire_python/asyncio/connection.py b/mapepire_python/asyncio/connection.py new file mode 100644 index 0000000..969e0b8 --- /dev/null +++ b/mapepire_python/asyncio/connection.py @@ -0,0 +1,81 @@ +from typing import Optional, Sequence, Union + +from pep249 import aiopep249 +from pep249.aiopep249 import ProcArgs, ProcName, QueryParameters, SQLQuery + +from ..core.connection import Connection +from ..data_types import DaemonServer +from .cursor import AsyncCursor +from .utils import to_thread + + +class AsyncConnection(aiopep249.AsyncCursorExecuteMixin, aiopep249.AsyncConnection): + """ + A DB API 2.0 compliant async connection for Mapepire, as outlined in + PEP 249. + + Can be constructed by passing a connection details object as a dict, + or a `DaemonServer` object: + + ``` + import asyncio + from mapepire_python.asyncio import connect + from mapepire_python.data_types import DaemonServer + creds = DaemonServer( + host=SERVER, + port=PORT, + user=USER, + password=PASS, + ignoreUnauthorized=True + ) + + >>> async def main(): + ... 
async with connect(creds) as conn: + ... async with await conn.execute("select * from sample.employee") as cur: + ... print(await cur.fetchone()) + + >>> if __name__ == '__main__': + ... asyncio.run(main()) + + ``` + + + """ + + def __init__(self, database: Union[DaemonServer, dict], opts={}) -> None: + super().__init__() + self._connection = Connection(database, opts=opts) + + async def cursor(self) -> AsyncCursor: + return AsyncCursor(self, self._connection.cursor()) + + async def close(self) -> None: + await to_thread(self._connection.close) + + async def execute( + self, operation: SQLQuery, parameters: Optional[QueryParameters] = None + ) -> AsyncCursor: + cursor = await self.cursor() + return await cursor.execute(operation, parameters) + + async def executemany( + self, operation: SQLQuery, seq_of_parameters: Sequence[QueryParameters] + ) -> AsyncCursor: + cursor = await self.cursor() + return await cursor.executemany(operation, seq_of_parameters) + + async def callproc( + self, procname: ProcName, parameters: Optional[ProcArgs] = None + ) -> Optional[ProcArgs]: + cursor = await self.cursor() + return await cursor.callproc(procname, parameters) + + async def executescript(self, script: SQLQuery) -> AsyncCursor: + """A lazy implementation of SQLite's `executescript`.""" + return await self.execute(script) + + async def commit(self) -> None: + to_thread(self._connection.commit) + + async def rollback(self) -> None: + to_thread(self._connection.rollback) diff --git a/mapepire_python/asyncio/cursor.py b/mapepire_python/asyncio/cursor.py new file mode 100644 index 0000000..a552b09 --- /dev/null +++ b/mapepire_python/asyncio/cursor.py @@ -0,0 +1,98 @@ +import weakref +from typing import TYPE_CHECKING, Optional, Sequence, Type, Union + +from pep249 import QueryParameters, ResultRow, aiopep249 +from pep249.aiopep249.types import ( + ColumnDescription, + ProcArgs, + ProcName, + ResultSet, + SQLQuery, +) + +if TYPE_CHECKING: + from .connection import AsyncConnection # 
pylint:disable=cyclic-import + +from ..core.cursor import Cursor +from .utils import to_thread + + +class AsyncCursor( + aiopep249.CursorConnectionMixin, + aiopep249.IterableAsyncCursorMixin, + aiopep249.TransactionalAsyncCursor, +): + """ + An async DB API 2.0 compliant cursor for Mapepire, as outlined in + PEP 249. + + Can be constructed by passing an AsyncConnection and a sync Cursor. + + """ + + def __init__(self, connection: "AsyncConnection", cursor: Cursor) -> None: + super().__init__() + self._connection = weakref.proxy(connection) + self._cursor = cursor + + @property + def connection(self) -> "AsyncConnection": + return self._connection + + @property + def description(self) -> Optional[Sequence[ColumnDescription]]: + return self._cursor.description + + @property + def rowcount(self) -> int: + return self._cursor.rowcount + + async def commit(self) -> None: + await to_thread(self._cursor.commit) + + async def rollback(self) -> None: + await to_thread(self._cursor.rollback) + + async def close(self) -> None: + await to_thread(self._cursor.close) + + async def callproc( + self, procname: ProcName, parameters: Optional[ProcArgs] = None + ) -> Optional[ProcArgs]: + return await to_thread(self._cursor.callproc, procname, parameters) + + async def nextset(self) -> Optional[bool]: + return await to_thread(self._cursor.nextset) + + def setinputsizes(self, sizes: Sequence[Optional[Union[int, Type]]]) -> None: + self._cursor.setinputsizes(sizes) + + def setoutputsize(self, size: int, column: Optional[int] = None) -> None: + self._cursor.setoutputsize(size, column) + + async def execute( + self, operation: SQLQuery, parameters: Optional[QueryParameters] = None + ) -> "AsyncCursor": + await to_thread(self._cursor.execute, operation, parameters) + return self + + async def executescript(self, script: SQLQuery) -> "AsyncCursor": + """A lazy implementation of SQLite's `executescript`.""" + return await self.execute(script) + + async def executemany( + self, operation: 
SQLQuery, seq_of_parameters: Sequence[QueryParameters] + ) -> "AsyncCursor": + await to_thread(self._cursor.executemany, operation, seq_of_parameters) + return self + + async def fetchone(self) -> Optional[ResultRow]: + return await to_thread(self._cursor.fetchone) + + async def fetchmany(self, size: Optional[int] = None) -> ResultSet: + if size is None: + size = self.arraysize + return await to_thread(self._cursor.fetchmany, size) + + async def fetchall(self) -> ResultSet: + return await to_thread(self._cursor.fetchall) diff --git a/mapepire_python/asyncio/utils.py b/mapepire_python/asyncio/utils.py new file mode 100644 index 0000000..6711572 --- /dev/null +++ b/mapepire_python/asyncio/utils.py @@ -0,0 +1,23 @@ +try: + from asyncio import to_thread # pylint: disable=unused-import +except ImportError: + from asyncio import get_running_loop + from contextvars import copy_context + from functools import partial + + # Taken from the Python 3.9 source code, with slight modification. + async def to_thread(func, *args, **kwargs): + """ + Asynchronously run function *func* in a separate thread. + + Any *args and **kwargs supplied for this function are directly passed + to *func*. Also, the current :class:`contextvars.Context` is propagated, + allowing context variables from the main thread to be accessed in the + separate thread. + + Return a coroutine that can be awaited to get the eventual result of *func*.
+ """ + loop = get_running_loop() + ctx = copy_context() + func_call = partial(ctx.run, func, *args, **kwargs) + return await loop.run_in_executor(None, func_call) diff --git a/mapepire_python/client/__init__.py b/mapepire_python/client/__init__.py new file mode 100644 index 0000000..81d3051 --- /dev/null +++ b/mapepire_python/client/__init__.py @@ -0,0 +1,3 @@ +from ..client.sql_job import SQLJob + +__all__ = ["SQLJob"] diff --git a/mapepire_python/client/query.py b/mapepire_python/client/query.py index 7505f92..281e71b 100644 --- a/mapepire_python/client/query.py +++ b/mapepire_python/client/query.py @@ -22,14 +22,14 @@ def __init__(self, job: SQLJob, query: str, opts: QueryOptions) -> None: self.job = job self.sql: str = query self.is_prepared: bool = True if opts.parameters is not None else False - self.parameters: Optional[List[str]] = opts.parameters + self.parameters: Optional[List[str]] = opts.parameters or [] self.is_cl_command: Optional[bool] = opts.isClCommand self.should_auto_close: Optional[bool] = opts.autoClose self.is_terse_results: Optional[bool] = opts.isTerseResults self._rows_to_fetch: int = 100 self.state: QueryState = QueryState.NOT_YET_RUN - + self._correlation_id = None Query.global_query_list.append(self) def __enter__(self): @@ -38,11 +38,50 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.close() + def __str__(self): + return f"Query(sql={self.sql}, parameters={self.parameters}, correlation_id={self._correlation_id})" + def _execute_query(self, qeury_object: Dict[str, Any]) -> Dict[str, Any]: self.job.send(json.dumps(qeury_object)) query_result: Dict[str, Any] = json.loads(self.job._socket.recv()) return query_result + def prepare_sql_execute(self): + # check Query state first + if self.state == QueryState.RUN_DONE: + raise Exception("Statement has already been fully run") + + query_object = { + "id": self.job._get_unique_id("prepare_sql_execute"), + "type": "prepare_sql_execute", + "sql": self.sql, + 
"rows": 0, + "parameters": self.parameters, + } + + query_result: Dict[str, Any] = self._execute_query(query_object) + self.state = ( + QueryState.RUN_DONE + if query_result.get("is_done", False) + else QueryState.RUN_MORE_DATA_AVAIL + ) + + if not query_result.get("success", False) and not self.is_cl_command: + print(query_result) + self.state = QueryState.ERROR + error_keys = ["error", "sql_state", "sql_rc"] + error_list = { + key: query_result[key] for key in error_keys if key in query_result.keys() + } + if len(error_list) == 0: + error_list["error"] = "failed to run query for unknown reason" + + raise RuntimeError(error_list) + + self._correlation_id = query_result["id"] + + return query_result + def run(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]: if rows_to_fetch is None: rows_to_fetch = self._rows_to_fetch @@ -90,7 +129,7 @@ def run(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]: if len(error_list) == 0: error_list["error"] = "failed to run query for unknown reason" - raise Exception(error_list) + raise RuntimeError(error_list) self._correlation_id = query_result["id"] @@ -126,7 +165,7 @@ def fetch_more(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]: if not query_result["success"]: self.state = QueryState.ERROR - raise Exception(query_result["error"] or "Failed to run Query (unknown error)") + raise RuntimeError(query_result["error"] or "Failed to run Query (unknown error)") return query_result diff --git a/mapepire_python/client/sql_job.py b/mapepire_python/client/sql_job.py index 4cd20bd..688b86c 100644 --- a/mapepire_python/client/sql_job.py +++ b/mapepire_python/client/sql_job.py @@ -7,6 +7,8 @@ from ..data_types import DaemonServer, JobStatus, QueryOptions, dict_to_dataclass from .websocket import WebsocketConnection +__all__ = ["SQLJob"] + class SQLJob(BaseJob): def __init__(self, creds: DaemonServer = None, options: Dict[Any, Any] = {}) -> None: diff --git a/mapepire_python/client/websocket.py 
b/mapepire_python/client/websocket.py index 131b253..a7e3dc7 100644 --- a/mapepire_python/client/websocket.py +++ b/mapepire_python/client/websocket.py @@ -34,4 +34,4 @@ def connect(self) -> WebSocket: self.uri, header=self.headers, sslopt=self.ssl_opts, timeout=10 ) except Exception as e: - raise Exception(f"An error occurred while connecting to the server: {e}") + raise RuntimeError(f"An error occurred while connecting to the server: {e}") diff --git a/mapepire_python/core/__init__.py b/mapepire_python/core/__init__.py new file mode 100644 index 0000000..4c0c56a --- /dev/null +++ b/mapepire_python/core/__init__.py @@ -0,0 +1,27 @@ +from .connection import Connection +from .cursor import Cursor +from .exceptions import ( + DatabaseError, + DataError, + Error, + IntegrityError, + InterfaceError, + InternalError, + NotSupportedError, + OperationalError, + ProgrammingError, +) + +__all__ = [ + "Connection", + "Cursor", + "Error", + "InterfaceError", + "DatabaseError", + "DataError", + "IntegrityError", + "InternalError", + "NotSupportedError", + "OperationalError", + "ProgrammingError", +] diff --git a/mapepire_python/core/connection.py b/mapepire_python/core/connection.py new file mode 100644 index 0000000..a8e206f --- /dev/null +++ b/mapepire_python/core/connection.py @@ -0,0 +1,83 @@ +from typing import Optional, Sequence, Union + +import pep249 +from pep249 import ProcArgs, QueryParameters, SQLQuery + +from ..client.sql_job import SQLJob +from ..core.cursor import Cursor +from ..core.exceptions import convert_runtime_errors +from ..core.utils import raise_if_closed +from ..data_types import DaemonServer + +__all__ = ["Connection"] + +COMMIT = "COMMIT" +ROLLBACK = "ROLLBACK" + + +class Connection(pep249.CursorExecuteMixin, pep249.ConcreteErrorMixin, pep249.Connection): + """ + A DB API 2.0 compliant connection for Mapepire, as outlined in + PEP 249. 
+ + Can be constructed by passing a connection details object as a dict, + or a `DaemonServer` object: + + ``` + import asyncio + from mapepire_python import connect + from mapepire_python.data_types import DaemonServer + creds = DaemonServer( + host=SERVER, + port=PORT, + user=USER, + password=PASS, + ignoreUnauthorized=True + ) + with connect(creds) as conn: + with conn.execute("select * from sample.employee") as cur: + print(await cur.fetchone()) + + ``` + """ + + @convert_runtime_errors + def __init__(self, database: Union[DaemonServer, dict], opts={}) -> None: + super().__init__() + self.job = SQLJob(creds=database, options=opts) + self.job.connect(database) + self._closed = False + + @raise_if_closed + @convert_runtime_errors + def cursor( + self, + ) -> Cursor: + return Cursor(self, self.job) + + @convert_runtime_errors + def close(self) -> None: + if self._closed: + return + + self.job.close() + self._closed = True + + def execute(self, operation: str, parameters: Optional[QueryParameters] = None) -> Cursor: + return self.cursor().execute(operation, parameters) + + def executemany(self, operation: str, seq_of_parameters: Sequence[QueryParameters]) -> Cursor: + return self.cursor().executemany(operation, seq_of_parameters) + + def callproc(self, procname: str, parameters: Sequence[ProcArgs] = None) -> Optional[ProcArgs]: + return self.cursor().callproc(procname, parameters) + + def executescript(self, script: SQLQuery) -> Cursor: + """A lazy implementation of SQLite's `executescript`.""" + return self.execute(script) + + def commit(self) -> None: + self.job.query_and_run(COMMIT) + + def rollback(self) -> None: + self.job.query_and_run(ROLLBACK) diff --git a/mapepire_python/core/cursor.py b/mapepire_python/core/cursor.py new file mode 100644 index 0000000..2a8e750 --- /dev/null +++ b/mapepire_python/core/cursor.py @@ -0,0 +1,201 @@ +import weakref +from collections import deque +from typing import TYPE_CHECKING, Any, Optional, Sequence, Type, Union + +import 
pep249 +from pep249 import ( + ColumnDescription, + ProcArgs, + ProcName, + QueryParameters, + ResultRow, + ResultSet, + SQLQuery, +) +from pep249.cursor import CursorType + +from mapepire_python.core.utils import raise_if_closed + +if TYPE_CHECKING: + # pylint: disable=cyclic-import + from ..core.connection import Connection + +from ..client.query import Query, QueryState +from ..client.sql_job import SQLJob +from ..core.exceptions import convert_runtime_errors +from ..core.utils import QueryResultSet +from ..data_types import QueryOptions + +__all__ = ["Cursor"] + + +class Cursor(pep249.CursorConnectionMixin, pep249.IterableCursorMixin, pep249.TransactionalCursor): + max_rows = 2147483647 + + def __init__(self, connection: "Connection", job: SQLJob) -> None: + super().__init__() + self._connection = weakref.proxy(connection) + self.job = job + self.query: Query = None + self.query_q = deque(maxlen=20) + self.__closed = False + self.__has_results = False + + @property + def has_results(self) -> bool: + return self.__has_results + + def __set_has_results(self, value: bool): + self.__has_results = value + + @property + def connection(self) -> "Connection": + """The parent Connection of the implementing cursor.""" + return self._connection + + @property + def _closed(self) -> bool: + # pylint: disable=protected-access + try: + return self.__closed or self.connection._closed + except ReferenceError: + # Parent connection already GC'd. 
+ return True + + @_closed.setter + def _closed(self, value: bool): + self.__closed = value + + @property + def rowcount(self) -> int: + return getattr(self, "_rowcount", -1) + + @rowcount.setter + def rowcount(self, value: int): + setattr(self, "_rowcount", value) + + def setinputsizes(self, sizes: Sequence[Optional[Union[int, Type]]]) -> None: + pass + + def setoutputsize(self, size: int, column: Optional[int] = None) -> None: + pass + + @raise_if_closed + @convert_runtime_errors + def execute( + self, + operation: SQLQuery, + parameters: Optional[QueryParameters] = None, + **kwargs: Any, + ) -> "Cursor": + opts = kwargs.get("opts", None) + if opts: + query = Query(self.job, operation, opts) + else: + create_opts = QueryOptions( + isClCommand=kwargs.get("isClCommand", None), + isTerseResults=kwargs.get("isTerseResults", None), + parameters=parameters if parameters else None, + autoClose=kwargs.get("autoclose", None), + ) + + query = Query(self.job, operation, create_opts) + + prepare_result = query.prepare_sql_execute() + # print(prepare_result) + + if prepare_result["has_results"]: + self.query = query + self.__set_has_results(True) + self.query_q.append(query) + + update_count = prepare_result.get("update_count", None) + if update_count: + self.rowcount = update_count + + return self + + @raise_if_closed + @convert_runtime_errors + def executemany( + self: CursorType, + operation: SQLQuery, + seq_of_parameters: Sequence[QueryParameters], + **kwargs: Any, + ) -> "Cursor": + return self.execute(operation=operation, parameters=seq_of_parameters) + + @raise_if_closed + @convert_runtime_errors + def callproc(self, procname: ProcName, parameters: Optional[ProcArgs] = None) -> "Cursor": + return self.execute(procname, parameters=parameters) + + @property + def description( + self, + ) -> Optional[Sequence[ColumnDescription]]: + pass + + @raise_if_closed + @convert_runtime_errors + def fetchone(self) -> Optional[ResultRow]: + if not self.query or self.query.state == 
QueryState.RUN_DONE: + return None + res = self.query.fetch_more(rows_to_fetch=1) + if res: + self._result_set = QueryResultSet(res) + return res + + @raise_if_closed + @convert_runtime_errors + def fetchall(self) -> ResultSet: + if not self.query: + return None + + res = self.query.fetch_more(rows_to_fetch=self.max_rows) + if res: + self._result_set = QueryResultSet(res) + return res + + @raise_if_closed + @convert_runtime_errors + def fetchmany(self, size: Optional[int] = None) -> ResultSet: + if size is None: + size = self.arraysize + if not self.query: + return None + res = self.query.fetch_more(rows_to_fetch=size) + if res: + self._result_set = QueryResultSet(res) + return res + + def executescript(self, script: SQLQuery) -> "Cursor": + """A lazy implementation of SQLite's `executescript`.""" + return self.execute(script) + + def nextset(self) -> Optional[bool]: + try: + if len(self.query_q) > 1: + self.query_q.popleft() + self.query = self.query_q[0] + self.__set_has_results(True) + return True + return None + except Exception: + return None + + @convert_runtime_errors + def close(self) -> None: + if self._closed: + return + if self.query and self.job._socket.connected: + for q in self.query_q: + q.close() + self.query_q.clear() + self._closed = True + + def commit(self) -> None: + self.job.query_and_run("COMMIT") + + def rollback(self) -> None: + self.job.query_and_run("ROLLBACK") diff --git a/mapepire_python/core/exceptions.py b/mapepire_python/core/exceptions.py new file mode 100644 index 0000000..aab3781 --- /dev/null +++ b/mapepire_python/core/exceptions.py @@ -0,0 +1,97 @@ +""" +This module covers the exceptions outlined in PEP 249. 
+ +""" + +# pylint: disable=missing-class-docstring +import ast +from functools import wraps +from typing import Callable, TypeVar + +from pep249 import ( + DatabaseError, + DataError, + Error, + IntegrityError, + InterfaceError, + InternalError, + NotSupportedError, + OperationalError, + ProgrammingError, +) + +ReturnType = TypeVar("ReturnType") + + +__all__ = [ + "DatabaseError", + "DataError", + "Error", + "InterfaceError", + "IntegrityError", + "InternalError", + "NotSupportedError", + "OperationalError", + "ProgrammingError", + "CONNECTION_CLOSED", + "convert_runtime_errors", + "ReturnType", +] + +CONNECTION_CLOSED = ProgrammingError("Cannot operate on a closed connection.") + +INTEGRITY_ERRORS = ("Constraint Error",) +PROGRAMMING_ERRORS = ("*FILE not found.",) +DATA_ERRORS = ("Invalid Input Error", "Out of Range Error") +INTERNAL_ERRORS = () +OPERATIONAL_ERRORS = () +NOT_SUPPORTED_ERRORS = () + + +def _parse_runtime_error(error: RuntimeError) -> DatabaseError: + """ + Parse a runtime error raised by the Mapepire client and return a more + appropriate exception.
+ + """ + if isinstance(error, Error) or not isinstance(error, RuntimeError): + return error + error_string = str(error) + error_message = None + new_error_type = DatabaseError + try: + error_dict = ast.literal_eval(error_string) + error_message = error_dict["error"] + if any(err in error_message for err in PROGRAMMING_ERRORS): + new_error_type = ProgrammingError + return new_error_type(error_message) + except Exception: + pass + + # if error_type in INTEGRITY_ERRORS: + # new_error_type = IntegrityError + # elif error_type in PROGRAMMING_ERRORS: + # new_error_type = ProgrammingError + # elif error_type in DATA_ERRORS: + # new_error_type = DataError + # elif error_type in INTERNAL_ERRORS: + # new_error_type = InternalError + # elif error_type in OPERATIONAL_ERRORS: + # new_error_type = OperationalError + # elif error_type in NOT_SUPPORTED_ERRORS: + # new_error_type = NotSupportedError + + return new_error_type(error_string) + + +def convert_runtime_errors(function: Callable[..., ReturnType]) -> Callable[..., ReturnType]: + """Wrap a function, raising correct errors from `RuntimeError`s.""" + + @wraps(function) + def wrapper(*args, **kwargs): + try: + return function(*args, **kwargs) + except RuntimeError as err: + raise _parse_runtime_error(err) from err + + return wrapper diff --git a/mapepire_python/core/utils.py b/mapepire_python/core/utils.py new file mode 100644 index 0000000..dccec39 --- /dev/null +++ b/mapepire_python/core/utils.py @@ -0,0 +1,78 @@ +"""Some useful utility pieces.""" + +from functools import wraps +from typing import Any, Callable, Dict, List, Optional + +from .exceptions import CONNECTION_CLOSED, ProgrammingError, ReturnType + +__all__ = ["raise_if_closed"] + + +def raise_if_closed(method: Callable[..., ReturnType]) -> Callable[..., ReturnType]: + """ + Wrap a connection/cursor method and raise a 'connection closed' error if + the object is closed. 
+ + """ + + @wraps(method) + def wrapped(self, *args, **kwargs): + """Raise if the connection/cursor is closed.""" + if self._closed: # pylint: disable=protected-access + raise CONNECTION_CLOSED + return method(self, *args, **kwargs) + + return wrapped + + +def ignore_transaction_error( + method: Callable[..., ReturnType] +) -> Callable[..., Optional[ReturnType]]: + """ + Ignore transaction errors, returning `None` instead. Useful for + `rollback`. + + """ + + @wraps(method) + def wrapped(*args, **kwargs): + """Ignore transaction errors, returning `None` instead.""" + try: + return method(*args, **kwargs) + except (ProgrammingError, RuntimeError) as err: + if str(err).endswith("no transaction is active"): + return None + raise + + return wrapped + + +class ColumnMetaData: + def __init__(self, name: str, type: str, display_size: int, label: str): + self.name = name + self.type = type + self.display_size = display_size + self.label = label + + +class MetaData: + def __init__(self, column_count: int, job: str, columns: List[ColumnMetaData]): + self.column_count = column_count + self.job = job + self.columns = columns + + +class QueryResultSet: + def __init__(self, result: Dict[str, Any]): + self.id = result.get("id", None) + self.has_results = result.get("has_results", None) + self.update_count = result.get("update_count", None) + metadata = result.get("metadata", {}) + self.metadata = MetaData( + column_count=metadata.get("column_count", None), + job=metadata.get("job", None), + columns=[ColumnMetaData(**col) for col in metadata.get("columns", [])], + ) + self.data = result.get("data", []) + self.is_done = result.get("is_done", None) + self.success = result.get("success", None) diff --git a/mapepire_python/pool/__init__.py b/mapepire_python/pool/__init__.py new file mode 100644 index 0000000..b7e20b2 --- /dev/null +++ b/mapepire_python/pool/__init__.py @@ -0,0 +1,4 @@ +from .pool_client import Pool +from .pool_job import PoolJob + +__all__ = ["Pool", "PoolJob"] diff 
--git a/mapepire_python/pool/pool_client.py b/mapepire_python/pool/pool_client.py index a2f83c7..68fc852 100644 --- a/mapepire_python/pool/pool_client.py +++ b/mapepire_python/pool/pool_client.py @@ -4,6 +4,8 @@ from ..data_types import DaemonServer, JDBCOptions, JobStatus, QueryOptions from .pool_job import PoolJob +__all__ = ["Pool"] + @dataclass class PoolOptions: diff --git a/mapepire_python/pool/pool_job.py b/mapepire_python/pool/pool_job.py index 7904bbe..3e05bcf 100644 --- a/mapepire_python/pool/pool_job.py +++ b/mapepire_python/pool/pool_job.py @@ -10,6 +10,8 @@ from ..base_job import BaseJob from ..data_types import DaemonServer, JobStatus, QueryOptions, dict_to_dataclass +__all__ = ["PoolJob"] + class PoolJob(BaseJob): unique_id_counter = 0 diff --git a/mapepire_python/query_manager.py b/mapepire_python/query_manager.py index e02c58e..6ad749e 100644 --- a/mapepire_python/query_manager.py +++ b/mapepire_python/query_manager.py @@ -57,7 +57,7 @@ def query_and_run( return query.run(**kwargs) except Exception as e: raise RuntimeError(f"Failed to run query: {e}") - + async def query_and_run_async( self, query: str, opts: Optional[Union[Dict[str, Any], QueryOptions]] = None, **kwargs ) -> Dict[str, Any]: diff --git a/pyproject.toml b/pyproject.toml index 8198762..6d3f467 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,8 @@ dependencies = [ "dataclasses-json>=0.6.4", "websocket-client>=1.2.1", "websockets", - "pyee" + "pyee", + "pep249abc" ] license = {file = "LICENSE"} @@ -39,6 +40,8 @@ dev = [ "black>=23.0,<24.0", "isort>=5.12,<5.13", "pytest", + "pyee", + "pep249abc", "pytest-asyncio", "pytest-sphinx", "pytest-cov", diff --git a/requirements-dev.txt b/requirements-dev.txt index ed58403..3fbfb30 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,5 +1,6 @@ dataclasses-json>=0.6.4 websocket-client>=1.2.1 +pep249abc pytest pytest-asyncio python-dotenv diff --git a/test.txt b/test.txt deleted file mode 100644 index e69de29..0000000 
diff --git a/tests/async_pool_test.py b/tests/async_pool_test.py index a18f3fa..6dd63cb 100644 --- a/tests/async_pool_test.py +++ b/tests/async_pool_test.py @@ -1,5 +1,4 @@ # Fetch environment variables -import asyncio import os import pytest @@ -9,14 +8,14 @@ from mapepire_python.pool.pool_job import PoolJob from mapepire_python.query_manager import QueryManager -server = os.getenv('VITE_SERVER') -user = os.getenv('VITE_DB_USER') -password = os.getenv('VITE_DB_PASS') -port = os.getenv('VITE_DB_PORT') +server = os.getenv("VITE_SERVER") +user = os.getenv("VITE_DB_USER") +password = os.getenv("VITE_DB_PASS") +port = os.getenv("VITE_DB_PORT") # Check if environment variables are set if not server or not user or not password: - raise ValueError('One or more environment variables are missing.') + raise ValueError("One or more environment variables are missing.") creds = DaemonServer( @@ -27,68 +26,74 @@ ignoreUnauthorized=True, ) + @pytest.mark.asyncio async def test_pool(): async with PoolJob(creds=creds) as job: - query = job.query('select * from sample.employee') - query2 = job.query('select * from sample.department') + query = job.query("select * from sample.employee") + query2 = job.query("select * from sample.department") res = await query.run() res2 = await query2.run() - assert res['success'] == True - assert res2['success'] == True - + assert res["success"] == True + assert res2["success"] == True + async with PoolJob(creds=creds) as pool_job: query_manager = QueryManager(pool_job) - async with query_manager.create_query('select * from sample.employee') as query: - res = await query.run(rows_to_fetch=1) - + async with query_manager.create_query("select * from sample.employee") as query: + res = await query.run(rows_to_fetch=1) + + @pytest.mark.asyncio async def test_pool_with_cm(): async with PoolJob(creds=creds) as pool_job: query_manager = QueryManager(pool_job) - async with query_manager.create_query('select * from sample.employee') as query: - res = await 
query.run(rows_to_fetch=1) - assert res['success'] == True - + async with query_manager.create_query("select * from sample.employee") as query: + res = await query.run(rows_to_fetch=1) + assert res["success"] == True + + @pytest.mark.asyncio async def test_pool_with_q_and_run(): async with PoolJob(creds=creds) as pool_job: - res = await pool_job.query_and_run('select * from sample.employee') - assert res['success'] == True - + res = await pool_job.query_and_run("select * from sample.employee") + assert res["success"] == True + + @pytest.mark.asyncio async def test_pool_with_cm_q_and_run(): async with PoolJob(creds=creds) as pool_job: query_manager = QueryManager(pool_job) - res = await query_manager.query_and_run_async('select * from sample.employee') - assert res['success'] == True - + res = await query_manager.query_and_run_async("select * from sample.employee") + assert res["success"] == True + + @pytest.mark.asyncio async def test_pool_raw(): async with PoolJob(creds=creds) as pool_job: - async with pool_job.query('select * from sample.employee') as query: - res = await query.run(rows_to_fetch=1) - assert res['success'] == True - - + async with pool_job.query("select * from sample.employee") as query: + res = await query.run(rows_to_fetch=1) + assert res["success"] == True + + def test_pool2(): with SQLJob(creds=creds) as job: - query = job.query('select * from sample.employee') - query2 = job.query('select * from sample.department') + query = job.query("select * from sample.employee") + query2 = job.query("select * from sample.department") res = query.run() res2 = query2.run() - assert res['success'] == True - assert res2['success'] == True + assert res["success"] == True + assert res2["success"] == True + @pytest.mark.asyncio async def test_simple_dict(): creds_dict = { - 'host': server, - 'user': user, - 'port': port, - 'password': password, - 'port': port, - 'ignoreUnauthorized': True + "host": server, + "user": user, + "port": port, + "password": password, 
+ "port": port, + "ignoreUnauthorized": True, } job = PoolJob() await job.connect(creds_dict) @@ -98,208 +103,212 @@ async def test_simple_dict(): assert result["success"] is True assert result["is_done"] is False assert result["has_results"] is True - + + @pytest.mark.asyncio async def test_simple2(): job = PoolJob() job.enable_local_trace_data() _ = await job.connect(creds) - query = job.query('select * from sample.employee') + query = job.query("select * from sample.employee") result = await query.run(rows_to_fetch=5) await job.close() - assert result['success'] == True - assert result['is_done'] == False - assert result['has_results'] == True - - + assert result["success"] == True + assert result["is_done"] == False + assert result["has_results"] == True + + @pytest.mark.asyncio async def test_simple(): job = PoolJob() _ = await job.connect(creds) - query = job.query('select * from sample.employee') + query = job.query("select * from sample.employee") result = await query.run(rows_to_fetch=5) await job.close() - assert result['success'] == True - assert result['is_done'] == False - assert result['has_results'] == True - + assert result["success"] == True + assert result["is_done"] == False + assert result["has_results"] == True + + @pytest.mark.asyncio async def test_query_large_dataset(): job = PoolJob() _ = await job.connect(creds) - query = job.query('select * from sample.employee') + query = job.query("select * from sample.employee") result = await query.run(rows_to_fetch=30) await job.close() - - assert result['success'] == True - assert result['is_done'] == False - assert result['has_results'] == True - assert len(result['data']) == 30 - + + assert result["success"] == True + assert result["is_done"] == False + assert result["has_results"] == True + assert len(result["data"]) == 30 + + @pytest.mark.asyncio async def test_run_query_terse_format(): job = PoolJob() _ = await job.connect(creds) - opts = QueryOptions( - isTerseResults=True - ) - query = 
job.query('select * from sample.employee', opts=opts) + opts = QueryOptions(isTerseResults=True) + query = job.query("select * from sample.employee", opts=opts) result = await query.run(rows_to_fetch=5) await job.close() - - assert result['success'] == True - assert result['is_done'] == False - assert result['has_results'] == True - assert 'metadata' in result and result['metadata'], "The 'metadata' key is missing or has no data" - + + assert result["success"] == True + assert result["is_done"] == False + assert result["has_results"] == True + assert ( + "metadata" in result and result["metadata"] + ), "The 'metadata' key is missing or has no data" + + @pytest.mark.asyncio async def test_invalid_query(): job = PoolJob() _ = await job.connect(creds) - query = job.query('select * from sample.notreal') + query = job.query("select * from sample.notreal") try: result = await query.run(rows_to_fetch=5) raise Exception("error not raised") except Exception as e: assert e.args[0] - assert 'error' in e.args[0] - assert '*FILE not found.' in e.args[0]['error'] + assert "error" in e.args[0] + assert "*FILE not found." in e.args[0]["error"] await job.close() - + + @pytest.mark.asyncio async def test_query_edge_cases(): job = PoolJob() _ = await job.connect(creds) - query = job.query('') # empty string + query = job.query("") # empty string try: _ = await query.run(rows_to_fetch=1) - raise Exception('no error raised') + raise Exception("no error raised") except Exception as e: assert e.args[0] - assert 'error' in e.args[0] - assert 'A string parameter value with zero length was detected.' in e.args[0]['error'] - + assert "error" in e.args[0] + assert "A string parameter value with zero length was detected." 
in e.args[0]["error"] + + @pytest.mark.asyncio async def test_run_sql_query_with_edge_case_inputs(): job = PoolJob() await job.connect(creds) - + # Test empty string query - query = job.query('') + query = job.query("") with pytest.raises(Exception) as excinfo: await query.run(rows_to_fetch=1) - assert 'A string parameter value with zero length was detected.' in str(excinfo.value) - + assert "A string parameter value with zero length was detected." in str(excinfo.value) + # Test non-string query with pytest.raises(Exception) as excinfo: query = job.query(666) await query.run(rows_to_fetch=1) - assert 'Token 666 was not valid' in str(excinfo.value) - + assert "Token 666 was not valid" in str(excinfo.value) + # Test invalid token query - query = job.query('a') + query = job.query("a") with pytest.raises(Exception) as excinfo: await query.run(rows_to_fetch=1) - assert 'Token A was not valid.' in str(excinfo.value) - + assert "Token A was not valid." in str(excinfo.value) + # Test long invalid token query long_invalid_query = "aeriogfj304tq34projqwe'fa;sdfaSER90Q243RSDASDAFQ#4dsa12$$$YS" * 10 query = job.query(long_invalid_query) with pytest.raises(Exception) as excinfo: await query.run(rows_to_fetch=1) - assert 'Token AERIOGFJ304TQ34PROJQWE was not valid.' in str(excinfo.value) - + assert "Token AERIOGFJ304TQ34PROJQWE was not valid." 
in str(excinfo.value) + # Test valid query with zero rows to fetch query = job.query("SELECT * FROM SAMPLE.employee") res = await query.run(rows_to_fetch=0) - assert res['data'] == [], "Expected empty result set when rows_to_fetch is 0" - + assert res["data"] == [], "Expected empty result set when rows_to_fetch is 0" + # Test valid query with non-numeric rows to fetch # use async default rows_to_fetch == 100 query = job.query("select * from sample.department") - res = await query.run(rows_to_fetch='s') - assert res['success'] - + res = await query.run(rows_to_fetch="s") + assert res["success"] + # Test valid query with negative rows to fetch query = job.query("select * from sample.department") res = await query.run(rows_to_fetch=-1) - assert res['data'] == [], "Expected empty result set when rows_to_fetch < 0" - + assert res["data"] == [], "Expected empty result set when rows_to_fetch < 0" + # query.close() await job.close() - + + @pytest.mark.asyncio async def test_drop_table(): job = PoolJob() _ = await job.connect(creds) - query = job.query('drop table sample.delete if exists') + query = job.query("drop table sample.delete if exists") res = await query.run() - assert res['has_results'] == False + assert res["has_results"] == False await job.close() - + + @pytest.mark.asyncio async def test_fetch_more(): job = PoolJob() _ = await job.connect(creds) - query = job.query('select * from sample.employee') + query = job.query("select * from sample.employee") res = await query.run(rows_to_fetch=5) - while not res['is_done']: + while not res["is_done"]: res = await query.fetch_more(10) - assert len(res['data']) > 0 - + assert len(res["data"]) > 0 + await job.close() - assert res['is_done'] - + assert res["is_done"] + + @pytest.mark.asyncio async def test_prepare_statement(): job = PoolJob() _ = await job.connect(creds) - opts = QueryOptions( - parameters=[500] - ) - query = job.query('select * from sample.employee where bonus > ?', opts=opts) + opts = 
QueryOptions(parameters=[500]) + query = job.query("select * from sample.employee where bonus > ?", opts=opts) res = await query.run() - assert res['success'] - assert len(res['data']) >= 17 - + assert res["success"] + assert len(res["data"]) >= 17 + + @pytest.mark.asyncio async def test_prepare_statement_terse(): job = PoolJob() _ = await job.connect(creds) - opts = QueryOptions( - parameters=[500], - isTerseResults=True - ) - query = job.query('select * from sample.employee where bonus > ?', opts=opts) + opts = QueryOptions(parameters=[500], isTerseResults=True) + query = job.query("select * from sample.employee where bonus > ?", opts=opts) res = await query.run() - assert res['success'] - assert len(res['data']) >= 17 - assert 'metadata' in res - + assert res["success"] + assert len(res["data"]) >= 17 + assert "metadata" in res + + @pytest.mark.asyncio async def test_prepare_statement_mult_params(): job = PoolJob() _ = await job.connect(creds) - opts = QueryOptions( - parameters=[500, 'PRES'] - ) - query = job.query('select * from sample.employee where bonus > ? and job = ?', opts=opts) + opts = QueryOptions(parameters=[500, "PRES"]) + query = job.query("select * from sample.employee where bonus > ? and job = ?", opts=opts) res = await query.run() - assert res['success'] + assert res["success"] await job.close() - + + @pytest.mark.asyncio async def test_prepare_statement_invalid_params(): job = PoolJob() _ = await job.connect(creds) - opts = QueryOptions( - parameters=['jjfkdsajf'] - ) - query = job.query('select * from sample.employee where bonus > ?', opts=opts) + opts = QueryOptions(parameters=["jjfkdsajf"]) + query = job.query("select * from sample.employee where bonus > ?", opts=opts) with pytest.raises(Exception) as execinfo: res = await query.run() - assert 'Data type mismatch. (Infinite or NaN)' in str(execinfo.value) - + assert "Data type mismatch. 
(Infinite or NaN)" in str(execinfo.value) + + @pytest.mark.asyncio async def test_prepare_statement_no_param(): job = PoolJob() @@ -307,43 +316,46 @@ async def test_prepare_statement_no_param(): opts = QueryOptions( parameters=[], ) - query = job.query('select * from sample.employee where bonus > ?', opts=opts) + query = job.query("select * from sample.employee where bonus > ?", opts=opts) with pytest.raises(Exception) as execinfo: res = await query.run() - assert 'The number of parameter values set or registered does not match the number of parameters.' in str(execinfo.value) + assert ( + "The number of parameter values set or registered does not match the number of parameters." + in str(execinfo.value) + ) await job.close() - + + @pytest.mark.asyncio async def test_prepare_statement_too_many(): job = PoolJob() _ = await job.connect(creds) - opts = QueryOptions( - parameters=[500, 'hello'] - ) - query = job.query('select * from sample.employee where bonus > ?', opts=opts) + opts = QueryOptions(parameters=[500, "hello"]) + query = job.query("select * from sample.employee where bonus > ?", opts=opts) with pytest.raises(Exception) as execinfo: res = await query.run() - assert 'Descriptor index not valid. (2>1)' in str(execinfo.value) - + assert "Descriptor index not valid. 
(2>1)" in str(execinfo.value) + + @pytest.mark.asyncio async def test_prepare_statement_invalid_data(): job = PoolJob() _ = await job.connect(creds) - opts = QueryOptions( - parameters=[{'bonus': 500}] - ) - query = job.query('select * from sample.employee where bonus > ?', opts=opts) + opts = QueryOptions(parameters=[{"bonus": 500}]) + query = job.query("select * from sample.employee where bonus > ?", opts=opts) with pytest.raises(Exception) as execinfo: res = await query.run() - assert 'JsonObject' in str(execinfo.value) - + assert "JsonObject" in str(execinfo.value) + + @pytest.mark.asyncio async def test_run_from_job(): job = PoolJob() _ = await job.connect(creds) - res = await job.query_and_run('select * from sample.employee') - assert res['success'] - + res = await job.query_and_run("select * from sample.employee") + assert res["success"] + + @pytest.mark.asyncio async def test_multiple_statements(): job = PoolJob() @@ -356,7 +368,3 @@ async def test_multiple_statements(): assert resB["success"] is True await job.close() - - - - diff --git a/tests/pep249_async_test.py b/tests/pep249_async_test.py new file mode 100644 index 0000000..d19ee90 --- /dev/null +++ b/tests/pep249_async_test.py @@ -0,0 +1,103 @@ +import asyncio +import os +import time + +import pytest + +import mapepire_python +from mapepire_python.asyncio import connect +from mapepire_python.data_types import DaemonServer +from mapepire_python.pool.pool_client import Pool, PoolOptions + +server = os.getenv("VITE_SERVER") +user = os.getenv("VITE_DB_USER") +password = os.getenv("VITE_DB_PASS") +port = os.getenv("VITE_DB_PORT") + +# Check if environment variables are set +if not server or not user or not password: + raise ValueError("One or more environment variables are missing.") + + +creds = DaemonServer( + host=server, + port=port, + user=user, + password=password, + ignoreUnauthorized=True, +) + + +@pytest.mark.asyncio +async def test_pep249_async_raw(): + async with connect(creds) as conn: + async 
with await conn.execute("select * from sample.employee") as cur: + res = await cur.fetchone() + assert res["success"] == True + + +@pytest.mark.asyncio +async def test_pep249_async_next(): + async def async_row_generator(query: str): + async with connect(creds) as conn: + async with await conn.execute(query) as cur: + try: + while True: + row = await cur.__anext__() + yield row + except StopAsyncIteration: + pass + + rows = [] + async for row in async_row_generator("select * from sample.department"): + rows.append(row) + + assert all(row["success"] for row in rows) + + +@pytest.mark.asyncio +async def test_pep249_async_for(): + async def async_row_generator(query: str): + async with connect(creds) as conn: + async for row in await conn.execute(query): + yield row + + time1 = asyncio.get_event_loop().time() + rows = [] + async for row in async_row_generator("select * from sample.department"): + assert row["success"] == True + time2 = asyncio.get_event_loop().time() + + print(f"fime: {time2 - time1}") + + +def test_sync_pep249(): + start_time = time.time() + with mapepire_python.connect(creds) as conn: + with conn.execute("select * from sample.department") as cur: + cur.fetchall() + end_time = time.time() + print(f"Synchronous execution time: {end_time - start_time} seconds") + + +@pytest.mark.asyncio +async def test_async_pep249(): + start_time = time.time() + async with connect(creds) as conn: + async with await conn.execute("select * from sample.department") as cur: + await cur.fetchall() + end_time = time.time() + print(f"Asynchronous execution time: {end_time - start_time} seconds") + + +@pytest.mark.asyncio +async def test_pool_perf(): + start_time = time.time() + async with Pool( + options=PoolOptions(creds=creds, opts=None, max_size=1, starting_size=1) + ) as pool: + res = await asyncio.gather(pool.execute("select * from sample.department")) + assert res[0]["success"] == True + + end_time = time.time() + print(f"Asynchronous execution time: {end_time - 
start_time} seconds") diff --git a/tests/pep249_test.py b/tests/pep249_test.py new file mode 100644 index 0000000..38b4841 --- /dev/null +++ b/tests/pep249_test.py @@ -0,0 +1,266 @@ +import os + +from mapepire_python import connect +from mapepire_python.data_types import DaemonServer, QueryOptions + +# Fetch environment variables +server = os.getenv("VITE_SERVER") +user = os.getenv("VITE_DB_USER") +password = os.getenv("VITE_DB_PASS") +port = os.getenv("VITE_DB_PORT") + +# Check if environment variables are set +if not server or not user or not password: + raise ValueError("One or more environment variables are missing.") + +creds = DaemonServer( + host=server, + port=port, + user=user, + password=password, + ignoreUnauthorized=True, +) + + +def test_pep249(): + conn = connect(creds) + cur = conn.execute("select * from sample.employee") + assert cur.rowcount == -1 + res = cur.fetchmany(5) + cur.close() + conn.close() + assert len(res["data"]) == 5 + + +def test_pep249_no_size(): + conn = connect(creds) + cur = conn.execute("select * from sample.employee") + res = cur.fetchmany() + assert len(res["data"]) == 1 + + +def test_pep249_set_array_size(): + conn = connect(creds) + cur = conn.execute("select * from sample.employee") + cur.arraysize = 10 + res = cur.fetchmany() + assert len(res["data"]) == 10 + + +def test_pep249_cm_fetchmany(): + with connect(creds) as connection: + with connection.execute("select * from sample.employee") as cur: + res = cur.fetchmany(5) + assert len(res["data"]) == 5 + + +def test_pep249_cm_fetchall(): + with connect(creds) as connection: + with connection.execute("select * from sample.employee") as cur: + res = cur.fetchall() + assert len(res["data"]) > 5 + + +def test_pep249_cm_fetchone(): + with connect(creds) as connection: + with connection.execute("select * from sample.employee") as cur: + res = cur.fetchone() + print(res["data"]) + assert len(res["data"]) == 1 + + res2 = cur.fetchone() + print(res2["data"]) + assert res["data"] != 
res2["data"] + + +def test_pep249_cm_next(): + with connect(creds) as connection: + with connection.execute("select * from sample.employee") as cur: + assert next(cur) is not None + + +def test_pep249_query_queue(): + conn = connect(creds) + cur = conn.cursor() + cur.execute("select * from sample.employee") + cur.execute("select * from sample.department") + assert len(cur.query_q) == 2 + print("Employee\n") + row = cur.fetchone() + while row is not None: + row = cur.fetchone() + + print("\nDepartment:\n") + cur.nextset() + row = cur.fetchone() + while row is not None: + row = cur.fetchone() + + +def test_pep249_query_queue_error(): + conn = connect(creds) + cur = conn.cursor() + cur.execute("select * from sample.employee") + print("Employee\n") + row = cur.fetchone() + while row is not None: + row = cur.fetchone() + + next_set = cur.nextset() + assert next_set is None + + +def test_prepare_statement_mult_params(): + conn = connect(creds) + cur = conn.cursor() + opts = QueryOptions(parameters=[500, "PRES"]) + cur.execute("select * from sample.employee where bonus > ? and job = ?", opts=opts) + res = cur.fetchall() + assert res["success"] == True + + +def test_prepare_statement_mult_params_seq(): + conn = connect(creds) + cur = conn.cursor() + parameters = [500, "PRES"] + cur.execute("select * from sample.employee where bonus > ? 
and job = ?", parameters=parameters) + res = cur.fetchall() + assert res["success"] == True + + +def test_pep249_iterate(): + def rows(): + with connect(creds) as conn: + for row in conn.execute("select * from sample.department"): + yield row + + cool_rows = rows() + for row in cool_rows: + assert row["success"] == True + + +def test_pep249_iterate_cur(): + with connect(creds) as conn: + with conn.execute("select * from sample.employee") as cur: + for _ in cur.fetchmany(5)["data"]: + pass + assert True + return + assert False + + +def test_pep249_nextset(): + conn = connect(creds) + cur = conn.execute("select * from sample.employee") + res = cur.nextset() + assert res == None + + +def test_pep249_nextset_true(): + conn = connect(creds) + cur = conn.cursor() + cur.execute("select * from sample.employee") + cur.execute("select * from sample.department") + assert len(cur.query_q) == 2 + res = cur.nextset() + assert res == True + + res = cur.nextset() + assert res == None + rows = cur.fetchmany(5) + assert len(rows["data"]) == 5 + + res = cur.nextset() + assert res == None + rows = cur.fetchmany(5) + assert len(rows["data"]) == 5 + + +def test_pep249_execute_many(): + conn = connect(creds) + cur = conn.cursor() + parameters = [ + ["SANJULA", "416 345 0879"], + ["TONGKUN", "647 345 0879"], + ["KATHERINE", "905 345 1879"], + ["IRFAN", "647 345 0879"], + ["SANJULA", "416 234 0879"], + ["TONGKUN", "333 345 0879"], + ["KATHERINE", "416 345 0000"], + ["IRFAN", "416 345 3333"], + ["SANJULA", "416 545 0879"], + ["TONGKUN", "456 345 0879"], + ["KATHERINE", "416 065 1879"], + ["IRFAN", "416 345 1111"], + ] + cur.execute("drop table sample.deletemepy if exists") + cur.execute("CREATE or replace TABLE SAMPLE.DELETEMEPY (name varchar(10), phone varchar(12))") + assert len(cur.query_q) == 0 + res = cur.fetchall() + assert res == None + + cur.executemany("INSERT INTO SAMPLE.DELETEMEPY values (?, ?)", parameters) + + assert cur.rowcount == 12 + + cur.execute("select * from 
sample.deletemepy") + + res = cur.fetchall() + + assert len(res["data"]) == 12 + + +def test_pep249_has_results(): + with connect(creds) as conn: + cur = conn.cursor() + cur.execute("select * from sample.department") + assert cur.has_results == True + + cur.execute( + "create or replace variable sample.coolval varchar(8) ccsid 1208 default 'abcd'" + ) + + assert cur.has_results == True + + assert cur.fetchall() is not None + + +def test_pep249_has_results_no_select(): + with connect(creds) as conn: + cur = conn.cursor() + cur.execute( + "create or replace variable sample.coolval varchar(8) ccsid 1208 default 'abcd'" + ) + + assert cur.has_results == False + + assert cur.fetchall() is None + + +def test_pep249_has_results_setter(): + with connect(creds) as conn: + cur = conn.cursor() + cur.execute( + "create or replace variable sample.coolval varchar(8) ccsid 1208 default 'abcd'" + ) + + assert cur.has_results == False + + assert cur.fetchall() is None + + +def test_pep249_has_results_flow(): + with connect(creds) as conn: + with conn.execute("select * from sample.employee") as cur: + if cur.has_results: + assert cur.fetchall() is not None + + +def test_pep249_has_next_set_None(): + with connect(creds) as conn: + with conn.execute("select * from sample.employee") as cur: + if cur.has_results: + assert cur.fetchall() is not None + if not cur.nextset(): + cur.execute("select * from sample.department") + assert cur.fetchone() is not None diff --git a/tests/pooling_test.py b/tests/pooling_test.py index 8dd8a1d..53f982f 100644 --- a/tests/pooling_test.py +++ b/tests/pooling_test.py @@ -4,19 +4,17 @@ import pytest from mapepire_python.client.sql_job import SQLJob -from mapepire_python.data_types import DaemonServer, JobStatus, QueryOptions +from mapepire_python.data_types import DaemonServer, JobStatus from mapepire_python.pool.pool_client import Pool, PoolOptions -from mapepire_python.pool.pool_job import PoolJob -from mapepire_python.query_manager import QueryManager 
-server = os.getenv('VITE_SERVER') -user = os.getenv('VITE_DB_USER') -password = os.getenv('VITE_DB_PASS') -port = os.getenv('VITE_DB_PORT') +server = os.getenv("VITE_SERVER") +user = os.getenv("VITE_DB_USER") +password = os.getenv("VITE_DB_PASS") +port = os.getenv("VITE_DB_PORT") # Check if environment variables are set if not server or not user or not password: - raise ValueError('One or more environment variables are missing.') + raise ValueError("One or more environment variables are missing.") creds = DaemonServer( @@ -31,22 +29,17 @@ @pytest.mark.asyncio async def test_simple_pool_cm(): async with Pool( - options=PoolOptions( - creds=creds, - opts=None, - max_size=5, - starting_size=3 - ) + options=PoolOptions(creds=creds, opts=None, max_size=5, starting_size=3) ) as pool: job_names = [] try: resultsA = await asyncio.gather( - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)') + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), ) - job_names = [res['data'][0]['00001'] for res in resultsA] - print(job_names) + print(resultsA) + job_names = [res["data"][0]["00001"] for res in resultsA] assert len(job_names) == 3 assert pool.get_active_job_count() == 3 finally: @@ -56,70 +49,68 @@ async def test_simple_pool_cm(): for task in pending: task.cancel() + @pytest.mark.asyncio async def test_simple_pool(): async with Pool( - options=PoolOptions( - creds=creds, - opts=None, - max_size=5, - starting_size=3 - ) + options=PoolOptions(creds=creds, opts=None, max_size=5, starting_size=3) ) as pool: - + job_names = [] resultsA = await asyncio.gather( - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)') + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), ) - job_names = [res['data'][0]['00001'] for res in resultsA] + job_names = 
[res["data"][0]["00001"] for res in resultsA] assert len(job_names) == 3 - + assert pool.get_active_job_count() == 3 - + resultsB = await asyncio.gather( - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), - pool.execute('values (job_name)'), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), + pool.execute("values (job_name)"), ) - - job_names = [res['data'][0]["00001"] for res in resultsB] + + job_names = [res["data"][0]["00001"] for res in resultsB] assert len(job_names) == 15 - - + + @pytest.mark.asyncio async def test_starting_size_greater_than_max_size(): pool = Pool(PoolOptions(creds=creds, max_size=1, starting_size=10)) with pytest.raises(ValueError, match="Max size must be greater than or equal to starting size"): await pool.init() - - + + @pytest.mark.asyncio async def test_max_size_of_0(): pool = Pool(PoolOptions(creds=creds, max_size=0, starting_size=10)) with pytest.raises(ValueError, match="Max size must be greater than 0"): await pool.init() + @pytest.mark.asyncio async def test_starting_size_of_0(): pool = 
Pool(PoolOptions(creds=creds, max_size=5, starting_size=0)) with pytest.raises(ValueError, match="Starting size must be greater than 0"): await pool.init() + @pytest.mark.asyncio async def test_performance_test(): pool = Pool(PoolOptions(creds=creds, max_size=5, starting_size=5)) @@ -129,7 +120,7 @@ async def test_performance_test(): results = await asyncio.gather(*queries) end_pool1 = asyncio.get_event_loop().time() await pool.end() - assert all(res['has_results'] for res in results) + assert all(res["has_results"] for res in results) pool = Pool(PoolOptions(creds=creds, max_size=1, starting_size=1)) await pool.init() @@ -138,7 +129,7 @@ async def test_performance_test(): results = await asyncio.gather(*queries) end_pool2 = asyncio.get_event_loop().time() await pool.end() - assert all(res['has_results'] for res in results) + assert all(res["has_results"] for res in results) no_pool_start = asyncio.get_event_loop().time() # for _ in range(20): @@ -155,7 +146,7 @@ async def test_performance_test(): print(f"Time taken without pool: {no_pool_end - no_pool_start} seconds") # assert (end_pool2 - start_pool2) > (end_pool1 - start_pool1) # assert (no_pool_end - no_pool_start) > (end_pool2 - start_pool2) - + @pytest.mark.asyncio async def test_pool_with_no_space_but_ready_job_returns_ready_job(): @@ -167,15 +158,14 @@ async def test_pool_with_no_space_but_ready_job_returns_ready_job(): assert job.get_status() == JobStatus.Ready assert job.get_running_count() == 0 await asyncio.gather(*executed_promise) - - - -# Functionality of pop_job() needs review + + +# Functionality of pop_job() needs review @pytest.mark.asyncio async def test_pop_jobs_returns_free_job(): async with Pool(PoolOptions(creds=creds, max_size=5, starting_size=5)) as pool: try: - + assert pool.get_active_job_count() == 5 executed_promises = [ pool.execute("select * FROM SAMPLE.employee"), @@ -193,20 +183,20 @@ async def test_pop_jobs_returns_free_job(): if pending: for task in pending: task.cancel() - - + + 
# @pytest.mark.asyncio # async def test_pop_job_with_pool_ignore(): # async with Pool(PoolOptions(creds=creds, max_size=1, starting_size=1)) as pool: # try: - + # assert pool.get_active_job_count() == 1 - + # executed_promises = [pool.execute("select * FROM SAMPLE.employee")] - + # # there is 1 job in pool, return that job # job = await pool.pop_job() - + # # the pool is empty, this will create a new job and add it to the pool # job2 = await pool.pop_job() # assert len(pool.jobs) == 1 @@ -220,9 +210,6 @@ async def test_pop_jobs_returns_free_job(): # for task in pending: # task.cancel() - - - # The following tests need further invesigation for tracking JobStatus and running tasks @@ -247,7 +234,6 @@ async def test_pop_jobs_returns_free_job(): # await pool.end() - # @pytest.mark.asyncio # async def test_pool_with_space_but_no_ready_job_adds_job_to_pool(): # pool = Pool(PoolOptions(creds=creds, max_size=2, starting_size=1)) @@ -287,6 +273,3 @@ async def test_pop_jobs_returns_free_job(): # assert job.get_running_count() == 2 # await asyncio.gather(*executed_promises) # await pool.end() - - - \ No newline at end of file diff --git a/tests/query_manager_test.py b/tests/query_manager_test.py index 5c18772..09e3c4b 100644 --- a/tests/query_manager_test.py +++ b/tests/query_manager_test.py @@ -1,21 +1,18 @@ import os -import re - -import pytest from mapepire_python.client.sql_job import SQLJob -from mapepire_python.data_types import DaemonServer, QueryOptions +from mapepire_python.data_types import DaemonServer from mapepire_python.query_manager import QueryManager # Fetch environment variables -server = os.getenv('VITE_SERVER') -user = os.getenv('VITE_DB_USER') -password = os.getenv('VITE_DB_PASS') -port = os.getenv('VITE_DB_PORT') +server = os.getenv("VITE_SERVER") +user = os.getenv("VITE_DB_USER") +password = os.getenv("VITE_DB_PASS") +port = os.getenv("VITE_DB_PORT") # Check if environment variables are set if not server or not user or not password: - raise 
ValueError('One or more environment variables are missing.') + raise ValueError("One or more environment variables are missing.") creds = DaemonServer( @@ -26,75 +23,78 @@ ignoreUnauthorized=True, ) + def test_query_manager(): # connection logic job = SQLJob() job.connect(creds) - + # Query Manager query_manager = QueryManager(job) - + # create a unique query query = query_manager.create_query("select * from sample.employee") - + # run query result = query_manager.run_query(query) - - assert result['success'] + + assert result["success"] job.close() with SQLJob(creds) as sql_job: query_manager = QueryManager(sql_job) with query_manager.create_query("select * from sample.employee") as query: result = query_manager.run_query(query, rows_to_fetch=1) print(result) - + def test_query_manager_with_cm_q_and_run(): with SQLJob(creds=creds) as job: query_manager = QueryManager(job) - res = query_manager.query_and_run('select * from sample.employee') - assert res['success'] == True - + res = query_manager.query_and_run("select * from sample.employee") + assert res["success"] == True + - def test_context_manager(): with SQLJob() as job: job.connect(creds) - + query_manager = QueryManager(job) query = query_manager.create_query("select * from sample.department") result = query_manager.run_query(query) - assert result['success'] + assert result["success"] + def test_simple_v2(): with SQLJob(creds) as job: query_manager = QueryManager(job) - query = query_manager.create_query('select * from sample.employee') + query = query_manager.create_query("select * from sample.employee") result = query_manager.run_query(query, rows_to_fetch=5) - assert result['success'] == True - assert result['is_done'] == False - assert result['has_results'] == True + assert result["success"] == True + assert result["is_done"] == False + assert result["has_results"] == True query.close() - + + def test_query_large_dataset(): job = SQLJob() _ = job.connect(creds) query_manager = QueryManager(job) - query 
= query_manager.create_query('select * from sample.employee') - + query = query_manager.create_query("select * from sample.employee") + result = query_manager.run_query(query, rows_to_fetch=30) query.close() job.close() - - assert result['success'] == True - assert result['is_done'] == False - assert result['has_results'] == True - assert len(result['data']) == 30 - + + assert result["success"] == True + assert result["is_done"] == False + assert result["has_results"] == True + assert len(result["data"]) == 30 + + def test_query_and_run(): with SQLJob(creds) as job: query_manager = QueryManager(job) - result = query_manager.query_and_run('select * from sample.employee', rows_to_fetch=5) - assert result['success'] == True - assert result['is_done'] == False - assert result['has_results'] == True \ No newline at end of file + result = query_manager.query_and_run("select * from sample.employee", rows_to_fetch=5) + assert result["success"] == True + assert result["is_done"] == False + assert result["has_results"] == True diff --git a/tests/sql_test.py b/tests/sql_test.py index 1c1c691..0a216b5 100644 --- a/tests/sql_test.py +++ b/tests/sql_test.py @@ -34,15 +34,16 @@ def test_simple(): assert result["success"] is True assert result["is_done"] is False assert result["has_results"] is True - + + def test_simple_dict(): creds_dict = { - 'host': server, - 'user': user, - 'port': port, - 'password': password, - 'port': port, - 'ignoreUnauthorized': True + "host": server, + "user": user, + "port": port, + "password": password, + "port": port, + "ignoreUnauthorized": True, } job = SQLJob() _ = job.connect(creds_dict) @@ -52,12 +53,55 @@ def test_simple_dict(): assert result["success"] is True assert result["is_done"] is False assert result["has_results"] is True - - + + +# def test_pep249_execute_many(): +# parameters = [ +# ["SANJULA", "416 345 0879"], +# ["TONGKUN", "647 345 0879"], +# ["KATHERINE", "905 345 1879"], +# ["IRFAN", "647 345 0879"], +# ["SANJULA", "416 234 
0879"], +# ["TONGKUN", "333 345 0879"], +# ["KATHERINE", "416 345 0000"], +# ["IRFAN", "416 345 3333"], +# ["SANJULA", "416 545 0879"], +# ["TONGKUN", "456 345 0879"], +# ["KATHERINE", "416 065 1879"], +# ["IRFAN", "416 345 1111"], +# ] + +# with SQLJob(creds) as job: +# with job.query('select * from sample.deleteme') as q: +# res = q.run() +# print(res) +# cur.execute("drop table sample.deleteme if exists") +# cur.execute("CREATE or replace TABLE SAMPLE.DELETEME (name varchar(10), phone varchar(12))") +# assert len(cur.query_q) == 2 + +# cur.execute('INSERT INTO SAMPLE.DELETEME values (?, ?)', parameters) +# print(cur.query) +# cur.nextset() + +# print(cur.query) +# cur.nextset() + +# print(cur.query) +# cur.execute('select * from sample.deleteme') + +# cur.nextset() +# print(cur.query) +# res = cur.fetchall() + + +# print(res) + + def test_with_q_and_run_cm(): with SQLJob(creds=creds) as job: - res = job.query_and_run('select * from sample.employee') - assert res['success'] == True + res = job.query_and_run("select * from sample.employee") + assert res["success"] == True + def test_query_large_dataset(): job = SQLJob()