[DOP-2330] - improve documentation
maxim-lixakov committed Jan 9, 2024
1 parent 9fa9517 commit 91e9614
Showing 7 changed files with 43 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/hwm/column/index.rst
@@ -13,7 +13,7 @@ Column HWM
datetime_hwm

What is Column HWM?
- -------------
+ -------------------

Sometimes it's necessary to read only changed rows from a table.

2 changes: 1 addition & 1 deletion docs/hwm/file/index.rst
@@ -11,7 +11,7 @@ File HWM
file_list_hwm

What is File HWM?
- -------------
+ -----------------

Sometimes it's necessary to read/download only new files from a source folder.

12 changes: 6 additions & 6 deletions docs/hwm/key_value/index.rst
@@ -1,7 +1,7 @@
.. _key_value_hwm_classes:

KeyValue HWM
- ========
+ ============

.. toctree::
:maxdepth: 2
@@ -11,16 +11,16 @@ KeyValue HWM
key_value_int_hwm

What is KeyValue HWM?
- ----------------------
+ ---------------------

The KeyValue High Water Mark (HWM) is a specialized class designed to manage and track incremental data changes in systems where data is stored or represented as key-value pairs, such as in message queues like Kafka.

Use Case
- ----------------------
+ --------

- The ``KeyValueHWM`` class is particularly beneficial in scenarios where there is a need to `incrementally <https://onetl.readthedocs.io/en/0.10.0/strategy/incremental_strategy.html>`_ upload data in an ETL process.
+ The ``KeyValueHWM`` class is particularly beneficial in scenarios where there is a need to `incrementally <https://onetl.readthedocs.io/en/stable/strategy/incremental_strategy.html>`_ upload data in an ETL process.

- For instance, in typical ETL processes using `Spark with Kafka <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_, data re-written entirely from all partitions in topics starting from **zero** offset. This approach can be inefficient, time-consuming and create duplicates in target. By leveraging the ``KeyValueIntHWM`` class, it becomes possible to track the last offset of data processed. This enables the ETL process to only write new data increments, significantly reducing the amount of data transferred during each run.
+ For instance, in typical ETL processes using `Spark with Kafka <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_, data re-written entirely from all partitions in topics starting from **zero** offset. This approach can be inefficient, time-consuming and create duplicates in target. By leveraging the ``KeyValueIntHWM`` class, it becomes possible to track the last offset of data processed. This enables the ETL process to read data appended to topic since previous run instead of reading the entire topic content each time.

Example Usage with Kafka Messages
---------------------------------
@@ -73,4 +73,4 @@ Restrictions

- **Non-Decreasing Values**: The ``KeyValueHWM`` class is designed to handle only non-decreasing values. During the update process, if the new offset provided for a given partition is less than the current offset, the value will not be updated.

- - **Incomplete Key Updates**: If a key is not included in new hwm value, its value remains unchanged. This is essential because keys in systems like Kafka (partitions) cannot be deleted, and their last known
+ - **Incomplete Key Updates**: If a key is not included in new hwm value, its value remains unchanged. This is essential because keys in systems like Kafka (partitions) cannot be deleted, and their last known position is left intact.
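
The two restrictions documented above can be illustrated with plain dictionaries. This is only a sketch of the described merge rule, not the library's implementation: ``merge_offsets`` is a hypothetical helper, and accepting a previously unseen key is an assumption that the text does not state explicitly.

```python
from typing import Dict


def merge_offsets(current: Dict[int, int], new_data: Dict[int, int]) -> Dict[int, int]:
    """Hypothetical helper mirroring the documented restrictions."""
    # Keys missing from new_data keep their last known value.
    merged = dict(current)
    for key, offset in new_data.items():
        # Assumption: an unseen key is accepted; a known key only moves forward.
        if key not in merged or offset > merged[key]:
            merged[key] = offset
    return merged


stored = {0: 100, 1: 250}            # partition -> last processed offset
incoming = {0: 150, 1: 200, 2: 10}   # partition 1 reports a smaller offset
result = merge_offsets(stored, incoming)
print(result)
```

Here partition 0 advances, partition 1 keeps its stored offset because the incoming one is smaller, and partition 2 is added.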
2 changes: 1 addition & 1 deletion docs/hwm/key_value/key_value_int_hwm.rst
@@ -1,5 +1,5 @@
KeyValue Int HWM
- =============
+ ================

.. currentmodule:: etl_entities.hwm.key_value.key_value_int_hwm

6 changes: 4 additions & 2 deletions etl_entities/hwm/key_value/key_value_hwm.py
@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+ from __future__ import annotations
+
from typing import Generic, Optional, TypeVar

from frozendict import frozendict
@@ -32,7 +34,7 @@ class KeyValueHWM(HWM[frozendict], Generic[KeyValueHWMValueType], GenericModel):
HWM unique name
- value : ``frozendict[Any, KeyValueHWMValueType]]]`` , default: ``frozendict``
+ value : ``frozendict[Any, KeyValueHWMValueType]`` , default: ``frozendict``
HWM value
@@ -59,7 +61,7 @@ class KeyValueHWM(HWM[frozendict], Generic[KeyValueHWMValueType], GenericModel):
# is supported only from Python 3.9 onwards.
value: frozendict = Field(default_factory=frozendict)

- def update(self, new_data: dict) -> "KeyValueHWM[KeyValueHWMValueType]":
+ def update(self: KeyValueHWMType, new_data: dict) -> KeyValueHWMType:
"""
Updates the HWM value based on provided new key-value data. This method only updates
the value if the new value is greater than the current valur for a given key
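
The new ``update`` signature annotates ``self`` with a type variable so that type checkers infer the caller's own class as the return type. The definition of ``KeyValueHWMType`` is not visible in this diff; the sketch below assumes it is a ``TypeVar`` bound to the base class, and every class and variable name in it is hypothetical.

```python
from __future__ import annotations

from typing import Optional, TypeVar

# Assumption: a TypeVar bound to the base class, standing in for KeyValueHWMType.
SketchHWMType = TypeVar("SketchHWMType", bound="SketchHWM")


class SketchHWM:
    def __init__(self, value: Optional[dict] = None) -> None:
        self.value = dict(value or {})

    def update(self: SketchHWMType, new_data: dict) -> SketchHWMType:
        # Build the result from the runtime class so subclasses get their own type back.
        merged = {**self.value, **new_data}
        return type(self)(merged)


class SketchIntHWM(SketchHWM):
    pass


updated = SketchIntHWM({0: 1}).update({1: 2})
print(type(updated).__name__, updated.value)
```

With the previous string annotation, a checker would report the base class as the return type even when ``update`` is called on a subclass.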
8 changes: 5 additions & 3 deletions etl_entities/hwm/key_value/key_value_int_hwm.py
@@ -14,6 +14,8 @@

from __future__ import annotations

+ from typing import Any
+
from frozendict import frozendict
from pydantic import Field, validator

@@ -31,7 +33,7 @@ class KeyValueIntHWM(KeyValueHWM[int]):
HWM unique name
- value : ``frozendict[Any, KeyValueHWMValueType]]]``, default: ``frozendict``
+ value : ``frozendict[Any, KeyValueHWMValueType]``, default: ``frozendict``
HWM value
@@ -69,7 +71,7 @@ class KeyValueIntHWM(KeyValueHWM[int]):
)
"""

- value: frozendict = Field(default_factory=frozendict)
+ value: frozendict[Any, int] = Field(default_factory=frozendict)

def serialize(self) -> dict:
"""Return dict representation of HWM
@@ -110,7 +112,7 @@ def serialize(self) -> dict:
}
return serialized_data # noqa: WPS331

- @validator("value", pre=True)
+ @validator("value", pre=True, allow_reuse=True)
def _validate_int_values(cls, key_value): # noqa: N805
if key_value is None:
return key_value
25 changes: 25 additions & 0 deletions tests/test_old_hwm/test_column_hwm.py
@@ -241,6 +241,31 @@ def test_column_hwm_compare(hwm_class, value, delta): # noqa: WPS210
assert item2 < item1


+ @pytest.mark.parametrize(  # noqa: WPS210
+     "hwm_class, value, delta",
+     [
+         (DateHWM, date.today(), timedelta(days=2)),
+         (DateTimeHWM, datetime.now(), timedelta(seconds=2)),
+         (IntHWM, 1, 2),
+     ],
+ )
+ def test_column_hwm_covers(hwm_class, value, delta):  # noqa: WPS210
+     column = Column(name="some1")
+     table = Table(name="abc.another1", instance="proto1://url1")
+
+     empty_hwm = hwm_class(column=column, source=table)
+
+     assert not empty_hwm.covers(value)
+     assert not empty_hwm.covers(value - delta)
+     assert not empty_hwm.covers(value + delta)
+
+     hwm = hwm_class(column=column, source=table, value=value)
+
+     assert hwm.covers(value)
+     assert hwm.covers(value - delta)
+     assert not hwm.covers(value + delta)
+
+
@pytest.mark.parametrize(
"hwm_class, value",
[
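
The assertions in ``test_column_hwm_covers`` imply a simple rule: an HWM with no value covers nothing, and an HWM with a value covers everything up to and including it. The class below is a hypothetical stand-in written only to restate that rule; it is not the ``IntHWM`` class exercised by the test.

```python
from typing import Optional


class SketchIntHWM:
    """Hypothetical stand-in that models only the covers() behaviour."""

    def __init__(self, value: Optional[int] = None) -> None:
        self.value = value

    def covers(self, candidate: int) -> bool:
        # An empty HWM has seen nothing yet, so it covers no value.
        if self.value is None:
            return False
        # Values at or below the stored mark are treated as already processed.
        return candidate <= self.value


empty = SketchIntHWM()
filled = SketchIntHWM(value=1)
print(empty.covers(1), filled.covers(1), filled.covers(-1), filled.covers(3))
```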
