Skip to content

Commit

Permalink
feat: error-handling
Browse files Browse the repository at this point in the history
Signed-off-by: LingKa <[email protected]>
  • Loading branch information
LingKa28 committed Jan 18, 2024
1 parent 7b78699 commit 7fb88f8
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 16 deletions.
18 changes: 18 additions & 0 deletions client/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,21 @@ class ExecuteError(Exception):

def __init__(self, err: _ExecuteError) -> None:
self.inner = err


class ShuttingDownError(Exception):
    """Raised when the server reports that it is shutting down."""


class WrongClusterVersionError(Exception):
    """Raised when the client's cluster version does not match the server's."""


class InternalError(Exception):
    """Raised for an internal (unexpected) error inside the client."""
206 changes: 190 additions & 16 deletions client/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
ProposeId,
FetchClusterResponse,
FetchClusterRequest,
CurpError,
)
from api.curp.curp_command_pb2_grpc import ProtocolStub
from client.error import ExecuteError
from client.error import (
ExecuteError,
WrongClusterVersionError,
ShuttingDownError,
InternalError,
)
from api.xline.xline_command_pb2 import Command, CommandResponse, SyncResponse
from api.xline.xline_error_pb2 import ExecuteError as _ExecuteError

Expand All @@ -36,7 +42,6 @@ class ProtocolClient:
state: State
connects: dict[int, grpc.Channel]
cluster_version: int
# TODO config

