diff --git a/tarantool/__init__.py b/tarantool/__init__.py index 491168bd..2ac72e55 100644 --- a/tarantool/__init__.py +++ b/tarantool/__init__.py @@ -16,7 +16,6 @@ DatabaseError, NetworkError, NetworkWarning, - RetryWarning ) from tarantool.schema import ( @@ -48,4 +47,4 @@ def connect(host="localhost", port=33013, user=None, password=None, encoding=ENC __all__ = ['connect', 'Connection', 'Schema', 'Error', 'DatabaseError', - 'NetworkError', 'NetworkWarning', 'RetryWarning', 'SchemaError'] + 'NetworkError', 'NetworkWarning', 'SchemaError'] diff --git a/tarantool/connection.py b/tarantool/connection.py index 477b7dc1..f9626660 100644 --- a/tarantool/connection.py +++ b/tarantool/connection.py @@ -41,7 +41,6 @@ SOCKET_TIMEOUT, RECONNECT_MAX_ATTEMPTS, RECONNECT_DELAY, - RETRY_MAX_ATTEMPTS, REQUEST_TYPE_OK, REQUEST_TYPE_ERROR, IPROTO_GREETING_SIZE, @@ -52,9 +51,8 @@ from tarantool.error import ( NetworkError, DatabaseError, - warn, - RetryWarning, - NetworkWarning) + NetworkWarning, + SchemaReloadException) from .schema import Schema from .utils import check_key, greeting_decode, version_id @@ -108,6 +106,7 @@ def __init__(self, host, port, self.reconnect_delay = reconnect_delay self.reconnect_max_attempts = reconnect_max_attempts self.schema = Schema(self) + self.schema_version = 1 self._socket = None self.connected = False self.error = True @@ -166,6 +165,7 @@ def connect(self): # the connection fails because the server is simply # not bound to port self._socket.settimeout(self.socket_timeout) + self.load_schema() except socket.error as e: self.connected = False raise NetworkError(e) @@ -210,18 +210,17 @@ def _send_request_wo_reconnect(self, request): ''' assert isinstance(request, Request) - # Repeat request in a loop if the server returns completion_status == 1 - # (try again) - for attempt in range(RETRY_MAX_ATTEMPTS): # pylint: disable=W0612 - self._socket.sendall(bytes(request)) - response = Response(self, self._read_response()) - - if response.completion_status != 1: - return response - warn(response.return_message, RetryWarning) + response = None + while True: + try: + self._socket.sendall(bytes(request)) + response = Response(self, self._read_response()) + break + except SchemaReloadException as e: + self.update_schema(e.schema_version) + continue - # Raise an error if the maximum number of attempts have been made - raise DatabaseError(response.return_code, response.return_message) + return response def _opt_reconnect(self): ''' @@ -294,13 +293,20 @@ def _send_request(self, request): assert isinstance(request, Request) self._opt_reconnect() - response = self._send_request_wo_reconnect( - request) - return response + return self._send_request_wo_reconnect(request) + + def load_schema(self): + self.schema.fetch_space_all() + self.schema.fetch_index_all() + + def update_schema(self, schema_version): + self.schema_version = schema_version + self.flush_schema() def flush_schema(self): self.schema.flush() + self.load_schema() def call(self, func_name, *args): ''' @@ -378,7 +384,10 @@ def authenticate(self, user, password): request = RequestAuthenticate(self, self._salt, self.user, self.password) - return self._send_request_wo_reconnect(request) + auth_response = self._send_request_wo_reconnect(request) + if auth_response.return_code == 0: + self.flush_schema() + return auth_response def _join_v16(self, server_uuid): request = RequestJoin(self, server_uuid) diff --git a/tarantool/const.py b/tarantool/const.py index c9a512fe..4fb4adfb 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -3,26 +3,38 @@ import six +# IPROTO_CODE = 0x00 IPROTO_SYNC = 0x01 +# replication keys (header) +IPROTO_SERVER_ID = 0x02 +IPROTO_LSN = 0x03 +IPROTO_TIMESTAMP = 0x04 +IPROTO_SCHEMA_ID = 0X05 +# IPROTO_SPACE_ID = 0x10 IPROTO_INDEX_ID = 0x11 IPROTO_LIMIT = 0x12 IPROTO_OFFSET = 0x13 IPROTO_ITERATOR = 0x14 +IPROTO_INDEX_BASE = 0x15 +# IPROTO_KEY = 0x20 IPROTO_TUPLE = 0x21 IPROTO_FUNCTION_NAME = 0x22 IPROTO_USER_NAME = 0x23 +# IPROTO_SERVER_UUID = 0x24 IPROTO_CLUSTER_UUID = 0x25 IPROTO_VCLOCK = 0x26 IPROTO_EXPR = 0x27 IPROTO_OPS = 0x28 +# IPROTO_DATA = 0x30 IPROTO_ERROR = 0x31 IPROTO_GREETING_SIZE = 128 +IPROTO_BODY_MAX_LEN = 2147483648 REQUEST_TYPE_OK = 0 REQUEST_TYPE_SELECT = 1 @@ -74,9 +86,6 @@ RECONNECT_MAX_ATTEMPTS = 10 # Default delay between attempts to reconnect (seconds) RECONNECT_DELAY = 0.1 -# Number of reattempts in case of server -# return completion_status == 1 (try again) -RETRY_MAX_ATTEMPTS = 10 if six.PY2: ENCODING_DEFAULT = None diff --git a/tarantool/error.py b/tarantool/error.py index dbc4d54b..b9d559a0 100644 --- a/tarantool/error.py +++ b/tarantool/error.py @@ -108,7 +108,6 @@ def os_strerror_patched(code): os.strerror = os_strerror_patched del os_strerror_patched - class SchemaError(DatabaseError): def __init__(self, value): super(SchemaError, self).__init__(0, value) @@ -117,6 +116,16 @@ def __init__(self, value): def __str__(self): return str(self.value) +class SchemaReloadException(DatabaseError): + def __init__(self, message, schema_version): + super(SchemaReloadException, self).__init__(109, message) + self.code = 109 + self.message = message + self.schema_version = schema_version + + def __str__(self): + return str(self.message) + class NetworkError(DatabaseError): @@ -144,18 +153,8 @@ class NetworkWarning(UserWarning): pass -class RetryWarning(UserWarning): - - ''' - Warning is emited in case of server return completion_status == 1 - (try again) - ''' - pass - - # always print this warnings warnings.filterwarnings("always", category=NetworkWarning) -warnings.filterwarnings("always", category=RetryWarning) def warn(message, warning_class): @@ -169,68 +168,120 @@ def warn(message, warning_class): warnings.warn_explicit(message, warning_class, module_name, line_no) _strerror = { - 0: ("ER_OK", "OK"), - 1: ("ER_ILLEGAL_PARAMS", "Illegal parameters, %s"), - 2: ("ER_MEMORY_ISSUE", "Failed to allocate %u bytes in %s for %s"), - 3: ("ER_TUPLE_FOUND", "Duplicate key exists in unique index %u"), - 4: ("ER_TUPLE_NOT_FOUND", "Tuple doesn't exist in index %u"), - 5: ("ER_UNSUPPORTED", "%s does not support %s"), - 6: ("ER_NONMASTER", - "Can't modify data on a replication slave. My master is: %s"), - 7: ("ER_SECONDARY", - "Can't modify data upon a request on the secondary port."), - 8: ("ER_INJECTION", "Error injection '%s'"), - 9: ("ER_CREATE_SPACE", "Failed to create space %u: %s"), - 10: ("ER_SPACE_EXISTS", "Space %u already exists"), - 11: ("ER_DROP_SPACE", "Can't drop space %u: %s"), - 12: ("ER_ALTER_SPACE", "Can't modify space %u: %s"), - 13: ("ER_INDEX_TYPE", - "Unsupported index type supplied for index %u in space %u"), - 14: ("ER_MODIFY_INDEX", - "Can't create or modify index %u in space %u: %s"), - 15: ("ER_LAST_DROP", - "Can't drop the primary key in a system space, space id %u"), - 16: ("ER_TUPLE_FORMAT_LIMIT", "Tuple format limit reached: %u"), - 17: ("ER_DROP_PRIMARY_KEY", - "Can't drop primary key in space %u while secondary keys exist"), - 18: ("ER_KEY_FIELD_TYPE", - ("Supplied key type of part %u does not match index part type:" - " expected %s")), - 19: ("ER_EXACT_MATCH", - "Invalid key part count in an exact match (expected %u, got %u)"), - 20: ("ER_INVALID_MSGPACK", "Invalid MsgPack - %s"), - 21: ("ER_PROC_RET", "msgpack.encode: can not encode Lua type '%s'"), - 22: ("ER_TUPLE_NOT_ARRAY", "Tuple/Key must be MsgPack array"), - 23: ("ER_FIELD_TYPE", - ("Tuple field %u type does not match one required by operation:" - " expected %s")), - 24: ("ER_FIELD_TYPE_MISMATCH", - ("Ambiguous field type in index %u, key part %u. Requested type" - " is %s but the field has previously been defined as %s")), - 25: ("ER_SPLICE", "Field SPLICE error: %s"), - 26: ("ER_ARG_TYPE", - ("Argument type in operation on field %u does not match field type:" - " expected a %s")), - 27: ("ER_TUPLE_IS_TOO_LONG", "Tuple is too long %u"), - 28: ("ER_UNKNOWN_UPDATE_OP", "Unknown UPDATE operation"), - 29: ("ER_UPDATE_FIELD", "Field %u UPDATE error: %s"), - 30: ("ER_FIBER_STACK", - "Can not create a new fiber: recursion limit reached"), - 31: ("ER_KEY_PART_COUNT", - "Invalid key part count (expected [0..%u], got %u)"), - 32: ("ER_PROC_LUA", "%s"), - 33: ("ER_NO_SUCH_PROC", "Procedure '%.*s' is not defined"), - 34: ("ER_NO_SUCH_TRIGGER", "Trigger is not found"), - 35: ("ER_NO_SUCH_INDEX", "No index #%u is defined in space %u"), - 36: ("ER_NO_SUCH_SPACE", "Space %u does not exist"), - 37: ("ER_NO_SUCH_FIELD", "Field %u was not found in the tuple"), - 38: ("ER_SPACE_ARITY", - "Tuple field count %u does not match space %u arity %u"), - 39: ("ER_INDEX_ARITY", - ("Tuple field count %u is less than required by a defined index" - " (expected %u)")), - 40: ("ER_WAL_IO", "Failed to write to disk"), - 41: ("ER_MORE_THAN_ONE_TUPLE", "More than one tuple found"), + 0: ("ER_UNKNOWN", "Unknown error"), + 1: ("ER_ILLEGAL_PARAMS", "Illegal parameters, %s"), + 2: ("ER_MEMORY_ISSUE", "Failed to allocate %u bytes in %s for %s"), + 3: ("ER_TUPLE_FOUND", "Duplicate key exists in unique index '%s' in space '%s'"), + 4: ("ER_TUPLE_NOT_FOUND", "Tuple doesn't exist in index '%s' in space '%s'"), + 5: ("ER_UNSUPPORTED", "%s does not support %s"), + 6: ("ER_NONMASTER", "Can't modify data on a replication slave. My master is: %s"), + 7: ("ER_READONLY", "Can't modify data because this server is in read-only mode."), + 8: ("ER_INJECTION", "Error injection '%s'"), + 9: ("ER_CREATE_SPACE", "Failed to create space '%s': %s"), + 10: ("ER_SPACE_EXISTS", "Space '%s' already exists"), + 11: ("ER_DROP_SPACE", "Can't drop space '%s': %s"), + 12: ("ER_ALTER_SPACE", "Can't modify space '%s': %s"), + 13: ("ER_INDEX_TYPE", "Unsupported index type supplied for index '%s' in space '%s'"), + 14: ("ER_MODIFY_INDEX", "Can't create or modify index '%s' in space '%s': %s"), + 15: ("ER_LAST_DROP", "Can't drop the primary key in a system space, space '%s'"), + 16: ("ER_TUPLE_FORMAT_LIMIT", "Tuple format limit reached: %u"), + 17: ("ER_DROP_PRIMARY_KEY", "Can't drop primary key in space '%s' while secondary keys exist"), + 18: ("ER_KEY_PART_TYPE", "Supplied key type of part %u does not match index part type: expected %s"), + 19: ("ER_EXACT_MATCH", "Invalid key part count in an exact match (expected %u, got %u)"), + 20: ("ER_INVALID_MSGPACK", "Invalid MsgPack - %s"), + 21: ("ER_PROC_RET", "msgpack.encode: can not encode Lua type '%s'"), + 22: ("ER_TUPLE_NOT_ARRAY", "Tuple/Key must be MsgPack array"), + 23: ("ER_FIELD_TYPE", "Tuple field %u type does not match one required by operation: expected %s"), + 24: ("ER_FIELD_TYPE_MISMATCH", "Ambiguous field type in index '%s', key part %u. Requested type is %s but the field has previously been defined as %s"), + 25: ("ER_SPLICE", "SPLICE error on field %u: %s"), + 26: ("ER_ARG_TYPE", "Argument type in operation '%c' on field %u does not match field type: expected a %s"), + 27: ("ER_TUPLE_IS_TOO_LONG", "Tuple is too long %u"), + 28: ("ER_UNKNOWN_UPDATE_OP", "Unknown UPDATE operation"), + 29: ("ER_UPDATE_FIELD", "Field %u UPDATE error: %s"), + 30: ("ER_FIBER_STACK", "Can not create a new fiber: recursion limit reached"), + 31: ("ER_KEY_PART_COUNT", "Invalid key part count (expected [0..%u], got %u)"), + 32: ("ER_PROC_LUA", "%s"), + 33: ("ER_NO_SUCH_PROC", "Procedure '%.*s' is not defined"), + 34: ("ER_NO_SUCH_TRIGGER", "Trigger is not found"), + 35: ("ER_NO_SUCH_INDEX", "No index #%u is defined in space '%s'"), + 36: ("ER_NO_SUCH_SPACE", "Space '%s' does not exist"), + 37: ("ER_NO_SUCH_FIELD", "Field %d was not found in the tuple"), + 38: ("ER_SPACE_FIELD_COUNT", "Tuple field count %u does not match space '%s' field count %u"), + 39: ("ER_INDEX_FIELD_COUNT", "Tuple field count %u is less than required by a defined index (expected %u)"), + 40: ("ER_WAL_IO", "Failed to write to disk"), + 41: ("ER_MORE_THAN_ONE_TUPLE", "More than one tuple found by get()"), + 42: ("ER_ACCESS_DENIED", "%s access on %s is denied for user '%s'"), + 43: ("ER_CREATE_USER", "Failed to create user '%s': %s"), + 44: ("ER_DROP_USER", "Failed to drop user or role '%s': %s"), + 45: ("ER_NO_SUCH_USER", "User '%s' is not found"), + 46: ("ER_USER_EXISTS", "User '%s' already exists"), + 47: ("ER_PASSWORD_MISMATCH", "Incorrect password supplied for user '%s'"), + 48: ("ER_UNKNOWN_REQUEST_TYPE", "Unknown request type %u"), + 49: ("ER_UNKNOWN_SCHEMA_OBJECT", "Unknown object type '%s'"), + 50: ("ER_CREATE_FUNCTION", "Failed to create function '%s': %s"), + 51: ("ER_NO_SUCH_FUNCTION", "Function '%s' does not exist"), + 52: ("ER_FUNCTION_EXISTS", "Function '%s' already exists"), + 53: ("ER_FUNCTION_ACCESS_DENIED", "%s access is denied for user '%s' to function '%s'"), + 54: ("ER_FUNCTION_MAX", "A limit on the total number of functions has been reached: %u"), + 55: ("ER_SPACE_ACCESS_DENIED", "%s access is denied for user '%s' to space '%s'"), + 56: ("ER_USER_MAX", "A limit on the total number of users has been reached: %u"), + 57: ("ER_NO_SUCH_ENGINE", "Space engine '%s' does not exist"), + 58: ("ER_RELOAD_CFG", "Can't set option '%s' dynamically"), + 59: ("ER_CFG", "Incorrect value for option '%s': %s"), + 60: ("ER_SOPHIA", "%s"), + 61: ("ER_LOCAL_SERVER_IS_NOT_ACTIVE", "Local server is not active"), + 62: ("ER_UNKNOWN_SERVER", "Server %s is not registered with the cluster"), + 63: ("ER_CLUSTER_ID_MISMATCH", "Cluster id of the replica %s doesn't match cluster id of the master %s"), + 64: ("ER_INVALID_UUID", "Invalid UUID: %s"), + 65: ("ER_CLUSTER_ID_IS_RO", "Can't reset cluster id: it is already assigned"), + 66: ("ER_RESERVED66", "Reserved66"), + 67: ("ER_SERVER_ID_IS_RESERVED", "Can't initialize server id with a reserved value %u"), + 68: ("ER_INVALID_ORDER", "Invalid LSN order for server %u: previous LSN = %llu, new lsn = %llu"), + 69: ("ER_MISSING_REQUEST_FIELD", "Missing mandatory field '%s' in request"), + 70: ("ER_IDENTIFIER", "Invalid identifier '%s' (expected letters, digits or an underscore)"), + 71: ("ER_DROP_FUNCTION", "Can't drop function %u: %s"), + 72: ("ER_ITERATOR_TYPE", "Unknown iterator type '%s'"), + 73: ("ER_REPLICA_MAX", "Replica count limit reached: %u"), + 74: ("ER_INVALID_XLOG", "Failed to read xlog: %lld"), + 75: ("ER_INVALID_XLOG_NAME", "Invalid xlog name: expected %lld got %lld"), + 76: ("ER_INVALID_XLOG_ORDER", "Invalid xlog order: %lld and %lld"), + 77: ("ER_NO_CONNECTION", "Connection is not established"), + 78: ("ER_TIMEOUT", "Timeout exceeded"), + 79: ("ER_ACTIVE_TRANSACTION", "Operation is not permitted when there is an active transaction "), + 80: ("ER_NO_ACTIVE_TRANSACTION", "Operation is not permitted when there is no active transaction "), + 81: ("ER_CROSS_ENGINE_TRANSACTION", "A multi-statement transaction can not use multiple storage engines"), + 82: ("ER_NO_SUCH_ROLE", "Role '%s' is not found"), + 83: ("ER_ROLE_EXISTS", "Role '%s' already exists"), + 84: ("ER_CREATE_ROLE", "Failed to create role '%s': %s"), + 85: ("ER_INDEX_EXISTS", "Index '%s' already exists"), + 86: ("ER_TUPLE_REF_OVERFLOW", "Tuple reference counter overflow"), + 87: ("ER_ROLE_LOOP", "Granting role '%s' to role '%s' would create a loop"), + 88: ("ER_GRANT", "Incorrect grant arguments: %s"), + 89: ("ER_PRIV_GRANTED", "User '%s' already has %s access on %s '%s'"), + 90: ("ER_ROLE_GRANTED", "User '%s' already has role '%s'"), + 91: ("ER_PRIV_NOT_GRANTED", "User '%s' does not have %s access on %s '%s'"), + 92: ("ER_ROLE_NOT_GRANTED", "User '%s' does not have role '%s'"), + 93: ("ER_MISSING_SNAPSHOT", "Can't find snapshot"), + 94: ("ER_CANT_UPDATE_PRIMARY_KEY", "Attempt to modify a tuple field which is part of index '%s' in space '%s'"), + 95: ("ER_UPDATE_INTEGER_OVERFLOW", "Integer overflow when performing '%c' operation on field %u"), + 96: ("ER_GUEST_USER_PASSWORD", "Setting password for guest user has no effect"), + 97: ("ER_TRANSACTION_CONFLICT", "Transaction has been aborted by conflict"), + 98: ("ER_UNSUPPORTED_ROLE_PRIV", "Unsupported role privilege '%s'"), + 99: ("ER_LOAD_FUNCTION", "Failed to dynamically load function '%s': %s"), + 100: ("ER_FUNCTION_LANGUAGE", "Unsupported language '%s' specified for function '%s'"), + 101: ("ER_RTREE_RECT", "RTree: %s must be an array with %u (point) or %u (rectangle/box) numeric coordinates"), + 102: ("ER_PROC_C", "%s"), + 103: ("ER_UNKNOWN_RTREE_INDEX_DISTANCE_TYPE", "Unknown RTREE index distance type %s"), + 104: ("ER_PROTOCOL", "%s"), + 105: ("ER_UPSERT_UNIQUE_SECONDARY_KEY", "Space %s has a unique secondary index and does not support UPSERT"), + 106: ("ER_WRONG_INDEX_RECORD", "Wrong record in _index space: got {%s}, expected {%s}"), + 107: ("ER_WRONG_INDEX_PARTS", "Wrong index parts (field %u): %s; expected field1 id (number), field1 type (string), ..."), + 108: ("ER_WRONG_INDEX_OPTIONS", "Wrong index options (field %u): %s"), + 109: ("ER_WRONG_SCHEMA_VERSION", "Wrong schema version, current: %d, in request: %u"), + 110: ("ER_SLAB_ALLOC_MAX", "Failed to allocate %u bytes for tuple in the slab allocator: tuple is too large. Check 'slab_alloc_maximal' configuration option."), + 111: ("ER_WRONG_SPACE_OPTIONS", "Wrong space options (field %u): %s"), + 112: ("ER_UNSUPPORTED_INDEX_FEATURE", "Index '%s' (%s) of space '%s' (%s) does not support %s"), + 113: ("ER_VIEW_IS_RO", "View '%s' is read-only"), } diff --git a/tarantool/request.py b/tarantool/request.py index 2ac22908..42af6da6 100644 --- a/tarantool/request.py +++ b/tarantool/request.py @@ -25,6 +25,8 @@ IPROTO_VCLOCK, IPROTO_EXPR, IPROTO_OPS, + IPROTO_INDEX_BASE, + IPROTO_SCHEMA_ID, REQUEST_TYPE_OK, REQUEST_TYPE_PING, REQUEST_TYPE_SELECT, @@ -59,7 +61,8 @@ def __init__(self, conn): self._sync = None def __bytes__(self): - return self._bytes + return self.header(len(self._body)) + self._body + __str__ = __bytes__ @property @@ -74,12 +77,12 @@ def sync(self): def header(self, length): self._sync = self.conn.generate_sync() - header = msgpack.dumps({IPROTO_CODE: self.request_type, - IPROTO_SYNC: self._sync}) + header = msgpack.dumps({IPROTO_CODE: self.request_type, + IPROTO_SYNC: self._sync, + IPROTO_SCHEMA_ID: self.conn.schema_version}) return msgpack.dumps(length + len(header)) + header - class RequestInsert(Request): ''' Represents INSERT request @@ -96,7 +99,7 @@ def __init__(self, conn, space_no, values): request_body = msgpack.dumps({IPROTO_SPACE_ID: space_no, IPROTO_TUPLE: values}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestAuthenticate(Request): @@ -127,7 +130,7 @@ def strxor(rhs, lhs): scramble = strxor(hash1, scramble) request_body = msgpack.dumps({IPROTO_USER_NAME: user, IPROTO_TUPLE: ("chap-sha1", scramble)}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestReplace(Request): @@ -146,7 +149,7 @@ def __init__(self, conn, space_no, values): request_body = msgpack.dumps({IPROTO_SPACE_ID: space_no, IPROTO_TUPLE: values}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestDelete(Request): @@ -165,7 +168,7 @@ def __init__(self, conn, space_no, index_no, key): IPROTO_INDEX_ID: index_no, IPROTO_KEY: key}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestSelect(Request): @@ -184,7 +187,7 @@ def __init__(self, conn, space_no, index_no, key, offset, limit, iterator): IPROTO_ITERATOR: iterator, IPROTO_KEY: key}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestUpdate(Request): @@ -203,7 +206,7 @@ def __init__(self, conn, space_no, index_no, key, op_list): IPROTO_KEY: key, IPROTO_TUPLE: op_list}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestCall(Request): @@ -220,7 +223,7 @@ def __init__(self, conn, name, args): request_body = msgpack.dumps({IPROTO_FUNCTION_NAME: name, IPROTO_TUPLE: args}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestEval(Request): @@ -237,7 +240,7 @@ def __init__(self, conn, name, args): request_body = msgpack.dumps({IPROTO_EXPR: name, IPROTO_TUPLE: args}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestPing(Request): @@ -248,7 +251,7 @@ class RequestPing(Request): def __init__(self, conn): super(RequestPing, self).__init__(conn) - self._bytes = self.header(0) + self._body = b'' class RequestUpsert(Request): ''' @@ -266,7 +269,7 @@ def __init__(self, conn, space_no, index_no, tuple_value, op_list): IPROTO_TUPLE: tuple_value, IPROTO_OPS: op_list}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestJoin(Request): ''' @@ -278,7 +281,7 @@ class RequestJoin(Request): def __init__(self, conn, server_uuid): super(RequestJoin, self).__init__(conn) request_body = msgpack.dumps({IPROTO_SERVER_UUID: server_uuid}) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestSubscribe(Request): @@ -297,7 +300,7 @@ def __init__(self, conn, cluster_uuid, server_uuid, vclock): IPROTO_SERVER_UUID: server_uuid, IPROTO_VCLOCK: vclock }) - self._bytes = self.header(len(request_body)) + request_body + self._body = request_body class RequestOK(Request): ''' @@ -310,4 +313,4 @@ def __init__(self, conn, sync): super(RequestOK, self).__init__(conn) header = msgpack.dumps({IPROTO_CODE: self.request_type, IPROTO_SYNC: sync}) - self._bytes = msgpack.dumps(len(header)) + header + self._body = request_body diff --git a/tarantool/response.py b/tarantool/response.py index e9f43288..098be5c3 100644 --- a/tarantool/response.py +++ b/tarantool/response.py @@ -2,23 +2,25 @@ # pylint: disable=C0301,W0105,W0401,W0614 import sys -import msgpack import yaml +import msgpack +import collections from tarantool.const import ( IPROTO_CODE, IPROTO_DATA, IPROTO_ERROR, IPROTO_SYNC, + IPROTO_SCHEMA_ID, REQUEST_TYPE_ERROR ) -from tarantool.error import DatabaseError, tnt_strerror +from tarantool.error import DatabaseError, tnt_strerror, SchemaReloadException if sys.version_info < (2, 6): bytes = str # pylint: disable=W0622 -class Response(list): +class Response(collections.Sequence): ''' Represents a single response from the server in compliance with the Tarantool protocol. @@ -49,10 +51,11 @@ def __init__(self, conn, response): unpacker.feed(response) header = unpacker.unpack() + self.conn = conn self._sync = header.get(IPROTO_SYNC, 0) - self.conn = conn self._code = header[IPROTO_CODE] self._body = {} + self._schema_version = header.get(IPROTO_SCHEMA_ID, None) try: self._body = unpacker.unpack() except msgpack.OutOfData: @@ -60,36 +63,60 @@ def __init__(self, conn, response): if self._code < REQUEST_TYPE_ERROR: self._return_code = 0 - self._completion_status = 0 + self._schema_version = header.get(IPROTO_SCHEMA_ID, None) self._data = self._body.get(IPROTO_DATA, None) - # Backward-compatibility - if isinstance(self._data, (list, tuple)): - self.extend(self._data) - else: - self.append(self._data) + if not isinstance(self._data, (list, tuple)) and self._data is not None: + self._data = [self._data] + # # Backward-compatibility + # if isinstance(self._data, (list, tuple)): + # self.extend(self._data) + # else: + # self.append(self._data) else: # Separate return_code and completion_code self._return_message = self._body.get(IPROTO_ERROR, "") self._return_code = self._code & (REQUEST_TYPE_ERROR - 1) - self._completion_status = 2 - self._data = None + self._data = [] + if self._return_code == 109: + raise SchemaReloadException(self._return_message, + self._schema_version) if self.conn.error: raise DatabaseError(self._return_code, self._return_message) - @property - def completion_status(self): - ''' - :type: int - - Request completion status. - - There are only three completion status codes in use: - * ``0`` -- "success"; the only possible :attr:`return_code` with - this status is ``0`` - * ``2`` -- "error"; in this case :attr:`return_code` holds - the actual error. - ''' - return self._completion_status + def __getitem__(self, idx): + if self._data == None: + raise InterfaceError("Trying to access data, when there's no data") + return self._data.__getitem__(idx) + + def __len__(self): + if self._data == None: + raise InterfaceError("Trying to access data, when there's no data") + return len(self._data) + + def __contains__(self, item): + if self._data == None: + raise InterfaceError("Trying to access data, when there's no data") + return item in self._data + + def __iter__(self): + if self._data == None: + raise InterfaceError("Trying to access data, when there's no data") + return iter(self._data) + + def __reversed__(self): + if self._data == None: + raise InterfaceError("Trying to access data, when there's no data") + return reversed(self._data) + + def index(self, *args): + if self._data == None: + raise InterfaceError("Trying to access data, when there's no data") + return self._data.index(*args) + + def count(self, item): + if self._data == None: + raise InterfaceError("Trying to access data, when there's no data") + return self._data.count(item) @property def rowcount(self): @@ -174,6 +201,15 @@ def return_message(self): ''' return self._return_message + @property + def schema_version(self): + ''' + :type: int + + Current schema version of request. + ''' + return self._schema_version + def __str__(self): ''' Return user friendy string representation of the object. @@ -181,11 +217,13 @@ def __str__(self): :rtype: str or None ''' - if self.completion_status: - return yaml.dump({'error': { - 'code': self.strerror[0], - 'reason': self.return_message - }}) + if self.return_code: + return yaml.dump({ + 'error': { + 'code' : self.strerror[0], + 'reason': self.return_message + } + }) return yaml.dump(self._data) __repr__ = __str__ diff --git a/tarantool/schema.py b/tarantool/schema.py index f7237a48..1d8ba798 100644 --- a/tarantool/schema.py +++ b/tarantool/schema.py @@ -25,7 +25,10 @@ def __init__(self, index_row, space): self.parts.append((k, v)) else: for i in range(index_row[5]): - self.parts.append((index_row[5 + 1 + i * 2], index_row[5 + 2 + i * 2])) + self.parts.append(( + index_row[5 + 1 + i * 2], + index_row[5 + 2 + i * 2] + )) self.space = space self.space.indexes[self.iid] = self if self.name: @@ -66,26 +69,54 @@ def get_space(self, space): return self.schema[space] except KeyError: pass - _index = (const.INDEX_SPACE_NAME - if isinstance(space, six.string_types) - else const.INDEX_SPACE_PRIMARY) + + return self.fetch_space(space) + + def fetch_space(self, space): + space_row = self.fetch_space_from(space) + + if len(space_row) > 1: + # We have selected more than one space, it's strange + raise SchemaError('Some strange output from server: \n' + str(space_row)) + elif len(space_row) == 0 or not len(space_row[0]): + # We can't find space with this name or id + temp_name = ('name' if isinstance(space, six.string_types) else 'id') + errmsg = "There's no space with {1} '{0}'".format(space, temp_name) + raise SchemaError(errmsg) + + space_row = space_row[0] + + return SchemaSpace(space_row, self.schema) + + def fetch_space_from(self, space): + _index = None + if isinstance(space, six.string_types): + _index = const.INDEX_SPACE_NAME + else: + _index = const.INDEX_SPACE_PRIMARY + + if space == None: + space = () space_row = None try: + # Try to fetch from '_vspace' space_row = self.con.select(const.SPACE_VSPACE, space, index=_index) except DatabaseError as e: + # if space can't be found, then user is using old version of + # tarantool, try again with '_space' if e.args[0] != 36: raise if space_row is None: + # Try to fetch from '_space' space_row = self.con.select(const.SPACE_SPACE, space, index=_index) - if len(space_row) > 1: - raise SchemaError('Some strange output from server: \n' + str(space_row)) - elif len(space_row) == 0 or not len(space_row[0]): - temp_name = ('name' if isinstance(space, six.string_types) else 'id') - raise SchemaError( - "There's no space with {1} '{0}'".format(space, temp_name)) - space_row = space_row[0] - return SchemaSpace(space_row, self.schema) + + return space_row + + def fetch_space_all(self): + space_rows = self.fetch_space_from(None) + for row in space_rows: + SchemaSpace(row, self.schema) def get_index(self, space, index): _space = self.get_space(space) @@ -93,30 +124,62 @@ def get_index(self, space, index): return _space.indexes[index] except KeyError: pass - _index = (const.INDEX_INDEX_NAME - if isinstance(index, six.string_types) - else const.INDEX_INDEX_PRIMARY) + + return self.fetch_index(_space, index) + + def fetch_index(self, space_object, index): + index_row = self.fetch_index_from(space_object.sid, index) + + if len(index_row) > 1: + # We have selected more than one index, it's strange + raise SchemaError('Some strange output from server: \n' + str(index_row)) + elif len(index_row) == 0 or not len(index_row[0]): + # We can't find index with this name or id + temp_name = ('name' if isinstance(index, six.string_types) else 'id') + errmsg = ("There's no index with {2} '{0}'" + " in space '{1}'").format(index, space_object.name, temp_name) + raise SchemaError(errmsg) + + index_row = index_row[0] + + return SchemaIndex(index_row, space_object) + + def fetch_index_all(self): + index_rows = self.fetch_index_from(None, None) + for row in index_rows: + SchemaIndex(row, self.schema[row[0]]) + + def fetch_index_from(self, space, index): + _index = None + if isinstance(index, six.string_types): + _index = const.INDEX_INDEX_NAME + else: + _index = const.INDEX_INDEX_PRIMARY + + _key_tuple = None + if space is None and index is None: + _key_tuple = () + elif space is not None and index is None: + _key_tuple = (space) + elif space is not None and index is not None: + _key_tuple = (space, index) + else: + raise SchemaError("Bad arguments for schema resolving") index_row = None try: - index_row = self.con.select(const.SPACE_VINDEX, [_space.sid, index], - index=_index) + # Try to fetch from '_vindex' + index_row = self.con.select(const.SPACE_VINDEX, _key_tuple, index=_index) except DatabaseError as e: + # if space can't be found, then user is using old version of + # tarantool, try again with '_index' if e.args[0] != 36: raise if index_row is None: - index_row = self.con.select(const.SPACE_INDEX, [_space.sid, index], - index=_index) + # Try to fetch from '_index' + index_row = self.con.select(const.SPACE_INDEX, _key_tuple, index=_index) - if len(index_row) > 1: - raise SchemaError('Some strange output from server: \n' + str(index_row)) - elif len(index_row) == 0 or not len(index_row[0]): - temp_name = ('name' if isinstance(index, six.string_types) else 'id') - raise SchemaError( - "There's no index with {2} '{0}' in space '{1}'".format( - index, _space.name, temp_name)) - index_row = index_row[0] - return SchemaIndex(index_row, _space) + return index_row def flush(self): self.schema.clear() diff --git a/tests/suites/test_dml.py b/tests/suites/test_dml.py index e73efc55..ef47df59 100644 --- a/tests/suites/test_dml.py +++ b/tests/suites/test_dml.py @@ -67,26 +67,26 @@ def test_00_03_answer_repr(self): def test_02_select(self): # Check that select with different keys are Ok. (With and without index names) - self.assertEqual(self.con.select('space_1', 20), [[20, 0, 'tuple_20']]) - self.assertEqual(self.con.select('space_1', [21]), [[21, 1, 'tuple_21']]) - self.assertEqual(self.con.select('space_1', [22], index='primary'), [[22, 2, 'tuple_22']]) - self.assertEqual(self.con.select('space_1', [23], index='primary'), [[23, 3, 'tuple_23']]) + self.assertSequenceEqual(self.con.select('space_1', 20), [[20, 0, 'tuple_20']]) + self.assertSequenceEqual(self.con.select('space_1', [21]), [[21, 1, 'tuple_21']]) + self.assertSequenceEqual(self.con.select('space_1', [22], index='primary'), [[22, 2, 'tuple_22']]) + self.assertSequenceEqual(self.con.select('space_1', [23], index='primary'), [[23, 3, 'tuple_23']]) # Check that Offset and Limit args are working fine. - self.assertEqual(self.con.select('space_1', [20], index='primary', limit=1), [[20, 0, 'tuple_20']]) + self.assertSequenceEqual(self.con.select('space_1', [20], index='primary', limit=1), [[20, 0, 'tuple_20']]) # With other indexes too - self.assertEqual( + self.assertSequenceEqual( sorted( self.con.select('space_1', [0], index='secondary', offset=3, limit=0), key = lambda x: x[0]), [] ) - self.assertEqual( + self.assertSequenceEqual( sorted( self.con.select('space_1', [0], index='secondary', offset=3, limit=1), key = lambda x: x[0]), [[110, 0, 'tuple_110']] ) - self.assertEqual( + self.assertSequenceEqual( sorted( self.con.select('space_1', [0], index='secondary', offset=3, limit=2), key = lambda x: x[0]), @@ -108,30 +108,30 @@ def test_02_select(self): self.assertEqual(len(self.con.select('space_1', [0], index='secondary', limit=50)), 50) # TODO: Check iterator_types - self.assertEqual( + self.assertSequenceEqual( self.con.select('space_1', [0, 'tuple_20'], index='secondary', limit=2, iterator=tarantool.const.ITERATOR_GT), [[200, 0, 'tuple_200'], [205, 0, 'tuple_205']] ) def test_03_delete(self): # Check that delete works fine - self.assertEqual(self.con.delete('space_1', 20), [[20, 0, 'tuple_20']]) - self.assertEqual(self.con.delete('space_1', [20]), []) - self.assertEqual(self.con.select('space_1', [20], index='primary'), []) + self.assertSequenceEqual(self.con.delete('space_1', 20), [[20, 0, 'tuple_20']]) + self.assertSequenceEqual(self.con.delete('space_1', [20]), []) + self.assertSequenceEqual(self.con.select('space_1', [20], index='primary'), []) # Check that field has no meaning, yet. with self.assertRaisesRegexp(tarantool.DatabaseError, '(19, .*)'): self.con.delete('space_1', [1, 'tuple_21']) - self.assertEqual(self.con.select('space_1', [21], index='primary'), [[21, 1, 'tuple_21']]) + self.assertSequenceEqual(self.con.select('space_1', [21], index='primary'), [[21, 1, 'tuple_21']]) def test_04_replace(self): # Check replace that is Ok. - self.assertEqual(self.con.replace('space_1', [2, 2, 'tuple_3']), [[2, 2, 'tuple_3']]) - self.assertEqual(self.con.select('space_1', 2), [[2, 2, 'tuple_3']]) + self.assertSequenceEqual(self.con.replace('space_1', [2, 2, 'tuple_3']), [[2, 2, 'tuple_3']]) + self.assertSequenceEqual(self.con.select('space_1', 2), [[2, 2, 'tuple_3']]) # Check replace that isn't Ok. with self.assertRaisesRegexp(tarantool.DatabaseError, '(39, .*)'): - self.assertEqual(self.con.replace('space_1', [2, 2]), [[2, 2, 'tuple_2']]) + self.assertSequenceEqual(self.con.replace('space_1', [2, 2]), [[2, 2, 'tuple_2']]) def test_05_ping(self): # Simple ping test @@ -141,23 +141,23 @@ def test_05_ping(self): self.assertEqual(self.con.ping(notime=True), "Success") def test_06_update(self): - self.assertEqual(self.con.update('space_1', (2,), [('+', 1, 3)]), + self.assertSequenceEqual(self.con.update('space_1', (2,), [('+', 1, 3)]), [[2, 5, 'tuple_3']]) - self.assertEqual(self.con.update('space_1', (2,), [('-', 1, 3)]), + self.assertSequenceEqual(self.con.update('space_1', (2,), [('-', 1, 3)]), [[2, 2, 'tuple_3']]) - self.assertEqual(self.con.update('space_1', (2,), [(':', 2, 3, 2, 'lalal')]), + self.assertSequenceEqual(self.con.update('space_1', (2,), [(':', 2, 3, 2, 'lalal')]), [[2, 2, 'tuplalal_3']]) - self.assertEqual(self.con.update('space_1', (2,), [('!', 2, '1')]), + self.assertSequenceEqual(self.con.update('space_1', (2,), [('!', 2, '1')]), [[2, 2, '1', 'tuplalal_3']]) - self.assertEqual(self.con.update('space_1', (2,), [('!', 2, 'oingo, boingo')]), + self.assertSequenceEqual(self.con.update('space_1', (2,), [('!', 2, 'oingo, boingo')]), [[2, 2, 'oingo, boingo', '1', 'tuplalal_3']]) - self.assertEqual(self.con.update('space_1', (2,), [('#', 2, 2)]), + self.assertSequenceEqual(self.con.update('space_1', (2,), [('#', 2, 2)]), [[2, 2, 'tuplalal_3']]) def test_07_call(self): - self.assertEqual(self.con.call('json.decode', '[123, 234, 345]'), [[123, 234, 345]]) - self.assertEqual(self.con.call('json.decode', ['[123, 234, 345]']), [[123, 234, 345]]) - self.assertEqual(self.con.call('json.decode', ('[123, 234, 345]',)), [[123, 234, 345]]) + self.assertSequenceEqual(self.con.call('json.decode', '[123, 234, 345]'), [[123, 234, 345]]) + self.assertSequenceEqual(self.con.call('json.decode', ['[123, 234, 345]']), [[123, 234, 345]]) + self.assertSequenceEqual(self.con.call('json.decode', ('[123, 234, 345]',)), [[123, 234, 345]]) with self.assertRaisesRegexp(tarantool.DatabaseError, '(32, .*)'): self.con.call('json.decode') with self.assertRaisesRegexp(tarantool.DatabaseError, '(32, .*)'): @@ -175,60 +175,60 @@ def test_07_call(self): self.assertEqual(len(ans[0]), 1) self.assertIsInstance(ans[0][0], str) - self.assertEqual(self.con.call('box.tuple.new', [1, 2, 3, 'fld_1']), [[1, 2, 3, 'fld_1']]) - self.assertEqual(self.con.call('box.tuple.new', 'fld_1'), [['fld_1']]) + self.assertSequenceEqual(self.con.call('box.tuple.new', [1, 2, 3, 'fld_1']), [[1, 2, 3, 'fld_1']]) + self.assertSequenceEqual(self.con.call('box.tuple.new', 'fld_1'), [['fld_1']]) def test_08_eval(self): - self.assertEqual(self.con.eval('return json.decode(...)', + self.assertSequenceEqual(self.con.eval('return json.decode(...)', '[123, 234, 345]'), [[123, 234, 345]]) - self.assertEqual(self.con.eval('return json.decode(...)', + self.assertSequenceEqual(self.con.eval('return json.decode(...)', ['[123, 234, 345]']), [[123, 234, 345]]) - self.assertEqual(self.con.eval('return json.decode(...)', + self.assertSequenceEqual(self.con.eval('return json.decode(...)', ('[123, 234, 345]',)), [[123, 234, 345]]) - self.assertEqual(self.con.eval('return json.decode("[123, 234, 345]")'), + self.assertSequenceEqual(self.con.eval('return json.decode("[123, 234, 345]")'), [[123, 234, 345]]) - self.assertEqual(self.con.eval('return json.decode("[123, 234, 345]"), '+ + self.assertSequenceEqual(self.con.eval('return json.decode("[123, 234, 345]"), '+ 'json.decode("[123, 234, 345]")'), [[123, 234, 345], [123, 234, 345]]) - self.assertEqual(self.con.eval('json.decode("[123, 234, 345]")'), []) + self.assertSequenceEqual(self.con.eval('json.decode("[123, 234, 345]")'), []) def test_09_upsert(self): - self.assertEqual(self.con.select('space_1', [22], index='primary'), [[22, 2, 'tuple_22']]) - self.assertEqual(self.con.select('space_1', [23], index='primary'), [[23, 3, 'tuple_23']]) - self.assertEqual(self.con.select('space_1', [499], index='primary'), [[499, 4, 'tuple_499']]) - self.assertEqual(self.con.select('space_1', [500], index='primary'), []) - self.assertEqual(self.con.upsert('space_1', [500, 123, 'hello, world'], + self.assertSequenceEqual(self.con.select('space_1', [22], index='primary'), [[22, 2, 'tuple_22']]) + self.assertSequenceEqual(self.con.select('space_1', [23], index='primary'), [[23, 3, 'tuple_23']]) + self.assertSequenceEqual(self.con.select('space_1', [499], index='primary'), [[499, 4, 'tuple_499']]) + self.assertSequenceEqual(self.con.select('space_1', [500], index='primary'), []) + self.assertSequenceEqual(self.con.upsert('space_1', [500, 123, 'hello, world'], [(':', 2, 2, 3, "---")]), []) - self.assertEqual(self.con.select('space_1', [500], index='primary'), [[500, 123, 'hello, world']]) - self.assertEqual(self.con.upsert('space_1', [500, 123, 'hello, world'], + self.assertSequenceEqual(self.con.select('space_1', [500], index='primary'), [[500, 123, 'hello, world']]) + self.assertSequenceEqual(self.con.upsert('space_1', [500, 123, 'hello, world'], [(':', 2, 2, 3, "---")]), []) - self.assertEqual(self.con.select('space_1', [500], index='primary'), [[500, 123, 'he---, world']]) + self.assertSequenceEqual(self.con.select('space_1', [500], index='primary'), [[500, 123, 'he---, world']]) def test_10_space(self): space = self.con.space('space_1') - self.assertEqual(space.select([22], index='primary'), [[22, 2, 'tuple_22']]) - self.assertEqual(space.replace([22, 10, 'lol']), [[22, 10, 'lol']]) - self.assertEqual(space.insert([900, 10, 'foo']), [[900, 10, 'foo']]) - self.assertEqual(space.select([10], index='secondary'), [ + self.assertSequenceEqual(space.select([22], index='primary'), [[22, 2, 'tuple_22']]) + self.assertSequenceEqual(space.replace([22, 10, 'lol']), [[22, 10, 'lol']]) + self.assertSequenceEqual(space.insert([900, 10, 'foo']), [[900, 10, 'foo']]) + self.assertSequenceEqual(space.select([10], index='secondary'), [ [900, 10, 'foo'], [22, 10, 'lol'] ]) - self.assertEqual(space.select([10], index='secondary', limit=1), [ + self.assertSequenceEqual(space.select([10], index='secondary', limit=1), [ [900, 10, 'foo'] ]) - self.assertEqual(space.select([10], index='secondary', limit=1, offset=1), [ + self.assertSequenceEqual(space.select([10], index='secondary', limit=1, offset=1), [ [22, 10, 'lol'] ]) - self.assertEqual(space.select([501], index='primary'), []) - self.assertEqual(space.upsert([501, 123, 'hello, world'], + self.assertSequenceEqual(space.select([501], index='primary'), []) + self.assertSequenceEqual(space.upsert([501, 123, 'hello, world'], [(':', 2, 2, 3, "---")]), []) - self.assertEqual(space.select([501], index='primary'), [[501, 123, 'hello, world']]) - self.assertEqual(space.upsert([501, 123, 'hello, world'], + self.assertSequenceEqual(space.select([501], index='primary'), [[501, 123, 'hello, world']]) + self.assertSequenceEqual(space.upsert([501, 123, 'hello, world'], [(':', 2, 2, 3, "---")]), []) - self.assertEqual(space.update([400], [('!', 2, 'oingo, boingo')]), + self.assertSequenceEqual(space.update([400], [('!', 2, 'oingo, boingo')]), [[400, 0, 'oingo, boingo', 'tuple_400']]) - self.assertEqual(space.update([400], [('#', 2, 1)]), + self.assertSequenceEqual(space.update([400], [('#', 2, 1)]), [[400, 0, 'tuple_400']]) - self.assertEqual(space.delete([900]), [[900, 10, 'foo']]) + self.assertSequenceEqual(space.delete([900]), [[900, 10, 'foo']]) def test_11_select_all_hash(self): space = self.con.space('space_2') diff --git a/tests/suites/test_schema.py b/tests/suites/test_schema.py index 4f7ae7f3..094aec8e 100755 --- a/tests/suites/test_schema.py +++ b/tests/suites/test_schema.py @@ -207,6 +207,11 @@ def test_06_index_cached(self): self.assertEqual(index.name, 'name') self.assertEqual(len(index.parts), 1) + def test_07_schema_version_update(self): + self.assertEqual(len(self.con.select('_space')), 12) + self.srv.admin("box.schema.create_space('ttt22')") + self.assertEqual(len(self.con.select('_space')), 13) + @classmethod def tearDownClass(self): self.srv.stop()