-
Notifications
You must be signed in to change notification settings - Fork 295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add unused code back in to preserve compatibility with the sdk #522
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package server | ||
|
||
import ( | ||
"net" | ||
|
||
"google.golang.org/grpc" | ||
|
||
"github.com/celestiaorg/celestia-core/abci/types" | ||
tmnet "github.com/celestiaorg/celestia-core/libs/net" | ||
"github.com/celestiaorg/celestia-core/libs/service" | ||
) | ||
|
||
type GRPCServer struct { | ||
service.BaseService | ||
|
||
proto string | ||
addr string | ||
listener net.Listener | ||
server *grpc.Server | ||
|
||
app types.ABCIApplicationServer | ||
} | ||
|
||
// NewGRPCServer returns a new gRPC ABCI server | ||
func NewGRPCServer(protoAddr string, app types.ABCIApplicationServer) service.Service { | ||
proto, addr := tmnet.ProtocolAndAddress(protoAddr) | ||
s := &GRPCServer{ | ||
proto: proto, | ||
addr: addr, | ||
listener: nil, | ||
app: app, | ||
} | ||
s.BaseService = *service.NewBaseService(nil, "ABCIServer", s) | ||
return s | ||
} | ||
|
||
// OnStart starts the gRPC service. | ||
func (s *GRPCServer) OnStart() error { | ||
|
||
ln, err := net.Listen(s.proto, s.addr) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
s.listener = ln | ||
s.server = grpc.NewServer() | ||
types.RegisterABCIApplicationServer(s.server, s.app) | ||
|
||
s.Logger.Info("Listening", "proto", s.proto, "addr", s.addr) | ||
go func() { | ||
if err := s.server.Serve(s.listener); err != nil { | ||
s.Logger.Error("Error serving gRPC server", "err", err) | ||
} | ||
}() | ||
return nil | ||
} | ||
|
||
// OnStop stops the gRPC server. | ||
func (s *GRPCServer) OnStop() { | ||
s.server.Stop() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
Package server is used to start a new ABCI server. | ||
It contains two server implementation: | ||
* gRPC server | ||
* socket server | ||
*/ | ||
package server | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/celestiaorg/celestia-core/abci/types" | ||
"github.com/celestiaorg/celestia-core/libs/service" | ||
) | ||
|
||
func NewServer(protoAddr, transport string, app types.Application) (service.Service, error) { | ||
var s service.Service | ||
var err error | ||
switch transport { | ||
case "socket": | ||
s = NewSocketServer(protoAddr, app) | ||
case "grpc": | ||
s = NewGRPCServer(protoAddr, types.NewGRPCApplication(app)) | ||
default: | ||
err = fmt.Errorf("unknown server type %s", transport) | ||
} | ||
return s, err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,264 @@ | ||
package server | ||
|
||
import ( | ||
"bufio" | ||
"fmt" | ||
"io" | ||
"net" | ||
"os" | ||
"runtime" | ||
|
||
"github.com/celestiaorg/celestia-core/abci/types" | ||
tmlog "github.com/celestiaorg/celestia-core/libs/log" | ||
tmnet "github.com/celestiaorg/celestia-core/libs/net" | ||
"github.com/celestiaorg/celestia-core/libs/service" | ||
tmsync "github.com/celestiaorg/celestia-core/libs/sync" | ||
) | ||
|
||
// var maxNumberConnections = 2 | ||
|
||
type SocketServer struct { | ||
service.BaseService | ||
isLoggerSet bool | ||
|
||
proto string | ||
addr string | ||
listener net.Listener | ||
|
||
connsMtx tmsync.Mutex | ||
conns map[int]net.Conn | ||
nextConnID int | ||
|
||
appMtx tmsync.Mutex | ||
app types.Application | ||
} | ||
|
||
func NewSocketServer(protoAddr string, app types.Application) service.Service { | ||
proto, addr := tmnet.ProtocolAndAddress(protoAddr) | ||
s := &SocketServer{ | ||
proto: proto, | ||
addr: addr, | ||
listener: nil, | ||
app: app, | ||
conns: make(map[int]net.Conn), | ||
} | ||
s.BaseService = *service.NewBaseService(nil, "ABCIServer", s) | ||
return s | ||
} | ||
|
||
func (s *SocketServer) SetLogger(l tmlog.Logger) { | ||
s.BaseService.SetLogger(l) | ||
s.isLoggerSet = true | ||
} | ||
|
||
func (s *SocketServer) OnStart() error { | ||
ln, err := net.Listen(s.proto, s.addr) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
s.listener = ln | ||
go s.acceptConnectionsRoutine() | ||
|
||
return nil | ||
} | ||
|
||
func (s *SocketServer) OnStop() { | ||
if err := s.listener.Close(); err != nil { | ||
s.Logger.Error("Error closing listener", "err", err) | ||
} | ||
|
||
s.connsMtx.Lock() | ||
defer s.connsMtx.Unlock() | ||
for id, conn := range s.conns { | ||
delete(s.conns, id) | ||
if err := conn.Close(); err != nil { | ||
s.Logger.Error("Error closing connection", "id", id, "conn", conn, "err", err) | ||
} | ||
} | ||
} | ||
|
||
func (s *SocketServer) addConn(conn net.Conn) int { | ||
s.connsMtx.Lock() | ||
defer s.connsMtx.Unlock() | ||
|
||
connID := s.nextConnID | ||
s.nextConnID++ | ||
s.conns[connID] = conn | ||
|
||
return connID | ||
} | ||
|
||
// deletes conn even if close errs | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for comments, even on private funcs, I think it's nice to do:
|
||
func (s *SocketServer) rmConn(connID int) error { | ||
s.connsMtx.Lock() | ||
defer s.connsMtx.Unlock() | ||
|
||
conn, ok := s.conns[connID] | ||
if !ok { | ||
return fmt.Errorf("connection %d does not exist", connID) | ||
} | ||
|
||
delete(s.conns, connID) | ||
return conn.Close() | ||
} | ||
|
||
func (s *SocketServer) acceptConnectionsRoutine() { | ||
for { | ||
// Accept a connection | ||
s.Logger.Info("Waiting for new connection...") | ||
conn, err := s.listener.Accept() | ||
if err != nil { | ||
if !s.IsRunning() { | ||
return // Ignore error from listener closing. | ||
} | ||
s.Logger.Error("Failed to accept connection", "err", err) | ||
continue | ||
} | ||
|
||
s.Logger.Info("Accepted a new connection") | ||
|
||
connID := s.addConn(conn) | ||
|
||
closeConn := make(chan error, 2) // Push to signal connection closed | ||
responses := make(chan *types.Response, 1000) // A channel to buffer responses | ||
|
||
// Read requests from conn and deal with them | ||
go s.handleRequests(closeConn, conn, responses) | ||
// Pull responses from 'responses' and write them to conn. | ||
go s.handleResponses(closeConn, conn, responses) | ||
|
||
// Wait until signal to close connection | ||
go s.waitForClose(closeConn, connID) | ||
} | ||
} | ||
|
||
func (s *SocketServer) waitForClose(closeConn chan error, connID int) { | ||
err := <-closeConn | ||
switch { | ||
case err == io.EOF: | ||
s.Logger.Error("Connection was closed by client") | ||
case err != nil: | ||
s.Logger.Error("Connection error", "err", err) | ||
default: | ||
// never happens | ||
s.Logger.Error("Connection was closed") | ||
} | ||
|
||
// Close the connection | ||
if err := s.rmConn(connID); err != nil { | ||
s.Logger.Error("Error closing connection", "err", err) | ||
} | ||
} | ||
|
||
// Read requests from conn and deal with them | ||
func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, responses chan<- *types.Response) { | ||
var count int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like a wisdom thought liner. Nevertheless, it would be nice if there is an explanation why this is only used for incrementing and nothing else 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wait, what's a wisdom thought liner? Also, I tend to agree, comment are usually very much appreciated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A wisdom thought liner is a 1 line of code that does something too magical that I can't get the understanding of it aka too wise for me 😸 |
||
var bufReader = bufio.NewReader(conn) | ||
|
||
defer func() { | ||
// make sure to recover from any app-related panics to allow proper socket cleanup | ||
r := recover() | ||
if r != nil { | ||
const size = 64 << 10 | ||
buf := make([]byte, size) | ||
buf = buf[:runtime.Stack(buf, false)] | ||
err := fmt.Errorf("recovered from panic: %v\n%s", r, buf) | ||
if !s.isLoggerSet { | ||
fmt.Fprintln(os.Stderr, err) | ||
} | ||
closeConn <- err | ||
s.appMtx.Unlock() | ||
} | ||
}() | ||
|
||
for { | ||
|
||
var req = &types.Request{} | ||
err := types.ReadMessage(bufReader, req) | ||
if err != nil { | ||
if err == io.EOF { | ||
closeConn <- err | ||
} else { | ||
closeConn <- fmt.Errorf("error reading message: %w", err) | ||
} | ||
return | ||
} | ||
s.appMtx.Lock() | ||
count++ | ||
s.handleRequest(req, responses) | ||
s.appMtx.Unlock() | ||
} | ||
} | ||
|
||
func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) { | ||
switch r := req.Value.(type) { | ||
case *types.Request_Echo: | ||
responses <- types.ToResponseEcho(r.Echo.Message) | ||
case *types.Request_Flush: | ||
responses <- types.ToResponseFlush() | ||
case *types.Request_Info: | ||
res := s.app.Info(*r.Info) | ||
responses <- types.ToResponseInfo(res) | ||
case *types.Request_DeliverTx: | ||
res := s.app.DeliverTx(*r.DeliverTx) | ||
responses <- types.ToResponseDeliverTx(res) | ||
case *types.Request_CheckTx: | ||
res := s.app.CheckTx(*r.CheckTx) | ||
responses <- types.ToResponseCheckTx(res) | ||
case *types.Request_Commit: | ||
res := s.app.Commit() | ||
responses <- types.ToResponseCommit(res) | ||
case *types.Request_Query: | ||
res := s.app.Query(*r.Query) | ||
responses <- types.ToResponseQuery(res) | ||
case *types.Request_InitChain: | ||
res := s.app.InitChain(*r.InitChain) | ||
responses <- types.ToResponseInitChain(res) | ||
case *types.Request_BeginBlock: | ||
res := s.app.BeginBlock(*r.BeginBlock) | ||
responses <- types.ToResponseBeginBlock(res) | ||
case *types.Request_EndBlock: | ||
res := s.app.EndBlock(*r.EndBlock) | ||
responses <- types.ToResponseEndBlock(res) | ||
case *types.Request_ListSnapshots: | ||
res := s.app.ListSnapshots(*r.ListSnapshots) | ||
responses <- types.ToResponseListSnapshots(res) | ||
case *types.Request_OfferSnapshot: | ||
res := s.app.OfferSnapshot(*r.OfferSnapshot) | ||
responses <- types.ToResponseOfferSnapshot(res) | ||
case *types.Request_LoadSnapshotChunk: | ||
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk) | ||
responses <- types.ToResponseLoadSnapshotChunk(res) | ||
case *types.Request_ApplySnapshotChunk: | ||
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk) | ||
responses <- types.ToResponseApplySnapshotChunk(res) | ||
case *types.Request_PreprocessTxs: | ||
res := s.app.PreprocessTxs(*r.PreprocessTxs) | ||
responses <- types.ToResponsePreprocessTx(res) | ||
default: | ||
responses <- types.ToResponseException("Unknown request") | ||
} | ||
} | ||
|
||
// Pull responses from 'responses' and write them to conn. | ||
func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, responses <-chan *types.Response) { | ||
var count int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same wisdom is applied here 🧐 |
||
var bufWriter = bufio.NewWriter(conn) | ||
for { | ||
var res = <-responses | ||
err := types.WriteMessage(res, bufWriter) | ||
if err != nil { | ||
closeConn <- fmt.Errorf("error writing message: %w", err) | ||
return | ||
} | ||
if _, ok := res.Value.(*types.Response_Flush); ok { | ||
err = bufWriter.Flush() | ||
if err != nil { | ||
closeConn <- fmt.Errorf("error flushing write buffer: %w", err) | ||
return | ||
} | ||
} | ||
count++ | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite get the idea why this comment is still left here stranded 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
me neither 🤷♂️