def __init__(
self,
Expand Down Expand Up @@ -82,25 +87,75 @@ async def fast_fetch_cluster(addrs: list[str]) -> FetchClusterResponse:
msg = "fetch cluster error"
raise Exception(msg)

async def fetch_cluster(self, linearizable: bool) -> FetchClusterResponse:
"""
Send fetch cluster requests to all servers
Note: The fetched cluster may still be outdated if `linearizable` is false
"""
connects = self.all_connects()
rpcs: list[grpc.Future[FetchClusterResponse]] = []

Check warning on line 96 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L95-L96

Added lines #L95 - L96 were not covered by tests

for channel in connects:
stub = ProtocolStub(channel)
rpcs.append(stub.FetchCluster(FetchClusterRequest(linearizable=linearizable)))

Check warning on line 100 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L99-L100

Added lines #L99 - L100 were not covered by tests

max_term = 0
resp: FetchClusterResponse | None = None
ok_cnt = 0
majority_cnt = len(connects) // 2 + 1

Check warning on line 105 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L102-L105

Added lines #L102 - L105 were not covered by tests

for rpc in asyncio.as_completed(rpcs):
try:
res: FetchClusterResponse = await rpc
except grpc.RpcError as e:
logging.warning(e)
continue

Check warning on line 112 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L108-L112

Added lines #L108 - L112 were not covered by tests

if max_term < res.term:
max_term = res.term

Check warning on line 115 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L115

Added line #L115 was not covered by tests
if len(res.members) == 0:
resp = res
ok_cnt = 1

Check warning on line 118 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L117-L118

Added lines #L117 - L118 were not covered by tests
elif max_term == res.term:
if len(res.members) == 0:
resp = res
ok_cnt += 1

Check warning on line 122 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L121-L122

Added lines #L121 - L122 were not covered by tests
else:
pass

Check warning on line 124 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L124

Added line #L124 was not covered by tests

if ok_cnt >= majority_cnt:
break

Check warning on line 127 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L127

Added line #L127 was not covered by tests

if resp is not None:
logging.debug("Fetch cluster succeeded, result: %s", res)
self.state.check_and_update(res.leader_id, res.term)
return resp

Check warning on line 132 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L130-L132

Added lines #L130 - L132 were not covered by tests

async def fetch_leader(self) -> ServerId:
    """
    Fetch the cluster from all servers and return the leader id.
    Note: the returned leader may already be outdated, since the
    fetch is performed non-linearizably.
    """
    cluster = await self.fetch_cluster(False)
    return cluster.leader_id

Check warning on line 140 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L139-L140

Added lines #L139 - L140 were not covered by tests

async def propose(self, cmd: Command, use_fast_path: bool = False) -> tuple[CommandResponse, SyncResponse | None]:
    """
    Propose the request to the servers.
    When use_fast_path is False, wait for the synced index as well.
    """
    propose_id = self.gen_propose_id()

    # TODO: retry
    # Dispatch to the fast or slow path depending on the caller's choice.
    path = self.fast_path if use_fast_path else self.slow_path
    return await path(propose_id, cmd)
    # TODO: error handling

async def fast_path(self, propose_id: ProposeId, cmd: Command) -> tuple[CommandResponse, SyncResponse | None]:
"""
Fast path of propose
"""
fast_round = self.fast_round(propose_id, cmd)
slow_round = self.slow_round(propose_id)
slow_round = self.slow_round(propose_id, cmd)

# Wait for the fast and slow round at the same time
for futures in asyncio.as_completed([fast_round, slow_round]):
Expand All @@ -111,10 +166,8 @@ async def fast_path(self, propose_id: ProposeId, cmd: Command) -> tuple[CommandR
continue

if isinstance(first, CommandResponse) and second:
# TODO: error handling
return (first, None)
if isinstance(second, CommandResponse) and isinstance(first, SyncResponse):
# TODO: error handling
return (second, first)

msg = "fast path error"
Expand All @@ -124,7 +177,7 @@ async def slow_path(self, propose_id: ProposeId, cmd: Command) -> tuple[CommandR
"""
Slow path of propose
"""
results = await asyncio.gather(self.fast_round(propose_id, cmd), self.slow_round(propose_id))
results = await asyncio.gather(self.fast_round(propose_id, cmd), self.slow_round(propose_id, cmd))
for result in results:
if isinstance(result[0], SyncResponse) and isinstance(result[1], CommandResponse):
return (result[1], result[0])
Expand Down Expand Up @@ -153,9 +206,21 @@ async def fast_round(self, propose_id: ProposeId, cmd: Command) -> tuple[Command
res: ProposeResponse = ProposeResponse()
try:
res = await future
except Exception as e:
except grpc.RpcError as e:
logging.warning(e)
continue
curp_err = CurpError()
dtl: str = e.details()
try:
curp_err.ParseFromString(dtl.encode())
except Exception as e:
logging.warning(e)
continue
if curp_err.HasField("ShuttingDown"):
raise ShuttingDownError from e

Check warning on line 219 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L219

Added line #L219 was not covered by tests
elif curp_err.HasField("WrongClusterVersion"):
raise WrongClusterVersionError from e

Check warning on line 221 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L221

Added line #L221 was not covered by tests
else:
continue

Check warning on line 223 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L223

Added line #L223 was not covered by tests

ok_cnt += 1
if not res.HasField("result"):
Expand All @@ -178,7 +243,7 @@ async def fast_round(self, propose_id: ProposeId, cmd: Command) -> tuple[Command
logging.info("fast round failed. propose id: %s", propose_id)
return (cmd_res, False)

async def slow_round(self, propose_id: ProposeId) -> tuple[SyncResponse, CommandResponse]:
async def slow_round(self, propose_id: ProposeId, cmd: Command) -> tuple[SyncResponse, CommandResponse]:
"""
The slow round of Curp protocol
"""
Expand All @@ -190,9 +255,33 @@ async def slow_round(self, propose_id: ProposeId) -> tuple[SyncResponse, Command

channel = self.connects[self.state.leader]
stub = ProtocolStub(channel)
res: WaitSyncedResponse = await stub.WaitSynced(
WaitSyncedRequest(propose_id=propose_id, cluster_version=self.cluster_version)
)

res = WaitSyncedResponse()
try:
res: WaitSyncedResponse = await stub.WaitSynced(
WaitSyncedRequest(propose_id=propose_id, cluster_version=self.cluster_version)
)
except grpc.RpcError as e:
logging.warning("wait synced rpc error: %s", e)
curp_err = CurpError()
dtl: str = e.details()
curp_err.ParseFromString(dtl.encode())

Check warning on line 268 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L264-L268

Added lines #L264 - L268 were not covered by tests
if curp_err.HasField("ShuttingDown"):
raise ShuttingDownError from e

Check warning on line 270 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L270

Added line #L270 was not covered by tests
elif curp_err.HasField("WrongClusterVersion"):
raise WrongClusterVersionError from e

Check warning on line 272 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L272

Added line #L272 was not covered by tests
elif curp_err.HasField("RpcTransport"):
# it's quite likely that the leader has crashed,
# then we should wait for some time and fetch the leader again
self.resend_propose(propose_id, cmd, None)

Check warning on line 276 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L276

Added line #L276 was not covered by tests
elif curp_err.HasField("redirect"):
new_leader = curp_err.redirect.leader_id
term = curp_err.redirect.term
self.state.check_and_update(new_leader, term)

Check warning on line 280 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L278-L280

Added lines #L278 - L280 were not covered by tests
# resend the propose to the new leader
self.resend_propose(propose_id, cmd, None)

Check warning on line 282 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L282

Added line #L282 was not covered by tests
else:
raise InternalError from e

Check warning on line 284 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L284

Added line #L284 was not covered by tests

if res.after_sync_result.ok:
asr.ParseFromString(res.after_sync_result.ok)
Expand All @@ -208,6 +297,44 @@ async def slow_round(self, propose_id: ProposeId) -> tuple[SyncResponse, Command

return (asr, er)

def resend_propose(self, propose_id: ProposeId, cmd: Command, new_leader: ServerId | None) -> True | None:
"""
Resend the propose only to the leader.
This is used when leader changes and we need to ensure that the propose is received by the new leader.
"""
leader_id: int | None = None

Check warning on line 305 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L305

Added line #L305 was not covered by tests
if new_leader is not None:
_id = new_leader
try:
self.fetch_leader()
leader_id = _id
except Exception as e:
logging.warning("failed to fetch leader, %s", e)
logging.debug("resend propose to %s", leader_id)

Check warning on line 313 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L307-L313

Added lines #L307 - L313 were not covered by tests

stub = ProtocolStub(self.get_connect(leader_id))

Check warning on line 315 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L315

Added line #L315 was not covered by tests

try:
stub.Propose(

Check warning on line 318 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L317-L318

Added lines #L317 - L318 were not covered by tests
ProposeRequest(
propose_id=propose_id, command=cmd.SerializeToString(), cluster_version=self.cluster_version
)
)
except grpc.RpcError as e:

Check warning on line 323 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L323

Added line #L323 was not covered by tests
# if the propose fails again, need to fetch the leader and try again
logging.warning("failed to resend propose, %s", e)
curp_err = CurpError()
dtl: str = e.details()
curp_err.ParseFromString(dtl.encode())

Check warning on line 328 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L325-L328

Added lines #L325 - L328 were not covered by tests
if curp_err.HasField("ShuttingDown"):
raise ShuttingDownError from e

Check warning on line 330 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L330

Added line #L330 was not covered by tests
elif curp_err.HasField("WrongClusterVersion"):
raise WrongClusterVersionError from e

Check warning on line 332 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L332

Added line #L332 was not covered by tests
elif curp_err.HasField("Duplicated"):
return True

Check warning on line 334 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L334

Added line #L334 was not covered by tests
else:
return None

Check warning on line 336 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L336

Added line #L336 was not covered by tests

def gen_propose_id(self) -> ProposeId:
"""
Generate a propose id
Expand All @@ -232,6 +359,18 @@ def get_client_id(self) -> int:
"""
return random.randint(0, 2**64 - 1)

def all_connects(self) -> list[grpc.Channel]:
"""
Get all connects
"""
return list(self.connects.values())

Check warning on line 366 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L366

Added line #L366 was not covered by tests

def get_connect(self, _id: ServerId) -> grpc.Channel:
"""
Get all connects
"""
return self.connects[_id]

Check warning on line 372 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L372

Added line #L372 was not covered by tests

@staticmethod
def super_quorum(nodes: int) -> int:
"""
Expand All @@ -258,9 +397,44 @@ class State:
term: Current term
"""

leader: int
leader: int | None
term: int

def __init__(self, leader: int, term: int) -> None:
def __init__(self, leader: int | None, term: int) -> None:
self.leader = leader
self.term = term

def check_and_update(self, leader_id: int | None, term: int):
"""
Check the term and leader id, update the state if needed
"""
if self.term < term:
# reset term only when the resp has leader id to prevent:
# If a server loses contact with its leader, it will update its term for election.
# Since other servers are all right, the election will not succeed.
# But if the client learns about the new term and updates its term to it, it will never get the true leader.
if leader_id is not None:
new_leader_id = leader_id
self.update_to_term(term)
self.set_leader(new_leader_id)

Check warning on line 419 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L417-L419

Added lines #L417 - L419 were not covered by tests
elif self.term == term:
if leader_id is not None:
new_leader_id = leader_id

Check warning on line 422 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L422

Added line #L422 was not covered by tests
if self.leader is None:
self.set_leader(new_leader_id)

Check warning on line 424 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L424

Added line #L424 was not covered by tests
else:
pass

Check warning on line 426 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L426

Added line #L426 was not covered by tests

def update_to_term(self, term: int) -> None:
"""
Update to the newest term and reset local cache
"""
self.term = term
self.leader = None

Check warning on line 433 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L432-L433

Added lines #L432 - L433 were not covered by tests

def set_leader(self, _id: ServerId) -> None:
"""
Set the leader and notify all the waiters
"""
logging.debug("client update its leader to %s", _id)
self.leader = _id

Check warning on line 440 in client/protocol.py

View check run for this annotation

Codecov / codecov/patch

client/protocol.py#L439-L440

Added lines #L439 - L440 were not covered by tests

0 comments on commit 7fb88f8

Please sign in to comment.