ingest/ledgerbackend: Improve thread-safety of stellarCoreRunner.close() #5307

Merged 4 commits on May 10, 2024
Changes from 3 commits
6 changes: 0 additions & 6 deletions ingest/ledgerbackend/captive_core_backend.go
```diff
@@ -452,12 +452,6 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang
 		if err := c.stellarCoreRunner.close(); err != nil {
 			return false, errors.Wrap(err, "error closing existing session")
 		}
-
-		// Make sure Stellar-Core is terminated before starting a new instance.
-		processExited, _ := c.stellarCoreRunner.getProcessExitError()
-		if !processExited {
-			return false, errors.New("the previous Stellar-Core instance is still running")
-		}
 	}

 	var err error
```
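This check can be dropped because the reworked `close()` below now blocks until cleanup has finished: when the runner was started, it waits on `r.wg.Wait()` for the Stellar-Core process to be reaped before returning, so a successful `close()` already implies the previous instance is gone.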
2 changes: 0 additions & 2 deletions ingest/ledgerbackend/captive_core_backend_test.go
```diff
@@ -302,8 +302,6 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {
 	mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
 	mockRunner.On("context").Return(ctx)
 	mockRunner.On("close").Return(nil)
-	mockRunner.On("getProcessExitError").Return(true, nil)
-	mockRunner.On("getProcessExitError").Return(false, nil)

 	mockArchive := &historyarchive.MockArchive{}
 	mockArchive.
```
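Worth noting: these two expectations were registered for the same method without `.Once()`, and under testify's matching rules the first registration keeps matching indefinitely, so the `Return(false, nil)` variant was unreachable anyway. With the termination check removed from `startPreparingRange`, neither expectation is needed.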
2 changes: 1 addition & 1 deletion ingest/ledgerbackend/mock_cmd_test.go
```diff
@@ -70,7 +70,7 @@ func simpleCommandMock() *mockCmd {
 	cmdMock.On("getStdout").Return(writer)
 	cmdMock.On("setStderr", mock.Anything)
 	cmdMock.On("getStderr").Return(writer)
-	cmdMock.On("getProcess").Return(&os.Process{})
+	cmdMock.On("getProcess").Return(&os.Process{}).Maybe()
 	cmdMock.On("setExtraFiles", mock.Anything)
 	cmdMock.On("Start").Return(nil)
 	return cmdMock
```
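The `.Maybe()` call comes from testify's mock package: it marks an expectation as optional, so `AssertExpectations` passes whether or not the call ever happens. That fits the new `close()` semantics, where only the first caller performs cleanup and `getProcess` may never be invoked on some paths. A minimal, self-contained sketch of the behavior (`cmd` is a hypothetical mock, not code from this PR):

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// cmd is a hypothetical mock, standing in for mockCmd.
type cmd struct{ mock.Mock }

func (c *cmd) getProcess() string { return c.Called().String(0) }

func TestOptionalExpectation(t *testing.T) {
	c := &cmd{}
	// Without .Maybe(), AssertExpectations would fail because
	// getProcess is never called in this test.
	c.On("getProcess").Return("fake-process").Maybe()
	c.AssertExpectations(t)
}
```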
88 changes: 41 additions & 47 deletions ingest/ledgerbackend/stellar_core_runner.go
```diff
@@ -17,6 +17,7 @@ import (
 	"time"

 	"github.com/pkg/errors"
+
 	"github.com/stellar/go/protocols/stellarcore"
 	"github.com/stellar/go/support/log"
 )
@@ -65,6 +66,7 @@ type stellarCoreRunner struct {
 	systemCaller systemCaller

 	lock             sync.Mutex
+	closeOnce        sync.Once
 	processExited    bool
 	processExitError error
```

```diff
@@ -285,9 +287,6 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
 	r.lock.Lock()
 	defer r.lock.Unlock()

-	r.mode = stellarCoreRunnerModeOffline
-	r.storagePath = r.getFullStoragePath()
-
 	// check if we have already been closed
 	if r.ctx.Err() != nil {
 		return r.ctx.Err()
@@ -297,6 +296,9 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
 		return errors.New("runner already started")
 	}

+	r.mode = stellarCoreRunnerModeOffline
+	r.storagePath = r.getFullStoragePath()
+
 	rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
 	params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()}
```
```diff
@@ -350,9 +352,6 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
 	r.lock.Lock()
 	defer r.lock.Unlock()

-	r.mode = stellarCoreRunnerModeOnline
-	r.storagePath = r.getFullStoragePath()
-
 	// check if we have already been closed
 	if r.ctx.Err() != nil {
 		return r.ctx.Err()
@@ -362,6 +361,9 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
 		return errors.New("runner already started")
 	}

+	r.mode = stellarCoreRunnerModeOnline
+	r.storagePath = r.getFullStoragePath()
+
 	var err error

 	if r.useDB {
```
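Deferring the `r.mode` and `r.storagePath` writes until after the `ctx.Err()` and `r.started` guards is what justifies the cancel-under-lock comment in `close()` below: once `close()` cancels the context while holding the lock, any `catchup()` or `runFrom()` call that was waiting on the lock observes the canceled context and returns before mutating runner state.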
```diff
@@ -546,53 +548,45 @@ func (r *stellarCoreRunner) getProcessExitError() (bool, error) {
 // the necessary cleanup on the resources associated with the captive core process
 // close is both thread safe and idempotent
 func (r *stellarCoreRunner) close() error {
-	r.lock.Lock()
-	started := r.started
-	storagePath := r.storagePath
-
-	r.storagePath = ""
-
-	// check if we have already closed
-	if storagePath == "" {
-		r.lock.Unlock()
-		return nil
-	}
-
-	if !started {
-		// Update processExited if handleExit that updates it not even started
-		// (error before command run).
-		r.processExited = true
-	}
-
-	r.cancel()
-	r.lock.Unlock()
-
-	// only reap captive core sub process and related go routines if we've started
-	// otherwise, just cleanup the temp dir
-	if started {
-		// wait for the stellar core process to terminate
-		r.wg.Wait()
-
-		// drain meta pipe channel to make sure the ledger buffer goroutine exits
-		for range r.getMetaPipe() {
-		}
-
-		// now it's safe to close the pipe reader
-		// because the ledger buffer is no longer reading from it
-		r.pipe.Reader.Close()
-	}
-
-	if r.mode != 0 && (runtime.GOOS == "windows" ||
-		(r.processExitError != nil && r.processExitError != context.Canceled) ||
-		r.mode == stellarCoreRunnerModeOffline) {
-		// It's impossible to send SIGINT on Windows so buckets can become
-		// corrupted. If we can't reuse it, then remove it.
-		// We also remove the storage path if there was an error terminating the
-		// process (files can be corrupted).
-		// We remove all files when reingesting to save disk space.
-		return r.systemCaller.removeAll(storagePath)
-	}
+	var closeError error
+	r.closeOnce.Do(func() {
+		r.lock.Lock()
+		// we cancel the context while holding the lock in order to guarantee that
+		// this captive core instance cannot start once the lock is released.
+		// catchup() and runFrom() can only execute while holding the lock and if
+		// the context is canceled both catchup() and runFrom() will abort early
+		// without performing any side effects (e.g. state mutations).
+		r.cancel()
+		r.lock.Unlock()
+
+		// only reap captive core sub process and related go routines if we've started
+		// otherwise, just cleanup the temp dir
+		if r.started {
+			// wait for the stellar core process to terminate
+			r.wg.Wait()
+
+			// drain meta pipe channel to make sure the ledger buffer goroutine exits
+			for range r.getMetaPipe() {
+			}
+
+			// now it's safe to close the pipe reader
+			// because the ledger buffer is no longer reading from it
+			r.pipe.Reader.Close()
+		}
+
+		if r.mode != 0 && (runtime.GOOS == "windows" ||
+			(r.processExitError != nil && r.processExitError != context.Canceled) ||
+			r.mode == stellarCoreRunnerModeOffline) {
+			// It's impossible to send SIGINT on Windows so buckets can become
+			// corrupted. If we can't reuse it, then remove it.
+			// We also remove the storage path if there was an error terminating the
+			// process (files can be corrupted).
+			// We remove all files when reingesting to save disk space.
+			closeError = r.systemCaller.removeAll(r.storagePath)
+			return
+		}
+	})

-	return nil
+	return closeError
 }
```
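Taken together, the hunks above implement a reusable shutdown pattern: serialize cancellation with startup via the lock, then funnel all cleanup through `sync.Once`. A condensed sketch of that pattern with simplified, hypothetical names (`Runner`, `cleanupStorage`), not the actual stellar/go types:

```go
package runner

import (
	"context"
	"sync"
)

type Runner struct {
	lock      sync.Mutex
	closeOnce sync.Once
	ctx       context.Context
	cancel    context.CancelFunc
	started   bool
	wg        sync.WaitGroup
	closeErr  error
}

// Start aborts under the lock if Close already canceled the context,
// so it never mutates state after a Close.
func (r *Runner) Start() error {
	r.lock.Lock()
	defer r.lock.Unlock()
	if err := r.ctx.Err(); err != nil {
		return err // closed before we ever started
	}
	r.started = true
	// ... spawn subprocess, r.wg.Add(...) for helper goroutines ...
	return nil
}

// Close may be called concurrently from many goroutines: the cleanup
// body runs exactly once, and every caller blocks until it finishes.
func (r *Runner) Close() error {
	r.closeOnce.Do(func() {
		// Cancel while holding the lock so no Start can slip in
		// between the cancellation and the cleanup below.
		r.lock.Lock()
		r.cancel()
		r.lock.Unlock()

		if r.started {
			r.wg.Wait() // reap subprocess and helper goroutines
		}
		r.closeErr = cleanupStorage()
	})
	return r.closeErr
}

func cleanupStorage() error { return nil } // stand-in for systemCaller.removeAll
```

One deliberate difference from the PR: here the cleanup error is stored on the struct, so every `Close()` caller observes it, whereas the PR keeps `closeError` in a per-call variable, meaning only the invocation that actually ran the cleanup reports a `removeAll` failure. `sync.Once.Do` guarantees the body happens-before every return from `Do`, so the field read is race-free.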
62 changes: 59 additions & 3 deletions ingest/ledgerbackend/stellar_core_runner_test.go
```diff
@@ -3,6 +3,7 @@ package ledgerbackend
 import (
 	"context"
 	"encoding/json"
+	"sync"
 	"testing"
 	"time"

@@ -46,7 +47,7 @@ func TestCloseOffline(t *testing.T) {
 		"fd:3",
 		"--in-memory",
 	).Return(cmdMock)
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	runner.systemCaller = scMock

 	assert.NoError(t, runner.catchup(100, 200))
@@ -133,7 +134,7 @@ func TestCloseOnlineWithError(t *testing.T) {
 		"--metadata-output-stream",
 		"fd:3",
 	).Return(cmdMock)
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	runner.systemCaller = scMock

 	assert.NoError(t, runner.runFrom(100, "hash"))
@@ -149,6 +150,61 @@ func TestCloseOnlineWithError(t *testing.T) {
 	assert.NoError(t, runner.close())
 }

+func TestCloseConcurrency(t *testing.T) {
+	captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
+	assert.NoError(t, err)
+
+	captiveCoreToml.AddExamplePubnetValidators()
+
+	runner := newStellarCoreRunner(CaptiveCoreConfig{
+		BinaryPath:         "/usr/bin/stellar-core",
+		HistoryArchiveURLs: []string{"http://localhost"},
+		Log:                log.New(),
+		Context:            context.Background(),
+		Toml:               captiveCoreToml,
+		StoragePath:        "/tmp/captive-core",
+	})
+
+	cmdMock := simpleCommandMock()
+	cmdMock.On("Wait").Return(errors.New("wait error")).WaitUntil(time.After(time.Millisecond * 300))
+	defer cmdMock.AssertExpectations(t)
+
+	// Replace system calls with a mock
+	scMock := &mockSystemCaller{}
+	defer scMock.AssertExpectations(t)
+	scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil)
+	scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	scMock.On("command",
+		"/usr/bin/stellar-core",
+		"--conf",
+		mock.Anything,
+		"--console",
+		"catchup",
+		"200/101",
+		"--metadata-output-stream",
+		"fd:3",
+		"--in-memory",
+	).Return(cmdMock)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
+	runner.systemCaller = scMock
+
+	assert.NoError(t, runner.catchup(100, 200))
+
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			assert.NoError(t, runner.close())
+			exited, err := runner.getProcessExitError()
+			assert.True(t, exited)
+			assert.Error(t, err)
+		}()
+	}
+
+	wg.Wait()
+}
+
 func TestRunFromUseDBLedgersMatch(t *testing.T) {
 	captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
 	assert.NoError(t, err)
```
```diff
@@ -300,7 +356,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) {
 	scMock := &mockSystemCaller{}
 	defer scMock.AssertExpectations(t)
 	// Storage dir is removed because ledgers do not match
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil)
 	scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	scMock.On("command",
```
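A concurrency fix like this is best exercised under Go's race detector, which would flag unsynchronized access to fields such as `processExited` if the locking ever regressed. Assuming a standard checkout of stellar/go, something along these lines:

```sh
go test -race -run 'TestClose' ./ingest/ledgerbackend/
```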