Skip to content

Commit

Permalink
22329: Transactional support in direct client, MINOR
Browse files Browse the repository at this point in the history
Adds runtime options to the direct client.  If you create a trainee as

    Trainee(persistence='always', runtime={'transactional': True})

then every operation will append a log entry to the .caml file.  This is
faster per operation, and all of the data continues to be recorded on
disk, but the resulting files will be larger than always-persist mode
without the transactional option.
  • Loading branch information
dmaze committed Dec 7, 2024
1 parent 4c28c9a commit e87458e
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 70 deletions.
4 changes: 2 additions & 2 deletions howso/client/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ def get_api(engine_path: t.Optional[Path | str] = None) -> EngineApi:
if result[0] == 1 and isinstance(result[1], dict):
return EngineApi(result[1]["payload"])
raise ValueError("Invalid response")
except Exception:
raise HowsoError('Failed to retrieve the Howso Engine API schema.')
except Exception as e:
raise HowsoError('Failed to retrieve the Howso Engine API schema.') from e
finally:
amlg.destroy_entity(entity_id)
del amlg
2 changes: 0 additions & 2 deletions howso/client/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ def trainee(self, trainee_builder):
trainee = trainee_builder.create(features=features, overwrite_trainee=True)
try:
yield trainee
except Exception:
raise
finally:
trainee_builder.delete(trainee)

Expand Down
111 changes: 87 additions & 24 deletions howso/direct/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
TraineeVersion,
)
from howso.client.typing import LibraryType, Persistence
from howso.direct.schemas import DirectTrainee
from howso.utilities import internals

# Client version
Expand Down Expand Up @@ -407,16 +408,18 @@ def _auto_persist_trainee(self, trainee_id: str):
trainee_id : str
The ID of the Trainee to persist.
"""
try:
trainee = self.trainee_cache.get(trainee_id)
if trainee.persistence == 'always':
self.amlg.store_entity(
handle=trainee_id,
file_path=self.resolve_trainee_filepath(trainee_id)
)
except KeyError:
# Trainee not cached, ignore
pass
trainee = self.trainee_cache.get(trainee_id)
if trainee is None:
return
if trainee.persistence != 'always':
return
if getattr(trainee, 'transactional', False):
return

self.amlg.store_entity(
handle=trainee_id,
file_path=self.resolve_trainee_filepath(trainee_id)
)

def _store_session(self, trainee_id: str, session: Session):
"""Store session details in a Trainee."""
Expand All @@ -442,6 +445,20 @@ def _initialize_trainee(self, trainee_id: str):
# If tracing is enabled, log the trainee version
self.execute(trainee_id, "get_trainee_version", {})

def _initialize_transactional_trainee(self, trainee_id: str):
# Create a temporary trainee and initialize it in the normal way, then clone it with transactional mode on.
tmp_id = str(uuid.uuid4())
self._initialize_trainee(tmp_id)
try:
cloned = self.amlg.clone_entity(tmp_id, trainee_id,
file_path=self.resolve_trainee_filepath(trainee_id),
persist=True,
json_file_params='{"transactional":true,"flatten":true}')
if not cloned:
raise HowsoError(f'Failed to initialize the Trainee "{trainee_id}"')
finally:
self.amlg.destroy_entity(handle=tmp_id)

def _get_trainee_from_engine(self, trainee_id: str) -> Trainee:
"""
Retrieve the Howso Engine representation of a Trainee object.
Expand All @@ -468,12 +485,14 @@ def _get_trainee_from_engine(self, trainee_id: str) -> Trainee:
persistence = metadata.get('persistence', 'allow')
trainee_meta = metadata.get('metadata')
trainee_name = metadata.get('name')
transactional = metadata.get('transactional', False)

