From ec726d441ceeab34b5ec4be4d5a29268a900a181 Mon Sep 17 00:00:00 2001 From: Samuel Stokes Date: Wed, 23 Aug 2023 15:47:03 -0400 Subject: [PATCH 1/2] Refactor init functions to separate node from rpc server --- internal/node/node.go | 55 +++++++++++++++++ internal/rpc/rpc.go | 98 +++---------------------------- main.go | 9 ++- node/engine/store/durablestore.go | 7 ++- node/engine/store/store.go | 25 ++++++++ node_test/rpc_test.go | 37 +++++++++++- 6 files changed, 138 insertions(+), 93 deletions(-) create mode 100644 internal/node/node.go diff --git a/internal/node/node.go b/internal/node/node.go new file mode 100644 index 000000000..84103ba72 --- /dev/null +++ b/internal/node/node.go @@ -0,0 +1,55 @@ +package node + +import ( + "fmt" + "os" + + "github.com/ethereum/go-ethereum/common" + "github.com/rs/zerolog" + "github.com/statechannels/go-nitro/internal/chain" + "github.com/statechannels/go-nitro/node" + "github.com/statechannels/go-nitro/node/engine" + "github.com/statechannels/go-nitro/node/engine/chainservice" + "github.com/statechannels/go-nitro/node/engine/store" + "github.com/tidwall/buntdb" + + p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" +) + +func InitializeNode(pkString string, chainOpts chain.ChainOpts, + useDurableStore bool, durableStoreFolder string, msgPort int, logDestination *os.File, bootPeers []string, +) (*node.Node, *store.Store, *p2pms.P2PMessageService, *chainservice.EthChainService, error) { + if pkString == "" { + panic("pk must be set") + } + + logger := zerolog.New(logDestination). + With(). + Timestamp(). + Logger() + + pk := common.Hex2Bytes(pkString) + ourStore, err := store.NewStore(pk, logger, useDurableStore, durableStoreFolder, buntdb.Config{}) + if err != nil { + return nil, nil, nil, nil, err + } + + logger.Info().Msg("Initializing message service on port " + fmt.Sprint(msgPort) + "...") + messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, logDestination, bootPeers) + + logger.Info().Msg("Initializing chain service and connecting to " + chainOpts.ChainUrl + "...") + ourChain, err := chain.InitializeEthChainService(chainOpts) + if err != nil { + return nil, nil, nil, nil, err + } + + node := node.New( + messageService, + ourChain, + ourStore, + logDestination, + &engine.PermissivePolicy{}, + ) + + return &node, &ourStore, messageService, ourChain, nil +} diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index 266796a0a..753ded1ac 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -3,118 +3,38 @@ package rpc import ( "fmt" "os" - "path/filepath" - "github.com/ethereum/go-ethereum/common" "github.com/rs/zerolog" - "github.com/statechannels/go-nitro/crypto" - "github.com/statechannels/go-nitro/internal/chain" - "github.com/statechannels/go-nitro/internal/logging" "github.com/statechannels/go-nitro/node" - "github.com/statechannels/go-nitro/node/engine" - "github.com/statechannels/go-nitro/node/engine/chainservice" - p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" - "github.com/statechannels/go-nitro/node/engine/store" "github.com/statechannels/go-nitro/rpc" "github.com/statechannels/go-nitro/rpc/transport" "github.com/statechannels/go-nitro/rpc/transport/nats" "github.com/statechannels/go-nitro/rpc/transport/ws" - "github.com/tidwall/buntdb" ) -func InitChainServiceAndRunRpcServer(pkString string, chainOpts chain.ChainOpts, - useDurableStore bool, durableStoreFolder string, useNats bool, msgPort int, rpcPort int, - bootPeers []string, -) (*rpc.RpcServer, *node.Node, *p2pms.P2PMessageService, error) { - if pkString == "" { - panic("pk must be set") - } - pk := common.Hex2Bytes(pkString) - - chainService, err := chain.InitializeEthChainService(chainOpts) - if err != nil { - return nil, nil, nil, err - } - - transportType := transport.Ws - if useNats { - transportType = transport.Nats - } - rpcServer, node, messageService, err := RunRpcServer(pk, chainService, useDurableStore, durableStoreFolder, msgPort, rpcPort, transportType, os.Stdout, bootPeers) - if err != nil { - return nil, nil, nil, err - } - - fmt.Println("Nitro as a Service listening on port", rpcPort) - return rpcServer, node, messageService, nil -} - -func RunRpcServer(pk []byte, chainService chainservice.ChainService, - useDurableStore bool, durableStoreFolder string, msgPort int, rpcPort int, transportType transport.TransportType, logDestination *os.File, - bootPeers []string, -) (*rpc.RpcServer, *node.Node, *p2pms.P2PMessageService, error) { - me := crypto.GetAddressFromSecretKeyBytes(pk) - - var ourStore store.Store - var err error - +func InitializeRpcServer(node *node.Node, rpcPort int, useNats bool, logDestination *os.File) (*rpc.RpcServer, error) { logger := zerolog.New(logDestination). With(). Timestamp(). Logger() - if useDurableStore { - dataFolder := filepath.Join(durableStoreFolder, me.String()) - logger.Info().Msgf("Initialising durable store in %s...", dataFolder) - - ourStore, err = store.NewDurableStore(pk, dataFolder, buntdb.Config{}) - if err != nil { - return nil, nil, nil, err - } - - } else { - logger.Info().Msg("Initialising mem store...") - ourStore = store.NewMemStore(pk) - } - - logger.Info().Msg("Initializing message service on port " + fmt.Sprint(msgPort) + "...") - messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, logDestination, bootPeers) - node := node.New( - messageService, - chainService, - ourStore, - logDestination, - &engine.PermissivePolicy{}) - - serverLogger := logging.WithAddress(zerolog.New(logDestination). - Level(zerolog.TraceLevel). - With(). - Timestamp(). - Str("rpc", "server"), ourStore.GetAddress()). - Logger() - var transport transport.Responder + var err error - switch transportType { - case "nats": - + if useNats { logger.Info().Msg("Initializing NATS RPC transport...") transport, err = nats.NewNatsTransportAsServer(rpcPort) - case "ws": + } else { logger.Info().Msg("Initializing websocket RPC transport...") - - transport, err = ws.NewWebSocketTransportAsServer(fmt.Sprint(rpcPort), serverLogger) - default: - err = fmt.Errorf("unknown transport type %s", transportType) + transport, err = ws.NewWebSocketTransportAsServer(fmt.Sprint(rpcPort), logger) } - if err != nil { - return nil, nil, nil, err + return nil, err } - rpcServer, err := rpc.NewRpcServer(&node, &serverLogger, transport) + rpcServer, err := rpc.NewRpcServer(node, &logger, transport) if err != nil { - return nil, nil, nil, err + return nil, err } - return rpcServer, &node, messageService, nil + return rpcServer, nil } diff --git a/main.go b/main.go index f6b10bb94..6d69b903c 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/statechannels/go-nitro/internal/chain" + "github.com/statechannels/go-nitro/internal/node" "github.com/statechannels/go-nitro/internal/rpc" "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" @@ -172,7 +173,13 @@ func main() { if bootPeers != "" { peerSlice = strings.Split(bootPeers, ",") } - rpcServer, _, _, err := rpc.InitChainServiceAndRunRpcServer(pkString, chainOpts, useDurableStore, durableStoreFolder, useNats, msgPort, rpcPort, peerSlice) + + node, _, _, _, err := node.InitializeNode(pkString, chainOpts, useDurableStore, durableStoreFolder, msgPort, os.Stdout, peerSlice) + if err != nil { + return err + } + + rpcServer, err := rpc.InitializeRpcServer(node, rpcPort, useNats, os.Stdout) if err != nil { return err } diff --git a/node/engine/store/durablestore.go b/node/engine/store/durablestore.go index f3ab7021b..a3d238be7 100644 --- a/node/engine/store/durablestore.go +++ b/node/engine/store/durablestore.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "path/filepath" "github.com/ethereum/go-ethereum/common" "github.com/statechannels/go-nitro/channel" @@ -36,7 +37,11 @@ type DurableStore struct { // It will create the folder if it does not exist func NewDurableStore(key []byte, folder string, config buntdb.Config) (Store, error) { ps := DurableStore{} - err := os.MkdirAll(folder, os.ModePerm) + + me := crypto.GetAddressFromSecretKeyBytes(key) + dataFolder := filepath.Join(folder, me.String()) + + err := os.MkdirAll(dataFolder, os.ModePerm) if err != nil { return nil, err } diff --git a/node/engine/store/store.go b/node/engine/store/store.go index a644a2408..e8b285d7d 100644 --- a/node/engine/store/store.go +++ b/node/engine/store/store.go @@ -3,12 +3,16 @@ package store // import "github.com/statechannels/go-nitro/node/engine/store" import ( "io" + "path/filepath" + "github.com/rs/zerolog" "github.com/statechannels/go-nitro/channel" "github.com/statechannels/go-nitro/channel/consensus_channel" + "github.com/statechannels/go-nitro/crypto" "github.com/statechannels/go-nitro/payments" "github.com/statechannels/go-nitro/protocols" "github.com/statechannels/go-nitro/types" + "github.com/tidwall/buntdb" ) const ( @@ -44,3 +48,24 @@ type ConsensusChannelStore interface { SetConsensusChannel(*consensus_channel.ConsensusChannel) error DestroyConsensusChannel(id types.Destination) error } + +func NewStore(pk []byte, logger zerolog.Logger, useDurableStore bool, durableStoreFolder string, buntDbConfig buntdb.Config) (Store, error) { + var ourStore Store + var err error + + if useDurableStore { + me := crypto.GetAddressFromSecretKeyBytes(pk) + dataFolder := filepath.Join(durableStoreFolder, me.String()) + + logger.Info().Msgf("Initialising durable store in %s...", dataFolder) + ourStore, err = NewDurableStore(pk, dataFolder, buntdb.Config{}) + if err != nil { + return nil, err + } + } else { + logger.Info().Msg("Initialising mem store...") + ourStore = NewMemStore(pk) + } + + return ourStore, nil +} diff --git a/node_test/rpc_test.go b/node_test/rpc_test.go index 27f5b799c..9276b68ab 100644 --- a/node_test/rpc_test.go +++ b/node_test/rpc_test.go @@ -18,8 +18,11 @@ import ( ta "github.com/statechannels/go-nitro/internal/testactors" "github.com/statechannels/go-nitro/internal/testdata" "github.com/statechannels/go-nitro/internal/testhelpers" + "github.com/statechannels/go-nitro/node" + "github.com/statechannels/go-nitro/node/engine" "github.com/statechannels/go-nitro/node/engine/chainservice" p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" + "github.com/statechannels/go-nitro/node/engine/store" "github.com/statechannels/go-nitro/node/query" "github.com/statechannels/go-nitro/protocols/directfund" "github.com/statechannels/go-nitro/protocols/virtualfund" @@ -28,6 +31,7 @@ import ( natstrans "github.com/statechannels/go-nitro/rpc/transport/nats" "github.com/statechannels/go-nitro/rpc/transport/ws" "github.com/statechannels/go-nitro/types" + "github.com/tidwall/buntdb" ) func simpleOutcome(a, b types.Address, aBalance, bBalance uint) outcome.Exit { @@ -410,9 +414,38 @@ func setupNitroNodeWithRPCClient( connectionType transport.TransportType, bootPeers []string, ) (rpc.RpcClientApi, *p2pms.P2PMessageService, func()) { - var err error + logger := zerolog.New(logDestination). + With(). + Timestamp(). + Logger() + dataFolder, cleanupData := testhelpers.GenerateTempStoreFolder() - rpcServer, _, messageService, err := interRpc.RunRpcServer(pk, chain, true, dataFolder, msgPort, rpcPort, connectionType, logDestination, bootPeers) + ourStore, err := store.NewStore(pk, logger, true, dataFolder, buntdb.Config{}) + if err != nil { + t.Fatal(err) + } + + logger.Info().Msg("Initializing message service on port " + fmt.Sprint(msgPort) + "...") + messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, logDestination, bootPeers) + + node := node.New( + messageService, + chain, + ourStore, + logDestination, + &engine.PermissivePolicy{}) + + var useNats bool + switch connectionType { + case "nats": + useNats = true + case "ws": + useNats = false + default: + err = fmt.Errorf("unknown connection type %v", connectionType) + panic(err) + } + rpcServer, err := interRpc.InitializeRpcServer(&node, rpcPort, useNats, logDestination) if err != nil { t.Fatal(err) } From 3ae6d2cd3f69404175bbb0b5afb8c77cd960eddb Mon Sep 17 00:00:00 2001 From: Samuel Stokes Date: Mon, 28 Aug 2023 14:02:40 -0400 Subject: [PATCH 2/2] Use slog throughout new init functions --- internal/node/node.go | 16 +++++----------- internal/rpc/rpc.go | 15 +++++++-------- main.go | 8 ++++---- node/engine/store/store.go | 8 ++++---- node_test/rpc_test.go | 14 ++++---------- 5 files changed, 24 insertions(+), 37 deletions(-) diff --git a/internal/node/node.go b/internal/node/node.go index 84103ba72..6efbff16b 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -2,10 +2,10 @@ package node import ( "fmt" + "log/slog" "os" "github.com/ethereum/go-ethereum/common" - "github.com/rs/zerolog" "github.com/statechannels/go-nitro/internal/chain" "github.com/statechannels/go-nitro/node" "github.com/statechannels/go-nitro/node/engine" @@ -23,21 +23,16 @@ func InitializeNode(pkString string, chainOpts chain.ChainOpts, panic("pk must be set") } - logger := zerolog.New(logDestination). - With(). - Timestamp(). - Logger() - pk := common.Hex2Bytes(pkString) - ourStore, err := store.NewStore(pk, logger, useDurableStore, durableStoreFolder, buntdb.Config{}) + ourStore, err := store.NewStore(pk, useDurableStore, durableStoreFolder, buntdb.Config{}) if err != nil { return nil, nil, nil, nil, err } - logger.Info().Msg("Initializing message service on port " + fmt.Sprint(msgPort) + "...") - messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, logDestination, bootPeers) + slog.Info("Initializing message service on port " + fmt.Sprint(msgPort) + "...") + messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers) - logger.Info().Msg("Initializing chain service and connecting to " + chainOpts.ChainUrl + "...") + slog.Info("Initializing chain service and connecting to " + chainOpts.ChainUrl + "...") ourChain, err := chain.InitializeEthChainService(chainOpts) if err != nil { return nil, nil, nil, nil, err @@ -47,7 +42,6 @@ func InitializeNode(pkString string, chainOpts chain.ChainOpts, messageService, ourChain, ourStore, - logDestination, &engine.PermissivePolicy{}, ) diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index bc1409fab..6bf476232 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -2,9 +2,8 @@ package rpc import ( "fmt" - "os" + "log/slog" - "github.com/rs/zerolog" "github.com/statechannels/go-nitro/node" "github.com/statechannels/go-nitro/rpc" "github.com/statechannels/go-nitro/rpc/transport" @@ -12,25 +11,25 @@ import ( "github.com/statechannels/go-nitro/rpc/transport/ws" ) -func InitializeRpcServer(node *node.Node, rpcPort int, useNats bool, logDestination *os.File) (*rpc.RpcServer, error) { +func InitializeRpcServer(node *node.Node, rpcPort int, useNats bool) (*rpc.RpcServer, error) { var transport transport.Responder var err error if useNats { - logger.Info().Msg("Initializing NATS RPC transport...") + slog.Info("Initializing NATS RPC transport...") transport, err = nats.NewNatsTransportAsServer(rpcPort) } else { - logger.Info().Msg("Initializing websocket RPC transport...") - transport, err = ws.NewWebSocketTransportAsServer(fmt.Sprint(rpcPort), logger) + slog.Info("Initializing websocket RPC transport...") + transport, err = ws.NewWebSocketTransportAsServer(fmt.Sprint(rpcPort)) } if err != nil { return nil, err } - rpcServer, err := rpc.NewRpcServer(node, &logger, transport) + rpcServer, err := rpc.NewRpcServer(node, transport) if err != nil { return nil, err } - + return rpcServer, nil } diff --git a/main.go b/main.go index af36e6c14..0546bce92 100644 --- a/main.go +++ b/main.go @@ -10,8 +10,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/statechannels/go-nitro/internal/chain" - "github.com/statechannels/go-nitro/internal/node" "github.com/statechannels/go-nitro/internal/logging" + "github.com/statechannels/go-nitro/internal/node" "github.com/statechannels/go-nitro/internal/rpc" "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" @@ -177,13 +177,13 @@ func main() { } logging.SetupDefaultLogger(os.Stdout, slog.LevelDebug) - - node, _, _, _, err := node.InitializeNode(pkString, chainOpts, useDurableStore, durableStoreFolder, msgPort, os.Stdout, peerSlice) + + node, _, _, _, err := node.InitializeNode(pkString, chainOpts, useDurableStore, durableStoreFolder, msgPort, os.Stdout, peerSlice) if err != nil { return err } - rpcServer, err := rpc.InitializeRpcServer(node, rpcPort, useNats, os.Stdout) + rpcServer, err := rpc.InitializeRpcServer(node, rpcPort, useNats) if err != nil { return err } diff --git a/node/engine/store/store.go b/node/engine/store/store.go index e8b285d7d..5b3a868c5 100644 --- a/node/engine/store/store.go +++ b/node/engine/store/store.go @@ -3,9 +3,9 @@ package store // import "github.com/statechannels/go-nitro/node/engine/store" import ( "io" + "log/slog" "path/filepath" - "github.com/rs/zerolog" "github.com/statechannels/go-nitro/channel" "github.com/statechannels/go-nitro/channel/consensus_channel" "github.com/statechannels/go-nitro/crypto" @@ -49,7 +49,7 @@ type ConsensusChannelStore interface { DestroyConsensusChannel(id types.Destination) error } -func NewStore(pk []byte, logger zerolog.Logger, useDurableStore bool, durableStoreFolder string, buntDbConfig buntdb.Config) (Store, error) { +func NewStore(pk []byte, useDurableStore bool, durableStoreFolder string, buntDbConfig buntdb.Config) (Store, error) { var ourStore Store var err error @@ -57,13 +57,13 @@ func NewStore(pk []byte, logger zerolog.Logger, useDurableStore bool, durableSto me := crypto.GetAddressFromSecretKeyBytes(pk) dataFolder := filepath.Join(durableStoreFolder, me.String()) - logger.Info().Msgf("Initialising durable store in %s...", dataFolder) + slog.Info("Initialising durable store...", "dataFolder", dataFolder) ourStore, err = NewDurableStore(pk, dataFolder, buntdb.Config{}) if err != nil { return nil, err } } else { - logger.Info().Msg("Initialising mem store...") + slog.Info("Initialising mem store...") ourStore = NewMemStore(pk) } diff --git a/node_test/rpc_test.go b/node_test/rpc_test.go index 3eff952de..abe198701 100644 --- a/node_test/rpc_test.go +++ b/node_test/rpc_test.go @@ -397,25 +397,19 @@ func setupNitroNodeWithRPCClient( connectionType transport.TransportType, bootPeers []string, ) (rpc.RpcClientApi, *p2pms.P2PMessageService, func()) { - logger := zerolog.New(logDestination). - With(). - Timestamp(). - Logger() - dataFolder, cleanupData := testhelpers.GenerateTempStoreFolder() - ourStore, err := store.NewStore(pk, logger, true, dataFolder, buntdb.Config{}) + ourStore, err := store.NewStore(pk, true, dataFolder, buntdb.Config{}) if err != nil { t.Fatal(err) } - logger.Info().Msg("Initializing message service on port " + fmt.Sprint(msgPort) + "...") - messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, logDestination, bootPeers) + slog.Info("Initializing message service on port " + fmt.Sprint(msgPort) + "...") + messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers) node := node.New( messageService, chain, ourStore, - logDestination, &engine.PermissivePolicy{}) var useNats bool @@ -428,7 +422,7 @@ func setupNitroNodeWithRPCClient( err = fmt.Errorf("unknown connection type %v", connectionType) panic(err) } - rpcServer, err := interRpc.InitializeRpcServer(&node, rpcPort, useNats, logDestination) + rpcServer, err := interRpc.InitializeRpcServer(&node, rpcPort, useNats) if err != nil { t.Fatal(err) }