Skip to content

Commit

Permalink
feat: refactor: propose id
Browse files Browse the repository at this point in the history
Signed-off-by: LingKa <[email protected]>
  • Loading branch information
LingKa28 committed Jan 12, 2024
1 parent 4bb6db2 commit 7b78699
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 184 deletions.
15 changes: 3 additions & 12 deletions client/auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Auth Client"""

import uuid
from typing import Optional
import grpc
from passlib.hash import pbkdf2_sha256
Expand Down Expand Up @@ -57,18 +56,16 @@ class AuthClient:
Client for Auth operations.
Attributes:
name: Name of the AuthClient, which will be used in CURP propose id generation.
curp_client: The client running the CURP protocol, communicate with all servers.
auth_client: The auth RPC client, only communicate with one server at a time.
token: The auth token.
"""

name: str
curp_client: CurpClient
auth_client: AuthStub
token: Optional[str]

def __init__(self, name: str, curp_client: CurpClient, channel: grpc.Channel, token: Optional[str]) -> None:
self.name = name
def __init__(self, curp_client: CurpClient, channel: grpc.Channel, token: Optional[str]) -> None:
self.curp_client = curp_client
self.auth_client = AuthStub(channel=channel)
self.token = token
Expand Down Expand Up @@ -262,8 +259,7 @@ async def handle_req(self, req: RequestWithToken, use_fast_path: bool) -> Comman
"""
Send request using fast path or slow path.
"""
propose_id = self.generate_propose_id()
cmd = Command(request=req, propose_id=propose_id)
cmd = Command(request=req)

if use_fast_path:
er, _ = await self.curp_client.propose(cmd, True)
Expand All @@ -275,11 +271,6 @@ async def handle_req(self, req: RequestWithToken, use_fast_path: bool) -> Comman
raise Exception(msg)
return er

def generate_propose_id(self) -> str:
"""Generate propose id with the given prefix."""
propose_id = f"{self.name}-{uuid.uuid4()}"
return propose_id

@staticmethod
def hash_password(password: str) -> str:
"""Generate hash of the password."""
Expand Down
8 changes: 4 additions & 4 deletions client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ async def connect(cls, addrs: list[str]) -> Client:
# TODO: Acquire the auth token
id_gen = LeaseIdGenerator()

kv_client = KvClient("client", protocol_client, "")
lease_client = LeaseClient("client", protocol_client, channel, "", id_gen)
kv_client = KvClient(protocol_client, "")
lease_client = LeaseClient(protocol_client, channel, "", id_gen)
watch_client = WatchClient(channel)
lock_client = LockClient("client", protocol_client, channel, "", id_gen)
auth_client = AuthClient("client", protocol_client, channel, "")
lock_client = LockClient(protocol_client, channel, "", id_gen)
auth_client = AuthClient(protocol_client, channel, "")
maintenance_client = MaintenanceClient(channel)

return cls(kv_client, lease_client, watch_client, lock_client, auth_client, maintenance_client)
40 changes: 1 addition & 39 deletions client/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,10 @@
Client Errors
"""

from api.curp.curp_error_pb2 import (
ProposeError as _ProposeError,
CommandSyncError as _CommandSyncError,
WaitSyncError as _WaitSyncError,
)
from api.xline.xline_error_pb2 import ExecuteError as _ExecuteError


class ResDecodeError(Exception):
"""Response decode error"""

pass


class ProposeError(BaseException):
"""Propose error"""

inner: _ProposeError

def __init__(self, err: _ProposeError) -> None:
self.inner = err


class CommandSyncError(BaseException):
"""Command sync error"""

inner: _CommandSyncError

def __init__(self, err: _CommandSyncError) -> None:
self.inner = err


class WaitSyncError(BaseException):
"""Wait sync error"""

inner: _WaitSyncError

def __init__(self, err: _WaitSyncError) -> None:
self.inner = err


class ExecuteError(BaseException):
class ExecuteError(Exception):
"""Execute error"""

inner: _ExecuteError
Expand Down
20 changes: 1 addition & 19 deletions client/kv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Kv Client"""

import uuid
from typing import Optional, Literal
from client.protocol import ProtocolClient as CurpClient
from client.txn import Txn
Expand Down Expand Up @@ -32,12 +31,10 @@ class KvClient:
token: The auth token.
"""

name: str
curp_client: CurpClient
token: Optional[str]

def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None:
self.name = name
def __init__(self, curp_client: CurpClient, token: Optional[str]) -> None:
self.curp_client = curp_client
self.token = token

Expand Down Expand Up @@ -76,15 +73,13 @@ async def range(
max_create_revision=max_create_revision,
)
key_ranges = [KeyRange(key=key, range_end=range_end)]
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
range_request=req,
token=self.token,
)
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.range_response
Expand All @@ -105,15 +100,13 @@ async def put(
key=key, value=value, lease=lease, prev_kv=prev_kv, ignore_value=ignore_value, ignore_lease=ignore_lease
)
key_ranges = [KeyRange(key=key, range_end=key)]
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
put_request=req,
token=self.token,
)
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.put_response
Expand All @@ -128,7 +121,6 @@ async def delete(self, key: bytes, range_end: bytes | None = None, prev_kv: bool
prev_kv=prev_kv,
)
key_ranges = [KeyRange(key=key, range_end=range_end)]
propose_id = generate_propose_id(self.name)

request_with_token = RequestWithToken(
delete_range_request=req,
Expand All @@ -137,7 +129,6 @@ async def delete(self, key: bytes, range_end: bytes | None = None, prev_kv: bool
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.delete_range_response
Expand All @@ -148,7 +139,6 @@ def txn(self) -> Txn:
"""

