Skip to content

Commit

Permalink
Stop preallocating tcp ports to avoid clashes
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jun 16, 2024
1 parent b3ba569 commit 195881e
Show file tree
Hide file tree
Showing 14 changed files with 386 additions and 268 deletions.
36 changes: 30 additions & 6 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package daemon
import (
"context"
"errors"
"net"
"net/http"
"net/http/pprof" //nolint:gosec
"os"
Expand Down Expand Up @@ -50,7 +51,9 @@ type Daemon struct {
jsonRPCHandler *internal.Handler
logger *supportlog.Entry
preflightWorkerPool *preflight.PreflightWorkerPool
listener net.Listener
server *http.Server
adminListener net.Listener
adminServer *http.Server
closeOnce sync.Once
closeError error
Expand All @@ -62,6 +65,15 @@ func (d *Daemon) GetDB() *db.DB {
return d.db
}

func (d *Daemon) GetEndpointAddrs() (net.TCPAddr, *net.TCPAddr) {
var addr = d.listener.Addr().(*net.TCPAddr)
var adminAddr *net.TCPAddr
if d.adminListener != nil {
adminAddr = d.adminListener.Addr().(*net.TCPAddr)
}
return *addr, adminAddr
}

func (d *Daemon) close() {
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), defaultShutdownGracePeriod)
defer shutdownRelease()
Expand Down Expand Up @@ -251,8 +263,14 @@ func MustNew(cfg *config.Config) *Daemon {
daemon.ingestService = ingestService
daemon.jsonRPCHandler = &jsonRPCHandler

// Use a separate listener in order to obtain the actual TCP port
// when using dynamic ports during testing (e.g. endpoint="localhost:0")
daemon.listener, err = net.Listen("tcp", cfg.Endpoint)
if err != nil {
daemon.logger.WithError(err).WithField("endpoint", cfg.Endpoint).Fatal("cannot listen on endpoint")
}
daemon.server = &http.Server{
Addr: cfg.Endpoint,
Addr: ":http",
Handler: httpHandler,
ReadTimeout: defaultReadTimeout,
}
Expand All @@ -269,7 +287,11 @@ func MustNew(cfg *config.Config) *Daemon {
adminMux.Handle("/debug/pprof/"+profile.Name(), pprof.Handler(profile.Name()))
}
adminMux.Handle("/metrics", promhttp.HandlerFor(metricsRegistry, promhttp.HandlerOpts{}))
daemon.adminServer = &http.Server{Addr: cfg.AdminEndpoint, Handler: adminMux}
daemon.adminListener, err = net.Listen("tcp", cfg.AdminEndpoint)
if err != nil {
daemon.logger.WithError(err).WithField("endpoint", cfg.Endpoint).Fatal("cannot listen on admin endpoint")
}
daemon.adminServer = &http.Server{Handler: adminMux}
}
daemon.registerMetrics()
return daemon
Expand Down Expand Up @@ -340,20 +362,22 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow

func (d *Daemon) Run() {
d.logger.WithFields(supportlog.F{
"addr": d.server.Addr,
"addr": d.listener.Addr().String(),
}).Info("starting HTTP server")

panicGroup := util.UnrecoverablePanicGroup.Log(d.logger)
panicGroup.Go(func() {
if err := d.server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
// Error starting or closing listener:
if err := d.server.Serve(d.listener); !errors.Is(err, http.ErrServerClosed) {
d.logger.WithError(err).Fatal("soroban JSON RPC server encountered fatal error")
}
})

if d.adminServer != nil {
d.logger.WithFields(supportlog.F{
"addr": d.adminListener.Addr().String(),
}).Info("starting Admin HTTP server")
panicGroup.Go(func() {
if err := d.adminServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
if err := d.adminServer.Serve(d.adminListener); !errors.Is(err, http.ErrServerClosed) {
d.logger.WithError(err).Error("soroban admin server encountered fatal error")
}
})
Expand Down
53 changes: 35 additions & 18 deletions cmd/soroban-rpc/internal/integrationtest/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,55 @@ import (
"net"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/integrationtest/infrastructure"
)

func TestArchiveUserAgent(t *testing.T) {
ports := infrastructure.NewTestPorts(t)
archiveHost := net.JoinHostPort("localhost", strconv.Itoa(int(ports.CoreArchivePort)))
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: archiveHost})
userAgents := sync.Map{}
historyArchiveProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userAgents.Store(r.Header["User-Agent"][0], "")
proxy.ServeHTTP(w, r)
historyArchive := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
agent := r.Header["User-Agent"][0]
t.Log("agent", agent)
userAgents.Store(agent, "")
if r.URL.Path == "/.well-known/stellar-history.json" || r.URL.Path == "/history/00/00/00/history-0000001f.json" {
w.Write([]byte(`{
"version": 1,
"server": "stellar-core 21.0.1 (dfd3dbff1d9cad4dc31e022de6ac2db731b4b326)",
"currentLedger": 31,
"networkPassphrase": "Standalone Network ; February 2017",
"currentBuckets": []
}`))
return
}
// emulate a problem with the archive
w.WriteHeader(http.StatusInternalServerError)
}))
defer historyArchiveProxy.Close()
defer historyArchive.Close()
historyPort := historyArchive.Listener.Addr().(*net.TCPAddr).Port

