diff --git a/docs/hwm/column/index.rst b/docs/hwm/column/index.rst index f8d36e4..160fe8e 100644 --- a/docs/hwm/column/index.rst +++ b/docs/hwm/column/index.rst @@ -13,7 +13,7 @@ Column HWM datetime_hwm What is Column HWM? -------------- +------------------- Sometimes it's necessary to read only changed rows from a table. diff --git a/docs/hwm/file/index.rst b/docs/hwm/file/index.rst index d23381c..36e6c64 100644 --- a/docs/hwm/file/index.rst +++ b/docs/hwm/file/index.rst @@ -11,7 +11,7 @@ File HWM file_list_hwm What is File HWM? -------------- +----------------- Sometimes it's necessary to read/download only new files from a source folder. diff --git a/docs/hwm/key_value/index.rst b/docs/hwm/key_value/index.rst index aa7dfee..8db2d78 100644 --- a/docs/hwm/key_value/index.rst +++ b/docs/hwm/key_value/index.rst @@ -1,7 +1,7 @@ .. _key_value_hwm_classes: KeyValue HWM -======== +============ .. toctree:: :maxdepth: 2 @@ -11,16 +11,16 @@ KeyValue HWM key_value_int_hwm What is KeyValue HWM? ----------------------- +--------------------- The KeyValue High Water Mark (HWM) is a specialized class designed to manage and track incremental data changes in systems where data is stored or represented as key-value pairs, such as in message queues like Kafka. Use Case ----------------------- +-------- -The ``KeyValueHWM`` class is particularly beneficial in scenarios where there is a need to `incrementally `_ upload data in an ETL process. +The ``KeyValueHWM`` class is particularly beneficial in scenarios where there is a need to `incrementally `_ upload data in an ETL process. -For instance, in typical ETL processes using `Spark with Kafka `_, data re-written entirely from all partitions in topics starting from **zero** offset. This approach can be inefficient, time-consuming and create duplicates in target. By leveraging the ``KeyValueIntHWM`` class, it becomes possible to track the last offset of data processed. 
This enables the ETL process to only write new data increments, significantly reducing the amount of data transferred during each run. +For instance, in typical ETL processes using `Spark with Kafka `_, data is re-written entirely from all partitions in topics starting from **zero** offset. This approach can be inefficient, time-consuming and create duplicates in the target. By leveraging the ``KeyValueIntHWM`` class, it becomes possible to track the last offset of data processed. This enables the ETL process to read only the data appended to the topic since the previous run instead of reading the entire topic content each time. Example Usage with Kafka Messages --------------------------------- @@ -73,4 +73,4 @@ Restrictions - **Non-Decreasing Values**: The ``KeyValueHWM`` class is designed to handle only non-decreasing values. During the update process, if the new offset provided for a given partition is less than the current offset, the value will not be updated. -- **Incomplete Key Updates**: If a key is not included in new hwm value, its value remains unchanged. This is essential because keys in systems like Kafka (partitions) cannot be deleted, and their last known +- **Incomplete Key Updates**: If a key is not included in new hwm value, its value remains unchanged. This is essential because keys in systems like Kafka (partitions) cannot be deleted, and their last known position is left intact. diff --git a/docs/hwm/key_value/key_value_int_hwm.rst b/docs/hwm/key_value/key_value_int_hwm.rst index 164561c..89ada05 100644 --- a/docs/hwm/key_value/key_value_int_hwm.rst +++ b/docs/hwm/key_value/key_value_int_hwm.rst @@ -1,5 +1,5 @@ KeyValue Int HWM -============= +================ .. 
currentmodule:: etl_entities.hwm.key_value.key_value_int_hwm diff --git a/etl_entities/hwm/key_value/key_value_hwm.py b/etl_entities/hwm/key_value/key_value_hwm.py index cefda8b..b54e993 100644 --- a/etl_entities/hwm/key_value/key_value_hwm.py +++ b/etl_entities/hwm/key_value/key_value_hwm.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from typing import Generic, Optional, TypeVar from frozendict import frozendict @@ -32,7 +34,7 @@ class KeyValueHWM(HWM[frozendict], Generic[KeyValueHWMValueType], GenericModel): HWM unique name - value : ``frozendict[Any, KeyValueHWMValueType]]]`` , default: ``frozendict`` + value : ``frozendict[Any, KeyValueHWMValueType]`` , default: ``frozendict`` HWM value @@ -59,7 +61,7 @@ class KeyValueHWM(HWM[frozendict], Generic[KeyValueHWMValueType], GenericModel): # is supported only from Python 3.9 onwards. value: frozendict = Field(default_factory=frozendict) - def update(self, new_data: dict) -> "KeyValueHWM[KeyValueHWMValueType]": + def update(self: KeyValueHWMType, new_data: dict) -> KeyValueHWMType: """ Updates the HWM value based on provided new key-value data. 
This method only updates the value if the new value is greater than the current value for a given key diff --git a/etl_entities/hwm/key_value/key_value_int_hwm.py b/etl_entities/hwm/key_value/key_value_int_hwm.py index e2ba8e1..6eaf75f 100644 --- a/etl_entities/hwm/key_value/key_value_int_hwm.py +++ b/etl_entities/hwm/key_value/key_value_int_hwm.py @@ -14,6 +14,8 @@ from __future__ import annotations +from typing import Any + from frozendict import frozendict from pydantic import Field, validator @@ -31,7 +33,7 @@ class KeyValueIntHWM(KeyValueHWM[int]): HWM unique name - value : ``frozendict[Any, KeyValueHWMValueType]]]``, default: ``frozendict`` + value : ``frozendict[Any, KeyValueHWMValueType]``, default: ``frozendict`` HWM value @@ -69,7 +71,7 @@ class KeyValueIntHWM(KeyValueHWM[int]): ) """ - value: frozendict = Field(default_factory=frozendict) + value: frozendict[Any, int] = Field(default_factory=frozendict) def serialize(self) -> dict: """Return dict representation of HWM @@ -110,7 +112,7 @@ def serialize(self) -> dict: } return serialized_data # noqa: WPS331 - @validator("value", pre=True) + @validator("value", pre=True, allow_reuse=True) def _validate_int_values(cls, key_value): # noqa: N805 if key_value is None: return key_value diff --git a/tests/test_old_hwm/test_column_hwm.py b/tests/test_old_hwm/test_column_hwm.py index ae046b5..abe7801 100644 --- a/tests/test_old_hwm/test_column_hwm.py +++ b/tests/test_old_hwm/test_column_hwm.py @@ -241,6 +241,31 @@ def test_column_hwm_compare(hwm_class, value, delta): # noqa: WPS210 assert item2 < item1 +@pytest.mark.parametrize( # noqa: WPS210 + "hwm_class, value, delta", + [ + (DateHWM, date.today(), timedelta(days=2)), + (DateTimeHWM, datetime.now(), timedelta(seconds=2)), + (IntHWM, 1, 2), + ], +) +def test_column_hwm_covers(hwm_class, value, delta): # noqa: WPS210 + column = Column(name="some1") + table = Table(name="abc.another1", instance="proto1://url1") + + empty_hwm = hwm_class(column=column, source=table) 
+ + assert not empty_hwm.covers(value) + assert not empty_hwm.covers(value - delta) + assert not empty_hwm.covers(value + delta) + + hwm = hwm_class(column=column, source=table, value=value) + + assert hwm.covers(value) + assert hwm.covers(value - delta) + assert not hwm.covers(value + delta) + + @pytest.mark.parametrize( "hwm_class, value", [