return Txn(
self.name,
self.curp_client,
self.token,
)
Expand All @@ -162,14 +152,12 @@ async def compact(self, revision: int, physical: bool = False) -> CompactionResp
physical=physical,
)
use_fast_path = physical
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
compaction_request=req,
token=self.token,
)
cmd = Command(
request=request_with_token,
propose_id=propose_id,
)

if use_fast_path:
Expand All @@ -181,9 +169,3 @@ async def compact(self, revision: int, physical: bool = False) -> CompactionResp
msg = "sync_res is always Some when use_fast_path is false"
raise Exception(msg)
return er.compaction_response


def generate_propose_id(prefix: str) -> str:
"""Generate propose id with the given prefix"""
propose_id = f"{prefix}-{uuid.uuid4()}"
return propose_id
15 changes: 2 additions & 13 deletions client/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,22 @@ class LeaseClient:
Client for Lease operations.
Attributes:
name: Name of the LeaseClient, which will be used in CURP propose id generation.
curp_client: The client running the CURP protocol, communicate with all servers.
lease_client: The lease RPC client, only communicate with one server at a time.
token: The auth token.
id_gen: Lease Id generator.
keepers: Keep alive keepers.
"""

name: str
curp_client: CurpClient
lease_client: LeaseStub
token: Optional[str]
id_gen: LeaseIdGenerator
keepers: dict[str, LeaseKeeper]

def __init__(
self, name: str, curp_client: CurpClient, channel: Channel, token: Optional[str], id_gen: LeaseIdGenerator
self, curp_client: CurpClient, channel: Channel, token: Optional[str], id_gen: LeaseIdGenerator
) -> None:
self.name = name
self.curp_client = curp_client
self.lease_client = LeaseStub(channel=channel)
self.token = token
Expand All @@ -119,10 +117,8 @@ async def grant(self, req: LeaseGrantRequest) -> LeaseGrantResponse:
if req.ID == 0:
req.ID = self.id_gen.next()
request_with_token = RequestWithToken(token=self.token, lease_grant_request=req)
propose_id = self.generate_propose_id()
cmd = Command(
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.lease_grant_response
Expand Down Expand Up @@ -165,15 +161,8 @@ async def leases(self) -> LeaseLeasesResponse:
Lists all existing leases.
"""
request_with_token = RequestWithToken(token=self.token, lease_leases_request=LeaseLeasesRequest())
propose_id = self.generate_propose_id()
cmd = Command(
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.lease_leases_response

def generate_propose_id(self) -> str:
"""Generate propose id with the given prefix."""
propose_id = f"{self.name}-{uuid.uuid4()}"
return propose_id
18 changes: 3 additions & 15 deletions client/lock.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Lock Client"""

import uuid
from urllib import parse
from typing import Optional
from grpc import Channel
Expand Down Expand Up @@ -45,25 +44,20 @@ class LockClient:
Client for Lock operations.
Attributes:
name: Name of the LockClient.
curp_client: The client running the CURP protocol, communicate with all servers.
lease_client: The lease client.
watch_client: The watch client.
token: Auth token
"""

name: str
curp_client: CurpClient
lease_client: LeaseClient
watch_client: WatchClient
token: Optional[str]

def __init__(
self, name: str, curp_client: CurpClient, channel: Channel, token: str, id_gen: LeaseIdGenerator
) -> None:
self.name = name
def __init__(self, curp_client: CurpClient, channel: Channel, token: str, id_gen: LeaseIdGenerator) -> None:
self.curp_client = curp_client
self.lease_client = LeaseClient(name=name, curp_client=curp_client, channel=channel, token=token, id_gen=id_gen)
self.lease_client = LeaseClient(curp_client=curp_client, channel=channel, token=token, id_gen=id_gen)
self.watch_client = WatchClient(channel=channel)
self.token = token

Expand Down Expand Up @@ -169,8 +163,7 @@ async def propose(self, req: RequestWithToken, use_fast_path: bool) -> tuple[Com
"""
Send request using fast path.
"""
propose_id = self.generate_propose_id()
cmd = Command(request=req, propose_id=propose_id)
cmd = Command(request=req)

if use_fast_path:
res = await self.curp_client.propose(cmd, True)
Expand All @@ -182,11 +175,6 @@ async def propose(self, req: RequestWithToken, use_fast_path: bool) -> tuple[Com
raise Exception(msg)
return res

def generate_propose_id(self) -> str:
"""Generate propose id with the given prefix."""
propose_id = f"{self.name}-{uuid.uuid4()}"
return propose_id

async def wait_delete(self, pfx: str, my_rev: int) -> None:
"""
Wait until last key deleted.
Expand Down
Loading

0 comments on commit 7b78699

Please sign in to comment.