Skip to content

Commit

Permalink
Handle bytea[] types on p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
bh2smith committed Feb 5, 2025
1 parent 5c3dc8b commit 56c0d97
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
9 changes: 5 additions & 4 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ jobs:
source:
ref: PG
query_string: |
SELECT
1 AS number,
SELECT
1 AS number,
'\\x1234'::bytea AS my_bytes,
'{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json,
'[ [{"x": 1}, {"y": 2}], null, [{"z": 3}] ]'::json AS list_dict
ARRAY['\\x1234'::bytea, '\\x5678'::bytea] AS array_bytea,
'{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json
-- '[ [{"x": 1}, {"y": 2}], null, [{"z": 3}] ]'::json AS list_dict
destination:
ref: PG
Expand Down
9 changes: 6 additions & 3 deletions src/sources/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _convert_dict_to_json(df: DataFrame) -> DataFrame:
non_null_values = df[column][df[column].notna()]
if len(non_null_values) > 0:
first_value = non_null_values.iloc[0]
if isinstance(first_value, dict | list):
if isinstance(first_value, dict):
df[column] = df[column].apply(
lambda x: json.dumps(x) if x is not None else None
)
Expand Down Expand Up @@ -57,6 +57,10 @@ def _convert_bytea_to_hex(df: DataFrame) -> DataFrame:
for column in df.columns:
if isinstance(df[column].iloc[0], memoryview):
df[column] = df[column].apply(lambda x: f"0x{x.tobytes().hex()}")
# if isinstance(df[column].iloc[0], list):
# # Check if the list contains memoryview objects
# if all(isinstance(item, memoryview) for item in df[column].iloc[0]):
# df[column] = df[column].apply(lambda lst: [f"0x{item.tobytes().hex()}" for item in lst])
return df


Expand Down Expand Up @@ -140,9 +144,8 @@ async def fetch(self) -> TypedDataFrame:
df = await loop.run_in_executor(
None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
)

df = _convert_dict_to_json(df)
df = _convert_bytea_to_hex(df)
df = _convert_dict_to_json(df)
# TODO include types.
return TypedDataFrame(dataframe=df, types={})

Expand Down

0 comments on commit 56c0d97

Please sign in to comment.