Skip to content

Commit

Permalink
Fix cardano-testnet cleanup logic: kill cardano-node child processes
Browse files Browse the repository at this point in the history
  • Loading branch information
errfrom committed Sep 2, 2024
1 parent fc3ae37 commit a1cea53
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 135 deletions.
12 changes: 10 additions & 2 deletions src/Internal/Spawn.purs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Ctl.Internal.Spawn
, spawn
, exec
, stop
, stopProcessWithChildren
, waitForStop
, cleanupTmpDir
, cleanupOnSigint
Expand Down Expand Up @@ -176,6 +177,13 @@ stop (ManagedProcess _ child closedAVar) = do
isAlive <- AVar.isEmpty <$> AVar.status closedAVar
when isAlive $ liftEffect $ kill SIGINT child

stopProcessWithChildren :: ManagedProcess -> Aff Unit
stopProcessWithChildren managedProc@(ManagedProcess _ proc _) = do
void $ liftEffect $ Node.ChildProcess.execSync
("pkill -TERM -P " <> show (unwrap $ Node.ChildProcess.pid proc))
Node.ChildProcess.defaultExecSyncOptions
stop managedProc

-- | Waits until the process has cleanly stopped.
waitForStop :: ManagedProcess -> Aff Unit
waitForStop (ManagedProcess cmd _ closedAVar) = do
Expand All @@ -195,12 +203,12 @@ onSignal :: Signal -> Effect Unit -> Effect OnSignalRef
onSignal sig = onSignalImpl (Signal.toString sig)

