Skip to content

Commit

Permalink
Merge pull request #17 from PowerLoom/feat/close-sub-stream
Browse files Browse the repository at this point in the history
Better handling of grpc and libp2p connections and streams
  • Loading branch information
anomit authored Oct 16, 2024
2 parents 41c4a2c + 3ce553e commit 99e23cc
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 318 deletions.
26 changes: 14 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module proto-snapshot-server

go 1.20
go 1.21

toolchain go1.23.1

require (
github.com/cenkalti/backoff/v4 v4.2.0
Expand All @@ -12,14 +14,14 @@ require (
github.com/sethvargo/go-retry v0.2.4
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.62.0
google.golang.org/protobuf v1.32.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.34.2
)

require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down Expand Up @@ -109,16 +111,16 @@ require (
go.uber.org/mock v0.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.18.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gonum.org/v1/gonum v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
58 changes: 36 additions & 22 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkgs/proto/submission.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ message SnapshotSubmission {
}

service Submission {
rpc SubmitSnapshotSimulation (stream SnapshotSubmission) returns (stream SubmissionResponse);
rpc SubmitSnapshot (stream SnapshotSubmission) returns (SubmissionResponse);
rpc SubmitSnapshotSimulation (stream SnapshotSubmission) returns (SubmissionResponse);
rpc SubmitSnapshot (stream SnapshotSubmission) returns (stream SubmissionResponse);
}

message SubmissionResponse {
Expand Down
124 changes: 124 additions & 0 deletions pkgs/service/libp2p_stream_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package service

import (
"context"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
log "github.com/sirupsen/logrus"
)

type streamPool struct {
streams chan network.Stream
maxSize int
createStream func() (network.Stream, error)
mu sync.Mutex
sequencerID peer.ID
stopChan chan struct{}
}

func newStreamPool(maxSize int, sequencerID peer.ID, createStream func() (network.Stream, error)) *streamPool {
pool := &streamPool{
streams: make(chan network.Stream, maxSize),
maxSize: maxSize,
createStream: createStream,
sequencerID: sequencerID,
}
go pool.maintainPool(context.Background())
return pool
}

func (p *streamPool) GetStream() (network.Stream, error) {
select {
case stream := <-p.streams:
if stream.Conn().IsClosed() {
stream.Close()
return p.createNewStream()
}
return stream, nil
default:
return p.createNewStream()
}
}

func (p *streamPool) ReturnStream(stream network.Stream) {
if stream.Conn().IsClosed() {
stream.Close()
return
}

select {
case p.streams <- stream:
// Stream returned to pool
default:
// Pool is full, close the stream
stream.Close()
}
}

func (p *streamPool) Stop() {
close(p.stopChan)
}

func (p *streamPool) maintainPool(ctx context.Context) {
ticker := time.NewTicker(900 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-p.stopChan:
return
case <-ticker.C:
p.fillPool()
p.cleanPool()
}
}
}

func (p *streamPool) fillPool() {
p.mu.Lock()
defer p.mu.Unlock()

currentSize := len(p.streams)
for i := currentSize; i < p.maxSize; i++ {
stream, err := p.createStream()
if err != nil {
log.Errorf("Failed to create stream: %v", err)
break
}
p.ReturnStream(stream)
}
}

func (p *streamPool) cleanPool() {
p.mu.Lock()
defer p.mu.Unlock()

validStreams := make([]network.Stream, 0, len(p.streams))
for len(p.streams) > 0 {
select {
case stream := <-p.streams:
if !stream.Conn().IsClosed() {
validStreams = append(validStreams, stream)
} else {
stream.Close()
}
default:
break
}
}

for _, stream := range validStreams {
p.streams <- stream
}
}

func (p *streamPool) createNewStream() (network.Stream, error) {
p.mu.Lock()
defer p.mu.Unlock()
return p.createStream()
}
Loading

0 comments on commit 99e23cc

Please sign in to comment.