cardano-testnet: Waiting for blocks using EpochStateView. #5836

Closed
2 changes: 1 addition & 1 deletion cardano-testnet/cardano-testnet.cabal
@@ -77,7 +77,7 @@ library
, safe-exceptions
, scientific
, si-timers
, stm
, stm > 2.5.1
, tasty ^>= 1.5
, tasty-expected-failure
, tasty-hedgehog
145 changes: 126 additions & 19 deletions cardano-testnet/src/Testnet/Components/Query.hs
@@ -1,25 +1,30 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Testnet.Components.Query
( EpochStateView
, checkDRepsNumber
, checkDRepState
, getEpochStateView
, getEpochState
, getSlotNumber
, getBlockNumber
, watchEpochStateUpdate
, getMinDRepDeposit
, getMinGovActionDeposit
, getGovState
, getCurrentEpochNo
, waitUntilEpoch
, waitForEpochs
, getEpochStateView
, waitForBlocks
, findAllUtxos
, findUtxosWithAddress
, findLargestUtxoWithAddress
, findLargestUtxoForPaymentKey
, checkDRepsNumber
, checkDRepState
) where

import Cardano.Api as Api
@@ -34,11 +39,15 @@ import qualified Cardano.Ledger.Conway.PParams as L
import qualified Cardano.Ledger.Shelley.LedgerState as L
import qualified Cardano.Ledger.UTxO as L

import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TMVar
import Control.Exception.Safe (MonadCatch)
import Control.Monad
import Control.Monad.STM
import Control.Monad.Trans.Maybe (MaybeT (..))
import Control.Monad.Trans.Resource
import Control.Monad.Trans.State.Strict (put)
import Data.Bifunctor (bimap)
import Data.IORef
import Data.List (sortOn)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
@@ -48,6 +57,7 @@ import Data.Ord (Down (..))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Type.Equality
import Data.Word (Word64)
import GHC.Exts (IsList (..))
import GHC.Stack
import Lens.Micro (to, (^.))
@@ -98,25 +108,117 @@ waitForEpochs epochStateView@EpochStateView{nodeConfigPath, socketPath} interval
currentEpoch <- getCurrentEpochNo epochStateView
waitUntilEpoch nodeConfigPath socketPath $ addEpochInterval currentEpoch interval


-- | Wait for the requested number of blocks
waitForBlocks
:: HasCallStack
=> MonadIO m
=> MonadTest m
=> MonadAssertion m
=> EpochStateView
-> Word64 -- ^ Number of blocks to wait
-> m BlockNo -- ^ The block number reached
waitForBlocks epochStateView numberOfBlocks = withFrozenCallStack $ do
BlockNo startingBlockNumber <- getBlockNumber epochStateView
fmap BlockNo $
watchEpochStateUpdate epochStateView $ \(_, _, BlockNo blockNumber) ->
pure $
if blockNumber >= startingBlockNumber + numberOfBlocks
then Just blockNumber
else Nothing

-- | A read-only mutable pointer to an epoch state, updated automatically
data EpochStateView = EpochStateView
{ nodeConfigPath :: !(NodeConfigFile In)
-- ^ node configuration file path
, socketPath :: !SocketPath
, epochStateView :: !(IORef (Maybe AnyNewEpochState))
-- ^ node socket path, to which foldEpochState is connected to
, wakeLock :: !(TChan ())
Contributor

What was wrong with the IORef? If we are going to use TChans and TMVars there needs to be good justification for doing so. I don't immediately see the benefit of this increased complexity in an already tedious testing environment.

Contributor Author

The goal here is to listen for changes to EpochStateView. You can't do this using just an IORef. You could do it with an MVar, having the watching threads block on an empty MVar and continue when it gets filled. But then you still need an additional IORef for functions that check the current EpochStateView without blocking. The problem with using an MVar and an IORef at the same time is that you don't have any guarantees about the order of operations in the listening thread. In other words, if you update the IORef and the MVar in order in one thread, other threads may see the updates in a different order. This means that listeners waiting on MVar () may see a stale IORef value.

I've chosen STM, which gives you a single view of both variables, but it has no MVar analogue for waiting in multiple threads for an update of a single variable (TMVar is single-wakeup only).

If we want to avoid STM here, we can define view and lock:

, wakeLock :: !(MVar (AnyNewEpochState, SlotNo, BlockNo))
, epochStateView :: !(IORef (AnyNewEpochState, SlotNo, BlockNo))

assuming we won't use one value to check the other, because they can be out-of-sync.
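
To make the STM argument above concrete, here is a minimal sketch of a value plus a multi-wakeup notification updated in one transaction. It is not the code from this PR: `Snapshot`, `View`, `publish`, `awaitUpdate` and `demo` are hypothetical names, an `Int` stands in for `(AnyNewEpochState, SlotNo, BlockNo)`, and it uses a `TVar` with `newBroadcastTChanIO` rather than the PR's `TMVar` and manually drained `TChan`.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (forM)

-- | Hypothetical stand-in for the (epoch state, slot, block) triple.
type Snapshot = Int

data View = View
  { viewState :: TVar Snapshot -- ^ current value, readable without blocking
  , viewWake  :: TChan ()      -- ^ write-only broadcast channel used for wake-ups
  }

newView :: Snapshot -> IO View
newView s = View <$> newTVarIO s <*> newBroadcastTChanIO

-- | Store a new value and notify every listener in a single transaction,
-- so no listener can observe the notification without the new value.
publish :: View -> Snapshot -> IO ()
publish (View st wake) s = atomically $ do
  writeTVar st s
  writeTChan wake ()

-- | Block on a listener's own duplicate of the channel, then read the value.
awaitUpdate :: View -> TChan () -> IO Snapshot
awaitUpdate (View st _) myChan = atomically $ do
  readTChan myChan
  readTVar st

-- | Three listeners all wake up on one update and see the same value.
demo :: IO [Snapshot]
demo = do
  view <- newView 0
  -- Duplicate before publishing, otherwise the wake-up would be missed.
  chans <- forM [1 .. 3 :: Int] $ \_ -> atomically (dupTChan (viewWake view))
  results <- forM chans $ \ch -> do
    done <- newEmptyMVar
    _ <- forkIO (awaitUpdate view ch >>= putMVar done)
    pure done
  publish view 42
  mapM takeMVar results
```

Because the write and the notification commit together, a woken listener reads a value at least as new as the update that woke it, which is the guarantee the MVar plus IORef pair lacks.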

Contributor

@Jimbo4350 Jimbo4350 May 16, 2024

Why do we need to notify the listeners about changes? I could understand wanting to notify listeners of changes if the changes are short lived but they are not (or at least I can't think of any that are). What problem is this solving? Why specifically do we need this level of granularity?

Contributor Author

It's an alternative to watchEpochStateView, which polls EpochStateView every 100 ms. Sometimes the epoch state is updated much more slowly, e.g. once every few seconds, so this saves some CPU by not needlessly recomputing the check on the same input.
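
For comparison, the polling approach being discussed can be sketched as below. This is a simplified illustration, not the actual watchEpochStateView: `pollUntil` and `demo` are hypothetical names, the state is a plain `Int` in an `IORef`, and the timeout is a number of attempts rather than an epoch.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Data.IORef

-- | Re-run the check every @delayMicros@ microseconds, giving up after
-- @attempts@ tries. The check runs again even if the value has not changed.
pollUntil :: Int -> Int -> IORef a -> (a -> Maybe b) -> IO (Maybe b)
pollUntil delayMicros attempts ref check = go attempts
  where
    go n
      | n <= 0 = pure Nothing
      | otherwise = do
          x <- readIORef ref
          case check x of
            Just r  -> pure (Just r)
            Nothing -> threadDelay delayMicros >> go (n - 1)

-- | A writer updates the value after 50 ms; the poller checks every 10 ms.
demo :: IO (Maybe Int)
demo = do
  ref <- newIORef (0 :: Int)
  _ <- forkIO $ threadDelay 50000 >> atomicWriteIORef ref 5
  pollUntil 10000 200 ref (\n -> if n >= 5 then Just n else Nothing)
```

The cost is the repeated check on unchanged input. The benefit is that the whole mechanism is one loop and one `IORef`.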

Contributor

@Jimbo4350 Jimbo4350 May 16, 2024

I think the additional complexity is not worth it. We should be using the atomic* versions of the IORef functions to avoid race conditions/partial writes etc.
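
As a small illustration of the atomic `IORef` functions mentioned here, the sketch below has several threads update one `IORef` with `atomicModifyIORef'`. The name `countConcurrently` is hypothetical, and a counter stands in for the epoch state.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, replicateM_)
import Data.IORef

-- | Increment a shared counter from several threads. Each read-modify-write
-- is atomic, so no increment is lost.
countConcurrently :: Int -> Int -> IO Int
countConcurrently threads perThread = do
  ref <- newIORef (0 :: Int)
  dones <- forM [1 .. threads] $ \_ -> do
    done <- newEmptyMVar
    _ <- forkIO $ do
      replicateM_ perThread (atomicModifyIORef' ref (\n -> (n + 1, ())))
      putMVar done ()
    pure done
  -- Wait for every worker to finish before reading the final value.
  mapM_ takeMVar dones
  readIORef ref
```

This keeps individual updates consistent, but a reader still has to poll to find out that the value changed.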

Contributor Author

But you can't implement blocking and waiting or multi-wakeup using just IORef.

Contributor Author

If you feel that MVars are too much, I can rewrite this PR using polling in watchEpochStateView.

Contributor

@Jimbo4350 Jimbo4350 May 16, 2024

> But you can't implement blocking and waiting or multi-wakeup using just IORef.

Why do we need this? If the goal is to "listen to changes of EpochStateView" and your reason is:

> It's an alternative to watchEpochStateView, which polls EpochStateView every 100 ms. Sometimes the epoch state is updated much more slowly, e.g. once every few seconds, so this saves some CPU by not needlessly recomputing the check on the same input.

I say the complexity is not worth it. The changes we are interested in don't require this level of granularity. Saving a little bit of CPU does not justify the additional complexity.

Contributor Author

Waiting on MVar is not that much more complex than polling. I'll rewrite the PR using polling then.

Contributor

The one I wrote already uses polling; you can use that one or tune it.

-- ^ multi-wakeup lock for notifying about epoch state updates. All listeners need to 'dupTChan' and then
-- 'readTChan' to be notified.
, epochStateView :: !(TMVar (AnyNewEpochState, SlotNo, BlockNo))
-- ^ Updated automatically current NewEpochState. Use 'getEpochState' to access the value.
}

-- | Notify epoch state view listeners about the update of the epoch state
notifyEpochStateViewListeners
:: MonadIO m
=> EpochStateView
-> m ()
notifyEpochStateViewListeners EpochStateView{wakeLock} =
void . liftIO . atomically $ do
-- Drain the channel first, to not keep anything not used in memory. We only need to store one element in
-- the channel. This is a safeguard against elements piling up in the channel If there are no listeners.
_ <- runMaybeT . forever . MaybeT $ tryReadTChan wakeLock
-- notify all listeners on duplicated channels
writeTChan wakeLock ()

-- | Watch epoch state view for an update. On every update, the callback function gets executed.
watchEpochStateUpdate
:: HasCallStack
=> MonadTest m
=> MonadIO m
=> EpochStateView
-> ((AnyNewEpochState, SlotNo, BlockNo) -> m (Maybe a))
-- ^ callback function invoked repeatedly. Stops the processing when 'Just a' gets returned
-> m a -- ^ the result from the callback function
watchEpochStateUpdate EpochStateView{wakeLock, epochStateView} f = withFrozenCallStack $
-- dupTChan for multi-wakeup
go =<< (liftIO . atomically $ dupTChan wakeLock)
where
go wakeupChannel = do
newEpochState <- liftIO . atomically $ do
_ <- readTChan wakeupChannel -- block and wait for update
readTMVar epochStateView
f newEpochState >>= \case
Nothing -> go wakeupChannel
Just a -> pure a

-- | Get epoch state from the view. If the state isn't available, retry waiting up to 15 seconds. Fails when
-- the state is not available after 15 seconds.
getEpochState :: MonadTest m
=> MonadAssertion m
=> MonadIO m
=> EpochStateView
-> m AnyNewEpochState
getEpochState EpochStateView{epochStateView} =
getEpochState
:: HasCallStack
=> MonadTest m
=> MonadAssertion m
=> MonadIO m
=> EpochStateView
-> m AnyNewEpochState
getEpochState epochStateView =
withFrozenCallStack $ getEpochStateDetails epochStateView $ \(nes, _, _) -> nes

getBlockNumber
:: HasCallStack
=> MonadIO m
=> MonadTest m
=> MonadAssertion m
=> EpochStateView
-> m BlockNo -- ^ The number of last produced block
getBlockNumber epochStateView =
withFrozenCallStack $ getEpochStateDetails epochStateView $ \(_, _, blockNumber) -> blockNumber

getSlotNumber
:: HasCallStack
=> MonadIO m
=> MonadTest m
=> MonadAssertion m
=> EpochStateView
-> m SlotNo -- ^ The current slot number
getSlotNumber epochStateView =
withFrozenCallStack $ getEpochStateDetails epochStateView $ \(_, slotNumber, _) -> slotNumber

-- | Utility function for accessing epoch state in `TVar`
getEpochStateDetails
:: HasCallStack
=> MonadAssertion m
=> MonadTest m
=> MonadIO m
=> EpochStateView
-> ((AnyNewEpochState, SlotNo, BlockNo) -> a)
-> m a
getEpochStateDetails EpochStateView{epochStateView} f =
withFrozenCallStack $
H.byDurationM 0.5 15 "EpochStateView has not been initialized within 15 seconds" $
H.evalIO (readIORef epochStateView) >>= maybe H.failure pure

H.evalIO (atomically $ tryReadTMVar epochStateView) >>= maybe H.failure (pure . f)

-- | Create a background thread listening for new epoch states. New epoch states are available to access
-- through 'EpochStateView', using query functions.
@@ -129,12 +231,17 @@ getEpochStateView
-> SocketPath -- ^ node socket path
-> m EpochStateView
getEpochStateView nodeConfigFile socketPath = withFrozenCallStack $ do
epochStateView <- H.evalIO $ newIORef Nothing
epochStateView <- H.evalIO newEmptyTMVarIO
-- we're not using 'newBroadcastTChan' here, because we don't know if we will have any clients here, so we
-- have to manually read and write a value to the channel, triggering multi-wakeup on listeners on dup-chans
wakeLock <- H.evalIO newTChanIO
let esv = EpochStateView nodeConfigFile socketPath wakeLock epochStateView
runInBackground . runExceptT . foldEpochState nodeConfigFile socketPath QuickValidation (EpochNo maxBound) Nothing
$ \epochState _slotNb _blockNb -> do
liftIO $ writeIORef epochStateView (Just epochState)
$ \epochState slotNumber blockNumber -> do
liftIO . atomically $ writeTMVar epochStateView (epochState, slotNumber, blockNumber)
notifyEpochStateViewListeners esv
pure ConditionNotMet
pure $ EpochStateView nodeConfigFile socketPath epochStateView
pure esv

-- | Retrieve all UTxOs map from the epoch state view.
findAllUtxos
@@ -204,7 +311,7 @@ findLargestUtxoWithAddress epochStateView sbe address = withFrozenCallStack $ do
$ sortOn (\(_, TxOut _ txOutValue _ _) -> Down $ txOutValueToLovelace txOutValue) utxos

-- | Retrieve a largest UTxO for a payment key info - a convenience wrapper for
-- 'findLargestUtxoForPaymentKey'.
-- 'findLargestUtxoWithAddress'.
findLargestUtxoForPaymentKey
:: MonadTest m
=> MonadAssertion m
@@ -262,7 +369,7 @@ checkDRepState epochStateView@EpochStateView{nodeConfigPath, socketPath} sbe f =
currentEpoch <- getCurrentEpochNo epochStateView
let terminationEpoch = succ . succ $ currentEpoch
result <- H.evalIO . runExceptT $ foldEpochState nodeConfigPath socketPath QuickValidation terminationEpoch Nothing
$ \(AnyNewEpochState actualEra newEpochState) _slotNb _blockNb -> do
$ \(AnyNewEpochState actualEra newEpochState) _slotNumber _blockNumber -> do
Refl <- either error pure $ assertErasEqual sbe actualEra
let dreps = shelleyBasedEraConstraints sbe newEpochState
^. L.nesEsL
36 changes: 1 addition & 35 deletions cardano-testnet/src/Testnet/EpochStateProcessing.hs
@@ -1,16 +1,14 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

module Testnet.EpochStateProcessing
( maybeExtractGovernanceActionIndex
, findCondition
, watchEpochStateView
) where

import Cardano.Api
import Cardano.Api.Ledger (EpochInterval (..), GovActionId (..))
import Cardano.Api.Ledger (GovActionId (..))
import qualified Cardano.Api.Ledger as L

import qualified Cardano.Ledger.Conway.Governance as L
@@ -25,11 +23,7 @@ import Data.Word (Word32)
import GHC.Stack
import Lens.Micro ((^.))

import Testnet.Components.Query (EpochStateView, getEpochState)

import Hedgehog
import Hedgehog.Extras (MonadAssertion)
import qualified Hedgehog.Extras as H

findCondition
:: HasCallStack
@@ -78,31 +72,3 @@ maybeExtractGovernanceActionIndex txid (AnyNewEpochState sbe newEpochState) =
| ti1 == L.extractHash ti2 = Just gai
compareWithTxId _ x _ _ = x

-- | Watch the epoch state view until the guard function returns 'Just' or the timeout epoch is reached.
-- Wait for at most @maxWait@ epochs.
-- The function will return the result of the guard function if it is met, otherwise it will return @Nothing@.
watchEpochStateView
:: forall m a. (HasCallStack, MonadIO m, MonadTest m, MonadAssertion m)
=> EpochStateView -- ^ The info to access the epoch state
-> (AnyNewEpochState -> m (Maybe a)) -- ^ The guard function (@Just@ if the condition is met, @Nothing@ otherwise)
-> EpochInterval -- ^ The maximum number of epochs to wait
-> m (Maybe a)
watchEpochStateView epochStateView f (EpochInterval maxWait) = withFrozenCallStack $ do
AnyNewEpochState _ newEpochState <- getEpochState epochStateView
let EpochNo currentEpoch = L.nesEL newEpochState
go (EpochNo $ currentEpoch + fromIntegral maxWait)
where
go :: EpochNo -> m (Maybe a)
go (EpochNo timeout) = do
epochState@(AnyNewEpochState _ newEpochState') <- getEpochState epochStateView
let EpochNo currentEpoch = L.nesEL newEpochState'
condition <- f epochState
case condition of
Just result -> pure (Just result)
Nothing -> do
if currentEpoch > timeout
then pure Nothing
else do
H.threadDelay 100_000
go (EpochNo timeout)

16 changes: 8 additions & 8 deletions cardano-testnet/src/Testnet/Types.hs
@@ -105,6 +105,14 @@ data TestnetRuntime = TestnetRuntime
poolSprockets :: TestnetRuntime -> [Sprocket]
poolSprockets = fmap (nodeSprocket . poolRuntime) . poolNodes

data PoolNode = PoolNode
{ poolRuntime :: NodeRuntime
, poolKeys :: PoolNodeKeys
}

poolNodeStdout :: PoolNode -> FilePath
poolNodeStdout = nodeStdout . poolRuntime

data NodeRuntime = NodeRuntime
{ nodeName :: !String
, nodeIpv4 :: !Text
@@ -119,14 +127,6 @@ data NodeRuntime = NodeRuntime
nodeSocketPath :: NodeRuntime -> SocketPath
nodeSocketPath = File . H.sprocketSystemName . nodeSprocket

data PoolNode = PoolNode
{ poolRuntime :: NodeRuntime
, poolKeys :: PoolNodeKeys
}

poolNodeStdout :: PoolNode -> FilePath
poolNodeStdout = nodeStdout . poolRuntime

data ColdPoolKey
data StakingKey
data SpoColdKey