diff --git a/tests/system/test_pg8000_connection.py b/tests/system/test_pg8000_connection.py index 9cf56aa8..ae683278 100644 --- a/tests/system/test_pg8000_connection.py +++ b/tests/system/test_pg8000_connection.py @@ -1,4 +1,4 @@ -"""" +""" Copyright 2021 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,81 +13,112 @@ See the License for the specific language governing permissions and limitations under the License. """ + +from datetime import datetime import os -from typing import Generator -import uuid +from typing import Tuple # [START cloud_sql_connector_postgres_pg8000] import pg8000 -import pytest import sqlalchemy from google.cloud.sql.connector import Connector -# [END cloud_sql_connector_postgres_pg8000] - -table_name = f"books_{uuid.uuid4().hex}" +def create_sqlalchemy_engine( + inst_uri: str, + user: str, + password: str, + db: str, + refresh_strategy: str = "background", +) -> Tuple[sqlalchemy.engine.Engine, Connector]: + """Creates a connection pool for a Cloud SQL instance and returns the pool + and the connector. Callers are responsible for closing the pool and the + connector. + + A sample invocation looks like: + + engine, connector = create_sqlalchemy_engine( + inst_uri, + user, + password, + db, + ) + with engine.connect() as conn: + time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() + conn.commit() + curr_time = time[0] + # do something with query result + connector.close() + + Args: + instance_connection_name (str): + The instance connection name specifies the instance relative to the + project and region. For example: "my-project:my-region:my-instance" + user (str): + The database user name, e.g., postgres + password (str): + The database user's password, e.g., secret-password + db (str): + The name of the database, e.g., mydb + refresh_strategy (Optional[str]): + Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" + or "background". For serverless environments use "lazy" to avoid + errors resulting from CPU being throttled. + """ + connector = Connector(refresh_strategy=refresh_strategy) -# [START cloud_sql_connector_postgres_pg8000] -# The Cloud SQL Python Connector can be used along with SQLAlchemy using the -# 'creator' argument to 'create_engine' -def init_connection_engine() -> sqlalchemy.engine.Engine: def getconn() -> pg8000.dbapi.Connection: - # initialize Connector object for connections to Cloud SQL - with Connector() as connector: - conn: pg8000.dbapi.Connection = connector.connect( - os.environ["POSTGRES_CONNECTION_NAME"], - "pg8000", - user=os.environ["POSTGRES_USER"], - password=os.environ["POSTGRES_PASS"], - db=os.environ["POSTGRES_DB"], - ip_type="public", # can also be "private" or "psc" - ) - return conn + conn: pg8000.dbapi.Connection = connector.connect( + inst_uri, + "pg8000", + user=user, + password=password, + db=db, + ip_type="public", # can also be "private" or "psc" + ) + return conn # create SQLAlchemy connection pool - pool = sqlalchemy.create_engine( + engine = sqlalchemy.create_engine( "postgresql+pg8000://", creator=getconn, - execution_options={"isolation_level": "AUTOCOMMIT"}, ) - pool.dialect.description_encoding = None - return pool + return engine, connector # [END cloud_sql_connector_postgres_pg8000] -@pytest.fixture(name="pool") -def setup() -> Generator: - pool = init_connection_engine() - - with pool.connect() as conn: - conn.execute( - sqlalchemy.text( - f"CREATE TABLE IF NOT EXISTS {table_name}" - " ( id CHAR(20) NOT NULL, title TEXT NOT NULL );" - ) - ) +def test_pg8000_connection() -> None: + """Basic test to get time from database.""" + inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] + user = os.environ["POSTGRES_USER"] + password = os.environ["POSTGRES_PASS"] + db = os.environ["POSTGRES_DB"] - yield pool + engine, connector = create_sqlalchemy_engine(inst_conn_name, user, password, db) + with engine.connect() as conn: + time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() + conn.commit() + curr_time = time[0] + assert type(curr_time) is datetime + connector.close() - with pool.connect() as conn: - conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}")) +def test_lazy_pg8000_connection() -> None: + """Basic test to get time from database.""" + inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] + user = os.environ["POSTGRES_USER"] + password = os.environ["POSTGRES_PASS"] + db = os.environ["POSTGRES_DB"] -def test_pooled_connection_with_pg8000(pool: sqlalchemy.engine.Engine) -> None: - insert_stmt = sqlalchemy.text( - f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)", + engine, connector = create_sqlalchemy_engine( + inst_conn_name, user, password, db, "lazy" ) - with pool.connect() as conn: - conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"}) - conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"}) - - select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;") - with pool.connect() as conn: - rows = conn.execute(select_stmt).fetchall() - titles = [row[0] for row in rows] - - assert titles == ["Book One", "Book Two"] + with engine.connect() as conn: + time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() + conn.commit() + curr_time = time[0] + assert type(curr_time) is datetime + connector.close()