Skip to content

Commit

Permalink
Use SimpleConnectionPool to run DB queries and statements
Browse files Browse the repository at this point in the history
  • Loading branch information
index-git committed Sep 21, 2023
1 parent c1f8772 commit 85268aa
Showing 1 changed file with 43 additions and 4 deletions.
47 changes: 43 additions & 4 deletions src/db/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import re
import psycopg2
import psycopg2.pool
from urllib import parse
from flask import g

import crs as crs_def
Expand All @@ -11,6 +13,8 @@

FLASK_CONN_CUR_KEY = f'{__name__}:CONN_CUR'

CONNECTION_POOL_DICT = {}


def create_connection_cursor(db_uri_str=None, encapsulate_exception=True):
db_uri_str = db_uri_str or PG_URI_STR
Expand All @@ -25,6 +29,26 @@ def create_connection_cursor(db_uri_str=None, encapsulate_exception=True):
return connection, cursor


def get_connection_pool(db_uri_str=None):
db_uri_str = db_uri_str or PG_URI_STR
connection_pool = CONNECTION_POOL_DICT.get(db_uri_str)
if not connection_pool:
db_uri_parsed = parse.urlparse(db_uri_str)
try:
connection_pool = psycopg2.pool.SimpleConnectionPool(5, 50,
user=db_uri_parsed.username,
password=db_uri_parsed.password,
host=db_uri_parsed.hostname,
port=db_uri_parsed.port,
database=db_uri_parsed.path[1:],
)
except BaseException as exc:
raise Error(1) from exc
CONNECTION_POOL_DICT[db_uri_str] = connection_pool
return connection_pool



def get_connection_cursor(db_uri_str=None, encapsulate_exception=True):
if db_uri_str is None or db_uri_str == PG_URI_STR:
key = FLASK_CONN_CUR_KEY
Expand All @@ -39,8 +63,12 @@ def get_connection_cursor(db_uri_str=None, encapsulate_exception=True):

def run_query(query, data=None, conn_cur=None, encapsulate_exception=True, log_query=False):
if conn_cur is None:
conn_cur = get_connection_cursor()
conn, cur = conn_cur
pool = get_connection_pool()
conn = pool.getconn()
conn.autocommit = True
cur = conn.cursor()
else:
conn, cur = conn_cur
try:
if log_query:
logger.info(f"query={cur.mogrify(query, data).decode()}")
Expand All @@ -52,14 +80,21 @@ def run_query(query, data=None, conn_cur=None, encapsulate_exception=True, log_q
logger.error(f"run_query, query={query}, data={data}, exc={exc}")
raise Error(2) from exc
raise exc
finally:
if pool and conn:
pool.putconn(conn)

return rows


def run_statement(query, data=None, conn_cur=None, encapsulate_exception=True, log_query=False):
if conn_cur is None:
conn_cur = get_connection_cursor()
conn, cur = conn_cur
pool = get_connection_pool()
conn = pool.getconn()
conn.autocommit = True
cur = conn.cursor()
else:
conn, cur = conn_cur
try:
if log_query:
logger.info(f"query={cur.mogrify(query, data).decode()}")
Expand All @@ -71,6 +106,10 @@ def run_statement(query, data=None, conn_cur=None, encapsulate_exception=True, l
logger.error(f"run_query, query={query}, data={data}, exc={exc}")
raise Error(2) from exc
raise exc
finally:
if pool and conn:
pool.putconn(conn)

return rows


Expand Down

0 comments on commit 85268aa

Please sign in to comment.