return Trainee(
return DirectTrainee(
name=trainee_name,
id=trainee_id,
persistence=persistence,
metadata=trainee_meta,
transactional=transactional
)

def _get_trainee_thread_count(self, trainee_id: str) -> int:
Expand Down Expand Up @@ -733,8 +752,16 @@ def create_trainee( # noqa: C901
.. deprecated:: 31.0
Pass via `runtime` instead.
runtime : TraineeRuntime, optional
(Not implemented in this client)
runtime : TraineeDirectRuntimeOptions, optional
Additional backend-specific settings.
* `transactional`: if true, and `persistence='always'`, then write
out an incremental update on each action rather than the entire
state. Generally results in faster operation at the cost of
increased disk utilization.
.. versionchanged:: 32.1
Supports the `transactional` parameter.
Returns
-------
Expand All @@ -750,6 +777,10 @@ def create_trainee( # noqa: C901
if features is None:
features = {}

if runtime is None:
runtime = {}
transactional = runtime.get('transactional', False)

if library_type is not None:
warnings.warn(
'The create trainee parameter `library_type` is deprecated and will be removed in '
Expand Down Expand Up @@ -795,13 +826,17 @@ def create_trainee( # noqa: C901
if self.configuration.verbose:
print('Creating new Trainee')
# Initialize Amalgam entity
self._initialize_trainee(trainee_id)
if transactional:
self._initialize_transactional_trainee(trainee_id)
else:
self._initialize_trainee(trainee_id)

# Store the metadata
trainee_metadata = dict(
name=name,
persistence=persistence,
metadata=metadata
metadata=metadata,
transactional=transactional
)
self.execute(trainee_id, "set_metadata", {"metadata": trainee_metadata})

Expand All @@ -812,11 +847,12 @@ def create_trainee( # noqa: C901
features = internals.postprocess_feature_attributes(features)

# Cache and return the trainee
new_trainee = Trainee(
new_trainee = DirectTrainee(
name=name,
persistence=persistence,
id=trainee_id,
metadata=metadata
metadata=metadata,
transactional=transactional
)
self.trainee_cache.set(new_trainee, feature_attributes=features)
return new_trainee
Expand All @@ -835,7 +871,7 @@ def update_trainee(self, trainee: Mapping | Trainee) -> Trainee:
Trainee
The `Trainee` object that was updated.
"""
instance = Trainee.from_dict(trainee) if isinstance(trainee, Mapping) else trainee
instance = DirectTrainee.from_dict(trainee) if isinstance(trainee, Mapping) else trainee

if not instance.id:
raise ValueError("A Trainee id is required.")
Expand All @@ -848,6 +884,7 @@ def update_trainee(self, trainee: Mapping | Trainee) -> Trainee:
'name': instance.name,
'metadata': instance.metadata,
'persistence': instance.persistence,
'transactional': getattr(instance, 'transactional', False)
}
self.execute(instance.id, "set_metadata", {"metadata": metadata})

Expand Down Expand Up @@ -1151,6 +1188,7 @@ def copy_trainee(
new_trainee_id: t.Optional[str] = None,
*,
library_type: t.Optional[LibraryType] = None,
persistence: t.Optional[Persistence] = None,
resources: t.Optional[Mapping[str, t.Any]] = None,
runtime: t.Optional[TraineeRuntimeOptions] = None
) -> Trainee:
Expand All @@ -1174,18 +1212,30 @@ def copy_trainee(
.. deprecated:: 31.0
Pass via `runtime` instead.
persistence : {"allow", "always", "never"}, optional
The requested persistence state of the Trainee. If not specified,
the new trainee will inherit the value from the original.
.. versionadded:: 32.1
resources : dict, optional
(Not Implemented) Customize the resources provisioned for the
Trainee instance. If not specified, the new trainee will inherit
the value from the original.
.. deprecated:: 31.0
Pass via `runtime` instead.
runtime : TraineeRuntimeOptions, optional
Library type, resource requirements, and other runtime settings
for the new Trainee instance. If not specified, the new trainee
will inherit the values from the original. Not used in this
client implementation.
runtime : TraineeDirectRuntimeOptions, optional
Additional backend-specific settings. If not specified, the new
trainee will inherit the values from the original.
* `transactional`: if true, and `persistence='always'`, then write
out an incremental update on each action rather than the entire
state. Generally results in faster operation at the cost of
increased disk utilization.
.. versionchanged:: 32.1
Supports the `transactional` parameter.
Returns
-------
Expand Down Expand Up @@ -1213,9 +1263,19 @@ def copy_trainee(
'The copy trainee parameter `resources` is deprecated and will be removed in '
'a future release. Please use `runtime` instead.', DeprecationWarning)

transactional = runtime is not None and runtime.get('transactional', False)
if transactional:
persist = True
json_file_params = '{"transactional":true,"flatten":true}'
else:
persist = False
json_file_params = ""

is_cloned = self.amlg.clone_entity(
handle=trainee_id,
clone_handle=new_trainee_id,
persist=persist,
json_file_params=json_file_params
)
if not is_cloned:
raise HowsoError(
Expand All @@ -1225,12 +1285,15 @@ def copy_trainee(

# Create the copy trainee
new_trainee = deepcopy(original_trainee)
new_trainee = DirectTrainee.from_dict(original_trainee.to_dict())
new_trainee.name = new_trainee_name
new_trainee._id = new_trainee_id # type: ignore
new_trainee._transactional = transactional
metadata = {
'name': new_trainee.name,
'metadata': new_trainee.metadata,
'persistence': new_trainee.persistence,
'persistence': persistence or new_trainee.persistence,
'transactional': transactional
}
self.execute(new_trainee_id, "set_metadata", {"metadata": metadata})
# Add new trainee to cache
Expand Down
6 changes: 6 additions & 0 deletions howso/direct/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .trainee import DirectTrainee, TraineeDirectRuntimeOptions

__all__ = [
"DirectTrainee",
"TraineeDirectRuntimeOptions"
]
71 changes: 71 additions & 0 deletions howso/direct/schemas/trainee.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import annotations

from collections.abc import Mapping
import typing as t
from uuid import UUID

from typing_extensions import NotRequired, ReadOnly

from ...client.schemas.trainee import Trainee, TraineeDict, TraineeRuntimeOptions
from ...client.typing import Persistence


class DirectTraineeDict(TraineeDict):
"""
Direct-client-specific trainee state.
.. versionadded:: 32.1
"""

transactional: bool


class DirectTrainee(Trainee):
"""
Direct-client-specific internal representation of a trainee.
.. versionadded:: 32.1
"""

attribute_map = dict(Trainee.attribute_map, transactional='transactional')

def __init__(
self,
id: str | UUID,
name: t.Optional[str] = None,
*,
metadata: t.Optional[Mapping] = None,
persistence: Persistence = 'allow',
project_id: t.Optional[str | UUID] = None,
transactional: bool = False
):
"""Initialize the Trainee instance."""
super().__init__(id, name, metadata=metadata, persistence=persistence, project_id=project_id)
self._transactional = transactional

@property
def transactional(self) -> bool:
"""
Whether this trainee is in transactional mode.
Returns
-------
bool
true if this trainee is running in transactional mode
"""
return self._transactional


class TraineeDirectRuntimeOptions(TraineeRuntimeOptions):
"""
Runtime options specific to the direct client.
.. versionadded:: 32.1
"""

transactional: ReadOnly[NotRequired[bool | None]]
"""Use transactional mode when `persistence='always'."""
45 changes: 43 additions & 2 deletions howso/direct/tests/test_standalone.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from pathlib import Path

import pytest

from amalgam.api import Amalgam
from howso.direct import HowsoDirectClient
from howso.direct.schemas.trainee import TraineeDirectRuntimeOptions
from howso.utilities.testing import get_configurationless_test_client


@pytest.fixture
def client():
def client(tmp_path: Path):
"""Direct client instance using latest binaries."""
return get_configurationless_test_client(client_class=HowsoDirectClient,
verbose=True, trace=True)
verbose=True, trace=True, default_persist_path=tmp_path)


def test_direct_client(client: HowsoDirectClient):
Expand All @@ -27,3 +30,41 @@ def test_direct_client(client: HowsoDirectClient):
def test_check_name_valid_for_save(client, filename, truthiness):
"""Ensure that the internal function `check_name_valid_for_save` works."""
assert client.check_name_valid_for_save(filename, clobber=True)[0] == truthiness


def test_persistence_always(client: HowsoDirectClient, tmp_path: Path):
"""Test that persist-always mode creates a file on disk."""
trainee = client.create_trainee(persistence='always')
trainee_path = tmp_path / f"{trainee.id}.caml"
client.set_feature_attributes(trainee.id, {"f": {"type": "nominal"}})
assert trainee_path.exists()


def test_persistence_always_shrinks(client: HowsoDirectClient, tmp_path: Path):
"""Test that persist-always mode rewrites a file to maybe be smaller."""
trainee = client.create_trainee(persistence='always')
trainee_path = tmp_path / f"{trainee.id}.caml"
client.set_feature_attributes(trainee.id, {"feature_1": {"type": "nominal"},
"other_unrelated_feature": {"type": "continuous"}})
old_size = trainee_path.stat().st_size
client.set_feature_attributes(trainee.id, {"feature_1": {"type": "nominal"}})
new_size = trainee_path.stat().st_size
# We've deleted a feature so the file should be smaller
assert new_size < old_size


def test_persistence_always_transactional_grows(client: HowsoDirectClient, tmp_path: Path):
"""Test that transactional mode makes a file larger."""
trainee = client.create_trainee(persistence='always', runtime=TraineeDirectRuntimeOptions(transactional=True))
trainee_path = tmp_path / f"{trainee.id}.caml"
client.set_feature_attributes(trainee.id, {"feature_1": {"type": "nominal"},
"other_unrelated_feature": {"type": "continuous"}})
old_size = trainee_path.stat().st_size
client.set_feature_attributes(trainee.id, {"feature_1": {"type": "nominal"}})
# Transactional mode always makes the file larger
new_size = trainee_path.stat().st_size
assert new_size > old_size
client.persist_trainee(trainee.id)
# But now saving should compact the file
new_new_size = trainee_path.stat().st_size
assert new_new_size < old_size
Loading

0 comments on commit e87458e

Please sign in to comment.