Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unused code back in to preserve compatibility with the sdk #522

Merged
merged 2 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions abci/server/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package server

import (
"net"

"google.golang.org/grpc"

"github.com/celestiaorg/celestia-core/abci/types"
tmnet "github.com/celestiaorg/celestia-core/libs/net"
"github.com/celestiaorg/celestia-core/libs/service"
)

type GRPCServer struct {
service.BaseService

proto string
addr string
listener net.Listener
server *grpc.Server

app types.ABCIApplicationServer
}

// NewGRPCServer returns a new gRPC ABCI server
func NewGRPCServer(protoAddr string, app types.ABCIApplicationServer) service.Service {
proto, addr := tmnet.ProtocolAndAddress(protoAddr)
s := &GRPCServer{
proto: proto,
addr: addr,
listener: nil,
app: app,
}
s.BaseService = *service.NewBaseService(nil, "ABCIServer", s)
return s
}

// OnStart starts the gRPC service.
func (s *GRPCServer) OnStart() error {

ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}

s.listener = ln
s.server = grpc.NewServer()
types.RegisterABCIApplicationServer(s.server, s.app)

s.Logger.Info("Listening", "proto", s.proto, "addr", s.addr)
go func() {
if err := s.server.Serve(s.listener); err != nil {
s.Logger.Error("Error serving gRPC server", "err", err)
}
}()
return nil
}

// OnStop stops the gRPC server.
func (s *GRPCServer) OnStop() {
s.server.Stop()
}
28 changes: 28 additions & 0 deletions abci/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
Package server is used to start a new ABCI server.
It contains two server implementation:
* gRPC server
* socket server
*/
package server

import (
"fmt"

"github.com/celestiaorg/celestia-core/abci/types"
"github.com/celestiaorg/celestia-core/libs/service"
)

func NewServer(protoAddr, transport string, app types.Application) (service.Service, error) {
var s service.Service
var err error
switch transport {
case "socket":
s = NewSocketServer(protoAddr, app)
case "grpc":
s = NewGRPCServer(protoAddr, types.NewGRPCApplication(app))
default:
err = fmt.Errorf("unknown server type %s", transport)
}
return s, err
}
264 changes: 264 additions & 0 deletions abci/server/socket_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
package server

import (
"bufio"
"fmt"
"io"
"net"
"os"
"runtime"

"github.com/celestiaorg/celestia-core/abci/types"
tmlog "github.com/celestiaorg/celestia-core/libs/log"
tmnet "github.com/celestiaorg/celestia-core/libs/net"
"github.com/celestiaorg/celestia-core/libs/service"
tmsync "github.com/celestiaorg/celestia-core/libs/sync"
)

// var maxNumberConnections = 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get the idea why this comment is still left here stranded 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

me neither 🤷‍♂️


type SocketServer struct {
service.BaseService
isLoggerSet bool

proto string
addr string
listener net.Listener

connsMtx tmsync.Mutex
conns map[int]net.Conn
nextConnID int

appMtx tmsync.Mutex
app types.Application
}

func NewSocketServer(protoAddr string, app types.Application) service.Service {
proto, addr := tmnet.ProtocolAndAddress(protoAddr)
s := &SocketServer{
proto: proto,
addr: addr,
listener: nil,
app: app,
conns: make(map[int]net.Conn),
}
s.BaseService = *service.NewBaseService(nil, "ABCIServer", s)
return s
}

func (s *SocketServer) SetLogger(l tmlog.Logger) {
s.BaseService.SetLogger(l)
s.isLoggerSet = true
}

func (s *SocketServer) OnStart() error {
ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}

s.listener = ln
go s.acceptConnectionsRoutine()

return nil
}

func (s *SocketServer) OnStop() {
if err := s.listener.Close(); err != nil {
s.Logger.Error("Error closing listener", "err", err)
}

s.connsMtx.Lock()
defer s.connsMtx.Unlock()
for id, conn := range s.conns {
delete(s.conns, id)
if err := conn.Close(); err != nil {
s.Logger.Error("Error closing connection", "id", id, "conn", conn, "err", err)
}
}
}

func (s *SocketServer) addConn(conn net.Conn) int {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()

connID := s.nextConnID
s.nextConnID++
s.conns[connID] = conn

return connID
}

// deletes conn even if close errs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for comments, even on private funcs, I think it's nice to do:

// <func name> does X, Y, Z

func (s *SocketServer) rmConn(connID int) error {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()

conn, ok := s.conns[connID]
if !ok {
return fmt.Errorf("connection %d does not exist", connID)
}

delete(s.conns, connID)
return conn.Close()
}

func (s *SocketServer) acceptConnectionsRoutine() {
for {
// Accept a connection
s.Logger.Info("Waiting for new connection...")
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
}
s.Logger.Error("Failed to accept connection", "err", err)
continue
}

