Skip to content

Commit

Permalink
refactor(cluster): update ClusterConfiguration for final release (#115)
Browse files Browse the repository at this point in the history
* refactor(cluster): re-arrange config for final release

Updated comments, and also method for front-running protection

* test: add test for checking hmac algo
  • Loading branch information
fubuloubu authored Sep 23, 2024
1 parent eae33b9 commit d10f01f
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 57 deletions.
159 changes: 102 additions & 57 deletions silverback/cluster/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@
import math
import uuid
from datetime import datetime
from typing import Annotated
from typing import Annotated, Any

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.hmac import HMAC, hashes
from eth_pydantic_types import Address, HexBytes
from eth_utils import to_bytes, to_int
from pydantic import BaseModel, Field, computed_field, field_validator


def normalize_bytes(val: bytes, length: int = 16) -> bytes:
return b"\x00" * (length - len(val)) + val


class WorkspaceInfo(BaseModel):
id: uuid.UUID
owner_id: uuid.UUID
Expand All @@ -18,17 +26,15 @@ class WorkspaceInfo(BaseModel):
class ClusterConfiguration(BaseModel):
"""Configuration of the cluster (represented as 16 byte value)"""

# NOTE: This configuration must be encode-able to a uint64 value for db storage
# and on-chain processing through ApePay

# NOTE: All defaults should be the minimal end of the scale,
# so that `__or__` works right
# NOTE: This configuration must be encode-able to a uint64 value for db duration and on-chain
# processing through ApePay
# NOTE: All defaults should be the minimal end of the scale, so that `__or__` works right

# Version byte (Byte 0)
# NOTE: Just in-case we change this after release
# NOTE: Update this to revise new models for every configuration change
version: int = 1

# Bot Worker Configuration (Bytes 1-2)
# Bot Worker Configuration, priced per bot (Bytes 1-2)
cpu: Annotated[int, Field(ge=0, le=6)] = 0 # defaults to 0.25 vCPU
"""Allocated vCPUs per bot:
- 0.25 vCPU (0)
Expand All @@ -42,25 +48,27 @@ class ClusterConfiguration(BaseModel):
memory: Annotated[int, Field(ge=0, le=120)] = 0 # defaults to 512 MiB
"""Total memory per bot (in GB, 0 means '512 MiB')"""

# NOTE: Configure # of workers based on cpu & memory settings
# NOTE: # of workers configured based on cpu & memory settings

# Runner configuration (Bytes 3-5)
# Runner configuration (Bytes 3-4)
networks: Annotated[int, Field(ge=1, le=20)] = 1
"""Maximum number of concurrent network runners"""

bots: Annotated[int, Field(ge=1, le=250)] = 1
"""Maximum number of concurrent bots running"""
"""Maximum number of concurrent running bots"""

triggers: Annotated[int, Field(ge=50, le=1000, multiple_of=5)] = 50
"""Maximum number of task triggers across all running bots"""
# NOTE: Byte 5 unused

# Recorder configuration (Byte 6)
storage: Annotated[int, Field(ge=0, le=250)] = 0 # 512 GB
"""Total task results and metrics parquet storage (in TB, 0 means '512 GB')"""
# Recorder configuration (Bytes 6-7)
bandwidth: Annotated[int, Field(ge=0, le=250)] = 0 # 512 kB/sec
"""Rate at which data should be emitted by cluster (in MB/sec, 0 means '512 kB')"""
# NOTE: This rate is only estimated average, and will serve as a throttling threshold

# Cluster general configuration (Byte 7)
secrets: Annotated[int, Field(ge=10, le=100)] = 10
"""Total managed secrets"""
duration: Annotated[int, Field(ge=1, le=120)] = 1
"""Time to keep data recording duration (in months)"""
# NOTE: The storage space alloted for your recordings will be `bandwidth x duration`.
# If the storage space is exceeded, it will be aggressively pruned to maintain that size.
# We will also prune duration past that point less aggressively, if there is unused space.

@field_validator("cpu", mode="before")
def parse_cpu_value(cls, value: str | int) -> int:
Expand All @@ -75,43 +83,40 @@ def parse_memory_value(cls, value: str | int) -> int:
return value

mem, units = value.split(" ")
if units.lower() == "mib":
if units.lower() in ("mib", "mb"):
assert mem == "512"
return 0

assert units.lower() == "gb"
return int(mem)

@field_validator("storage", mode="before")
def parse_storage_value(cls, value: str | int) -> int:
@field_validator("bandwidth", mode="before")
def parse_bandwidth_value(cls, value: str | int) -> int:
if not isinstance(value, str):
return value

storage, units = value.split(" ")
if units.lower() == "gb":
assert storage == "512"
bandwidth, units = value.split(" ")
if units.lower() == "b/sec":
assert bandwidth == "512"
return 0

assert units.lower() == "tb"
return int(storage)
assert units.lower() == "kb/sec"
return int(bandwidth)

def settings_display_dict(self) -> dict:
return dict(
version=self.version,
runner=dict(
networks=self.networks,
bots=self.bots,
),
bots=dict(
cpu=f"{256 * 2**self.cpu / 1024} vCPU",
memory=f"{self.memory} GB" if self.memory > 0 else "512 MiB",
),
general=dict(
bots=self.bots,
secrets=self.secrets,
),
runner=dict(
networks=self.networks,
triggers=self.triggers,
),
recorder=dict(
storage=f"{self.storage} TB" if self.storage > 0 else "512 GB",
bandwidth=f"{self.bandwidth} MB/sec" if self.bandwidth > 0 else "512 kB/sec",
duration=f"{self.duration} months",
),
)

Expand All @@ -121,41 +126,83 @@ def _decode_byte(value: int, byte: int) -> int:
return (value >> (8 * byte)) & (2**8 - 1) # NOTE: max uint8

@classmethod
def decode(cls, value: int) -> "ClusterConfiguration":
def decode(cls, value: Any) -> "ClusterConfiguration":
"""Decode the configuration from 8 byte integer value"""
if isinstance(value, ClusterConfiguration):
return value # TODO: Something weird with SQLModel

elif isinstance(value, bytes):
value = to_int(value)

elif not isinstance(value, int):
raise ValueError(f"Cannot decode type: '{type(value)}'")

# NOTE: Do not change the order of these, these are not forwards compatible
return cls(
version=cls._decode_byte(value, 0),
cpu=cls._decode_byte(value, 1),
memory=cls._decode_byte(value, 2),
networks=cls._decode_byte(value, 3),
bots=cls._decode_byte(value, 4),
triggers=5 * cls._decode_byte(value, 5),
storage=cls._decode_byte(value, 6),
secrets=cls._decode_byte(value, 7),
)
if (version := cls._decode_byte(value, 0)) == 1:
return cls(
version=version,
cpu=cls._decode_byte(value, 1),
memory=cls._decode_byte(value, 2),
networks=cls._decode_byte(value, 3),
bots=cls._decode_byte(value, 4),
bandwidth=cls._decode_byte(value, 6),
duration=cls._decode_byte(value, 7),
)

# NOTE: Update this to revise new models for every configuration change

raise ValueError(f"Unsupported version: '{version}'")

@staticmethod
def _encode_byte(value: int, byte: int) -> int:
return value << (8 * byte)

def encode(self) -> int:
"""Encode configuration as 8 byte integer value"""
# NOTE: Do not change the order of these, these are not forwards compatible
# NOTE: Only need to encode the latest version, can change implementation below
return (
self._encode_byte(self.version, 0)
+ self._encode_byte(self.cpu, 1)
+ self._encode_byte(self.memory, 2)
+ self._encode_byte(self.networks, 3)
+ self._encode_byte(self.bots, 4)
+ self._encode_byte(self.triggers // 5, 5)
+ self._encode_byte(self.storage, 6)
+ self._encode_byte(self.secrets, 7)
+ self._encode_byte(self.bandwidth, 6)
+ self._encode_byte(self.duration, 7)
)

def get_product_code(self, owner: Address, cluster_id: uuid.UUID) -> HexBytes:
# returns bytes32 product code `(sig || config)`
config = normalize_bytes(to_bytes(self.encode()))

# NOTE: MD5 is not recommended for general use, but is not considered insecure for HMAC use.
# However, our security property here is simple front-running protection to ensure
# only Workspace members can open a Stream to fund a Cluster (since `cluster_id` is a
# shared secret kept private between members of a Workspace when Cluster is created).
# Unless HMAC-MD5 can be shown insecure enough to recover the secret key in <5mins,
# this is probably good enough for now (and retains 16B size digest that fits with our
# encoded 16B configuration into a bytes32 val, to avoid memory expansion w/ DynArray)
h = HMAC(cluster_id.bytes, hashes.MD5())
h.update(normalize_bytes(to_bytes(hexstr=owner), length=20) + config)
sig = normalize_bytes(h.finalize()) # 16 bytes

return HexBytes(config + sig)

def validate_product_code(
self, owner: Address, signature: bytes, cluster_id: uuid.UUID
) -> bool:
# NOTE: Put `cluster_id` last so it's easy to use with `functools.partial`
config = normalize_bytes(to_bytes(self.encode()))

h = HMAC(cluster_id.bytes, hashes.MD5())
h.update(normalize_bytes(to_bytes(hexstr=owner), length=20) + config)

try:
h.verify(signature)
return True

except InvalidSignature:
return False


class ClusterTier(enum.IntEnum):
"""Suggestions for different tier configurations"""
Expand All @@ -165,18 +212,16 @@ class ClusterTier(enum.IntEnum):
memory="512 MiB",
networks=3,
bots=5,
triggers=50,
storage="512 GB",
secrets=10,
bandwidth="512 B/sec", # 1.236 GB/mo
duration=3, # months
).encode()
PROFESSIONAL = ClusterConfiguration(
cpu="1 vCPU",
memory="2 GB",
networks=10,
bots=20,
triggers=400,
storage="5 TB",
secrets=25,
bandwidth="5 kB/sec", # 12.36 GB/mo
duration=12, # 1 year = ~148GB
).encode()

def configuration(self) -> ClusterConfiguration:
Expand Down
13 changes: 13 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import uuid

from silverback.cluster.types import ClusterConfiguration


def test_hmac_signature():
config = ClusterConfiguration()
cluster_id = uuid.uuid4()
owner = "0x4838B106FCe9647Bdf1E7877BF73cE8B0BAD5f97"
product_code = config.get_product_code(owner, cluster_id)
# NOTE: Ensure we can properly decode the encoded product code into a configuration
assert config == ClusterConfiguration.decode(product_code[:16])
assert config.validate_product_code(owner, product_code[16:], cluster_id)

0 comments on commit d10f01f

Please sign in to comment.