Commit 8298bc8: changes

nitronit committed Sep 27, 2023
1 parent f03168c
Showing 8 changed files with 213 additions and 79 deletions.
25 changes: 12 additions & 13 deletions cmd/horcrux/cmd/threshold.go
@@ -26,7 +26,7 @@ func NewThresholdValidator(
thresholdCfg := config.Config.ThresholdModeConfig
// NOTE: Shouldn't this be a list of a concrete type instead of an interface type?
remoteCosigners := make([]pcosigner.IRemoteCosigner, 0, len(thresholdCfg.Cosigners)-1)
remoteIcosigners := make([]pcosigner.ICosigner, 0, len(thresholdCfg.Cosigners)-1)
// peers := make([]pcosigner.ICosigner, 0, len(thresholdCfg.Cosigners)-1)

var p2pListen string
var cosign pcosigner.Cosigner
@@ -43,21 +43,20 @@ func NewThresholdValidator(

for _, c := range thresholdCfg.Cosigners {
if c.ShardID != security.GetID() {
temp := pcosigner.NewRemoteCosigner(c.ShardID, c.P2PAddr)
remoteCosigners = append(
remoteCosigners,
temp,
)
remoteIcosigners = append(
remoteIcosigners,
temp)

// remoteCosigners = append(
// remoteCosigners,
// temp,
// )
remoteCosigners = append(remoteCosigners, pcosigner.NewRemoteCosigner(c.ShardID, c.P2PAddr))
} else {
// p2pListen = c.P2PAddr
cosign = pcosigner.NewCosign(c.ShardID, c.P2PAddr)

}
}

if p2pListen == "" {
if cosign.GetAddress() == "" {
return nil, nil, fmt.Errorf("cosigner config does not exist for our shard ID %d", security.GetID())
}

@@ -81,7 +80,7 @@ func NewThresholdValidator(
nodeID := fmt.Sprint(security.GetID())

raftStore := node.NewRaftStore(nodeID,
raftDir, p2pListen, raftTimeout, logger, localCosigner, remoteIcosigners)
raftDir, p2pListen, raftTimeout, logger)
if err := raftStore.Start(); err != nil {
return nil, nil, fmt.Errorf("error starting raft store: %w", err)
}
@@ -94,8 +93,8 @@ func NewThresholdValidator(
grpcTimeout,
maxWaitForSameBlockAttempts,
localCosigner,
remoteCosigners,
raftStore, // raftStore implements the ILeader interface
remoteCosigners, // remote Cosigners are the peers we are calling remotely
raftStore, // raftStore implements the ILeader interface
)

raftStore.SetThresholdValidator(val)
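Taken together, the threshold.go hunks invert the old dependency: NewRaftStore no longer receives the local cosigner or the peer list, and the raft store instead reaches its peers through the validator attached after construction. A minimal sketch of the resulting boot order, using only calls visible in this diff (error handling trimmed; the arguments before localCosigner are illustrative placeholders):

    raftStore := node.NewRaftStore(nodeID, raftDir, p2pListen, raftTimeout, logger)
    if err := raftStore.Start(); err != nil {
        return nil, nil, fmt.Errorf("error starting raft store: %w", err)
    }

    val := node.NewThresholdValidator(
        logger, cfg, threshold, grpcTimeout, maxWaitForSameBlockAttempts,
        localCosigner,   // this node's key shard
        remoteCosigners, // the peers we call remotely
        raftStore,       // raftStore implements the ILeader interface
    )

    // Close the cycle: raft_store.go (below) resolves peers through
    // s.thresholdValidator, so the validator must be attached before the
    // asynchronous raft bootstrap reads the peer list.
    raftStore.SetThresholdValidator(val)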
112 changes: 112 additions & 0 deletions go.sum

Large diffs are not rendered by default.

19 changes: 11 additions & 8 deletions pkg/multiresolver/multi_test.go
@@ -14,7 +14,6 @@ import (
"github.com/strangelove-ventures/horcrux/pkg/node"
"github.com/strangelove-ventures/horcrux/pkg/pcosigner"
proto "github.com/strangelove-ventures/horcrux/pkg/proto"
proto2 "github.com/strangelove-ventures/horcrux/pkg/proto"

grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/strangelove-ventures/horcrux/pkg/multiresolver"
@@ -51,20 +50,24 @@ func createListener(nodeID string, homedir string) (string, func(), error) {
homedir,
"127.0.0.1:"+port,
500*time.Millisecond,
log.NewNopLogger(), localcosign, peers)
log.NewNopLogger())

// Set the validator pointer on the raft store to avoid nil-pointer dereferences.
thresholdvalidator := node.NewThresholdValidator(log.NewNopLogger(), nil, 0, timeDuration, 0, localcosign, remoteCosigners, s)
s.SetThresholdValidator(thresholdvalidator)

transportManager, err := s.Open(peers)
transportManager, err := s.Open()
if err != nil {
fmt.Printf("Error opening transport manager: %v\n", err)
return "", nil, err
}

grpcServer := grpc.NewServer()
proto.RegisterICosignerGRPCServer(grpcServer, node.NewGRPCServer(localcosign, s))

server := node.NewGRPCServer(s)
proto.RegisterIRaftGRPCServer(grpcServer, server.RaftGRPCServer)
proto.RegisterICosignerGRPCServer(grpcServer, server.CosignGRPCServer)

transportManager.Register(grpcServer)

go func() {
@@ -115,8 +118,8 @@ func TestMultiResolver(t *testing.T) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()

grpcClient := proto2.NewIRaftGRPCClient(connDNS)
_, err = grpcClient.GetLeader(ctx, &proto2.RaftGRPCGetLeaderRequest{})
grpcClient := proto.NewIRaftGRPCClient(connDNS)
_, err = grpcClient.GetLeader(ctx, &proto.RaftGRPCGetLeaderRequest{})
require.NoError(t, err)

connIP, err := grpc.Dial(targetIP,
@@ -128,7 +131,7 @@ func TestMultiResolver(t *testing.T) {
require.NoError(t, err)
defer connIP.Close()

grpcClient = proto2.NewIRaftGRPCClient(connIP)
_, err = grpcClient.GetLeader(ctx, &proto2.RaftGRPCGetLeaderRequest{})
grpcClient = proto.NewIRaftGRPCClient(connIP)
_, err = grpcClient.GetLeader(ctx, &proto.RaftGRPCGetLeaderRequest{})
require.NoError(t, err)
}
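With the server split into two registered services, a client still reaches both over a single connection. A minimal leader-query sketch mirroring this test, assuming insecure transport credentials (google.golang.org/grpc/credentials/insecure) as is usual in tests; the test's actual Dial options are elided in this diff, so take these as placeholders:

    conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        return err
    }
    defer conn.Close()

    grpcClient := proto.NewIRaftGRPCClient(conn)
    resp, err := grpcClient.GetLeader(ctx, &proto.RaftGRPCGetLeaderRequest{})
    if err != nil {
        return err
    }
    fmt.Println("leader:", resp.GetLeader()) // Leader field getter generated by protoc-gen-go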
51 changes: 31 additions & 20 deletions pkg/node/grpc_server.go
@@ -1,4 +1,4 @@
// This is responsible for the Cosigners Connections.
// This is responsible for the Raft and cosigner connections.
package node

import (
@@ -13,12 +13,11 @@ import (
"github.com/hashicorp/raft"

"github.com/strangelove-ventures/horcrux/pkg/proto"
proto2 "github.com/strangelove-ventures/horcrux/pkg/proto"
)

// Ensures that GRPCServer implements the proto.ICosignerGRPCServer and proto.IRaftGRPCServer interfaces.
var _ proto.ICosignerGRPCServer = &GRPCServer{}
var _ proto2.IRaftGRPCServer = &GRPCServer{}
var _ proto.IRaftGRPCServer = &GRPCServer{}

// TODO Implement as

@@ -32,15 +31,15 @@ type CosignGRPCServer struct {

type RaftGRPCServer struct {
// logger log.Logger
peers []pcosigner.ICosigner
//peers []pcosigner.ICosigner
// The "node's" ThresholdValidator
// thresholdValidator *ThresholdValidator

// The "node's" RaftStore
raftStore *RaftStore

// Promoted field: embedded to provide forward compatibility.
proto2.UnimplementedIRaftGRPCServer
proto.UnimplementedIRaftGRPCServer
}
type GRPCServer struct {
*CosignGRPCServer
@@ -50,21 +49,33 @@ type GRPCServer struct {

// NewGRPCServer returns a new GRPCServer.
func NewGRPCServer(
cosigner pcosigner.ILocalCosigner,
//cosigner pcosigner.ILocalCosigner,
// thresholdValidator *ThresholdValidator,
raftStore *RaftStore,
) *GRPCServer {
return &GRPCServer{
CosignGRPCServer: &CosignGRPCServer{cosigner: cosigner}, // The nodes local cosigner
RaftGRPCServer: &RaftGRPCServer{raftStore: raftStore}, // The nodes raftStore
CosignGRPCServer: &CosignGRPCServer{cosigner: raftStore.thresholdValidator.thresholdalgorithm.GetLocalCosign()}, // The node's local cosigner
RaftGRPCServer: &RaftGRPCServer{raftStore: raftStore},
// peers: pcosigner.FromIRemoteToICosigner(raftStore.thresholdValidator.thresholdalgorithm.GetPeers())}, // The nodes raftStore
}
}

func NewRaftGRPCServer(
//cosigner pcosigner.ILocalCosigner,
// thresholdValidator *ThresholdValidator,
raftStore *RaftStore,
) *RaftGRPCServer {
return &RaftGRPCServer{
raftStore: raftStore,
// peers: pcosigner.FromIRemoteToICosigner(raftStore.thresholdValidator.thresholdalgorithm.GetPeers()),
}
}

// SignBlock "pseudo-implements" the IRaftGRPCServer interface in pkg/proto/cosigner_grpc_server_grpc.pb.go
func (rpc *RaftGRPCServer) SignBlock(
_ context.Context,
req *proto2.RaftGRPCSignBlockRequest,
) (*proto2.RaftGRPCSignBlockResponse, error) {
req *proto.RaftGRPCSignBlockRequest,
) (*proto.RaftGRPCSignBlockResponse, error) {
block := &Block{
Height: req.Block.GetHeight(),
Round: req.Block.GetRound(),
@@ -77,45 +88,45 @@ func (rpc *RaftGRPCServer) SignBlock(
if err != nil {
return nil, err
}
return &proto2.RaftGRPCSignBlockResponse{
return &proto.RaftGRPCSignBlockResponse{
Signature: res,
}, nil
}

// TransferLeadership pseudo-implements the IRaftGRPCServer interface in pkg/proto/cosigner_grpc_server_grpc.pb.go
func (rpc *RaftGRPCServer) TransferLeadership(
_ context.Context,
req *proto2.RaftGRPCTransferLeadershipRequest,
) (*proto2.RaftGRPCTransferLeadershipResponse, error) {
req *proto.RaftGRPCTransferLeadershipRequest,
) (*proto.RaftGRPCTransferLeadershipResponse, error) {
if rpc.raftStore.raft.State() != raft.Leader {
return &proto2.RaftGRPCTransferLeadershipResponse{}, nil
return &proto.RaftGRPCTransferLeadershipResponse{}, nil
}
leaderID := req.GetLeaderID()
if leaderID != "" {
// TODO: Not an elegant fix. See other notes.
for _, c := range rpc.peers {
for _, c := range rpc.raftStore.thresholdValidator.thresholdalgorithm.GetPeers() {
shardID := fmt.Sprint(c.GetID())
if shardID == leaderID {
raftAddress := p2pURLToRaftAddress(c.GetAddress())
fmt.Printf("Transferring leadership to ID: %s - Address: %s\n", shardID, raftAddress)
rpc.raftStore.raft.LeadershipTransferToServer(raft.ServerID(shardID), raft.ServerAddress(raftAddress))
return &proto2.RaftGRPCTransferLeadershipResponse{LeaderID: shardID, LeaderAddress: raftAddress}, nil
return &proto.RaftGRPCTransferLeadershipResponse{LeaderID: shardID, LeaderAddress: raftAddress}, nil
}
}
}
fmt.Printf("Transferring leadership to next candidate\n")
rpc.raftStore.raft.LeadershipTransfer()
return &proto2.RaftGRPCTransferLeadershipResponse{}, nil
return &proto.RaftGRPCTransferLeadershipResponse{}, nil
}

// GetLeader pseudo-implements the ICosignerGRPCServer interface in pkg/proto/cosigner_grpc_server_grpc.pb.go
// GetLeader gets the current raft cluster leader and sends it as a response.
func (rpc *RaftGRPCServer) GetLeader(
context.Context,
*proto2.RaftGRPCGetLeaderRequest,
) (*proto2.RaftGRPCGetLeaderResponse, error) {
*proto.RaftGRPCGetLeaderRequest,
) (*proto.RaftGRPCGetLeaderResponse, error) {
leader := rpc.raftStore.GetLeader()
return &proto2.RaftGRPCGetLeaderResponse{Leader: string(leader)}, nil
return &proto.RaftGRPCGetLeaderResponse{Leader: string(leader)}, nil
}

// SetNoncesAndSign implements the ICosignerGRPCServer interface.
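The composite GRPCServer relies on Go struct embedding: each sub-server embeds its generated Unimplemented base for forward compatibility, and the wrapper promotes both method sets. A stripped-down, self-contained sketch of the pattern (all names here are illustrative, not from the horcrux proto), which also shows why both call sites register server.RaftGRPCServer and server.CosignGRPCServer individually rather than the wrapper:

    package main

    import "fmt"

    // Generated-style forward-compatibility base: embedding it keeps a server
    // compiling when the service definition gains new methods.
    type UnimplementedRaftServer struct{}

    func (UnimplementedRaftServer) GetLeader() string { return "unimplemented" }

    type RaftServer struct {
        UnimplementedRaftServer // promoted fallback methods
        leader                  string
    }

    func (s *RaftServer) GetLeader() string { return s.leader } // shadows the fallback

    type CosignServer struct{} // second role-specific server

    // Composite: both sub-servers stay individually addressable as promoted fields.
    type Server struct {
        *CosignServer
        *RaftServer
    }

    func main() {
        s := &Server{&CosignServer{}, &RaftServer{leader: "node-1"}}
        fmt.Println(s.GetLeader()) // "node-1", promoted from *RaftServer
    }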
58 changes: 29 additions & 29 deletions pkg/node/raft_store.go
@@ -18,7 +18,6 @@ import (
"sync"
"time"

"github.com/strangelove-ventures/horcrux/pkg/pcosigner"
"github.com/strangelove-ventures/horcrux/pkg/proto"
"github.com/strangelove-ventures/horcrux/pkg/types"

@@ -69,7 +68,7 @@ type RaftStore struct {
// NewRaftStore returns a new RaftStore.
func NewRaftStore(
nodeID string, directory string, bindAddress string, timeout time.Duration,
logger log.Logger, localCosigner pcosigner.ILocalCosigner, peers []pcosigner.ICosigner) *RaftStore {
logger log.Logger) /*, localCosigner pcosigner.ILocalCosigner, peers []pcosigner.ICosigner) */ *RaftStore {
cosignerRaftStore := &RaftStore{
NodeID: nodeID,
RaftDir: directory,
@@ -79,11 +78,11 @@ func NewRaftStore(
logger: logger,
}

err := cosignerRaftStore.onStart(localCosigner, peers)
if err != nil {
fmt.Printf("cosignerRaftStore.onStart: %v", err)
panic(err)
}
// err := cosignerRaftStore.OnStart()
// if err != nil {
// fmt.Printf("cosignerRaftStore.onStart: %v", err)
// panic(err)
// }
cosignerRaftStore.BaseService = *service.NewBaseService(logger, "CosignerRaftStore", cosignerRaftStore)
return cosignerRaftStore
}
@@ -92,7 +91,7 @@ func (s *RaftStore) SetThresholdValidator(thresholdValidator *ThresholdValidator
s.thresholdValidator = thresholdValidator
}

func (s *RaftStore) init(localCosigner pcosigner.ILocalCosigner, peers []pcosigner.ICosigner) error {
func (s *RaftStore) init() error {
host := p2pURLToRaftAddress(s.RaftBind)
_, port, err := net.SplitHostPort(host)
if err != nil {
@@ -103,18 +102,17 @@ func (s *RaftStore) init() error {
if err != nil {
return err
}
transportManager, err := s.Open(peers)
transportManager, err := s.Open()
if err != nil {
fmt.Printf("s.Open: %v", err)
return err
}
// Create a new gRPC server, used by the Raft cluster, the threshold validator, and the cosigner.
grpcServer := grpc.NewServer()

// proto.RegisterIRaftGRPCServer(grpcServer, s)
proto.RegisterICosignerGRPCServer(grpcServer,
// FIX: this is a TempFIX get cosigner
NewGRPCServer(localCosigner, s))
server := NewGRPCServer(s)
proto.RegisterIRaftGRPCServer(grpcServer, server.RaftGRPCServer)
proto.RegisterICosignerGRPCServer(grpcServer, server.CosignGRPCServer)
transportManager.Register(grpcServer)
leaderhealth.Setup(s.raft, grpcServer, []string{"Leader"})
raftadmin.Register(grpcServer, s.raft)
@@ -124,18 +122,12 @@

// OnStart starts the raft server
func (s *RaftStore) OnStart() error {
s.logger.Info("Starting RaftStore")
return nil
}
func (s *RaftStore) onStart(localCosign pcosigner.ILocalCosigner, peers []pcosigner.ICosigner) error {
go func() {
err := s.init(localCosign, peers)
err := s.init()
if err != nil {
s.logger.Error("Starting RaftStore", "error", err)
panic(err)
}
}()

return nil
}

@@ -150,11 +142,12 @@ func p2pURLToRaftAddress(p2pURL string) string {
// Open opens the store. If there are no existing peers, then this node
// becomes the first node, and therefore leader, of the cluster.
// The local server ID is taken from s.NodeID.
func (s *RaftStore) Open(peers []pcosigner.ICosigner) (*raftgrpctransport.Manager, error) {
func (s *RaftStore) Open() (*raftgrpctransport.Manager, error) {
// Setup Raft configuration.
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(s.NodeID)
config.LogLevel = "ERROR"
//fmt.Printf("Node ID %s\n", s.NodeID)
config.LogLevel = "DEBUG"

// Create the snapshot store. This allows the Raft to truncate the log.
snapshots, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr)
@@ -176,6 +169,8 @@ func (s *RaftStore) Open() (*raftgrpctransport.Manager, error) {
}

raftAddress := raft.ServerAddress(p2pURLToRaftAddress(s.RaftBind))
fmt.Printf("Node Bind %s\n", s.RaftBind)
fmt.Printf("raftaddress %s\n", raftAddress)

// Setup Raft communication.
transportManager := raftgrpctransport.New(raftAddress, []grpc.DialOption{
@@ -198,15 +193,17 @@ func (s *RaftStore) Open() (*raftgrpctransport.Manager, error) {
},
}
// This is also not a particularly nice fix.
for _, c := range peers {
configuration.Servers = append(configuration.Servers,
raft.Server{
ID: raft.ServerID(fmt.Sprint(c.GetID())),
Address: raft.ServerAddress(p2pURLToRaftAddress(c.GetAddress())),
})
for _, c := range s.thresholdValidator.thresholdalgorithm.GetPeers() {
fmt.Printf("Adding peer to raft cluster \n\tid: %v, \n\taddress: %s\n", c.GetID(), c.GetAddress())
configuration.Servers = append(configuration.Servers, raft.Server{
ID: raft.ServerID(fmt.Sprint(c.GetID())),
Address: raft.ServerAddress(p2pURLToRaftAddress(c.GetAddress())),
})
}
fmt.Printf("Configuration: %v\n", configuration)
s.raft.BootstrapCluster(configuration)

// fmt.Printf("Leader: %s \n", s.GetLeader())
return transportManager, nil
}

@@ -308,9 +305,12 @@ func (s *RaftStore) IsLeader() bool {

func (s *RaftStore) GetLeader() raft.ServerAddress {
if s == nil || s.raft == nil {
fmt.Printf("s is nil %v, or s.raft is nil %v", s, s.raft)
return ""
}
return s.raft.Leader()
add, id := s.raft.LeaderWithID()
fmt.Printf("\naddress is: \n\t%s\nid is:\n\t%s\n", add, id)
return add
}

func (s *RaftStore) ShareSigned(lss types.ChainSignStateConsensus) error {
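The Open and GetLeader changes lean on two hashicorp/raft calls: BootstrapCluster seeds the voter set from a static configuration, and LeaderWithID reports both the leader's address and its server ID. A small self-contained sketch of that flow, assuming fmt and github.com/hashicorp/raft are imported and r is an already-constructed *raft.Raft; the peers map (shard ID to raft address) is a placeholder:

    func bootstrapAndReport(r *raft.Raft, peers map[string]string) {
        var cfg raft.Configuration
        for id, addr := range peers {
            cfg.Servers = append(cfg.Servers, raft.Server{
                ID:      raft.ServerID(id),
                Address: raft.ServerAddress(addr),
            })
        }
        // BootstrapCluster only applies on a fresh node; on existing state the
        // returned future carries an error (ignored in this sketch).
        r.BootstrapCluster(cfg)

        // LeaderWithID returns ("", "") until an election has completed.
        addr, id := r.LeaderWithID()
        fmt.Printf("leader %s at %s\n", id, addr)
    }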