Skip to content

Commit

Permalink
Merge pull request #23 from jplana/leader-election
Browse files Browse the repository at this point in the history
Leader Election support.
  • Loading branch information
lavagetto committed Jan 13, 2014
2 parents 06c4c4c + 92f369a commit 0997a77
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 2 deletions.
21 changes: 19 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ Watch a key
client.watch('/nodes/n1') #equivalent to client.read('/nodes/n1', watch = True)
client.watch('/nodes/n1', index = 10)
Get a lock
~~~~~~~~~~
Locking module
~~~~~~~~~~~~~~

.. code:: python
Expand All @@ -115,6 +115,23 @@ Get a lock
lock.renew(60)
lock.is_locked() # False
Leader Election module
~~~~~~~~~~~~~~~~~~~~~~

.. code:: python
# Set a leader object with a name; if no name is given, the local hostname
# is used.
# Zero or no ttl means the leader object is persistent.
client = etcd.Client()
client.election.set('/mysql', name='foo.example.com', ttl=120) # returns the etcd index
# Get the name
print(client.election.get('/mysql')) # 'foo.example.com'
# Delete it!
print(client.election.delete('/mysql', name='foo.example.com'))
Get machines in the cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
17 changes: 17 additions & 0 deletions docs-source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,23 @@ Use lock primitives
lock.renew(60)
lock.is_locked() # False
Use the leader election primitives
..................................

.. code-block:: python
# Set a leader object with a name; if no name is given, the local hostname
# is used.
# Zero or no ttl means the leader object is persistent.
client = etcd.Client()
client.election.set('/mysql', name='foo.example.com', ttl=120) # returns the etcd index
# Get the name
print(client.election.get('/mysql')) # 'foo.example.com'
# Delete it!
print(client.election.delete('/mysql', name='foo.example.com'))
Get machines in the cluster
...........................
Expand Down
1 change: 1 addition & 0 deletions src/etcd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import collections
from .client import Client
from .lock import Lock
from .election import LeaderElection


class EtcdResult(object):
Expand Down
4 changes: 4 additions & 0 deletions src/etcd/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ def ethernal_watch(self, key, index=None):
def get_lock(self, *args, **kwargs):
return etcd.Lock(self, *args, **kwargs)

@property
def election(self):
return etcd.LeaderElection(self)

def _result_from_response(self, response):
""" Creates an EtcdResult from json dictionary """
# TODO: add headers we obtained from the http respose to the etcd
Expand Down
81 changes: 81 additions & 0 deletions src/etcd/election.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import etcd
import platform


class LeaderElection(object):

"""
Leader Election class using the etcd module
"""

def __init__(self, client):
"""
Initialize a leader election object.
Args:
client (etcd.Client): etcd client to use for the connection
"""
self.client = client

def get_path(self, key):
if not key.startswith('/'):
key = '/' + key
return '/mod/v2/leader{}'.format(key)

def set(self, key, name=None, ttl=0):
"""
Initialize a leader election object.
Args:
key (string): name of the leader key,
ttl (int): ttl (in seconds) for the lock to live.
name (string): the name to store as the leader name. Defaults to the
client's hostname
"""

name = name or platform.node()
params = {'ttl': ttl, 'name': name}
path = self.get_path(key)

res = self.client.api_execute(path, self.client._MPUT, params=params)
return res.data.decode('utf-8')

def get(self, key):
"""
Get the name of a leader object.
Args:
key (string): name of the leader key,
Raises:
etcd.EtcdException
"""
res = self.client.api_execute(self.get_path(key), self.client._MGET)
if not res.data:
raise etcd.EtcdException('Leader path {} not found'.format(key))
return res.data.decode('utf-8')

def delete(self, key, name=None):
"""
Delete a leader object.
Args:
key (string): the leader key,
name (string): name of the elected leader
Raises:
etcd.EtcdException
"""
path = self.get_path(key)
name = name or platform.node()
res = self.client.api_execute(
path, self.client._MDELETE, {'name': name})
if (res.data.decode('utf-8') == ''):
return True
return False
2 changes: 2 additions & 0 deletions src/etcd/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def __init__(self, client, key, ttl=None, value=None):
value (mixed): value to store on the lock.
"""
self.client = client
if not key.startswith('/'):
key = '/' + key
self.key = key
self.ttl = ttl or 0
self.value = value
Expand Down
35 changes: 35 additions & 0 deletions src/etcd/tests/integration/test_election.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import etcd
from . import test_simple
import time
import unittest

class TestElection(test_simple.EtcdIntegrationTest):
def setUp(self):
self.client = etcd.Client(port=6001)

def test_set_get_delete(self):
e = self.client.election
res = e.set('/mysql', name='foo.example.com', ttl=30)
self.assertTrue(res != '')
res = e.get('/mysql')
self.assertEquals(res, 'foo.example.com')
self.assertTrue(e.delete('/mysql', name='foo.example.com'))


def test_set_invalid_ttl(self):
self.assertRaises(etcd.EtcdException, self.client.election.set, '/mysql', name='foo.example.com', ttl='ciao')

@unittest.skip
def test_get_non_existing(self):
"""This is actually expected to fail. See https://github.com/coreos/etcd/issues/446"""
self.assertRaises(etcd.EtcdException, self.client.election.get, '/foobar')

def test_delete_non_existing(self):
self.assertRaises(etcd.EtcdException, self.client.election.delete, '/foobar')

def test_get_delete_after_ttl_expired_raises(self):
e = self.client.election
e.set('/mysql', name='foo', ttl=1)
time.sleep(2)
self.assertRaises(etcd.EtcdException, e.get, '/mysql')
self.assertRaises(etcd.EtcdException, e.delete, '/mysql', name='foo')
48 changes: 48 additions & 0 deletions src/etcd/tests/unit/test_leader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import etcd
import unittest

from .test_request import TestClientApiBase

try:
import mock
except ImportError:
from unittest import mock

class EtcdLeaderElectionTestCase(TestClientApiBase):
def setUp(self):
self.client = etcd.Client()

def _mock_api(self, status, d):
#We want to test at a lower level here.
resp = self._prepare_response(status, d)
self.client.http.request_encode_body = mock.create_autospec(
self.client.http.request_encode_body, return_value=resp
)
self.client.http.request = mock.create_autospec(
self.client.http.request, return_value=resp
)


def test_get_leader(self):
""" Can fetch a leader value """
self._mock_api(200, 'foo.example.com')
self.assertEquals(self.client.election.get('/mysql'), 'foo.example.com')
self._mock_api(200,'')
self.assertRaises(etcd.EtcdException, self.client.election.get, '/mysql')

def test_set_leader(self):
""" Can set a leader value """
self._mock_api(200, u'234')
#set w/o a TTL or a name
self.assertEquals(self.client.election.set('/mysql'), u'234')
self.assertEquals(self.client.election.set(
'/mysql',
name='foo.example.com',
ttl=60), u'234')
self._mock_api(500, 'leader name required')
self.assertRaises(etcd.EtcdException, self.client.election.set,'/mysql')

def test_del_leader(self):
""" Can remove a leader value """
self._mock_api(200,'')
self.assertTrue(self.client.election.delete('/mysql'))

0 comments on commit 0997a77

Please sign in to comment.