diff --git a/README.rst b/README.rst index efac55bd..58043197 100644 --- a/README.rst +++ b/README.rst @@ -85,6 +85,7 @@ Watch a key .. code:: python client.read('/nodes/n1', watch = True) # will wait till the key is changed, and return once its changed + client.read('/nodes/n1', watch = True, timeout=30) # will wait till the key is changed, and return once its changed, or exit with an exception after 30 seconds. client.read('/nodes/n1', watch = True, watchIndex = 10) # get all changes on this key starting from index 10 client.watch('/nodes/n1') #equivalent to client.read('/nodes/n1', watch = True) client.watch('/nodes/n1', index = 10) @@ -100,7 +101,7 @@ Locking module lock = client.get_lock('/customer1', ttl=60) # Use the lock object: - lock.acquire() + lock.acquire(timeout=30) #returns if lock could not be acquired within 30 seconds lock.is_locked() # True lock.renew(60) lock.release() @@ -125,7 +126,7 @@ Leader Election module # is used. # Zero or no ttl means the leader object is persistent. client = etcd.Client() - client.election.set('/mysql', name='foo.example.com', ttl=120) # returns the etcd index + client.election.set('/mysql', name='foo.example.com', ttl=120, timeout=30) # returns the etcd index # Get the name print(client.election.get('/mysql')) # 'foo.example.com' diff --git a/src/etcd/__init__.py b/src/etcd/__init__.py index 308d3411..63d88650 100644 --- a/src/etcd/__init__.py +++ b/src/etcd/__init__.py @@ -16,7 +16,7 @@ class EtcdResult(object): 'dir': False, } - def __init__(self, action=None, node=None): + def __init__(self, action=None, node=None, **kwdargs): """ Creates an EtcdResult object. diff --git a/src/etcd/client.py b/src/etcd/client.py index 8ec4510c..facdca8e 100644 --- a/src/etcd/client.py +++ b/src/etcd/client.py @@ -91,6 +91,9 @@ def uri(protocol, host, port): kw = {} + if self._read_timeout > 0: + kw['timeout'] = self._read_timeout + if protocol == 'https': # If we don't allow TLSv1, clients using older version of OpenSSL # (<1.0) won't be able to connect. @@ -261,7 +264,8 @@ def write(self, key, value, ttl=None, dir=False, append=False, **kwdargs): path = kwdargs['_endpoint'] + key else: path = self.key_endpoint + key - response = self.api_execute(path, method, params) + + response = self.api_execute(path, method, params=params) return self._result_from_response(response) def read(self, key, **kwdargs): @@ -302,8 +306,10 @@ def read(self, key, **kwdargs): else: params[k] = v + timeout = 'timeout' in kwdargs and kwdargs['timeout'] or None + response = self.api_execute( - self.key_endpoint + key, self._MGET, params) + self.key_endpoint + key, self._MGET, params=params, timeout=timeout) return self._result_from_response(response) def delete(self, key, recursive=None, dir=None): @@ -400,7 +406,7 @@ def get(self, key): """ return self.read(key) - def watch(self, key, index=None): + def watch(self, key, index=None, timeout=None): """ Blocks until a new event has been received, starting at index 'index' @@ -420,9 +426,9 @@ def watch(self, key, index=None): """ if index: - return self.read(key, wait=True, waitIndex=index) + return self.read(key, wait=True, waitIndex=index, timeout=timeout) else: - return self.read(key, wait=True) + return self.read(key, wait=True, timeout=timeout) def ethernal_watch(self, key, index=None): """ @@ -445,7 +451,7 @@ def ethernal_watch(self, key, index=None): """ local_index = index while True: - response = self.watch(key, index=local_index) + response = self.watch(key, index=local_index, timeout=0) if local_index is not None: local_index += 1 yield response @@ -478,12 +484,18 @@ def _next_server(self): except IndexError: raise etcd.EtcdException('No more machines in the cluster') - def api_execute(self, path, method, params=None): + def api_execute(self, path, method, params=None, timeout=None): """ Executes the query. """ some_request_failed = False response = False + if timeout is None: + timeout = self.read_timeout + + if timeout == 0: + timeout = None + if not path.startswith('/'): raise ValueError('Path does not start with /') @@ -495,6 +507,7 @@ def api_execute(self, path, method, params=None): response = self.http.request( method, url, + timeout=timeout, fields=params, redirect=self.allow_redirect) @@ -503,6 +516,7 @@ def api_execute(self, path, method, params=None): method, url, fields=params, + timeout=timeout, encode_multipart=False, redirect=self.allow_redirect) else: diff --git a/src/etcd/election.py b/src/etcd/election.py index b5876fab..a38c76d0 100644 --- a/src/etcd/election.py +++ b/src/etcd/election.py @@ -22,7 +22,7 @@ def get_path(self, key): key = '/' + key return '/mod/v2/leader{}'.format(key) - def set(self, key, name=None, ttl=0): + def set(self, key, name=None, ttl=0, timeout=None): """ Initialize a leader election object. @@ -40,7 +40,8 @@ def set(self, key, name=None, ttl=0): params = {'ttl': ttl, 'name': name} path = self.get_path(key) - res = self.client.api_execute(path, self.client._MPUT, params=params) + res = self.client.api_execute( + path, self.client._MPUT, params=params, timeout=timeout) return res.data.decode('utf-8') def get(self, key): diff --git a/src/etcd/lock.py b/src/etcd/lock.py index adf7ee5a..c862243f 100644 --- a/src/etcd/lock.py +++ b/src/etcd/lock.py @@ -41,14 +41,14 @@ def __exit__(self, type, value, traceback): def _path(self): return u'/mod/v2/lock{}'.format(self.key) - def acquire(self): + def acquire(self, timeout=None): """Acquire the lock from etcd. Blocks until lock is acquired.""" params = {u'ttl': self.ttl} if self.value is not None: params[u'value'] = self.value res = self.client.api_execute( - self._path, self.client._MPOST, params=params) + self._path, self.client._MPOST, params=params, timeout=timeout) self._index = res.data.decode('utf-8') return self @@ -91,7 +91,7 @@ def release(self): self._path, self.client._MDELETE, params=params) self._index = None - def renew(self, new_ttl): + def renew(self, new_ttl, timeout=None): """ Renew the TTL on this lock.