Skip to content

Commit

Permalink
Implement PersistenceIncremental
Browse files Browse the repository at this point in the history
  • Loading branch information
v0d1ch committed Nov 21, 2024
1 parent 12b1081 commit 439d56f
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 53 deletions.
1 change: 1 addition & 0 deletions hydra-node/hydra-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ test-suite tests
Hydra.OptionsSpec
Hydra.PartySpec
Hydra.PersistenceSpec
Hydra.SqlLitePersistenceSpec
Hydra.UtilsSpec
Paths_hydra_node
Spec
Expand Down
63 changes: 17 additions & 46 deletions hydra-node/src/Hydra/SqlLitePersistence.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,10 @@ module Hydra.SqlLitePersistence where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (newTVarIO, throwSTM, writeTVar)
import Control.Lens.Combinators (iforM)
import Control.Monad.Class.MonadFork (myThreadId)
import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as C8
import Data.ByteString.Lazy qualified as BSL
import Data.Text qualified as T
import Database.SQLite.Simple (FromRow, Only (..), Query (..), execute, execute_, field, fromRow, open, query_)
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.FilePath (takeDirectory)
import UnliftIO.IO.File (withBinaryFile)

data PersistenceException
= PersistenceException String
Expand All @@ -26,24 +18,23 @@ data PersistenceException

instance Exception PersistenceException

-- | Handle to save and load files to/from disk using JSON encoding.
-- | Handle to save and load files to/from db using JSON encoding.
data Persistence a m = Persistence
{ save :: ToJSON a => a -> m ()
, load :: FromJSON a => m (Maybe a)
}

newtype Acks = Acks BSL.ByteString
-- | A carrier type that wraps the JSON string. Needed just to specify the sql instances we need.
newtype Record = Record BSL.ByteString deriving newtype (Eq, Show)

instance FromRow Acks where
fromRow = Acks <$> field
instance FromRow Record where
fromRow = Record <$> field

-- | Initialize persistence handle for given type 'a' at given file path.
createPersistence ::
MonadIO m =>
FilePath ->
m (Persistence a m)
createPersistence fp = do
liftIO . createDirectoryIfMissing True $ takeDirectory fp
let dbName = Query (T.pack fp)
conn <- liftIO $ open fp
_ <- liftIO $ execute_ conn $ "CREATE TABLE IF NOT EXISTS " <> dbName <> " (id INTEGER PRIMARY KEY, msg SQLBlob)"
Expand All @@ -52,10 +43,10 @@ createPersistence fp = do
{ save = \a -> do
liftIO $ execute conn ("INSERT INTO " <> dbName <> " (msg) VALUES (?)") (Only $ Aeson.encode a)
, load = do
r <- liftIO $ query_ conn ("SELECT msg from " <> dbName <> " order by id desc limit 1")
r <- liftIO $ query_ conn ("SELECT msg FROM " <> dbName <> " ORDER BY id DESC LIMIT 1")
case r of
[] -> pure Nothing
(Acks result : _) -> pure $ Aeson.decode result
(Record result : _) -> pure $ Aeson.decode result
}

-- | Handle to save incrementally and load files to/from disk using JSON encoding.
Expand All @@ -64,43 +55,23 @@ data PersistenceIncremental a m = PersistenceIncremental
, loadAll :: FromJSON a => m [a]
}

-- | Initialize persistence handle for given type 'a' at given file path.
--
-- This instance of `PersistenceIncremental` is "thread-safe" in the sense that
-- it prevents loading from a different thread once one starts `append`ing
-- through the handle. If another thread attempts to `loadAll` after this point,
-- an `IncorrectAccessException` will be raised.
createPersistenceIncremental ::
forall a m.
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
(MonadIO m, MonadThrow m) =>
FilePath ->
m (PersistenceIncremental a m)
createPersistenceIncremental fp = do
liftIO . createDirectoryIfMissing True $ takeDirectory fp
authorizedThread <- newTVarIO Nothing
let dbName = Query (T.pack fp)
conn <- liftIO $ open fp
_ <- liftIO $ execute_ conn $ "CREATE TABLE IF NOT EXISTS " <> dbName <> " (id INTEGER PRIMARY KEY, msg SQLBlob)"
pure $
PersistenceIncremental
{ append = \a -> do
tid <- myThreadId
atomically $ writeTVar authorizedThread $ Just tid
let bytes = toStrict $ Aeson.encode a <> "\n"
liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes)
liftIO $ execute conn ("INSERT INTO " <> dbName <> " (msg)") (Only $ Aeson.encode a)
, loadAll = do
tid <- myThreadId
atomically $ do
authTid <- readTVar authorizedThread
when (isJust authTid && authTid /= Just tid) $
throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread")

