From 09e2dd7b9fd5199cd18f4a356698c7f27b675e5b Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sat, 2 Mar 2024 19:29:15 +0500
Subject: [PATCH 01/15] Add and test psycopg3 support

---
 .github/workflows/test-suite.yml     |  9 +++++----
 README.md                            |  3 +++
 databases/backends/common/records.py |  2 --
 docs/index.md                        |  3 +++
 requirements.txt                     |  3 +++
 setup.py                             |  9 +++++----
 tests/test_databases.py              | 20 +++++++++++++++++---
 tests/test_integration.py            | 23 ++++++++++++++---------
 8 files changed, 50 insertions(+), 22 deletions(-)

diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml
index f85ca99a..a46893de 100644
--- a/.github/workflows/test-suite.yml
+++ b/.github/workflows/test-suite.yml
@@ -18,7 +18,7 @@ jobs:
 
     services:
       mysql:
-        image: mysql:5.7
+        image: mariadb:11
         env:
           MYSQL_USER: username
           MYSQL_PASSWORD: password
@@ -26,10 +26,10 @@ jobs:
           MYSQL_DATABASE: testsuite
         ports:
           - 3306:3306
-        options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
+        options: --health-cmd="mariadb-admin ping" --health-interval=10s --health-timeout=5s --health-retries=3
 
       postgres:
-        image: postgres:14
+        image: postgres:16
         env:
           POSTGRES_USER: username
           POSTGRES_PASSWORD: password
@@ -59,5 +59,6 @@ jobs:
             mysql+asyncmy://username:password@localhost:3306/testsuite,
             postgresql://username:password@localhost:5432/testsuite,
             postgresql+aiopg://username:password@127.0.0.1:5432/testsuite,
-            postgresql+asyncpg://username:password@localhost:5432/testsuite
+            postgresql+asyncpg://username:password@localhost:5432/testsuite,
+            postgresql+psycopg://username:password@localhost:5432/testsuite
         run: "scripts/test"
diff --git a/README.md b/README.md
index f40cd173..edf68e40 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@ Database drivers supported are:
 
 * [asyncpg][asyncpg]
 * [aiopg][aiopg]
+* [psycopg3][psycopg3]
 * [aiomysql][aiomysql]
 * [asyncmy][asyncmy]
 * [aiosqlite][aiosqlite]
@@ -42,6 +43,7 @@ You can install the required database drivers with:
 ```shell
 $ pip install databases[asyncpg]
 $ pip install databases[aiopg]
+$ pip install databases[psycopg3]
 $ pip install databases[aiomysql]
 $ pip install databases[asyncmy]
 $ pip install databases[aiosqlite]
@@ -105,6 +107,7 @@ for examples of how to start using databases together with SQLAlchemy core expre
 [pymysql]: https://github.com/PyMySQL/PyMySQL
 [asyncpg]: https://github.com/MagicStack/asyncpg
 [aiopg]: https://github.com/aio-libs/aiopg
+[psycopg3]: https://github.com/psycopg/psycopg
 [aiomysql]: https://github.com/aio-libs/aiomysql
 [asyncmy]: https://github.com/long2ice/asyncmy
 [aiosqlite]: https://github.com/omnilib/aiosqlite
diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py
index e963af50..f57de839 100644
--- a/databases/backends/common/records.py
+++ b/databases/backends/common/records.py
@@ -1,6 +1,4 @@
-import enum
 import typing
-from datetime import date, datetime, time
 
 from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.engine.row import Row as SQLRow
diff --git a/docs/index.md b/docs/index.md
index 7c3cebf2..c4e581d5 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -31,6 +31,7 @@ Database drivers supported are:
 
 * [asyncpg][asyncpg]
 * [aiopg][aiopg]
+* [psycopg3][psycopg3]
 * [aiomysql][aiomysql]
 * [asyncmy][asyncmy]
 * [aiosqlite][aiosqlite]
@@ -40,6 +41,7 @@ You can install the required database drivers with:
 ```shell
 $ pip install databases[asyncpg]
 $ pip install databases[aiopg]
+$ pip install databases[psycopg3]
 $ pip install databases[aiomysql]
 $ pip install databases[asyncmy]
 $ pip install databases[aiosqlite]
@@ -103,6 +105,7 @@ for examples of how to start using databases together with SQLAlchemy core expre
 [pymysql]: https://github.com/PyMySQL/PyMySQL
 [asyncpg]: https://github.com/MagicStack/asyncpg
 [aiopg]: https://github.com/aio-libs/aiopg
+[psycopg3]: https://github.com/psycopg/psycopg
 [aiomysql]: https://github.com/aio-libs/aiomysql
 [asyncmy]: https://github.com/long2ice/asyncmy
 [aiosqlite]: https://github.com/omnilib/aiosqlite
diff --git a/requirements.txt b/requirements.txt
index 8b05a46e..be086f77 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,9 +6,12 @@ aiomysql==0.2.0
 aiopg==1.4.0
 aiosqlite==0.20.0
 asyncpg==0.29.0
+psycopg==3.1.18
+psycopg-binary==3.1.18
 
 # Sync database drivers for standard tooling around setup/teardown/migrations.
 psycopg==3.1.18
+psycopg-binary==3.1.18
 pymysql==1.1.0
 
 # Testing
diff --git a/setup.py b/setup.py
index 41c0c584..9218bc1f 100644
--- a/setup.py
+++ b/setup.py
@@ -47,14 +47,15 @@ def get_packages(package):
     author_email="tom@tomchristie.com",
     packages=get_packages("databases"),
     package_data={"databases": ["py.typed"]},
-    install_requires=["sqlalchemy>=2.0.7"],
+    install_requires=["sqlalchemy>=2.0.11"],
     extras_require={
-        "postgresql": ["asyncpg"],
-        "asyncpg": ["asyncpg"],
-        "aiopg": ["aiopg"],
         "mysql": ["aiomysql"],
         "aiomysql": ["aiomysql"],
         "asyncmy": ["asyncmy"],
+        "postgresql": ["asyncpg"],
+        "aiopg": ["aiopg"],
+        "asyncpg": ["asyncpg"],
+        "psycopg3": ["psycopg"],
         "sqlite": ["aiosqlite"],
         "aiosqlite": ["aiosqlite"],
     },
diff --git a/tests/test_databases.py b/tests/test_databases.py
index d9d9e4d6..90771b1d 100644
--- a/tests/test_databases.py
+++ b/tests/test_databases.py
@@ -134,6 +134,7 @@ def create_test_database():
             "postgresql+aiopg",
             "sqlite+aiosqlite",
             "postgresql+asyncpg",
+            "postgresql+psycopg",
         ]:
             url = str(database_url.replace(driver=None))
         engine = sqlalchemy.create_engine(url)
@@ -151,6 +152,7 @@ def create_test_database():
             "postgresql+aiopg",
             "sqlite+aiosqlite",
             "postgresql+asyncpg",
+            "postgresql+psycopg",
         ]:
             url = str(database_url.replace(driver=None))
         engine = sqlalchemy.create_engine(url)
@@ -1354,7 +1356,11 @@ async def test_queries_with_expose_backend_connection(database_url):
                 elif database.url.scheme == "mysql+asyncmy":
                     async with raw_connection.cursor() as cursor:
                         await cursor.execute(insert_query, values)
-                elif database.url.scheme in ["postgresql", "postgresql+asyncpg"]:
+                elif database.url.scheme in [
+                    "postgresql",
+                    "postgresql+asyncpg",
+                    "postgresql+psycopg",
+                ]:
                     await raw_connection.execute(insert_query, *values)
                 elif database.url.scheme in ["sqlite", "sqlite+aiosqlite"]:
                     await raw_connection.execute(insert_query, values)
@@ -1392,7 +1398,11 @@ async def test_queries_with_expose_backend_connection(database_url):
                     async with raw_connection.cursor() as cursor:
                         await cursor.execute(select_query)
                         results = await cursor.fetchall()
-                elif database.url.scheme in ["postgresql", "postgresql+asyncpg"]:
+                elif database.url.scheme in [
+                    "postgresql",
+                    "postgresql+asyncpg",
+                    "postgresql+psycopg",
+                ]:
                     results = await raw_connection.fetch(select_query)
                 elif database.url.scheme in ["sqlite", "sqlite+aiosqlite"]:
                     results = await raw_connection.execute_fetchall(select_query)
@@ -1407,7 +1417,11 @@ async def test_queries_with_expose_backend_connection(database_url):
                 assert results[2][2] == True
 
                 # fetch_one()
-                if database.url.scheme in ["postgresql", "postgresql+asyncpg"]:
+                if database.url.scheme in [
+                    "postgresql",
+                    "postgresql+asyncpg",
+                    "postgresql+psycopg",
+                ]:
                     result = await raw_connection.fetchrow(select_query)
                 elif database.url.scheme == "mysql+asyncmy":
                     async with raw_connection.cursor() as cursor:
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 139f8ffe..0605529f 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -1,7 +1,10 @@
+import contextlib
+
 import pytest
 import sqlalchemy
 from starlette.applications import Starlette
 from starlette.responses import JSONResponse
+from starlette.routing import Route
 from starlette.testclient import TestClient
 
 from databases import Database, DatabaseURL
@@ -29,6 +32,7 @@ def create_test_database():
             "postgresql+aiopg",
             "sqlite+aiosqlite",
             "postgresql+asyncpg",
+            "postgresql+psycopg",
         ]:
             url = str(database_url.replace(driver=None))
         engine = sqlalchemy.create_engine(url)
@@ -45,6 +49,7 @@ def create_test_database():
             "postgresql+aiopg",
             "sqlite+aiosqlite",
             "postgresql+asyncpg",
+            "postgresql+psycopg",
         ]:
             url = str(database_url.replace(driver=None))
         engine = sqlalchemy.create_engine(url)
@@ -53,17 +58,13 @@ def create_test_database():
 
 def get_app(database_url):
     database = Database(database_url, force_rollback=True)
-    app = Starlette()
 
-    @app.on_event("startup")
-    async def startup():
+    @contextlib.asynccontextmanager
+    async def lifespan(app):
         await database.connect()
-
-    @app.on_event("shutdown")
-    async def shutdown():
+        yield
         await database.disconnect()
 
-    @app.route("/notes", methods=["GET"])
     async def list_notes(request):
         query = notes.select()
         results = await database.fetch_all(query)
@@ -73,14 +74,18 @@ async def list_notes(request):
         ]
         return JSONResponse(content)
 
-    @app.route("/notes", methods=["POST"])
     async def add_note(request):
         data = await request.json()
         query = notes.insert().values(text=data["text"], completed=data["completed"])
         await database.execute(query)
         return JSONResponse({"text": data["text"], "completed": data["completed"]})
 
-    return app
+    routes = [
+        Route("/notes", list_notes, methods=["GET"]),
+        Route("/notes", add_note, methods=["POST"]),
+    ]
+
+    return Starlette(routes=routes, lifespan=lifespan)
 
 
 @pytest.mark.parametrize("database_url", DATABASE_URLS)
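
For orientation, here is a minimal usage sketch of the `postgresql+psycopg://` scheme exercised by the test matrix above. It assumes the psycopg backend introduced later in this series, a database reachable with the CI testsuite credentials, and `pip install databases[psycopg3]`; none of it is part of the patch itself.

```python
# Hypothetical usage of the new postgresql+psycopg scheme; the DSN reuses the
# CI testsuite credentials above and is an assumption, not part of the patch.
import asyncio

from databases import Database


async def main():
    database = Database("postgresql+psycopg://username:password@localhost:5432/testsuite")
    await database.connect()
    row = await database.fetch_one("SELECT 1 AS value")
    print(row["value"])  # 1
    await database.disconnect()


asyncio.run(main())
```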

From 37c450cf1d7848def2fa9483f6c7c507038d9829 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sat, 2 Mar 2024 19:40:30 +0500
Subject: [PATCH 02/15] Update docker-compose example to use newer MariaDB
 image

---
 docs/contributing.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/contributing.md b/docs/contributing.md
index 92ab3b3c..fccc8687 100644
--- a/docs/contributing.md
+++ b/docs/contributing.md
@@ -73,7 +73,7 @@ run all of those with lint script
     version: '2.1'
     services:
       postgres:
-        image: postgres:10.8
+        image: postgres:16
         environment:
           POSTGRES_USER: username
           POSTGRES_PASSWORD: password
@@ -82,7 +82,7 @@ run all of those with lint script
           - 5432:5432
     
       mysql:
-        image: mysql:5.7
+        image: mariadb:11
         environment:
           MYSQL_USER: username
           MYSQL_PASSWORD: password

From 8cbcccb3c09304a51d14fcbcbe45e5f983de248f Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sat, 2 Mar 2024 22:40:33 +0500
Subject: [PATCH 03/15] Rename PostgresBackend to AsyncpgBackend

---
 .../backends/{postgres.py => asyncpg.py}      |  33 ++---
 databases/backends/dialects/psycopg.py        |  11 +-
 databases/backends/psycopg.py                 | 118 ++++++++++++++++++
 databases/core.py                             |   8 +-
 tests/test_connection_options.py              |  20 +--
 5 files changed, 154 insertions(+), 36 deletions(-)
 rename databases/backends/{postgres.py => asyncpg.py} (88%)
 create mode 100644 databases/backends/psycopg.py

diff --git a/databases/backends/postgres.py b/databases/backends/asyncpg.py
similarity index 88%
rename from databases/backends/postgres.py
rename to databases/backends/asyncpg.py
index c42688e1..98ac44ea 100644
--- a/databases/backends/postgres.py
+++ b/databases/backends/asyncpg.py
@@ -7,7 +7,7 @@
 from sqlalchemy.sql.ddl import DDLElement
 
 from databases.backends.common.records import Record, create_column_maps
-from databases.backends.dialects.psycopg import dialect as psycopg_dialect
+from databases.backends.dialects.psycopg import get_dialect
 from databases.core import LOG_EXTRA, DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
@@ -19,28 +19,15 @@
 logger = logging.getLogger("databases")
 
 
-class PostgresBackend(DatabaseBackend):
+class AsyncpgBackend(DatabaseBackend):
     def __init__(
         self, database_url: typing.Union[DatabaseURL, str], **options: typing.Any
     ) -> None:
         self._database_url = DatabaseURL(database_url)
         self._options = options
-        self._dialect = self._get_dialect()
+        self._dialect = get_dialect()
         self._pool = None
 
-    def _get_dialect(self) -> Dialect:
-        dialect = psycopg_dialect(paramstyle="pyformat")
-
-        dialect.implicit_returning = True
-        dialect.supports_native_enum = True
-        dialect.supports_smallserial = True  # 9.2+
-        dialect._backslash_escapes = False
-        dialect.supports_sane_multi_rowcount = True  # psycopg 2.0.9+
-        dialect._has_native_hstore = True
-        dialect.supports_native_decimal = True
-
-        return dialect
-
     def _get_connection_kwargs(self) -> dict:
         url_options = self._database_url.options
 
@@ -78,12 +65,12 @@ async def disconnect(self) -> None:
         await self._pool.close()
         self._pool = None
 
-    def connection(self) -> "PostgresConnection":
-        return PostgresConnection(self, self._dialect)
+    def connection(self) -> "AsyncpgConnection":
+        return AsyncpgConnection(self, self._dialect)
 
 
-class PostgresConnection(ConnectionBackend):
-    def __init__(self, database: PostgresBackend, dialect: Dialect):
+class AsyncpgConnection(ConnectionBackend):
+    def __init__(self, database: AsyncpgBackend, dialect: Dialect):
         self._database = database
         self._dialect = dialect
         self._connection: typing.Optional[asyncpg.connection.Connection] = None
@@ -159,7 +146,7 @@ async def iterate(
             yield Record(row, result_columns, self._dialect, column_maps)
 
     def transaction(self) -> TransactionBackend:
-        return PostgresTransaction(connection=self)
+        return AsyncpgTransaction(connection=self)
 
     def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]:
         compiled = query.compile(
@@ -197,8 +184,8 @@ def raw_connection(self) -> asyncpg.connection.Connection:
         return self._connection
 
 
-class PostgresTransaction(TransactionBackend):
-    def __init__(self, connection: PostgresConnection):
+class AsyncpgTransaction(TransactionBackend):
+    def __init__(self, connection: AsyncpgConnection):
         self._connection = connection
         self._transaction: typing.Optional[asyncpg.transaction.Transaction] = None
 
diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py
index 07bd1880..1caf49fe 100644
--- a/databases/backends/dialects/psycopg.py
+++ b/databases/backends/dialects/psycopg.py
@@ -43,4 +43,13 @@ class PGDialect_psycopg(PGDialect):
     execution_ctx_cls = PGExecutionContext_psycopg
 
 
-dialect = PGDialect_psycopg
+def get_dialect() -> PGDialect_psycopg:
+    dialect = PGDialect_psycopg(paramstyle="pyformat")
+    dialect.implicit_returning = True
+    dialect.supports_native_enum = True
+    dialect.supports_smallserial = True  # 9.2+
+    dialect._backslash_escapes = False
+    dialect.supports_sane_multi_rowcount = True  # psycopg 2.0.9+
+    dialect._has_native_hstore = True
+    dialect.supports_native_decimal = True
+    return dialect
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
new file mode 100644
index 00000000..981742ce
--- /dev/null
+++ b/databases/backends/psycopg.py
@@ -0,0 +1,118 @@
+import typing
+from collections.abc import Sequence
+
+import psycopg_pool
+from sqlalchemy.sql import ClauseElement
+
+from databases.backends.dialects.psycopg import get_dialect
+from databases.core import DatabaseURL
+from databases.interfaces import (
+    ConnectionBackend,
+    DatabaseBackend,
+    TransactionBackend,
+)
+
+
+class PsycopgBackend(DatabaseBackend):
+    def __init__(
+        self, database_url: typing.Union[DatabaseURL, str], **options: typing.Any
+    ) -> None:
+        self._database_url = DatabaseURL(database_url)
+        self._options = options
+        self._dialect = get_dialect()
+        self._pool: typing.Optional[psycopg_pool.AsyncConnectionPool] = None
+
+    async def connect(self) -> None:
+        if self._pool is not None:
+            return
+
+        self._pool = psycopg_pool.AsyncConnectionPool(
+            self._database_url.url, open=False, **self._options)
+        await self._pool.open()
+
+    async def disconnect(self) -> None:
+        if self._pool is None:
+            return
+
+        await self._pool.close()
+        self._pool = None
+
+    def connection(self) -> "PsycopgConnection":
+        return PsycopgConnection(self)
+
+
+class PsycopgConnection(ConnectionBackend):
+    def __init__(self, database: PsycopgBackend) -> None:
+        self._database = database
+
+    async def acquire(self) -> None:
+        if self._connection is not None:
+            return
+
+        if self._database._pool is None:
+            raise RuntimeError("PsycopgBackend is not running")
+
+        # TODO: Add configurable timeouts
+        self._connection = await self._database._pool.getconn()
+
+    async def release(self) -> None:
+        if self._connection is None:
+            return
+
+        await self._database._pool.putconn(self._connection)
+        self._connection = None
+
+    async def fetch_all(self, query: ClauseElement) -> typing.List["Record"]:
+        raise NotImplementedError()  # pragma: no cover
+
+    async def fetch_one(self, query: ClauseElement) -> typing.Optional["Record"]:
+        raise NotImplementedError()  # pragma: no cover
+
+    async def fetch_val(
+        self, query: ClauseElement, column: typing.Any = 0
+    ) -> typing.Any:
+        row = await self.fetch_one(query)
+        return None if row is None else row[column]
+
+    async def execute(self, query: ClauseElement) -> typing.Any:
+        raise NotImplementedError()  # pragma: no cover
+
+    async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
+        raise NotImplementedError()  # pragma: no cover
+
+    async def iterate(
+        self, query: ClauseElement
+    ) -> typing.AsyncGenerator[typing.Mapping, None]:
+        raise NotImplementedError()  # pragma: no cover
+        # mypy needs async iterators to contain a `yield`
+        # https://github.com/python/mypy/issues/5385#issuecomment-407281656
+        yield True  # pragma: no cover
+
+    def transaction(self) -> "TransactionBackend":
+        raise NotImplementedError()  # pragma: no cover
+
+    @property
+    def raw_connection(self) -> typing.Any:
+        raise NotImplementedError()  # pragma: no cover
+
+
+class PsycopgTransaction(TransactionBackend):
+    async def start(
+        self, is_root: bool, extra_options: typing.Dict[typing.Any, typing.Any]
+    ) -> None:
+        raise NotImplementedError()  # pragma: no cover
+
+    async def commit(self) -> None:
+        raise NotImplementedError()  # pragma: no cover
+
+    async def rollback(self) -> None:
+        raise NotImplementedError()  # pragma: no cover
+
+
+class Record(Sequence):
+    @property
+    def _mapping(self) -> typing.Mapping:
+        raise NotImplementedError()  # pragma: no cover
+
+    def __getitem__(self, key: typing.Any) -> typing.Any:
+        raise NotImplementedError()  # pragma: no cover
diff --git a/databases/core.py b/databases/core.py
index d55dd3c8..cba06ced 100644
--- a/databases/core.py
+++ b/databases/core.py
@@ -43,12 +43,16 @@
 
 class Database:
     SUPPORTED_BACKENDS = {
-        "postgresql": "databases.backends.postgres:PostgresBackend",
+        "postgres": "databases.backends.asyncpg:AsyncpgBackend",
+        "postgresql": "databases.backends.asyncpg:AsyncpgBackend",
         "postgresql+aiopg": "databases.backends.aiopg:AiopgBackend",
-        "postgres": "databases.backends.postgres:PostgresBackend",
+        "postgresql+asyncpg": "databases.backends.asyncpg:AsyncpgBackend",
+        "postgresql+psycopg": "databases.backends.psycopg:PsycopgBackend",
         "mysql": "databases.backends.mysql:MySQLBackend",
+        "mysql+aiomysql": "databases.backends.asyncmy:MySQLBackend",
         "mysql+asyncmy": "databases.backends.asyncmy:AsyncMyBackend",
         "sqlite": "databases.backends.sqlite:SQLiteBackend",
+        "sqlite+aiosqlite": "databases.backends.sqlite:SQLiteBackend",
     }
 
     _connection_map: "weakref.WeakKeyDictionary[asyncio.Task, 'Connection']"
diff --git a/tests/test_connection_options.py b/tests/test_connection_options.py
index 81ce2ac7..757393a4 100644
--- a/tests/test_connection_options.py
+++ b/tests/test_connection_options.py
@@ -6,7 +6,7 @@
 import pytest
 
 from databases.backends.aiopg import AiopgBackend
-from databases.backends.postgres import PostgresBackend
+from databases.backends.asyncpg import AsyncpgBackend
 from databases.core import DatabaseURL
 from tests.test_databases import DATABASE_URLS, async_adapter
 
@@ -19,7 +19,7 @@
 
 
 def test_postgres_pool_size():
-    backend = PostgresBackend("postgres://localhost/database?min_size=1&max_size=20")
+    backend = AsyncpgBackend("postgres://localhost/database?min_size=1&max_size=20")
     kwargs = backend._get_connection_kwargs()
     assert kwargs == {"min_size": 1, "max_size": 20}
 
@@ -29,43 +29,43 @@ async def test_postgres_pool_size_connect():
     for url in DATABASE_URLS:
         if DatabaseURL(url).dialect != "postgresql":
             continue
-        backend = PostgresBackend(url + "?min_size=1&max_size=20")
+        backend = AsyncpgBackend(url + "?min_size=1&max_size=20")
         await backend.connect()
         await backend.disconnect()
 
 
 def test_postgres_explicit_pool_size():
-    backend = PostgresBackend("postgres://localhost/database", min_size=1, max_size=20)
+    backend = AsyncpgBackend("postgres://localhost/database", min_size=1, max_size=20)
     kwargs = backend._get_connection_kwargs()
     assert kwargs == {"min_size": 1, "max_size": 20}
 
 
 def test_postgres_ssl():
-    backend = PostgresBackend("postgres://localhost/database?ssl=true")
+    backend = AsyncpgBackend("postgres://localhost/database?ssl=true")
     kwargs = backend._get_connection_kwargs()
     assert kwargs == {"ssl": True}
 
 
 def test_postgres_ssl_verify_full():
-    backend = PostgresBackend("postgres://localhost/database?ssl=verify-full")
+    backend = AsyncpgBackend("postgres://localhost/database?ssl=verify-full")
     kwargs = backend._get_connection_kwargs()
     assert kwargs == {"ssl": "verify-full"}
 
 
 def test_postgres_explicit_ssl():
-    backend = PostgresBackend("postgres://localhost/database", ssl=True)
+    backend = AsyncpgBackend("postgres://localhost/database", ssl=True)
     kwargs = backend._get_connection_kwargs()
     assert kwargs == {"ssl": True}
 
 
 def test_postgres_explicit_ssl_verify_full():
-    backend = PostgresBackend("postgres://localhost/database", ssl="verify-full")
+    backend = AsyncpgBackend("postgres://localhost/database", ssl="verify-full")
     kwargs = backend._get_connection_kwargs()
     assert kwargs == {"ssl": "verify-full"}
 
 
 def test_postgres_no_extra_options():
-    backend = PostgresBackend("postgres://localhost/database")
+    backend = AsyncpgBackend("postgres://localhost/database")
     kwargs = backend._get_connection_kwargs()
     assert kwargs == {}
 
@@ -74,7 +74,7 @@ def test_postgres_password_as_callable():
     def gen_password():
         return "Foo"
 
-    backend = PostgresBackend(
+    backend = AsyncpgBackend(
         "postgres://:password@localhost/database", password=gen_password
     )
     kwargs = backend._get_connection_kwargs()
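
The `SUPPORTED_BACKENDS` entries added in this patch are `"module:Class"` strings that the library resolves at connect time. As a rough, illustrative sketch (not the library's actual loader code), such a string can be resolved like this:

```python
# Illustrative only: resolving a "module:Class" backend string of the kind
# registered in SUPPORTED_BACKENDS; load_backend is a made-up helper name.
import importlib


def load_backend(path: str) -> type:
    module_name, _, class_name = path.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


backend_cls = load_backend("databases.backends.psycopg:PsycopgBackend")
print(backend_cls.__name__)  # PsycopgBackend
```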

From a86edfaae53792d7eccceb0d860d5f0200faebb0 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sat, 2 Mar 2024 23:17:09 +0500
Subject: [PATCH 04/15] S01E01

---
 databases/backends/asyncpg.py          | 45 ++++-------------------
 databases/backends/dialects/psycopg.py | 30 ++++++++++++++-
 databases/backends/psycopg.py          | 51 +++++++++++++++++---------
 3 files changed, 70 insertions(+), 56 deletions(-)

diff --git a/databases/backends/asyncpg.py b/databases/backends/asyncpg.py
index 98ac44ea..ff61fe26 100644
--- a/databases/backends/asyncpg.py
+++ b/databases/backends/asyncpg.py
@@ -4,11 +4,10 @@
 import asyncpg
 from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.sql import ClauseElement
-from sqlalchemy.sql.ddl import DDLElement
 
 from databases.backends.common.records import Record, create_column_maps
-from databases.backends.dialects.psycopg import get_dialect
-from databases.core import LOG_EXTRA, DatabaseURL
+from databases.backends.dialects.psycopg import compile_query, get_dialect
+from databases.core import DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
     DatabaseBackend,
@@ -88,7 +87,7 @@ async def release(self) -> None:
 
     async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
         assert self._connection is not None, "Connection is not acquired"
-        query_str, args, result_columns = self._compile(query)
+        query_str, args, result_columns = compile_query(query, self._dialect)
         rows = await self._connection.fetch(query_str, *args)
         dialect = self._dialect
         column_maps = create_column_maps(result_columns)
@@ -96,7 +95,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
 
     async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]:
         assert self._connection is not None, "Connection is not acquired"
-        query_str, args, result_columns = self._compile(query)
+        query_str, args, result_columns = compile_query(query, self._dialect)
         row = await self._connection.fetchrow(query_str, *args)
         if row is None:
             return None
@@ -124,7 +123,7 @@ async def fetch_val(
 
     async def execute(self, query: ClauseElement) -> typing.Any:
         assert self._connection is not None, "Connection is not acquired"
-        query_str, args, _ = self._compile(query)
+        query_str, args, _ = compile_query(query, self._dialect)
         return await self._connection.fetchval(query_str, *args)
 
     async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
@@ -133,14 +132,14 @@ async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
         # loop through multiple executes here, which should all end up
         # using the same prepared statement.
         for single_query in queries:
-            single_query, args, _ = self._compile(single_query)
+            single_query, args, _ = compile_query(single_query, self._dialect)
             await self._connection.execute(single_query, *args)
 
     async def iterate(
         self, query: ClauseElement
     ) -> typing.AsyncGenerator[typing.Any, None]:
         assert self._connection is not None, "Connection is not acquired"
-        query_str, args, result_columns = self._compile(query)
+        query_str, args, result_columns = compile_query(query, self._dialect)
         column_maps = create_column_maps(result_columns)
         async for row in self._connection.cursor(query_str, *args):
             yield Record(row, result_columns, self._dialect, column_maps)
@@ -148,36 +147,6 @@ async def iterate(
     def transaction(self) -> TransactionBackend:
         return AsyncpgTransaction(connection=self)
 
-    def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]:
-        compiled = query.compile(
-            dialect=self._dialect, compile_kwargs={"render_postcompile": True}
-        )
-
-        if not isinstance(query, DDLElement):
-            compiled_params = sorted(compiled.params.items())
-
-            mapping = {
-                key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1)
-            }
-            compiled_query = compiled.string % mapping
-
-            processors = compiled._bind_processors
-            args = [
-                processors[key](val) if key in processors else val
-                for key, val in compiled_params
-            ]
-            result_map = compiled._result_columns
-        else:
-            compiled_query = compiled.string
-            args = []
-            result_map = None
-
-        query_message = compiled_query.replace(" \n", " ").replace("\n", " ")
-        logger.debug(
-            "Query: %s Args: %s", query_message, repr(tuple(args)), extra=LOG_EXTRA
-        )
-        return compiled_query, args, result_map
-
     @property
     def raw_connection(self) -> asyncpg.connection.Connection:
         assert self._connection is not None, "Connection is not acquired"
diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py
index 1caf49fe..cfce052e 100644
--- a/databases/backends/dialects/psycopg.py
+++ b/databases/backends/dialects/psycopg.py
@@ -10,6 +10,9 @@
 from sqlalchemy import types, util
 from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext
 from sqlalchemy.engine import processors
+from sqlalchemy.engine.interfaces import Dialect
+from sqlalchemy.sql import ClauseElement
+from sqlalchemy.sql.ddl import DDLElement
 from sqlalchemy.types import Float, Numeric
 
 
@@ -43,7 +46,7 @@ class PGDialect_psycopg(PGDialect):
     execution_ctx_cls = PGExecutionContext_psycopg
 
 
-def get_dialect() -> PGDialect_psycopg:
+def get_dialect() -> Dialect:
     dialect = PGDialect_psycopg(paramstyle="pyformat")
     dialect.implicit_returning = True
     dialect.supports_native_enum = True
@@ -53,3 +56,28 @@ def get_dialect() -> PGDialect_psycopg:
     dialect._has_native_hstore = True
     dialect.supports_native_decimal = True
     return dialect
+
+
+def compile_query(query: ClauseElement, dialect: Dialect) -> typing.Tuple[str, list, tuple]:
+    compiled = query.compile(dialect=dialect, compile_kwargs={"render_postcompile": True})
+
+    if not isinstance(query, DDLElement):
+        compiled_params = sorted(compiled.params.items())
+
+        mapping = {
+            key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1)
+        }
+        compiled_query = compiled.string % mapping
+
+        processors = compiled._bind_processors
+        args = [
+            processors[key](val) if key in processors else val
+            for key, val in compiled_params
+        ]
+        result_map = compiled._result_columns
+    else:
+        compiled_query = compiled.string
+        args = []
+        result_map = None
+
+    return compiled_query, args, result_map
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index 981742ce..a047a85c 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -1,26 +1,36 @@
 import typing
-from collections.abc import Sequence
 
+import psycopg
 import psycopg_pool
+from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.sql import ClauseElement
 
-from databases.backends.dialects.psycopg import get_dialect
+from databases.backends.common.records import Record, create_column_maps
+from databases.backends.dialects.psycopg import compile_query, get_dialect
 from databases.core import DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
     DatabaseBackend,
+    Record as RecordInterface,
     TransactionBackend,
 )
 
 
 class PsycopgBackend(DatabaseBackend):
+    _database_url: DatabaseURL
+    _options: typing.Dict[str, typing.Any]
+    _dialect: Dialect
+    _pool: typing.Optional[psycopg_pool.AsyncConnectionPool]
+
     def __init__(
-        self, database_url: typing.Union[DatabaseURL, str], **options: typing.Any
+        self,
+        database_url: typing.Union[DatabaseURL, str],
+        **options: typing.Dict[str, typing.Any],
     ) -> None:
         self._database_url = DatabaseURL(database_url)
         self._options = options
         self._dialect = get_dialect()
-        self._pool: typing.Optional[psycopg_pool.AsyncConnectionPool] = None
+        self._pool = None
 
     async def connect(self) -> None:
         if self._pool is not None:
@@ -28,22 +38,31 @@ async def connect(self) -> None:
 
         self._pool = psycopg_pool.AsyncConnectionPool(
             self._database_url.url, open=False, **self._options)
+
+        # TODO: Add configurable timeouts
         await self._pool.open()
 
     async def disconnect(self) -> None:
         if self._pool is None:
             return
 
+        # TODO: Add configurable timeouts
         await self._pool.close()
         self._pool = None
 
     def connection(self) -> "PsycopgConnection":
-        return PsycopgConnection(self)
+        return PsycopgConnection(self, self._dialect)
 
 
 class PsycopgConnection(ConnectionBackend):
-    def __init__(self, database: PsycopgBackend) -> None:
+    _database: PsycopgBackend
+    _dialect: Dialect
+    _connection: typing.Optional[psycopg.AsyncConnection]
+
+    def __init__(self, database: PsycopgBackend, dialect: Dialect) -> None:
         self._database = database
+        self._dialect = dialect
+        self._connection = None
 
     async def acquire(self) -> None:
         if self._connection is not None:
@@ -62,10 +81,17 @@ async def release(self) -> None:
         await self._database._pool.putconn(self._connection)
         self._connection = None
 
-    async def fetch_all(self, query: ClauseElement) -> typing.List["Record"]:
+    async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
+        if self._connection is None:
+            raise RuntimeError("Connection is not acquired")
+
+        query_str, args, result_columns = compile_query(query, self._dialect)
+        rows = await self._connection.fetch(query_str, *args)
+        column_maps = create_column_maps(result_columns)
+        return [Record(row, result_columns, self._dialect, column_maps) for row in rows]
         raise NotImplementedError()  # pragma: no cover
 
-    async def fetch_one(self, query: ClauseElement) -> typing.Optional["Record"]:
+    async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]:
         raise NotImplementedError()  # pragma: no cover
 
     async def fetch_val(
@@ -107,12 +133,3 @@ async def commit(self) -> None:
 
     async def rollback(self) -> None:
         raise NotImplementedError()  # pragma: no cover
-
-
-class Record(Sequence):
-    @property
-    def _mapping(self) -> typing.Mapping:
-        raise NotImplementedError()  # pragma: no cover
-
-    def __getitem__(self, key: typing.Any) -> typing.Any:
-        raise NotImplementedError()  # pragma: no cover
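
A short sketch of what the `compile_query` helper factored out above returns: the SQL text rewritten to asyncpg-style `$n` placeholders, the positional arguments, and the result-column map. The `notes` table here is a stand-in and the exact SQL text may differ.

```python
# Sketch of compile_query output; assumes this patched checkout of databases
# is importable and uses a stand-in table definition.
import sqlalchemy

from databases.backends.dialects.psycopg import compile_query, get_dialect

metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String),
)

query_str, args, result_map = compile_query(
    notes.select().where(notes.c.text == "hello"), get_dialect()
)
print(query_str)  # e.g. SELECT notes.id, notes.text FROM notes WHERE notes.text = $1
print(args)       # ['hello']
```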

From 6d9256693fda84f21c210ac736d9adda058ce86e Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 02:15:22 +0500
Subject: [PATCH 05/15] S01E02

---
 databases/backends/dialects/psycopg.py |  8 +++-
 databases/backends/psycopg.py          | 65 +++++++++++++++++++++-----
 requirements.txt                       |  1 +
 setup.py                               |  2 +-
 4 files changed, 62 insertions(+), 14 deletions(-)

diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py
index cfce052e..80bf5b76 100644
--- a/databases/backends/dialects/psycopg.py
+++ b/databases/backends/dialects/psycopg.py
@@ -58,8 +58,12 @@ def get_dialect() -> Dialect:
     return dialect
 
 
-def compile_query(query: ClauseElement, dialect: Dialect) -> typing.Tuple[str, list, tuple]:
-    compiled = query.compile(dialect=dialect, compile_kwargs={"render_postcompile": True})
+def compile_query(
+    query: ClauseElement, dialect: Dialect
+) -> typing.Tuple[str, list, tuple]:
+    compiled = query.compile(
+        dialect=dialect, compile_kwargs={"render_postcompile": True}
+    )
 
     if not isinstance(query, DDLElement):
         compiled_params = sorted(compiled.params.items())
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index a047a85c..0a31b7a7 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -37,7 +37,8 @@ async def connect(self) -> None:
             return
 
         self._pool = psycopg_pool.AsyncConnectionPool(
-            self._database_url.url, open=False, **self._options)
+            self._database_url._url, open=False, **self._options
+        )
 
         # TODO: Add configurable timeouts
         await self._pool.open()
@@ -86,13 +87,33 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
             raise RuntimeError("Connection is not acquired")
 
         query_str, args, result_columns = compile_query(query, self._dialect)
-        rows = await self._connection.fetch(query_str, *args)
+
+        async with self._connection.cursor() as cursor:
+            await cursor.execute(query_str, args)
+            rows = await cursor.fetchall()
+
         column_maps = create_column_maps(result_columns)
         return [Record(row, result_columns, self._dialect, column_maps) for row in rows]
-        raise NotImplementedError()  # pragma: no cover
 
     async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]:
-        raise NotImplementedError()  # pragma: no cover
+        if self._connection is None:
+            raise RuntimeError("Connection is not acquired")
+
+        query_str, args, result_columns = compile_query(query, self._dialect)
+
+        async with self._connection.cursor() as cursor:
+            await cursor.execute(query_str, args)
+            row = await cursor.fetchone()
+
+        if row is None:
+            return None
+
+        return Record(
+            row,
+            result_columns,
+            self._dialect,
+            create_column_maps(result_columns),
+        )
 
     async def fetch_val(
         self, query: ClauseElement, column: typing.Any = 0
@@ -101,25 +122,47 @@ async def fetch_val(
         return None if row is None else row[column]
 
     async def execute(self, query: ClauseElement) -> typing.Any:
-        raise NotImplementedError()  # pragma: no cover
+        if self._connection is None:
+            raise RuntimeError("Connection is not acquired")
+
+        query_str, args, _ = compile_query(query, self._dialect)
+
+        async with self._connection.cursor() as cursor:
+            await cursor.execute(query_str, args)
 
     async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
-        raise NotImplementedError()  # pragma: no cover
+        # TODO: Find a way to use psycopg's executemany
+        for query in queries:
+            await self.execute(query)
 
     async def iterate(
         self, query: ClauseElement
     ) -> typing.AsyncGenerator[typing.Mapping, None]:
-        raise NotImplementedError()  # pragma: no cover
-        # mypy needs async iterators to contain a `yield`
-        # https://github.com/python/mypy/issues/5385#issuecomment-407281656
-        yield True  # pragma: no cover
+        if self._connection is None:
+            raise RuntimeError("Connection is not acquired")
+
+        query_str, args, result_columns = compile_query(query, self._dialect)
+        column_maps = create_column_maps(result_columns)
+
+        async with self._connection.cursor() as cursor:
+            await cursor.execute(query_str, args)
+
+            while True:
+                row = await cursor.fetchone()
+
+                if row is None:
+                    break
+
+                yield Record(row, result_columns, self._dialect, column_maps)
 
     def transaction(self) -> "TransactionBackend":
         raise NotImplementedError()  # pragma: no cover
 
     @property
     def raw_connection(self) -> typing.Any:
-        raise NotImplementedError()  # pragma: no cover
+        if self._connection is None:
+            raise RuntimeError("Connection is not acquired")
+        return self._connection
 
 
 class PsycopgTransaction(TransactionBackend):
diff --git a/requirements.txt b/requirements.txt
index be086f77..785c998b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,6 +8,7 @@ aiosqlite==0.20.0
 asyncpg==0.29.0
 psycopg==3.1.18
 psycopg-binary==3.1.18
+psycopg-pool==3.2.1
 
 # Sync database drivers for standard tooling around setup/teardown/migrations.
 psycopg==3.1.18
diff --git a/setup.py b/setup.py
index 9218bc1f..4ad4fc5b 100644
--- a/setup.py
+++ b/setup.py
@@ -55,7 +55,7 @@ def get_packages(package):
         "postgresql": ["asyncpg"],
         "aiopg": ["aiopg"],
         "asyncpg": ["asyncpg"],
-        "psycopg3": ["psycopg"],
+        "psycopg3": ["psycopg", "psycopg-pool"],
         "sqlite": ["aiosqlite"],
         "aiosqlite": ["aiosqlite"],
     },
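
For reference, the raw psycopg 3 calls that the `fetch_all`/`execute` implementations above wrap look roughly like the sketch below; the DSN is an assumption reusing the CI testsuite credentials.

```python
# Rough sketch of the underlying psycopg_pool / cursor calls used by the
# backend above; the DSN is assumed, not part of the patch.
import asyncio

import psycopg_pool


async def main():
    pool = psycopg_pool.AsyncConnectionPool(
        "postgresql://username:password@localhost:5432/testsuite", open=False
    )
    await pool.open()
    connection = await pool.getconn()
    try:
        async with connection.cursor() as cursor:
            await cursor.execute("SELECT %s + %s", (1, 2))
            print(await cursor.fetchone())  # (3,)
    finally:
        await pool.putconn(connection)
        await pool.close()


asyncio.run(main())
```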

From 9d782217bb564eb3d9da0dc3f2606ea161357168 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 02:56:16 +0500
Subject: [PATCH 06/15] S01E03

---
 databases/backends/psycopg.py | 31 +++++++++++++++++++++++++++----
 databases/core.py             |  2 +-
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index 0a31b7a7..db9ec2fe 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -156,7 +156,7 @@ async def iterate(
                 yield Record(row, result_columns, self._dialect, column_maps)
 
     def transaction(self) -> "TransactionBackend":
-        raise NotImplementedError()  # pragma: no cover
+        return PsycopgTransaction(connection=self)
 
     @property
     def raw_connection(self) -> typing.Any:
@@ -166,13 +166,36 @@ def raw_connection(self) -> typing.Any:
 
 
 class PsycopgTransaction(TransactionBackend):
+    _connection: PsycopgConnection
+    _transaction: typing.Optional[psycopg.AsyncTransaction]
+
+    def __init__(self, connection: PsycopgConnection):
+        self._connection = connection
+        self._transaction: typing.Optional[psycopg.AsyncTransaction] = None
+
     async def start(
         self, is_root: bool, extra_options: typing.Dict[typing.Any, typing.Any]
     ) -> None:
-        raise NotImplementedError()  # pragma: no cover
+        if self._connection._connection is None:
+            raise RuntimeError("Connection is not acquired")
+
+        transaction = psycopg.AsyncTransaction(
+            self._connection._connection, **extra_options
+        )
+        async with transaction._conn.lock:
+            await transaction._conn.wait(transaction._enter_gen())
+        self._transaction = transaction
 
     async def commit(self) -> None:
-        raise NotImplementedError()  # pragma: no cover
+        if self._transaction is None:
+            raise RuntimeError("Transaction was not started")
+
+        async with self._transaction._conn.lock:
+            await self._transaction._conn.wait(self._transaction._commit_gen())
 
     async def rollback(self) -> None:
-        raise NotImplementedError()  # pragma: no cover
+        if self._transaction is None:
+            raise RuntimeError("Transaction was not started")
+
+        async with self._transaction._conn.lock:
+            await self._transaction._conn.wait(self._transaction._rollback_gen(None))
diff --git a/databases/core.py b/databases/core.py
index cba06ced..c09f8814 100644
--- a/databases/core.py
+++ b/databases/core.py
@@ -44,10 +44,10 @@
 class Database:
     SUPPORTED_BACKENDS = {
         "postgres": "databases.backends.asyncpg:AsyncpgBackend",
-        "postgresql": "databases.backends.asyncpg:AsyncpgBackend",
         "postgresql+aiopg": "databases.backends.aiopg:AiopgBackend",
         "postgresql+asyncpg": "databases.backends.asyncpg:AsyncpgBackend",
         "postgresql+psycopg": "databases.backends.psycopg:PsycopgBackend",
+        "postgresql": "databases.backends.psycopg:PsycopgBackend",
         "mysql": "databases.backends.mysql:MySQLBackend",
         "mysql+aiomysql": "databases.backends.asyncmy:MySQLBackend",
         "mysql+asyncmy": "databases.backends.asyncmy:AsyncMyBackend",

From d7ff8e85ccd82f57b0882a3f095da2b52a0f8a26 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 12:23:27 +0500
Subject: [PATCH 07/15] S01E04

---
 databases/backends/psycopg.py | 23 +++++++++++++++++------
 tests/test_databases.py       |  1 +
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index db9ec2fe..885e8336 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -3,10 +3,10 @@
 import psycopg
 import psycopg_pool
 from sqlalchemy.engine.interfaces import Dialect
+from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
 from sqlalchemy.sql import ClauseElement
 
 from databases.backends.common.records import Record, create_column_maps
-from databases.backends.dialects.psycopg import compile_query, get_dialect
 from databases.core import DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
@@ -29,7 +29,7 @@ def __init__(
     ) -> None:
         self._database_url = DatabaseURL(database_url)
         self._options = options
-        self._dialect = get_dialect()
+        self._dialect = PGDialect_psycopg()
         self._pool = None
 
     async def connect(self) -> None:
@@ -86,7 +86,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
         if self._connection is None:
             raise RuntimeError("Connection is not acquired")
 
-        query_str, args, result_columns = compile_query(query, self._dialect)
+        query_str, args, result_columns = self._compile(query)
 
         async with self._connection.cursor() as cursor:
             await cursor.execute(query_str, args)
@@ -99,7 +99,7 @@ async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterfa
         if self._connection is None:
             raise RuntimeError("Connection is not acquired")
 
-        query_str, args, result_columns = compile_query(query, self._dialect)
+        query_str, args, result_columns = self._compile(query)
 
         async with self._connection.cursor() as cursor:
             await cursor.execute(query_str, args)
@@ -125,7 +125,7 @@ async def execute(self, query: ClauseElement) -> typing.Any:
         if self._connection is None:
             raise RuntimeError("Connection is not acquired")
 
-        query_str, args, _ = compile_query(query, self._dialect)
+        query_str, args, _ = self._compile(query)
 
         async with self._connection.cursor() as cursor:
             await cursor.execute(query_str, args)
@@ -141,7 +141,7 @@ async def iterate(
         if self._connection is None:
             raise RuntimeError("Connection is not acquired")
 
-        query_str, args, result_columns = compile_query(query, self._dialect)
+        query_str, args, result_columns = self._compile(query)
         column_maps = create_column_maps(result_columns)
 
         async with self._connection.cursor() as cursor:
@@ -164,6 +164,17 @@ def raw_connection(self) -> typing.Any:
             raise RuntimeError("Connection is not acquired")
         return self._connection
 
+    def _compile(
+        self, query: ClauseElement,
+    ) -> typing.Tuple[str, typing.Mapping[str, typing.Any], tuple]:
+        compiled = query.compile(dialect=self._dialect)
+
+        compiled_query = compiled.string
+        params = compiled.params
+        result_map = compiled._result_columns
+
+        return compiled_query, params, result_map
+
 
 class PsycopgTransaction(TransactionBackend):
     _connection: PsycopgConnection
diff --git a/tests/test_databases.py b/tests/test_databases.py
index 90771b1d..5c0b61d1 100644
--- a/tests/test_databases.py
+++ b/tests/test_databases.py
@@ -1338,6 +1338,7 @@ async def test_queries_with_expose_backend_connection(database_url):
                     "mysql+asyncmy",
                     "mysql+aiomysql",
                     "postgresql+aiopg",
+                    "postgresql+psycopg",
                 ]:
                     insert_query = "INSERT INTO notes (text, completed) VALUES (%s, %s)"
                 else:
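
Unlike the `$n` rewriting used for asyncpg, the `_compile` helper added here keeps SQLAlchemy's pyformat placeholders and hands `compiled.params` straight to the cursor, which is why the raw-connection test above can reuse the same `%s`-style insert as the other pyformat drivers. A rough sketch of that compilation (the table is a stand-in and the exact SQL text may differ):

```python
# Sketch of PGDialect_psycopg compilation as used by _compile above; the
# notes table is a stand-in example.
import sqlalchemy
from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg

metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String),
)

compiled = notes.select().where(notes.c.id == 1).compile(dialect=PGDialect_psycopg())
print(compiled.string)  # e.g. SELECT ... WHERE notes.id = %(id_1)s
print(compiled.params)  # {'id_1': 1}
```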

From ddd8aaa3efbf960b33e1bdeb7d1b72a915d8bb41 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 12:43:43 +0500
Subject: [PATCH 08/15] S01E05

---
 databases/backends/common/records.py |  1 -
 databases/backends/psycopg.py        | 13 +++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py
index f57de839..65032fc8 100644
--- a/databases/backends/common/records.py
+++ b/databases/backends/common/records.py
@@ -4,7 +4,6 @@
 from sqlalchemy.engine.row import Row as SQLRow
 from sqlalchemy.sql.compiler import _CompileLabel
 from sqlalchemy.sql.schema import Column
-from sqlalchemy.sql.sqltypes import JSON
 from sqlalchemy.types import TypeEngine
 
 from databases.interfaces import Record as RecordInterface
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index 885e8336..3392a54b 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -2,6 +2,7 @@
 
 import psycopg
 import psycopg_pool
+from psycopg.rows import namedtuple_row
 from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
 from sqlalchemy.sql import ClauseElement
@@ -58,12 +59,11 @@ def connection(self) -> "PsycopgConnection":
 class PsycopgConnection(ConnectionBackend):
     _database: PsycopgBackend
     _dialect: Dialect
-    _connection: typing.Optional[psycopg.AsyncConnection]
+    _connection: typing.Optional[psycopg.AsyncConnection] = None
 
     def __init__(self, database: PsycopgBackend, dialect: Dialect) -> None:
         self._database = database
         self._dialect = dialect
-        self._connection = None
 
     async def acquire(self) -> None:
         if self._connection is not None:
@@ -74,6 +74,7 @@ async def acquire(self) -> None:
 
         # TODO: Add configurable timeouts
         self._connection = await self._database._pool.getconn()
+        await self._connection.set_autocommit(True)
 
     async def release(self) -> None:
         if self._connection is None:
@@ -88,7 +89,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
 
         query_str, args, result_columns = self._compile(query)
 
-        async with self._connection.cursor() as cursor:
+        async with self._connection.cursor(row_factory=namedtuple_row) as cursor:
             await cursor.execute(query_str, args)
             rows = await cursor.fetchall()
 
@@ -101,7 +102,7 @@ async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterfa
 
         query_str, args, result_columns = self._compile(query)
 
-        async with self._connection.cursor() as cursor:
+        async with self._connection.cursor(row_factory=namedtuple_row) as cursor:
             await cursor.execute(query_str, args)
             row = await cursor.fetchone()
 
@@ -127,7 +128,7 @@ async def execute(self, query: ClauseElement) -> typing.Any:
 
         query_str, args, _ = self._compile(query)
 
-        async with self._connection.cursor() as cursor:
+        async with self._connection.cursor(row_factory=namedtuple_row) as cursor:
             await cursor.execute(query_str, args)
 
     async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
@@ -144,7 +145,7 @@ async def iterate(
         query_str, args, result_columns = self._compile(query)
         column_maps = create_column_maps(result_columns)
 
-        async with self._connection.cursor() as cursor:
+        async with self._connection.cursor(row_factory=namedtuple_row) as cursor:
             await cursor.execute(query_str, args)
 
             while True:
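
The `namedtuple_row` row factory used above makes every fetched row a named tuple, which is what the `_asdict()`-based mapping in the following patches relies on. A small sketch (the DSN again being the assumed testsuite credentials):

```python
# Sketch of namedtuple_row behaviour: rows come back as named tuples that
# support attribute access and _asdict(); the DSN is assumed.
import asyncio

import psycopg
from psycopg.rows import namedtuple_row


async def main():
    connection = await psycopg.AsyncConnection.connect(
        "postgresql://username:password@localhost:5432/testsuite"
    )
    try:
        async with connection.cursor(row_factory=namedtuple_row) as cursor:
            await cursor.execute("SELECT 1 AS id, 'hi' AS text")
            row = await cursor.fetchone()
            print(row.id, row.text)  # 1 hi
            print(row._asdict())     # {'id': 1, 'text': 'hi'}
    finally:
        await connection.close()


asyncio.run(main())
```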

From e08eb4f4fa78e72187d5d5c610ef47e07601a3c1 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 14:03:56 +0500
Subject: [PATCH 09/15] S01E06

---
 databases/backends/common/records.py | 4 ++++
 databases/backends/psycopg.py        | 5 ++++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py
index 65032fc8..5c7ae25c 100644
--- a/databases/backends/common/records.py
+++ b/databases/backends/common/records.py
@@ -1,4 +1,5 @@
 import typing
+from collections import namedtuple
 
 from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.engine.row import Row as SQLRow
@@ -39,6 +40,9 @@ def __init__(
 
     @property
     def _mapping(self) -> typing.Mapping:
+        if hasattr(self._row, "_asdict"):
+            return self._row._asdict()
+
         return self._row
 
     def keys(self) -> typing.KeysView:
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index 3392a54b..302f94e5 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -168,7 +168,10 @@ def raw_connection(self) -> typing.Any:
     def _compile(
         self, query: ClauseElement,
     ) -> typing.Tuple[str, typing.Mapping[str, typing.Any], tuple]:
-        compiled = query.compile(dialect=self._dialect)
+        compiled = query.compile(
+            dialect=self._dialect,
+            compile_kwargs={"render_postcompile": True},
+        )
 
         compiled_query = compiled.string
         params = compiled.params

From 1c58a73778c10500bbad4f7c0334cb44c30d650c Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 14:24:00 +0500
Subject: [PATCH 10/15] S01E07

---
 databases/backends/common/records.py |  6 ++++--
 databases/backends/psycopg.py        | 28 ++++++++++++++++++++++++----
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py
index 5c7ae25c..5f6ba263 100644
--- a/databases/backends/common/records.py
+++ b/databases/backends/common/records.py
@@ -1,5 +1,4 @@
 import typing
-from collections import namedtuple
 
 from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.engine.row import Row as SQLRow
@@ -53,7 +52,10 @@ def values(self) -> typing.ValuesView:
 
     def __getitem__(self, key: typing.Any) -> typing.Any:
         if len(self._column_map) == 0:
-            return self._row[key]
+            try:
+                return self._row[key]
+            except TypeError:
+                return self._mapping[key]
         elif isinstance(key, Column):
             idx, datatype = self._column_map_full[str(key)]
         elif isinstance(key, int):
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index 302f94e5..bb623cfa 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -3,9 +3,10 @@
 import psycopg
 import psycopg_pool
 from psycopg.rows import namedtuple_row
-from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
+from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.sql import ClauseElement
+from sqlalchemy.sql.schema import Column
 
 from databases.backends.common.records import Record, create_column_maps
 from databases.core import DatabaseURL
@@ -31,6 +32,7 @@ def __init__(
         self._database_url = DatabaseURL(database_url)
         self._options = options
         self._dialect = PGDialect_psycopg()
+        self._dialect.implicit_returning = True
         self._pool = None
 
     async def connect(self) -> None:
@@ -94,7 +96,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
             rows = await cursor.fetchall()
 
         column_maps = create_column_maps(result_columns)
-        return [Record(row, result_columns, self._dialect, column_maps) for row in rows]
+        return [PsycopgRecord(row, result_columns, self._dialect, column_maps) for row in rows]
 
     async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]:
         if self._connection is None:
@@ -109,7 +111,7 @@ async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterfa
         if row is None:
             return None
 
-        return Record(
+        return PsycopgRecord(
             row,
             result_columns,
             self._dialect,
@@ -154,7 +156,7 @@ async def iterate(
                 if row is None:
                     break
 
-                yield Record(row, result_columns, self._dialect, column_maps)
+                yield PsycopgRecord(row, result_columns, self._dialect, column_maps)
 
     def transaction(self) -> "TransactionBackend":
         return PsycopgTransaction(connection=self)
@@ -214,3 +216,21 @@ async def rollback(self) -> None:
 
         async with self._transaction._conn.lock:
             await self._transaction._conn.wait(self._transaction._rollback_gen(None))
+
+
+class PsycopgRecord(Record):
+    @property
+    def _mapping(self) -> typing.Mapping:
+        return self._row._asdict()
+
+    def __getitem__(self, key: typing.Any) -> typing.Any:
+        if len(self._column_map) == 0:
+            return self._mapping[key]
+        elif isinstance(key, Column):
+            idx, datatype = self._column_map_full[str(key)]
+        elif isinstance(key, int):
+            idx, datatype = self._column_map_int[key]
+        else:
+            idx, datatype = self._column_map[key]
+
+        return self._row[idx]

From 3f26f76056d0da62600a2fca5f3c23e6ee8bbae5 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 14:39:47 +0500
Subject: [PATCH 11/15] Inline asyncpg query compilation; map plain postgresql URLs to asyncpg

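compile_query moves into AsyncpgBackend._compile, plain postgresql:// URLs
go back to the asyncpg backend, and the psycopg pool is handed a URL with
the +psycopg suffix stripped. A small sketch of the pyformat-to-$n rewrite
that _compile performs for asyncpg; the query and values here are made up:

```python
# Compiled SQL in pyformat style, as SQLAlchemy produces it (illustrative).
compiled_string = (
    "SELECT * FROM notes WHERE id = %(id_1)s AND completed = %(completed_1)s"
)
params = {"id_1": 3, "completed_1": True}

# asyncpg wants positional $1, $2, ... placeholders in sorted-key order.
compiled_params = sorted(params.items())
mapping = {key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1)}

query_str = compiled_string % mapping
args = [value for _, value in compiled_params]

print(query_str)  # SELECT * FROM notes WHERE id = $2 AND completed = $1
print(args)       # [True, 3]
```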
---
 databases/backends/asyncpg.py          | 66 +++++++++++++++++++++-----
 databases/backends/dialects/psycopg.py | 43 +----------------
 databases/backends/psycopg.py          |  5 +-
 databases/core.py                      |  2 +-
 4 files changed, 58 insertions(+), 58 deletions(-)

diff --git a/databases/backends/asyncpg.py b/databases/backends/asyncpg.py
index ff61fe26..124f7af1 100644
--- a/databases/backends/asyncpg.py
+++ b/databases/backends/asyncpg.py
@@ -4,10 +4,11 @@
 import asyncpg
 from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.sql import ClauseElement
+from sqlalchemy.sql.ddl import DDLElement
 
 from databases.backends.common.records import Record, create_column_maps
-from databases.backends.dialects.psycopg import compile_query, get_dialect
-from databases.core import DatabaseURL
+from databases.backends.dialects.psycopg import dialect as psycopg_dialect
+from databases.core import LOG_EXTRA, DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
     DatabaseBackend,
@@ -24,9 +25,20 @@ def __init__(
     ) -> None:
         self._database_url = DatabaseURL(database_url)
         self._options = options
-        self._dialect = get_dialect()
+        self._dialect = self._get_dialect()
         self._pool = None
 
+    def _get_dialect(self) -> Dialect:
+        dialect = psycopg_dialect(paramstyle="pyformat")
+        dialect.implicit_returning = True
+        dialect.supports_native_enum = True
+        dialect.supports_smallserial = True  # 9.2+
+        dialect._backslash_escapes = False
+        dialect.supports_sane_multi_rowcount = True  # psycopg 2.0.9+
+        dialect._has_native_hstore = True
+        dialect.supports_native_decimal = True
+        return dialect
+
     def _get_connection_kwargs(self) -> dict:
         url_options = self._database_url.options
 
@@ -87,7 +99,7 @@ async def release(self) -> None:
 
     async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
         assert self._connection is not None, "Connection is not acquired"
-        query_str, args, result_columns = compile_query(query, self._dialect)
+        query_str, args, result_columns = self._compile(query)
         rows = await self._connection.fetch(query_str, *args)
         dialect = self._dialect
         column_maps = create_column_maps(result_columns)
@@ -95,7 +107,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
 
     async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]:
         assert self._connection is not None, "Connection is not acquired"
-        query_str, args, result_columns = compile_query(query, self._dialect)
+        query_str, args, result_columns = self._compile(query)
         row = await self._connection.fetchrow(query_str, *args)
         if row is None:
             return None
@@ -123,7 +135,7 @@ async def fetch_val(
 
     async def execute(self, query: ClauseElement) -> typing.Any:
         assert self._connection is not None, "Connection is not acquired"
-        query_str, args, _ = compile_query(query, self._dialect)
+        query_str, args, _ = self._compile(query)
         return await self._connection.fetchval(query_str, *args)
 
     async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
@@ -132,14 +144,14 @@ async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
         # loop through multiple executes here, which should all end up
         # using the same prepared statement.
         for single_query in queries:
-            single_query, args, _ = compile_query(single_query, self._dialect)
+            single_query, args, _ = self._compile(single_query)
             await self._connection.execute(single_query, *args)
 
     async def iterate(
         self, query: ClauseElement
     ) -> typing.AsyncGenerator[typing.Any, None]:
         assert self._connection is not None, "Connection is not acquired"
-        query_str, args, result_columns = compile_query(query, self._dialect)
+        query_str, args, result_columns = self._compile(query)
         column_maps = create_column_maps(result_columns)
         async for row in self._connection.cursor(query_str, *args):
             yield Record(row, result_columns, self._dialect, column_maps)
@@ -147,10 +159,40 @@ async def iterate(
     def transaction(self) -> TransactionBackend:
         return AsyncpgTransaction(connection=self)
 
-    @property
-    def raw_connection(self) -> asyncpg.connection.Connection:
-        assert self._connection is not None, "Connection is not acquired"
-        return self._connection
+    def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]:
+        compiled = query.compile(
+            dialect=self._dialect, compile_kwargs={"render_postcompile": True}
+        )
+
+        if not isinstance(query, DDLElement):
+            compiled_params = sorted(compiled.params.items())
+
+            mapping = {
+                key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1)
+            }
+            compiled_query = compiled.string % mapping
+
+            processors = compiled._bind_processors
+            args = [
+                processors[key](val) if key in processors else val
+                for key, val in compiled_params
+            ]
+            result_map = compiled._result_columns
+        else:
+            compiled_query = compiled.string
+            args = []
+            result_map = None
+
+        query_message = compiled_query.replace(" \n", " ").replace("\n", " ")
+        logger.debug(
+            "Query: %s Args: %s", query_message, repr(tuple(args)), extra=LOG_EXTRA
+        )
+        return compiled_query, args, result_map
+
+        @property
+        def raw_connection(self) -> asyncpg.connection.Connection:
+            assert self._connection is not None, "Connection is not acquired"
+            return self._connection
 
 
 class AsyncpgTransaction(TransactionBackend):
diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py
index 80bf5b76..07bd1880 100644
--- a/databases/backends/dialects/psycopg.py
+++ b/databases/backends/dialects/psycopg.py
@@ -10,9 +10,6 @@
 from sqlalchemy import types, util
 from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext
 from sqlalchemy.engine import processors
-from sqlalchemy.engine.interfaces import Dialect
-from sqlalchemy.sql import ClauseElement
-from sqlalchemy.sql.ddl import DDLElement
 from sqlalchemy.types import Float, Numeric
 
 
@@ -46,42 +43,4 @@ class PGDialect_psycopg(PGDialect):
     execution_ctx_cls = PGExecutionContext_psycopg
 
 
-def get_dialect() -> Dialect:
-    dialect = PGDialect_psycopg(paramstyle="pyformat")
-    dialect.implicit_returning = True
-    dialect.supports_native_enum = True
-    dialect.supports_smallserial = True  # 9.2+
-    dialect._backslash_escapes = False
-    dialect.supports_sane_multi_rowcount = True  # psycopg 2.0.9+
-    dialect._has_native_hstore = True
-    dialect.supports_native_decimal = True
-    return dialect
-
-
-def compile_query(
-    query: ClauseElement, dialect: Dialect
-) -> typing.Tuple[str, list, tuple]:
-    compiled = query.compile(
-        dialect=dialect, compile_kwargs={"render_postcompile": True}
-    )
-
-    if not isinstance(query, DDLElement):
-        compiled_params = sorted(compiled.params.items())
-
-        mapping = {
-            key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1)
-        }
-        compiled_query = compiled.string % mapping
-
-        processors = compiled._bind_processors
-        args = [
-            processors[key](val) if key in processors else val
-            for key, val in compiled_params
-        ]
-        result_map = compiled._result_columns
-    else:
-        compiled_query = compiled.string
-        args = []
-        result_map = None
-
-    return compiled_query, args, result_map
+dialect = PGDialect_psycopg
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index bb623cfa..f83f4917 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -39,9 +39,8 @@ async def connect(self) -> None:
         if self._pool is not None:
             return
 
-        self._pool = psycopg_pool.AsyncConnectionPool(
-            self._database_url._url, open=False, **self._options
-        )
+        url = self._database_url._url.replace("postgresql+psycopg", "postgresql")
+        self._pool = psycopg_pool.AsyncConnectionPool(url, open=False, **self._options)
 
         # TODO: Add configurable timeouts
         await self._pool.open()
diff --git a/databases/core.py b/databases/core.py
index c09f8814..cba06ced 100644
--- a/databases/core.py
+++ b/databases/core.py
@@ -44,10 +44,10 @@
 class Database:
     SUPPORTED_BACKENDS = {
         "postgres": "databases.backends.asyncpg:AsyncpgBackend",
+        "postgresql": "databases.backends.asyncpg:AsyncpgBackend",
         "postgresql+aiopg": "databases.backends.aiopg:AiopgBackend",
         "postgresql+asyncpg": "databases.backends.asyncpg:AsyncpgBackend",
         "postgresql+psycopg": "databases.backends.psycopg:PsycopgBackend",
-        "postgresql": "databases.backends.psycopg:PsycopgBackend",
         "mysql": "databases.backends.mysql:MySQLBackend",
         "mysql+aiomysql": "databases.backends.asyncmy:MySQLBackend",
         "mysql+asyncmy": "databases.backends.asyncmy:AsyncMyBackend",

From cdbf97f53bb13a58c06e6f444a1abacf474785be Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 15:33:39 +0500
Subject: [PATCH 12/15] Use SQLAlchemy's psycopg dialect and clean up tests

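The vendored compiler/dialect modules are dropped in favour of SQLAlchemy's
own PGDialect_psycopg, PsycopgRecord gains a string/positional key split,
and the tests switch to identity checks for booleans plus narrower
exception handling. A tiny sketch of the key dispatch that
PsycopgRecord.__getitem__ now applies when no column map is available:

```python
from collections import namedtuple

# Stand-in for a psycopg namedtuple row; the data is illustrative.
NoteRow = namedtuple("NoteRow", ["id", "text", "completed"])
row = NoteRow(1, "example1", True)


def getitem(key):
    # String keys go through the namedtuple mapping, everything else
    # (ints, slices) indexes the raw row directly.
    if isinstance(key, str):
        return row._asdict()[key]
    return row[key]


assert getitem("text") == "example1"
assert getitem(0) == 1
```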
---
 databases/backends/aiopg.py              |  6 +-
 databases/backends/asyncpg.py            |  4 +-
 databases/backends/compilers/__init__.py |  0
 databases/backends/compilers/psycopg.py  | 17 -----
 databases/backends/dialects/__init__.py  |  0
 databases/backends/dialects/psycopg.py   | 46 -------------
 databases/backends/psycopg.py            | 15 +++--
 tests/test_databases.py                  | 86 +++++++++---------------
 8 files changed, 43 insertions(+), 131 deletions(-)
 delete mode 100644 databases/backends/compilers/__init__.py
 delete mode 100644 databases/backends/compilers/psycopg.py
 delete mode 100644 databases/backends/dialects/__init__.py
 delete mode 100644 databases/backends/dialects/psycopg.py

diff --git a/databases/backends/aiopg.py b/databases/backends/aiopg.py
index 0b4d95a3..1df30699 100644
--- a/databases/backends/aiopg.py
+++ b/databases/backends/aiopg.py
@@ -5,15 +5,13 @@
 import uuid
 
 import aiopg
+from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
 from sqlalchemy.engine.cursor import CursorResultMetaData
 from sqlalchemy.engine.interfaces import Dialect, ExecutionContext
-from sqlalchemy.engine.row import Row
 from sqlalchemy.sql import ClauseElement
 from sqlalchemy.sql.ddl import DDLElement
 
 from databases.backends.common.records import Record, Row, create_column_maps
-from databases.backends.compilers.psycopg import PGCompiler_psycopg
-from databases.backends.dialects.psycopg import PGDialect_psycopg
 from databases.core import LOG_EXTRA, DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
@@ -38,12 +36,10 @@ def _get_dialect(self) -> Dialect:
         dialect = PGDialect_psycopg(
             json_serializer=json.dumps, json_deserializer=lambda x: x
         )
-        dialect.statement_compiler = PGCompiler_psycopg
         dialect.implicit_returning = True
         dialect.supports_native_enum = True
         dialect.supports_smallserial = True  # 9.2+
         dialect._backslash_escapes = False
-        dialect.supports_sane_multi_rowcount = True  # psycopg 2.0.9+
         dialect._has_native_hstore = True
         dialect.supports_native_decimal = True
 
diff --git a/databases/backends/asyncpg.py b/databases/backends/asyncpg.py
index 124f7af1..cc6441f5 100644
--- a/databases/backends/asyncpg.py
+++ b/databases/backends/asyncpg.py
@@ -2,12 +2,12 @@
 import typing
 
 import asyncpg
+from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
 from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.sql import ClauseElement
 from sqlalchemy.sql.ddl import DDLElement
 
 from databases.backends.common.records import Record, create_column_maps
-from databases.backends.dialects.psycopg import dialect as psycopg_dialect
 from databases.core import LOG_EXTRA, DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
@@ -29,7 +29,7 @@ def __init__(
         self._pool = None
 
     def _get_dialect(self) -> Dialect:
-        dialect = psycopg_dialect(paramstyle="pyformat")
+        dialect = PGDialect_psycopg(paramstyle="pyformat")
         dialect.implicit_returning = True
         dialect.supports_native_enum = True
         dialect.supports_smallserial = True  # 9.2+
diff --git a/databases/backends/compilers/__init__.py b/databases/backends/compilers/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/databases/backends/compilers/psycopg.py b/databases/backends/compilers/psycopg.py
deleted file mode 100644
index 654c22a1..00000000
--- a/databases/backends/compilers/psycopg.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from sqlalchemy.dialects.postgresql.psycopg import PGCompiler_psycopg
-
-
-class APGCompiler_psycopg2(PGCompiler_psycopg):
-    def construct_params(self, *args, **kwargs):
-        pd = super().construct_params(*args, **kwargs)
-
-        for column in self.prefetch:
-            pd[column.key] = self._exec_default(column.default)
-
-        return pd
-
-    def _exec_default(self, default):
-        if default.is_callable:
-            return default.arg(self.dialect)
-        else:
-            return default.arg
diff --git a/databases/backends/dialects/__init__.py b/databases/backends/dialects/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py
deleted file mode 100644
index 07bd1880..00000000
--- a/databases/backends/dialects/psycopg.py
+++ /dev/null
@@ -1,46 +0,0 @@
-"""
-All the unique changes for the databases package
-with the custom Numeric as the deprecated pypostgresql
-for backwards compatibility and to make sure the
-package can go to SQLAlchemy 2.0+.
-"""
-
-import typing
-
-from sqlalchemy import types, util
-from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext
-from sqlalchemy.engine import processors
-from sqlalchemy.types import Float, Numeric
-
-
-class PGExecutionContext_psycopg(PGExecutionContext):
-    ...
-
-
-class PGNumeric(Numeric):
-    def bind_processor(
-        self, dialect: typing.Any
-    ) -> typing.Union[str, None]:  # pragma: no cover
-        return processors.to_str
-
-    def result_processor(
-        self, dialect: typing.Any, coltype: typing.Any
-    ) -> typing.Union[float, None]:  # pragma: no cover
-        if self.asdecimal:
-            return None
-        else:
-            return processors.to_float
-
-
-class PGDialect_psycopg(PGDialect):
-    colspecs = util.update_copy(
-        PGDialect.colspecs,
-        {
-            types.Numeric: PGNumeric,
-            types.Float: Float,
-        },
-    )
-    execution_ctx_cls = PGExecutionContext_psycopg
-
-
-dialect = PGDialect_psycopg
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index f83f4917..da0a6718 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -22,7 +22,7 @@ class PsycopgBackend(DatabaseBackend):
     _database_url: DatabaseURL
     _options: typing.Dict[str, typing.Any]
     _dialect: Dialect
-    _pool: typing.Optional[psycopg_pool.AsyncConnectionPool]
+    _pool: typing.Optional[psycopg_pool.AsyncConnectionPool] = None
 
     def __init__(
         self,
@@ -33,7 +33,6 @@ def __init__(
         self._options = options
         self._dialect = PGDialect_psycopg()
         self._dialect.implicit_returning = True
-        self._pool = None
 
     async def connect(self) -> None:
         if self._pool is not None:
@@ -95,7 +94,10 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
             rows = await cursor.fetchall()
 
         column_maps = create_column_maps(result_columns)
-        return [PsycopgRecord(row, result_columns, self._dialect, column_maps) for row in rows]
+        return [
+            PsycopgRecord(row, result_columns, self._dialect, column_maps)
+            for row in rows
+        ]
 
     async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]:
         if self._connection is None:
@@ -167,7 +169,8 @@ def raw_connection(self) -> typing.Any:
         return self._connection
 
     def _compile(
-        self, query: ClauseElement,
+        self,
+        query: ClauseElement,
     ) -> typing.Tuple[str, typing.Mapping[str, typing.Any], tuple]:
         compiled = query.compile(
             dialect=self._dialect,
@@ -224,7 +227,9 @@ def _mapping(self) -> typing.Mapping:
 
     def __getitem__(self, key: typing.Any) -> typing.Any:
         if len(self._column_map) == 0:
-            return self._mapping[key]
+            if isinstance(key, str):
+                return self._mapping[key]
+            return self._row[key]
         elif isinstance(key, Column):
             idx, datatype = self._column_map_full[str(key)]
         elif isinstance(key, int):
diff --git a/tests/test_databases.py b/tests/test_databases.py
index 5c0b61d1..66164aea 100644
--- a/tests/test_databases.py
+++ b/tests/test_databases.py
@@ -204,17 +204,17 @@ async def test_queries(database_url):
 
             assert len(results) == 3
             assert results[0]["text"] == "example1"
-            assert results[0]["completed"] == True
+            assert results[0]["completed"] is True
             assert results[1]["text"] == "example2"
-            assert results[1]["completed"] == False
+            assert results[1]["completed"] is False
             assert results[2]["text"] == "example3"
-            assert results[2]["completed"] == True
+            assert results[2]["completed"] is True
 
             # fetch_one()
             query = notes.select()
             result = await database.fetch_one(query=query)
             assert result["text"] == "example1"
-            assert result["completed"] == True
+            assert result["completed"] is True
 
             # fetch_val()
             query = sqlalchemy.sql.select(*[notes.c.text])
@@ -246,11 +246,11 @@ async def test_queries(database_url):
                 iterate_results.append(result)
             assert len(iterate_results) == 3
             assert iterate_results[0]["text"] == "example1"
-            assert iterate_results[0]["completed"] == True
+            assert iterate_results[0]["completed"] is True
             assert iterate_results[1]["text"] == "example2"
-            assert iterate_results[1]["completed"] == False
+            assert iterate_results[1]["completed"] is False
             assert iterate_results[2]["text"] == "example3"
-            assert iterate_results[2]["completed"] == True
+            assert iterate_results[2]["completed"] is True
 
 
 @pytest.mark.parametrize("database_url", DATABASE_URLS)
@@ -280,26 +280,26 @@ async def test_queries_raw(database_url):
             results = await database.fetch_all(query=query, values={"completed": True})
             assert len(results) == 2
             assert results[0]["text"] == "example1"
-            assert results[0]["completed"] == True
+            assert results[0]["completed"] is True
             assert results[1]["text"] == "example3"
-            assert results[1]["completed"] == True
+            assert results[1]["completed"] is True
 
             # fetch_one()
             query = "SELECT * FROM notes WHERE completed = :completed"
             result = await database.fetch_one(query=query, values={"completed": False})
             assert result["text"] == "example2"
-            assert result["completed"] == False
+            assert result["completed"] is False
 
             # fetch_val()
             query = "SELECT completed FROM notes WHERE text = :text"
             result = await database.fetch_val(query=query, values={"text": "example1"})
-            assert result == True
+            assert result is True
 
             query = "SELECT * FROM notes WHERE text = :text"
             result = await database.fetch_val(
                 query=query, values={"text": "example1"}, column="completed"
             )
-            assert result == True
+            assert result is True
 
             # iterate()
             query = "SELECT * FROM notes"
@@ -308,11 +308,11 @@ async def test_queries_raw(database_url):
                 iterate_results.append(result)
             assert len(iterate_results) == 3
             assert iterate_results[0]["text"] == "example1"
-            assert iterate_results[0]["completed"] == True
+            assert iterate_results[0]["completed"] is True
             assert iterate_results[1]["text"] == "example2"
-            assert iterate_results[1]["completed"] == False
+            assert iterate_results[1]["completed"] is False
             assert iterate_results[2]["text"] == "example3"
-            assert iterate_results[2]["completed"] == True
+            assert iterate_results[2]["completed"] is True
 
 
 @pytest.mark.parametrize("database_url", DATABASE_URLS)
@@ -380,7 +380,7 @@ async def test_results_support_mapping_interface(database_url):
 
             assert isinstance(results_as_dicts[0]["id"], int)
             assert results_as_dicts[0]["text"] == "example1"
-            assert results_as_dicts[0]["completed"] == True
+            assert results_as_dicts[0]["completed"] is True
 
 
 @pytest.mark.parametrize("database_url", DATABASE_URLS)
@@ -467,7 +467,7 @@ async def test_execute_return_val(database_url):
                 query = notes.select().where(notes.c.id == pk)
                 result = await database.fetch_one(query)
                 assert result["text"] == "example1"
-                assert result["completed"] == True
+                assert result["completed"] is True
 
 
 @pytest.mark.parametrize("database_url", DATABASE_URLS)
@@ -857,7 +857,7 @@ async def test_transaction_commit_low_level(database_url):
             try:
                 query = notes.insert().values(text="example1", completed=True)
                 await database.execute(query)
-            except:  # pragma: no cover
+            except Exception:  # pragma: no cover
                 await transaction.rollback()
             else:
                 await transaction.commit()
@@ -881,7 +881,7 @@ async def test_transaction_rollback_low_level(database_url):
                 query = notes.insert().values(text="example1", completed=True)
                 await database.execute(query)
                 raise RuntimeError()
-            except:
+            except Exception:
                 await transaction.rollback()
             else:  # pragma: no cover
                 await transaction.commit()
@@ -1354,13 +1354,12 @@ async def test_queries_with_expose_backend_connection(database_url):
                 ]:
                     cursor = await raw_connection.cursor()
                     await cursor.execute(insert_query, values)
-                elif database.url.scheme == "mysql+asyncmy":
+                elif database.url.scheme in ["mysql+asyncmy", "postgresql+psycopg"]:
                     async with raw_connection.cursor() as cursor:
                         await cursor.execute(insert_query, values)
                 elif database.url.scheme in [
                     "postgresql",
                     "postgresql+asyncpg",
-                    "postgresql+psycopg",
                 ]:
                     await raw_connection.execute(insert_query, *values)
                 elif database.url.scheme in ["sqlite", "sqlite+aiosqlite"]:
@@ -1372,7 +1371,7 @@ async def test_queries_with_expose_backend_connection(database_url):
                 if database.url.scheme in ["mysql", "mysql+aiomysql"]:
                     cursor = await raw_connection.cursor()
                     await cursor.executemany(insert_query, values)
-                elif database.url.scheme == "mysql+asyncmy":
+                elif database.url.scheme in ["mysql+asyncmy", "postgresql+psycopg"]:
                     async with raw_connection.cursor() as cursor:
                         await cursor.executemany(insert_query, values)
                 elif database.url.scheme == "postgresql+aiopg":
@@ -1395,15 +1394,11 @@ async def test_queries_with_expose_backend_connection(database_url):
                     cursor = await raw_connection.cursor()
                     await cursor.execute(select_query)
                     results = await cursor.fetchall()
-                elif database.url.scheme == "mysql+asyncmy":
+                elif database.url.scheme in ["mysql+asyncmy", "postgresql+psycopg"]:
                     async with raw_connection.cursor() as cursor:
                         await cursor.execute(select_query)
                         results = await cursor.fetchall()
-                elif database.url.scheme in [
-                    "postgresql",
-                    "postgresql+asyncpg",
-                    "postgresql+psycopg",
-                ]:
+                elif database.url.scheme in ["postgresql", "postgresql+asyncpg"]:
                     results = await raw_connection.fetch(select_query)
                 elif database.url.scheme in ["sqlite", "sqlite+aiosqlite"]:
                     results = await raw_connection.execute_fetchall(select_query)
@@ -1411,20 +1406,16 @@ async def test_queries_with_expose_backend_connection(database_url):
                 assert len(results) == 3
                 # Raw output for the raw request
                 assert results[0][1] == "example1"
-                assert results[0][2] == True
+                assert results[0][2] is True
                 assert results[1][1] == "example2"
-                assert results[1][2] == False
+                assert results[1][2] is False
                 assert results[2][1] == "example3"
-                assert results[2][2] == True
+                assert results[2][2] is True
 
                 # fetch_one()
-                if database.url.scheme in [
-                    "postgresql",
-                    "postgresql+asyncpg",
-                    "postgresql+psycopg",
-                ]:
+                if database.url.scheme in ["postgresql", "postgresql+asyncpg"]:
                     result = await raw_connection.fetchrow(select_query)
-                elif database.url.scheme == "mysql+asyncmy":
+                elif database.url.scheme in ["mysql+asyncmy", "postgresql+psycopg"]:
                     async with raw_connection.cursor() as cursor:
                         await cursor.execute(select_query)
                         result = await cursor.fetchone()
@@ -1435,7 +1426,7 @@ async def test_queries_with_expose_backend_connection(database_url):
 
                 # Raw output for the raw request
                 assert result[1] == "example1"
-                assert result[2] == True
+                assert result[2] is True
 
 
 @pytest.mark.parametrize("database_url", DATABASE_URLS)
@@ -1606,7 +1597,7 @@ async def test_column_names(database_url, select_query):
 
             assert sorted(results[0]._mapping.keys()) == ["completed", "id", "text"]
             assert results[0]["text"] == "example1"
-            assert results[0]["completed"] == True
+            assert results[0]["completed"] is True
 
 
 @pytest.mark.parametrize("database_url", DATABASE_URLS)
@@ -1641,23 +1632,6 @@ async def test_result_named_access(database_url):
         assert result.completed is True
 
 
-@pytest.mark.parametrize("database_url", DATABASE_URLS)
-@async_adapter
-async def test_mapping_property_interface(database_url):
-    """
-    Test that all connections implement interface with `_mapping` property
-    """
-    async with Database(database_url) as database:
-        query = notes.select()
-        single_result = await database.fetch_one(query=query)
-        assert single_result._mapping["text"] == "example1"
-        assert single_result._mapping["completed"] is True
-
-        list_result = await database.fetch_all(query=query)
-        assert list_result[0]._mapping["text"] == "example1"
-        assert list_result[0]._mapping["completed"] is True
-
-
 @async_adapter
 async def test_should_not_maintain_ref_when_no_cache_param():
     async with Database(

From 6910288add45ba7362ed3416ce7dc730502944fd Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 17:30:07 +0500
Subject: [PATCH 13/15] Restore vendored psycopg dialect for aiopg and asyncpg

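The vendored dialect and compiler modules come back for aiopg and asyncpg,
and the asyncpg raw_connection property is un-nested from _compile. The
restored PGNumeric colspec sends numerics as strings and, when asdecimal is
disabled, reads results back as floats; a rough illustration using the
processors it relies on (the values are made up):

```python
from decimal import Decimal

from sqlalchemy.engine import processors

# Bind side: numeric parameters are stringified before hitting the wire.
print(processors.to_str(Decimal("12.50")))  # '12.50'

# Result side: with asdecimal=False the text result is coerced to float.
print(processors.to_float("12.50"))  # 12.5
```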
---
 databases/backends/aiopg.py              |  5 ++-
 databases/backends/asyncpg.py            | 14 ++++----
 databases/backends/compilers/__init__.py |  0
 databases/backends/compilers/psycopg.py  | 17 ++++++++++
 databases/backends/dialects/__init__.py  |  0
 databases/backends/dialects/psycopg.py   | 42 ++++++++++++++++++++++++
 6 files changed, 71 insertions(+), 7 deletions(-)
 create mode 100644 databases/backends/compilers/__init__.py
 create mode 100644 databases/backends/compilers/psycopg.py
 create mode 100644 databases/backends/dialects/__init__.py
 create mode 100644 databases/backends/dialects/psycopg.py

diff --git a/databases/backends/aiopg.py b/databases/backends/aiopg.py
index 1df30699..9928f8b3 100644
--- a/databases/backends/aiopg.py
+++ b/databases/backends/aiopg.py
@@ -5,13 +5,14 @@
 import uuid
 
 import aiopg
-from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
 from sqlalchemy.engine.cursor import CursorResultMetaData
 from sqlalchemy.engine.interfaces import Dialect, ExecutionContext
 from sqlalchemy.sql import ClauseElement
 from sqlalchemy.sql.ddl import DDLElement
 
 from databases.backends.common.records import Record, Row, create_column_maps
+from databases.backends.compilers.psycopg import PGCompiler_psycopg
+from databases.backends.dialects.psycopg import PGDialect_psycopg
 from databases.core import LOG_EXTRA, DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
@@ -36,10 +37,12 @@ def _get_dialect(self) -> Dialect:
         dialect = PGDialect_psycopg(
             json_serializer=json.dumps, json_deserializer=lambda x: x
         )
+        dialect.statement_compiler = PGCompiler_psycopg
         dialect.implicit_returning = True
         dialect.supports_native_enum = True
         dialect.supports_smallserial = True  # 9.2+
         dialect._backslash_escapes = False
+        dialect.supports_sane_multi_rowcount = True  # psycopg 2.0.9+
         dialect._has_native_hstore = True
         dialect.supports_native_decimal = True
 
diff --git a/databases/backends/asyncpg.py b/databases/backends/asyncpg.py
index cc6441f5..92ad93b0 100644
--- a/databases/backends/asyncpg.py
+++ b/databases/backends/asyncpg.py
@@ -2,12 +2,12 @@
 import typing
 
 import asyncpg
-from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
 from sqlalchemy.engine.interfaces import Dialect
 from sqlalchemy.sql import ClauseElement
 from sqlalchemy.sql.ddl import DDLElement
 
 from databases.backends.common.records import Record, create_column_maps
+from databases.backends.dialects.psycopg import dialect as psycopg_dialect
 from databases.core import LOG_EXTRA, DatabaseURL
 from databases.interfaces import (
     ConnectionBackend,
@@ -29,7 +29,8 @@ def __init__(
         self._pool = None
 
     def _get_dialect(self) -> Dialect:
-        dialect = PGDialect_psycopg(paramstyle="pyformat")
+        dialect = psycopg_dialect(paramstyle="pyformat")
+
         dialect.implicit_returning = True
         dialect.supports_native_enum = True
         dialect.supports_smallserial = True  # 9.2+
@@ -37,6 +38,7 @@ def _get_dialect(self) -> Dialect:
         dialect.supports_sane_multi_rowcount = True  # psycopg 2.0.9+
         dialect._has_native_hstore = True
         dialect.supports_native_decimal = True
+
         return dialect
 
     def _get_connection_kwargs(self) -> dict:
@@ -189,10 +191,10 @@ def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]:
         )
         return compiled_query, args, result_map
 
-        @property
-        def raw_connection(self) -> asyncpg.connection.Connection:
-            assert self._connection is not None, "Connection is not acquired"
-            return self._connection
+    @property
+    def raw_connection(self) -> asyncpg.connection.Connection:
+        assert self._connection is not None, "Connection is not acquired"
+        return self._connection
 
 
 class AsyncpgTransaction(TransactionBackend):
diff --git a/databases/backends/compilers/__init__.py b/databases/backends/compilers/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/databases/backends/compilers/psycopg.py b/databases/backends/compilers/psycopg.py
new file mode 100644
index 00000000..654c22a1
--- /dev/null
+++ b/databases/backends/compilers/psycopg.py
@@ -0,0 +1,17 @@
+from sqlalchemy.dialects.postgresql.psycopg import PGCompiler_psycopg
+
+
+class APGCompiler_psycopg2(PGCompiler_psycopg):
+    def construct_params(self, *args, **kwargs):
+        pd = super().construct_params(*args, **kwargs)
+
+        for column in self.prefetch:
+            pd[column.key] = self._exec_default(column.default)
+
+        return pd
+
+    def _exec_default(self, default):
+        if default.is_callable:
+            return default.arg(self.dialect)
+        else:
+            return default.arg
diff --git a/databases/backends/dialects/__init__.py b/databases/backends/dialects/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py
new file mode 100644
index 00000000..7740d976
--- /dev/null
+++ b/databases/backends/dialects/psycopg.py
@@ -0,0 +1,42 @@
+"""
+All the unique changes for the databases package
+with the custom Numeric as the deprecated pypostgresql
+for backwards compatibility and to make sure the
+package can go to SQLAlchemy 2.0+.
+"""
+
+import typing
+
+from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext
+from sqlalchemy.engine import processors
+from sqlalchemy.types import Numeric
+
+
+class PGExecutionContext_psycopg(PGExecutionContext):
+    ...
+
+
+class PGNumeric(Numeric):
+    def bind_processor(
+        self, dialect: typing.Any
+    ) -> typing.Union[str, None]:  # pragma: no cover
+        return processors.to_str
+
+    def result_processor(
+        self, dialect: typing.Any, coltype: typing.Any
+    ) -> typing.Union[float, None]:  # pragma: no cover
+        if self.asdecimal:
+            return None
+        else:
+            return processors.to_float
+
+
+class PGDialect_psycopg(PGDialect):
+    colspecs = {
+        **PGDialect.colspecs,
+        Numeric: PGNumeric,
+    }
+    execution_ctx_cls = PGExecutionContext_psycopg
+
+
+dialect = PGDialect_psycopg

From 09c1b2bbb318b12dd97d956416afec88fbdb5b03 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 17:58:10 +0500
Subject: [PATCH 14/15] Register orjson JSON adapters on psycopg connections

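Every acquired psycopg connection now registers orjson-backed json/jsonb
loaders and dict/list dumpers. A minimal sketch of the same registration,
assuming orjson is installed and the example DSN points at a reachable
server; a synchronous connection is used here only for brevity:

```python
import orjson
import psycopg
import psycopg.adapt


class JsonLoader(psycopg.adapt.Loader):
    def load(self, data):
        return orjson.loads(data)


class JsonDumper(psycopg.adapt.Dumper):
    def dump(self, data):
        return orjson.dumps(data)


dsn = "postgresql://username:password@localhost:5432/testsuite"  # example DSN

with psycopg.connect(dsn) as conn:
    conn.adapters.register_loader("json", JsonLoader)
    conn.adapters.register_loader("jsonb", JsonLoader)
    conn.adapters.register_dumper(dict, JsonDumper)
    with conn.cursor() as cur:
        cur.execute("""SELECT '{"a": 1}'::jsonb""")
        print(cur.fetchone()[0])  # {'a': 1}, decoded by orjson
```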
---
 databases/backends/common/records.py   |  8 +-------
 databases/backends/dialects/psycopg.py | 14 +++++++++-----
 databases/backends/psycopg.py          | 22 ++++++++++++++++++++--
 3 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py
index 5f6ba263..65032fc8 100644
--- a/databases/backends/common/records.py
+++ b/databases/backends/common/records.py
@@ -39,9 +39,6 @@ def __init__(
 
     @property
     def _mapping(self) -> typing.Mapping:
-        if hasattr(self._row, "_asdict"):
-            return self._row._asdict()
-
         return self._row
 
     def keys(self) -> typing.KeysView:
@@ -52,10 +49,7 @@ def values(self) -> typing.ValuesView:
 
     def __getitem__(self, key: typing.Any) -> typing.Any:
         if len(self._column_map) == 0:
-            try:
-                return self._row[key]
-            except TypeError:
-                return self._mapping[key]
+            return self._row[key]
         elif isinstance(key, Column):
             idx, datatype = self._column_map_full[str(key)]
         elif isinstance(key, int):
diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py
index 7740d976..07bd1880 100644
--- a/databases/backends/dialects/psycopg.py
+++ b/databases/backends/dialects/psycopg.py
@@ -7,9 +7,10 @@
 
 import typing
 
+from sqlalchemy import types, util
 from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext
 from sqlalchemy.engine import processors
-from sqlalchemy.types import Numeric
+from sqlalchemy.types import Float, Numeric
 
 
 class PGExecutionContext_psycopg(PGExecutionContext):
@@ -32,10 +33,13 @@ def result_processor(
 
 
 class PGDialect_psycopg(PGDialect):
-    colspecs = {
-        **PGDialect.colspecs,
-        Numeric: PGNumeric,
-    }
+    colspecs = util.update_copy(
+        PGDialect.colspecs,
+        {
+            types.Numeric: PGNumeric,
+            types.Float: Float,
+        },
+    )
     execution_ctx_cls = PGExecutionContext_psycopg
 
 
diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index da0a6718..eb3cec16 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -1,6 +1,9 @@
 import typing
 
+import orjson
 import psycopg
+import psycopg.adapt
+import psycopg.types
 import psycopg_pool
 from psycopg.rows import namedtuple_row
 from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg
@@ -18,6 +21,16 @@
 )
 
 
+class JsonLoader(psycopg.adapt.Loader):
+    def load(self, data):
+        return orjson.loads(data)
+
+
+class JsonDumper(psycopg.adapt.Dumper):
+    def dump(self, data):
+        return orjson.dumps(data)
+
+
 class PsycopgBackend(DatabaseBackend):
     _database_url: DatabaseURL
     _options: typing.Dict[str, typing.Any]
@@ -73,8 +86,13 @@ async def acquire(self) -> None:
             raise RuntimeError("PsycopgBackend is not running")
 
         # TODO: Add configurable timeouts
-        self._connection = await self._database._pool.getconn()
-        await self._connection.set_autocommit(True)
+        connection = await self._database._pool.getconn()
+        connection.adapters.register_loader("json", JsonLoader)
+        connection.adapters.register_loader("jsonb", JsonLoader)
+        connection.adapters.register_dumper(dict, JsonDumper)
+        connection.adapters.register_dumper(list, JsonDumper)
+        await connection.set_autocommit(True)
+        self._connection = connection
 
     async def release(self) -> None:
         if self._connection is None:

From e7358af76575df8bd72c5b12fc6d7de92f086750 Mon Sep 17 00:00:00 2001
From: ansipunk <ansipunk@use.startmail.com>
Date: Sun, 3 Mar 2024 18:13:37 +0500
Subject: [PATCH 15/15] Make orjson optional with a stdlib json fallback

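orjson becomes an optional speedup: when it is missing, the stdlib json
module handles the same bytes-in/bytes-out round trip, and setup.py gains
an "orjson" extra. A quick check that the two code paths agree on the
payload they produce (the payload itself is illustrative):

```python
import json

import orjson  # optional; pulled in via the new "databases[orjson]" extra

payload = {"text": "example1", "completed": True}

fast = orjson.dumps(payload)                      # bytes
fallback = json.dumps(payload).encode("utf-8")    # bytes, stdlib path

assert orjson.loads(fast) == json.loads(fallback.decode("utf-8")) == payload
```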
---
 databases/backends/psycopg.py | 23 ++++++++++++++++++++---
 requirements.txt              |  3 +++
 setup.py                      |  1 +
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py
index eb3cec16..527b2600 100644
--- a/databases/backends/psycopg.py
+++ b/databases/backends/psycopg.py
@@ -1,6 +1,5 @@
 import typing
 
-import orjson
 import psycopg
 import psycopg.adapt
 import psycopg.types
@@ -20,15 +19,33 @@
     TransactionBackend,
 )
 
+try:
+    import orjson
+
+    def load(data):
+        return orjson.loads(data)
+
+    def dump(data):
+        return orjson.dumps(data)
+
+except ImportError:
+    import json
+
+    def load(data):
+        return json.loads(data.decode("utf-8"))
+
+    def dump(data):
+        return json.dumps(data).encode("utf-8")
+
 
 class JsonLoader(psycopg.adapt.Loader):
     def load(self, data):
-        return orjson.loads(data)
+        return load(data)
 
 
 class JsonDumper(psycopg.adapt.Dumper):
     def dump(self, data):
-        return orjson.dumps(data)
+        return dump(data)
 
 
 class PsycopgBackend(DatabaseBackend):
diff --git a/requirements.txt b/requirements.txt
index 785c998b..450b1c63 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,8 @@
 -e .
 
+# Speedups
+orjson==3.9.15
+
 # Async database drivers
 asyncmy==0.2.9
 aiomysql==0.2.0
diff --git a/setup.py b/setup.py
index 4ad4fc5b..33b6f137 100644
--- a/setup.py
+++ b/setup.py
@@ -58,6 +58,7 @@ def get_packages(package):
         "psycopg3": ["psycopg", "psycopg-pool"],
         "sqlite": ["aiosqlite"],
         "aiosqlite": ["aiosqlite"],
+        "orjson": ["orjson"],
     },
     classifiers=[
         "Development Status :: 3 - Alpha",