From 7ee97f1ce7c72dc62867a2423385560ab22d6a08 Mon Sep 17 00:00:00 2001
From: Chris Penner
Date: Tue, 25 Feb 2025 16:49:30 -0800
Subject: [PATCH] Run downloading, unpacking, and saving in parallel

Split the sync pipeline into three stages (download, unpack, validate/save)
that run concurrently and hand batches to each other through bounded queues.

Each stage closes the queues it is attached to once it finishes, whether it
succeeded or ended in a 'Left', so a failure in one stage cannot leave the
other stages blocked on a queue.
---
 unison-cli/package.yaml               |  2 ++
 unison-cli/src/Unison/Share/SyncV2.hs | 45 ++++++++++++++++++++++++---
 unison-cli/unison-cli.cabal           |  2 ++
 3 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/unison-cli/package.yaml b/unison-cli/package.yaml
index d3d48f2c8a..796cf28103 100644
--- a/unison-cli/package.yaml
+++ b/unison-cli/package.yaml
@@ -37,6 +37,8 @@ library:
     - containers >= 0.6.3
     - conduit
     - conduit-extra
+    - stm-conduit
+    - stm-chans
     - cryptonite
     - either
     - errors
diff --git a/unison-cli/src/Unison/Share/SyncV2.hs b/unison-cli/src/Unison/Share/SyncV2.hs
index 14870f208d..c2f7df4ec7 100644
--- a/unison-cli/src/Unison/Share/SyncV2.hs
+++ b/unison-cli/src/Unison/Share/SyncV2.hs
@@ -12,6 +12,7 @@ where
 import Codec.Serialise qualified as CBOR
 import Conduit (ConduitT)
 import Conduit qualified as C
+import Control.Concurrent.STM.TBMQueue qualified as STM
 import Control.Lens
 import Control.Monad.Except
 import Control.Monad.Reader (ask)
@@ -24,6 +25,7 @@ import Data.ByteString.Lazy qualified as BL
 import Data.Conduit.Attoparsec qualified as C
 import Data.Conduit.Combinators qualified as C
 import Data.Conduit.List qualified as CL
+import Data.Conduit.TQueue qualified as TQueue
 import Data.Conduit.Zlib qualified as C
 import Data.Foldable qualified as Foldable
 import Data.Graph qualified as Graph
@@ -68,6 +70,8 @@ import Unison.SyncV2.Types qualified as SyncV2
 import Unison.Util.Servant.CBOR qualified as CBOR
 import Unison.Util.Timing qualified as Timing
 import UnliftIO qualified as IO
+import UnliftIO.Async qualified as Async
+import UnliftIO.STM qualified as STM
 
 type Stream i o = ConduitT i o StreamM ()
 
@@ -255,11 +259,42 @@ syncSortedStream shouldValidate codebase stream = do
   let handler :: Stream (Vector (Hash32, TempEntity)) o
       handler = C.mapM_C \entityBatch -> do
         validateAndSave shouldValidate codebase entityBatch
-  C.runConduit $
-    stream
-      C..| CL.chunksOf batchSize
-      C..| unpackChunks codebase
-      C..| handler
+  -- Bounded queues connect the three pipeline stages; a full queue blocks its
+  -- producer, so a fast stage cannot run arbitrarily far ahead of a slow one.
+  downloadQ <- liftIO $ STM.newTBMQueueIO 10 -- 10 batches, not 10 entities
+  unpackerQ <- liftIO $ STM.newTBMQueueIO 10
+  let downloaderSink = TQueue.sinkTBMQueue downloadQ
+  let downloaderSource = TQueue.sourceTBMQueue downloadQ
+  let unpackerSink = TQueue.sinkTBMQueue unpackerQ
+  let unpackedSource = TQueue.sourceTBMQueue unpackerQ
+  let downloadC =
+        stream
+          C..| CL.chunksOf batchSize
+          C..| downloaderSink
+  let saverC =
+        downloaderSource
+          C..| unpackChunks codebase
+          C..| unpackerSink
+  let handlerC =
+        unpackedSource
+          C..| handler
+
+  -- Run one stage, then close the queues it is attached to, and only then
+  -- hand back its result. Closing must not be skipped when the stage ends in
+  -- 'Left': an open queue leaves the neighbouring stages blocked forever on a
+  -- read (queue empty) or a write (queue full), so 'runConc' never returns.
+  -- Writes to a closed queue are discarded, so upstream stages still drain
+  -- their input after a downstream failure instead of hanging.
+  let runStage stageC closeQueues =
+        Async.conc $ runExceptT (C.runConduit stageC) <* STM.atomically closeQueues
+
+  -- Run the three stages concurrently and wait for all of them to finish.
+  -- The result is the first 'Left' in pipeline order, if any stage failed.
+  ExceptT . Async.runConc $ do
+    a <- runStage downloadC (STM.closeTBMQueue downloadQ)
+    b <- runStage saverC (STM.closeTBMQueue downloadQ *> STM.closeTBMQueue unpackerQ)
+    c <- runStage handlerC (STM.closeTBMQueue unpackerQ)
+    pure (a >> b >> c)
 
 -- | Topologically sort entities based on their dependencies, returning a list in dependency-first order.
 sortDependencyFirst :: (Foldable f, Functor f) => f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
diff --git a/unison-cli/unison-cli.cabal b/unison-cli/unison-cli.cabal
index 8a40ecd6e4..f9bcc73ed6 100644
--- a/unison-cli/unison-cli.cabal
+++ b/unison-cli/unison-cli.cabal
@@ -250,6 +250,8 @@ library
     , servant-client
     , servant-conduit
     , stm
+    , stm-chans
+    , stm-conduit
     , temporary
     , text
     , text-ansi