Skip to content

Commit

Permalink
Merge pull request #46 from masa-finance/chore--code-review
Browse files Browse the repository at this point in the history
chore: code review
  • Loading branch information
grantdfoster authored Dec 19, 2024
2 parents 89783d9 + 2e4c3df commit b8786bd
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 252 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{
"editor.formatOnSave": true,
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
}
}
206 changes: 75 additions & 131 deletions neurons/miner.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,34 @@
from fiber.miner.server import factory_app

from typing import Optional
from fiber.logging_utils import get_logger
from functools import partial
from dotenv import load_dotenv

import httpx
import os
import requests
from pydantic import BaseModel

# from fiber.chain import interface
import httpx
import uvicorn
import requests

# Import the vali_client module or object
from fastapi import FastAPI, Depends
from fiber.miner.middleware import configure_extra_logging_middleware
from fiber.chain import chain_utils, post_ip_to_chain, interface
from fiber.chain.metagraph import Metagraph
from dotenv import load_dotenv
from fiber.encrypted.miner.dependencies import blacklist_low_stake, verify_request
from fiber.encrypted.miner.security.encryption import decrypt_general_payload

import time

from cryptography.fernet import Fernet
from fastapi import Depends, Header

from fiber import constants as cst
from fiber.encrypted.miner.core.configuration import Config
from fiber.encrypted.miner.core.models.encryption import (
PublicKeyResponse,
SymmetricKeyExchange,
)
from fiber.miner.server import factory_app
from fiber.encrypted.miner.dependencies import (
blacklist_low_stake,
get_config,
verify_request,
)
from fiber.encrypted.miner.security.encryption import get_symmetric_key_b64_from_payload
from fiber.encrypted.miner.security.encryption import (
decrypt_general_payload,
)
from fiber.encrypted.miner.endpoints.handshake import (
get_public_key,
exchange_symmetric_key,
)

from fiber.networking.models import NodeWithFernet as Node
from fiber.logging_utils import get_logger

from functools import partial
from typing import Optional
from pydantic import BaseModel
from fastapi import FastAPI, Depends


logger = get_logger(__name__)


Expand All @@ -52,28 +41,27 @@ def __init__(self):
"""Initialize miner"""
load_dotenv()

# load environment variables
self.netuid = int(os.getenv("NETUID", "59"))
self.subtensor_network = os.getenv("SUBTENSOR_NETWORK", "finney")
self.subtensor_address = os.getenv(
"SUBTENSOR_ADDRESS", "wss://entrypoint-finney.opentensor.ai:443"
)
self.wallet_name = os.getenv("WALLET_NAME", "miner")
self.hotkey_name = os.getenv("HOTKEY_NAME", "default")
self.port = int(os.getenv("MINER_PORT", 8080))
self.port = int(os.getenv("MINER_PORT", 8082))
self.external_ip = self.get_external_ip()

# initialize server
self.server: Optional[factory_app] = None
self.app: Optional[FastAPI] = None
self.httpx_client = None
self.keypair = chain_utils.load_hotkey_keypair(
self.wallet_name, self.hotkey_name
)

self.netuid = int(os.getenv("NETUID", "59"))
self.httpx_client: Optional[httpx.AsyncClient] = None

self.subtensor_network = os.getenv("SUBTENSOR_NETWORK", "finney")
self.subtensor_address = os.getenv(
"SUBTENSOR_ADDRESS", "wss://entrypoint-finney.opentensor.ai:443"
)

self.server: Optional[factory_app] = None
self.app: Optional[FastAPI] = None
self.api_url = os.getenv("API_URL", "https://test.protocol-api.masa.ai")