s.Logger.Info("Accepted a new connection")

connID := s.addConn(conn)

closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses

// Read requests from conn and deal with them
go s.handleRequests(closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go s.handleResponses(closeConn, conn, responses)

// Wait until signal to close connection
go s.waitForClose(closeConn, connID)
}
}

func (s *SocketServer) waitForClose(closeConn chan error, connID int) {
err := <-closeConn
switch {
case err == io.EOF:
s.Logger.Error("Connection was closed by client")
case err != nil:
s.Logger.Error("Connection error", "err", err)
default:
// never happens
s.Logger.Error("Connection was closed")
}

// Close the connection
if err := s.rmConn(connID); err != nil {
s.Logger.Error("Error closing connection", "err", err)
}
}

// Read requests from conn and deal with them
func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, responses chan<- *types.Response) {
var count int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like a wisdom thought liner. Nevertheless, it would be nice if there is an explanation why this is only used for incrementing and nothing else 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, what's a wisdom thought liner? Also, I tend to agree, comment are usually very much appreciated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A wisdom thought liner is a 1 line of code that does something too magical that I can't get the understanding of it aka too wise for me 😸

var bufReader = bufio.NewReader(conn)

defer func() {
// make sure to recover from any app-related panics to allow proper socket cleanup
r := recover()
if r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err := fmt.Errorf("recovered from panic: %v\n%s", r, buf)
if !s.isLoggerSet {
fmt.Fprintln(os.Stderr, err)
}
closeConn <- err
s.appMtx.Unlock()
}
}()

for {

var req = &types.Request{}
err := types.ReadMessage(bufReader, req)
if err != nil {
if err == io.EOF {
closeConn <- err
} else {
closeConn <- fmt.Errorf("error reading message: %w", err)
}
return
}
s.appMtx.Lock()
count++
s.handleRequest(req, responses)
s.appMtx.Unlock()
}
}

func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
switch r := req.Value.(type) {
case *types.Request_Echo:
responses <- types.ToResponseEcho(r.Echo.Message)
case *types.Request_Flush:
responses <- types.ToResponseFlush()
case *types.Request_Info:
res := s.app.Info(*r.Info)
responses <- types.ToResponseInfo(res)
case *types.Request_DeliverTx:
res := s.app.DeliverTx(*r.DeliverTx)
responses <- types.ToResponseDeliverTx(res)
case *types.Request_CheckTx:
res := s.app.CheckTx(*r.CheckTx)
responses <- types.ToResponseCheckTx(res)
case *types.Request_Commit:
res := s.app.Commit()
responses <- types.ToResponseCommit(res)
case *types.Request_Query:
res := s.app.Query(*r.Query)
responses <- types.ToResponseQuery(res)
case *types.Request_InitChain:
res := s.app.InitChain(*r.InitChain)
responses <- types.ToResponseInitChain(res)
case *types.Request_BeginBlock:
res := s.app.BeginBlock(*r.BeginBlock)
responses <- types.ToResponseBeginBlock(res)
case *types.Request_EndBlock:
res := s.app.EndBlock(*r.EndBlock)
responses <- types.ToResponseEndBlock(res)
case *types.Request_ListSnapshots:
res := s.app.ListSnapshots(*r.ListSnapshots)
responses <- types.ToResponseListSnapshots(res)
case *types.Request_OfferSnapshot:
res := s.app.OfferSnapshot(*r.OfferSnapshot)
responses <- types.ToResponseOfferSnapshot(res)
case *types.Request_LoadSnapshotChunk:
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
responses <- types.ToResponseLoadSnapshotChunk(res)
case *types.Request_ApplySnapshotChunk:
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
responses <- types.ToResponseApplySnapshotChunk(res)
case *types.Request_PreprocessTxs:
res := s.app.PreprocessTxs(*r.PreprocessTxs)
responses <- types.ToResponsePreprocessTx(res)
default:
responses <- types.ToResponseException("Unknown request")
}
}

// Pull responses from 'responses' and write them to conn.
func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, responses <-chan *types.Response) {
var count int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same wisdom is applied here 🧐

var bufWriter = bufio.NewWriter(conn)
for {
var res = <-responses
err := types.WriteMessage(res, bufWriter)
if err != nil {
closeConn <- fmt.Errorf("error writing message: %w", err)
return
}
if _, ok := res.Value.(*types.Response_Flush); ok {
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("error flushing write buffer: %w", err)
return
}
}
count++
}
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ type BaseConfig struct { //nolint: maligned
// If true, query the ABCI app on connecting to a new peer
// so the app can decide if we should keep the connection or not
FilterPeers bool `mapstructure:"filter-peers"` // false

// DBBackend is solely to preserve compatibility with the sdk, it is deprecated
DBBackend string
}

// DefaultBaseConfig returns a default base configuration for a Tendermint node
Expand Down