From 384fd9e4c04fad7e508941a0a4d5e50cf2f4a48f Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 9 May 2024 11:11:49 +0100 Subject: [PATCH 1/3] improve thread-safety of stellarCoreRunner.close() --- ingest/ledgerbackend/captive_core_backend.go | 6 -- .../captive_core_backend_test.go | 2 - ingest/ledgerbackend/stellar_core_runner.go | 88 +++++++++---------- .../ledgerbackend/stellar_core_runner_test.go | 13 ++- 4 files changed, 52 insertions(+), 57 deletions(-) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 50e933bb6a..25b22d01d5 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -452,12 +452,6 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang if err := c.stellarCoreRunner.close(); err != nil { return false, errors.Wrap(err, "error closing existing session") } - - // Make sure Stellar-Core is terminated before starting a new instance. - processExited, _ := c.stellarCoreRunner.getProcessExitError() - if !processExited { - return false, errors.New("the previous Stellar-Core instance is still running") - } } var err error diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 5178fd97a1..a367f560f1 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -302,8 +302,6 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) { mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) - mockRunner.On("getProcessExitError").Return(true, nil) - mockRunner.On("getProcessExitError").Return(false, nil) mockArchive := &historyarchive.MockArchive{} mockArchive. diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 1c2c09c4a6..7f883b69c5 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -17,6 +17,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/stellar/go/protocols/stellarcore" "github.com/stellar/go/support/log" ) @@ -65,6 +66,7 @@ type stellarCoreRunner struct { systemCaller systemCaller lock sync.Mutex + closeOnce sync.Once processExited bool processExitError error @@ -285,9 +287,6 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { r.lock.Lock() defer r.lock.Unlock() - r.mode = stellarCoreRunnerModeOffline - r.storagePath = r.getFullStoragePath() - // check if we have already been closed if r.ctx.Err() != nil { return r.ctx.Err() @@ -297,6 +296,9 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { return errors.New("runner already started") } + r.mode = stellarCoreRunnerModeOffline + r.storagePath = r.getFullStoragePath() + rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()} @@ -350,9 +352,6 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { r.lock.Lock() defer r.lock.Unlock() - r.mode = stellarCoreRunnerModeOnline - r.storagePath = r.getFullStoragePath() - // check if we have already been closed if r.ctx.Err() != nil { return r.ctx.Err() @@ -362,6 +361,9 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { return errors.New("runner already started") } + r.mode = stellarCoreRunnerModeOnline + r.storagePath = r.getFullStoragePath() + var err error if r.useDB { @@ -546,53 +548,45 @@ func (r *stellarCoreRunner) getProcessExitError() (bool, error) { // the necessary cleanup on the resources associated with the captive core process // close is both thread safe and idempotent func (r *stellarCoreRunner) close() error { - r.lock.Lock() - started := r.started - storagePath := r.storagePath - - r.storagePath = "" - - // check if we have already closed - if storagePath == "" { + var closeError error + r.closeOnce.Do(func() { + r.lock.Lock() + // we cancel the context while holding the lock in order to guarantee that + // this captive core instance cannot start once the lock is released. + // catchup() and runFrom() can only execute while holding the lock and if + // the context is canceled both catchup() and runFrom() will abort early + // without performing any side effects (e.g. state mutations). + r.cancel() r.lock.Unlock() - return nil - } - if !started { - // Update processExited if handleExit that updates it not even started - // (error before command run). - r.processExited = true - } - - r.cancel() - r.lock.Unlock() + // only reap captive core sub process and related go routines if we've started + // otherwise, just cleanup the temp dir + if r.started { + // wait for the stellar core process to terminate + r.wg.Wait() - // only reap captive core sub process and related go routines if we've started - // otherwise, just cleanup the temp dir - if started { - // wait for the stellar core process to terminate - r.wg.Wait() + // drain meta pipe channel to make sure the ledger buffer goroutine exits + for range r.getMetaPipe() { - // drain meta pipe channel to make sure the ledger buffer goroutine exits - for range r.getMetaPipe() { + } + // now it's safe to close the pipe reader + // because the ledger buffer is no longer reading from it + r.pipe.Reader.Close() } - // now it's safe to close the pipe reader - // because the ledger buffer is no longer reading from it - r.pipe.Reader.Close() - } - - if r.mode != 0 && (runtime.GOOS == "windows" || - (r.processExitError != nil && r.processExitError != context.Canceled) || - r.mode == stellarCoreRunnerModeOffline) { - // It's impossible to send SIGINT on Windows so buckets can become - // corrupted. If we can't reuse it, then remove it. - // We also remove the storage path if there was an error terminating the - // process (files can be corrupted). - // We remove all files when reingesting to save disk space. - return r.systemCaller.removeAll(storagePath) - } + if r.mode != 0 && (runtime.GOOS == "windows" || + (r.processExitError != nil && r.processExitError != context.Canceled) || + r.mode == stellarCoreRunnerModeOffline) { + // It's impossible to send SIGINT on Windows so buckets can become + // corrupted. If we can't reuse it, then remove it. + // We also remove the storage path if there was an error terminating the + // process (files can be corrupted). + // We remove all files when reingesting to save disk space. + closeError = r.systemCaller.removeAll(r.storagePath) + return + } + }) - return nil + return closeError } diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go index 60871922b7..5bbd954844 100644 --- a/ingest/ledgerbackend/stellar_core_runner_test.go +++ b/ingest/ledgerbackend/stellar_core_runner_test.go @@ -50,6 +50,9 @@ func TestCloseOffline(t *testing.T) { runner.systemCaller = scMock assert.NoError(t, runner.catchup(100, 200)) + // close can be called multiple times safely + assert.NoError(t, runner.close()) + assert.NoError(t, runner.close()) assert.NoError(t, runner.close()) } @@ -133,7 +136,7 @@ func TestCloseOnlineWithError(t *testing.T) { "--metadata-output-stream", "fd:3", ).Return(cmdMock) - scMock.On("removeAll", mock.Anything).Return(nil) + scMock.On("removeAll", mock.Anything).Return(nil).Once() runner.systemCaller = scMock assert.NoError(t, runner.runFrom(100, "hash")) @@ -146,6 +149,9 @@ func TestCloseOnlineWithError(t *testing.T) { } time.Sleep(10 * time.Millisecond) } + // close can be called multiple times safely + assert.NoError(t, runner.close()) + assert.NoError(t, runner.close()) assert.NoError(t, runner.close()) } @@ -300,7 +306,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { scMock := &mockSystemCaller{} defer scMock.AssertExpectations(t) // Storage dir is removed because ledgers do not match - scMock.On("removeAll", mock.Anything).Return(nil) + scMock.On("removeAll", mock.Anything).Return(nil).Once() scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", @@ -336,5 +342,8 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { runner.systemCaller = scMock assert.NoError(t, runner.runFrom(100, "hash")) + // close can be called multiple times safely + assert.NoError(t, runner.close()) + assert.NoError(t, runner.close()) assert.NoError(t, runner.close()) } From cf2ecf647ced577f28895d3cb8c56e60f9097e1f Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 9 May 2024 14:42:21 +0100 Subject: [PATCH 2/3] Add test --- ingest/ledgerbackend/mock_cmd_test.go | 2 +- .../ledgerbackend/stellar_core_runner_test.go | 76 ++++++++++++++++--- 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/ingest/ledgerbackend/mock_cmd_test.go b/ingest/ledgerbackend/mock_cmd_test.go index a1d280421a..a28b6c8d01 100644 --- a/ingest/ledgerbackend/mock_cmd_test.go +++ b/ingest/ledgerbackend/mock_cmd_test.go @@ -70,7 +70,7 @@ func simpleCommandMock() *mockCmd { cmdMock.On("getStdout").Return(writer) cmdMock.On("setStderr", mock.Anything) cmdMock.On("getStderr").Return(writer) - cmdMock.On("getProcess").Return(&os.Process{}) + cmdMock.On("getProcess").Return(&os.Process{}).Maybe() cmdMock.On("setExtraFiles", mock.Anything) cmdMock.On("Start").Return(nil) return cmdMock diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go index 5bbd954844..bab8b0912e 100644 --- a/ingest/ledgerbackend/stellar_core_runner_test.go +++ b/ingest/ledgerbackend/stellar_core_runner_test.go @@ -3,6 +3,7 @@ package ledgerbackend import ( "context" "encoding/json" + "sync" "testing" "time" @@ -50,9 +51,6 @@ func TestCloseOffline(t *testing.T) { runner.systemCaller = scMock assert.NoError(t, runner.catchup(100, 200)) - // close can be called multiple times safely - assert.NoError(t, runner.close()) - assert.NoError(t, runner.close()) assert.NoError(t, runner.close()) } @@ -149,12 +147,75 @@ func TestCloseOnlineWithError(t *testing.T) { } time.Sleep(10 * time.Millisecond) } - // close can be called multiple times safely - assert.NoError(t, runner.close()) - assert.NoError(t, runner.close()) assert.NoError(t, runner.close()) } +func TestCloseConcurrency(t *testing.T) { + captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) + assert.NoError(t, err) + + captiveCoreToml.AddExamplePubnetValidators() + + runner := newStellarCoreRunner(CaptiveCoreConfig{ + BinaryPath: "/usr/bin/stellar-core", + HistoryArchiveURLs: []string{"http://localhost"}, + Log: log.New(), + Context: context.Background(), + Toml: captiveCoreToml, + StoragePath: "/tmp/captive-core", + }) + + cmdMock := simpleCommandMock() + cmdMock.On("Wait").Return(errors.New("wait error")).WaitUntil(time.After(time.Millisecond * 300)) + defer cmdMock.AssertExpectations(t) + + // Replace system calls with a mock + scMock := &mockSystemCaller{} + defer scMock.AssertExpectations(t) + scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) + scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "--console", + "run", + "--in-memory", + "--start-at-ledger", + "100", + "--start-at-hash", + "hash", + "--metadata-output-stream", + "fd:3", + ).Return(cmdMock) + scMock.On("removeAll", mock.Anything).Return(nil).Once() + runner.systemCaller = scMock + + assert.NoError(t, runner.runFrom(100, "hash")) + + // Wait with calling close until r.processExitError is set to Wait() error + for { + _, err := runner.getProcessExitError() + if err != nil { + break + } + time.Sleep(10 * time.Millisecond) + } + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, runner.close()) + exited, err := runner.getProcessExitError() + assert.True(t, exited) + assert.EqualError(t, err, "wait error") + }() + } + + wg.Wait() +} + func TestRunFromUseDBLedgersMatch(t *testing.T) { captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) @@ -342,8 +403,5 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { runner.systemCaller = scMock assert.NoError(t, runner.runFrom(100, "hash")) - // close can be called multiple times safely - assert.NoError(t, runner.close()) - assert.NoError(t, runner.close()) assert.NoError(t, runner.close()) } From 166486532e6d342476e76fc0929edaa9f39408c1 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 9 May 2024 14:56:26 +0100 Subject: [PATCH 3/3] fix test --- .../ledgerbackend/stellar_core_runner_test.go | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go index bab8b0912e..00cb29137b 100644 --- a/ingest/ledgerbackend/stellar_core_runner_test.go +++ b/ingest/ledgerbackend/stellar_core_runner_test.go @@ -47,7 +47,7 @@ func TestCloseOffline(t *testing.T) { "fd:3", "--in-memory", ).Return(cmdMock) - scMock.On("removeAll", mock.Anything).Return(nil) + scMock.On("removeAll", mock.Anything).Return(nil).Once() runner.systemCaller = scMock assert.NoError(t, runner.catchup(100, 200)) @@ -179,28 +179,17 @@ func TestCloseConcurrency(t *testing.T) { "--conf", mock.Anything, "--console", - "run", - "--in-memory", - "--start-at-ledger", - "100", - "--start-at-hash", - "hash", + "catchup", + "200/101", "--metadata-output-stream", "fd:3", + "--in-memory", ).Return(cmdMock) scMock.On("removeAll", mock.Anything).Return(nil).Once() runner.systemCaller = scMock - assert.NoError(t, runner.runFrom(100, "hash")) + assert.NoError(t, runner.catchup(100, 200)) - // Wait with calling close until r.processExitError is set to Wait() error - for { - _, err := runner.getProcessExitError() - if err != nil { - break - } - time.Sleep(10 * time.Millisecond) - } var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) @@ -209,7 +198,7 @@ func TestCloseConcurrency(t *testing.T) { assert.NoError(t, runner.close()) exited, err := runner.getProcessExitError() assert.True(t, exited) - assert.EqualError(t, err, "wait error") + assert.Error(t, err) }() }