Skip to content

Commit

Permalink
Add option to disable schema enforcement for pandas_to_eland
Browse files Browse the repository at this point in the history
  • Loading branch information
Ashton Sidhu authored Jan 14, 2022
1 parent 9206941 commit e3bff8a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
6 changes: 5 additions & 1 deletion eland/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def pandas_to_eland(
es_refresh: bool = False,
es_dropna: bool = False,
es_type_overrides: Optional[Mapping[str, str]] = None,
es_verify_mapping_compatibility: bool = True,
thread_count: int = 4,
chunksize: Optional[int] = None,
use_pandas_index_for_es_ids: bool = True,
Expand Down Expand Up @@ -72,6 +73,9 @@ def pandas_to_eland(
* False: Include missing values - may cause bulk to fail
es_type_overrides: dict, default None
Dict of field_name: es_data_type that overrides default es data types
es_verify_mapping_compatibility: bool, default 'True'
* True: Verify that the dataframe schema matches the Elasticsearch index schema
* False: Do not verify schema
thread_count: int
number of the threads to use for the bulk requests
chunksize: int, default None
Expand Down Expand Up @@ -172,7 +176,7 @@ def pandas_to_eland(
es_client.indices.delete(index=es_dest_index)
es_client.indices.create(index=es_dest_index, mappings=mapping["mappings"])

elif es_if_exists == "append":
elif es_if_exists == "append" and es_verify_mapping_compatibility:
dest_mapping = es_client.indices.get_mapping(index=es_dest_index)[
es_dest_index
]
Expand Down
53 changes: 52 additions & 1 deletion tests/etl/test_pandas_to_eland.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def test_es_if_exists_append(self):
pd_df3 = pd_df.append(pd_df2)
assert_pandas_eland_frame_equal(pd_df3, df2)

def test_es_if_exists_append_mapping_mismatch(self):
def test_es_if_exists_append_mapping_mismatch_schema_enforcement(self):
df1 = pandas_to_eland(
pd_df,
es_client=ES_TEST_CLIENT,
Expand All @@ -162,9 +162,60 @@ def test_es_if_exists_append_mapping_mismatch(self):
"- 'Z' is missing from ES index mapping\n"
"- 'a' column type ('keyword') not compatible with ES index mapping type ('long')"
)

# Assert that the index isn't modified
assert_pandas_eland_frame_equal(pd_df, df1)

def test_es_if_exists_append_mapping_mismatch_no_schema_enforcement(self):
pandas_to_eland(
pd_df,
es_client=ES_TEST_CLIENT,
es_dest_index="test-index",
es_if_exists="append",
es_refresh=True,
)

pd_df2 = pd.DataFrame(
{
"a": [4, 5, 6],
"b": [-1.0, -2.0, -3.0],
"d": [dt, dt - timedelta(1), dt - timedelta(2)],
"e": ["A", "B", "C"],
},
index=["3", "4", "5"],
)

pandas_to_eland(
pd_df2,
es_client=ES_TEST_CLIENT,
es_dest_index="test-index",
es_if_exists="append",
es_refresh=True,
es_verify_mapping_compatibility=False,
)

final_df = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6],
"b": [1.0, 2.0, 3.0, -1.0, -2.0, -3.0],
"c": ["A", "B", "C", None, None, None],
"d": [
dt,
dt + timedelta(1),
dt + timedelta(2),
dt,
dt - timedelta(1),
dt - timedelta(2),
],
"e": [None, None, None, "A", "B", "C"],
},
index=["0", "1", "2", "3", "4", "5"],
)

eland_df = DataFrame(ES_TEST_CLIENT, "test-index")
# Assert that the index isn't modified
assert_pandas_eland_frame_equal(final_df, eland_df)

def test_es_if_exists_append_es_type_coerce_error(self):
df1 = pandas_to_eland(
pd_df,
Expand Down

0 comments on commit e3bff8a

Please sign in to comment.