cfg := &infrastructure.TestConfig{
TestPorts: &ports,
HistoryArchiveURL: historyArchiveProxy.URL,
OnlyRPC: &infrastructure.TestOnlyRPCConfig{
CorePorts: infrastructure.TestCorePorts{
CoreArchivePort: uint16(historyPort),
},
DontWait: true,
},
}

infrastructure.NewTest(t, cfg)

_, ok := userAgents.Load("soroban-rpc/0.0.0")
assert.True(t, ok, "rpc service should set user agent for history archives")

_, ok = userAgents.Load("soroban-rpc/0.0.0/captivecore")
assert.True(t, ok, "rpc captive core should set user agent for history archives")
require.Eventually(t,
func() bool {
_, ok1 := userAgents.Load("soroban-rpc/0.0.0")
_, ok2 := userAgents.Load("soroban-rpc/0.0.0/captivecore")
return ok1 && ok2
},
5*time.Second,
time.Second,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"testing"

"github.com/creachadair/jrpc2"
"github.com/stellar/go/keypair"
"github.com/stellar/go/txnbuild"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -33,7 +32,7 @@ func buildSetOptionsTxParams(account txnbuild.SimpleAccount) txnbuild.Transactio
// client - the JSON-RPC client used to send the transactions.
//
// Returns a slice of ledger numbers corresponding to where each transaction was recorded.
func sendTransactions(t *testing.T, client *jrpc2.Client) []uint32 {
func sendTransactions(t *testing.T, client *infrastructure.Client) []uint32 {
kp := keypair.Root(infrastructure.StandaloneNetworkPassphrase)
address := kp.Address()

Expand Down
43 changes: 39 additions & 4 deletions cmd/soroban-rpc/internal/integrationtest/infrastructure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/creachadair/jrpc2"
"github.com/creachadair/jrpc2/jhttp"
"github.com/stellar/go/keypair"
"github.com/stellar/go/protocols/stellarcore"
"github.com/stellar/go/txnbuild"
Expand All @@ -17,7 +18,41 @@ import (
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/methods"
)

func getTransaction(t *testing.T, client *jrpc2.Client, hash string) methods.GetTransactionResponse {
// Client is a jrpc2 client which tolerates errors
type Client struct {
url string
cli *jrpc2.Client
opts *jrpc2.ClientOptions
}

func NewClient(url string, opts *jrpc2.ClientOptions) *Client {
c := &Client{url: url, opts: opts}
c.refreshClient()
return c
}

func (c *Client) refreshClient() {
if c.cli != nil {
c.cli.Close()
}
ch := jhttp.NewChannel(c.url, nil)
c.cli = jrpc2.NewClient(ch, c.opts)
}

func (c *Client) CallResult(ctx context.Context, method string, params, result any) error {
err := c.cli.CallResult(ctx, method, params, result)
if err != nil {
// This is needed because of https://github.com/creachadair/jrpc2/issues/118
c.refreshClient()
}
return err
}

func (c *Client) Close() error {
return c.cli.Close()
}

func getTransaction(t *testing.T, client *Client, hash string) methods.GetTransactionResponse {
var result methods.GetTransactionResponse
for i := 0; i < 60; i++ {
request := methods.GetTransactionRequest{Hash: hash}
Expand All @@ -35,7 +70,7 @@ func getTransaction(t *testing.T, client *jrpc2.Client, hash string) methods.Get
return result
}

func SendSuccessfulTransaction(t *testing.T, client *jrpc2.Client, kp *keypair.Full, transaction *txnbuild.Transaction) methods.GetTransactionResponse {
func SendSuccessfulTransaction(t *testing.T, client *Client, kp *keypair.Full, transaction *txnbuild.Transaction) methods.GetTransactionResponse {
tx, err := transaction.Sign(StandaloneNetworkPassphrase, kp)
assert.NoError(t, err)
b64, err := tx.Base64()
Expand Down Expand Up @@ -92,7 +127,7 @@ func SendSuccessfulTransaction(t *testing.T, client *jrpc2.Client, kp *keypair.F
return response
}

func SimulateTransactionFromTxParams(t *testing.T, client *jrpc2.Client, params txnbuild.TransactionParams) methods.SimulateTransactionResponse {
func SimulateTransactionFromTxParams(t *testing.T, client *Client, params txnbuild.TransactionParams) methods.SimulateTransactionResponse {
savedAutoIncrement := params.IncrementSequenceNum
params.IncrementSequenceNum = false
tx, err := txnbuild.NewTransaction(params)
Expand Down Expand Up @@ -153,7 +188,7 @@ func PreflightTransactionParamsLocally(t *testing.T, params txnbuild.Transaction
return params
}

func PreflightTransactionParams(t *testing.T, client *jrpc2.Client, params txnbuild.TransactionParams) txnbuild.TransactionParams {
func PreflightTransactionParams(t *testing.T, client *Client, params txnbuild.TransactionParams) txnbuild.TransactionParams {
response := SimulateTransactionFromTxParams(t, client, params)
// The preamble should be zero except for the special restore case
assert.Nil(t, response.RestorePreamble)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (
var testSalt = sha256.Sum256([]byte("a1"))

func GetHelloWorldContract() []byte {
testDirName := GetCurrentDirectory()
contractFile := path.Join(testDirName, helloWorldContractPath)
contractFile := path.Join(GetCurrentDirectory(), "../../../../../wasms/test_hello_world.wasm")
ret, err := os.ReadFile(contractFile)
if err != nil {
str := fmt.Sprintf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
PEER_PORT=${CORE_CAPTIVE_PORT}
# To fill in and use by RPC

# This simply needs to be an unconflicting, unused port
# since captive core doesn't expect external connections
PEER_PORT=${CAPTIVE_CORE_PORT}
ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true

UNSAFE_QUORUM=true
Expand All @@ -14,5 +18,7 @@ NAME="local_core"
HOME_DOMAIN="core.local"
# From "SACJC372QBSSKJYTV5A7LWT4NXWHTQO6GHG4QDAVC2XDPX6CNNXFZ4JK"
PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS"
ADDRESS="localhost:${CORE_PORT}"

# should be "core" when running RPC in a container or "localhost:port" when running RPC in the host
ADDRESS="${CORE_HOST_PORT}"
QUALITY="MEDIUM"
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@ fi
echo "using config:"
cat stellar-core.cfg

# initialize new db
stellar-core new-db
# initialize new db (retry a few times to wait for the database to be available)
until stellar-core new-db; do
sleep 0.2
echo "couldn't create new db, retrying"
done

if [ "$1" = "standalone" ]; then
# initialize for new history archive path, remove any pre-existing on same path from base image
rm -rf ./history
stellar-core new-hist vs

# serve history archives to horizon on port CORE_ARCHIVE_PORT
# serve history archives to horizon on port 1570
pushd ./history/vs/
python3 -m http.server ${CORE_ARCHIVE_PORT} &
python3 -m http.server 1570 &
popd
fi

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
include:
- docker-compose.yml
services:
rpc:
platform: linux/amd64
image: stellar/soroban-rpc:${RPC_IMAGE_TAG}
depends_on:
- core
ports:
- ${RPC_PORT}:${RPC_PORT}
- ${RPC_ADMIN_PORT}:${RPC_ADMIN_PORT}
ports: # we omit the host-side ports to allocate them dynamically
# HTTP
- "127.0.0.1::8000"
# Admin HTTP
- "127.0.0.1::8080"
command: --config-path /soroban-rpc.config
volumes:
- ${RPC_CONFIG_MOUNT_DIR}/stellar-core-integration-tests.cfg:/stellar-core.cfg
- ${RPC_CONFIG_MOUNT_DIR}/captive-core-integration-tests.cfg:/stellar-core.cfg
- ${RPC_CONFIG_MOUNT_DIR}/soroban-rpc.config:/soroban-rpc.config
- ${RPC_SQLITE_MOUNT_DIR}:/db/
# Needed so that the sql database files created in the container
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
services:
core-postgres:
image: postgres:9.6.17-alpine
restart: on-failure
environment:
- POSTGRES_PASSWORD=mysecretpassword
- POSTGRES_DB=stellar
Expand All @@ -17,18 +16,19 @@ services:
image: ${CORE_IMAGE:-stellar/unsafe-stellar-core:21.0.1-1897.dfd3dbff1.focal}
depends_on:
- core-postgres
restart: on-failure
environment:
- TRACY_NO_INVARIANT_CHECK=1
ports:
- ${CORE_PORT}:${CORE_PORT}
- ${CORE_HTTP_PORT}:${CORE_HTTP_PORT}
# add extra port for history archive server
- ${CORE_ARCHIVE_PORT}:${CORE_ARCHIVE_PORT}
ports: # we omit the host-side ports to allocate them dynamically
# peer
- "127.0.0.1:0:11625"
# http
- "127.0.0.1:0:11626"
# history archive
- "127.0.0.1:0:1570"
entrypoint: /usr/bin/env
command: /start standalone
volumes:
- ${CORE_MOUNT_DIR}/stellar-core-integration-tests.cfg:/stellar-core.cfg
- ${CORE_MOUNT_DIR}/core-start.sh:/start
- ./stellar-core-integration-tests.cfg:/stellar-core.cfg
- ./core-start.sh:/start
extra_hosts:
- "host.docker.internal:host-gateway"
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ DEPRECATED_SQL_LEDGER_STATE=false

NETWORK_PASSPHRASE="Standalone Network ; February 2017"

PEER_PORT=${CORE_PORT}
HTTP_PORT=${CORE_HTTP_PORT}
PEER_PORT=11625
HTTP_PORT=11626
PUBLIC_HTTP_PORT=true

NODE_SEED="SACJC372QBSSKJYTV5A7LWT4NXWHTQO6GHG4QDAVC2XDPX6CNNXFZ4JK"
Expand Down
Loading

0 comments on commit 195881e

Please sign in to comment.