Skip to content

Commit

Permalink
Add unused code back in to preserve compatibility with the sdk (#522)
Browse files Browse the repository at this point in the history
* restore compatability wiht the sdk by adding unused code back in

* add defaults to db-backend
  • Loading branch information
evan-forbes authored Aug 31, 2021
1 parent 321cdc4 commit ceaf5e5
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 0 deletions.
61 changes: 61 additions & 0 deletions abci/server/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package server

import (
"net"

"google.golang.org/grpc"

"github.com/celestiaorg/celestia-core/abci/types"
tmnet "github.com/celestiaorg/celestia-core/libs/net"
"github.com/celestiaorg/celestia-core/libs/service"
)

type GRPCServer struct {
service.BaseService

proto string
addr string
listener net.Listener
server *grpc.Server

app types.ABCIApplicationServer
}

// NewGRPCServer returns a new gRPC ABCI server
func NewGRPCServer(protoAddr string, app types.ABCIApplicationServer) service.Service {
proto, addr := tmnet.ProtocolAndAddress(protoAddr)
s := &GRPCServer{
proto: proto,
addr: addr,
listener: nil,
app: app,
}
s.BaseService = *service.NewBaseService(nil, "ABCIServer", s)
return s
}

// OnStart starts the gRPC service.
func (s *GRPCServer) OnStart() error {

ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}

s.listener = ln
s.server = grpc.NewServer()
types.RegisterABCIApplicationServer(s.server, s.app)

s.Logger.Info("Listening", "proto", s.proto, "addr", s.addr)
go func() {
if err := s.server.Serve(s.listener); err != nil {
s.Logger.Error("Error serving gRPC server", "err", err)
}
}()
return nil
}

// OnStop stops the gRPC server.
func (s *GRPCServer) OnStop() {
s.server.Stop()
}
28 changes: 28 additions & 0 deletions abci/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
Package server is used to start a new ABCI server.
It contains two server implementation:
* gRPC server
* socket server
*/
package server

import (
"fmt"

"github.com/celestiaorg/celestia-core/abci/types"
"github.com/celestiaorg/celestia-core/libs/service"
)

func NewServer(protoAddr, transport string, app types.Application) (service.Service, error) {
var s service.Service
var err error
switch transport {
case "socket":
s = NewSocketServer(protoAddr, app)
case "grpc":
s = NewGRPCServer(protoAddr, types.NewGRPCApplication(app))
default:
err = fmt.Errorf("unknown server type %s", transport)
}
return s, err
}
264 changes: 264 additions & 0 deletions abci/server/socket_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
package server

import (
"bufio"
"fmt"
"io"
"net"
"os"
"runtime"

"github.com/celestiaorg/celestia-core/abci/types"
tmlog "github.com/celestiaorg/celestia-core/libs/log"
tmnet "github.com/celestiaorg/celestia-core/libs/net"
"github.com/celestiaorg/celestia-core/libs/service"
tmsync "github.com/celestiaorg/celestia-core/libs/sync"
)

// var maxNumberConnections = 2

type SocketServer struct {
service.BaseService
isLoggerSet bool

proto string
addr string
listener net.Listener

connsMtx tmsync.Mutex
conns map[int]net.Conn
nextConnID int

appMtx tmsync.Mutex
app types.Application
}

func NewSocketServer(protoAddr string, app types.Application) service.Service {
proto, addr := tmnet.ProtocolAndAddress(protoAddr)
s := &SocketServer{
proto: proto,
addr: addr,
listener: nil,
app: app,
conns: make(map[int]net.Conn),
}
s.BaseService = *service.NewBaseService(nil, "ABCIServer", s)
return s
}

func (s *SocketServer) SetLogger(l tmlog.Logger) {
s.BaseService.SetLogger(l)
s.isLoggerSet = true
}

func (s *SocketServer) OnStart() error {
ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}

s.listener = ln
go s.acceptConnectionsRoutine()

return nil
}

func (s *SocketServer) OnStop() {
if err := s.listener.Close(); err != nil {
s.Logger.Error("Error closing listener", "err", err)
}

s.connsMtx.Lock()
defer s.connsMtx.Unlock()
for id, conn := range s.conns {
delete(s.conns, id)
if err := conn.Close(); err != nil {
s.Logger.Error("Error closing connection", "id", id, "conn", conn, "err", err)
}
}
}

func (s *SocketServer) addConn(conn net.Conn) int {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()

connID := s.nextConnID
s.nextConnID++
s.conns[connID] = conn

return connID
}

// deletes conn even if close errs
func (s *SocketServer) rmConn(connID int) error {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()

conn, ok := s.conns[connID]
if !ok {
return fmt.Errorf("connection %d does not exist", connID)
}

delete(s.conns, connID)
return conn.Close()
}

func (s *SocketServer) acceptConnectionsRoutine() {
for {
// Accept a connection
s.Logger.Info("Waiting for new connection...")
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
}
s.Logger.Error("Failed to accept connection", "err", err)
continue
}

s.Logger.Info("Accepted a new connection")

connID := s.addConn(conn)

closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses

