diff --git a/client/auth.py b/client/auth.py index efc2545..dcd1a9e 100644 --- a/client/auth.py +++ b/client/auth.py @@ -1,6 +1,5 @@ """Auth Client""" -import uuid from typing import Optional import grpc from passlib.hash import pbkdf2_sha256 @@ -57,18 +56,16 @@ class AuthClient: Client for Auth operations. Attributes: - name: Name of the AuthClient, which will be used in CURP propose id generation. curp_client: The client running the CURP protocol, communicate with all servers. + auth_client: The auth RPC client, only communicate with one server at a time. token: The auth token. """ - name: str curp_client: CurpClient auth_client: AuthStub token: Optional[str] - def __init__(self, name: str, curp_client: CurpClient, channel: grpc.Channel, token: Optional[str]) -> None: - self.name = name + def __init__(self, curp_client: CurpClient, channel: grpc.Channel, token: Optional[str]) -> None: self.curp_client = curp_client self.auth_client = AuthStub(channel=channel) self.token = token @@ -262,8 +259,7 @@ async def handle_req(self, req: RequestWithToken, use_fast_path: bool) -> Comman """ Send request using fast path or slow path. """ - propose_id = self.generate_propose_id() - cmd = Command(request=req, propose_id=propose_id) + cmd = Command(request=req) if use_fast_path: er, _ = await self.curp_client.propose(cmd, True) @@ -275,11 +271,6 @@ async def handle_req(self, req: RequestWithToken, use_fast_path: bool) -> Comman raise Exception(msg) return er - def generate_propose_id(self) -> str: - """Generate propose id with the given prefix.""" - propose_id = f"{self.name}-{uuid.uuid4()}" - return propose_id - @staticmethod def hash_password(password: str) -> str: """Generate hash of the password.""" diff --git a/client/client.py b/client/client.py index bd4a221..a16df49 100644 --- a/client/client.py +++ b/client/client.py @@ -58,11 +58,11 @@ async def connect(cls, addrs: list[str]) -> Client: # TODO: Acquire the auth token id_gen = LeaseIdGenerator() - kv_client = KvClient("client", protocol_client, "") - lease_client = LeaseClient("client", protocol_client, channel, "", id_gen) + kv_client = KvClient(protocol_client, "") + lease_client = LeaseClient(protocol_client, channel, "", id_gen) watch_client = WatchClient(channel) - lock_client = LockClient("client", protocol_client, channel, "", id_gen) - auth_client = AuthClient("client", protocol_client, channel, "") + lock_client = LockClient(protocol_client, channel, "", id_gen) + auth_client = AuthClient(protocol_client, channel, "") maintenance_client = MaintenanceClient(channel) return cls(kv_client, lease_client, watch_client, lock_client, auth_client, maintenance_client) diff --git a/client/error.py b/client/error.py index c0ac471..e66794f 100644 --- a/client/error.py +++ b/client/error.py @@ -2,48 +2,10 @@ Client Errors """ -from api.curp.curp_error_pb2 import ( - ProposeError as _ProposeError, - CommandSyncError as _CommandSyncError, - WaitSyncError as _WaitSyncError, -) from api.xline.xline_error_pb2 import ExecuteError as _ExecuteError -class ResDecodeError(Exception): - """Response decode error""" - - pass - - -class ProposeError(BaseException): - """Propose error""" - - inner: _ProposeError - - def __init__(self, err: _ProposeError) -> None: - self.inner = err - - -class CommandSyncError(BaseException): - """Command sync error""" - - inner: _CommandSyncError - - def __init__(self, err: _CommandSyncError) -> None: - self.inner = err - - -class WaitSyncError(BaseException): - """Wait sync error""" - - inner: _WaitSyncError - - def __init__(self, err: _WaitSyncError) -> None: - self.inner = err - - -class ExecuteError(BaseException): +class ExecuteError(Exception): """Execute error""" inner: _ExecuteError diff --git a/client/kv.py b/client/kv.py index d430fd9..5299f3c 100644 --- a/client/kv.py +++ b/client/kv.py @@ -1,6 +1,5 @@ """Kv Client""" -import uuid from typing import Optional, Literal from client.protocol import ProtocolClient as CurpClient from client.txn import Txn @@ -32,12 +31,10 @@ class KvClient: token: The auth token. """ - name: str curp_client: CurpClient token: Optional[str] - def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None: - self.name = name + def __init__(self, curp_client: CurpClient, token: Optional[str]) -> None: self.curp_client = curp_client self.token = token @@ -76,7 +73,6 @@ async def range( max_create_revision=max_create_revision, ) key_ranges = [KeyRange(key=key, range_end=range_end)] - propose_id = generate_propose_id(self.name) request_with_token = RequestWithToken( range_request=req, token=self.token, @@ -84,7 +80,6 @@ async def range( cmd = Command( keys=key_ranges, request=request_with_token, - propose_id=propose_id, ) er, _ = await self.curp_client.propose(cmd, True) return er.range_response @@ -105,7 +100,6 @@ async def put( key=key, value=value, lease=lease, prev_kv=prev_kv, ignore_value=ignore_value, ignore_lease=ignore_lease ) key_ranges = [KeyRange(key=key, range_end=key)] - propose_id = generate_propose_id(self.name) request_with_token = RequestWithToken( put_request=req, token=self.token, @@ -113,7 +107,6 @@ async def put( cmd = Command( keys=key_ranges, request=request_with_token, - propose_id=propose_id, ) er, _ = await self.curp_client.propose(cmd, True) return er.put_response @@ -128,7 +121,6 @@ async def delete(self, key: bytes, range_end: bytes | None = None, prev_kv: bool prev_kv=prev_kv, ) key_ranges = [KeyRange(key=key, range_end=range_end)] - propose_id = generate_propose_id(self.name) request_with_token = RequestWithToken( delete_range_request=req, @@ -137,7 +129,6 @@ async def delete(self, key: bytes, range_end: bytes | None = None, prev_kv: bool cmd = Command( keys=key_ranges, request=request_with_token, - propose_id=propose_id, ) er, _ = await self.curp_client.propose(cmd, True) return er.delete_range_response @@ -148,7 +139,6 @@ def txn(self) -> Txn: """ return Txn( - self.name, self.curp_client, self.token, ) @@ -162,14 +152,12 @@ async def compact(self, revision: int, physical: bool = False) -> CompactionResp physical=physical, ) use_fast_path = physical - propose_id = generate_propose_id(self.name) request_with_token = RequestWithToken( compaction_request=req, token=self.token, ) cmd = Command( request=request_with_token, - propose_id=propose_id, ) if use_fast_path: @@ -181,9 +169,3 @@ async def compact(self, revision: int, physical: bool = False) -> CompactionResp msg = "sync_res is always Some when use_fast_path is false" raise Exception(msg) return er.compaction_response - - -def generate_propose_id(prefix: str) -> str: - """Generate propose id with the given prefix""" - propose_id = f"{prefix}-{uuid.uuid4()}" - return propose_id diff --git a/client/lease.py b/client/lease.py index ecfb60a..a93b387 100644 --- a/client/lease.py +++ b/client/lease.py @@ -86,14 +86,13 @@ class LeaseClient: Client for Lease operations. Attributes: - name: Name of the LeaseClient, which will be used in CURP propose id generation. curp_client: The client running the CURP protocol, communicate with all servers. lease_client: The lease RPC client, only communicate with one server at a time. token: The auth token. id_gen: Lease Id generator. + keepers: Keep alive keepers. """ - name: str curp_client: CurpClient lease_client: LeaseStub token: Optional[str] @@ -101,9 +100,8 @@ class LeaseClient: keepers: dict[str, LeaseKeeper] def __init__( - self, name: str, curp_client: CurpClient, channel: Channel, token: Optional[str], id_gen: LeaseIdGenerator + self, curp_client: CurpClient, channel: Channel, token: Optional[str], id_gen: LeaseIdGenerator ) -> None: - self.name = name self.curp_client = curp_client self.lease_client = LeaseStub(channel=channel) self.token = token @@ -119,10 +117,8 @@ async def grant(self, req: LeaseGrantRequest) -> LeaseGrantResponse: if req.ID == 0: req.ID = self.id_gen.next() request_with_token = RequestWithToken(token=self.token, lease_grant_request=req) - propose_id = self.generate_propose_id() cmd = Command( request=request_with_token, - propose_id=propose_id, ) er, _ = await self.curp_client.propose(cmd, True) return er.lease_grant_response @@ -165,15 +161,8 @@ async def leases(self) -> LeaseLeasesResponse: Lists all existing leases. """ request_with_token = RequestWithToken(token=self.token, lease_leases_request=LeaseLeasesRequest()) - propose_id = self.generate_propose_id() cmd = Command( request=request_with_token, - propose_id=propose_id, ) er, _ = await self.curp_client.propose(cmd, True) return er.lease_leases_response - - def generate_propose_id(self) -> str: - """Generate propose id with the given prefix.""" - propose_id = f"{self.name}-{uuid.uuid4()}" - return propose_id diff --git a/client/lock.py b/client/lock.py index 4d4ae07..0448623 100644 --- a/client/lock.py +++ b/client/lock.py @@ -1,6 +1,5 @@ """Lock Client""" -import uuid from urllib import parse from typing import Optional from grpc import Channel @@ -45,25 +44,20 @@ class LockClient: Client for Lock operations. Attributes: - name: Name of the LockClient. curp_client: The client running the CURP protocol, communicate with all servers. lease_client: The lease client. watch_client: The watch client. token: Auth token """ - name: str curp_client: CurpClient lease_client: LeaseClient watch_client: WatchClient token: Optional[str] - def __init__( - self, name: str, curp_client: CurpClient, channel: Channel, token: str, id_gen: LeaseIdGenerator - ) -> None: - self.name = name + def __init__(self, curp_client: CurpClient, channel: Channel, token: str, id_gen: LeaseIdGenerator) -> None: self.curp_client = curp_client - self.lease_client = LeaseClient(name=name, curp_client=curp_client, channel=channel, token=token, id_gen=id_gen) + self.lease_client = LeaseClient(curp_client=curp_client, channel=channel, token=token, id_gen=id_gen) self.watch_client = WatchClient(channel=channel) self.token = token @@ -169,8 +163,7 @@ async def propose(self, req: RequestWithToken, use_fast_path: bool) -> tuple[Com """ Send request using fast path. """ - propose_id = self.generate_propose_id() - cmd = Command(request=req, propose_id=propose_id) + cmd = Command(request=req) if use_fast_path: res = await self.curp_client.propose(cmd, True) @@ -182,11 +175,6 @@ async def propose(self, req: RequestWithToken, use_fast_path: bool) -> tuple[Com raise Exception(msg) return res - def generate_propose_id(self) -> str: - """Generate propose id with the given prefix.""" - propose_id = f"{self.name}-{uuid.uuid4()}" - return propose_id - async def wait_delete(self, pfx: str, my_rev: int) -> None: """ Wait until last key deleted. diff --git a/client/protocol.py b/client/protocol.py index 06bc4b8..732739b 100644 --- a/client/protocol.py +++ b/client/protocol.py @@ -6,16 +6,20 @@ import asyncio import logging import grpc +import random -from api.curp.message_pb2_grpc import ProtocolStub -from api.curp.message_pb2 import FetchClusterRequest, FetchClusterResponse -from api.curp.curp_command_pb2 import ProposeRequest, WaitSyncedRequest -from api.xline.xline_command_pb2 import Command, CommandResponse, SyncResponse -from client.error import ResDecodeError, CommandSyncError, WaitSyncError, ExecuteError -from api.curp.curp_error_pb2 import ( - CommandSyncError as _CommandSyncError, - WaitSyncError as _WaitSyncError, +from api.curp.curp_command_pb2 import ( + ProposeRequest, + ProposeResponse, + WaitSyncedRequest, + WaitSyncedResponse, + ProposeId, + FetchClusterResponse, + FetchClusterRequest, ) +from api.curp.curp_command_pb2_grpc import ProtocolStub +from client.error import ExecuteError +from api.xline.xline_command_pb2 import Command, CommandResponse, SyncResponse from api.xline.xline_error_pb2 import ExecuteError as _ExecuteError @@ -24,20 +28,25 @@ class ProtocolClient: Protocol client Attributes: - leader_id: cluster `leader id` + state: Current leader and term connects: `all servers's `Connect` + cluster_version: Cluster version """ - leader_id: int + state: State connects: dict[int, grpc.Channel] + cluster_version: int + # TODO config def __init__( self, - leader_id: int, + state: State, connects: dict[int, grpc.Channel], + cluster_version: int, ) -> None: - self.leader_id = leader_id + self.state = state self.connects = connects + self.cluster_version = cluster_version @classmethod async def build_from_addrs(cls, addrs: list[str]) -> ProtocolClient: @@ -48,12 +57,13 @@ async def build_from_addrs(cls, addrs: list[str]) -> ProtocolClient: connects = {} for member in cluster.members: - channel = grpc.aio.insecure_channel(member.name) + channel = grpc.aio.insecure_channel(member.addrs[0]) connects[member.id] = channel return cls( - cluster.leader_id, + State(cluster.leader_id, cluster.term), connects, + cluster.cluster_version, ) @staticmethod @@ -76,30 +86,45 @@ async def propose(self, cmd: Command, use_fast_path: bool = False) -> tuple[Comm """ Propose the request to servers, if use_fast_path is false, it will wait for the synced index """ + propose_id = self.gen_propose_id() + + # TODO: retry if use_fast_path: - return await self.fast_path(cmd) + return await self.fast_path(propose_id, cmd) else: - return await self.slow_path(cmd) + return await self.slow_path(propose_id, cmd) + # TODO: error handling - async def fast_path(self, cmd: Command) -> tuple[CommandResponse, SyncResponse | None]: + async def fast_path(self, propose_id: ProposeId, cmd: Command) -> tuple[CommandResponse, SyncResponse | None]: """ Fast path of propose """ - for futures in asyncio.as_completed([self.fast_round(cmd), self.slow_round(cmd)]): - first, second = await futures + fast_round = self.fast_round(propose_id, cmd) + slow_round = self.slow_round(propose_id) + + # Wait for the fast and slow round at the same time + for futures in asyncio.as_completed([fast_round, slow_round]): + try: + first, second = await futures + except Exception as e: + logging.warning(e) + continue + if isinstance(first, CommandResponse) and second: + # TODO: error handling return (first, None) if isinstance(second, CommandResponse) and isinstance(first, SyncResponse): + # TODO: error handling return (second, first) msg = "fast path error" raise Exception(msg) - async def slow_path(self, cmd: Command) -> tuple[CommandResponse, SyncResponse]: + async def slow_path(self, propose_id: ProposeId, cmd: Command) -> tuple[CommandResponse, SyncResponse]: """ Slow path of propose """ - results = await asyncio.gather(self.fast_round(cmd), self.slow_round(cmd)) + results = await asyncio.gather(self.fast_round(propose_id, cmd), self.slow_round(propose_id)) for result in results: if isinstance(result[0], SyncResponse) and isinstance(result[1], CommandResponse): return (result[1], result[0]) @@ -107,12 +132,12 @@ async def slow_path(self, cmd: Command) -> tuple[CommandResponse, SyncResponse]: msg = "slow path error" raise Exception(msg) - async def fast_round(self, cmd: Command) -> tuple[CommandResponse | None, bool]: + async def fast_round(self, propose_id: ProposeId, cmd: Command) -> tuple[CommandResponse | None, bool]: """ The fast round of Curp protocol It broadcast the requests to all the curp servers. """ - logging.info("fast round start. propose id: %s", cmd.propose_id) + logging.info("fast round start. propose id: %s", propose_id) ok_cnt = 0 is_received_leader_res = False @@ -122,64 +147,90 @@ async def fast_round(self, cmd: Command) -> tuple[CommandResponse | None, bool]: futures = [] for server_id in self.connects: stub = ProtocolStub(self.connects[server_id]) - futures.append(stub.Propose(ProposeRequest(command=cmd.SerializeToString()))) + futures.append(stub.Propose(ProposeRequest(propose_id=propose_id, command=cmd.SerializeToString()))) for future in asyncio.as_completed(futures): - res = await future + res: ProposeResponse = ProposeResponse() + try: + res = await future + except Exception as e: + logging.warning(e) + continue - if res.HasField("result"): - cmd_result = res.result - ok_cnt += 1 + ok_cnt += 1 + if not res.HasField("result"): + continue + cmd_result = res.result + if cmd_result.HasField("ok"): + if is_received_leader_res: + msg = "should not set exe result twice" + raise Exception(msg) + cmd_res.ParseFromString(cmd_result.ok) is_received_leader_res = True - if cmd_result.HasField("er"): - cmd_res.ParseFromString(cmd_result.er) - if cmd_result.HasField("error"): - exe_err.inner.ParseFromString(cmd_result.error) - raise exe_err - elif res.HasField("error"): - logging.info(res.error) - else: - ok_cnt += 1 + elif cmd_result.HasField("error"): + exe_err.inner.ParseFromString(cmd_result.error) + raise exe_err if is_received_leader_res and ok_cnt >= self.super_quorum(len(self.connects)): - logging.info("fast round succeed. propose id: %s", cmd.propose_id) + logging.info("fast round succeed. propose id: %s", propose_id) return (cmd_res, True) - logging.info("fast round failed. propose id: %s", cmd.propose_id) + logging.info("fast round failed. propose id: %s", propose_id) return (cmd_res, False) - async def slow_round(self, cmd: Command) -> tuple[SyncResponse, CommandResponse]: + async def slow_round(self, propose_id: ProposeId) -> tuple[SyncResponse, CommandResponse]: """ The slow round of Curp protocol """ - logging.info("slow round start. propose id: %s", cmd.propose_id) + logging.info("slow round start. propose id: %s", propose_id) - sync_res = SyncResponse() - cmd_res = CommandResponse() - exe_err = CommandSyncError(_CommandSyncError()) - after_sync_err = WaitSyncError(_WaitSyncError()) + asr = SyncResponse() + er = CommandResponse() + exe_err = ExecuteError(_ExecuteError()) - channel = self.connects[self.leader_id] + channel = self.connects[self.state.leader] stub = ProtocolStub(channel) - res = await stub.WaitSynced(WaitSyncedRequest(propose_id=cmd.propose_id)) - - if res.HasField("success"): - success = res.success - sync_res.ParseFromString(success.after_sync_result) - cmd_res.ParseFromString(success.exe_result) - logging.info("slow round succeed. propose id: %s", cmd.propose_id) - return (sync_res, cmd_res) - if res.HasField("error"): - cmd_sync_err = res.error - if cmd_sync_err.HasField("execute"): - exe_err.inner.ParseFromString(cmd_sync_err.execute) - raise exe_err - if cmd_sync_err.HasField("after_sync"): - after_sync_err.inner.ParseFromString(cmd_sync_err.after_sync) - raise after_sync_err + res: WaitSyncedResponse = await stub.WaitSynced( + WaitSyncedRequest(propose_id=propose_id, cluster_version=self.cluster_version) + ) + + if res.after_sync_result.ok: + asr.ParseFromString(res.after_sync_result.ok) + elif res.after_sync_result.error: + exe_err.inner.ParseFromString(res.after_sync_result.error) + raise exe_err + + if res.exe_result.ok: + er.ParseFromString(res.exe_result.ok) + elif res.exe_result.error: + exe_err.inner.ParseFromString(res.exe_result.error) + raise exe_err + + return (asr, er) - err_msg = "Response decode error" - raise ResDecodeError(err_msg) + def gen_propose_id(self) -> ProposeId: + """ + Generate a propose id + """ + client_id = self.get_client_id() + seq_sum = self.new_seq_num() + return ProposeId(client_id=client_id, seq_num=seq_sum) + + def new_seq_num(self) -> int: + """ + New a seq num and record it + + TODO: implement request tracker + """ + return 0 + + def get_client_id(self) -> int: + """ + Get the client id + + TODO: grant a client id from server + """ + return random.randint(0, 2**64 - 1) @staticmethod def super_quorum(nodes: int) -> int: @@ -193,3 +244,23 @@ def super_quorum(nodes: int) -> int: quorum = fault_tolerance + 1 superquorum = fault_tolerance + (quorum // 2) + 1 return superquorum + + +ServerId = int + + +class State: + """ + Protocol client state + + Attributes: + leader: Current leader + term: Current term + """ + + leader: int + term: int + + def __init__(self, leader: int, term: int) -> None: + self.leader = leader + self.term = term diff --git a/client/txn.py b/client/txn.py index 28bd30d..cdaee76 100644 --- a/client/txn.py +++ b/client/txn.py @@ -1,6 +1,5 @@ "Transaction" -import uuid from typing import List, Optional from client.client import ProtocolClient as CurpClient from api.xline.xline_command_pb2 import Command, RequestWithToken, KeyRange @@ -28,12 +27,10 @@ class Txn: Transaction. Attributes: - name: Name of the Transaction, which will be used in CURP propose id generation. curp_client: The client running the CURP protocol, communicate with all servers. token: The auth token. """ - name: str curp_client: CurpClient token: Optional[str] @@ -41,8 +38,7 @@ class Txn: sus: List[RequestOp] fas: List[RequestOp] - def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None: - self.name = name + def __init__(self, curp_client: CurpClient, token: Optional[str]) -> None: self.curp_client = curp_client self.token = token @@ -67,7 +63,6 @@ async def commit(self) -> TxnResponse: krs = [] for cmp in self.cmps: krs.append(KeyRange(key=cmp.key, range_end=cmp.range_end)) - propose_id = self.generate_propose_id(self.name) r = TxnRequest(compare=self.cmps, success=self.sus, failure=self.fas) req = RequestWithToken( txn_request=r, @@ -76,13 +71,6 @@ async def commit(self) -> TxnResponse: cmd = Command( keys=krs, request=req, - propose_id=propose_id, ) er, _ = await self.curp_client.propose(cmd, False) return er.txn_response - - @staticmethod - def generate_propose_id(prefix: str) -> str: - """Generate propose id with the given prefix""" - propose_id = f"{prefix}-{uuid.uuid4()}" - return propose_id diff --git a/pyproject.toml b/pyproject.toml index 36df693..8fbe928 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,7 +80,6 @@ fmt = [ ] all = [ "style", - "typing", ] [tool.black] @@ -133,6 +132,8 @@ ignore = [ "A003", # Allow const use upercase "N806", + # Allow magic values + "S311", ] unfixable = [ # Don't touch unused imports diff --git a/scripts/quick_start.sh b/scripts/quick_start.sh index 57adf88..fb42b89 100755 --- a/scripts/quick_start.sh +++ b/scripts/quick_start.sh @@ -55,7 +55,7 @@ stop_all() { run_container() { echo container starting size=${1} - image="ghcr.io/xline-kv/xline:b573f16" + image="ghcr.io/xline-kv/xline:v0.6.1" for ((i = 1; i <= ${size}; i++)); do docker run -d -it --rm --name=node${i} --net=xline_net --ip=${SERVERS[$i]} --cap-add=NET_ADMIN --cpu-shares=1024 -m=512M -v ${DIR}:/mnt ${image} bash & done diff --git a/tests/protocol_test.py b/tests/protocol_test.py index ae85253..30d1d39 100755 --- a/tests/protocol_test.py +++ b/tests/protocol_test.py @@ -1,6 +1,5 @@ """Tests for the protocol client.""" -import uuid import pytest from api.xline.xline_command_pb2 import Command, RequestWithToken @@ -22,7 +21,6 @@ async def test_propose_fast_path(): value=b"py-xline", ) ), - propose_id=f"client-{uuid.uuid4()}", ) er, _ = await client.propose(cmd, True) @@ -43,7 +41,6 @@ async def test_propose_slow_path(): value=b"py-xline1", ) ), - propose_id=f"client-{uuid.uuid4()}", ) er, asr = await client.propose(cmd, False)