# initialize substrate
self.substrate = interface.get_substrate(
subtensor_network=self.subtensor_network,
subtensor_address=self.subtensor_address,
Expand All @@ -83,21 +71,14 @@ def __init__(self):

self.post_ip_to_chain()

async def start(self):
"""Start the Fiber server and register with validator"""
async def start(self) -> None:
"""Start the miner service"""

try:
# Initialize httpx client first
self.httpx_client = httpx.AsyncClient()
# Start Fiber server before handshake
self.app = factory_app(debug=False)

self.register_routes()

# note, better logging - thanks Namoray!
if os.getenv("ENV", "prod").lower() == "dev":
configure_extra_logging_middleware(self.app)

# Start the FastAPI server
config = uvicorn.Config(
self.app, host="0.0.0.0", port=self.port, lifespan="on"
)
Expand All @@ -108,7 +89,7 @@ async def start(self):
logger.error(f"Failed to start miner: {str(e)}")
raise

def get_external_ip(self):
def get_external_ip(self) -> str:
env = os.getenv("ENV", "prod").lower()
if env == "dev":
# post this to chain to mark as local
Expand All @@ -121,7 +102,7 @@ def get_external_ip(self):
except requests.RequestException as e:
logger.error(f"Failed to get external IP: {e}")

def post_ip_to_chain(self):
def post_ip_to_chain(self) -> None:
node = self.node()
if node:
if node.ip != self.external_ip or node.port != self.port:
Expand Down Expand Up @@ -151,8 +132,7 @@ def post_ip_to_chain(self):
else:
raise Exception("Hotkey not registered to metagraph")

# note, requires metagraph sync
def node(self):
def node(self) -> Optional[Node]:
try:
nodes = self.metagraph.nodes
node = nodes[self.keypair.ss58_address]
Expand All @@ -161,33 +141,6 @@ def node(self):
logger.error(f"Failed to get node from metagraph: {e}")
return None

async def deregister_agent(self):
"""Register agent with the API"""
my_node = self.node()

try:
deregistration_data = {
"hotkey": self.keypair.ss58_address,
"uid": str(my_node.node_id),
"subnet_id": self.netuid,
"version": "4", # TODO: Implement versioning
"isActive": False,
}
endpoint = f"{self.api_url}/v1.0.0/subnet59/miners/register"
headers = {"Authorization": f"Bearer {os.getenv('API_KEY')}"}
response = await self.httpx_client.post(
endpoint, json=deregistration_data, headers=headers
)
if response.status_code == 200:
logger.info("Successfully deregistered agent!")
return response.json()
else:
logger.error(
f"Failed to register agent, status code: {response.status_code}, message: {response.text}"
)
except Exception as e:
logger.error(f"Exception occurred during agent registration: {str(e)}")

def get_verification_tweet_id(self) -> Optional[str]:
"""Get Verification Tweet ID For Agent Registration"""
try:
Expand All @@ -197,47 +150,17 @@ def get_verification_tweet_id(self) -> Optional[str]:
logger.error(f"Failed to get tweet: {str(e)}")
return None

async def stop(self):
async def stop(self) -> None:
"""Cleanup and shutdown"""
if self.server:
await self.server.stop()

async def get_self(self):
return self

async def get_public_key(self, config: Config = Depends(get_config)):
public_key = config.encryption_keys_handler.public_bytes.decode()
return PublicKeyResponse(
public_key=public_key,
timestamp=time.time(),
)

async def exchange_symmetric_key(
self,
payload: SymmetricKeyExchange,
validator_hotkey_address: str = Header(..., alias=cst.VALIDATOR_HOTKEY),
nonce: str = Header(..., alias=cst.NONCE),
symmetric_key_uuid: str = Header(..., alias=cst.SYMMETRIC_KEY_UUID),
config: Config = Depends(get_config),
):
base64_symmetric_key = get_symmetric_key_b64_from_payload(
payload, config.encryption_keys_handler.private_key
)
fernet = Fernet(base64_symmetric_key)
config.encryption_keys_handler.add_symmetric_key(
uuid=symmetric_key_uuid,
hotkey_ss58_address=validator_hotkey_address,
fernet=fernet,
)

return {"status": "Symmetric key exchanged successfully"}

async def registration_callback(
self,
decrypted_payload: DecryptedPayload = Depends(
partial(decrypt_general_payload, DecryptedPayload),
),
):
) -> None:
"""Registration Callback"""
try:
logger.info(f"Decrypted Payload: {decrypted_payload}")
Expand All @@ -247,34 +170,54 @@ async def registration_callback(
logger.error(f"Error in registration callback: {str(e)}")
return {"status": "Error in registration callback"}

def register_routes(self):
def get_self(self) -> None:
return self

def healthcheck(self):
try:
info = {
"ss58_address": str(self.keypair.ss58_address),
"uid": str(self.metagraph.nodes[self.keypair.ss58_address].node_id),
"ip": str(self.metagraph.nodes[self.keypair.ss58_address].ip),
"port": str(self.metagraph.nodes[self.keypair.ss58_address].port),
"netuid": str(self.netuid),
"subtensor_network": str(self.subtensor_network),
"subtensor_address": str(self.subtensor_address),
}
return info
except Exception as e:
logger.error(f"Failed to get validator info: {str(e)}")
return None

def register_routes(self) -> None:

self.app.add_api_route(
"/public-encryption-key", self.get_public_key, methods=["GET"]
"/healthcheck",
self.healthcheck,
methods=["GET"],
tags=["healthcheck"],
dependencies=[Depends(self.get_self)],
)

self.app.add_api_route(
"/public-encryption-key",
get_public_key,
methods=["GET"],
tags=["encryption"],
)

self.app.add_api_route(
"/exchange-symmetric-key",
self.exchange_symmetric_key,
exchange_symmetric_key,
methods=["POST"],
dependencies=[
Depends(self.get_self),
],
tags=["encryption"],
)

self.app.add_api_route(
"/get_verification_tweet_id",
self.get_verification_tweet_id,
methods=["GET"],
dependencies=[
Depends(self.get_self),
Depends(blacklist_low_stake),
],
)

self.app.add_api_route(
"/deregister_agent",
self.deregister_agent,
methods=["POST"],
tags=["registration"],
dependencies=[
Depends(self.get_self),
Depends(blacklist_low_stake),
Expand All @@ -285,6 +228,7 @@ def register_routes(self):
"/registration_callback",
self.registration_callback,
methods=["POST"],
tags=["registration"],
dependencies=[
Depends(self.get_self),
Depends(blacklist_low_stake),
Expand Down
Loading

0 comments on commit b8786bd

Please sign in to comment.