diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index d875a079..030d8979 100755 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -162,6 +162,7 @@ func parseLivepeerConfig() starter.LivepeerConfig { cfg.MaxGasPrice = flag.Int("maxGasPrice", *cfg.MaxGasPrice, "Maximum gas price (priority fee + base fee) for ETH transactions in wei, 40 Gwei = 40000000000") cfg.EthController = flag.String("ethController", *cfg.EthController, "Protocol smart contract address") cfg.InitializeRound = flag.Bool("initializeRound", *cfg.InitializeRound, "Set to true if running as a transcoder and the node should automatically initialize new rounds") + cfg.InitializeRoundMaxDelay = flag.Duration("initializeRoundMaxDelay", *cfg.InitializeRoundMaxDelay, "Maximum delay to wait before initializing a round") cfg.TicketEV = flag.String("ticketEV", *cfg.TicketEV, "The expected value for PM tickets") cfg.MaxFaceValue = flag.String("maxFaceValue", *cfg.MaxFaceValue, "set max ticket face value in WEI") // Broadcaster max acceptable ticket EV diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 2896d954..f9296abf 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -74,76 +74,77 @@ const ( ) type LivepeerConfig struct { - Network *string - RtmpAddr *string - CliAddr *string - HttpAddr *string - ServiceAddr *string - OrchAddr *string - VerifierURL *string - EthController *string - VerifierPath *string - LocalVerify *bool - HttpIngest *bool - Orchestrator *bool - Transcoder *bool - Broadcaster *bool - OrchSecret *string - TranscodingOptions *string - MaxAttempts *int - SelectRandWeight *float64 - SelectStakeWeight *float64 - SelectPriceWeight *float64 - SelectPriceExpFactor *float64 - OrchPerfStatsURL *string - Region *string - MaxPricePerUnit *string - MinPerfScore *float64 - MaxSessions *string - CurrentManifest *bool - Nvidia *string - Netint *string - TestTranscoder *bool - EthAcctAddr *string - EthPassword *string - EthKeystorePath *string - EthOrchAddr *string - EthUrl *string - TxTimeout *time.Duration - MaxTxReplacements *int - GasLimit *int - MinGasPrice *int64 - MaxGasPrice *int - InitializeRound *bool - TicketEV *string - MaxFaceValue *string - MaxTicketEV *string - MaxTotalEV *string - DepositMultiplier *int - PricePerUnit *string - PixelsPerUnit *string - PriceFeedAddr *string - AutoAdjustPrice *bool - PricePerBroadcaster *string - BlockPollingInterval *int - Redeemer *bool - RedeemerAddr *string - Reward *bool - Monitor *bool - MetricsPerStream *bool - MetricsExposeClientIP *bool - MetadataQueueUri *string - MetadataAmqpExchange *string - MetadataPublishTimeout *time.Duration - Datadir *string - Objectstore *string - Recordstore *string - FVfailGsBucket *string - FVfailGsKey *string - AuthWebhookURL *string - OrchWebhookURL *string - OrchBlacklist *string - TestOrchAvail *bool + Network *string + RtmpAddr *string + CliAddr *string + HttpAddr *string + ServiceAddr *string + OrchAddr *string + VerifierURL *string + EthController *string + VerifierPath *string + LocalVerify *bool + HttpIngest *bool + Orchestrator *bool + Transcoder *bool + Broadcaster *bool + OrchSecret *string + TranscodingOptions *string + MaxAttempts *int + SelectRandWeight *float64 + SelectStakeWeight *float64 + SelectPriceWeight *float64 + SelectPriceExpFactor *float64 + OrchPerfStatsURL *string + Region *string + MaxPricePerUnit *string + MinPerfScore *float64 + MaxSessions *string + CurrentManifest *bool + Nvidia *string + Netint *string + TestTranscoder *bool + EthAcctAddr *string + EthPassword *string + EthKeystorePath *string + EthOrchAddr *string + EthUrl *string + TxTimeout *time.Duration + MaxTxReplacements *int + GasLimit *int + MinGasPrice *int64 + MaxGasPrice *int + InitializeRound *bool + InitializeRoundMaxDelay *time.Duration + TicketEV *string + MaxFaceValue *string + MaxTicketEV *string + MaxTotalEV *string + DepositMultiplier *int + PricePerUnit *string + PixelsPerUnit *string + PriceFeedAddr *string + AutoAdjustPrice *bool + PricePerBroadcaster *string + BlockPollingInterval *int + Redeemer *bool + RedeemerAddr *string + Reward *bool + Monitor *bool + MetricsPerStream *bool + MetricsExposeClientIP *bool + MetadataQueueUri *string + MetadataAmqpExchange *string + MetadataPublishTimeout *time.Duration + Datadir *string + Objectstore *string + Recordstore *string + FVfailGsBucket *string + FVfailGsKey *string + AuthWebhookURL *string + OrchWebhookURL *string + OrchBlacklist *string + TestOrchAvail *bool } // DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process. @@ -190,6 +191,7 @@ func DefaultLivepeerConfig() LivepeerConfig { defaultMaxGasPrice := 0 defaultEthController := "" defaultInitializeRound := false + defaultInitializeRoundMaxDelay := 30 * time.Second defaultTicketEV := "8000000000" defaultMaxFaceValue := "0" defaultMaxTicketEV := "3000000000000" @@ -264,36 +266,37 @@ func DefaultLivepeerConfig() LivepeerConfig { TestTranscoder: &defaultTestTranscoder, // Onchain: - EthAcctAddr: &defaultEthAcctAddr, - EthPassword: &defaultEthPassword, - EthKeystorePath: &defaultEthKeystorePath, - EthOrchAddr: &defaultEthOrchAddr, - EthUrl: &defaultEthUrl, - TxTimeout: &defaultTxTimeout, - MaxTxReplacements: &defaultMaxTxReplacements, - GasLimit: &defaultGasLimit, - MaxGasPrice: &defaultMaxGasPrice, - EthController: &defaultEthController, - InitializeRound: &defaultInitializeRound, - TicketEV: &defaultTicketEV, - MaxFaceValue: &defaultMaxFaceValue, - MaxTicketEV: &defaultMaxTicketEV, - MaxTotalEV: &defaultMaxTotalEV, - DepositMultiplier: &defaultDepositMultiplier, - MaxPricePerUnit: &defaultMaxPricePerUnit, - PixelsPerUnit: &defaultPixelsPerUnit, - PriceFeedAddr: &defaultPriceFeedAddr, - AutoAdjustPrice: &defaultAutoAdjustPrice, - PricePerBroadcaster: &defaultPricePerBroadcaster, - BlockPollingInterval: &defaultBlockPollingInterval, - Redeemer: &defaultRedeemer, - RedeemerAddr: &defaultRedeemerAddr, - Monitor: &defaultMonitor, - MetricsPerStream: &defaultMetricsPerStream, - MetricsExposeClientIP: &defaultMetricsExposeClientIP, - MetadataQueueUri: &defaultMetadataQueueUri, - MetadataAmqpExchange: &defaultMetadataAmqpExchange, - MetadataPublishTimeout: &defaultMetadataPublishTimeout, + EthAcctAddr: &defaultEthAcctAddr, + EthPassword: &defaultEthPassword, + EthKeystorePath: &defaultEthKeystorePath, + EthOrchAddr: &defaultEthOrchAddr, + EthUrl: &defaultEthUrl, + TxTimeout: &defaultTxTimeout, + MaxTxReplacements: &defaultMaxTxReplacements, + GasLimit: &defaultGasLimit, + MaxGasPrice: &defaultMaxGasPrice, + EthController: &defaultEthController, + InitializeRound: &defaultInitializeRound, + InitializeRoundMaxDelay: &defaultInitializeRoundMaxDelay, + TicketEV: &defaultTicketEV, + MaxFaceValue: &defaultMaxFaceValue, + MaxTicketEV: &defaultMaxTicketEV, + MaxTotalEV: &defaultMaxTotalEV, + DepositMultiplier: &defaultDepositMultiplier, + MaxPricePerUnit: &defaultMaxPricePerUnit, + PixelsPerUnit: &defaultPixelsPerUnit, + PriceFeedAddr: &defaultPriceFeedAddr, + AutoAdjustPrice: &defaultAutoAdjustPrice, + PricePerBroadcaster: &defaultPricePerBroadcaster, + BlockPollingInterval: &defaultBlockPollingInterval, + Redeemer: &defaultRedeemer, + RedeemerAddr: &defaultRedeemerAddr, + Monitor: &defaultMonitor, + MetricsPerStream: &defaultMetricsPerStream, + MetricsExposeClientIP: &defaultMetricsExposeClientIP, + MetadataQueueUri: &defaultMetadataQueueUri, + MetadataAmqpExchange: &defaultMetadataAmqpExchange, + MetadataPublishTimeout: &defaultMetadataPublishTimeout, // Ingest: HttpIngest: &defaultHttpIngest, @@ -975,7 +978,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { if *cfg.InitializeRound { // Start round initializer // The node will only initialize rounds if it in the upcoming active set for the round - initializer := eth.NewRoundInitializer(n.Eth, timeWatcher) + initializer := eth.NewRoundInitializer(n.Eth, timeWatcher, *cfg.InitializeRoundMaxDelay) go func() { if err := initializer.Start(); err != nil { serviceErr <- err diff --git a/eth/roundinitializer.go b/eth/roundinitializer.go index d3f2cbee..a7aaff66 100644 --- a/eth/roundinitializer.go +++ b/eth/roundinitializer.go @@ -2,10 +2,11 @@ package eth import ( "math/big" + "math/rand" "sync" + "time" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" "github.com/golang/glog" ) @@ -29,20 +30,22 @@ type timeWatcher interface { // This selection process is purely a client side implementation that attempts to minimize on-chain transaction collisions, but // collisions are still possible if initialization transactions are submitted by parties that are not using this selection process type RoundInitializer struct { - client LivepeerEthClient - tw timeWatcher - quit chan struct{} + maxDelay time.Duration + client LivepeerEthClient + tw timeWatcher + quit chan struct{} nextRoundStartL1Block *big.Int mu sync.Mutex } // NewRoundInitializer creates a RoundInitializer instance -func NewRoundInitializer(client LivepeerEthClient, tw timeWatcher) *RoundInitializer { +func NewRoundInitializer(client LivepeerEthClient, tw timeWatcher, maxDelay time.Duration) *RoundInitializer { return &RoundInitializer{ - client: client, - tw: tw, - quit: make(chan struct{}), + maxDelay: maxDelay, + client: client, + tw: tw, + quit: make(chan struct{}), } } @@ -104,23 +107,23 @@ func (r *RoundInitializer) tryInitialize() error { r.mu.Lock() defer r.mu.Unlock() - currentL1Blk := r.tw.LastSeenL1Block() - lastInitializedL1BlkHash := r.tw.LastInitializedL1BlockHash() - - epochSeed := r.currentEpochSeed(currentL1Blk, r.nextRoundStartL1Block, lastInitializedL1BlkHash) - - ok, err := r.shouldInitialize(epochSeed) - if err != nil { - return err + if r.tw.LastSeenL1Block().Cmp(r.nextRoundStartL1Block) < 0 { + // Round already initialized + return nil } - // Noop if the caller should not initialize the round - if !ok { - return nil + if r.maxDelay > 0 { + randDelay := time.Duration(rand.Int63n(int64(r.maxDelay))) + glog.Infof("Waiting %v before attempting to initialize round", randDelay) + time.Sleep(randDelay) + + if r.tw.LastSeenL1Block().Cmp(r.nextRoundStartL1Block) < 0 { + glog.Infof("Round is already initialized, not initializing") + return nil + } } currentRound := new(big.Int).Add(r.tw.LastInitializedRound(), big.NewInt(1)) - glog.Infof("New round - preparing to initialize round to join active set, current round is %d", currentRound) tx, err := r.client.InitializeRound() @@ -136,55 +139,3 @@ func (r *RoundInitializer) tryInitialize() error { return nil } - -func (r *RoundInitializer) shouldInitialize(epochSeed *big.Int) (bool, error) { - transcoders, err := r.client.TranscoderPool() - if err != nil { - return false, err - } - - numActive := big.NewInt(int64(len(transcoders))) - - // Should not initialize if the upcoming active set is empty - if numActive.Cmp(big.NewInt(0)) == 0 { - return false, nil - } - - // Find the caller's rank in the upcoming active set - rank := int64(-1) - maxRank := numActive.Int64() - caller := r.client.Account().Address - for i := int64(0); i < maxRank; i++ { - if transcoders[i].Address == caller { - rank = i - break - } - } - - // Should not initialize if the caller is not in the upcoming active set - if rank == -1 { - return false, nil - } - - // Use the seed to select a position within the active set - selection := new(big.Int).Mod(epochSeed, numActive) - // Should not initialize if the selection does not match the caller's rank in the active set - if selection.Int64() != int64(rank) { - return false, nil - } - - // If the selection matches the caller's rank the caller should initialize the round - return true, nil -} - -// Returns the seed used to select a round initializer in the current epoch for the current round -// This seed is not meant to be unpredictable. The only requirement for the seed is that it is calculated the same way for each -// party running the round initializer -func (r *RoundInitializer) currentEpochSeed(currentL1Block, roundStartL1Block *big.Int, lastInitializedL1BlkHash [32]byte) *big.Int { - epochNum := new(big.Int).Sub(currentL1Block, roundStartL1Block) - epochNum.Div(epochNum, epochL1Blocks) - - // The seed for the current epoch is calculated as: - // keccak256(lastInitializedL1BlkHash | epochNum) - return crypto.Keccak256Hash(append(lastInitializedL1BlkHash[:], epochNum.Bytes()...)).Big() -} diff --git a/eth/roundinitializer_test.go b/eth/roundinitializer_test.go index 25e60205..a96cb3b2 100644 --- a/eth/roundinitializer_test.go +++ b/eth/roundinitializer_test.go @@ -16,84 +16,6 @@ import ( "github.com/stretchr/testify/mock" ) -func TestRoundInitializer_CurrentEpochSeed(t *testing.T) { - initializer := NewRoundInitializer(nil, nil) - - assert := assert.New(t) - - // Test epochNum = 0 - blkHash := [32]byte{123} - - epochSeed := initializer.currentEpochSeed(big.NewInt(5), big.NewInt(5), blkHash) - // epochNum = (5 - 5) / 5 = 0 - // epochSeed = keccak256(blkHash | 0) = 53205358842179480591542570540016728811976439286094436690881169143335261643310 - expEpochSeed, _ := new(big.Int).SetString("53205358842179480591542570540016728811976439286094436690881169143335261643310", 10) - assert.Equal(expEpochSeed, epochSeed) - - // Test epochNum > 0 - epochSeed = initializer.currentEpochSeed(big.NewInt(20), big.NewInt(5), blkHash) - // epochNum = (20 - 5) / 5 = 3 - // epochSeed = keccak256(blkHash | 3) = 42541119854153860846042329644941941146216657514071318786342840580076059276721 - expEpochSeed.SetString("42541119854153860846042329644941941146216657514071318786342840580076059276721", 10) - assert.Equal(expEpochSeed, epochSeed) - - // Test epochNum > 0 with some # of blocks into the epoch - epochSeed = initializer.currentEpochSeed(big.NewInt(20), big.NewInt(4), blkHash) - // epochNum = (20 - 4) / 5 = 3.2 -> 3 - assert.Equal(expEpochSeed, epochSeed) -} - -func TestRoundInitializer_ShouldInitialize(t *testing.T) { - client := &MockClient{} - tw := &stubTimeWatcher{} - initializer := NewRoundInitializer(client, tw) - - assert := assert.New(t) - - // Test error getting transcoders - expErr := errors.New("TranscoderPool error") - client.On("TranscoderPool").Return(nil, expErr).Once() - - ok, err := initializer.shouldInitialize(nil) - assert.EqualError(err, expErr.Error()) - assert.False(ok) - - // Test active set is empty because no registered transcoders - client.On("TranscoderPool").Return([]*lpTypes.Transcoder{}, nil).Once() - ok, err = initializer.shouldInitialize(nil) - assert.Nil(err) - assert.False(ok) - - // Test that caller is not in active set because it is not registered - caller := ethcommon.BytesToAddress([]byte("foo")) - client.On("Account").Return(accounts.Account{Address: caller}) - - registered := []*lpTypes.Transcoder{ - {Address: ethcommon.BytesToAddress([]byte("jar"))}, - {Address: ethcommon.BytesToAddress([]byte("bar"))}, - } - client.On("TranscoderPool").Return(registered, nil).Once() - - ok, err = initializer.shouldInitialize(nil) - assert.Nil(err) - assert.False(ok) - - // Test not selected - registered = append(registered, &lpTypes.Transcoder{Address: caller}) - client.On("TranscoderPool").Return(registered, nil) - - seed := big.NewInt(3) - ok, err = initializer.shouldInitialize(seed) - assert.Nil(err) - assert.False(ok) - - // Test caller selected - seed = big.NewInt(5) - ok, err = initializer.shouldInitialize(seed) - assert.Nil(err) - assert.True(ok) -} - func TestRoundInitializer_TryInitialize(t *testing.T) { client := &MockClient{} tw := &stubTimeWatcher{ @@ -101,45 +23,17 @@ func TestRoundInitializer_TryInitialize(t *testing.T) { lastInitializedRound: big.NewInt(100), lastInitializedBlockHash: [32]byte{123}, } - initializer := NewRoundInitializer(client, tw) + initializer := NewRoundInitializer(client, tw, 0) initializer.nextRoundStartL1Block = big.NewInt(5) assert := assert.New(t) - // Test error checking should initialize - expErr := errors.New("shouldInitialize error") - client.On("TranscoderPool").Return(nil, expErr).Once() - - err := initializer.tryInitialize() - assert.EqualError(err, expErr.Error()) - - // Test should not initialize - caller := ethcommon.BytesToAddress([]byte("foo")) - client.On("Account").Return(accounts.Account{Address: caller}) - - registered := []*lpTypes.Transcoder{ - {Address: ethcommon.BytesToAddress([]byte("jar"))}, - } - client.On("TranscoderPool").Return(registered, nil).Once() - - err = initializer.tryInitialize() - assert.Nil(err) - - // Test error when submitting initialization tx - registered = []*lpTypes.Transcoder{{Address: caller}} - client.On("TranscoderPool").Return(registered, nil) - expErr = errors.New("InitializeRound error") - client.On("InitializeRound").Return(nil, expErr).Once() - - err = initializer.tryInitialize() - assert.EqualError(err, expErr.Error()) - // Test error checking initialization tx tx := &types.Transaction{} client.On("InitializeRound").Return(tx, nil) - expErr = errors.New("CheckTx error") + expErr := errors.New("CheckTx error") client.On("CheckTx", mock.Anything).Return(expErr).Once() - err = initializer.tryInitialize() + err := initializer.tryInitialize() assert.EqualError(err, expErr.Error()) // Test success