Skip to content

Commit

Permalink
Indirect cosigner (#51)
Browse files Browse the repository at this point in the history
* If all cosigners share a common RPC port, use that port on the leader's IP. This is useful when cosigners do not communicate with each other directly, e.g. behind Kubernetes services

* Return signatures to sentry for same HRS (#55)

Add cache for signatures
Simplify high watermark check
  • Loading branch information
agouin authored Feb 7, 2022
1 parent 8f154e1 commit 0134855
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 51 deletions.
4 changes: 0 additions & 4 deletions signer/Cosigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,4 @@ type Cosigner interface {

// Sign the requested bytes
SetEphemeralSecretPartsAndSign(req CosignerSetEphemeralSecretPartsAndSignRequest) (*CosignerSignResponse, error)

// Request that the cosigner manage the threshold signing process for this block
// Will throw error if cosigner is not the leader
SignBlock(req CosignerSignBlockRequest) (CosignerSignBlockResponse, error)
}
4 changes: 0 additions & 4 deletions signer/local_cosigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,6 @@ func (cosigner *LocalCosigner) setEphemeralSecretPart(req CosignerSetEphemeralSe
return nil
}

// SignBlock is not supported by the local cosigner. It always returns a
// zero-value response together with a "not implemented" error.
func (cosigner *LocalCosigner) SignBlock(req CosignerSignBlockRequest) (CosignerSignBlockResponse, error) {
	var noResponse CosignerSignBlockResponse
	return noResponse, errors.New("not implemented")
}

func (cosigner *LocalCosigner) SetEphemeralSecretPartsAndSign(
req CosignerSetEphemeralSecretPartsAndSignRequest) (*CosignerSignResponse, error) {
for _, secretPart := range req.EncryptedSecrets {
Expand Down
29 changes: 21 additions & 8 deletions signer/raft_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package signer

import (
"encoding/json"
"errors"
"fmt"
"strings"
)

const (
Expand Down Expand Up @@ -31,28 +33,39 @@ func (f *fsm) handleLSSEvent(value string) {
_ = f.cosigner.SaveLastSignedState(*lss)
}

func (s *RaftStore) GetLeaderCosigner() (Cosigner, error) {
func (s *RaftStore) getLeaderRPCAddress() (string, error) {
leader := string(s.GetLeader())
if leader == "" {
return "", errors.New("no current raft leader")
}
// If the same RPC port is used for all peers, we can just use the leader address on that port
if s.commonRPCPort != "" {
leaderSplit := strings.Split(leader, ":")
if len(leaderSplit) == 2 {
return fmt.Sprintf("tcp://%s:%s", leaderSplit[0], s.commonRPCPort), nil
}
}
for _, peer := range s.Peers {
if peer.GetRaftAddress() == leader {
return peer, nil
return peer.GetAddress(), nil
}
tcpAddress, err := GetTCPAddressForRaftAddress(peer.GetRaftAddress())
if err != nil {
continue
}
if fmt.Sprint(tcpAddress) == leader {
return peer, nil
return peer.GetAddress(), nil
}
}
return nil, fmt.Errorf("unable to find leader cosigner from address %s", leader)

return "", fmt.Errorf("unable to find leader cosigner from address %s", leader)
}

func (s *RaftStore) LeaderSignBlock(req CosignerSignBlockRequest) (*CosignerSignBlockResponse, error) {
leaderCosigner, err := s.GetLeaderCosigner()
func (s *RaftStore) LeaderSignBlock(req CosignerSignBlockRequest) (res *CosignerSignBlockResponse, err error) {
leaderCosigner, err := s.getLeaderRPCAddress()
if err != nil {
return nil, err
}
res, err := leaderCosigner.SignBlock(req)
return &res, err

return res, CallRPC(leaderCosigner, "SignBlock", req, &res)
}
34 changes: 26 additions & 8 deletions signer/raft_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"net"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -49,21 +50,38 @@ type RaftStore struct {
logger log.Logger
cosigner *LocalCosigner
thresholdValidator *ThresholdValidator
commonRPCPort string
}

// getCommonRPCPort returns the RPC port shared by every peer, or "" when the
// peers do not all use the same port (or when there are no peers).
//
// Peer addresses are presumably of the form "tcp://host:port". An address from
// which no port can be extracted yields "" (no common port) instead of an
// index-out-of-range panic.
func getCommonRPCPort(peers []Cosigner) string {
	var rpcPort string
	for i, peer := range peers {
		port := rpcPortFromAddress(peer.GetAddress())
		if port == "" {
			// Without a port for this peer there cannot be a common one.
			return ""
		}
		if i == 0 {
			rpcPort = port
			continue
		}
		if port != rpcPort {
			return ""
		}
	}
	return rpcPort
}

// rpcPortFromAddress extracts the port from an address of the form
// "[scheme://]host:port". It returns "" when the address carries no port.
func rpcPortFromAddress(address string) string {
	hostPort := address
	if i := strings.Index(address, "://"); i >= 0 {
		hostPort = address[i+len("://"):]
	}
	// SplitHostPort also copes with bracketed IPv6 hosts such as "[::1]:2222".
	_, port, err := net.SplitHostPort(hostPort)
	if err != nil {
		return ""
	}
	return port
}

// NewRaftStore returns a new RaftStore.
func NewRaftStore(
nodeID string, directory string, bindAddress string, timeout time.Duration,
logger log.Logger, cosigner *LocalCosigner, raftPeers []Cosigner) *RaftStore {
cosignerRaftStore := &RaftStore{
NodeID: nodeID,
RaftDir: directory,
RaftBind: bindAddress,
RaftTimeout: timeout,
m: make(map[string]string),
logger: logger,
cosigner: cosigner,
Peers: raftPeers,
NodeID: nodeID,
RaftDir: directory,
RaftBind: bindAddress,
RaftTimeout: timeout,
m: make(map[string]string),
logger: logger,
cosigner: cosigner,
Peers: raftPeers,
commonRPCPort: getCommonRPCPort(raftPeers),
}

cosignerRaftStore.BaseService = *service.NewBaseService(logger, "CosignerRaftStore", cosignerRaftStore)
Expand Down
5 changes: 0 additions & 5 deletions signer/remote_cosigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ func (cosigner *RemoteCosigner) GetEphemeralSecretParts(
return res, CallRPC(cosigner.address, "GetEphemeralSecretParts", req, &res)
}

// SignBlock implements the Cosigner interface by forwarding the request over
// RPC: it calls CallRPC with cosigner.address and the method name "SignBlock",
// passing &res as the destination for the reply, and returns res together
// with the error reported by CallRPC.
// NOTE(review): res is read and &res is handed to CallRPC within the same
// return statement; this presumably relies on the call populating res before
// the value is returned - confirm, since Go does not specify the order of a
// variable read relative to a function call within one expression list.
func (cosigner *RemoteCosigner) SignBlock(req CosignerSignBlockRequest) (res CosignerSignBlockResponse, err error) {
return res, CallRPC(cosigner.address, "SignBlock", req, &res)
}

// Implements the cosigner interface
func (cosigner *RemoteCosigner) SetEphemeralSecretPartsAndSign(
req CosignerSetEphemeralSecretPartsAndSignRequest) (res *CosignerSignResponse, err error) {
Expand Down
60 changes: 58 additions & 2 deletions signer/sign_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
stepPropose int8 = 1
stepPrevote int8 = 2
stepPrecommit int8 = 3
blocksToCache = 3
)

func CanonicalVoteToStep(vote *tmProto.CanonicalVote) int8 {
Expand Down Expand Up @@ -56,6 +57,7 @@ type SignState struct {
EphemeralPublic []byte `json:"ephemeral_public"`
Signature []byte `json:"signature,omitempty"`
SignBytes tmBytes.HexBytes `json:"signbytes,omitempty"`
cache map[HRSKey]SignStateConsensus

filePath string
}
Expand All @@ -76,6 +78,22 @@ func NewSignStateConsensus(height int64, round int64, step int8) SignStateConsen
}
}

// GetFromCache looks up the cached sign state for hrs.
//
// It returns the HRS currently recorded on signState (Height, Round, Step)
// and a pointer to a copy of the cached entry for hrs, or nil when the cache
// holds no entry for hrs. When lock is non-nil it is held for the duration of
// the call.
func (signState *SignState) GetFromCache(hrs HRSKey, lock *sync.Mutex) (HRSKey, *SignStateConsensus) {
	if lock != nil {
		lock.Lock()
		defer lock.Unlock()
	}

	current := HRSKey{Height: signState.Height, Round: signState.Round, Step: signState.Step}

	cached, found := signState.cache[hrs]
	if !found {
		return current, nil
	}
	return current, &cached
}

func (signState *SignState) Save(ssc SignStateConsensus, lock *sync.Mutex) error {
// One lock/unlock for less/equal check and mutation.
// Setting nil for lock for getErrorIfLessOrEqual to avoid recursive lock
Expand All @@ -90,6 +108,13 @@ func (signState *SignState) Save(ssc SignStateConsensus, lock *sync.Mutex) error
}
// HRS is greater than existing state, allow

signState.cache[HRSKey{Height: ssc.Height, Round: ssc.Round, Step: ssc.Step}] = ssc
for hrs := range signState.cache {
if hrs.Height < ssc.Height-blocksToCache {
delete(signState.cache, hrs)
}
}

signState.Height = ssc.Height
signState.Round = ssc.Round
signState.Step = ssc.Step
Expand Down Expand Up @@ -156,6 +181,18 @@ func (signState *SignState) CheckHRS(hrs HRSKey) (bool, error) {
return false, nil
}

// SameHRSError is the error type produced by newSameHRSError; it carries a
// message naming the height, round and step concerned.
type SameHRSError struct {
	msg string
}

// Error implements the error interface and returns the stored message.
func (e *SameHRSError) Error() string {
	return e.msg
}

// newSameHRSError builds a SameHRSError whose message contains the height,
// round and step of hrs.
func newSameHRSError(hrs HRSKey) *SameHRSError {
	description := fmt.Sprintf("HRS is the same as current: %d:%d:%d", hrs.Height, hrs.Round, hrs.Step)
	return &SameHRSError{msg: description}
}

func (signState *SignState) GetErrorIfLessOrEqual(height int64, round int64, step int8, lock *sync.Mutex) error {
if lock != nil {
lock.Lock()
Expand Down Expand Up @@ -184,8 +221,8 @@ func (signState *SignState) GetErrorIfLessOrEqual(height int64, round int64, ste
return errors.New("step regression not allowed")
}
if step == signState.Step {
// same HRS as current!
return errors.New("not allowing double sign of current latest HRS")
// same HRS as current
return newSameHRSError(HRSKey{Height: height, Round: round, Step: step})
}
// Step is greater, so all good
return nil
Expand All @@ -203,6 +240,14 @@ func LoadSignState(filepath string) (SignState, error) {
if err != nil {
return state, err
}
state.cache = make(map[HRSKey]SignStateConsensus)
state.cache[HRSKey{Height: state.Height, Round: state.Round, Step: state.Step}] = SignStateConsensus{
Height: state.Height,
Round: state.Round,
Step: state.Step,
Signature: state.Signature,
SignBytes: state.SignBytes,
}
state.filePath = filepath
return state, nil
}
Expand All @@ -220,6 +265,7 @@ func LoadOrCreateSignState(filepath string) (SignState, error) {
// Make an empty sign state and save it
state := SignState{}
state.filePath = filepath
state.cache = make(map[HRSKey]SignStateConsensus)
state.save()
return state, nil
}
Expand All @@ -236,6 +282,16 @@ func (signState *SignState) OnlyDifferByTimestamp(signBytes []byte) (time.Time,
return time.Time{}, false
}

// OnlyDifferByTimestamp compares the stored SignBytes with signBytes using the
// helper that matches the stored step: checkProposalOnlyDifferByTimestamp for
// proposals, checkVoteOnlyDifferByTimestamp for prevotes and precommits. The
// helper's result is returned unchanged. For any other step it returns the
// zero time and false.
func (signState *SignStateConsensus) OnlyDifferByTimestamp(signBytes []byte) (time.Time, bool) {
	switch signState.Step {
	case stepPropose:
		return checkProposalOnlyDifferByTimestamp(signState.SignBytes, signBytes)
	case stepPrevote, stepPrecommit:
		return checkVoteOnlyDifferByTimestamp(signState.SignBytes, signBytes)
	default:
		return time.Time{}, false
	}
}

func checkVoteOnlyDifferByTimestamp(lastSignBytes, newSignBytes []byte) (time.Time, bool) {
var lastVote, newVote tmProto.CanonicalVote
if err := protoio.UnmarshalDelimited(lastSignBytes, &lastVote); err != nil {
Expand Down
44 changes: 24 additions & 20 deletions signer/threshold_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func NewThresholdValidator(opt *ThresholdValidatorOpt) *ThresholdValidator {
Round: opt.SignState.Round,
Step: opt.SignState.Step,
filePath: "none",
cache: make(map[HRSKey]SignStateConsensus),
}
validator.lastSignStateInitiatedMutex = sync.Mutex{}
validator.raftStore = opt.RaftStore
Expand Down Expand Up @@ -263,36 +264,39 @@ func (pv *ThresholdValidator) SignBlock(chainID string, block *block) ([]byte, t

pv.logger.Debug("I am the raft leader. Managing the sign process for this block")

// the block sign state for caching full block signatures
lss := pv.lastSignState

hrs := HRSKey{
Height: height,
Round: round,
Step: step,
}

// check watermark
sameHRS, err := lss.CheckHRS(hrs)
if err != nil {
return nil, stamp, err
}
signBytes := block.SignBytes

// Keep track of the last block that we began the signing process for. Only allow one attempt per block
if err := pv.SaveLastSignedStateInitiated(NewSignStateConsensus(height, round, step)); err != nil {
return nil, stamp, pv.newBeyondBlockError(hrs)
}

signBytes := block.SignBytes

if sameHRS {
if bytes.Equal(signBytes, lss.SignBytes) {
return lss.Signature, block.Timestamp, nil
} else if timestamp, ok := lss.OnlyDifferByTimestamp(signBytes); ok {
return lss.Signature, timestamp, nil
switch err.(type) {
case *SameHRSError:
// Wait for last sign state signature to be the same block
for i := 0; i < 100; i++ {
time.Sleep(10 * time.Millisecond)
latestBlock, existingSignature := pv.lastSignState.GetFromCache(hrs, &pv.lastSignStateMutex)
if existingSignature != nil {
if bytes.Equal(signBytes, existingSignature.SignBytes) {
return existingSignature.Signature, block.Timestamp, nil
} else if timestamp, ok := existingSignature.OnlyDifferByTimestamp(signBytes); ok {
return existingSignature.Signature, timestamp, nil
}
return nil, stamp, errors.New("conflicting data")
} else if latestBlock.Height > height ||
(latestBlock.Height == height && latestBlock.Round > round) ||
(latestBlock.Height == height && latestBlock.Round == round && latestBlock.Step > step) {
return nil, stamp, pv.newBeyondBlockError(hrs)
}
}
return nil, stamp, errors.New("timed out waiting for block signature from cluster")
default:
return nil, stamp, pv.newBeyondBlockError(hrs)
}

return nil, stamp, errors.New("conflicting data")
}

numPeers := len(pv.peers)
Expand Down

0 comments on commit 0134855

Please sign in to comment.