Skip to content

Commit

Permalink
fix: use globally inited connections and pools
Browse files Browse the repository at this point in the history
  • Loading branch information
anomit committed Oct 29, 2024
1 parent e8b6fba commit 60b4df6
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 50 deletions.
29 changes: 24 additions & 5 deletions pkgs/service/libp2p_stream_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,16 @@ func (p *StreamPool) GetStream() (network.Stream, error) {
if len(p.streams) > 0 {
stream := p.streams[len(p.streams)-1]
p.streams = p.streams[:len(p.streams)-1]

if err := p.pingStream(stream); err == nil {
log.Debug("Retrieved valid stream from pool")
return stream, nil
}
// If ping fails, close the stream and create a new one
log.Warn("Ping failed for pooled stream, closing and creating new one")
stream.Close()
}

log.Debug("Creating new stream")
return p.createNewStreamWithRetry()
}

Expand All @@ -75,25 +79,40 @@ func (p *StreamPool) ReturnStream(stream network.Stream) {
defer p.mu.Unlock()

if err := p.pingStream(stream); err != nil {
log.Warnf("Stream failed ping check on return: %v", err)
stream.Close()
return
}

if len(p.streams) < p.maxSize {
log.Debug("Returning valid stream to pool")
p.streams = append(p.streams, stream)
} else {
log.Debug("Pool at capacity, closing stream")
stream.Close()
}
}

func (p *StreamPool) pingStream(stream network.Stream) error {
if err := stream.SetDeadline(time.Now().Add(500 * time.Millisecond)); err != nil {
return err
if err := stream.SetDeadline(time.Now().Add(1 * time.Second)); err != nil {
return fmt.Errorf("failed to set deadline: %w", err)
}
defer stream.SetDeadline(time.Time{})

_, err := stream.Write([]byte("ping"))
return err
pingMsg := []byte("ping")
n, err := stream.Write(pingMsg)
if err != nil {
return fmt.Errorf("failed to write ping: %w", err)
}
if n != len(pingMsg) {
return fmt.Errorf("incomplete ping write: wrote %d of %d bytes", n, len(pingMsg))
}

if err := stream.Flush(); err != nil {
return fmt.Errorf("failed to flush stream after ping: %w", err)
}

return nil
}

func (p *StreamPool) createNewStreamWithRetry() (network.Stream, error) {
Expand Down
67 changes: 22 additions & 45 deletions pkgs/service/msg_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/rcrowley/go-metrics"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
Expand All @@ -23,8 +21,7 @@ import (
// server is used to implement submission.SubmissionService.
type server struct {
pkgs.UnimplementedSubmissionServer
streamPool *StreamPool
limiter *rate.Limiter
limiter *rate.Limiter
}

var _ pkgs.SubmissionServer = &server{}
Expand All @@ -39,47 +36,19 @@ type serverMetrics struct {
// NewMsgServerImpl returns an implementation of the SubmissionService interface
// for the provided Keeper.
func NewMsgServerImplV2() pkgs.SubmissionServer {
var sequencerAddr ma.Multiaddr
var err error

sequencer, err := fetchSequencer("https://raw.githubusercontent.com/PowerLoom/snapshotter-lite-local-collector/feat/trusted-relayers/sequencers.json", config.SettingsObj.DataMarketAddress)
if err != nil {
log.Debugln(err.Error())
return nil
}
sequencerAddr, err = ma.NewMultiaddr(sequencer.Maddr)
if err != nil {
log.Debugln(err.Error())
return nil
}

sequencerInfo, err := peer.AddrInfoFromP2pAddr(sequencerAddr)

if err != nil {
log.Errorln("Error converting MultiAddr to AddrInfo: ", err.Error())
return nil
}

sequencerID := sequencerInfo.ID

if err := SequencerHostConn.Connect(context.Background(), *sequencerInfo); err != nil {
log.Debugln("Failed to connect to the Sequencer:", err)
} else {
log.Debugln("Successfully connected to the Sequencer: ", sequencerAddr.String())
}

createStream := func() (network.Stream, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return SequencerHostConn.NewStream(ctx, sequencerID, "/collect")
return SequencerHostConn.NewStream(ctx, SequencerId, "/collect")
}

// Initialize the global stream pool
InitLibp2pStreamPool(config.SettingsObj.MaxStreamPoolSize, createStream, sequencerID)
// Initialize the global stream pool if not already initialized
if GetLibp2pStreamPool() == nil {
InitLibp2pStreamPool(config.SettingsObj.MaxStreamPoolSize, createStream, SequencerId)
}

s := &server{
streamPool: GetLibp2pStreamPool(), // Use the global pool instead of creating a new one
limiter: rate.NewLimiter(rate.Limit(300), 50), // Adjusted rate limit
limiter: rate.NewLimiter(rate.Limit(300), 50),
}
return s
}
Expand Down Expand Up @@ -146,24 +115,32 @@ func StartSubmissionServer(server pkgs.SubmissionServer) {
func (s *server) writeToStream(data []byte) error {
maxRetries := 5
for i := 0; i < maxRetries; i++ {
stream, err := s.streamPool.GetStream()
stream, err := GetLibp2pStreamPool().GetStream()
if err != nil {
log.Warnf("Failed to get stream (attempt %d/%d): %v", i+1, maxRetries, err)
time.Sleep(time.Second * time.Duration(i+1)) // Exponential backoff
time.Sleep(time.Second * time.Duration(i+1))
continue
}

_, err = stream.Write(data)
n, err := stream.Write(data)
if err != nil {
log.Warnf("Failed to write to stream (attempt %d/%d): %v", i+1, maxRetries, err)
stream.Reset()
s.streamPool.RemoveStream(stream)
time.Sleep(time.Second * time.Duration(i+1)) // Exponential backoff
GetLibp2pStreamPool().RemoveStream(stream)
time.Sleep(time.Second * time.Duration(i+1))
continue
}

s.streamPool.ReturnStream(stream)
return nil // Success
// Add length verification
if n != len(data) {
log.Warnf("Incomplete write: wrote %d of %d bytes", n, len(data))
stream.Reset()
GetLibp2pStreamPool().RemoveStream(stream)
continue
}

GetLibp2pStreamPool().ReturnStream(stream)
return nil
}

return fmt.Errorf("failed to write to stream after %d attempts", maxRetries)
Expand Down

0 comments on commit 60b4df6

Please sign in to comment.