-- | Just as onSignal, but Aff.
waitForSignal :: Signal -> Aff Unit
waitForSignal :: Signal -> Aff Signal
waitForSignal signal = makeAff \cont -> do
isCanceledRef <- Ref.new false
onSignalRef <- onSignal signal
$ Ref.read isCanceledRef
>>= flip unless (cont $ Right unit)
>>= flip unless (cont $ Right signal)
pure $ Canceler \err -> liftEffect do
Ref.write true isCanceledRef
removeOnSignal onSignalRef
Expand Down
187 changes: 92 additions & 95 deletions src/Internal/Testnet/Server.purs
Original file line number Diff line number Diff line change
Expand Up @@ -24,46 +24,45 @@ import Ctl.Internal.Contract.Monad
, stopContractEnv
)
import Ctl.Internal.Contract.QueryBackend (mkCtlBackendParams)
import Ctl.Internal.Helpers ((<</>>))
import Ctl.Internal.Helpers (concatPaths, (<</>>))
import Ctl.Internal.Logging (Logger, mkLogger, setupLogs)
import Ctl.Internal.QueryM.UniqueId (uniqueId)
import Ctl.Internal.ServerConfig (ServerConfig)
import Ctl.Internal.Spawn
( ManagedProcess(ManagedProcess)
, NewOutputAction(NoOp, Success)
, _rmdirSync
, isPortAvailable
, killProcessWithPort
, spawn
, stop
, stopProcessWithChildren
)
import Ctl.Internal.Testnet.Types
( Node
, TestnetClusterConfig
( TestnetClusterConfig
, TestnetConfig
, TestnetPaths
)
import Ctl.Internal.Testnet.Utils
( EventSource
, TestnetCleanupRef
, addCleanup
, after
, annotateError
, findNodeDirs
, findTestnetPaths
, getRuntime
, onLine
, readNodes
, runCleanup
, scheduleCleanup
, suppressAndLogErrors
, tmpdir
, tmpdirUnique
, tryAndLogErrors
, waitFor
, waitForClose
, waitForError
, waitForEvent
, waitUntil
)
import Ctl.Internal.Types.UsedTxOuts (newUsedTxOuts)
import Data.Array (head) as Array
import Data.Log.Message (Message)
import Data.Maybe (Maybe(Nothing, Just))
import Data.Set as Set
Expand All @@ -73,22 +72,24 @@ import Data.String.Pattern (Pattern(Pattern))
import Data.Time.Duration (Milliseconds(Milliseconds))
import Data.UInt (UInt)
import Data.UInt (toString) as UInt
import Effect.Aff (Aff, launchAff_)
import Effect.Aff (Aff)
import Effect.Aff as Aff
import Effect.Aff.Class (class MonadAff)
import Effect.Aff.Retry
( RetryPolicy
, constantDelay
, limitRetriesByCumulativeDelay
, recovering
)
import Effect.Class (class MonadEffect)
import Effect.Exception (Error, error, throw)
import Effect.Ref (Ref)
import Effect.Ref (modify_, new, read, write) as Ref
import Effect.Ref (modify_, new) as Ref
import Foreign.Object as Object
import Node.ChildProcess (defaultSpawnOptions)
import Node.ChildProcess as Node.ChildProcess
import Node.Encoding (Encoding(UTF8))
import Node.FS.Sync (exists, mkdir) as FSSync
import Node.FS.Sync (readdir) as FSSync
import Node.FS.Sync as Node.FS
import Node.Path (FilePath)
import Node.Process as Node.Process
Expand Down Expand Up @@ -156,13 +157,7 @@ startKupo
-> Ref (Array (Aff Unit))
-> Aff (ManagedProcess /\ String)
startKupo cfg params cleanupRef = do
tmpDir <- liftEffect tmpdir
randomStr <- liftEffect $ uniqueId ""
let
workdir = tmpDir <</>> randomStr <> "-kupo-db"
liftEffect do
workdirExists <- FSSync.exists workdir
unless workdirExists (FSSync.mkdir workdir)
workdir <- tmpdirUnique "kupo-db"
childProcess <-
after
(spawnKupoProcess workdir)
Expand Down Expand Up @@ -283,33 +278,21 @@ startTestnetCluster cfg cleanupRef = do
pure { process: ogmios, channels: ogmiosChannels }

-- | Spawns cardano-testnet process with provided parameters.
spawnCardanoTestnet
:: { cwd :: FilePath }
-> TestnetClusterConfig
-> Aff { testnet :: ManagedProcess, workspace :: FilePath }
spawnCardanoTestnet { cwd } params = do
spawnCardanoTestnet :: FilePath -> TestnetClusterConfig -> Aff ManagedProcess
spawnCardanoTestnet workdir params = do
env <- liftEffect Node.Process.getEnv
let
env' = Object.fromFoldable
[ "CARDANO_CLI" /\ "cardano-cli"
, "CARDANO_NODE" /\ "cardano-node"
, "TMPDIR" /\ workdir
]
opts = defaultSpawnOptions
{ cwd = Just cwd
{ cwd = Just workdir
, env = Just $ Object.union env' env
, detached = true
}
workspaceRef <- liftEffect $ Ref.new mempty
ps <- spawn "cardano-testnet" options opts $
Just
( \{ line } ->
case String.stripPrefix (Pattern "Workspace: ") (String.trim line) of
Nothing -> pure NoOp
Just workspace -> do
void $ Ref.write workspace workspaceRef
pure Success
)
workspace <- liftEffect $ Ref.read workspaceRef
pure { testnet: ps, workspace }
spawn "cardano-testnet" options opts Nothing
where
flag :: String -> String
flag name = "--" <> name
Expand All @@ -330,76 +313,92 @@ spawnCardanoTestnet { cwd } params = do

startCardanoTestnet
:: TestnetClusterConfig
-> Ref (Array (Aff Unit))
-> TestnetCleanupRef
-> Aff
{ testnet :: ManagedProcess
, channels ::
{ stderr :: EventSource String
, stdout :: EventSource String
}
, workdirAbsolute :: FilePath
, nodes :: Array { | Node () }
}
startCardanoTestnet params cleanupRef = annotateError "startCardanoTestnet" do
tmpDir <- liftEffect tmpdir
{ testnet, workspace } <- spawnCardanoTestnet { cwd: tmpDir } params
workdir <- tmpdirUnique "cardano-testnet"
testnet <- scheduleCleanup
cleanupRef
(spawnCardanoTestnet workdir params)
stopProcessWithChildren
channels <- liftEffect $ getChannels testnet
workspace <- waitUntil (Milliseconds 100.0) $ findWorkspaceDir workdir
scheduleWorkspaceCleanup workspace
redirectStreams channels workspace
workspaceFromLogs <- waitForCardanoTestnetWorkspace channels.stdout
when (workspace /= workspaceFromLogs) do
runCleanup cleanupRef
throwError $ error "cardano-testnet workspace mismatch"
attachStdoutMonitors testnet
log "startCardanoTestnet:done"
pure { testnet, workdirAbsolute: workspace, channels }
where
findWorkspaceDir :: forall m. MonadEffect m => FilePath -> m (Maybe FilePath)
findWorkspaceDir workdir =
liftEffect $ map (concatPaths workdir) <<< Array.head <$>
FSSync.readdir workdir

redirectStreams :: StdStreams -> FilePath -> Aff Unit
redirectStreams channels workspace =
void $ redirectChannels channels
{ stdoutTo:
{ log: Just $ workspace <</>> "cardano-testnet.stdout.log"
, console: Nothing
}
, stderrTo:
{ log: Just $ workspace <</>> "cardano-testnet.stderr.log"
, console: Nothing
}
}

void $ Aff.forkAff $ annotateError "startCardanoTestnet:waitForErrorOrClose"
do
let
waitError = Just <$> waitForError testnet
waitClose = Nothing <$ waitForClose testnet
cause <- waitError <|> waitClose
runCleanup cleanupRef
throwError $ fromMaybe (error "cardano-testnet process has exited") cause

nodes <-
waitUntil (Milliseconds 3000.0) $ liftEffect do
hush <$> tryAndLogErrors "startCardanoTestnet:waitForNodes" do
nodeDirs <- findNodeDirs { workdir: workspace }
readNodes { testnetDirectory: workspace, nodeDirs }

liftEffect $
for_ nodes \{ port } ->
addCleanup cleanupRef (killProcessWithPort port)

-- clean up on SIGINT
do
shouldCleanup <- liftEffect
$ Node.Process.lookupEnv "TESTNET_CLEANUP_WORKDIR"
<#> case _ of
Just "0" -> false
_ -> true
when shouldCleanup
$ liftEffect
$ addCleanup cleanupRef
$ liftEffect do
log $ "Cleaning up workdir: " <> workspace
waitForCardanoTestnetWorkspace
:: forall m
. MonadAff m
=> EventSource String
-> m FilePath
waitForCardanoTestnetWorkspace =
liftAff
<<< flip waitFor
(String.stripPrefix (Pattern "Workspace: ") <<< String.trim)

attachStdoutMonitors :: ManagedProcess -> Aff Unit
attachStdoutMonitors testnet =
void $ Aff.forkAff $
annotateError "startCardanoTestnet:attachStdoutMonitors" do
let
waitError = Just <$> waitForError testnet
waitClose = Nothing <$ waitForClose testnet
cause <- waitError <|> waitClose
runCleanup cleanupRef
throwError $ fromMaybe (error "cardano-testnet process has exited")
cause

scheduleWorkspaceCleanup :: forall m. MonadEffect m => FilePath -> m Unit
scheduleWorkspaceCleanup workspace =
liftEffect do
shouldCleanup <-
Node.Process.lookupEnv "TESTNET_CLEANUP_WORKDIR" <#>
case _ of
Just "0" -> false
_ -> true
when shouldCleanup do
addCleanup cleanupRef $ liftEffect do
log $ "Cleaning up cardano-testnet workspace: " <> workspace
_rmdirSync workspace
launchAff_ $ stop testnet

_ <- redirectChannels
{ stderr: channels.stderr, stdout: channels.stdout }
{ stdoutTo:
{ log: Just $ workspace <</>> "cardano-testnet.stdout.log"
, console: Nothing
}
, stderrTo:
{ log: Just $ workspace <</>> "cardano-testnet.stderr.log"
, console: Nothing
}
}

log "startCardanoTestnet:done"
pure { testnet, workdirAbsolute: workspace, channels, nodes }
type StdStreams =
{ stderr :: EventSource String
, stdout :: EventSource String
}

getChannels
:: ManagedProcess
-> Effect
{ stderr :: EventSource String
, stdout :: EventSource String
}
getChannels :: ManagedProcess -> Effect StdStreams
getChannels (ManagedProcess _ process _) = ado
stdout <- onLine (Node.ChildProcess.stdout process) Just
stderr <- onLine (Node.ChildProcess.stderr process) Just
Expand All @@ -408,9 +407,7 @@ getChannels (ManagedProcess _ process _) = ado
-- Note: it will not throw, so to check the computation result
-- Fiber must be inspected.
redirectChannels
:: { stderr :: EventSource String
, stdout :: EventSource String
}
:: StdStreams
-> { stderrTo :: { log :: Maybe FilePath, console :: Maybe String }
, stdoutTo :: { log :: Maybe FilePath, console :: Maybe String }
}
Expand Down
Loading

0 comments on commit a1cea53

Please sign in to comment.