Skip to content

Commit

Permalink
fix: Deserialization error when reading from DynamoDB using `KeyCondi…
Browse files Browse the repository at this point in the history
…tionExpression` (#2607)
  • Loading branch information
LeonLuttenberger authored Jan 11, 2024
1 parent 1ac3646 commit 57330fd
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 18 deletions.
7 changes: 4 additions & 3 deletions awswrangler/dynamodb/_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@

if TYPE_CHECKING:
from mypy_boto3_dynamodb.client import DynamoDBClient
from mypy_boto3_dynamodb.type_defs import TableAttributeValueTypeDef

_logger: logging.Logger = logging.getLogger(__name__)


_ItemsListType = List[Dict[str, Any]]
_ItemsListType = List[Dict[str, "TableAttributeValueTypeDef"]]


def _read_chunked(iterator: Iterator[dict[str, Any]]) -> Iterator[pd.DataFrame]:
def _read_chunked(iterator: Iterator[dict[str, "TableAttributeValueTypeDef"]]) -> Iterator[pd.DataFrame]:
for item in iterator:
yield pd.DataFrame(item)

Expand Down Expand Up @@ -252,7 +253,7 @@ def _read_query_chunked(table_name: str, dynamodb_client: "DynamoDBClient", **kw
response = dynamodb_client.query(TableName=table_name, **kwargs)
items = response.get("Items", [])
total_items += len(items)
yield items
yield [_deserialize_item(item) for item in items]

if ("Limit" in kwargs) and (total_items >= kwargs["Limit"]):
break
Expand Down
5 changes: 3 additions & 2 deletions awswrangler/dynamodb/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
AttributeValueTypeDef,
ExecuteStatementOutputTypeDef,
KeySchemaElementTypeDef,
TableAttributeValueTypeDef,
WriteRequestTypeDef,
)

Expand Down Expand Up @@ -57,15 +58,15 @@ def get_table(


def _serialize_item(
item: Mapping[str, Any], serializer: TypeSerializer | None = None
item: Mapping[str, "TableAttributeValueTypeDef"], serializer: TypeSerializer | None = None
) -> dict[str, "AttributeValueTypeDef"]:
serializer = serializer if serializer else TypeSerializer()
return {k: serializer.serialize(v) for k, v in item.items()}


def _deserialize_item(
item: Mapping[str, "AttributeValueTypeDef"], deserializer: TypeDeserializer | None = None
) -> dict[str, Any]:
) -> dict[str, "TableAttributeValueTypeDef"]:
deserializer = deserializer if deserializer else TypeDeserializer()
return {k: deserializer.deserialize(v) for k, v in item.items()}

Expand Down
206 changes: 193 additions & 13 deletions tests/unit/test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import awswrangler as wr
import awswrangler.pandas as pd

from .._utils import is_ray_modin
from .._utils import assert_pandas_equals, is_ray_modin

pytestmark = pytest.mark.distributed

Expand All @@ -33,12 +33,16 @@
)
@pytest.mark.parametrize("use_threads", [False, True])
def test_write(params: dict[str, Any], use_threads: bool, dynamodb_table: str) -> None:
df = pd.DataFrame(
{
"title": ["Titanic", "Snatch", "The Godfather"],
"year": [1997, 2000, 1972],
"genre": ["drama", "caper story", "crime"],
}
df = (
pd.DataFrame(
{
"title": ["Titanic", "Snatch", "The Godfather"],
"year": [1997, 2000, 1972],
"genre": ["drama", "caper story", "crime"],
}
)
.sort_values(by="year")
.reset_index(drop=True)
)
path = tempfile.gettempdir()
query = f'SELECT * FROM "{dynamodb_table}"'
Expand All @@ -47,16 +51,22 @@ def test_write(params: dict[str, Any], use_threads: bool, dynamodb_table: str) -
file_path = f"{path}/movies.json"
df.to_json(file_path, orient="records")
wr.dynamodb.put_json(file_path, dynamodb_table, use_threads=use_threads)

df2 = wr.dynamodb.read_partiql_query(query)
assert df.shape == df2.shape
df2 = df2[df.columns].sort_values(by="year").reset_index(drop=True)
df2["year"] = df["year"].astype("int64")
assert_pandas_equals(df, df2)

# CSV
wr.dynamodb.delete_items(items=df.to_dict("records"), table_name=dynamodb_table)
file_path = f"{path}/movies.csv"
df.to_csv(file_path, index=False)
wr.dynamodb.put_csv(file_path, dynamodb_table, use_threads=use_threads)

df3 = wr.dynamodb.read_partiql_query(query)
assert df.shape == df3.shape
df3 = df3[df.columns].sort_values(by="year").reset_index(drop=True)
df3["year"] = df3["year"].astype("int64")
assert_pandas_equals(df, df3)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -159,7 +169,12 @@ def test_execute_statement(params: dict[str, Any], use_threads: bool, dynamodb_t
parameters=[title, year],
)
df3 = wr.dynamodb.read_partiql_query(f'SELECT * FROM "{dynamodb_table}"')
assert df.shape == df3.shape
df3 = df3[df.columns].sort_values(by="year").reset_index(drop=True)
df3["year"] = df3["year"].astype("int64")
assert_pandas_equals(
df.sort_values(by="year").reset_index(drop=True),
df3,
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -199,8 +214,9 @@ def test_dynamodb_put_from_file(
raise RuntimeError(f"Unknown format {format}")

df2 = wr.dynamodb.read_partiql_query(query=f"SELECT * FROM {dynamodb_table}")

assert df.shape == df2.shape
df2 = df2.sort_values(by="par0").reset_index(drop=True)
df2["par0"] = df["par0"].astype("int64")
assert_pandas_equals(df, df2)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -394,7 +410,16 @@ def test_read_items_index(params: dict[str, Any], dynamodb_table: str, use_threa
)
if chunked:
df3 = pd.concat(df3)
assert df3.shape == df.shape

df3 = df3[df.columns].sort_values(by=["Title"]).reset_index(drop=True)
df3["Author"] = df3["Author"].astype(str)
df3["Title"] = df3["Title"].astype(str)
df3["Category"] = df3["Category"].astype(str)
assert_pandas_equals(
df.sort_values(by=["Title"]).reset_index(drop=True).drop(columns=["Formats"]),
df3.drop(columns=["Formats"]),
)
assert df.shape == df3.shape


@pytest.mark.parametrize(
Expand Down Expand Up @@ -549,3 +574,158 @@ def test_read_items_schema(params, dynamodb_table: str, chunked: bool):
}
wr.dynamodb.read_items(allow_full_scan=True, **kwargs)
wr.dynamodb.read_items(filter_expression=Attr("id").eq(1), **kwargs)


@pytest.mark.parametrize(
"params",
[
{
"KeySchema": [{"AttributeName": "par0", "KeyType": "HASH"}, {"AttributeName": "par1", "KeyType": "RANGE"}],
"AttributeDefinitions": [
{"AttributeName": "par0", "AttributeType": "N"},
{"AttributeName": "par1", "AttributeType": "S"},
],
}
],
)
def test_deserialization_read_single_item(params: dict[str, Any], dynamodb_table: str) -> None:
wr.dynamodb.put_items(
items=[
{
"par0": 0,
"par1": "foo",
},
{
"par0": 1,
"par1": "bar",
},
],
table_name=dynamodb_table,
)

items_df = wr.dynamodb.read_items(
table_name=dynamodb_table,
partition_values=[0],
sort_values=["foo"],
consistent=True,
)

assert items_df.iloc[0]["par0"] == 0
assert items_df.iloc[0]["par1"] == "foo"


@pytest.mark.parametrize(
"params",
[
{
"KeySchema": [{"AttributeName": "par0", "KeyType": "HASH"}, {"AttributeName": "par1", "KeyType": "RANGE"}],
"AttributeDefinitions": [
{"AttributeName": "par0", "AttributeType": "N"},
{"AttributeName": "par1", "AttributeType": "S"},
],
}
],
)
def test_deserialization_read_batch_items(params: dict[str, Any], dynamodb_table: str) -> None:
wr.dynamodb.put_items(
items=[
{
"par0": 0,
"par1": "foo",
},
{
"par0": 1,
"par1": "bar",
},
],
table_name=dynamodb_table,
)

items_df = wr.dynamodb.read_items(
table_name=dynamodb_table,
partition_values=[0, 1],
sort_values=["foo", "bar"],
consistent=True,
).sort_values(by=["par0"])

assert items_df.iloc[0]["par0"] == 0
assert items_df.iloc[0]["par1"] == "foo"
assert items_df.iloc[1]["par0"] == 1
assert items_df.iloc[1]["par1"] == "bar"


@pytest.mark.parametrize(
"params",
[
{
"KeySchema": [{"AttributeName": "par0", "KeyType": "HASH"}, {"AttributeName": "par1", "KeyType": "RANGE"}],
"AttributeDefinitions": [
{"AttributeName": "par0", "AttributeType": "N"},
{"AttributeName": "par1", "AttributeType": "S"},
],
}
],
)
def test_deserialization_read_query(params: dict[str, Any], dynamodb_table: str) -> None:
wr.dynamodb.put_items(
items=[
{
"par0": 0,
"par1": "foo",
},
{
"par0": 1,
"par1": "bar",
},
],
table_name=dynamodb_table,
)

items_df = wr.dynamodb.read_items(
table_name=dynamodb_table,
key_condition_expression="par0 = :v1",
expression_attribute_values={":v1": 0},
consistent=True,
)

assert items_df.iloc[0]["par0"] == 0
assert items_df.iloc[0]["par1"] == "foo"


@pytest.mark.parametrize(
"params",
[
{
"KeySchema": [{"AttributeName": "par0", "KeyType": "HASH"}, {"AttributeName": "par1", "KeyType": "RANGE"}],
"AttributeDefinitions": [
{"AttributeName": "par0", "AttributeType": "N"},
{"AttributeName": "par1", "AttributeType": "S"},
],
}
],
)
def test_deserialization_full_scan(params: dict[str, Any], dynamodb_table: str) -> None:
wr.dynamodb.put_items(
items=[
{
"par0": 0,
"par1": "foo",
},
{
"par0": 1,
"par1": "bar",
},
],
table_name=dynamodb_table,
)

items_df = wr.dynamodb.read_items(
table_name=dynamodb_table,
allow_full_scan=True,
consistent=True,
).sort_values(by=["par0"])

assert items_df.iloc[0]["par0"] == 0
assert items_df.iloc[0]["par1"] == "foo"
assert items_df.iloc[1]["par0"] == 1
assert items_df.iloc[1]["par1"] == "bar"

0 comments on commit 57330fd

Please sign in to comment.