Skip to content

Commit

Permalink
Reload schema on changin of tarantool schema_id value.
Browse files Browse the repository at this point in the history
* Reloading of schema on Tarantool schema error (closes gh-63)
* Rewrite request using collections.Sequence
* Delete RETRIES on completion status == 1 (no this completion status,
  anymore, also closes gh-45)
* flush schema now, also, loads schema from server.
* Modify/Simplify schema internals
* Added absent constants
* Updated errorcodes (closes gh-69)
* Schema is now flushed, upon successfull authentication (closes gh-68)

Proofed to work on py27, py35 and pypy
  • Loading branch information
bigbes committed Jun 29, 2016
1 parent 49658d9 commit 4639d9a
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 227 deletions.
3 changes: 1 addition & 2 deletions tarantool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
DatabaseError,
NetworkError,
NetworkWarning,
RetryWarning
)

from tarantool.schema import (
Expand Down Expand Up @@ -48,4 +47,4 @@ def connect(host="localhost", port=33013, user=None, password=None, encoding=ENC


__all__ = ['connect', 'Connection', 'Schema', 'Error', 'DatabaseError',
'NetworkError', 'NetworkWarning', 'RetryWarning', 'SchemaError']
'NetworkError', 'NetworkWarning', 'SchemaError']
47 changes: 28 additions & 19 deletions tarantool/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
SOCKET_TIMEOUT,
RECONNECT_MAX_ATTEMPTS,
RECONNECT_DELAY,
RETRY_MAX_ATTEMPTS,
REQUEST_TYPE_OK,
REQUEST_TYPE_ERROR,
IPROTO_GREETING_SIZE,
Expand All @@ -52,9 +51,8 @@
from tarantool.error import (
NetworkError,
DatabaseError,
warn,
RetryWarning,
NetworkWarning)
NetworkWarning,
SchemaReloadException)

