From d899e1229ff8a28570ae6c4c459c734ac39cdabb Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 13 Sep 2023 13:51:03 +0200 Subject: [PATCH 1/5] Update python.py --- python/hsfs/engine/python.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 5b63dc47de..6aa795bcbb 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1036,7 +1036,7 @@ def acked(err, msg): elif not isinstance( feature_group, ExternalFeatureGroup ) and self._start_offline_materialization(offline_write_options): - if offline_write_options.get("skip_offsets", False): + if offline_write_options.get("skip_offsets", True): # don't provide the current offsets (read from where the job last left off) initial_check_point = "" # provide the initial_check_point as it will reduce the read amplification of materialization job From 8f07ef83660e9ac0ae8da1754c606648ee1ff708 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 13 Sep 2023 13:53:09 +0200 Subject: [PATCH 2/5] Update python.py --- python/hsfs/engine/python.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 6aa795bcbb..68ba4b7541 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1039,7 +1039,7 @@ def acked(err, msg): if offline_write_options.get("skip_offsets", True): # don't provide the current offsets (read from where the job last left off) initial_check_point = "" - # provide the initial_check_point as it will reduce the read amplification of materialization job + # the initial_check_point can reduce the read amplification of materialization job feature_group.materialization_job.run( args=feature_group.materialization_job.config.get("defaultArgs", "") + initial_check_point, From 392a792f614f0d9c79613472efdb7188fb5860dd Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 13 Sep 2023 14:08:52 +0200 Subject: [PATCH 3/5] Update test_python.py --- python/tests/engine/test_python.py | 51 ++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 43fcf066f3..ae9eaa2b0a 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -3042,6 +3042,57 @@ def test_materialization_kafka(self, mocker): offline_write_options={"start_offline_materialization": True}, ) + # Assert + assert mock_python_engine_kafka_produce.call_count == 4 + job_mock.run.assert_called_once_with( + args="defaults", + await_termination=False, + ) + + def test_materialization_kafka_skip_offsets_False(self, mocker): + # Arrange + mocker.patch("hsfs.engine.python.Engine._get_kafka_config", return_value={}) + mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema") + mocker.patch("hsfs.engine.python.Engine._get_encoder_func") + mocker.patch("hsfs.engine.python.Engine._encode_complex_features") + mock_python_engine_kafka_produce = mocker.patch( + "hsfs.engine.python.Engine._kafka_produce" + ) + mocker.patch("hsfs.engine.python.Engine.get_job_url") + mocker.patch( + "hsfs.engine.python.Engine._kafka_get_offsets", + return_value=" tests_offsets", + ) + + python_engine = python.Engine() + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + id=10, + stream=False, + time_travel_format="HUDI", + ) + + mocker.patch.object(fg, "commit_details", return_value={"commit1": 1}) + + fg._online_topic_name = "test_topic" + job_mock = mocker.MagicMock() + job_mock.config = {"defaultArgs": "defaults"} + fg._materialization_job = job_mock + + df = pd.DataFrame(data={"col1": [1, 2, 2, 3]}) + + # Act + python_engine._write_dataframe_kafka( + feature_group=fg, + dataframe=df, + offline_write_options={"start_offline_materialization": True, "skip_offsets": False}, + ) + # Assert assert mock_python_engine_kafka_produce.call_count == 4 job_mock.run.assert_called_once_with( From 25cfcd57ad792a3b6a732570943692c49b406fbc Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 13 Sep 2023 14:10:51 +0200 Subject: [PATCH 4/5] Update test_python.py --- python/tests/engine/test_python.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index ae9eaa2b0a..c9c0b49309 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -3090,7 +3090,10 @@ def test_materialization_kafka_skip_offsets_False(self, mocker): python_engine._write_dataframe_kafka( feature_group=fg, dataframe=df, - offline_write_options={"start_offline_materialization": True, "skip_offsets": False}, + offline_write_options={ + "start_offline_materialization": True, + "skip_offsets": False, + }, ) # Assert From a09098380fd0f0606723857804e4399cd53fcf18 Mon Sep 17 00:00:00 2001 From: bubriks Date: Mon, 18 Sep 2023 13:36:44 +0200 Subject: [PATCH 5/5] address feedback * rename skip_offsets -> use_current_offsets * add documentation --- python/hsfs/engine/python.py | 4 ++-- python/hsfs/feature_group.py | 9 +++++++++ python/tests/engine/test_python.py | 4 ++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 68ba4b7541..ee2f9abee2 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1036,8 +1036,8 @@ def acked(err, msg): elif not isinstance( feature_group, ExternalFeatureGroup ) and self._start_offline_materialization(offline_write_options): - if offline_write_options.get("skip_offsets", True): - # don't provide the current offsets (read from where the job last left off) + if not offline_write_options.get("use_current_offsets", False): + # don't provide the current offsets (instead read from where the job last left off) initial_check_point = "" # the initial_check_point can reduce the read amplification of materialization job feature_group.materialization_job.run( diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 1f321203c9..7fb977155d 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -1771,6 +1771,9 @@ def save( connectivity from you Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to `False` and will use external listeners when connecting from outside of Hopsworks. + * key `use_current_offsets` and value `True` or `False` to configure if materialization job + should start consuming Kafka events from the offset that was received before anything was produced. + By default the materialization job starts consuming from where it last left off. validation_options: Additional validation options as key-value pairs, defaults to `{}`. * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. * key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks. @@ -1953,6 +1956,9 @@ def insert( connectivity from you Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to `False` and will use external listeners when connecting from outside of Hopsworks. + * key `use_current_offsets` and value `True` or `False` to configure if materialization job + should start consuming Kafka events from the offset that was received before anything was produced. + By default the materialization job starts consuming from where it last left off. validation_options: Additional validation options as key-value pairs, defaults to `{}`. * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. * key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks. @@ -2108,6 +2114,9 @@ def multi_part_insert( connectivity from you Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to `False` and will use external listeners when connecting from outside of Hopsworks. + * key `use_current_offsets` and value `True` or `False` to configure if materialization job + should start consuming Kafka events from the offset that was received before anything was produced. + By default the materialization job starts consuming from where it last left off. validation_options: Additional validation options as key-value pairs, defaults to `{}`. * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. * key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks. diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index c9c0b49309..d8d59c0252 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -3049,7 +3049,7 @@ def test_materialization_kafka(self, mocker): await_termination=False, ) - def test_materialization_kafka_skip_offsets_False(self, mocker): + def test_materialization_kafka_use_current_offsets_True(self, mocker): # Arrange mocker.patch("hsfs.engine.python.Engine._get_kafka_config", return_value={}) mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema") @@ -3092,7 +3092,7 @@ def test_materialization_kafka_skip_offsets_False(self, mocker): dataframe=df, offline_write_options={ "start_offline_materialization": True, - "skip_offsets": False, + "use_current_offsets": True, }, )