// Read requests from conn and deal with them
go s.handleRequests(closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go s.handleResponses(closeConn, conn, responses)

// Wait until signal to close connection
go s.waitForClose(closeConn, connID)
}
}

func (s *SocketServer) waitForClose(closeConn chan error, connID int) {
err := <-closeConn
switch {
case err == io.EOF:
s.Logger.Error("Connection was closed by client")
case err != nil:
s.Logger.Error("Connection error", "err", err)
default:
// never happens
s.Logger.Error("Connection was closed")
}

// Close the connection
if err := s.rmConn(connID); err != nil {
s.Logger.Error("Error closing connection", "err", err)
}
}

// Read requests from conn and deal with them
func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, responses chan<- *types.Response) {
var count int
var bufReader = bufio.NewReader(conn)

defer func() {
// make sure to recover from any app-related panics to allow proper socket cleanup
r := recover()
if r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err := fmt.Errorf("recovered from panic: %v\n%s", r, buf)
if !s.isLoggerSet {
fmt.Fprintln(os.Stderr, err)
}
closeConn <- err
s.appMtx.Unlock()
}
}()

for {

var req = &types.Request{}
err := types.ReadMessage(bufReader, req)
if err != nil {
if err == io.EOF {
closeConn <- err
} else {
closeConn <- fmt.Errorf("error reading message: %w", err)
}
return
}
s.appMtx.Lock()
count++
s.handleRequest(req, responses)
s.appMtx.Unlock()
}
}

func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
switch r := req.Value.(type) {
case *types.Request_Echo:
responses <- types.ToResponseEcho(r.Echo.Message)
case *types.Request_Flush:
responses <- types.ToResponseFlush()
case *types.Request_Info:
res := s.app.Info(*r.Info)
responses <- types.ToResponseInfo(res)
case *types.Request_DeliverTx:
res := s.app.DeliverTx(*r.DeliverTx)
responses <- types.ToResponseDeliverTx(res)
case *types.Request_CheckTx:
res := s.app.CheckTx(*r.CheckTx)
responses <- types.ToResponseCheckTx(res)
case *types.Request_Commit:
res := s.app.Commit()
responses <- types.ToResponseCommit(res)
case *types.Request_Query:
res := s.app.Query(*r.Query)
responses <- types.ToResponseQuery(res)
case *types.Request_InitChain:
res := s.app.InitChain(*r.InitChain)
responses <- types.ToResponseInitChain(res)
case *types.Request_BeginBlock:
res := s.app.BeginBlock(*r.BeginBlock)
responses <- types.ToResponseBeginBlock(res)
case *types.Request_EndBlock:
res := s.app.EndBlock(*r.EndBlock)
responses <- types.ToResponseEndBlock(res)
case *types.Request_ListSnapshots:
res := s.app.ListSnapshots(*r.ListSnapshots)
responses <- types.ToResponseListSnapshots(res)
case *types.Request_OfferSnapshot:
res := s.app.OfferSnapshot(*r.OfferSnapshot)
responses <- types.ToResponseOfferSnapshot(res)
case *types.Request_LoadSnapshotChunk:
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
responses <- types.ToResponseLoadSnapshotChunk(res)
case *types.Request_ApplySnapshotChunk:
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
responses <- types.ToResponseApplySnapshotChunk(res)
case *types.Request_PreprocessTxs:
res := s.app.PreprocessTxs(*r.PreprocessTxs)
responses <- types.ToResponsePreprocessTx(res)
default:
responses <- types.ToResponseException("Unknown request")
}
}

// Pull responses from 'responses' and write them to conn.
func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, responses <-chan *types.Response) {
var count int
var bufWriter = bufio.NewWriter(conn)
for {
var res = <-responses
err := types.WriteMessage(res, bufWriter)
if err != nil {
closeConn <- fmt.Errorf("error writing message: %w", err)
return
}
if _, ok := res.Value.(*types.Response_Flush); ok {
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("error flushing write buffer: %w", err)
return
}
}
count++
}
}
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ type BaseConfig struct { //nolint: maligned
// If true, query the ABCI app on connecting to a new peer
// so the app can decide if we should keep the connection or not
FilterPeers bool `mapstructure:"filter-peers"` // false

// DBBackend is solely to preserve compatibility with the sdk, it is deprecated
DBBackend string `mapstructure:"db-backend"` // false
}

// DefaultBaseConfig returns a default base configuration for a Tendermint node
Expand All @@ -208,6 +211,7 @@ func DefaultBaseConfig() BaseConfig {
FastSyncMode: true,
FilterPeers: false,
DBPath: "data",
DBBackend: "badger-db",
}
}

Expand Down
4 changes: 4 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ fast-sync = {{ .BaseConfig.FastSyncMode }}
# Database directory
db-dir = "{{ js .BaseConfig.DBPath }}"
# Database backend: badgerdb
# we only use badgerdb, but maintain the option in the config because the cosmos-sdk expects it
db-backend = "{{ .BaseConfig.DBBackend }}"
# Output level for logging, including package level options
log-level = "{{ .BaseConfig.LogLevel }}"
Expand Down

0 comments on commit ceaf5e5

Please sign in to comment.