Skip to content

Commit

Permalink
tests: update tests to be more scalable
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwotherspoon committed Sep 4, 2024
1 parent bc13665 commit 12b4b72
Showing 1 changed file with 84 additions and 53 deletions.
137 changes: 84 additions & 53 deletions tests/system/test_pg8000_connection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""""
"""
Copyright 2021 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,81 +13,112 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

from datetime import datetime
import os
from typing import Generator
import uuid
from typing import Tuple

# [START cloud_sql_connector_postgres_pg8000]
import pg8000
import pytest
import sqlalchemy

from google.cloud.sql.connector import Connector

# [END cloud_sql_connector_postgres_pg8000]

table_name = f"books_{uuid.uuid4().hex}"

def create_sqlalchemy_engine(
inst_uri: str,
user: str,
password: str,
db: str,
refresh_strategy: str = "background",
) -> Tuple[sqlalchemy.engine.Engine, Connector]:
"""Creates a connection pool for a Cloud SQL instance and returns the pool
and the connector. Callers are responsible for closing the pool and the
connector.
A sample invocation looks like:
engine, connector = create_sqlalchemy_engine(
inst_uri,
user,
password,
db,
)
with engine.connect() as conn:
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
conn.commit()
curr_time = time[0]
# do something with query result
connector.close()
Args:
instance_connection_name (str):
The instance connection name specifies the instance relative to the
project and region. For example: "my-project:my-region:my-instance"
user (str):
The database user name, e.g., postgres
password (str):
The database user's password, e.g., secret-password
db (str):
The name of the database, e.g., mydb
refresh_strategy (Optional[str]):
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
or "background". For serverless environments use "lazy" to avoid
errors resulting from CPU being throttled.
"""
connector = Connector(refresh_strategy=refresh_strategy)

# [START cloud_sql_connector_postgres_pg8000]
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'creator' argument to 'create_engine'
def init_connection_engine() -> sqlalchemy.engine.Engine:
def getconn() -> pg8000.dbapi.Connection:
# initialize Connector object for connections to Cloud SQL
with Connector() as connector:
conn: pg8000.dbapi.Connection = connector.connect(
os.environ["POSTGRES_CONNECTION_NAME"],
"pg8000",
user=os.environ["POSTGRES_USER"],
password=os.environ["POSTGRES_PASS"],
db=os.environ["POSTGRES_DB"],
ip_type="public", # can also be "private" or "psc"
)
return conn
conn: pg8000.dbapi.Connection = connector.connect(
inst_uri,
"pg8000",
user=user,
password=password,
db=db,
ip_type="public", # can also be "private" or "psc"
)
return conn

# create SQLAlchemy connection pool
pool = sqlalchemy.create_engine(
engine = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=getconn,
execution_options={"isolation_level": "AUTOCOMMIT"},
)
pool.dialect.description_encoding = None
return pool
return engine, connector


# [END cloud_sql_connector_postgres_pg8000]


@pytest.fixture(name="pool")
def setup() -> Generator:
pool = init_connection_engine()

with pool.connect() as conn:
conn.execute(
sqlalchemy.text(
f"CREATE TABLE IF NOT EXISTS {table_name}"
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
)
)
def test_pg8000_connection() -> None:
"""Basic test to get time from database."""
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
user = os.environ["POSTGRES_USER"]
password = os.environ["POSTGRES_PASS"]
db = os.environ["POSTGRES_DB"]

yield pool
engine, connector = create_sqlalchemy_engine(inst_conn_name, user, password, db)
with engine.connect() as conn:
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
conn.commit()
curr_time = time[0]
assert type(curr_time) is datetime
connector.close()

with pool.connect() as conn:
conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}"))

def test_lazy_pg8000_connection() -> None:
"""Basic test to get time from database."""
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
user = os.environ["POSTGRES_USER"]
password = os.environ["POSTGRES_PASS"]
db = os.environ["POSTGRES_DB"]

def test_pooled_connection_with_pg8000(pool: sqlalchemy.engine.Engine) -> None:
insert_stmt = sqlalchemy.text(
f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)",
engine, connector = create_sqlalchemy_engine(
inst_conn_name, user, password, db, "lazy"
)
with pool.connect() as conn:
conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})
conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"})

select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;")
with pool.connect() as conn:
rows = conn.execute(select_stmt).fetchall()
titles = [row[0] for row in rows]

assert titles == ["Book One", "Book Two"]
with engine.connect() as conn:
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
conn.commit()
curr_time = time[0]
assert type(curr_time) is datetime
connector.close()

0 comments on commit 12b4b72

Please sign in to comment.