diff --git a/config/config.go b/config/config.go index cc008478ef..92081b2f10 100644 --- a/config/config.go +++ b/config/config.go @@ -37,6 +37,10 @@ const ( // LogLevelEnvVariable can be used to define logging configuration. LogLevelEnvVariable = "LOG_LEVEL" + + // PubsubLogLevelEnvVariable can be used to define logging configuration + // for the pubsub implementation. + PubsubLogLevelEnvVariable = "PUBSUB_LOG_LEVEL" ) // Config is the top level config structure. diff --git a/main.go b/main.go index 3b8e0806c4..96216f6909 100644 --- a/main.go +++ b/main.go @@ -18,10 +18,7 @@ import ( var logger = log.Logger("keep-main") func main() { - err := logging.Configure(os.Getenv(config.LogLevelEnvVariable)) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to configure logging: [%v]\n", err) - } + configureLogging() rootCmd := cmd.Initialize(build.Version, build.Revision) @@ -29,3 +26,26 @@ func main() { logger.Fatal(err) } } + +func configureLogging() { + logLevel := "info" + if env := os.Getenv(config.LogLevelEnvVariable); len(env) > 0 { + logLevel = env + } + + pubsubLogLevel := "warn" + if env := os.Getenv(config.PubsubLogLevelEnvVariable); len(env) > 0 { + pubsubLogLevel = env + } + + levelDirective := fmt.Sprintf( + "%s pubsub=%s", + logLevel, + pubsubLogLevel, + ) + + err := logging.Configure(levelDirective) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to configure logging: [%v]\n", err) + } +} diff --git a/pkg/net/libp2p/libp2p.go b/pkg/net/libp2p/libp2p.go index be862ea33f..5f56306222 100644 --- a/pkg/net/libp2p/libp2p.go +++ b/pkg/net/libp2p/libp2p.go @@ -28,6 +28,7 @@ import ( rhost "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/net/upgrader" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" ) @@ -68,6 +69,10 @@ const ( // to prevent uncontrolled message propagation. 
const MaximumDisseminationTime = 90 +// pingTestTimeout is the maximum duration of the ping test performed for +// freshly connected peers. +const pingTestTimeout = 60 * time.Second + // Config defines the configuration for the libp2p network provider. type Config struct { Bootstrap bool @@ -320,7 +325,7 @@ func Connect( return nil, err } - host.Network().Notify(buildNotifiee()) + host.Network().Notify(buildNotifiee(host)) broadcastChannelManager, err := newChannelManager(ctx, identity, host, ticker) if err != nil { @@ -528,17 +533,20 @@ func extractMultiAddrFromPeers(peers []string) ([]peer.AddrInfo, error) { return peerInfos, nil } -func buildNotifiee() libp2pnet.Notifiee { +func buildNotifiee(libp2pHost host.Host) libp2pnet.Notifiee { notifyBundle := &libp2pnet.NotifyBundle{} notifyBundle.ConnectedF = func(_ libp2pnet.Network, connection libp2pnet.Conn) { - logger.Infof( - "established connection to [%v]", - multiaddressWithIdentity( - connection.RemoteMultiaddr(), - connection.RemotePeer(), - ), + peerID := connection.RemotePeer() + + peerMultiaddress := multiaddressWithIdentity( + connection.RemoteMultiaddr(), + peerID, ) + + logger.Infof("established connection to [%v]", peerMultiaddress) + + go executePingTest(libp2pHost, peerID, peerMultiaddress) } notifyBundle.DisconnectedF = func(_ libp2pnet.Network, connection libp2pnet.Conn) { logger.Infof( @@ -553,6 +561,46 @@ func buildNotifiee() libp2pnet.Notifiee { return notifyBundle } +func executePingTest( + libp2pHost host.Host, + peerID peer.ID, + peerMultiaddress string, +) { + logger.Infof("starting ping test for [%v]", peerMultiaddress) + + ctx, cancelCtx := context.WithTimeout( + context.Background(), + pingTestTimeout, + ) + defer cancelCtx() + + resultChan := ping.Ping(ctx, libp2pHost, peerID) + + select { + case result := <-resultChan: + if result.Error != nil { + logger.Warnf( + "ping test for [%v] failed: [%v]", + peerMultiaddress, + result.Error, + ) + } else if result.Error == nil && result.RTT == 0 { 
logger.Warnf( + "peer test for [%v] failed without clear reason", + peerMultiaddress, + ) + } else { + logger.Infof( + "ping test for [%v] completed with success (RTT [%v])", + peerMultiaddress, + result.RTT, + ) + } + case <-ctx.Done(): + logger.Warnf("ping test for [%v] timed out", peerMultiaddress) + } +} + func multiaddressWithIdentity( multiaddress ma.Multiaddr, peerID peer.ID, diff --git a/pkg/protocol/announcer/announcer.go b/pkg/protocol/announcer/announcer.go index c69904492f..54860aa0ee 100644 --- a/pkg/protocol/announcer/announcer.go +++ b/pkg/protocol/announcer/announcer.go @@ -172,3 +172,35 @@ loop: return readyMembersIndexes, nil } + +// UnreadyMembers returns a list of member indexes that turned out to be unready +// during the announcement. The list is sorted in ascending order. +func UnreadyMembers( + readyMembers []group.MemberIndex, + groupSize int, +) []group.MemberIndex { + if len(readyMembers) == groupSize { + return []group.MemberIndex{} + } + + readyMembersSet := make(map[group.MemberIndex]bool) + for _, memberIndex := range readyMembers { + readyMembersSet[memberIndex] = true + } + + unreadyMembers := make([]group.MemberIndex, 0) + + for i := 0; i < groupSize; i++ { + memberIndex := group.MemberIndex(i + 1) + + if _, isReady := readyMembersSet[memberIndex]; !isReady { + unreadyMembers = append(unreadyMembers, memberIndex) + } + } + + sort.Slice(unreadyMembers, func(i, j int) bool { + return unreadyMembers[i] < unreadyMembers[j] + }) + + return unreadyMembers +} diff --git a/pkg/protocol/announcer/announcer_test.go b/pkg/protocol/announcer/announcer_test.go index 2472a14db0..b96b69f8da 100644 --- a/pkg/protocol/announcer/announcer_test.go +++ b/pkg/protocol/announcer/announcer_test.go @@ -227,3 +227,46 @@ func TestAnnouncer(t *testing.T) { }) } } + +func TestUnreadyMembers(t *testing.T) { + tests := map[string]struct { + readyMembers []group.MemberIndex + groupSize int + expected []group.MemberIndex + }{ + "all members are ready": { + 
readyMembers: []group.MemberIndex{1, 2, 3, 4, 5}, + groupSize: 5, + expected: []group.MemberIndex{}, + }, + "some members are not ready": { + readyMembers: []group.MemberIndex{1, 3, 5}, + groupSize: 5, + expected: []group.MemberIndex{2, 4}, + }, + "no members are ready": { + readyMembers: []group.MemberIndex{}, + groupSize: 5, + expected: []group.MemberIndex{1, 2, 3, 4, 5}, + }, + "group size is zero": { + readyMembers: []group.MemberIndex{}, + groupSize: 0, + expected: []group.MemberIndex{}, + }, + } + + for testName, test := range tests { + t.Run(testName, func(t *testing.T) { + result := UnreadyMembers(test.readyMembers, test.groupSize) + + if !reflect.DeepEqual(test.expected, result) { + t.Errorf( + "unexpected result\nexpected: %v\nactual: %v", + test.expected, + result, + ) + } + }) + } +} diff --git a/pkg/tbtc/dkg.go b/pkg/tbtc/dkg.go index bfd0cd85a5..06f417b32b 100644 --- a/pkg/tbtc/dkg.go +++ b/pkg/tbtc/dkg.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "golang.org/x/exp/maps" "math/big" "sort" @@ -181,10 +182,15 @@ func (de *dkgExecutor) checkEligibility( } dkgLogger.Infof( - "selected group members for DKG = %s", + "selected group members (seats) for DKG: [%s]", groupSelectionResult.OperatorsAddresses, ) + dkgLogger.Infof( + "distinct operators participating in DKG: [%s]", + maps.Keys(groupSelectionResult.OperatorsAddresses.Set()), + ) + if len(groupSelectionResult.OperatorsAddresses) > de.groupParameters.GroupSize { return nil, nil, fmt.Errorf( "group size larger than supported: [%v]", diff --git a/pkg/tbtc/dkg_loop.go b/pkg/tbtc/dkg_loop.go index 082f7310b0..4b7955abc9 100644 --- a/pkg/tbtc/dkg_loop.go +++ b/pkg/tbtc/dkg_loop.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + "github.com/keep-network/keep-core/pkg/protocol/announcer" "math/big" "github.com/ipfs/go-log/v2" @@ -209,6 +210,11 @@ func (drl *dkgRetryLoop) start( continue } + unreadyMembersIndexes := announcer.UnreadyMembers( + readyMembersIndexes, + 
drl.groupParameters.GroupSize, + ) + // Check the loop stop signal. if ctx.Err() != nil { return nil, ctx.Err() @@ -217,19 +223,23 @@ func (drl *dkgRetryLoop) start( if len(readyMembersIndexes) >= drl.groupParameters.GroupQuorum { drl.logger.Infof( "[member:%v] completed announcement phase for attempt [%v] "+ - "with quorum of [%v] members ready to perform DKG", + "with quorum of [%v] members ready to perform DKG; "+ + "following members are not ready: [%v]", drl.memberIndex, drl.attemptCounter, len(readyMembersIndexes), + unreadyMembersIndexes, ) } else { drl.logger.Warnf( "[member:%v] completed announcement phase for attempt [%v] "+ "with non-quorum of [%v] members ready to perform DKG; "+ - "starting next attempt", + "following members are not ready: [%v]; "+ + "moving to the next attempt", drl.memberIndex, drl.attemptCounter, len(readyMembersIndexes), + unreadyMembersIndexes, ) continue } diff --git a/pkg/tbtc/signing_loop.go b/pkg/tbtc/signing_loop.go index 10b964eecf..f67a585fe7 100644 --- a/pkg/tbtc/signing_loop.go +++ b/pkg/tbtc/signing_loop.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + "github.com/keep-network/keep-core/pkg/protocol/announcer" "math/big" "math/rand" "sort" @@ -261,6 +262,11 @@ func (srl *signingRetryLoop) start( continue } + unreadyMembersIndexes := announcer.UnreadyMembers( + readyMembersIndexes, + len(srl.signingGroupOperators), + ) + // Check the loop stop signal again. The announcement took some time // and the context may be done now. 
if ctx.Err() != nil { @@ -270,19 +276,23 @@ func (srl *signingRetryLoop) start( if len(readyMembersIndexes) >= srl.groupParameters.HonestThreshold { srl.logger.Infof( "[member:%v] completed announcement phase for attempt [%v] "+ - "with honest majority of [%v] members ready to sign", + "with honest majority of [%v] members ready to sign; "+ + "following members are not ready: [%v]", srl.signingGroupMemberIndex, srl.attemptCounter, len(readyMembersIndexes), + unreadyMembersIndexes, ) } else { srl.logger.Warnf( "[member:%v] completed announcement phase for attempt [%v] "+ "with minority of [%v] members ready to sign; "+ - "starting next attempt", + "following members are not ready: [%v]; "+ + "moving to the next attempt", srl.signingGroupMemberIndex, srl.attemptCounter, len(readyMembersIndexes), + unreadyMembersIndexes, ) continue }