Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HotFix: PG to PG is empty bug #130

Merged
merged 2 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ jobs:
ref: PG
table_name: cow.solvers
if_exists: replace

- name: p2p-test
source:
ref: PG
query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
destination:
ref: PG
table_name: moo.p2p-test
13 changes: 7 additions & 6 deletions src/sources/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

from src.interfaces import Source
from src.interfaces import Source, TypedDataFrame
from src.logger import log


Expand Down Expand Up @@ -41,7 +41,7 @@ def _convert_bytea_to_hex(df: DataFrame) -> DataFrame:
return df


class PostgresSource(Source[DataFrame]):
class PostgresSource(Source[TypedDataFrame]):
"""Represent PostgreSQL as a data source for retrieving data via SQL queries.

This class connects to a PostgreSQL database using SQLAlchemy and executes a query
Expand Down Expand Up @@ -100,7 +100,7 @@ def validate(self) -> bool:
log.error("Invalid SQL query: %s", str(e))
return False

async def fetch(self) -> DataFrame:
async def fetch(self) -> TypedDataFrame:
"""Execute the SQL query and retrieves the result as a DataFrame.

Returns
Expand All @@ -121,9 +121,10 @@ async def fetch(self) -> DataFrame:
df = await loop.run_in_executor(
None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
)
return _convert_bytea_to_hex(df)
# TODO include types.
return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types={})

def is_empty(self, data: DataFrame) -> bool:
def is_empty(self, data: TypedDataFrame) -> bool:
"""Check if the provided DataFrame is empty.

Parameters
Expand All @@ -137,7 +138,7 @@ def is_empty(self, data: DataFrame) -> bool:
True if the DataFrame is empty, False otherwise.

"""
return data.empty
return data.is_empty()

def _set_query_string(self, query_string: str) -> None:
"""Set the SQL query string directly or from a file if it ends with '.sql'.
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/sources_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy.dialects.postgresql import BYTEA

from src.config import RuntimeConfig
from src.interfaces import TypedDataFrame
from src.sources.dune import _reformat_varbinary_columns, dune_result_to_df
from src.sources.postgres import PostgresSource, _convert_bytea_to_hex
from tests import config_root, fixtures_root
Expand Down Expand Up @@ -136,5 +137,4 @@ def test_is_empty(self):
db_url="postgresql://postgres:postgres@localhost:5432/postgres",
query_string="SELECT 1",
)
df = pd.DataFrame([])
self.assertTrue(src.is_empty(df))
self.assertTrue(src.is_empty(TypedDataFrame(pd.DataFrame([]), {})))
Loading