diff --git a/internal/node/node.go b/internal/node/node.go new file mode 100644 index 000000000..6efbff16b --- /dev/null +++ b/internal/node/node.go @@ -0,0 +1,49 @@ +package node + +import ( + "fmt" + "log/slog" + "os" + + "github.com/ethereum/go-ethereum/common" + "github.com/statechannels/go-nitro/internal/chain" + "github.com/statechannels/go-nitro/node" + "github.com/statechannels/go-nitro/node/engine" + "github.com/statechannels/go-nitro/node/engine/chainservice" + "github.com/statechannels/go-nitro/node/engine/store" + "github.com/tidwall/buntdb" + + p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" +) + +func InitializeNode(pkString string, chainOpts chain.ChainOpts, + useDurableStore bool, durableStoreFolder string, msgPort int, logDestination *os.File, bootPeers []string, +) (*node.Node, *store.Store, *p2pms.P2PMessageService, *chainservice.EthChainService, error) { + if pkString == "" { + panic("pk must be set") + } + + pk := common.Hex2Bytes(pkString) + ourStore, err := store.NewStore(pk, useDurableStore, durableStoreFolder, buntdb.Config{}) + if err != nil { + return nil, nil, nil, nil, err + } + + slog.Info("Initializing message service on port " + fmt.Sprint(msgPort) + "...") + messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers) + + slog.Info("Initializing chain service and connecting to " + chainOpts.ChainUrl + "...") + ourChain, err := chain.InitializeEthChainService(chainOpts) + if err != nil { + return nil, nil, nil, nil, err + } + + node := node.New( + messageService, + ourChain, + ourStore, + &engine.PermissivePolicy{}, + ) + + return &node, &ourStore, messageService, ourChain, nil +} diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index 161940d6d..6bf476232 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -3,108 +3,33 @@ package rpc import ( "fmt" "log/slog" - "path/filepath" - "github.com/ethereum/go-ethereum/common" - - "github.com/statechannels/go-nitro/crypto" - "github.com/statechannels/go-nitro/internal/chain" - "github.com/statechannels/go-nitro/internal/logging" "github.com/statechannels/go-nitro/node" - "github.com/statechannels/go-nitro/node/engine" - "github.com/statechannels/go-nitro/node/engine/chainservice" - p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" - "github.com/statechannels/go-nitro/node/engine/store" "github.com/statechannels/go-nitro/rpc" "github.com/statechannels/go-nitro/rpc/transport" "github.com/statechannels/go-nitro/rpc/transport/nats" "github.com/statechannels/go-nitro/rpc/transport/ws" - "github.com/tidwall/buntdb" ) -func InitChainServiceAndRunRpcServer(pkString string, chainOpts chain.ChainOpts, - useDurableStore bool, durableStoreFolder string, useNats bool, msgPort int, rpcPort int, - bootPeers []string, -) (*rpc.RpcServer, *node.Node, *p2pms.P2PMessageService, error) { - if pkString == "" { - panic("pk must be set") - } - pk := common.Hex2Bytes(pkString) - - chainService, err := chain.InitializeEthChainService(chainOpts) - if err != nil { - return nil, nil, nil, err - } - - transportType := transport.Ws - if useNats { - transportType = transport.Nats - } - - rpcServer, node, messageService, err := RunRpcServer(pk, chainService, useDurableStore, durableStoreFolder, msgPort, rpcPort, transportType, bootPeers) - if err != nil { - return nil, nil, nil, err - } - - slog.Info("Nitro as a Service listening", "rpc-port", rpcPort, "msg-port", msgPort, "transport", transportType, "address", node.Address.String()) - - return rpcServer, node, messageService, nil -} - -func RunRpcServer(pk []byte, chainService chainservice.ChainService, - useDurableStore bool, durableStoreFolder string, msgPort int, rpcPort int, transportType transport.TransportType, - bootPeers []string, -) (*rpc.RpcServer, *node.Node, *p2pms.P2PMessageService, error) { - me := crypto.GetAddressFromSecretKeyBytes(pk) - logger := logging.LoggerWithAddress(slog.Default(), me) - var ourStore store.Store - var err error - - if useDurableStore { - dataFolder := filepath.Join(durableStoreFolder, me.String()) - logger.Info("Initialising durable store", "dataFolder", dataFolder) - - ourStore, err = store.NewDurableStore(pk, dataFolder, buntdb.Config{}) - if err != nil { - return nil, nil, nil, err - } - - } else { - logger.Info("Initialising mem store...") - ourStore = store.NewMemStore(pk) - } - - logger.Info("Initializing message service ", "port", msgPort) - - messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers) - node := node.New( - messageService, - chainService, - ourStore, - &engine.PermissivePolicy{}) - +func InitializeRpcServer(node *node.Node, rpcPort int, useNats bool) (*rpc.RpcServer, error) { var transport transport.Responder + var err error - switch transportType { - case "nats": - - logger.Info("Initializing NATS RPC transport...") + if useNats { + slog.Info("Initializing NATS RPC transport...") transport, err = nats.NewNatsTransportAsServer(rpcPort) - case "ws": - logger.Info("Initializing websocket RPC transport...") - + } else { + slog.Info("Initializing websocket RPC transport...") transport, err = ws.NewWebSocketTransportAsServer(fmt.Sprint(rpcPort)) - default: - err = fmt.Errorf("unknown transport type %s", transportType) } - if err != nil { - return nil, nil, nil, err + return nil, err } - rpcServer, err := rpc.NewRpcServer(&node, transport) + rpcServer, err := rpc.NewRpcServer(node, transport) if err != nil { - return nil, nil, nil, err + return nil, err } - return rpcServer, &node, messageService, nil + + return rpcServer, nil } diff --git a/main.go b/main.go index 251cc7f5a..0546bce92 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/statechannels/go-nitro/internal/chain" "github.com/statechannels/go-nitro/internal/logging" + "github.com/statechannels/go-nitro/internal/node" "github.com/statechannels/go-nitro/internal/rpc" "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" @@ -177,7 +178,12 @@ func main() { logging.SetupDefaultLogger(os.Stdout, slog.LevelDebug) - rpcServer, _, _, err := rpc.InitChainServiceAndRunRpcServer(pkString, chainOpts, useDurableStore, durableStoreFolder, useNats, msgPort, rpcPort, peerSlice) + node, _, _, _, err := node.InitializeNode(pkString, chainOpts, useDurableStore, durableStoreFolder, msgPort, os.Stdout, peerSlice) + if err != nil { + return err + } + + rpcServer, err := rpc.InitializeRpcServer(node, rpcPort, useNats) if err != nil { return err } diff --git a/node/engine/store/durablestore.go b/node/engine/store/durablestore.go index 9ccc8d073..8a268b0fe 100644 --- a/node/engine/store/durablestore.go +++ b/node/engine/store/durablestore.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "path/filepath" "strconv" "github.com/ethereum/go-ethereum/common" @@ -38,7 +39,11 @@ type DurableStore struct { // It will create the folder if it does not exist func NewDurableStore(key []byte, folder string, config buntdb.Config) (Store, error) { ps := DurableStore{} - err := os.MkdirAll(folder, os.ModePerm) + + me := crypto.GetAddressFromSecretKeyBytes(key) + dataFolder := filepath.Join(folder, me.String()) + + err := os.MkdirAll(dataFolder, os.ModePerm) if err != nil { return nil, err } diff --git a/node/engine/store/store.go b/node/engine/store/store.go index 566f2f03f..1a7c28224 100644 --- a/node/engine/store/store.go +++ b/node/engine/store/store.go @@ -3,12 +3,16 @@ package store // import "github.com/statechannels/go-nitro/node/engine/store" import ( "io" + "log/slog" + "path/filepath" "github.com/statechannels/go-nitro/channel" "github.com/statechannels/go-nitro/channel/consensus_channel" + "github.com/statechannels/go-nitro/crypto" "github.com/statechannels/go-nitro/payments" "github.com/statechannels/go-nitro/protocols" "github.com/statechannels/go-nitro/types" + "github.com/tidwall/buntdb" ) const ( @@ -47,3 +51,24 @@ type ConsensusChannelStore interface { SetConsensusChannel(*consensus_channel.ConsensusChannel) error DestroyConsensusChannel(id types.Destination) error } + +func NewStore(pk []byte, useDurableStore bool, durableStoreFolder string, buntDbConfig buntdb.Config) (Store, error) { + var ourStore Store + var err error + + if useDurableStore { + me := crypto.GetAddressFromSecretKeyBytes(pk) + dataFolder := filepath.Join(durableStoreFolder, me.String()) + + slog.Info("Initialising durable store...", "dataFolder", dataFolder) + ourStore, err = NewDurableStore(pk, dataFolder, buntdb.Config{}) + if err != nil { + return nil, err + } + } else { + slog.Info("Initialising mem store...") + ourStore = NewMemStore(pk) + } + + return ourStore, nil +} diff --git a/node_test/rpc_test.go b/node_test/rpc_test.go index d78852162..abe198701 100644 --- a/node_test/rpc_test.go +++ b/node_test/rpc_test.go @@ -18,8 +18,11 @@ import ( ta "github.com/statechannels/go-nitro/internal/testactors" "github.com/statechannels/go-nitro/internal/testdata" "github.com/statechannels/go-nitro/internal/testhelpers" + "github.com/statechannels/go-nitro/node" + "github.com/statechannels/go-nitro/node/engine" "github.com/statechannels/go-nitro/node/engine/chainservice" p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" + "github.com/statechannels/go-nitro/node/engine/store" "github.com/statechannels/go-nitro/node/query" "github.com/statechannels/go-nitro/protocols/directfund" "github.com/statechannels/go-nitro/protocols/virtualfund" @@ -28,6 +31,7 @@ import ( natstrans "github.com/statechannels/go-nitro/rpc/transport/nats" "github.com/statechannels/go-nitro/rpc/transport/ws" "github.com/statechannels/go-nitro/types" + "github.com/tidwall/buntdb" "github.com/statechannels/go-nitro/crypto" ) @@ -393,10 +397,32 @@ func setupNitroNodeWithRPCClient( connectionType transport.TransportType, bootPeers []string, ) (rpc.RpcClientApi, *p2pms.P2PMessageService, func()) { - var err error - dataFolder, cleanupData := testhelpers.GenerateTempStoreFolder() - rpcServer, _, messageService, err := interRpc.RunRpcServer(pk, chain, true, dataFolder, msgPort, rpcPort, connectionType, bootPeers) + ourStore, err := store.NewStore(pk, true, dataFolder, buntdb.Config{}) + if err != nil { + t.Fatal(err) + } + + slog.Info("Initializing message service on port " + fmt.Sprint(msgPort) + "...") + messageService := p2pms.NewMessageService("127.0.0.1", msgPort, *ourStore.GetAddress(), pk, bootPeers) + + node := node.New( + messageService, + chain, + ourStore, + &engine.PermissivePolicy{}) + + var useNats bool + switch connectionType { + case "nats": + useNats = true + case "ws": + useNats = false + default: + err = fmt.Errorf("unknown connection type %v", connectionType) + panic(err) + } + rpcServer, err := interRpc.InitializeRpcServer(&node, rpcPort, useNats) if err != nil { t.Fatal(err) }