Skip to content

Commit

Permalink
Merge branch 'staging' into 1595-hotfixtelemetry-script-to-rewrite-op…
Browse files Browse the repository at this point in the history
…erator-names
  • Loading branch information
uri-99 committed Dec 10, 2024
2 parents 848248d + a4e8e31 commit c0c22a5
Show file tree
Hide file tree
Showing 79 changed files with 2,910 additions and 1,089 deletions.
46 changes: 31 additions & 15 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,16 @@ upgrade_add_aggregator: ## Add Aggregator to Aligned Contracts
@echo "Adding Aggregator to Aligned Contracts..."
@. contracts/scripts/.env && . contracts/scripts/upgrade_add_aggregator_to_service_manager.sh

# Set the aggregator address stored in the AlignedServiceManager contract.
# Requires: NETWORK (selects contracts/scripts/.env.$(NETWORK)) and AGGREGATOR_ADDRESS.
.PHONY: set_aggregator_address
set_aggregator_address: ## Set Aggregator address in Aligned Service Manager on $(NETWORK)
	@if [ -z "$(NETWORK)" ]; then echo "ERROR: NETWORK is not set"; exit 1; fi
	@if [ -z "$(AGGREGATOR_ADDRESS)" ]; then echo "ERROR: AGGREGATOR_ADDRESS is not set"; exit 1; fi
	@echo "Setting Aggregator Address in Aligned Service Manager Contract on $(NETWORK) network..."
	@echo "Aggregator address: $(AGGREGATOR_ADDRESS)"
	@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/set_aggregator_address.sh $(AGGREGATOR_ADDRESS)

# Devnet variant: targets a local anvil node using the well-known anvil #0
# developer private key (not a secret).
# NOTE(review): OUTPUT_PATH is relative (./script/output/...) while the script
# lives under ./contracts/scripts/ — confirm the script resolves it correctly.
.PHONY: set_aggregator_address_devnet
set_aggregator_address_devnet: ## Set Aggregator address in Aligned Service Manager on local devnet
	@if [ -z "$(AGGREGATOR_ADDRESS)" ]; then echo "ERROR: AGGREGATOR_ADDRESS is not set"; exit 1; fi
	@echo "Setting Aggregator Address in Aligned Service Manager Contract..."
	@echo "Aggregator address: $(AGGREGATOR_ADDRESS)"
	RPC_URL="http://localhost:8545" PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" OUTPUT_PATH=./script/output/devnet/alignedlayer_deployment_output.json ./contracts/scripts/set_aggregator_address.sh $(AGGREGATOR_ADDRESS)

upgrade_initialize_disabled_verifiers:
@echo "Adding disabled verifiers to Aligned Service Manager..."
@. contracts/scripts/.env && . contracts/scripts/upgrade_disabled_verifiers_in_service_manager.sh
Expand Down Expand Up @@ -906,7 +916,7 @@ docker_down:
@echo "Everything down"
docker ps

DOCKER_BURST_SIZE=2
DOCKER_BURST_SIZE=1
DOCKER_PROOFS_PRIVATE_KEY=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80

docker_batcher_send_sp1_burst:
Expand All @@ -918,7 +928,8 @@ docker_batcher_send_sp1_burst:
--vm_program ./scripts/test_files/sp1/sp1_fibonacci.elf \
--repetitions $(DOCKER_BURST_SIZE) \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
--rpc_url $(DOCKER_RPC_URL)
--rpc_url $(DOCKER_RPC_URL) \
--max_fee 0.1ether

docker_batcher_send_risc0_burst:
@echo "Sending Risc0 fibonacci task to Batcher..."
Expand All @@ -930,7 +941,8 @@ docker_batcher_send_risc0_burst:
--public_input ./scripts/test_files/risc_zero/fibonacci_proof_generator/risc_zero_fibonacci.pub \
--repetitions $(DOCKER_BURST_SIZE) \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
--rpc_url $(DOCKER_RPC_URL)
--rpc_url $(DOCKER_RPC_URL) \
--max_fee 0.1ether

docker_batcher_send_plonk_bn254_burst:
@echo "Sending Groth16Bn254 1!=0 task to Batcher..."
Expand All @@ -942,7 +954,8 @@ docker_batcher_send_plonk_bn254_burst:
--vk ./scripts/test_files/gnark_plonk_bn254_script/plonk.vk \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
--rpc_url $(DOCKER_RPC_URL) \
--repetitions $(DOCKER_BURST_SIZE)
--repetitions $(DOCKER_BURST_SIZE) \
--max_fee 0.1ether

