diff --git a/README.rst b/README.rst index 4bc5df42..efac55bd 100644 --- a/README.rst +++ b/README.rst @@ -89,8 +89,8 @@ Watch a key client.watch('/nodes/n1') #equivalent to client.read('/nodes/n1', watch = True) client.watch('/nodes/n1', index = 10) -Get a lock -~~~~~~~~~~ +Locking module +~~~~~~~~~~~~~~ .. code:: python @@ -115,6 +115,23 @@ Get a lock lock.renew(60) lock.is_locked() # False + +Leader Election module +~~~~~~~~~~~~~~~~~~~~~~ + +.. code:: python + + # Set a leader object with a name; if no name is given, the local hostname + # is used. + # Zero or no ttl means the leader object is persistent. + client = etcd.Client() + client.election.set('/mysql', name='foo.example.com', ttl=120) # returns the etcd index + + # Get the name + print(client.election.get('/mysql')) # 'foo.example.com' + # Delete it! + print(client.election.delete('/mysql', name='foo.example.com')) + Get machines in the cluster ~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs-source/index.rst b/docs-source/index.rst index 61a691ba..0411c69a 100644 --- a/docs-source/index.rst +++ b/docs-source/index.rst @@ -116,6 +116,23 @@ Use lock primitives lock.renew(60) lock.is_locked() # False +Use the leader election primitives +.................................. + +.. code-block:: python + + # Set a leader object with a name; if no name is given, the local hostname + # is used. + # Zero or no ttl means the leader object is persistent. + client = etcd.Client() + client.election.set('/mysql', name='foo.example.com', ttl=120) # returns the etcd index + + # Get the name + print(client.election.get('/mysql')) # 'foo.example.com' + # Delete it! + print(client.election.delete('/mysql', name='foo.example.com')) + + Get machines in the cluster ........................... diff --git a/src/etcd/__init__.py b/src/etcd/__init__.py index 3d17d27e..308d3411 100644 --- a/src/etcd/__init__.py +++ b/src/etcd/__init__.py @@ -1,6 +1,7 @@ import collections from .client import Client from .lock import Lock +from .election import LeaderElection class EtcdResult(object): diff --git a/src/etcd/client.py b/src/etcd/client.py index 2baae33b..8ec4510c 100644 --- a/src/etcd/client.py +++ b/src/etcd/client.py @@ -453,6 +453,10 @@ def ethernal_watch(self, key, index=None): def get_lock(self, *args, **kwargs): return etcd.Lock(self, *args, **kwargs) + @property + def election(self): + return etcd.LeaderElection(self) + def _result_from_response(self, response): """ Creates an EtcdResult from json dictionary """ # TODO: add headers we obtained from the http respose to the etcd diff --git a/src/etcd/election.py b/src/etcd/election.py new file mode 100644 index 00000000..b5876fab --- /dev/null +++ b/src/etcd/election.py @@ -0,0 +1,81 @@ +import etcd +import platform + + +class LeaderElection(object): + + """ + Leader Election class using the etcd module + """ + + def __init__(self, client): + """ + Initialize a leader election object. + + Args: + client (etcd.Client): etcd client to use for the connection + """ + self.client = client + + def get_path(self, key): + if not key.startswith('/'): + key = '/' + key + return '/mod/v2/leader{}'.format(key) + + def set(self, key, name=None, ttl=0): + """ + Initialize a leader election object. + + Args: + key (string): name of the leader key, + + ttl (int): ttl (in seconds) for the lock to live. + + name (string): the name to store as the leader name. Defaults to the + client's hostname + + """ + + name = name or platform.node() + params = {'ttl': ttl, 'name': name} + path = self.get_path(key) + + res = self.client.api_execute(path, self.client._MPUT, params=params) + return res.data.decode('utf-8') + + def get(self, key): + """ + Get the name of a leader object. + + Args: + key (string): name of the leader key, + + Raises: + etcd.EtcdException + + """ + res = self.client.api_execute(self.get_path(key), self.client._MGET) + if not res.data: + raise etcd.EtcdException('Leader path {} not found'.format(key)) + return res.data.decode('utf-8') + + def delete(self, key, name=None): + """ + Delete a leader object. + + Args: + key (string): the leader key, + + name (string): name of the elected leader + + Raises: + etcd.EtcdException + + """ + path = self.get_path(key) + name = name or platform.node() + res = self.client.api_execute( + path, self.client._MDELETE, {'name': name}) + if (res.data.decode('utf-8') == ''): + return True + return False diff --git a/src/etcd/lock.py b/src/etcd/lock.py index 452ef545..adf7ee5a 100644 --- a/src/etcd/lock.py +++ b/src/etcd/lock.py @@ -24,6 +24,8 @@ def __init__(self, client, key, ttl=None, value=None): value (mixed): value to store on the lock. """ self.client = client + if not key.startswith('/'): + key = '/' + key self.key = key self.ttl = ttl or 0 self.value = value diff --git a/src/etcd/tests/integration/test_election.py b/src/etcd/tests/integration/test_election.py new file mode 100644 index 00000000..11e333d1 --- /dev/null +++ b/src/etcd/tests/integration/test_election.py @@ -0,0 +1,35 @@ +import etcd +from . import test_simple +import time +import unittest + +class TestElection(test_simple.EtcdIntegrationTest): + def setUp(self): + self.client = etcd.Client(port=6001) + + def test_set_get_delete(self): + e = self.client.election + res = e.set('/mysql', name='foo.example.com', ttl=30) + self.assertTrue(res != '') + res = e.get('/mysql') + self.assertEquals(res, 'foo.example.com') + self.assertTrue(e.delete('/mysql', name='foo.example.com')) + + + def test_set_invalid_ttl(self): + self.assertRaises(etcd.EtcdException, self.client.election.set, '/mysql', name='foo.example.com', ttl='ciao') + + @unittest.skip + def test_get_non_existing(self): + """This is actually expected to fail. See https://github.com/coreos/etcd/issues/446""" + self.assertRaises(etcd.EtcdException, self.client.election.get, '/foobar') + + def test_delete_non_existing(self): + self.assertRaises(etcd.EtcdException, self.client.election.delete, '/foobar') + + def test_get_delete_after_ttl_expired_raises(self): + e = self.client.election + e.set('/mysql', name='foo', ttl=1) + time.sleep(2) + self.assertRaises(etcd.EtcdException, e.get, '/mysql') + self.assertRaises(etcd.EtcdException, e.delete, '/mysql', name='foo') diff --git a/src/etcd/tests/unit/test_leader.py b/src/etcd/tests/unit/test_leader.py new file mode 100644 index 00000000..0d2b3788 --- /dev/null +++ b/src/etcd/tests/unit/test_leader.py @@ -0,0 +1,48 @@ +import etcd +import unittest + +from .test_request import TestClientApiBase + +try: + import mock +except ImportError: + from unittest import mock + +class EtcdLeaderElectionTestCase(TestClientApiBase): + def setUp(self): + self.client = etcd.Client() + + def _mock_api(self, status, d): + #We want to test at a lower level here. + resp = self._prepare_response(status, d) + self.client.http.request_encode_body = mock.create_autospec( + self.client.http.request_encode_body, return_value=resp + ) + self.client.http.request = mock.create_autospec( + self.client.http.request, return_value=resp + ) + + + def test_get_leader(self): + """ Can fetch a leader value """ + self._mock_api(200, 'foo.example.com') + self.assertEquals(self.client.election.get('/mysql'), 'foo.example.com') + self._mock_api(200,'') + self.assertRaises(etcd.EtcdException, self.client.election.get, '/mysql') + + def test_set_leader(self): + """ Can set a leader value """ + self._mock_api(200, u'234') + #set w/o a TTL or a name + self.assertEquals(self.client.election.set('/mysql'), u'234') + self.assertEquals(self.client.election.set( + '/mysql', + name='foo.example.com', + ttl=60), u'234') + self._mock_api(500, 'leader name required') + self.assertRaises(etcd.EtcdException, self.client.election.set,'/mysql') + + def test_del_leader(self): + """ Can remove a leader value """ + self._mock_api(200,'') + self.assertTrue(self.client.election.delete('/mysql'))