liftIO (doesFileExist fp) >>= \case
False -> pure []
True -> do
bs <- readFileBS fp
-- NOTE: We require the whole file to be loadable. It might
-- happen that the data written by 'append' is only there
-- partially and then this will fail (which we accept now).
iforM (C8.lines bs) $ \i o ->
case Aeson.eitherDecodeStrict' o of
Left e -> throwIO $ PersistenceException ("Error at line: " <> show (i + 1) <> " in file " <> fp <> " - " <> e)
Right decoded -> pure decoded
r <- liftIO $ query_ conn ("SELECT msg FROM " <> dbName <> " ORDER BY id DESC")
forM r $ \(Record i) ->
case Aeson.decode i of
Nothing -> throwIO $ PersistenceException ("Error decoding a record " <> show i)
Just a -> pure a
}
13 changes: 6 additions & 7 deletions hydra-node/test/Hydra/PersistenceSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import Test.Hydra.Prelude
import Data.Aeson (Value (..))
import Data.Aeson qualified as Aeson
import Data.Text qualified as Text
import Hydra.Persistence (PersistenceException (..), PersistenceIncremental (..), createPersistenceIncremental)
import Hydra.SqlLitePersistence (Persistence (..), createPersistence)
import Hydra.Persistence (Persistence (..), PersistenceException (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental)
import Test.QuickCheck (checkCoverage, cover, elements, oneof, suchThat, (===))
import Test.QuickCheck.Gen (listOf)
import Test.QuickCheck.Monadic (monadicIO, monitor, pick, run)
Expand All @@ -28,11 +27,11 @@ spec = do
checkCoverage $
monadicIO $ do
item <- pick genPersistenceItem
actualResult <- run $ do
-- withTempDir "hydra-persistence" $ \tmpDir -> do
Persistence{save, load} <- createPersistence "acks"
save item
load
actualResult <- run $
withTempDir "hydra-persistence" $ \tmpDir -> do
Persistence{save, load} <- createPersistence $ tmpDir <> "/data"
save item
load
pure $ actualResult === Just item

describe "PersistenceIncremental" $ do
Expand Down
71 changes: 71 additions & 0 deletions hydra-node/test/Hydra/SqlLitePersistenceSpec.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
{-# LANGUAGE LambdaCase #-}

module Hydra.SqlLitePersistenceSpec where

import Hydra.Prelude hiding (drop, label)
import Test.Hydra.Prelude

import Data.Aeson (Value (..))
import Data.Aeson qualified as Aeson
import Data.Text qualified as Text
import Hydra.SqlLitePersistence (Persistence (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental)
import Test.QuickCheck (checkCoverage, cover, elements, oneof, (===))
import Test.QuickCheck.Gen (listOf)
import Test.QuickCheck.Monadic (monadicIO, monitor, pick, run)

dbName :: String
dbName = "testdb"

spec :: Spec
spec = do
describe "SqlLitePersistence" $ do
let persistence = createPersistence dbName
it "can handle empty reads" $ do
Persistence{load} <- persistence
load `shouldReturn` (Nothing :: Maybe Aeson.Value)

it "is consistent after save/load roundtrip" $
checkCoverage $
monadicIO $ do
Persistence{save, load} <- run persistence
item <- pick genPersistenceItem
actualResult <- run $ do
save item
load
pure $ actualResult === Just item

describe "PersistenceIncremental" $ do
let persistenceIncremental = createPersistenceIncremental dbName
fit "can handle empty reads" $ do
PersistenceIncremental{loadAll} <- persistenceIncremental
loadAll `shouldReturn` ([] :: [Aeson.Value])

fit "is consistent after multiple append calls" $
checkCoverage $
monadicIO $ do
PersistenceIncremental{loadAll, append} <- run persistenceIncremental
items <- pick $ listOf genPersistenceItem
monitor (cover 1 (null items) "no items stored")
actualResult <- run $ do
forM_ items append
loadAll
pure $ all (`elem` actualResult) items

genPersistenceItem :: Gen Aeson.Value
genPersistenceItem =
oneof
[ pure Null
, String <$> genSomeText
]

genSomeText :: Gen Text
genSomeText = do
let t = ['A' .. 'z'] <> ['\n', '\t', '\r']
Text.pack <$> listOf (elements t)

containsNewLine :: [Aeson.Value] -> Bool
containsNewLine = \case
[] -> False
(i : is) -> case i of
String t | "\n" `Text.isInfixOf` t -> True
_ -> containsNewLine is

0 comments on commit 439d56f

Please sign in to comment.