Commit f71d99a: Merge d57c028 into 1125be8

MauroToscano authored Nov 26, 2024
2 parents 1125be8 + d57c028
Showing 96 changed files with 1,277 additions and 6,410 deletions.
36 changes: 0 additions & 36 deletions .github/workflows/test-go-retries.yml

This file was deleted.

63 changes: 19 additions & 44 deletions Makefile
@@ -6,7 +6,7 @@ OS := $(shell uname -s)
CONFIG_FILE?=config-files/config.yaml
AGG_CONFIG_FILE?=config-files/config-aggregator.yaml

-OPERATOR_VERSION=v0.11.2
+OPERATOR_VERSION=v0.11.3

ifeq ($(OS),Linux)
BUILD_ALL_FFI = $(MAKE) build_all_ffi_linux
@@ -563,18 +563,18 @@ run_storage: ## Run storage using storage-docker-compose.yaml
@echo "Running storage..."
@docker compose -f storage-docker-compose.yaml up

-__DEPLOYMENT__:
-deploy_aligned_contracts: ## Deploy Aligned Contracts
-@echo "Deploying Aligned Contracts..."
-@. contracts/scripts/.env && . contracts/scripts/deploy_aligned_contracts.sh
+__DEPLOYMENT__: ## ____
+deploy_aligned_contracts: ## Deploy Aligned Contracts. Parameters: NETWORK=<mainnet|holesky|sepolia>
+@echo "Deploying Aligned Contracts on $(NETWORK) network..."
+@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/deploy_aligned_contracts.sh

deploy_pauser_registry: ## Deploy Pauser Registry
@echo "Deploying Pauser Registry..."
@. contracts/scripts/.env && . contracts/scripts/deploy_pauser_registry.sh

-upgrade_aligned_contracts: ## Upgrade Aligned Contracts
-@echo "Upgrading Aligned Contracts..."
-@. contracts/scripts/.env && . contracts/scripts/upgrade_aligned_contracts.sh
+upgrade_aligned_contracts: ## Upgrade Aligned Contracts. Parameters: NETWORK=<mainnet|holesky|sepolia>
+@echo "Upgrading Aligned Contracts on $(NETWORK) network..."
+@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/upgrade_aligned_contracts.sh

upgrade_pauser_aligned_contracts: ## Upgrade Aligned Contracts with Pauser initialization
@echo "Upgrading Aligned Contracts with Pauser initialization..."
@@ -608,13 +608,13 @@ deploy_verify_batch_inclusion_caller:
@echo "Deploying VerifyBatchInclusionCaller contract..."
@. examples/verify/.env && . examples/verify/scripts/deploy_verify_batch_inclusion_caller.sh

-deploy_batcher_payment_service:
-@echo "Deploying BatcherPayments contract..."
-@. contracts/scripts/.env && . contracts/scripts/deploy_batcher_payment_service.sh
+deploy_batcher_payment_service: ## Deploy BatcherPayments contract. Parameters: NETWORK=<mainnet|holesky|sepolia>
+@echo "Deploying BatcherPayments contract on $(NETWORK) network..."
+@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/deploy_batcher_payment_service.sh

-upgrade_batcher_payment_service:
-@echo "Upgrading BatcherPayments contract..."
-@. contracts/scripts/.env && . contracts/scripts/upgrade_batcher_payment_service.sh
+upgrade_batcher_payment_service: ## Upgrade BatcherPayments contract. Parameters: NETWORK=<mainnet|holesky|sepolia>
+@echo "Upgrading BatcherPayments contract on $(NETWORK) network..."
+@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/upgrade_batcher_payment_service.sh

build_aligned_contracts:
@cd contracts/src/core && forge build
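Note that the deployment and upgrade targets above are now parameterized by network: each one sources contracts/scripts/.env.$(NETWORK) instead of a single contracts/scripts/.env, so a per-network env file such as contracts/scripts/.env.holesky is expected to exist. The targets are then invoked as, for example, make deploy_aligned_contracts NETWORK=holesky.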
@@ -844,35 +815,6 @@ explorer_create_env:
@cd explorer && \
cp .env.dev .env

-__TRACKER__:
-
-tracker_devnet_start: tracker_run_db
-@cd operator_tracker/ && \
-cargo run -r -- --env-file .env.dev
-
-tracker_install: tracker_build_db
-cargo install --path ./operator_tracker
-
-tracker_build_db:
-@cd operator_tracker && \
-docker build -t tracker-postgres-image .
-
-tracker_run_db: tracker_build_db tracker_remove_db_container
-@cd operator_tracker && \
-docker run -d --name tracker-postgres-container -p 5433:5432 -v tracker-postgres-data:/var/lib/postgresql/data tracker-postgres-image
-
-tracker_remove_db_container:
-docker stop tracker-postgres-container || true && \
-docker rm tracker-postgres-container || true
-
-tracker_clean_db: tracker_remove_db_container
-docker volume rm tracker-postgres-data || true
-
-tracker_dump_db:
-@cd operator_tracker && \
-docker exec -t tracker-postgres-container pg_dumpall -c -U tracker_user > dump.$$(date +\%Y\%m\%d_\%H\%M\%S).sql
-@echo "Dumped database successfully to /operator_tracker"

DOCKER_RPC_URL=http://anvil:8545
PROOF_GENERATOR_ADDRESS=0x66f9664f97F2b50F62D13eA064982f936dE76657

@@ -1093,7 +1064,7 @@ docker_logs_batcher:

__TELEMETRY__:
# Collector, Jaeger and Elixir API
-telemetry_full_start: open_telemetry_start telemetry_start
+telemetry_full_start: telemetry_compile_bls_verifier open_telemetry_start telemetry_start

# Collector and Jaeger
open_telemetry_start: ## Run open telemetry services using telemetry-docker-compose.yaml
@@ -1137,6 +1108,10 @@ telemetry_create_env:
@cd telemetry_api && \
cp .env.dev .env

+telemetry_compile_bls_verifier:
+@cd telemetry_api/priv && \
+go build ../bls_verifier/bls_verify.go

setup_local_aligned_all:
tmux kill-session -t aligned_layer || true
tmux new-session -d -s aligned_layer
29 changes: 2 additions & 27 deletions aggregator/pkg/aggregator.go
@@ -5,14 +5,12 @@ import (
"encoding/hex"
"fmt"
"math/big"
"strings"
"sync"
"time"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/prometheus/client_golang/prometheus"
retry "github.com/yetanotherco/aligned_layer/core"
"github.com/yetanotherco/aligned_layer/metrics"

sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
@@ -331,6 +329,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
nonSignerStakesAndSignature,
agg.AggregatorConfig.Aggregator.GasBaseBumpPercentage,
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
+agg.AggregatorConfig.Aggregator.GasBumpPercentageLimit,
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
onGasPriceBumped,
)
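The call above now carries four gas-bumping knobs: a base bump percentage, an incremental percentage added on each retry, the newly added GasBumpPercentageLimit, and a wait time between bumps. A minimal sketch of what a capped bump schedule could look like; the helper name and exact formula are illustrative assumptions (only the config field names come from this diff):

// Sketch of a capped gas-price bump schedule. Assumes integer percentages;
// not the repository's actual implementation.
package main

import (
	"fmt"
	"math/big"
)

// bumpedGasPrice raises basePrice by (baseBump + i*increment) percent on the
// i-th bump, never exceeding limit percent (the new GasBumpPercentageLimit).
func bumpedGasPrice(basePrice *big.Int, baseBump, increment, limit, i uint64) *big.Int {
	pct := baseBump + increment*i
	if pct > limit {
		pct = limit // cap so repeated bumps never bid gas up unboundedly
	}
	bumped := new(big.Int).Mul(basePrice, big.NewInt(int64(100+pct)))
	return bumped.Div(bumped, big.NewInt(100))
}

func main() {
	base := big.NewInt(1_000_000_000) // 1 gwei
	for i := uint64(0); i < 5; i++ {
		fmt.Printf("bump %d: %s wei\n", i, bumpedGasPrice(base, 10, 5, 20, i))
	}
}

With a 10% base, 5% increment and 20% limit this yields 10%, 15%, 20%, 20%, ..., so a stuck aggregated-response transaction keeps being re-priced but never beyond the configured ceiling.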
@@ -396,7 +395,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

-err := agg.InitializeNewTaskRetryable(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, agg.AggregatorConfig.Aggregator.BlsServiceTaskTimeout)
+err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, agg.AggregatorConfig.Aggregator.BlsServiceTaskTimeout)
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
}
@@ -409,30 +408,6 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by

// |---RETRYABLE---|

-/*
-InitializeNewTaskRetryable
-Initialize a new task in the BLS Aggregation service
-- Errors:
-Permanent:
-- TaskAlreadyInitializedError (Permanent): Task is already initialized in the BLS Aggregation service (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L27).
-Transient:
-- All others.
-- Retry times (3 retries): 1 sec, 2 sec, 4 sec
-*/
-func (agg *Aggregator) InitializeNewTaskRetryable(batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) error {
-initializeNewTask_func := func() error {
-err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, timeToExpiry)
-if err != nil {
-// Task is already initialized
-if strings.Contains(err.Error(), "already initialized") {
-err = retry.PermanentError{Inner: err}
-}
-}
-return err
-}
-return retry.Retry(initializeNewTask_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
-}

// Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
// It runs every GarbageCollectorPeriod and removes all tasks older than GarbageCollectorTasksAge
// This was added because each task occupies memory in the maps, and we need to free it to avoid a memory leak
16 changes: 13 additions & 3 deletions aggregator/pkg/server.go
@@ -50,7 +50,11 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
taskIndex := uint32(0)

-taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
+// The Aggregator may receive the Task Identifier after the operators do.
+// If that's the case, we won't know about the task at this point,
+// so we make GetTaskIndex retryable, waiting a few seconds
+// before trying to fetch the task from the map again.
+taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash, retry.NetworkRetryParams())

if err != nil {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
@@ -106,7 +110,13 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
return nil
}

-func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
+/*
+Checks the internal mapping for the signed task response and returns its TaskIndex.
+- All errors are considered transient.
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec
+TODO: We should reconsider the retry duration, extending it to a longer window or more retries, ideally somewhere between 1 and 2 blocks.
+*/
+func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte, config *retry.RetryParams) (uint32, error) {
getTaskIndex_func := func() (uint32, error) {
agg.taskMutex.Lock()
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
@@ -118,5 +128,5 @@ func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error
}
}

-return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+return retry.RetryWithData(getTaskIndex_func, config)
}
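The doc comment above fixes the schedule at three retries with 1, 2 and 4 second delays. A minimal sketch of what a RetryWithData-style helper driven by a RetryParams value could look like; the RetryParams fields and the signature are assumptions for illustration, not the repository's actual core retry package:

// Sketch of a generic retry-with-data helper matching the documented
// schedule (3 retries: 1s, 2s, 4s). Field names are illustrative assumptions.
package main

import (
	"errors"
	"fmt"
	"time"
)

type RetryParams struct {
	MinDelay   time.Duration // delay before the first retry, e.g. 1s
	Factor     float64       // backoff multiplier, e.g. 2.0
	NumRetries int           // retries after the initial attempt, e.g. 3
}

// RetryWithData runs fn until it succeeds or the retry budget is exhausted,
// sleeping MinDelay, MinDelay*Factor, ... between attempts.
func RetryWithData[T any](fn func() (T, error), p *RetryParams) (T, error) {
	delay := p.MinDelay
	var lastErr error
	for i := 0; i <= p.NumRetries; i++ {
		v, err := fn()
		if err == nil {
			return v, nil
		}
		lastErr = err
		if i < p.NumRetries {
			time.Sleep(delay)
			delay = time.Duration(float64(delay) * p.Factor)
		}
	}
	var zero T
	return zero, fmt.Errorf("all retries failed: %w", lastErr)
}

func main() {
	attempts := 0
	// Succeeds on the third attempt, as a task that shows up late would.
	idx, err := RetryWithData(func() (uint32, error) {
		attempts++
		if attempts < 3 {
			return 0, errors.New("task not found in internal map yet")
		}
		return 42, nil
	}, &RetryParams{MinDelay: time.Second, Factor: 2, NumRetries: 3})
	fmt.Println(idx, err)
}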
2 changes: 1 addition & 1 deletion batcher/Cargo.lock

Some generated files are not rendered by default.

72 changes: 61 additions & 11 deletions batcher/aligned-batcher/src/lib.rs
@@ -11,18 +11,20 @@ use retry::batcher_retryables::{
get_user_nonce_from_ethereum_retryable, user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
+use tokio::time::timeout;
use types::batch_state::BatchState;
use types::user_state::UserState;

use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
+use std::time::Duration;

use aligned_sdk::core::constants::{
ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF, AGGREGATOR_GAS_COST, BUMP_BACKOFF_FACTOR,
-BUMP_MAX_RETRIES, BUMP_MAX_RETRY_DELAY, BUMP_MIN_RETRY_DELAY, CONSTANT_GAS_COST,
-DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER, DEFAULT_MAX_FEE_PER_PROOF,
+BUMP_MAX_RETRIES, BUMP_MAX_RETRY_DELAY, BUMP_MIN_RETRY_DELAY, CONNECTION_TIMEOUT,
+CONSTANT_GAS_COST, DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER, DEFAULT_MAX_FEE_PER_PROOF,
ETHEREUM_CALL_BACKOFF_FACTOR, ETHEREUM_CALL_MAX_RETRIES, ETHEREUM_CALL_MAX_RETRY_DELAY,
ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_PERCENTAGE_MULTIPLIER, PERCENTAGE_DIVIDER,
RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER,
@@ -265,13 +267,19 @@ impl Batcher {
.map_err(|e| BatcherError::TcpListenerError(e.to_string()))?;
info!("Listening on: {}", address);

-// Let's spawn the handling of each connection in a separate task.
-while let Ok((stream, addr)) = listener.accept().await {
-self.metrics.open_connections.inc();
-let batcher = self.clone();
-tokio::spawn(batcher.handle_connection(stream, addr));
+loop {
+match listener.accept().await {
+Ok((stream, addr)) => {
+let batcher = self.clone();
+// Let's spawn the handling of each connection in a separate task.
+tokio::spawn(batcher.handle_connection(stream, addr));
+}
+Err(e) => {
+self.metrics.user_error(&["connection_accept_error", ""]);
+error!("Couldn't accept new connection: {}", e);
+}
+}
+}
Ok(())
}

/// Listen for Ethereum new blocks.
@@ -360,7 +368,24 @@ impl Batcher {
addr: SocketAddr,
) -> Result<(), BatcherError> {
info!("Incoming TCP connection from: {}", addr);
-let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
+self.metrics.open_connections.inc();
+
+let ws_stream_future = tokio_tungstenite::accept_async(raw_stream);
+let ws_stream =
+match timeout(Duration::from_secs(CONNECTION_TIMEOUT), ws_stream_future).await {
+Ok(Ok(stream)) => stream,
+Ok(Err(e)) => {
+warn!("Error while establishing websocket connection: {}", e);
+self.metrics.open_connections.dec();
+return Ok(());
+}
+Err(e) => {
+warn!("Error while establishing websocket connection: {}", e);
+self.metrics.open_connections.dec();
+self.metrics.user_error(&["user_timeout", ""]);
+return Ok(());
+}
+};

debug!("WebSocket connection established: {}", addr);
let (outgoing, incoming) = ws_stream.split();
@@ -379,8 +404,33 @@ impl Batcher {
.send(Message::binary(serialized_protocol_version_msg))
.await?;

-match incoming
-.try_filter(|msg| future::ready(msg.is_binary()))
+let mut incoming_filter = incoming.try_filter(|msg| future::ready(msg.is_binary()));
+let future_msg = incoming_filter.try_next();
+
+// timeout to prevent a DOS attack
+match timeout(Duration::from_secs(CONNECTION_TIMEOUT), future_msg).await {
+Ok(Ok(Some(msg))) => {
+self.clone().handle_message(msg, outgoing.clone()).await?;
+}
+Err(elapsed) => {
+warn!("[{}] {}", &addr, elapsed);
+self.metrics.user_error(&["user_timeout", ""]);
+self.metrics.open_connections.dec();
+return Ok(());
+}
+Ok(Ok(None)) => {
+info!("[{}] Connection closed by the other side", &addr);
+self.metrics.open_connections.dec();
+return Ok(());
+}
+Ok(Err(e)) => {
+error!("Unexpected error: {}", e);
+self.metrics.open_connections.dec();
+return Ok(());
+}
+};
+
+match incoming_filter
.try_for_each(|msg| self.clone().handle_message(msg, outgoing.clone()))
.await
{
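Taken together, the two timeouts above wrap both the websocket handshake and the first incoming message in the same CONNECTION_TIMEOUT, so a client that opens a TCP connection but never completes the upgrade, or upgrades and never sends a proof, can no longer hold a connection slot open indefinitely. Each early-exit path also decrements the open_connections gauge, whose increment moved from the accept loop into handle_connection, keeping the metric consistent.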
