Skip to content

Commit

Permalink
Merge pull request #25 from jplana/timeouts
Browse files Browse the repository at this point in the history
Timeouts
  • Loading branch information
jplana committed Jan 13, 2014
2 parents 0997a77 + 483b241 commit f466d52
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 15 deletions.
5 changes: 3 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Watch a key
.. code:: python
client.read('/nodes/n1', watch = True) # will wait till the key is changed, and return once its changed
client.read('/nodes/n1', watch = True, timeout=30) # will wait till the key is changed, and return once its changed, or exit with an exception after 30 seconds.
client.read('/nodes/n1', watch = True, watchIndex = 10) # get all changes on this key starting from index 10
client.watch('/nodes/n1') #equivalent to client.read('/nodes/n1', watch = True)
client.watch('/nodes/n1', index = 10)
Expand All @@ -100,7 +101,7 @@ Locking module
lock = client.get_lock('/customer1', ttl=60)
# Use the lock object:
lock.acquire()
lock.acquire(timeout=30) #returns if lock could not be acquired within 30 seconds
lock.is_locked() # True
lock.renew(60)
lock.release()
Expand All @@ -125,7 +126,7 @@ Leader Election module
# is used.
# Zero or no ttl means the leader object is persistent.
client = etcd.Client()
client.election.set('/mysql', name='foo.example.com', ttl=120) # returns the etcd index
client.election.set('/mysql', name='foo.example.com', ttl=120, timeout=30) # returns the etcd index
# Get the name
print(client.election.get('/mysql')) # 'foo.example.com'
Expand Down
2 changes: 1 addition & 1 deletion src/etcd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class EtcdResult(object):
'dir': False,
}

def __init__(self, action=None, node=None):
def __init__(self, action=None, node=None, **kwdargs):
"""
Creates an EtcdResult object.
Expand Down
28 changes: 21 additions & 7 deletions src/etcd/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def uri(protocol, host, port):

kw = {}

if self._read_timeout > 0:
kw['timeout'] = self._read_timeout

if protocol == 'https':
# If we don't allow TLSv1, clients using older version of OpenSSL
# (<1.0) won't be able to connect.
Expand Down Expand Up @@ -261,7 +264,8 @@ def write(self, key, value, ttl=None, dir=False, append=False, **kwdargs):
path = kwdargs['_endpoint'] + key
else:
path = self.key_endpoint + key
response = self.api_execute(path, method, params)

response = self.api_execute(path, method, params=params)
return self._result_from_response(response)

def read(self, key, **kwdargs):
Expand Down Expand Up @@ -302,8 +306,10 @@ def read(self, key, **kwdargs):
else:
params[k] = v

timeout = 'timeout' in kwdargs and kwdargs['timeout'] or None

response = self.api_execute(
self.key_endpoint + key, self._MGET, params)
self.key_endpoint + key, self._MGET, params=params, timeout=timeout)
return self._result_from_response(response)

def delete(self, key, recursive=None, dir=None):
Expand Down Expand Up @@ -400,7 +406,7 @@ def get(self, key):
"""
return self.read(key)

def watch(self, key, index=None):
def watch(self, key, index=None, timeout=None):
"""
Blocks until a new event has been received, starting at index 'index'
Expand All @@ -420,9 +426,9 @@ def watch(self, key, index=None):
"""
if index:
return self.read(key, wait=True, waitIndex=index)
return self.read(key, wait=True, waitIndex=index, timeout=timeout)
else:
return self.read(key, wait=True)
return self.read(key, wait=True, timeout=timeout)

def ethernal_watch(self, key, index=None):
"""
Expand All @@ -445,7 +451,7 @@ def ethernal_watch(self, key, index=None):
"""
local_index = index
while True:
response = self.watch(key, index=local_index)
response = self.watch(key, index=local_index, timeout=0)
if local_index is not None:
local_index += 1
yield response
Expand Down Expand Up @@ -478,12 +484,18 @@ def _next_server(self):
except IndexError:
raise etcd.EtcdException('No more machines in the cluster')

def api_execute(self, path, method, params=None):
def api_execute(self, path, method, params=None, timeout=None):
""" Executes the query. """

some_request_failed = False
response = False

if timeout is None:
timeout = self.read_timeout

if timeout == 0:
timeout = None

if not path.startswith('/'):
raise ValueError('Path does not start with /')

Expand All @@ -495,6 +507,7 @@ def api_execute(self, path, method, params=None):
response = self.http.request(
method,
url,
timeout=timeout,
fields=params,
redirect=self.allow_redirect)

Expand All @@ -503,6 +516,7 @@ def api_execute(self, path, method, params=None):
method,
url,
fields=params,
timeout=timeout,
encode_multipart=False,
redirect=self.allow_redirect)
else:
Expand Down
5 changes: 3 additions & 2 deletions src/etcd/election.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def get_path(self, key):
key = '/' + key
return '/mod/v2/leader{}'.format(key)

def set(self, key, name=None, ttl=0):
def set(self, key, name=None, ttl=0, timeout=None):
"""
Initialize a leader election object.
Expand All @@ -40,7 +40,8 @@ def set(self, key, name=None, ttl=0):
params = {'ttl': ttl, 'name': name}
path = self.get_path(key)

res = self.client.api_execute(path, self.client._MPUT, params=params)
res = self.client.api_execute(
path, self.client._MPUT, params=params, timeout=timeout)
return res.data.decode('utf-8')

def get(self, key):
Expand Down
6 changes: 3 additions & 3 deletions src/etcd/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ def __exit__(self, type, value, traceback):
def _path(self):
return u'/mod/v2/lock{}'.format(self.key)

def acquire(self):
def acquire(self, timeout=None):
"""Acquire the lock from etcd. Blocks until lock is acquired."""
params = {u'ttl': self.ttl}
if self.value is not None:
params[u'value'] = self.value

res = self.client.api_execute(
self._path, self.client._MPOST, params=params)
self._path, self.client._MPOST, params=params, timeout=timeout)
self._index = res.data.decode('utf-8')
return self

Expand Down Expand Up @@ -91,7 +91,7 @@ def release(self):
self._path, self.client._MDELETE, params=params)
self._index = None

def renew(self, new_ttl):
def renew(self, new_ttl, timeout=None):
"""
Renew the TTL on this lock.
Expand Down

0 comments on commit f466d52

Please sign in to comment.