From 3b973ac41e448ec5d6f1ccd849436b02b1327336 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 11 Jan 2016 14:37:12 -0500 Subject: [PATCH 1/3] Start of prototype. --- flocker/node/agents/localfs.py | 44 +++++++ flocker/node/agents/remotefs.py | 221 ++++++++++++++++++++++++++++++++ 2 files changed, 265 insertions(+) create mode 100644 flocker/node/agents/localfs.py create mode 100644 flocker/node/agents/remotefs.py diff --git a/flocker/node/agents/localfs.py b/flocker/node/agents/localfs.py new file mode 100644 index 0000000000..83ad889332 --- /dev/null +++ b/flocker/node/agents/localfs.py @@ -0,0 +1,44 @@ +""" +Testing implementation of ``IRemoteFilesystemAPI``. +""" + +from uuid import UUID +from subprocess import check_call + +from zope.interface import implementer + +from .remotefs import IRemoteFilesystemAPI, RemoteFilesystem + + +@implementer(IRemoteFilesystemAPI) +class LocalFilesystem(object): + """ + Local filesystem pretending to be remote filesystem. + + Can't be used for movement across nodes, but otherwise useful for testing. + """ + def __init__(self, root): + if not root.exists(): + root.makedirs() + self.root = root() + + def list(self): + return [RemoteFilesystem(dataset_id=UUID(child)) + for child in self.root.listdir()] + + def _child(self, dataset_id): + return self.root.child(unicode(dataset_id)) + + def create(self, dataset_id, metadata): + self._child(dataset_id).mkdir() + return RemoteFilesystem(dataset_id=dataset_id) + + def destroy(self, dataset_id): + self._child(dataset_id).remove() + + def mount(self, dataset_id, path): + check_call([b"mount", b"--bind", self._child(dataset_id).path, + path.path]) + + def umount(self, dataset_id, path): + check_call([b"umount", path.path]) diff --git a/flocker/node/agents/remotefs.py b/flocker/node/agents/remotefs.py new file mode 100644 index 0000000000..e250b34316 --- /dev/null +++ b/flocker/node/agents/remotefs.py @@ -0,0 +1,221 @@ +""" +Remote Filesystem deployer. +""" + +from uuid import UUID + +from pyrsistent import PClass, field, pmap_field + +from zope.interface import Interface, implementer, provider + +from eliot import ActionType + +from twisted.python.filepath import FilePath +from twisted.internet.defer import succeed, fail +from twisted.python.constants import Names, NamedConstant + +from .. import IStateChange + +# XXX infrastructure that should probably be in shared module, not in +# blockdevice: +from .blockdevice import ( + IDatasetStateChangeFactory, ICalculator, PollUntilAttached, TransitionTable, + DoNothing, +) + +from ...common.algebraic import TaggedUnionInvariant + + +class RemoteFilesystem(PClass): + dataset_id = field(type=UUID) + + +class IRemoteFilesystemAPI(Interface): + def list(): + """ + :return: List of ``RemoteFilesystem``. + """ + + def create(dataset_id, metadata): + """ + Metadata can be used to pass custom options to creation. + """ + + def destroy(dataset_id): + pass + + def mount(dataset_id, path): + pass + + def unmount(dataset_id, path): + pass + + +class DatasetStates(Names): + """ + States that a ``Dataset`` can be in. + + """ + # Doesn't exist yet. + NON_EXISTENT = NamedConstant() + # Exists, but attached elsewhere + ATTACHED_ELSEWHERE = NamedConstant() + # Exists, but not attached + NON_MANIFEST = NamedConstant() + # Mounted on this node + MOUNTED = NamedConstant() + # Deleted from the driver + DELETED = NamedConstant() + + +class DiscoveredDataset(PClass): + """ + Dataset as discovered by deployer. + + :ivar DatasetStates state: The state this dataset was determined to be in. + :ivar FilePath mount_point: The absolute path to the location on the node + where the dataset will be mounted. + """ + state = field( + invariant=lambda state: (state in DatasetStates.iterconstants(), + "Not a valid state"), + mandatory=True, + ) + dataset_id = field(type=UUID, mandatory=True) + mount_point = field(FilePath) + + __invariant__ = TaggedUnionInvariant( + tag_attribute='state', + attributes_for_tag={ + DatasetStates.ATTACHED_ELSEWHERE: set(), + DatasetStates.NON_MANIFEST: set(), + DatasetStates.MOUNTED: {'mount_point'}, + }, + ) + + +class DesiredDataset(PClass): + """ + Dataset as requested by configuration and applications. + """ + state = field( + invariant=lambda state: (state in DatasetStates.iterconstants(), + "Not a valid state"), + mandatory=True, + ) + dataset_id = field(type=UUID, mandatory=True) + metadata = pmap_field( + key_type=unicode, + value_type=unicode, + ) + mount_point = field(FilePath) + + __invariant__ = TaggedUnionInvariant( + tag_attribute='state', + attributes_for_tag={ + DatasetStates.NON_MANIFEST: {"metadata"}, + DatasetStates.MOUNTED: {"mount_point", "metadata"}, + DatasetStates.DELETED: set(), + }, + ) + + +API_CHANGE = ActionType(u"remotefs:deployer:action", [], []) + + +@provider(IDatasetStateChangeFactory) +@implementer(IStateChange) +class _APICommon(PClass): + @property + def eliot_action(self): + return API_CHANGE() + + def run(self, deployer): + try: + self._run(deployer.api) + return succeed(None) + except: + return fail() + + +class Create(_APICommon): + dataset_id = field() + metadata = field() + + @classmethod + def from_state_and_config(cls, discovered_dataset, desired_dataset): + return cls( + dataset_id=desired_dataset.dataset_id, + metadata=desired_dataset.metadata, + ) + + def _run(self, api): + api.create(self.dataset_id, self.metadata) + + +class Destroy(_APICommon): + dataset_id = field() + + @classmethod + def from_state_and_config(cls, discovered_dataset, desired_dataset): + return cls( + dataset_id=desired_dataset.dataset_id, + metadata=desired_dataset.metadata, + ) + + def _run(self, api): + api.destroy(self.dataset_id) + + +class Mount(_APICommon): + dataset_id = field() + mount_point = field() + + @classmethod + def from_state_and_config(cls, discovered_dataset, desired_dataset): + return cls( + dataset_id=desired_dataset.dataset_id, + mount_point=desired_dataset.mount_point, + ) + + def _run(self, api): + api.mount(self.dataset_id, self.mount_point) + + +class Unmount(_APICommon): + dataset_id = field() + mount_point = field() + + @classmethod + def from_state_and_config(cls, discovered_dataset, desired_dataset): + return cls( + dataset_id=desired_dataset.dataset_id, + mount_point=discovered_dataset.mount_point, + ) + + def _run(self, api): + api.unmount(self.dataset_id, self.mount_point) + + +Desired = Discovered = DatasetStates +DATASET_TRANSITIONS = TransitionTable.create({ + Desired.MOUNTED: { + Discovered.NON_EXISTENT: Create, + Discovered.ATTACHED_ELSEWHERE: PollUntilAttached, + Discovered.NON_MANIFEST: Mount, + Discovered.MOUNTED: DoNothing, + }, + Desired.NON_MANIFEST: { + Discovered.NON_EXISTENT: Create, + Discovered.ATTACHED_ELSEWHERE: DoNothing, + Discovered.NON_MANIFEST: DoNothing, + Discovered.MOUNTED: Unmount, + }, + Desired.DELETED: { + Discovered.NON_EXISTENT: DoNothing, + Discovered.ATTACHED_ELSEWHERE: DoNothing, + Discovered.NON_MANIFEST: Destroy, + Discovered.MOUNTED: Unmount, + }, +}) +del Desired, Discovered From 0a84c3e835459e1ca121ff5023b02d944e5473ce Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 11 Jan 2016 14:47:56 -0500 Subject: [PATCH 2/3] More sketching. --- flocker/node/agents/blockdevice.py | 5 +++-- flocker/node/agents/remotefs.py | 30 ++++++++++++++++++++---------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/flocker/node/agents/blockdevice.py b/flocker/node/agents/blockdevice.py index 1a7ee3fcfc..5a3756242c 100644 --- a/flocker/node/agents/blockdevice.py +++ b/flocker/node/agents/blockdevice.py @@ -1445,6 +1445,7 @@ class BlockDeviceCalculator(PClass): transitions = field(TransitionTable, mandatory=True, factory=TransitionTable.create, initial=DATASET_TRANSITIONS) + dataset_states = field(initial=DatasetStates) def _calculate_dataset_change(self, discovered_dataset, desired_dataset): """ @@ -1460,12 +1461,12 @@ def _calculate_dataset_change(self, discovered_dataset, desired_dataset): # we detach it. desired_state = (desired_dataset.state if desired_dataset is not None - else DatasetStates.NON_MANIFEST) + else self.dataset_states.NON_MANIFEST) # If we haven't discovered a dataset, then it is doesn't # exist. discovered_state = (discovered_dataset.state if discovered_dataset is not None - else DatasetStates.NON_EXISTENT) + else self.dataset_states.NON_EXISTENT) if desired_state != discovered_state: transition = self.transitions[desired_state][discovered_state] return transition.from_state_and_config( diff --git a/flocker/node/agents/remotefs.py b/flocker/node/agents/remotefs.py index e250b34316..52949b9f01 100644 --- a/flocker/node/agents/remotefs.py +++ b/flocker/node/agents/remotefs.py @@ -19,15 +19,21 @@ # XXX infrastructure that should probably be in shared module, not in # blockdevice: from .blockdevice import ( - IDatasetStateChangeFactory, ICalculator, PollUntilAttached, TransitionTable, - DoNothing, + IDatasetStateChangeFactory, PollUntilAttached, + TransitionTable, DoNothing, BlockDeviceCalculator, ) from ...common.algebraic import TaggedUnionInvariant class RemoteFilesystem(PClass): + """ + In general case probably can't tell if filesystem is already mounted + elsewhere or not... so omit that information. + """ dataset_id = field(type=UUID) + # If None, not mounted locally: + mount_point = field(type=(None.__class__, FilePath)) class IRemoteFilesystemAPI(Interface): @@ -45,6 +51,10 @@ def destroy(dataset_id): pass def mount(dataset_id, path): + """ + Unlike IBlockDeviceAPI we assume mounting *must* happen on local + machine, so need for compute instance ID or anything of the sort. + """ pass def unmount(dataset_id, path): @@ -54,14 +64,11 @@ def unmount(dataset_id, path): class DatasetStates(Names): """ States that a ``Dataset`` can be in. - """ # Doesn't exist yet. NON_EXISTENT = NamedConstant() - # Exists, but attached elsewhere - ATTACHED_ELSEWHERE = NamedConstant() - # Exists, but not attached - NON_MANIFEST = NamedConstant() + # Exists, but not mounted on this machine: + NOT_MOUNTED = NamedConstant() # Mounted on this node MOUNTED = NamedConstant() # Deleted from the driver @@ -87,8 +94,7 @@ class DiscoveredDataset(PClass): __invariant__ = TaggedUnionInvariant( tag_attribute='state', attributes_for_tag={ - DatasetStates.ATTACHED_ELSEWHERE: set(), - DatasetStates.NON_MANIFEST: set(), + DatasetStates.NOT_MOUNTED: set(), DatasetStates.MOUNTED: {'mount_point'}, }, ) @@ -113,7 +119,7 @@ class DesiredDataset(PClass): __invariant__ = TaggedUnionInvariant( tag_attribute='state', attributes_for_tag={ - DatasetStates.NON_MANIFEST: {"metadata"}, + DatasetStates.NOT_MOUNTED: {"metadata"}, DatasetStates.MOUNTED: {"mount_point", "metadata"}, DatasetStates.DELETED: set(), }, @@ -219,3 +225,7 @@ def _run(self, api): }, }) del Desired, Discovered + +# Nothing particularly BlockDevice-specific about the class: +CALCULATOR = BlockDeviceCalculator( + transitions=DATASET_TRANSITIONS, dataset_states=DatasetStates) From 785d5234e4eb37f35678020ee987e135d3c1c9f4 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 13 Jan 2016 15:08:59 -0500 Subject: [PATCH 3/3] Finish up deployer sketch. --- flocker/node/agents/localfs.py | 23 ++++- flocker/node/agents/remotefs.py | 175 ++++++++++++++++++++++++++++++-- 2 files changed, 184 insertions(+), 14 deletions(-) diff --git a/flocker/node/agents/localfs.py b/flocker/node/agents/localfs.py index 83ad889332..79f53f2348 100644 --- a/flocker/node/agents/localfs.py +++ b/flocker/node/agents/localfs.py @@ -1,5 +1,5 @@ """ -Testing implementation of ``IRemoteFilesystemAPI``. +Localhost implementation of ``IRemoteFilesystemAPI``, for testing. """ from uuid import UUID @@ -7,6 +7,8 @@ from zope.interface import implementer +from twisted.python.filepath import FilePath + from .remotefs import IRemoteFilesystemAPI, RemoteFilesystem @@ -23,15 +25,30 @@ def __init__(self, root): self.root = root() def list(self): - return [RemoteFilesystem(dataset_id=UUID(child)) + return [self._to_fs(UUID(child)) for child in self.root.listdir()] def _child(self, dataset_id): return self.root.child(unicode(dataset_id)) + def _to_fs(self, dataset_id): + storage_path = self._child(dataset_id) + local_mount_point = None + for line in FilePath( + b"/proc/self/mountinfo").getContent().splitlines(): + # let's pretend there's no spaces in paths... + parts = line.split() + origin = parts[3] + destination = parts[4] + if origin == storage_path.path: + local_mount_point = FilePath(destination) + break + return RemoteFilesystem(dataset_id=dataset_id, + local_mount_point=local_mount_point) + def create(self, dataset_id, metadata): self._child(dataset_id).mkdir() - return RemoteFilesystem(dataset_id=dataset_id) + return self._to_fs(dataset_id) def destroy(self, dataset_id): self._child(dataset_id).remove() diff --git a/flocker/node/agents/remotefs.py b/flocker/node/agents/remotefs.py index 52949b9f01..f4aa12d13b 100644 --- a/flocker/node/agents/remotefs.py +++ b/flocker/node/agents/remotefs.py @@ -14,16 +14,19 @@ from twisted.internet.defer import succeed, fail from twisted.python.constants import Names, NamedConstant -from .. import IStateChange +from .. import IStateChange, IDeployer # XXX infrastructure that should probably be in shared module, not in # blockdevice: from .blockdevice import ( - IDatasetStateChangeFactory, PollUntilAttached, + IDatasetStateChangeFactory, TransitionTable, DoNothing, BlockDeviceCalculator, ) from ...common.algebraic import TaggedUnionInvariant +from ...control import ( + Dataset, Manifestation, NodeState, NonManifestDatasets, ILocalState, +) class RemoteFilesystem(PClass): @@ -32,11 +35,24 @@ class RemoteFilesystem(PClass): elsewhere or not... so omit that information. """ dataset_id = field(type=UUID) - # If None, not mounted locally: - mount_point = field(type=(None.__class__, FilePath)) + # If None, not mounted locally (though might be mounted remotely): + local_mount_point = field(type=(None.__class__, FilePath)) class IRemoteFilesystemAPI(Interface): + """ + Some assumptions: + + 1. In the general case the remote filesystem server will not be able to + know what volumes are attached where. So we basically have to have all + nodes doing discovery of state as pertains themselves and others. + + 2. Mount/unmount can only be run locally. + + 3. Local mount points can only really be easily known by the backend + implementation. Otherwise we have to do some quite difficult or + perhaps impossible attempt to parse e.g. the output of ``mount``. + """ def list(): """ :return: List of ``RemoteFilesystem``. @@ -207,20 +223,17 @@ def _run(self, api): DATASET_TRANSITIONS = TransitionTable.create({ Desired.MOUNTED: { Discovered.NON_EXISTENT: Create, - Discovered.ATTACHED_ELSEWHERE: PollUntilAttached, - Discovered.NON_MANIFEST: Mount, + Discovered.NOT_MOUNTED: Mount, Discovered.MOUNTED: DoNothing, }, - Desired.NON_MANIFEST: { + Desired.NOT_MOUNTED: { Discovered.NON_EXISTENT: Create, - Discovered.ATTACHED_ELSEWHERE: DoNothing, - Discovered.NON_MANIFEST: DoNothing, + Discovered.NOT_MOUNTED: DoNothing, Discovered.MOUNTED: Unmount, }, Desired.DELETED: { Discovered.NON_EXISTENT: DoNothing, - Discovered.ATTACHED_ELSEWHERE: DoNothing, - Discovered.NON_MANIFEST: Destroy, + Discovered.NOT_MOUNTED: Destroy, Discovered.MOUNTED: Unmount, }, }) @@ -229,3 +242,143 @@ def _run(self, api): # Nothing particularly BlockDevice-specific about the class: CALCULATOR = BlockDeviceCalculator( transitions=DATASET_TRANSITIONS, dataset_states=DatasetStates) + + +@implementer(ILocalState) +class LocalState(PClass): + hostname = field(type=unicode, mandatory=True) + node_uuid = field(type=UUID, mandatory=True) + datasets = pmap_field(UUID, DiscoveredDataset) + + def shared_state_changes(self): + """ + Returns the NodeState and the NonManifestDatasets of the local state. + These are the only parts of the state that need to be sent to the + control service. + """ + # XXX The structure of the shared state changes reflects the model + # currently used by the control service. However, that model doesn't + # seem to actually match what any consumer wants. + manifestations = {} + paths = {} + nonmanifest_datasets = {} + + for dataset in self.datasets.values(): + dataset_id = dataset.dataset_id + if dataset.state == DatasetStates.MOUNTED: + manifestations[unicode(dataset_id)] = Manifestation( + dataset=Dataset( + dataset_id=dataset_id, + maximum_size=None, + ), + primary=True, + ) + paths[unicode(dataset_id)] = dataset.mount_point + elif dataset.state == DatasetStates.NOT_MOUNTED: + # XXX this is a problem; if it's mounted somewhere else + # we'll stomp on each other... + nonmanifest_datasets[unicode(dataset_id)] = Dataset( + dataset_id=dataset_id, + maximum_size=None, + ) + + return ( + NodeState( + uuid=self.node_uuid, + hostname=self.hostname, + manifestations=manifestations, + paths=paths, + devices={}, + applications=None, + ), + NonManifestDatasets( + datasets=nonmanifest_datasets + ), + ) + + +@implementer(IDeployer) +class RemoteFilesystemDeployer(PClass): + """ + A lot of code can probably be shared with BlockDeviceDeployer. + """ + hostname = field(type=unicode, mandatory=True) + node_uuid = field(type=UUID, mandatory=True) + api = field(mandatory=True) + mountroot = field(type=FilePath, initial=FilePath(b"/flocker")) + + def discover_state(self, node_state): + datasets = {} + + for remotefs in self.api.list(): + if remotefs.local_mount_point is None: + datasets[remotefs.dataset_id] = DiscoveredDataset( + state=DatasetStates.NOT_MOUNTED) + else: + datasets[remotefs.dataset_id] = DiscoveredDataset( + state=DatasetStates.MOUNTED, + mount_point=remotefs.local_mount_point) + + local_state = LocalState( + node_uuid=self.node_uuid, + hostname=self.hostname, + datasets=datasets, + ) + + return succeed(local_state) + + def _calculate_desired_for_manifestation(self, manifestation): + """ + Get the ``DesiredDataset`` corresponding to a given manifestation. + + :param Manifestation manifestation: The + + :return: The ``DesiredDataset`` corresponding to the given + manifestation. + """ + dataset_id = UUID(manifestation.dataset.dataset_id) + if manifestation.dataset.deleted: + return DesiredDataset( + state=DatasetStates.DELETED, + dataset_id=dataset_id, + ) + else: + return DesiredDataset( + state=DatasetStates.MOUNTED, + dataset_id=dataset_id, + metadata=manifestation.dataset.metadata, + mount_point=self._mountpath_for_dataset_id( + unicode(dataset_id) + ), + ) + + def _calculate_desired_state( + self, configuration, local_applications, local_datasets + ): + # XXX not bothering with NotInUse filtering + this_node_config = configuration.get_node( + self.node_uuid, hostname=self.hostname) + + return { + UUID(manifestation.dataset.dataset_id): + self._calculate_desired_for_manifestation( + manifestation + ) + for manifestation in this_node_config.manifestations.values() + } + + def calculate_changes(self, configuration, cluster_state, local_state): + # XXX duplicates BlockDeviceDeployer.calculate_changes + local_node_state = cluster_state.get_node(self.node_uuid, + hostname=self.hostname) + + desired_datasets = self._calculate_desired_state( + configuration=configuration, + local_applications=local_node_state.applications, + local_datasets=local_state.datasets, + ) + + return self.calculator.calculate_changes_for_datasets( + discovered_datasets=local_state.datasets, + desired_datasets=desired_datasets, + )