Skip to content

Commit

Permalink
step 2 of the refactor branch.
Browse files Browse the repository at this point in the history
  • Loading branch information
nitronit committed Sep 2, 2023
1 parent 6a613a5 commit 78ac0ec
Show file tree
Hide file tree
Showing 22 changed files with 993 additions and 727 deletions.
10 changes: 5 additions & 5 deletions cmd/horcrux/cmd/leader_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,16 @@ horcrux elect 2 # elect specific leader`,
ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()

grpcClient := proto.NewICosignerGRPCServerClient(conn)
grpcClient := proto.NewIRaftGRPCClient(conn)
_, err = grpcClient.TransferLeadership(
ctx,
&proto.CosignerGRPCTransferLeadershipRequest{LeaderID: leaderID},
&proto.RaftGRPCTransferLeadershipRequest{LeaderID: leaderID},
)
if err != nil {
return err
}

res, err := grpcClient.GetLeader(ctx, &proto.CosignerGRPCGetLeaderRequest{})
res, err := grpcClient.GetLeader(ctx, &proto.RaftGRPCGetLeaderRequest{})
if err != nil {
return err
}
Expand Down Expand Up @@ -167,9 +167,9 @@ func getLeaderCmd() *cobra.Command {
ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()

grpcClient := proto.NewICosignerGRPCServerClient(conn)
grpcClient := proto.NewIRaftGRPCClient(conn)

res, err := grpcClient.GetLeader(ctx, &proto.CosignerGRPCGetLeaderRequest{})
res, err := grpcClient.GetLeader(ctx, &proto.RaftGRPCGetLeaderRequest{})
if err != nil {
return err
}
Expand Down
12 changes: 8 additions & 4 deletions cmd/horcrux/cmd/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func NewThresholdValidator(

thresholdCfg := config.Config.ThresholdModeConfig
// NOTE: Shouldnt this be a list of concrete type instead of interface type?
remoteCosigners := make([]node.ICosigner, 0, len(thresholdCfg.Cosigners)-1)
remoteCosigners := make([]pcosigner.IRemoteCosigner, 0, len(thresholdCfg.Cosigners)-1)
remoteIcosigners := make([]pcosigner.ICosigner, 0, len(thresholdCfg.Cosigners)-1)

var p2pListen string

Expand All @@ -42,10 +43,14 @@ func NewThresholdValidator(

for _, c := range thresholdCfg.Cosigners {
if c.ShardID != security.GetID() {
temp := pcosigner.NewRemoteCosigner(c.ShardID, c.P2PAddr)
remoteCosigners = append(
remoteCosigners,
pcosigner.NewRemoteCosigner(c.ShardID, c.P2PAddr),
temp,
)
remoteIcosigners = append(
remoteIcosigners,
temp)
} else {
p2pListen = c.P2PAddr
}
Expand Down Expand Up @@ -74,9 +79,8 @@ func NewThresholdValidator(
// RAFT node ID is the cosigner ID
nodeID := fmt.Sprint(security.GetID())

// Start RAFT store listener
raftStore := node.NewRaftStore(nodeID,
raftDir, p2pListen, raftTimeout, logger)
raftDir, p2pListen, raftTimeout, logger, localCosigner, remoteIcosigners)
if err := raftStore.Start(); err != nil {
return nil, nil, fmt.Errorf("error starting raft store: %w", err)
}
Expand Down
27 changes: 17 additions & 10 deletions pkg/multiresolver/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"testing"
"time"

"github.com/cometbft/cometbft/libs/log"
"github.com/strangelove-ventures/horcrux/pkg/node"
"github.com/strangelove-ventures/horcrux/pkg/pcosigner"

grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/strangelove-ventures/horcrux/pkg/multiresolver"
Expand All @@ -26,27 +28,32 @@ func createListener(nodeID string, homedir string) (string, func(), error) {
}

port := strconv.Itoa(sock.Addr().(*net.TCPAddr).Port)
localcosign := pcosigner.NewLocalCosigner(
log.NewNopLogger(), nil, nil, "")

var remoteCosigners []pcosigner.IRemoteCosigner
var timeDuration time.Duration

shadowRemoteCosign := pcosigner.ToIcosigner(remoteCosigners)
s := node.NewRaftStore(
nodeID,
homedir,
"127.0.0.1:"+port,
500*time.Millisecond,
nil)
nil, localcosign, shadowRemoteCosign)

// Need to set pointers to avoid nil pointers.
var cosigners []node.ICosigner
var timeDuration time.Duration
thresholdvalidator := node.NewThresholdValidator(nil, nil, 0, timeDuration, 0, nil, cosigners, nil)

thresholdvalidator := node.NewThresholdValidator(log.NewNopLogger(), nil, 0, timeDuration, 0, localcosign, remoteCosigners, s)

Check failure on line 47 in pkg/multiresolver/multi_test.go

View workflow job for this annotation

GitHub Actions / lint

line is 127 characters (lll)
s.SetThresholdValidator(thresholdvalidator)

transportManager, err := s.Open()
transportManager, err := s.Open(shadowRemoteCosign)
if err != nil {
return "", nil, err
}

grpcServer := grpc.NewServer()
proto.RegisterICosignerGRPCServerServer(grpcServer, node.NewGRPCServer(nil, nil, s))
proto.RegisterICosignerGRPCServer(grpcServer, node.NewGRPCServer(nil, s))
transportManager.Register(grpcServer)

go func() {
Expand Down Expand Up @@ -97,8 +104,8 @@ func TestMultiResolver(t *testing.T) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()

grpcClient := proto.NewICosignerGRPCServerClient(connDNS)
_, err = grpcClient.GetLeader(ctx, &proto.CosignerGRPCGetLeaderRequest{})
grpcClient := proto.NewIRaftGRPCClient(connDNS)
_, err = grpcClient.GetLeader(ctx, &proto.RaftGRPCGetLeaderRequest{})
require.NoError(t, err)

connIP, err := grpc.Dial(targetIP,
Expand All @@ -110,7 +117,7 @@ func TestMultiResolver(t *testing.T) {
require.NoError(t, err)
defer connIP.Close()

grpcClient = proto.NewICosignerGRPCServerClient(connIP)
_, err = grpcClient.GetLeader(ctx, &proto.CosignerGRPCGetLeaderRequest{})
grpcClient = proto.NewIRaftGRPCClient(connIP)
_, err = grpcClient.GetLeader(ctx, &proto.RaftGRPCGetLeaderRequest{})
require.NoError(t, err)
}
135 changes: 79 additions & 56 deletions pkg/node/grpc_server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// This is responsible for the Cosigners Connections.
package node

import (
"context"
"fmt"
"time"

"github.com/cometbft/cometbft/libs/log"
"github.com/strangelove-ventures/horcrux/pkg/pcosigner"
"github.com/strangelove-ventures/horcrux/pkg/types"

Expand All @@ -13,35 +15,54 @@ import (
)

// Enures that GRPCServer implements the proto.CosignerGRPCServer interface.
var _ proto.ICosignerGRPCServerServer = &GRPCServer{}
var _ proto.ICosignerGRPCServer = &GRPCServer{}
var _ proto.IRaftGRPCServer = &GRPCServer{}

type GRPCServer struct {
cosigner *pcosigner.LocalCosigner // The "node's" LocalCosigner
thresholdValidator *ThresholdValidator // The "node's" ThresholdValidator
raftStore *RaftStore // The "node's" RaftStore
// TODO Implement as

type CosignGRPCServer struct {
cosigner pcosigner.ILocalCosigner

logger log.Logger
proto.UnimplementedICosignerGRPCServer
// Promoted Fields is embedded to have forward compatiblitity
proto.UnimplementedICosignerGRPCServerServer
}

type RaftGRPCServer struct {
// logger log.Logger
peers []pcosigner.ICosigner
// The "node's" ThresholdValidator
// thresholdValidator *ThresholdValidator

// The "node's" RaftStore
raftStore *RaftStore

// Promoted Fields is embedded to have forward compatiblitity
proto.UnimplementedIRaftGRPCServer
}
type GRPCServer struct {
*CosignGRPCServer
// The "node's" LocalCosigner
*RaftGRPCServer
}

// NewGRPCServer returns a new GRPCServer.
func NewGRPCServer(
cosigner *pcosigner.LocalCosigner,
thresholdValidator *ThresholdValidator,
cosigner pcosigner.ILocalCosigner,
// thresholdValidator *ThresholdValidator,
raftStore *RaftStore,
) *GRPCServer {
return &GRPCServer{
// TODO: This is a hack to get around the fact that the cosigner is not a?
cosigner: cosigner,
thresholdValidator: thresholdValidator,
raftStore: raftStore,
CosignGRPCServer: &CosignGRPCServer{cosigner: cosigner}, // The nodes local cosigner
RaftGRPCServer: &RaftGRPCServer{raftStore: raftStore}, // The nodes raftStore
}
}

// SignBlock "pseudo-implements" the ICosignerGRPCServer interface in pkg/proto/cosigner_grpc_server_grpc.pb.go
func (rpc *GRPCServer) SignBlock(
func (rpc *RaftGRPCServer) SignBlock(
_ context.Context,
req *proto.CosignerGRPCSignBlockRequest,
) (*proto.CosignerGRPCSignBlockResponse, error) {
req *proto.RaftGRPCSignBlockRequest,
) (*proto.RaftGRPCSignBlockResponse, error) {
block := &Block{
Height: req.Block.GetHeight(),
Round: req.Block.GetRound(),
Expand All @@ -50,16 +71,53 @@ func (rpc *GRPCServer) SignBlock(
Timestamp: time.Unix(0, req.Block.GetTimestamp()),
}
// this
res, _, err := rpc.thresholdValidator.SignBlock(req.ChainID, block)
res, _, err := rpc.raftStore.thresholdValidator.SignBlock(req.ChainID, block)
if err != nil {
return nil, err
}
return &proto.CosignerGRPCSignBlockResponse{
return &proto.RaftGRPCSignBlockResponse{
Signature: res,
}, nil
}

func (rpc *GRPCServer) SetNoncesAndSign(
// TransferLeadership pseudo-implements the ICosignerGRPCServer interface in pkg/proto/cosigner_grpc_server_grpc.pb.go
func (rpc *RaftGRPCServer) TransferLeadership(
_ context.Context,
req *proto.RaftGRPCTransferLeadershipRequest,
) (*proto.RaftGRPCTransferLeadershipResponse, error) {
if rpc.raftStore.raft.State() != raft.Leader {
return &proto.RaftGRPCTransferLeadershipResponse{}, nil
}
leaderID := req.GetLeaderID()
if leaderID != "" {
// TODO: Not an elegant fix. Se other notes.
for _, c := range rpc.peers {
shardID := fmt.Sprint(c.GetID())
if shardID == leaderID {
raftAddress := p2pURLToRaftAddress(c.GetAddress())
fmt.Printf("Transferring leadership to ID: %s - Address: %s\n", shardID, raftAddress)
rpc.raftStore.raft.LeadershipTransferToServer(raft.ServerID(shardID), raft.ServerAddress(raftAddress))
return &proto.RaftGRPCTransferLeadershipResponse{LeaderID: shardID, LeaderAddress: raftAddress}, nil
}
}
}
fmt.Printf("Transferring leadership to next candidate\n")
rpc.raftStore.raft.LeadershipTransfer()
return &proto.RaftGRPCTransferLeadershipResponse{}, nil
}

// GetLeader pseudo-implements the ICosignerGRPCServer interface in pkg/proto/cosigner_grpc_server_grpc.pb.go
// GetLeader gets the current raft cluster leader and send it as respons.
func (rpc *RaftGRPCServer) GetLeader(
context.Context,
*proto.RaftGRPCGetLeaderRequest,
) (*proto.RaftGRPCGetLeaderResponse, error) {
leader := rpc.raftStore.GetLeader()
return &proto.RaftGRPCGetLeaderResponse{Leader: string(leader)}, nil
}

// SetNoncesAndSign implements the ICosignerGRPCServer interface.
func (rpc *CosignGRPCServer) SetNoncesAndSign(
_ context.Context,
req *proto.CosignerGRPCSetNoncesAndSignRequest,
) (*proto.CosignerGRPCSetNoncesAndSignResponse, error) {
Expand All @@ -71,7 +129,7 @@ func (rpc *GRPCServer) SetNoncesAndSign(
SignBytes: req.GetSignBytes(),
})
if err != nil {
rpc.raftStore.logger.Error(
rpc.logger.Error(
"Failed to sign with shard",
"chain_id", req.ChainID,
"height", req.Hrst.Height,
Expand All @@ -81,7 +139,7 @@ func (rpc *GRPCServer) SetNoncesAndSign(
)
return nil, err
}
rpc.raftStore.logger.Info(
rpc.logger.Info(
"Signed with shard",
"chain_id", req.ChainID,
"height", req.Hrst.Height,
Expand All @@ -96,7 +154,7 @@ func (rpc *GRPCServer) SetNoncesAndSign(
}

// GetNonces implements the ICosignerGRPCServer interface.
func (rpc *GRPCServer) GetNonces(
func (rpc *CosignGRPCServer) GetNonces(
_ context.Context,
req *proto.CosignerGRPCGetNoncesRequest,
) (*proto.CosignerGRPCGetNoncesResponse, error) {
Expand All @@ -111,38 +169,3 @@ func (rpc *GRPCServer) GetNonces(
Nonces: pcosigner.CosignerNonces(res.Nonces).ToProto(),
}, nil
}

// TransferLeadership pseudo-implements the ICosignerGRPCServer interface in pkg/proto/cosigner_grpc_server_grpc.pb.go
func (rpc *GRPCServer) TransferLeadership(
_ context.Context,
req *proto.CosignerGRPCTransferLeadershipRequest,
) (*proto.CosignerGRPCTransferLeadershipResponse, error) {
if rpc.raftStore.raft.State() != raft.Leader {
return &proto.CosignerGRPCTransferLeadershipResponse{}, nil
}
leaderID := req.GetLeaderID()
if leaderID != "" {
for _, c := range rpc.raftStore.thresholdValidator.peerCosigners {
shardID := fmt.Sprint(c.GetID())
if shardID == leaderID {
raftAddress := p2pURLToRaftAddress(c.GetAddress())
fmt.Printf("Transferring leadership to ID: %s - Address: %s\n", shardID, raftAddress)
rpc.raftStore.raft.LeadershipTransferToServer(raft.ServerID(shardID), raft.ServerAddress(raftAddress))
return &proto.CosignerGRPCTransferLeadershipResponse{LeaderID: shardID, LeaderAddress: raftAddress}, nil
}
}
}
fmt.Printf("Transferring leadership to next candidate\n")
rpc.raftStore.raft.LeadershipTransfer()
return &proto.CosignerGRPCTransferLeadershipResponse{}, nil
}

// GetLeader pseudo-implements the ICosignerGRPCServer interface in pkg/proto/cosigner_grpc_server_grpc.pb.go
// GetLeader gets the current raft cluster leader and send it as respons.
func (rpc *GRPCServer) GetLeader(
context.Context,
*proto.CosignerGRPCGetLeaderRequest,
) (*proto.CosignerGRPCGetLeaderResponse, error) {
leader := rpc.raftStore.GetLeader()
return &proto.CosignerGRPCGetLeaderResponse{Leader: string(leader)}, nil
}
30 changes: 0 additions & 30 deletions pkg/node/icosigner.go

This file was deleted.

Loading

0 comments on commit 78ac0ec

Please sign in to comment.