Skip to content

Commit

Permalink
[CDF-23684] Fix update kafka or mqtt source in hosted extractor. (#2096)
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino authored Jan 21, 2025
1 parent 73fe764 commit 39657e2
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 12 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ Changes are grouped as follows
### Added
- Support for the `/simulators` and `/simulators/integration` API endpoints.

## [7.72.2] - 2025-01-20
### Fixed
- Updating a Kafka or MQTT source with a write object in `mode="replace"` no longer raises a `CogniteAPIError` with
422 status code.

## [7.72.1] - 2025-01-14
### Fixed
- Data Modeling container type Enum now dumps correctly and no longer camelCases the user-defined values.
Expand Down
4 changes: 3 additions & 1 deletion cognite/client/_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,8 +1277,10 @@ def _clear_all_attributes(update_attributes: list[PropertySpec]) -> dict[str, di
for prop in update_attributes:
if prop.is_beta:
continue
elif prop.is_explicit_nullable_object:
clear_with: dict = {"setNull": True}
elif prop.is_object:
clear_with: dict = {"set": {}}
clear_with = {"set": {}}
elif prop.is_list:
clear_with = {"set": []}
elif prop.is_nullable:
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations

__version__ = "7.72.1"
__version__ = "7.72.2"

__api_subversion__ = "20230101"
2 changes: 2 additions & 0 deletions cognite/client/data_classes/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ class PropertySpec:
is_nullable: bool = True
# Used to skip replace when the value is None
is_beta: bool = False
# Objects that are nullable and support setNull. This is hosted extractor mqtt/kafka sources
is_explicit_nullable_object: bool = False

def __post_init__(self) -> None:
assert not (self.is_list and self.is_object), "PropertySpec cannot be both list and object"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ def target_data_set_id(self) -> DestinationUpdate._TargetDataSetIdUpdate:

@classmethod
def _get_update_properties(cls, item: CogniteResource | None = None) -> list[PropertySpec]:
return [PropertySpec("credentials", is_nullable=True), PropertySpec("target_data_set_id", is_nullable=True)]
return [
PropertySpec("credentials", is_nullable=True, is_object=True, is_explicit_nullable_object=True),
PropertySpec("target_data_set_id", is_nullable=True),
]


class DestinationWriteList(CogniteResourceList[DestinationWrite], ExternalIDTransformerMixin):
Expand Down
12 changes: 6 additions & 6 deletions cognite/client/data_classes/hosted_extractors/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,10 @@ def _get_update_properties(cls, item: CogniteResource | None = None) -> list[Pro
return [
PropertySpec("host", is_nullable=False),
PropertySpec("port", is_nullable=True),
PropertySpec("authentication", is_nullable=True, is_object=True),
PropertySpec("authentication", is_nullable=True, is_object=True, is_explicit_nullable_object=True),
PropertySpec("use_tls", is_nullable=False),
PropertySpec("ca_certificate", is_nullable=True, is_object=True),
PropertySpec("auth_certificate", is_nullable=True, is_object=True),
PropertySpec("ca_certificate", is_nullable=True, is_object=True, is_explicit_nullable_object=True),
PropertySpec("auth_certificate", is_nullable=True, is_object=True, is_explicit_nullable_object=True),
]


Expand Down Expand Up @@ -662,10 +662,10 @@ def auth_certificate(self) -> _AuthCertificateUpdate:
def _get_update_properties(cls, item: CogniteResource | None = None) -> list[PropertySpec]:
return [
PropertySpec("bootstrap_brokers", is_nullable=False),
PropertySpec("authentication", is_nullable=True, is_object=True),
PropertySpec("authentication", is_nullable=True, is_object=True, is_explicit_nullable_object=True),
PropertySpec("use_tls", is_nullable=False),
PropertySpec("ca_certificate", is_nullable=True, is_object=True),
PropertySpec("auth_certificate", is_nullable=True, is_object=True),
PropertySpec("ca_certificate", is_nullable=True, is_object=True, is_explicit_nullable_object=True),
PropertySpec("auth_certificate", is_nullable=True, is_object=True, is_explicit_nullable_object=True),
]


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.72.1"
version = "7.72.2"

description = "Cognite Python SDK"
readme = "README.md"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
EventHubSource,
EventHubSourceUpdate,
EventHubSourceWrite,
MQTT5SourceWrite,
Source,
SourceList,
)
from cognite.client.exceptions import CogniteAPIError
Expand Down Expand Up @@ -60,6 +62,32 @@ def test_create_update_retrieve_delete(self, cognite_client: CogniteClient) -> N
if created:
cognite_client.hosted_extractors.sources.delete(created.external_id, ignore_unknown_ids=True)

def test_create_update_replace_retrieve(self, cognite_client: CogniteClient) -> None:
original = MQTT5SourceWrite(
external_id=f"myMqttSource-{random_string(10)}",
host="mqtt.hsl.fi",
port=1883,
)

created: Source | None = None
try:
created = cognite_client.hosted_extractors.sources.create(original)

update = MQTT5SourceWrite(original.external_id, host="mqtt.hsl.fi", port=1884)

updated = cognite_client.hosted_extractors.sources.update(update, mode="replace")

assert updated.port == 1884

retrieved = cognite_client.hosted_extractors.sources.retrieve(created.external_id)

assert retrieved is not None
assert retrieved.external_id == created.external_id
assert retrieved.port == 1884
finally:
if created:
cognite_client.hosted_extractors.sources.delete(created.external_id, ignore_unknown_ids=True)

@pytest.mark.usefixtures("one_event_hub_source")
def test_list(self, cognite_client: CogniteClient) -> None:
res = cognite_client.hosted_extractors.sources.list(limit=1)
Expand Down
23 changes: 21 additions & 2 deletions tests/tests_unit/test_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
CogniteUpdate,
PropertySpec,
)
from cognite.client.data_classes.hosted_extractors import MQTT5SourceUpdate, MQTT5SourceWrite
from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError
from cognite.client.utils._identifier import Identifier, IdentifierSequence
from tests.utils import jsgz_load, set_request_limit
Expand Down Expand Up @@ -1418,10 +1419,11 @@ def test_get_response_content_safe(self, content, expected):
assert APIClient._get_response_content_safe(res) == expected

@pytest.mark.parametrize(
"resource, mode, expected_update_object",
"resource, update_obj, mode, expected_update_object",
[
pytest.param(
TimeSeries(id=42, name="bla", metadata={"myNew": "metadataValue"}),
TimeSeriesUpdate,
"replace_ignore_null",
{
"name": {"set": "bla"},
Expand All @@ -1432,6 +1434,7 @@ def test_get_response_content_safe(self, content, expected):
pytest.param(
# is_string is ignored as it cannot be updated.
TimeSeries(id=42, name="bla", is_string=False, metadata={"myNew": "metadataValue"}),
TimeSeriesUpdate,
"patch",
{
"name": {"set": "bla"},
Expand All @@ -1441,6 +1444,7 @@ def test_get_response_content_safe(self, content, expected):
),
pytest.param(
TimeSeries(id=42, name="bla"),
TimeSeriesUpdate,
"replace",
{
"assetId": {"setNull": True},
Expand All @@ -1454,15 +1458,30 @@ def test_get_response_content_safe(self, content, expected):
},
id="replace",
),
pytest.param(
MQTT5SourceWrite(external_id="my-source-mqtt", host="mqtt.hsl.fi", port=1883),
MQTT5SourceUpdate,
"replace",
{
"host": {"set": "mqtt.hsl.fi"},
"port": {"set": 1883},
"useTls": {"set": False},
"authentication": {"setNull": True},
"caCertificate": {"setNull": True},
"authCertificate": {"setNull": True},
},
id="replace with setNull key",
),
],
)
def test_convert_resource_to_patch_object(
self,
resource: CogniteResource,
update_obj: type[CogniteUpdate],
mode: Literal["replace_ignore_null", "patch", "replace"],
expected_update_object: dict[str, dict[str, dict]],
):
update_attributes = TimeSeriesUpdate._get_update_properties()
update_attributes = update_obj._get_update_properties()
actual = APIClient._convert_resource_to_patch_object(resource, update_attributes, mode)
assert actual["update"] == expected_update_object

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from cognite.client.data_classes.hosted_extractors import DestinationUpdate


class TestDestinations:
def test_dump_update_obj(self) -> None:
obj = DestinationUpdate(external_id="my-destination").target_data_set_id.set(123).credentials.set(None)
assert obj.dump(camel_case=True) == {
"externalId": "my-destination",
"update": {"targetDataSetId": {"set": 123}, "credentials": {"setNull": True}},
}

0 comments on commit 39657e2

Please sign in to comment.