Skip to content

refactor: trim some dead code, simplify auth #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: dylan/sim-impl
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions bin/builder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use builder::{
config::BuilderConfig,
service::serve_builder_with_span,
service::serve_builder,
tasks::{
block::Simulator, bundler, metrics::MetricsTask, oauth::Authenticator, submit::SubmitTask,
tx_poller,
},
};
use init4_bin_base::deps::tracing;
use signet_sim::SimCache;
use signet_types::SlotCalculator;
use std::sync::Arc;
Expand All @@ -14,12 +15,11 @@ use tokio::select;
#[tokio::main]
async fn main() -> eyre::Result<()> {
let _guard = init4_bin_base::init4();

let span = tracing::info_span!("zenith-builder");
let init_span_guard = tracing::info_span!("builder initialization");

let config = BuilderConfig::load_from_env()?.clone();
let constants = config.load_pecorino_constants();
let authenticator = Authenticator::new(&config);
let authenticator = Authenticator::new(&config)?;

let (host_provider, ru_provider, sequencer_signer) = tokio::try_join!(
config.connect_host_provider(),
Expand All @@ -33,7 +33,7 @@ async fn main() -> eyre::Result<()> {
let (tx_channel, metrics_jh) = metrics.spawn();

let submit = SubmitTask {
authenticator: authenticator.clone(),
token: authenticator.token(),
host_provider,
zenith,
client: reqwest::Client::new(),
Expand All @@ -45,7 +45,7 @@ async fn main() -> eyre::Result<()> {
let tx_poller = tx_poller::TxPoller::new(&config);
let (tx_receiver, tx_poller_jh) = tx_poller.spawn();

let bundle_poller = bundler::BundlePoller::new(&config, authenticator.clone());
let bundle_poller = bundler::BundlePoller::new(&config, authenticator.token());
let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn();

let authenticator_jh = authenticator.spawn();
Expand All @@ -63,8 +63,11 @@ async fn main() -> eyre::Result<()> {

let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel);

let port = config.builder_port;
let server = serve_builder_with_span(([0, 0, 0, 0], port), span);
let server = serve_builder(([0, 0, 0, 0], config.builder_port));

// We have finished initializing the builder, so we can drop the init span
// guard.
drop(init_span_guard);

select! {
_ = tx_poller_jh => {
Expand Down
70 changes: 13 additions & 57 deletions src/service.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,10 @@
use std::{fmt::Debug, net::SocketAddr};

use axum::{
Router,
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
};
use tracing::{Instrument, Span};

/// App result
pub type AppResult<T, E = AppError> = Result<T, E>;

/// App error. This is a wrapper around eyre::Report that also includes an HTTP
/// status code. It implements [`IntoResponse`] so that it can be returned as an
/// error type from [`axum::handler::Handler`]s.
#[derive(Debug)]
pub struct AppError {
code: StatusCode,
eyre: eyre::Report,
}

impl AppError {
/// Instantiate a new error with the bad request status code.
pub fn bad_req<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
Self { code: StatusCode::BAD_REQUEST, eyre: e.into() }
}

/// Instantiate a new error with the bad request status code and an error
/// string.
pub fn bad_req_str(e: &str) -> Self {
Self { code: StatusCode::BAD_REQUEST, eyre: eyre::eyre!(e.to_owned()) }
}

/// Instantiate a new error with the internal server error status code.
pub fn server_err<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
Self { code: StatusCode::INTERNAL_SERVER_ERROR, eyre: e.into() }
}
}

impl IntoResponse for AppError {
fn into_response(self) -> Response {
(self.code, format!("{}", self.eyre)).into_response()
}
}
use std::net::SocketAddr;

/// Return a 404 Not Found response
pub async fn return_404() -> Response {
Expand All @@ -55,26 +17,20 @@ pub async fn return_200() -> Response {
}

/// Serve a builder service on the given socket address.
pub fn serve_builder_with_span(
socket: impl Into<SocketAddr>,
span: Span,
) -> tokio::task::JoinHandle<()> {
pub fn serve_builder(socket: impl Into<SocketAddr>) -> tokio::task::JoinHandle<()> {
let router = Router::new().route("/healthcheck", get(return_200)).fallback(return_404);

let addr = socket.into();
tokio::spawn(
async move {
match tokio::net::TcpListener::bind(&addr).await {
Ok(listener) => {
if let Err(err) = axum::serve(listener, router).await {
tracing::error!(%err, "serve failed");
}
}
Err(err) => {
tracing::error!(%err, "failed to bind to the address");
tokio::spawn(async move {
match tokio::net::TcpListener::bind(&addr).await {
Ok(listener) => {
if let Err(err) = axum::serve(listener, router).await {
tracing::error!(%err, "serve failed");
}
};
}
.instrument(span),
)
}
Err(err) => {
tracing::error!(%err, "failed to bind to the address");
}
};
})
}
28 changes: 14 additions & 14 deletions src/tasks/bundler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
pub use crate::config::BuilderConfig;
use crate::tasks::oauth::Authenticator;
use crate::tasks::oauth::SharedToken;
use oauth2::TokenResponse;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
Expand All @@ -9,6 +8,9 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;
use tokio::time;
use tracing::{Instrument, debug, trace};

pub use crate::config::BuilderConfig;

/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bundle {
Expand All @@ -26,12 +28,12 @@ pub struct TxPoolBundleResponse {
}

/// The BundlePoller polls the tx-pool for bundles.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct BundlePoller {
/// The builder configuration values.
pub config: BuilderConfig,
/// Authentication module that periodically fetches and stores auth tokens.
pub authenticator: Authenticator,
pub token: SharedToken,
/// Holds a Reqwest client
pub client: Client,
/// Defines the interval at which the bundler polls the tx-pool for bundles.
Expand All @@ -41,28 +43,26 @@ pub struct BundlePoller {
/// Implements a poller for the block builder to pull bundles from the tx-pool.
impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config.
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
Self {
config: config.clone(),
authenticator,
client: Client::new(),
poll_interval_ms: 1000,
}
pub fn new(config: &BuilderConfig, token: SharedToken) -> Self {
Self { config: config.clone(), token, client: Client::new(), poll_interval_ms: 1000 }
}

/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
pub fn new_with_poll_interval_ms(
config: &BuilderConfig,
authenticator: Authenticator,
token: SharedToken,
poll_interval_ms: u64,
) -> Self {
Self { config: config.clone(), authenticator, client: Client::new(), poll_interval_ms }
Self { config: config.clone(), token, client: Client::new(), poll_interval_ms }
}

/// Fetches bundles from the transaction cache and returns them.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
let token = self.authenticator.fetch_oauth_token().await?;
let Some(token) = self.token.read() else {
tracing::warn!("No token available, skipping bundle fetch");
return Ok(vec![]);
};

let result = self
.client
Expand Down
103 changes: 46 additions & 57 deletions src/tasks/oauth.rs
Original file line number Diff line number Diff line change
@@ -1,99 +1,88 @@
//! Service responsible for authenticating with the cache with Oauth tokens.
//! This authenticator periodically fetches a new token every set amount of seconds.
use std::sync::Arc;

use crate::config::BuilderConfig;
use oauth2::{
AuthUrl, ClientId, ClientSecret, EmptyExtraTokenFields, StandardTokenResponse, TokenUrl,
basic::{BasicClient, BasicTokenType},
reqwest::async_http_client,
};
use tokio::{sync::RwLock, task::JoinHandle};
use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;

type Token = StandardTokenResponse<EmptyExtraTokenFields, BasicTokenType>;

/// A self-refreshing, periodically fetching authenticator for the block builder.
/// It is architected as a shareable struct that can be used across all the multiple builder tasks.
/// It fetches a new token every set amount of seconds, configured through the general builder config.
/// Readers are guaranteed to not read stale tokens as the [RwLock] guarantees that write tasks (refreshing the token) will claim priority over read access.
#[derive(Debug, Clone)]
pub struct Authenticator {
/// Configuration
pub config: BuilderConfig,
inner: Arc<RwLock<AuthenticatorInner>>,
}
/// A shared token that can be read and written to by multiple threads.
#[derive(Debug, Clone, Default)]
pub struct SharedToken(Arc<Mutex<Option<Token>>>);

/// Inner state of the Authenticator.
/// Contains the token that is being used for authentication.
#[derive(Debug)]
pub struct AuthenticatorInner {
/// The token
pub token: Option<Token>,
}
impl SharedToken {
/// Read the token from the shared token.
pub fn read(&self) -> Option<Token> {
self.0.lock().unwrap().clone()
}

impl Default for AuthenticatorInner {
fn default() -> Self {
Self::new()
/// Write a new token to the shared token.
pub fn write(&self, token: Token) {
let mut lock = self.0.lock().unwrap();
*lock = Some(token);
}
}

impl AuthenticatorInner {
/// Creates a new AuthenticatorInner with no token set.
pub const fn new() -> Self {
Self { token: None }
/// Check if the token is authenticated.
pub fn is_authenticated(&self) -> bool {
self.0.lock().unwrap().is_some()
}
}

/// A self-refreshing, periodically fetching authenticator for the block
/// builder. This task periodically fetches a new token, and stores it in a
/// [`SharedToken`].
#[derive(Debug)]
pub struct Authenticator {
/// Configuration
pub config: BuilderConfig,
client: BasicClient,
token: SharedToken,
}

impl Authenticator {
/// Creates a new Authenticator from the provided builder config.
pub fn new(config: &BuilderConfig) -> Self {
Self { config: config.clone(), inner: Arc::new(RwLock::new(AuthenticatorInner::new())) }
pub fn new(config: &BuilderConfig) -> eyre::Result<Self> {
let client = BasicClient::new(
ClientId::new(config.oauth_client_id.clone()),
Some(ClientSecret::new(config.oauth_client_secret.clone())),
AuthUrl::new(config.oauth_authenticate_url.clone())?,
Some(TokenUrl::new(config.oauth_token_url.clone())?),
);

Ok(Self { config: config.clone(), client, token: Default::default() })
}

/// Requests a new authentication token and, if successful, sets it to as the token
pub async fn authenticate(&self) -> eyre::Result<()> {
let token = self.fetch_oauth_token().await?;
self.set_token(token).await;
self.set_token(token);
Ok(())
}

/// Returns true if there is Some token set
pub async fn is_authenticated(&self) -> bool {
let lock = self.inner.read().await;

lock.token.is_some()
pub fn is_authenticated(&self) -> bool {
self.token.is_authenticated()
}

/// Sets the Authenticator's token to the provided value
pub async fn set_token(
&self,
token: StandardTokenResponse<EmptyExtraTokenFields, BasicTokenType>,
) {
let mut lock = self.inner.write().await;
lock.token = Some(token);
fn set_token(&self, token: StandardTokenResponse<EmptyExtraTokenFields, BasicTokenType>) {
self.token.write(token);
}

/// Returns the currently set token
pub async fn token(&self) -> Option<Token> {
let lock = self.inner.read().await;
lock.token.clone()
pub fn token(&self) -> SharedToken {
self.token.clone()
}

/// Fetches an oauth token
pub async fn fetch_oauth_token(
&self,
) -> eyre::Result<StandardTokenResponse<EmptyExtraTokenFields, BasicTokenType>> {
let config = self.config.clone();

let client = BasicClient::new(
ClientId::new(config.oauth_client_id.clone()),
Some(ClientSecret::new(config.oauth_client_secret.clone())),
AuthUrl::new(config.oauth_authenticate_url.clone())?,
Some(TokenUrl::new(config.oauth_token_url.clone())?),
);

async fn fetch_oauth_token(&self) -> eyre::Result<Token> {
let token_result =
client.exchange_client_credentials().request_async(async_http_client).await?;
self.client.exchange_client_credentials().request_async(async_http_client).await?;

Ok(token_result)
}
Expand Down
Loading
Loading