Skip to content

Commit

Permalink
Merge pull request #1584 from statechannels/refactor-init-functions
Browse files Browse the repository at this point in the history
Refactor init functions to separate node from rpc server
  • Loading branch information
bitwiseguy authored Aug 29, 2023
2 parents 7da3527 + e857f5e commit 91837d4
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 91 deletions.
49 changes: 49 additions & 0 deletions internal/node/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package node

import (
"fmt"
"log/slog"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/statechannels/go-nitro/internal/chain"
"github.com/statechannels/go-nitro/node"
"github.com/statechannels/go-nitro/node/engine"
"github.com/statechannels/go-nitro/node/engine/chainservice"
"github.com/statechannels/go-nitro/node/engine/store"
"github.com/tidwall/buntdb"

p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
)

func InitializeNode(pkString string, chainOpts chain.ChainOpts,
useDurableStore bool, durableStoreFolder string, msgPort int, logDestination *os.File, bootPeers []string,
) (*node.Node, *store.Store, *p2pms.P2PMessageService, *chainservice.EthChainService, error) {
if pkString == "" {
panic("pk must be set")
}

pk := common.Hex2Bytes(pkString)
ourStore, err := store.NewStore(pk, useDurableStore, durableStoreFolder, buntdb.Config{})
if err != nil {
return nil, nil, nil, nil, err
}

slog.Info("Initializing message service on port " + fmt.Sprint(msgPort) + "...")
messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers)

slog.Info("Initializing chain service and connecting to " + chainOpts.ChainUrl + "...")
ourChain, err := chain.InitializeEthChainService(chainOpts)
if err != nil {
return nil, nil, nil, nil, err
}

node := node.New(
messageService,
ourChain,
ourStore,
&engine.PermissivePolicy{},
)

return &node, &ourStore, messageService, ourChain, nil
}
97 changes: 11 additions & 86 deletions internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,108 +3,33 @@ package rpc
import (
"fmt"
"log/slog"
"path/filepath"

"github.com/ethereum/go-ethereum/common"

"github.com/statechannels/go-nitro/crypto"
"github.com/statechannels/go-nitro/internal/chain"
"github.com/statechannels/go-nitro/internal/logging"
"github.com/statechannels/go-nitro/node"
"github.com/statechannels/go-nitro/node/engine"
"github.com/statechannels/go-nitro/node/engine/chainservice"
p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
"github.com/statechannels/go-nitro/node/engine/store"
"github.com/statechannels/go-nitro/rpc"
"github.com/statechannels/go-nitro/rpc/transport"
"github.com/statechannels/go-nitro/rpc/transport/nats"
"github.com/statechannels/go-nitro/rpc/transport/ws"
"github.com/tidwall/buntdb"
)

func InitChainServiceAndRunRpcServer(pkString string, chainOpts chain.ChainOpts,
useDurableStore bool, durableStoreFolder string, useNats bool, msgPort int, rpcPort int,
bootPeers []string,
) (*rpc.RpcServer, *node.Node, *p2pms.P2PMessageService, error) {
if pkString == "" {
panic("pk must be set")
}
pk := common.Hex2Bytes(pkString)

chainService, err := chain.InitializeEthChainService(chainOpts)
if err != nil {
return nil, nil, nil, err
}

transportType := transport.Ws
if useNats {
transportType = transport.Nats
}

rpcServer, node, messageService, err := RunRpcServer(pk, chainService, useDurableStore, durableStoreFolder, msgPort, rpcPort, transportType, bootPeers)
if err != nil {
return nil, nil, nil, err
}

slog.Info("Nitro as a Service listening", "rpc-port", rpcPort, "msg-port", msgPort, "transport", transportType, "address", node.Address.String())

return rpcServer, node, messageService, nil
}

func RunRpcServer(pk []byte, chainService chainservice.ChainService,
useDurableStore bool, durableStoreFolder string, msgPort int, rpcPort int, transportType transport.TransportType,
bootPeers []string,
) (*rpc.RpcServer, *node.Node, *p2pms.P2PMessageService, error) {
me := crypto.GetAddressFromSecretKeyBytes(pk)
logger := logging.LoggerWithAddress(slog.Default(), me)
var ourStore store.Store
var err error

if useDurableStore {
dataFolder := filepath.Join(durableStoreFolder, me.String())
logger.Info("Initialising durable store", "dataFolder", dataFolder)

ourStore, err = store.NewDurableStore(pk, dataFolder, buntdb.Config{})
if err != nil {
return nil, nil, nil, err
}

} else {
logger.Info("Initialising mem store...")
ourStore = store.NewMemStore(pk)
}

logger.Info("Initializing message service ", "port", msgPort)

messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers)
node := node.New(
messageService,
chainService,
ourStore,
&engine.PermissivePolicy{})

