From 3724987ad9c63759a185129aeaba84eb30c1ca44 Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Mon, 11 Dec 2023 19:18:57 +0100 Subject: [PATCH 01/13] Experiment with IOSimPOR This small experiment demonstrates how to detect "race conditions" when some atomic result is expected. The idea is to use a TVar to simulate file operations and show the consequences of non-atomic writes and reads on message resending. --- .../test/Hydra/Network/ReliabilitySpec.hs | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index 17804343193..db5f1ce34ab 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -11,7 +11,10 @@ import Control.Concurrent.Class.MonadSTM ( newTVarIO, writeTVar, ) -import Control.Monad.IOSim (runSimOrThrow) +import Control.Monad.Class.MonadFork (forkIO) +import Control.Monad.Class.MonadSay (say) +import Control.Monad.Class.MonadTest (exploreRaces) +import Control.Monad.IOSim (ExplorationOptions (..), IOSim, exploreSimTrace, runSimOrThrow, selectTraceEventsSay, traceEvents) import Control.Tracer (Tracer (..), nullTracer) import Data.Sequence.Strict ((|>)) import Data.Vector (Vector, empty, fromList, head, replicate, snoc) @@ -33,6 +36,7 @@ import System.Random (mkStdGen, uniformR) import Test.Hydra.Fixture (alice, bob, carol) import Test.QuickCheck ( Positive (Positive), + Property, collect, counterexample, generate, @@ -167,6 +171,8 @@ spec = parallel $ do receivedMsgs `shouldBe` [ReliableMsg (fromList [1, 1]) (Data "node-1" msg)] + prop "runs IOSimPOR" $ testIOSimPOR + it "appends messages to disk and can load them back" $ do withTempDir "network-messages-persistence" $ \tmpDir -> do Persistence{load, save} <- createPersistence $ tmpDir <> "/acks" @@ -245,6 +251,25 @@ spec = parallel $ do writeTVar seed' newGenSeed pure res +testIOSimPOR :: Property 
+testIOSimPOR = + exploreSimTrace + id + sim + ( \_ tr -> + let says = selectTraceEventsSay tr + in says == ["foobar"] || says == [""] + & counterexample (unlines $ map show $ traceEvents tr) + ) + where + sim :: IOSim s () + sim = do + exploreRaces + var <- newTVarIO "" + forkIO (atomically (writeTVar var "foo") >> atomically (modifyTVar' var (<> "bar"))) + forkIO (readTVarIO var >>= say) + threadDelay 1 + noop :: Monad m => b -> m () noop = const $ pure () From 22571b2bf03637faa9d29dd4f0ede815de77a856 Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Tue, 12 Dec 2023 11:14:57 +0100 Subject: [PATCH 02/13] Introducing stress test for network persistence This test is not very satisfying but for now I cannot come up with a better idea. It shows that with large messages (~10kB) there's exception thrown but the peers can get stuck, which is a different problem than the one observed in production. --- .../test/Hydra/Network/ReliabilitySpec.hs | 89 +++++++++++++++++-- 1 file changed, 84 insertions(+), 5 deletions(-) diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index db5f1ce34ab..d0f80c72003 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -16,9 +16,12 @@ import Control.Monad.Class.MonadSay (say) import Control.Monad.Class.MonadTest (exploreRaces) import Control.Monad.IOSim (ExplorationOptions (..), IOSim, exploreSimTrace, runSimOrThrow, selectTraceEventsSay, traceEvents) import Control.Tracer (Tracer (..), nullTracer) +import Data.ByteString qualified as BS import Data.Sequence.Strict ((|>)) +import Data.Text qualified as Text import Data.Vector (Vector, empty, fromList, head, replicate, snoc) import Data.Vector qualified as Vector +import Hydra.Logging (showLogsOnFailure, withTracerOutputTo) import Hydra.Network (Network (..)) import Hydra.Network.Authenticate (Authenticated (..)) import Hydra.Network.Heartbeat (Heartbeat (..), 
withHeartbeat) @@ -37,14 +40,18 @@ import Test.Hydra.Fixture (alice, bob, carol) import Test.QuickCheck ( Positive (Positive), Property, + arbitraryPrintableChar, collect, counterexample, generate, + resize, tabulate, + vectorOf, within, (===), ) -import Prelude (unlines) +import Test.QuickCheck.Monadic (assert, monadicIO, monitor, pick, run) +import Prelude (unlines, userError) spec :: Spec spec = parallel $ do @@ -129,8 +136,8 @@ spec = parallel $ do aliceFailingNetwork = failingNetwork randomSeed alice (bobToAlice, aliceToBob) bobFailingNetwork = failingNetwork randomSeed bob (aliceToBob, bobToAlice) - bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork emittedTraces "bob" bob [alice] - aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork emittedTraces "alice" alice [bob] + bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork (captureTraces emittedTraces) "bob" bob [alice] + aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork (captureTraces emittedTraces) "alice" alice [bob] runAlice = runPeer aliceReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages runBob = runPeer bobReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages @@ -149,6 +156,54 @@ spec = parallel $ do & tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] & tabulate "Messages from Bob to Alice" ["< " <> show ((length msgReceivedByAlice `div` 10 + 1) * 10)] + modifyMaxSuccess (const 10) $ + prop "stress test networking persistence layer" $ + monadicIO $ do + seed <- pick arbitrary + aliceToBobMessages :: [TestMsg] <- pickBlind (resize 100 $ arbitrary) + bobToAliceMessages :: [TestMsg] <- pickBlind (resize 100 $ arbitrary) + result <- run $ + withTempDir "network-messages-persistence" $ \tmpDir -> do + let logFile = tmpDir "nodes.log" + withFile logFile 
ReadWriteMode $ \hdl -> + race + (threadDelay 60 >> throwIO (userError $ "timeout exhausted, log file written to :" <> logFile)) + ( withTracerOutputTo hdl "network persistence" $ \tracer -> do + messagesReceivedByBob <- newTVarIO empty + messagesReceivedByAlice <- newTVarIO empty + randomSeed <- newTVarIO $ mkStdGen seed + aliceToBob <- newTQueueIO + bobToAlice <- newTQueueIO + alicePersistence <- realPersistenceFor "alice" tmpDir + bobPersistence <- realPersistenceFor "bob" tmpDir + + let + -- this is a NetworkComponent that broadcasts authenticated messages + -- mediated through a read and a write TQueue but drops 0.2 % of them + aliceFailingNetwork = failingNetwork randomSeed alice (bobToAlice, aliceToBob) + bobFailingNetwork = failingNetwork randomSeed bob (aliceToBob, bobToAlice) + + bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork tracer "bob" bob [alice] + aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork tracer "alice" alice [bob] + + runAlice = runPeer aliceReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages + runBob = runPeer bobReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages + + concurrently_ runAlice runBob + + aliceReceived <- Vector.toList <$> readTVarIO messagesReceivedByAlice + bobReceived <- Vector.toList <$> readTVarIO messagesReceivedByBob + pure (aliceReceived, bobReceived) + ) + + case result of + Right (msgReceivedByAlice, msgReceivedByBob) -> do + assert $ msgReceivedByBob == aliceToBobMessages + assert $ msgReceivedByAlice == bobToAliceMessages + monitor $ tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] + monitor $ tabulate "Messages from Bob to Alice" ["< " <> show ((length msgReceivedByAlice `div` 10 + 1) * 10)] + Left () -> assert False + it "broadcast updates counter from peers" $ do let receivedMsgs = runSimOrThrow $ do 
sentMessages <- newTVarIO empty @@ -226,10 +281,10 @@ spec = parallel $ do (waitForAllMessages expectedMessages receivedMessageContainer) (waitForAllMessages messagesToSend sentMessageContainer) - reliabilityStack persistence underlyingNetwork tracesContainer nodeId party peers = + reliabilityStack persistence underlyingNetwork tracer nodeId party peers = withHeartbeat nodeId noop $ withFlipHeartbeats $ - withReliability (captureTraces tracesContainer) persistence party peers underlyingNetwork + withReliability tracer persistence party peers underlyingNetwork failingNetwork seed peer (readQueue, writeQueue) callback action = withAsync @@ -326,3 +381,27 @@ mockMessagePersistence numberOfParties = do , loadMessages = toList <$> readTVarIO messages , appendMessage = \msg -> atomically $ modifyTVar' messages (|> msg) } + +realPersistenceFor :: (FromJSON msg, ToJSON msg) => String -> FilePath -> IO (MessagePersistence IO msg) +realPersistenceFor actor tmpDir = do + Persistence{load, save} <- createPersistence $ tmpDir actor "acks" + PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ tmpDir actor "network-messages" + + pure $ + MessagePersistence + { loadAcks = do + mloaded <- load + case mloaded of + Nothing -> pure $ replicate (length [alice, bob]) 0 + Just acks -> pure acks + , saveAcks = save + , loadMessages = loadAll + , appendMessage = append + } + +newtype TestMsg = T Text + deriving newtype (Eq, Show, ToJSON, FromJSON) + +instance Arbitrary TestMsg where + arbitrary = + T . Text.pack <$> vectorOf 10000 arbitraryPrintableChar From 48d5595c5db2a709fe01822b553150e408620c00 Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Tue, 12 Dec 2023 23:00:50 +0100 Subject: [PATCH 03/13] Unify terminology in log messages and use punning We try to improve naming the log messages' fields, make it more consistent between different messages, and replace field accessor function and positional construction with proper field name punning. 
--- hydra-node/src/Hydra/Network/Reliability.hs | 62 +++++++++---------- .../test/Hydra/Network/ReliabilitySpec.hs | 37 ++--------- 2 files changed, 37 insertions(+), 62 deletions(-) diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index dfc664793be..53e7653c117 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -116,13 +116,13 @@ data ReliableMsg msg = ReliableMsg -- ^ Vector of highest known counter for each known party. Serves as announcement of -- which messages the sender of `ReliableMsg` has seen. The individual counters have -- nothing to do with the `message` also included in this. - , message :: msg + , payload :: msg } deriving stock (Eq, Show, Generic) deriving anyclass (ToJSON, FromJSON) instance ToCBOR msg => ToCBOR (ReliableMsg msg) where - toCBOR ReliableMsg{knownMessageIds, message} = toCBOR knownMessageIds <> toCBOR message + toCBOR ReliableMsg{knownMessageIds, payload} = toCBOR knownMessageIds <> toCBOR payload instance FromCBOR msg => FromCBOR (ReliableMsg msg) where fromCBOR = ReliableMsg <$> fromCBOR <*> fromCBOR @@ -135,11 +135,11 @@ instance ToCBOR msg => SignableRepresentation (ReliableMsg msg) where -- __NOTE__: Log items are documented in a YAML schema file which is not -- currently public, but should be. 
data ReliabilityLog - = Resending {missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, partyIndex :: Int} - | BroadcastCounter {partyIndex :: Int, localCounter :: Vector Int} - | BroadcastPing {partyIndex :: Int, localCounter :: Vector Int} - | Received {acknowledged :: Vector Int, localCounter :: Vector Int, partyIndex :: Int} - | Ignored {acknowledged :: Vector Int, localCounter :: Vector Int, partyIndex :: Int} + = Resending {missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int} + | BroadcastCounter {ourIndex :: Int, localCounter :: Vector Int} + | BroadcastPing {ourIndex :: Int, localCounter :: Vector Int} + | Received {acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int, ourIndex :: Int} + | Ignored {acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int, ourIndex :: Int} | ReliabilityFailedToFindMsg { missingMsgIndex :: Int , sentMessagesLength :: Int @@ -238,16 +238,16 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa { broadcast = \msg -> case msg of Data{} -> do - newAckCounter <- incrementAckCounter + localCounter <- incrementAckCounter appendMessage msg - saveAcks newAckCounter - traceWith tracer (BroadcastCounter ourIndex newAckCounter) - broadcast $ ReliableMsg newAckCounter msg + saveAcks localCounter + traceWith tracer BroadcastCounter{ourIndex, localCounter} + broadcast $ ReliableMsg localCounter msg Ping{} -> do - acks <- readTVarIO acksCache - saveAcks acks - traceWith tracer (BroadcastPing ourIndex acks) - broadcast $ ReliableMsg acks msg + localCounter <- readTVarIO acksCache + saveAcks localCounter + traceWith tracer BroadcastPing{ourIndex, localCounter} + broadcast $ ReliableMsg localCounter msg } where incrementAckCounter = atomically $ do @@ -256,24 +256,24 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa writeTVar acksCache newAcks pure newAcks - reliableCallback 
acksCache resend ourIndex (Authenticated (ReliableMsg acks msg) party) = do - if length acks /= length allParties + reliableCallback acksCache resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party) = do + if length acknowledged /= length allParties then traceWith tracer ReceivedMalformedAcks { fromParty = party - , partyAcks = acks + , partyAcks = acknowledged , numberOfParties = length allParties } else do eShouldCallbackWithKnownAcks <- atomically $ runMaybeT $ do loadedAcks <- lift $ readTVar acksCache partyIndex <- hoistMaybe $ findPartyIndex party - messageAckForParty <- hoistMaybe (acks !? partyIndex) + messageAckForParty <- hoistMaybe (acknowledged !? partyIndex) knownAckForParty <- hoistMaybe $ loadedAcks !? partyIndex if - | isPing msg -> + | isPing payload -> -- we do not update indices on Pings but we do propagate them return (True, partyIndex, loadedAcks) | messageAckForParty == knownAckForParty + 1 -> do @@ -286,15 +286,15 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa return (False, partyIndex, loadedAcks) case eShouldCallbackWithKnownAcks of - Just (shouldCallback, partyIndex, knownAcks) -> do + Just (shouldCallback, theirIndex, localCounter) -> do if shouldCallback then do - callback (Authenticated msg party) - traceWith tracer (Received acks knownAcks partyIndex) - else traceWith tracer (Ignored acks knownAcks partyIndex) + callback Authenticated{payload, party} + traceWith tracer Received{acknowledged, localCounter, theirIndex, ourIndex} + else traceWith tracer Ignored{acknowledged, localCounter, theirIndex, ourIndex} - when (isPing msg) $ - resendMessagesIfLagging resend partyIndex knownAcks acks ourIndex + when (isPing payload) $ + resendMessagesIfLagging resend theirIndex localCounter acknowledged ourIndex Nothing -> pure () constructAcks acks wantedIndex = @@ -302,8 +302,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa partyIndexes = generate (length 
allParties) id - resendMessagesIfLagging resend partyIndex knownAcks messageAcks myIndex = do - let mmessageAckForUs = messageAcks !? myIndex + resendMessagesIfLagging resend theirIndex knownAcks acknowledged myIndex = do + let mmessageAckForUs = acknowledged !? myIndex let mknownAckForUs = knownAcks !? myIndex case (mmessageAckForUs, mknownAckForUs) of (Just messageAckForUs, Just knownAckForUs) -> @@ -324,9 +324,9 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa , messageAckForUs = messageAckForUs } Just missingMsg -> do - let newAcks' = zipWith (\ack i -> if i == myIndex then idx else ack) knownAcks partyIndexes - traceWith tracer (Resending missing messageAcks newAcks' partyIndex) - atomically $ resend $ ReliableMsg newAcks' missingMsg + let localCounter = zipWith (\ack i -> if i == myIndex then idx else ack) knownAcks partyIndexes + traceWith tracer Resending{missing, acknowledged, localCounter, theirIndex} + atomically $ resend $ ReliableMsg localCounter missingMsg _ -> pure () -- Find the index of a party in the list of all parties. 
diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index d0f80c72003..bdcd0904e44 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -11,17 +11,13 @@ import Control.Concurrent.Class.MonadSTM ( newTVarIO, writeTVar, ) -import Control.Monad.Class.MonadFork (forkIO) -import Control.Monad.Class.MonadSay (say) -import Control.Monad.Class.MonadTest (exploreRaces) -import Control.Monad.IOSim (ExplorationOptions (..), IOSim, exploreSimTrace, runSimOrThrow, selectTraceEventsSay, traceEvents) +import Control.Monad.IOSim (runSimOrThrow) import Control.Tracer (Tracer (..), nullTracer) -import Data.ByteString qualified as BS import Data.Sequence.Strict ((|>)) import Data.Text qualified as Text import Data.Vector (Vector, empty, fromList, head, replicate, snoc) import Data.Vector qualified as Vector -import Hydra.Logging (showLogsOnFailure, withTracerOutputTo) +import Hydra.Logging (withTracerOutputTo) import Hydra.Network (Network (..)) import Hydra.Network.Authenticate (Authenticated (..)) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) @@ -39,7 +35,6 @@ import System.Random (mkStdGen, uniformR) import Test.Hydra.Fixture (alice, bob, carol) import Test.QuickCheck ( Positive (Positive), - Property, arbitraryPrintableChar, collect, counterexample, @@ -98,7 +93,8 @@ spec = parallel $ do receivedMessagesInOrder messageReceived = let refMessages = Data "node-2" <$> [1 ..] 
- in all (`elem` refMessages) (payload <$> messageReceived) + isInMessage Authenticated{payload} = payload `elem` refMessages + in all isInMessage messageReceived in receivedMessagesInOrder propagatedMessages & counterexample (show propagatedMessages) @@ -156,7 +152,7 @@ spec = parallel $ do & tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] & tabulate "Messages from Bob to Alice" ["< " <> show ((length msgReceivedByAlice `div` 10 + 1) * 10)] - modifyMaxSuccess (const 10) $ + modifyMaxSuccess (const 1) $ prop "stress test networking persistence layer" $ monadicIO $ do seed <- pick arbitrary @@ -226,8 +222,6 @@ spec = parallel $ do receivedMsgs `shouldBe` [ReliableMsg (fromList [1, 1]) (Data "node-1" msg)] - prop "runs IOSimPOR" $ testIOSimPOR - it "appends messages to disk and can load them back" $ do withTempDir "network-messages-persistence" $ \tmpDir -> do Persistence{load, save} <- createPersistence $ tmpDir <> "/acks" @@ -306,25 +300,6 @@ spec = parallel $ do writeTVar seed' newGenSeed pure res -testIOSimPOR :: Property -testIOSimPOR = - exploreSimTrace - id - sim - ( \_ tr -> - let says = selectTraceEventsSay tr - in says == ["foobar"] || says == [""] - & counterexample (unlines $ map show $ traceEvents tr) - ) - where - sim :: IOSim s () - sim = do - exploreRaces - var <- newTVarIO "" - forkIO (atomically (writeTVar var "foo") >> atomically (modifyTVar' var (<> "bar"))) - forkIO (readTVarIO var >>= say) - threadDelay 1 - noop :: Monad m => b -> m () noop = const $ pure () @@ -353,7 +328,7 @@ captureIncoming receivedMessages msg = atomically $ modifyTVar' receivedMessages (`snoc` msg) capturePayload :: MonadSTM m => TVar m (Vector msg) -> Authenticated (Heartbeat msg) -> m () -capturePayload receivedMessages message = case payload message of +capturePayload receivedMessages Authenticated{payload} = case payload of Data _ msg -> atomically $ modifyTVar' receivedMessages (`snoc` msg) _ -> pure () From 
9e1d747f030a7dec84dd61d54c00512c8e368125 Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Wed, 13 Dec 2023 10:36:56 +0100 Subject: [PATCH 04/13] Prevent concurrent loading and appending in PersistenceIncremental --- hydra-node/src/Hydra/Node/Network.hs | 2 +- hydra-node/src/Hydra/Persistence.hs | 19 ++++++++++++++--- hydra-node/test/Hydra/PersistenceSpec.hs | 27 +++++++++++++++++++++--- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/hydra-node/src/Hydra/Node/Network.hs b/hydra-node/src/Hydra/Node/Network.hs index d6c0d4776bc..6197196cb6e 100644 --- a/hydra-node/src/Hydra/Node/Network.hs +++ b/hydra-node/src/Hydra/Node/Network.hs @@ -143,7 +143,7 @@ withNetwork tracer connectionMessages configuration callback action = do -- * Some state already exists and is loaded, -- * The number of parties is not the same as the number of acknowledgments saved. configureMessagePersistence :: - (MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg) => + (MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg, MonadSTM m, MonadThread m, MonadThrow (STM m)) => Tracer m (HydraNodeLog tx) -> FilePath -> Int -> diff --git a/hydra-node/src/Hydra/Persistence.hs b/hydra-node/src/Hydra/Persistence.hs index 7e27fa2a067..8a44e170ddf 100644 --- a/hydra-node/src/Hydra/Persistence.hs +++ b/hydra-node/src/Hydra/Persistence.hs @@ -4,6 +4,8 @@ module Hydra.Persistence where import Hydra.Prelude +import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, throwSTM) +import Control.Monad.Class.MonadFork (myThreadId) import Data.Aeson qualified as Aeson import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as C8 @@ -11,8 +13,9 @@ import System.Directory (createDirectoryIfMissing, doesFileExist) import System.FilePath (takeDirectory) import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic) -newtype PersistenceException +data PersistenceException = PersistenceException String + | IncorrectAccessException String deriving stock (Eq, Show) instance 
Exception PersistenceException @@ -54,17 +57,27 @@ data PersistenceIncremental a m = PersistenceIncremental -- | Initialize persistence handle for given type 'a' at given file path. createPersistenceIncremental :: - (MonadIO m, MonadThrow m) => + forall a m. + (MonadIO m, MonadThrow m, MonadSTM m, MonadThread m, MonadThrow (STM m)) => FilePath -> m (PersistenceIncremental a m) createPersistenceIncremental fp = do liftIO . createDirectoryIfMissing True $ takeDirectory fp + authorizedThread <- newTVarIO Nothing pure $ PersistenceIncremental { append = \a -> do + tid <- myThreadId + atomically $ modifyTVar' authorizedThread (const $ Just tid) let bytes = toStrict $ Aeson.encode a <> "\n" liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) - , loadAll = + , loadAll = do + tid <- myThreadId + atomically $ do + authTid <- readTVar authorizedThread + when (isJust authTid && authTid /= Just tid) $ + throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread") + liftIO (doesFileExist fp) >>= \case False -> pure [] True -> do diff --git a/hydra-node/test/Hydra/PersistenceSpec.hs b/hydra-node/test/Hydra/PersistenceSpec.hs index 5c45611b120..2f7bc8c264b 100644 --- a/hydra-node/test/Hydra/PersistenceSpec.hs +++ b/hydra-node/test/Hydra/PersistenceSpec.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE LambdaCase #-} + module Hydra.PersistenceSpec where import Hydra.Prelude hiding (label) @@ -6,10 +8,10 @@ import Test.Hydra.Prelude import Data.Aeson (Value (..)) import Data.Aeson qualified as Aeson import Data.Text qualified as Text -import Hydra.Persistence (Persistence (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental) -import Test.QuickCheck (checkCoverage, cover, elements, oneof, (===)) +import Hydra.Persistence (Persistence (..), PersistenceException (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental) +import Test.QuickCheck (checkCoverage, counterexample, cover, elements, 
generate, oneof, suchThat, (===)) import Test.QuickCheck.Gen (listOf) -import Test.QuickCheck.Monadic (monadicIO, monitor, pick, run) +import Test.QuickCheck.Monadic (assert, monadicIO, monitor, pick, run) spec :: Spec spec = do @@ -54,6 +56,25 @@ spec = do loadAll pure $ actualResult === items + it "it cannot load from a different thread once having started appending" $ + monadicIO $ do + items <- pick $ listOf genPersistenceItem + moreItems <- pick $ listOf genPersistenceItem `suchThat` ((> 2) . length) + result :: Either PersistenceException () <- run $ withTempDir "hydra-persistence" $ \tmpDir -> do + persistence@PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ tmpDir <> "/data" + forM_ items append + loadAll `shouldReturn` items + try $ loadAndAppendConcurrently persistence moreItems + + monitor $ counterexample $ show result + assert $ isLeft result + +loadAndAppendConcurrently :: PersistenceIncremental Value IO -> [Value] -> IO () +loadAndAppendConcurrently PersistenceIncremental{loadAll, append} moreItems = + race_ + (forever $ threadDelay 0.01 >> loadAll) + (forM_ moreItems $ \item -> append item >> threadDelay 0.01) + genPersistenceItem :: Gen Aeson.Value genPersistenceItem = oneof From 1f842bb3790b785cfe0a7520c84177b28f2e542e Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Wed, 13 Dec 2023 10:43:20 +0100 Subject: [PATCH 05/13] Removing useless persistence stress test --- .../test/Hydra/Network/ReliabilitySpec.hs | 48 ------------------- 1 file changed, 48 deletions(-) diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index bdcd0904e44..f4196f95232 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -152,54 +152,6 @@ spec = parallel $ do & tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] & tabulate "Messages from Bob to Alice" ["< " <> show ((length 
msgReceivedByAlice `div` 10 + 1) * 10)] - modifyMaxSuccess (const 1) $ - prop "stress test networking persistence layer" $ - monadicIO $ do - seed <- pick arbitrary - aliceToBobMessages :: [TestMsg] <- pickBlind (resize 100 $ arbitrary) - bobToAliceMessages :: [TestMsg] <- pickBlind (resize 100 $ arbitrary) - result <- run $ - withTempDir "network-messages-persistence" $ \tmpDir -> do - let logFile = tmpDir "nodes.log" - withFile logFile ReadWriteMode $ \hdl -> - race - (threadDelay 60 >> throwIO (userError $ "timeout exhausted, log file written to :" <> logFile)) - ( withTracerOutputTo hdl "network persistence" $ \tracer -> do - messagesReceivedByBob <- newTVarIO empty - messagesReceivedByAlice <- newTVarIO empty - randomSeed <- newTVarIO $ mkStdGen seed - aliceToBob <- newTQueueIO - bobToAlice <- newTQueueIO - alicePersistence <- realPersistenceFor "alice" tmpDir - bobPersistence <- realPersistenceFor "bob" tmpDir - - let - -- this is a NetworkComponent that broadcasts authenticated messages - -- mediated through a read and a write TQueue but drops 0.2 % of them - aliceFailingNetwork = failingNetwork randomSeed alice (bobToAlice, aliceToBob) - bobFailingNetwork = failingNetwork randomSeed bob (aliceToBob, bobToAlice) - - bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork tracer "bob" bob [alice] - aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork tracer "alice" alice [bob] - - runAlice = runPeer aliceReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages - runBob = runPeer bobReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages - - concurrently_ runAlice runBob - - aliceReceived <- Vector.toList <$> readTVarIO messagesReceivedByAlice - bobReceived <- Vector.toList <$> readTVarIO messagesReceivedByBob - pure (aliceReceived, bobReceived) - ) - - case result of - Right (msgReceivedByAlice, msgReceivedByBob) -> 
do - assert $ msgReceivedByBob == aliceToBobMessages - assert $ msgReceivedByAlice == bobToAliceMessages - monitor $ tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] - monitor $ tabulate "Messages from Bob to Alice" ["< " <> show ((length msgReceivedByAlice `div` 10 + 1) * 10)] - Left () -> assert False - it "broadcast updates counter from peers" $ do let receivedMsgs = runSimOrThrow $ do sentMessages <- newTVarIO empty From 1d378aa8d9643b682d5f1299028ae51aecae3f82 Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Wed, 13 Dec 2023 11:02:49 +0100 Subject: [PATCH 06/13] Store sent messages in a cache for resending purpose We used to do that but then moved to always reloading from file, which led to race conditions when reading/writing concurrently and incorrect deserialisation of messages. It seems both safer and faster to use a local cache, and only reload messages at start of the network layer. --- hydra-node/src/Hydra/Network/Reliability.hs | 29 ++++++++++++------- .../test/Hydra/Network/ReliabilitySpec.hs | 18 +++++++----- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index 53e7653c117..9b13c74c253 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -86,6 +86,7 @@ import Cardano.Binary (serialize') import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation)) import Control.Concurrent.Class.MonadSTM ( MonadSTM (readTQueue, writeTQueue), + modifyTVar', newTQueueIO, newTVarIO, readTVarIO, @@ -93,6 +94,8 @@ import Control.Concurrent.Class.MonadSTM ( ) import Control.Tracer (Tracer) import Data.IntMap qualified as IMap +import Data.Sequence.Strict ((|>)) +import Data.Sequence.Strict qualified as Seq import Data.Vector ( Vector, elemIndex, @@ -224,23 +227,24 @@ withReliability :: NetworkComponent m (Authenticated (Heartbeat msg)) 
(Heartbeat msg) a withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do acksCache <- loadAcks >>= newTVarIO + sentMessages <- loadMessages >>= newTVarIO . Seq.fromList resendQ <- newTQueueIO let ourIndex = fromMaybe (error "This cannot happen because we constructed the list with our party inside.") (findPartyIndex me) let resend = writeTQueue resendQ - withRawNetwork (reliableCallback acksCache resend ourIndex) $ \network@Network{broadcast} -> do + withRawNetwork (reliableCallback acksCache sentMessages resend ourIndex) $ \network@Network{broadcast} -> do withAsync (forever $ atomically (readTQueue resendQ) >>= broadcast) $ \_ -> - reliableBroadcast ourIndex acksCache network + reliableBroadcast sentMessages ourIndex acksCache network where allParties = fromList $ sort $ me : otherParties - reliableBroadcast ourIndex acksCache Network{broadcast} = + reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} = action $ Network { broadcast = \msg -> case msg of Data{} -> do - localCounter <- incrementAckCounter - appendMessage msg + localCounter <- atomically $ cacheMessage msg >> incrementAckCounter saveAcks localCounter + appendMessage msg traceWith tracer BroadcastCounter{ourIndex, localCounter} broadcast $ ReliableMsg localCounter msg Ping{} -> do @@ -250,13 +254,16 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa broadcast $ ReliableMsg localCounter msg } where - incrementAckCounter = atomically $ do + incrementAckCounter = do acks <- readTVar acksCache let newAcks = constructAcks acks ourIndex writeTVar acksCache newAcks pure newAcks - reliableCallback acksCache resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party) = do + cacheMessage msg = + modifyTVar' sentMessages (|> msg) + + reliableCallback acksCache sentMessages resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party) = do if length 
acknowledged /= length allParties then traceWith @@ -294,7 +301,7 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa else traceWith tracer Ignored{acknowledged, localCounter, theirIndex, ourIndex} when (isPing payload) $ - resendMessagesIfLagging resend theirIndex localCounter acknowledged ourIndex + resendMessagesIfLagging sentMessages resend theirIndex localCounter acknowledged ourIndex Nothing -> pure () constructAcks acks wantedIndex = @@ -302,7 +309,7 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa partyIndexes = generate (length allParties) id - resendMessagesIfLagging resend theirIndex knownAcks acknowledged myIndex = do + resendMessagesIfLagging sentMessages resend theirIndex knownAcks acknowledged myIndex = do let mmessageAckForUs = acknowledged !? myIndex let mknownAckForUs = knownAcks !? myIndex case (mmessageAckForUs, mknownAckForUs) of @@ -311,8 +318,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa -- latest message sent when (messageAckForUs < knownAckForUs) $ do let missing = fromList [messageAckForUs + 1 .. knownAckForUs] - storedMessages <- loadMessages - let messages = IMap.fromList (zip [1 ..] storedMessages) + storedMessages <- readTVarIO sentMessages + let messages = IMap.fromList (zip [1 ..] $ toList storedMessages) forM_ missing $ \idx -> do case messages IMap.!? 
idx of Nothing -> diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index f4196f95232..d2c09515ad1 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -17,7 +17,6 @@ import Data.Sequence.Strict ((|>)) import Data.Text qualified as Text import Data.Vector (Vector, empty, fromList, head, replicate, snoc) import Data.Vector qualified as Vector -import Hydra.Logging (withTracerOutputTo) import Hydra.Network (Network (..)) import Hydra.Network.Authenticate (Authenticated (..)) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) @@ -39,14 +38,12 @@ import Test.QuickCheck ( collect, counterexample, generate, - resize, tabulate, vectorOf, within, (===), ) -import Test.QuickCheck.Monadic (assert, monadicIO, monitor, pick, run) -import Prelude (unlines, userError) +import Prelude (unlines) spec :: Spec spec = parallel $ do @@ -176,8 +173,10 @@ spec = parallel $ do it "appends messages to disk and can load them back" $ do withTempDir "network-messages-persistence" $ \tmpDir -> do + let networkMessagesFile = tmpDir <> "/network-messages" + Persistence{load, save} <- createPersistence $ tmpDir <> "/acks" - PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ tmpDir <> "/network-messages" + PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ networkMessagesFile let messagePersistence = MessagePersistence @@ -211,8 +210,8 @@ spec = parallel $ do receivedMsgs `shouldBe` [ReliableMsg (fromList [1, 1]) (Data "node-1" msg)] - doesFileExist (tmpDir "network-messages") `shouldReturn` True - loadAll `shouldReturn` [Data "node-1" msg] + doesFileExist networkMessagesFile `shouldReturn` True + reloadAll networkMessagesFile `shouldReturn` [Data "node-1" msg] doesFileExist (tmpDir "acks") `shouldReturn` True load `shouldReturn` Just (fromList [1, 1]) @@ -252,6 +251,11 @@ spec = parallel $ do writeTVar seed' 
newGenSeed pure res + reloadAll :: FilePath -> IO [Heartbeat (Heartbeat String)] + reloadAll fileName = + createPersistenceIncremental fileName + >>= \PersistenceIncremental{loadAll} -> loadAll + noop :: Monad m => b -> m () noop = const $ pure () From 32a290ac9fb3bb5882ea1e0b2426776212539db6 Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Wed, 13 Dec 2023 11:05:17 +0100 Subject: [PATCH 07/13] Remove spurious import --- hydra-node/test/Hydra/PersistenceSpec.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hydra-node/test/Hydra/PersistenceSpec.hs b/hydra-node/test/Hydra/PersistenceSpec.hs index 2f7bc8c264b..c2569376554 100644 --- a/hydra-node/test/Hydra/PersistenceSpec.hs +++ b/hydra-node/test/Hydra/PersistenceSpec.hs @@ -9,7 +9,7 @@ import Data.Aeson (Value (..)) import Data.Aeson qualified as Aeson import Data.Text qualified as Text import Hydra.Persistence (Persistence (..), PersistenceException (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental) -import Test.QuickCheck (checkCoverage, counterexample, cover, elements, generate, oneof, suchThat, (===)) +import Test.QuickCheck (checkCoverage, counterexample, cover, elements, oneof, suchThat, (===)) import Test.QuickCheck.Gen (listOf) import Test.QuickCheck.Monadic (assert, monadicIO, monitor, pick, run) From 839e5c02d532143fa253bfb2c9d0be6f1303bb86 Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Wed, 13 Dec 2023 11:30:11 +0100 Subject: [PATCH 08/13] Add some haddock to PersistenceIncremental --- hydra-node/src/Hydra/Persistence.hs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hydra-node/src/Hydra/Persistence.hs b/hydra-node/src/Hydra/Persistence.hs index 8a44e170ddf..f8103af7f0e 100644 --- a/hydra-node/src/Hydra/Persistence.hs +++ b/hydra-node/src/Hydra/Persistence.hs @@ -56,6 +56,11 @@ data PersistenceIncremental a m = PersistenceIncremental } -- | Initialize persistence handle for given type 'a' at given file path. 
+-- +-- This instance of `PersistenceIncremental` is "thread-safe" in the sense that +-- it prevents loading from a different thread once one starts `append`ing +-- through the handle. If another thread attempts to `loadAll` after this point, +-- an `IncorrectAccessException` will be raised. createPersistenceIncremental :: forall a m. (MonadIO m, MonadThrow m, MonadSTM m, MonadThread m, MonadThrow (STM m)) => From b357477936543079410458da3e68ab8a34b95da5 Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Wed, 13 Dec 2023 14:24:22 +0100 Subject: [PATCH 09/13] Fix Logs schema --- hydra-node/json-schemas/logs.yaml | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/hydra-node/json-schemas/logs.yaml b/hydra-node/json-schemas/logs.yaml index 3484913bbb6..3d90d610240 100644 --- a/hydra-node/json-schemas/logs.yaml +++ b/hydra-node/json-schemas/logs.yaml @@ -483,7 +483,7 @@ definitions: - missing - acknowledged - localCounter - - partyIndex + - theirIndex properties: tag: type: string @@ -500,7 +500,7 @@ definitions: type: array items: type: number - partyIndex: + theirIndex: type: number - title: BroadcastCounter description: >- @@ -509,13 +509,13 @@ definitions: additionalProperties: false required: - tag - - partyIndex + - ourIndex - localCounter properties: tag: type: string enum: ["BroadcastCounter"] - partyIndex: + ourIndex: type: number localCounter: type: array @@ -528,13 +528,13 @@ definitions: additionalProperties: false required: - tag - - partyIndex + - ourIndex - localCounter properties: tag: type: string enum: ["BroadcastPing"] - partyIndex: + ourIndex: type: number localCounter: type: array @@ -549,7 +549,8 @@ definitions: - tag - acknowledged - localCounter - - partyIndex + - theirIndex + - ourIndex properties: tag: type: string @@ -562,7 +563,9 @@ definitions: type: array items: type: number - partyIndex: + theirIndex: + type: number + ourIndex: type: number - title: ClearedMessageQueue description: >- @@ -590,7 
+593,8 @@ definitions: - tag - acknowledged - localCounter - - partyIndex + - theirIndex + - ourIndex properties: tag: type: string @@ -603,7 +607,9 @@ definitions: type: array items: type: number - partyIndex: + theirIndex: + type: number + ourIndex: type: number - title: ReliabilityFailedToFindMsg description: >- @@ -2063,7 +2069,7 @@ definitions: enum: ["WaitOnContestationDeadline"] - title: WaitOnTxs description: >- - Some transactions from a proposed snapshot have not been seen yet + Some transactions from a proposed snapshot have not been seen yet. type: object additionalProperties: false required: From e422c91e3b1e1040fe9198a4fb82d91cee9001cd Mon Sep 17 00:00:00 2001 From: Arnaud Bailly Date: Wed, 13 Dec 2023 15:26:06 +0100 Subject: [PATCH 10/13] Do not reuse persistence handle across several servers --- hydra-node/test/Hydra/API/ServerSpec.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hydra-node/test/Hydra/API/ServerSpec.hs b/hydra-node/test/Hydra/API/ServerSpec.hs index 2eac9ee4dc0..2d6b68edb34 100644 --- a/hydra-node/test/Hydra/API/ServerSpec.hs +++ b/hydra-node/test/Hydra/API/ServerSpec.hs @@ -365,8 +365,8 @@ spec = describe "ServerSpec" $ guard $ v ^? key "headStatus" == Just (Aeson.String "Initializing") guard $ v ^? key "snapshotUtxo" == Just expectedUtxos - -- expect the api server to load events from apiPersistence and project headStatus correctly - withTestAPIServer port alice apiPersistence tracer $ \_ -> do + newApiPersistence <- createPersistenceIncremental $ persistenceDir <> "/server-output" + withTestAPIServer port alice newApiPersistence tracer $ \_ -> do waitForValue port $ \v -> do guard $ v ^? key "headStatus" == Just (Aeson.String "Initializing") guard $ v ^? key "snapshotUtxo" == Just expectedUtxos From 9daddbb1c1e0e149a1e0d805720e7d33f5605410 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 14 Dec 2023 18:04:14 +0100 Subject: [PATCH 11/13] Use writeTVar instead of modifyTVar' .. 
const --- hydra-node/src/Hydra/Persistence.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hydra-node/src/Hydra/Persistence.hs b/hydra-node/src/Hydra/Persistence.hs index f8103af7f0e..892f5f718f2 100644 --- a/hydra-node/src/Hydra/Persistence.hs +++ b/hydra-node/src/Hydra/Persistence.hs @@ -4,7 +4,7 @@ module Hydra.Persistence where import Hydra.Prelude -import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, throwSTM) +import Control.Concurrent.Class.MonadSTM (newTVarIO, throwSTM, writeTVar) import Control.Monad.Class.MonadFork (myThreadId) import Data.Aeson qualified as Aeson import Data.ByteString qualified as BS @@ -73,7 +73,7 @@ createPersistenceIncremental fp = do PersistenceIncremental { append = \a -> do tid <- myThreadId - atomically $ modifyTVar' authorizedThread (const $ Just tid) + atomically $ writeTVar authorizedThread $ Just tid let bytes = toStrict $ Aeson.encode a <> "\n" liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) , loadAll = do From eb606389705d6509fb30b2318deb24fdf6c1bfd5 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 14 Dec 2023 18:23:05 +0100 Subject: [PATCH 12/13] Remove unused code --- .../test/Hydra/Network/ReliabilitySpec.hs | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index d2c09515ad1..a4439172c42 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -14,7 +14,6 @@ import Control.Concurrent.Class.MonadSTM ( import Control.Monad.IOSim (runSimOrThrow) import Control.Tracer (Tracer (..), nullTracer) import Data.Sequence.Strict ((|>)) -import Data.Text qualified as Text import Data.Vector (Vector, empty, fromList, head, replicate, snoc) import Data.Vector qualified as Vector import Hydra.Network (Network (..)) @@ -34,12 +33,10 @@ import System.Random (mkStdGen, uniformR) import Test.Hydra.Fixture 
(alice, bob, carol) import Test.QuickCheck ( Positive (Positive), - arbitraryPrintableChar, collect, counterexample, generate, tabulate, - vectorOf, within, (===), ) @@ -312,27 +309,3 @@ mockMessagePersistence numberOfParties = do , loadMessages = toList <$> readTVarIO messages , appendMessage = \msg -> atomically $ modifyTVar' messages (|> msg) } - -realPersistenceFor :: (FromJSON msg, ToJSON msg) => String -> FilePath -> IO (MessagePersistence IO msg) -realPersistenceFor actor tmpDir = do - Persistence{load, save} <- createPersistence $ tmpDir actor "acks" - PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ tmpDir actor "network-messages" - - pure $ - MessagePersistence - { loadAcks = do - mloaded <- load - case mloaded of - Nothing -> pure $ replicate (length [alice, bob]) 0 - Just acks -> pure acks - , saveAcks = save - , loadMessages = loadAll - , appendMessage = append - } - -newtype TestMsg = T Text - deriving newtype (Eq, Show, ToJSON, FromJSON) - -instance Arbitrary TestMsg where - arbitrary = - T . 
Text.pack <$> vectorOf 10000 arbitraryPrintableChar From 1b07b813fbfd33f9aa584602a59795c8d708b255 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 14 Dec 2023 18:29:12 +0100 Subject: [PATCH 13/13] Slightly refactored persistence test --- hydra-node/test/Hydra/PersistenceSpec.hs | 29 +++++++++++------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/hydra-node/test/Hydra/PersistenceSpec.hs b/hydra-node/test/Hydra/PersistenceSpec.hs index c2569376554..8a14af91a88 100644 --- a/hydra-node/test/Hydra/PersistenceSpec.hs +++ b/hydra-node/test/Hydra/PersistenceSpec.hs @@ -9,9 +9,9 @@ import Data.Aeson (Value (..)) import Data.Aeson qualified as Aeson import Data.Text qualified as Text import Hydra.Persistence (Persistence (..), PersistenceException (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental) -import Test.QuickCheck (checkCoverage, counterexample, cover, elements, oneof, suchThat, (===)) +import Test.QuickCheck (checkCoverage, cover, elements, oneof, suchThat, (===)) import Test.QuickCheck.Gen (listOf) -import Test.QuickCheck.Monadic (assert, monadicIO, monitor, pick, run) +import Test.QuickCheck.Monadic (monadicIO, monitor, pick, run) spec :: Spec spec = do @@ -60,20 +60,17 @@ spec = do monadicIO $ do items <- pick $ listOf genPersistenceItem moreItems <- pick $ listOf genPersistenceItem `suchThat` ((> 2) . 
length) - result :: Either PersistenceException () <- run $ withTempDir "hydra-persistence" $ \tmpDir -> do - persistence@PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ tmpDir <> "/data" - forM_ items append - loadAll `shouldReturn` items - try $ loadAndAppendConcurrently persistence moreItems - - monitor $ counterexample $ show result - assert $ isLeft result - -loadAndAppendConcurrently :: PersistenceIncremental Value IO -> [Value] -> IO () -loadAndAppendConcurrently PersistenceIncremental{loadAll, append} moreItems = - race_ - (forever $ threadDelay 0.01 >> loadAll) - (forM_ moreItems $ \item -> append item >> threadDelay 0.01) + pure $ + withTempDir "hydra-persistence" $ \tmpDir -> do + PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ tmpDir <> "/data" + forM_ items append + loadAll `shouldReturn` items + race_ + (forever $ threadDelay 0.01 >> loadAll) + (forM_ moreItems $ \item -> append item >> threadDelay 0.01) + `shouldThrow` \case + IncorrectAccessException{} -> True + _ -> False genPersistenceItem :: Gen Aeson.Value genPersistenceItem =