docker_batcher_send_plonk_bls12_381_burst:
@echo "Sending Groth16 BLS12-381 1!=0 task to Batcher..."
Expand All @@ -954,19 +967,21 @@ docker_batcher_send_plonk_bls12_381_burst:
--vk ./scripts/test_files/gnark_plonk_bls12_381_script/plonk.vk \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
--repetitions $(DOCKER_BURST_SIZE) \
--rpc_url $(DOCKER_RPC_URL)
--rpc_url $(DOCKER_RPC_URL) \
--max_fee 0.1ether

# Submit a burst of Groth16 BN254 proofs to the Batcher container.
# The diff rendering duplicated the argument list (old + new hunk lines);
# this is the single post-change recipe, including the new --max_fee flag.
.PHONY: docker_batcher_send_groth16_burst
docker_batcher_send_groth16_burst: ## Send $(DOCKER_BURST_SIZE) Groth16 BN254 proofs to the Batcher
	@echo "Sending Groth16 BN254 1!=0 task to Batcher..."
	docker exec $(shell docker ps | grep batcher | awk '{print $$1}') aligned submit \
		--private_key $(DOCKER_PROOFS_PRIVATE_KEY) \
		--proving_system Groth16Bn254 \
		--proof ./scripts/test_files/gnark_groth16_bn254_script/groth16.proof \
		--public_input ./scripts/test_files/gnark_groth16_bn254_script/plonk_pub_input.pub \
		--vk ./scripts/test_files/gnark_groth16_bn254_script/groth16.vk \
		--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
		--repetitions $(DOCKER_BURST_SIZE) \
		--rpc_url $(DOCKER_RPC_URL) \
		--max_fee 0.1ether

# Update target as new proofs are supported.
docker_batcher_send_all_proofs_burst:
Expand All @@ -993,6 +1008,7 @@ docker_batcher_send_infinite_groth16:
--public_input scripts/test_files/gnark_groth16_bn254_infinite_script/infinite_proofs/ineq_$${counter}_groth16.pub \
--vk scripts/test_files/gnark_groth16_bn254_infinite_script/infinite_proofs/ineq_$${counter}_groth16.vk \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS); \
--max_fee 0.1ether
sleep $${timer}; \
counter=$$((counter + 1)); \
done \
Expand All @@ -1010,7 +1026,7 @@ docker_verify_proofs_onchain:
done \
'

DOCKER_PROOFS_WAIT_TIME=30
DOCKER_PROOFS_WAIT_TIME=60

docker_verify_proof_submission_success:
@echo "Verifying proofs were successfully submitted..."
Expand All @@ -1032,7 +1048,7 @@ docker_verify_proof_submission_success:
fi; \
echo "---------------------------------------------------------------------------------------------------"; \
done; \
if [ $$(ls -1 ./aligned_verification_data/*.cbor | wc -l) -ne 10 ]; then \
if [ $$(ls -1 ./aligned_verification_data/*.cbor | wc -l) -ne 5 ]; then \
echo "ERROR: Some proofs were verified successfully, but some proofs are missing in the aligned_verification_data/ directory"; \
exit 1; \
fi; \
Expand Down
17 changes: 17 additions & 0 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Aggregator struct {
// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData

// Stores the start time for each batch of the aggregator by task index
batchStartTimeByIdx map[uint32]time.Time

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
Expand All @@ -78,6 +81,7 @@ type Aggregator struct {
// - batchCreatedBlockByIdx
// - batchDataByIdentifierHash
// - nextBatchIndex
// - batchStartTimeByIdx
taskMutex *sync.Mutex

// Mutex to protect ethereum wallet
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash := make(map[[32]byte]uint32)
batchDataByIdentifierHash := make(map[[32]byte]BatchData)
batchCreatedBlockByIdx := make(map[uint32]uint64)
batchStartTimeByIdx := make(map[uint32]time.Time)

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
Expand Down Expand Up @@ -172,6 +177,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
batchStartTimeByIdx: batchStartTimeByIdx,
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
Expand Down Expand Up @@ -233,6 +239,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
taskCreatedAt := agg.batchStartTimeByIdx[blsAggServiceResp.TaskIndex]
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")

Expand Down Expand Up @@ -266,6 +273,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)

// Only observe quorum reached if successful
agg.metrics.ObserveTaskQuorumReached(time.Since(taskCreatedAt))

agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -320,6 +330,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
}

startTime := time.Now()
receipt, err := agg.avsWriter.SendAggregatedResponse(
batchIdentifierHash,
batchMerkleRoot,
Expand All @@ -338,6 +350,9 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
return nil, err
}

// We only send the latency metric if the response is successful
agg.metrics.ObserveLatencyForRespondToTask(time.Since(startTime))

agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -383,6 +398,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.batchStartTimeByIdx[batchIndex] = time.Now()
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
Expand Down Expand Up @@ -447,6 +463,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
delete(agg.batchStartTimeByIdx, i)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
Expand Down
Loading

0 comments on commit c0c22a5

Please sign in to comment.