func InitializeRpcServer(node *node.Node, rpcPort int, useNats bool) (*rpc.RpcServer, error) {
var transport transport.Responder
var err error

switch transportType {
case "nats":

logger.Info("Initializing NATS RPC transport...")
if useNats {
slog.Info("Initializing NATS RPC transport...")
transport, err = nats.NewNatsTransportAsServer(rpcPort)
case "ws":
logger.Info("Initializing websocket RPC transport...")

} else {
slog.Info("Initializing websocket RPC transport...")
transport, err = ws.NewWebSocketTransportAsServer(fmt.Sprint(rpcPort))
default:
err = fmt.Errorf("unknown transport type %s", transportType)
}

if err != nil {
return nil, nil, nil, err
return nil, err
}

rpcServer, err := rpc.NewRpcServer(&node, transport)
rpcServer, err := rpc.NewRpcServer(node, transport)
if err != nil {
return nil, nil, nil, err
return nil, err
}
return rpcServer, &node, messageService, nil

return rpcServer, nil
}
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/statechannels/go-nitro/internal/chain"
"github.com/statechannels/go-nitro/internal/logging"
"github.com/statechannels/go-nitro/internal/node"
"github.com/statechannels/go-nitro/internal/rpc"
"github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
Expand Down Expand Up @@ -177,7 +178,12 @@ func main() {

logging.SetupDefaultLogger(os.Stdout, slog.LevelDebug)

rpcServer, _, _, err := rpc.InitChainServiceAndRunRpcServer(pkString, chainOpts, useDurableStore, durableStoreFolder, useNats, msgPort, rpcPort, peerSlice)
node, _, _, _, err := node.InitializeNode(pkString, chainOpts, useDurableStore, durableStoreFolder, msgPort, os.Stdout, peerSlice)
if err != nil {
return err
}

rpcServer, err := rpc.InitializeRpcServer(node, rpcPort, useNats)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion node/engine/store/durablestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -38,7 +39,11 @@ type DurableStore struct {
// It will create the folder if it does not exist
func NewDurableStore(key []byte, folder string, config buntdb.Config) (Store, error) {
ps := DurableStore{}
err := os.MkdirAll(folder, os.ModePerm)

me := crypto.GetAddressFromSecretKeyBytes(key)
dataFolder := filepath.Join(folder, me.String())

err := os.MkdirAll(dataFolder, os.ModePerm)
if err != nil {
return nil, err
}
Expand Down
25 changes: 25 additions & 0 deletions node/engine/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package store // import "github.com/statechannels/go-nitro/node/engine/store"

import (
"io"
"log/slog"
"path/filepath"

"github.com/statechannels/go-nitro/channel"
"github.com/statechannels/go-nitro/channel/consensus_channel"
"github.com/statechannels/go-nitro/crypto"
"github.com/statechannels/go-nitro/payments"
"github.com/statechannels/go-nitro/protocols"
"github.com/statechannels/go-nitro/types"
"github.com/tidwall/buntdb"
)

const (
Expand Down Expand Up @@ -47,3 +51,24 @@ type ConsensusChannelStore interface {
SetConsensusChannel(*consensus_channel.ConsensusChannel) error
DestroyConsensusChannel(id types.Destination) error
}

func NewStore(pk []byte, useDurableStore bool, durableStoreFolder string, buntDbConfig buntdb.Config) (Store, error) {
var ourStore Store
var err error

if useDurableStore {
me := crypto.GetAddressFromSecretKeyBytes(pk)
dataFolder := filepath.Join(durableStoreFolder, me.String())

slog.Info("Initialising durable store...", "dataFolder", dataFolder)
ourStore, err = NewDurableStore(pk, dataFolder, buntdb.Config{})
if err != nil {
return nil, err
}
} else {
slog.Info("Initialising mem store...")
ourStore = NewMemStore(pk)
}

return ourStore, nil
}
32 changes: 29 additions & 3 deletions node_test/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
ta "github.com/statechannels/go-nitro/internal/testactors"
"github.com/statechannels/go-nitro/internal/testdata"
"github.com/statechannels/go-nitro/internal/testhelpers"
"github.com/statechannels/go-nitro/node"
"github.com/statechannels/go-nitro/node/engine"
"github.com/statechannels/go-nitro/node/engine/chainservice"
p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
"github.com/statechannels/go-nitro/node/engine/store"
"github.com/statechannels/go-nitro/node/query"
"github.com/statechannels/go-nitro/protocols/directfund"
"github.com/statechannels/go-nitro/protocols/virtualfund"
Expand All @@ -28,6 +31,7 @@ import (
natstrans "github.com/statechannels/go-nitro/rpc/transport/nats"
"github.com/statechannels/go-nitro/rpc/transport/ws"
"github.com/statechannels/go-nitro/types"
"github.com/tidwall/buntdb"

"github.com/statechannels/go-nitro/crypto"
)
Expand Down Expand Up @@ -393,10 +397,32 @@ func setupNitroNodeWithRPCClient(
connectionType transport.TransportType,
bootPeers []string,
) (rpc.RpcClientApi, *p2pms.P2PMessageService, func()) {
var err error

dataFolder, cleanupData := testhelpers.GenerateTempStoreFolder()
rpcServer, _, messageService, err := interRpc.RunRpcServer(pk, chain, true, dataFolder, msgPort, rpcPort, connectionType, bootPeers)
ourStore, err := store.NewStore(pk, true, dataFolder, buntdb.Config{})
if err != nil {
t.Fatal(err)
}

slog.Info("Initializing message service on port " + fmt.Sprint(msgPort) + "...")
messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers)

node := node.New(
messageService,
chain,
ourStore,
&engine.PermissivePolicy{})

var useNats bool
switch connectionType {
case "nats":
useNats = true
case "ws":
useNats = false
default:
err = fmt.Errorf("unknown connection type %v", connectionType)
panic(err)
}
rpcServer, err := interRpc.InitializeRpcServer(&node, rpcPort, useNats)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 91837d4

Please sign in to comment.