from .schema import Schema
from .utils import check_key, greeting_decode, version_id
Expand Down Expand Up @@ -108,6 +106,7 @@ def __init__(self, host, port,
self.reconnect_delay = reconnect_delay
self.reconnect_max_attempts = reconnect_max_attempts
self.schema = Schema(self)
self.schema_version = 1
self._socket = None
self.connected = False
self.error = True
Expand Down Expand Up @@ -166,6 +165,7 @@ def connect(self):
# the connection fails because the server is simply
# not bound to port
self._socket.settimeout(self.socket_timeout)
self.load_schema()
except socket.error as e:
self.connected = False
raise NetworkError(e)
Expand Down Expand Up @@ -210,18 +210,17 @@ def _send_request_wo_reconnect(self, request):
'''
assert isinstance(request, Request)

# Repeat request in a loop if the server returns completion_status == 1
# (try again)
for attempt in range(RETRY_MAX_ATTEMPTS): # pylint: disable=W0612
self._socket.sendall(bytes(request))
response = Response(self, self._read_response())

if response.completion_status != 1:
return response
warn(response.return_message, RetryWarning)
response = None
while True:
try:
self._socket.sendall(bytes(request))
response = Response(self, self._read_response())
break
except SchemaReloadException as e:
self.update_schema(e.schema_version)
continue

# Raise an error if the maximum number of attempts have been made
raise DatabaseError(response.return_code, response.return_message)
return response

def _opt_reconnect(self):
'''
Expand Down Expand Up @@ -294,13 +293,20 @@ def _send_request(self, request):
assert isinstance(request, Request)

self._opt_reconnect()
response = self._send_request_wo_reconnect(
request)

return response
return self._send_request_wo_reconnect(request)

def load_schema(self):
self.schema.fetch_space_all()
self.schema.fetch_index_all()

def update_schema(self, schema_version):
self.schema_version = schema_version
self.flush_schema()

def flush_schema(self):
self.schema.flush()
self.load_schema()

def call(self, func_name, *args):
'''
Expand Down Expand Up @@ -378,7 +384,10 @@ def authenticate(self, user, password):

request = RequestAuthenticate(self, self._salt, self.user,
self.password)
return self._send_request_wo_reconnect(request)
auth_response = self._send_request_wo_reconnect(request)
if auth_response.return_code == 0:
self.flush_schema()
return auth_response

def _join_v16(self, server_uuid):
request = RequestJoin(self, server_uuid)
Expand Down
15 changes: 12 additions & 3 deletions tarantool/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,38 @@

import six

#
IPROTO_CODE = 0x00
IPROTO_SYNC = 0x01
# replication keys (header)
IPROTO_SERVER_ID = 0x02
IPROTO_LSN = 0x03
IPROTO_TIMESTAMP = 0x04
IPROTO_SCHEMA_ID = 0X05
#
IPROTO_SPACE_ID = 0x10
IPROTO_INDEX_ID = 0x11
IPROTO_LIMIT = 0x12
IPROTO_OFFSET = 0x13
IPROTO_ITERATOR = 0x14
IPROTO_INDEX_BASE = 0x15
#
IPROTO_KEY = 0x20
IPROTO_TUPLE = 0x21
IPROTO_FUNCTION_NAME = 0x22
IPROTO_USER_NAME = 0x23
#
IPROTO_SERVER_UUID = 0x24
IPROTO_CLUSTER_UUID = 0x25
IPROTO_VCLOCK = 0x26
IPROTO_EXPR = 0x27
IPROTO_OPS = 0x28
#
IPROTO_DATA = 0x30
IPROTO_ERROR = 0x31

IPROTO_GREETING_SIZE = 128
IPROTO_BODY_MAX_LEN = 2147483648

REQUEST_TYPE_OK = 0
REQUEST_TYPE_SELECT = 1
Expand Down Expand Up @@ -74,9 +86,6 @@
RECONNECT_MAX_ATTEMPTS = 10
# Default delay between attempts to reconnect (seconds)
RECONNECT_DELAY = 0.1
# Number of reattempts in case of server
# return completion_status == 1 (try again)
RETRY_MAX_ATTEMPTS = 10

if six.PY2:
ENCODING_DEFAULT = None
Expand Down
197 changes: 124 additions & 73 deletions tarantool/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def os_strerror_patched(code):
os.strerror = os_strerror_patched
del os_strerror_patched


class SchemaError(DatabaseError):
def __init__(self, value):
super(SchemaError, self).__init__(0, value)
Expand All @@ -117,6 +116,16 @@ def __init__(self, value):
def __str__(self):
return str(self.value)

class SchemaReloadException(DatabaseError):
def __init__(self, message, schema_version):
super(SchemaReloadException, self).__init__(109, message)
self.code = 109
self.message = message
self.schema_version = schema_version

def __str__(self):
return str(self.message)


class NetworkError(DatabaseError):

Expand Down Expand Up @@ -144,18 +153,8 @@ class NetworkWarning(UserWarning):
pass


class RetryWarning(UserWarning):

'''
Warning is emited in case of server return completion_status == 1
(try again)
'''
pass


# always print this warnings
warnings.filterwarnings("always", category=NetworkWarning)
warnings.filterwarnings("always", category=RetryWarning)


def warn(message, warning_class):
Expand All @@ -169,68 +168,120 @@ def warn(message, warning_class):
warnings.warn_explicit(message, warning_class, module_name, line_no)

_strerror = {
0: ("ER_OK", "OK"),
1: ("ER_ILLEGAL_PARAMS", "Illegal parameters, %s"),
2: ("ER_MEMORY_ISSUE", "Failed to allocate %u bytes in %s for %s"),
3: ("ER_TUPLE_FOUND", "Duplicate key exists in unique index %u"),
4: ("ER_TUPLE_NOT_FOUND", "Tuple doesn't exist in index %u"),
5: ("ER_UNSUPPORTED", "%s does not support %s"),
6: ("ER_NONMASTER",
"Can't modify data on a replication slave. My master is: %s"),
7: ("ER_SECONDARY",
"Can't modify data upon a request on the secondary port."),
8: ("ER_INJECTION", "Error injection '%s'"),
9: ("ER_CREATE_SPACE", "Failed to create space %u: %s"),
10: ("ER_SPACE_EXISTS", "Space %u already exists"),
11: ("ER_DROP_SPACE", "Can't drop space %u: %s"),
12: ("ER_ALTER_SPACE", "Can't modify space %u: %s"),
13: ("ER_INDEX_TYPE",
"Unsupported index type supplied for index %u in space %u"),
14: ("ER_MODIFY_INDEX",
"Can't create or modify index %u in space %u: %s"),
15: ("ER_LAST_DROP",
"Can't drop the primary key in a system space, space id %u"),
16: ("ER_TUPLE_FORMAT_LIMIT", "Tuple format limit reached: %u"),
17: ("ER_DROP_PRIMARY_KEY",
"Can't drop primary key in space %u while secondary keys exist"),
18: ("ER_KEY_FIELD_TYPE",
("Supplied key type of part %u does not match index part type:"
" expected %s")),
19: ("ER_EXACT_MATCH",
"Invalid key part count in an exact match (expected %u, got %u)"),
20: ("ER_INVALID_MSGPACK", "Invalid MsgPack - %s"),
21: ("ER_PROC_RET", "msgpack.encode: can not encode Lua type '%s'"),
22: ("ER_TUPLE_NOT_ARRAY", "Tuple/Key must be MsgPack array"),
23: ("ER_FIELD_TYPE",
("Tuple field %u type does not match one required by operation:"
" expected %s")),
24: ("ER_FIELD_TYPE_MISMATCH",
("Ambiguous field type in index %u, key part %u. Requested type"
" is %s but the field has previously been defined as %s")),
25: ("ER_SPLICE", "Field SPLICE error: %s"),
26: ("ER_ARG_TYPE",
("Argument type in operation on field %u does not match field type:"
" expected a %s")),
27: ("ER_TUPLE_IS_TOO_LONG", "Tuple is too long %u"),
28: ("ER_UNKNOWN_UPDATE_OP", "Unknown UPDATE operation"),
29: ("ER_UPDATE_FIELD", "Field %u UPDATE error: %s"),
30: ("ER_FIBER_STACK",
"Can not create a new fiber: recursion limit reached"),
31: ("ER_KEY_PART_COUNT",
"Invalid key part count (expected [0..%u], got %u)"),
32: ("ER_PROC_LUA", "%s"),
33: ("ER_NO_SUCH_PROC", "Procedure '%.*s' is not defined"),
34: ("ER_NO_SUCH_TRIGGER", "Trigger is not found"),
35: ("ER_NO_SUCH_INDEX", "No index #%u is defined in space %u"),
36: ("ER_NO_SUCH_SPACE", "Space %u does not exist"),
37: ("ER_NO_SUCH_FIELD", "Field %u was not found in the tuple"),
38: ("ER_SPACE_ARITY",
"Tuple field count %u does not match space %u arity %u"),
39: ("ER_INDEX_ARITY",
("Tuple field count %u is less than required by a defined index"
" (expected %u)")),
40: ("ER_WAL_IO", "Failed to write to disk"),
41: ("ER_MORE_THAN_ONE_TUPLE", "More than one tuple found"),
0: ("ER_UNKNOWN", "Unknown error"),
1: ("ER_ILLEGAL_PARAMS", "Illegal parameters, %s"),
2: ("ER_MEMORY_ISSUE", "Failed to allocate %u bytes in %s for %s"),
3: ("ER_TUPLE_FOUND", "Duplicate key exists in unique index '%s' in space '%s'"),
4: ("ER_TUPLE_NOT_FOUND", "Tuple doesn't exist in index '%s' in space '%s'"),
5: ("ER_UNSUPPORTED", "%s does not support %s"),
6: ("ER_NONMASTER", "Can't modify data on a replication slave. My master is: %s"),
7: ("ER_READONLY", "Can't modify data because this server is in read-only mode."),
8: ("ER_INJECTION", "Error injection '%s'"),
9: ("ER_CREATE_SPACE", "Failed to create space '%s': %s"),
10: ("ER_SPACE_EXISTS", "Space '%s' already exists"),
11: ("ER_DROP_SPACE", "Can't drop space '%s': %s"),
12: ("ER_ALTER_SPACE", "Can't modify space '%s': %s"),
13: ("ER_INDEX_TYPE", "Unsupported index type supplied for index '%s' in space '%s'"),
14: ("ER_MODIFY_INDEX", "Can't create or modify index '%s' in space '%s': %s"),
15: ("ER_LAST_DROP", "Can't drop the primary key in a system space, space '%s'"),
16: ("ER_TUPLE_FORMAT_LIMIT", "Tuple format limit reached: %u"),
17: ("ER_DROP_PRIMARY_KEY", "Can't drop primary key in space '%s' while secondary keys exist"),
18: ("ER_KEY_PART_TYPE", "Supplied key type of part %u does not match index part type: expected %s"),
19: ("ER_EXACT_MATCH", "Invalid key part count in an exact match (expected %u, got %u)"),
20: ("ER_INVALID_MSGPACK", "Invalid MsgPack - %s"),
21: ("ER_PROC_RET", "msgpack.encode: can not encode Lua type '%s'"),
22: ("ER_TUPLE_NOT_ARRAY", "Tuple/Key must be MsgPack array"),
23: ("ER_FIELD_TYPE", "Tuple field %u type does not match one required by operation: expected %s"),
24: ("ER_FIELD_TYPE_MISMATCH", "Ambiguous field type in index '%s', key part %u. Requested type is %s but the field has previously been defined as %s"),
25: ("ER_SPLICE", "SPLICE error on field %u: %s"),
26: ("ER_ARG_TYPE", "Argument type in operation '%c' on field %u does not match field type: expected a %s"),
27: ("ER_TUPLE_IS_TOO_LONG", "Tuple is too long %u"),
28: ("ER_UNKNOWN_UPDATE_OP", "Unknown UPDATE operation"),
29: ("ER_UPDATE_FIELD", "Field %u UPDATE error: %s"),
30: ("ER_FIBER_STACK", "Can not create a new fiber: recursion limit reached"),
31: ("ER_KEY_PART_COUNT", "Invalid key part count (expected [0..%u], got %u)"),
32: ("ER_PROC_LUA", "%s"),
33: ("ER_NO_SUCH_PROC", "Procedure '%.*s' is not defined"),
34: ("ER_NO_SUCH_TRIGGER", "Trigger is not found"),
35: ("ER_NO_SUCH_INDEX", "No index #%u is defined in space '%s'"),
36: ("ER_NO_SUCH_SPACE", "Space '%s' does not exist"),
37: ("ER_NO_SUCH_FIELD", "Field %d was not found in the tuple"),
38: ("ER_SPACE_FIELD_COUNT", "Tuple field count %u does not match space '%s' field count %u"),
39: ("ER_INDEX_FIELD_COUNT", "Tuple field count %u is less than required by a defined index (expected %u)"),
40: ("ER_WAL_IO", "Failed to write to disk"),
41: ("ER_MORE_THAN_ONE_TUPLE", "More than one tuple found by get()"),
42: ("ER_ACCESS_DENIED", "%s access on %s is denied for user '%s'"),
43: ("ER_CREATE_USER", "Failed to create user '%s': %s"),
44: ("ER_DROP_USER", "Failed to drop user or role '%s': %s"),
45: ("ER_NO_SUCH_USER", "User '%s' is not found"),
46: ("ER_USER_EXISTS", "User '%s' already exists"),
47: ("ER_PASSWORD_MISMATCH", "Incorrect password supplied for user '%s'"),
48: ("ER_UNKNOWN_REQUEST_TYPE", "Unknown request type %u"),
49: ("ER_UNKNOWN_SCHEMA_OBJECT", "Unknown object type '%s'"),
50: ("ER_CREATE_FUNCTION", "Failed to create function '%s': %s"),
51: ("ER_NO_SUCH_FUNCTION", "Function '%s' does not exist"),
52: ("ER_FUNCTION_EXISTS", "Function '%s' already exists"),
53: ("ER_FUNCTION_ACCESS_DENIED", "%s access is denied for user '%s' to function '%s'"),
54: ("ER_FUNCTION_MAX", "A limit on the total number of functions has been reached: %u"),
55: ("ER_SPACE_ACCESS_DENIED", "%s access is denied for user '%s' to space '%s'"),
56: ("ER_USER_MAX", "A limit on the total number of users has been reached: %u"),
57: ("ER_NO_SUCH_ENGINE", "Space engine '%s' does not exist"),
58: ("ER_RELOAD_CFG", "Can't set option '%s' dynamically"),
59: ("ER_CFG", "Incorrect value for option '%s': %s"),
60: ("ER_SOPHIA", "%s"),
61: ("ER_LOCAL_SERVER_IS_NOT_ACTIVE", "Local server is not active"),
62: ("ER_UNKNOWN_SERVER", "Server %s is not registered with the cluster"),
63: ("ER_CLUSTER_ID_MISMATCH", "Cluster id of the replica %s doesn't match cluster id of the master %s"),
64: ("ER_INVALID_UUID", "Invalid UUID: %s"),
65: ("ER_CLUSTER_ID_IS_RO", "Can't reset cluster id: it is already assigned"),
66: ("ER_RESERVED66", "Reserved66"),
67: ("ER_SERVER_ID_IS_RESERVED", "Can't initialize server id with a reserved value %u"),
68: ("ER_INVALID_ORDER", "Invalid LSN order for server %u: previous LSN = %llu, new lsn = %llu"),
69: ("ER_MISSING_REQUEST_FIELD", "Missing mandatory field '%s' in request"),
70: ("ER_IDENTIFIER", "Invalid identifier '%s' (expected letters, digits or an underscore)"),
71: ("ER_DROP_FUNCTION", "Can't drop function %u: %s"),
72: ("ER_ITERATOR_TYPE", "Unknown iterator type '%s'"),
73: ("ER_REPLICA_MAX", "Replica count limit reached: %u"),
74: ("ER_INVALID_XLOG", "Failed to read xlog: %lld"),
75: ("ER_INVALID_XLOG_NAME", "Invalid xlog name: expected %lld got %lld"),
76: ("ER_INVALID_XLOG_ORDER", "Invalid xlog order: %lld and %lld"),
77: ("ER_NO_CONNECTION", "Connection is not established"),
78: ("ER_TIMEOUT", "Timeout exceeded"),
79: ("ER_ACTIVE_TRANSACTION", "Operation is not permitted when there is an active transaction "),
80: ("ER_NO_ACTIVE_TRANSACTION", "Operation is not permitted when there is no active transaction "),
81: ("ER_CROSS_ENGINE_TRANSACTION", "A multi-statement transaction can not use multiple storage engines"),
82: ("ER_NO_SUCH_ROLE", "Role '%s' is not found"),
83: ("ER_ROLE_EXISTS", "Role '%s' already exists"),
84: ("ER_CREATE_ROLE", "Failed to create role '%s': %s"),
85: ("ER_INDEX_EXISTS", "Index '%s' already exists"),
86: ("ER_TUPLE_REF_OVERFLOW", "Tuple reference counter overflow"),
87: ("ER_ROLE_LOOP", "Granting role '%s' to role '%s' would create a loop"),
88: ("ER_GRANT", "Incorrect grant arguments: %s"),
89: ("ER_PRIV_GRANTED", "User '%s' already has %s access on %s '%s'"),
90: ("ER_ROLE_GRANTED", "User '%s' already has role '%s'"),
91: ("ER_PRIV_NOT_GRANTED", "User '%s' does not have %s access on %s '%s'"),
92: ("ER_ROLE_NOT_GRANTED", "User '%s' does not have role '%s'"),
93: ("ER_MISSING_SNAPSHOT", "Can't find snapshot"),
94: ("ER_CANT_UPDATE_PRIMARY_KEY", "Attempt to modify a tuple field which is part of index '%s' in space '%s'"),
95: ("ER_UPDATE_INTEGER_OVERFLOW", "Integer overflow when performing '%c' operation on field %u"),
96: ("ER_GUEST_USER_PASSWORD", "Setting password for guest user has no effect"),
97: ("ER_TRANSACTION_CONFLICT", "Transaction has been aborted by conflict"),
98: ("ER_UNSUPPORTED_ROLE_PRIV", "Unsupported role privilege '%s'"),
99: ("ER_LOAD_FUNCTION", "Failed to dynamically load function '%s': %s"),
100: ("ER_FUNCTION_LANGUAGE", "Unsupported language '%s' specified for function '%s'"),
101: ("ER_RTREE_RECT", "RTree: %s must be an array with %u (point) or %u (rectangle/box) numeric coordinates"),
102: ("ER_PROC_C", "%s"),
103: ("ER_UNKNOWN_RTREE_INDEX_DISTANCE_TYPE", "Unknown RTREE index distance type %s"),
104: ("ER_PROTOCOL", "%s"),
105: ("ER_UPSERT_UNIQUE_SECONDARY_KEY", "Space %s has a unique secondary index and does not support UPSERT"),
106: ("ER_WRONG_INDEX_RECORD", "Wrong record in _index space: got {%s}, expected {%s}"),
107: ("ER_WRONG_INDEX_PARTS", "Wrong index parts (field %u): %s; expected field1 id (number), field1 type (string), ..."),
108: ("ER_WRONG_INDEX_OPTIONS", "Wrong index options (field %u): %s"),
109: ("ER_WRONG_SCHEMA_VERSION", "Wrong schema version, current: %d, in request: %u"),
110: ("ER_SLAB_ALLOC_MAX", "Failed to allocate %u bytes for tuple in the slab allocator: tuple is too large. Check 'slab_alloc_maximal' configuration option."),
111: ("ER_WRONG_SPACE_OPTIONS", "Wrong space options (field %u): %s"),
112: ("ER_UNSUPPORTED_INDEX_FEATURE", "Index '%s' (%s) of space '%s' (%s) does not support %s"),
113: ("ER_VIEW_IS_RO", "View '%s' is read-only"),
}


Expand Down
Loading

0 comments on commit 4639d9a

Please sign in to comment.