diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 5b63dc47de..ee2f9abee2 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1036,10 +1036,10 @@ def acked(err, msg): elif not isinstance( feature_group, ExternalFeatureGroup ) and self._start_offline_materialization(offline_write_options): - if offline_write_options.get("skip_offsets", False): - # don't provide the current offsets (read from where the job last left off) + if not offline_write_options.get("use_current_offsets", False): + # don't provide the current offsets (instead read from where the job last left off) initial_check_point = "" - # provide the initial_check_point as it will reduce the read amplification of materialization job + # providing the initial_check_point can reduce the read amplification of the materialization job feature_group.materialization_job.run( args=feature_group.materialization_job.config.get("defaultArgs", "") + initial_check_point, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 1f321203c9..7fb977155d 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -1771,6 +1771,9 @@ def save( connectivity from you Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to `False` and will use external listeners when connecting from outside of Hopsworks. + * key `use_current_offsets` and value `True` or `False` to configure whether the materialization job + should start consuming Kafka events from the offsets that were captured before this data was produced. + By default the materialization job starts consuming from where it last left off. validation_options: Additional validation options as key-value pairs, defaults to `{}`. * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. * key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks. 
@@ -1953,6 +1956,9 @@ def insert( connectivity from you Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to `False` and will use external listeners when connecting from outside of Hopsworks. + * key `use_current_offsets` and value `True` or `False` to configure whether the materialization job + should start consuming Kafka events from the offsets that were captured before this data was produced. + By default the materialization job starts consuming from where it last left off. validation_options: Additional validation options as key-value pairs, defaults to `{}`. * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. * key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks. @@ -2108,6 +2114,9 @@ def multi_part_insert( connectivity from you Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to `False` and will use external listeners when connecting from outside of Hopsworks. + * key `use_current_offsets` and value `True` or `False` to configure whether the materialization job + should start consuming Kafka events from the offsets that were captured before this data was produced. + By default the materialization job starts consuming from where it last left off. validation_options: Additional validation options as key-value pairs, defaults to `{}`. * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. * key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks. 
diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 43fcf066f3..d8d59c0252 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -3042,6 +3042,60 @@ def test_materialization_kafka(self, mocker): offline_write_options={"start_offline_materialization": True}, ) + # Assert + assert mock_python_engine_kafka_produce.call_count == 4 + job_mock.run.assert_called_once_with( + args="defaults", + await_termination=False, + ) + + def test_materialization_kafka_use_current_offsets_True(self, mocker): + # Arrange + mocker.patch("hsfs.engine.python.Engine._get_kafka_config", return_value={}) + mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema") + mocker.patch("hsfs.engine.python.Engine._get_encoder_func") + mocker.patch("hsfs.engine.python.Engine._encode_complex_features") + mock_python_engine_kafka_produce = mocker.patch( + "hsfs.engine.python.Engine._kafka_produce" + ) + mocker.patch("hsfs.engine.python.Engine.get_job_url") + mocker.patch( + "hsfs.engine.python.Engine._kafka_get_offsets", + return_value=" tests_offsets", + ) + + python_engine = python.Engine() + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + id=10, + stream=False, + time_travel_format="HUDI", + ) + + mocker.patch.object(fg, "commit_details", return_value={"commit1": 1}) + + fg._online_topic_name = "test_topic" + job_mock = mocker.MagicMock() + job_mock.config = {"defaultArgs": "defaults"} + fg._materialization_job = job_mock + + df = pd.DataFrame(data={"col1": [1, 2, 2, 3]}) + + # Act + python_engine._write_dataframe_kafka( + feature_group=fg, + dataframe=df, + offline_write_options={ + "start_offline_materialization": True, + "use_current_offsets": True, + }, + ) + # Assert assert mock_python_engine_kafka_produce.call_count == 4 job_mock.run.assert_called_once_with(