Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel #19

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions achille.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ library
, Achille.Core.Task
, Achille.Dot
build-depends: base >= 4.16 && < 4.18
, async >= 2.2.1 && < 2.3
, binary >= 0.8.9 && < 0.9
, binary-instances >= 1.0.3 && < 1.1
, bytestring >= 0.11.3 && < 0.12
, clock >= 0.8.3 && < 0.9
, constraints >= 0.13.4 && < 0.14
, containers >= 0.6.5 && < 0.7
, directory >= 1.3.6 && < 1.4
Expand Down
11 changes: 6 additions & 5 deletions achille/Achille/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Data.Time (UTCTime(..))
import Numeric (showFFloat)
import Options.Applicative
import System.Directory (removePathForcibly)
import System.CPUTime
import System.Clock as Clock
import System.IO

import Achille.Cache
Expand Down Expand Up @@ -144,10 +144,11 @@ achilleWith cfg@Config{..} t = customExecParser p opts >>= \case
*> removePathForcibly (toFilePath outputDir)
Build force verbose -> do
colorful <- hIsTerminalDevice stdout
start <- getCPUTime
start <- Clock.getTime Monotonic
() <- runAchille cfg force verbose colorful t
stop <- getCPUTime
putStrLn $ "All done! (" <> show (Duration (stop - start)) <> ")"
stop <- Clock.getTime Monotonic
let elapsed = toNanoSecs $ diffTimeSpec stop start
putStrLn $ "All done! (" <> show (Duration elapsed) <> ")"
Graph output -> outputGraph output (toProgram t)
where
opts = info (achilleCLI <**> helper) $ fullDesc <> header description
Expand All @@ -159,7 +160,7 @@ achilleWith cfg@Config{..} t = customExecParser p opts >>= \case
newtype Duration = Duration Integer

instance Show Duration where
show (Duration d) = stab (d * 10) ["ps", "ns", "μs", "ms", "s"]
show (Duration d) = stab (d * 10) ["ns", "μs", "ms", "s"]
where
stab :: Integer -> [String] -> String
stab x (_ :us@(_:_)) | x >= 10000 = stab (x `div` 1000) us
Expand Down
126 changes: 95 additions & 31 deletions achille/Achille/Core/Program.hs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
{-# LANGUAGE DerivingStrategies #-}
module Achille.Core.Program where

import Prelude hiding ((.), id, seq, (>>=), (>>), fst, snd)
import Prelude hiding ((.), id, seq, (>>), fst, snd)
import Prelude qualified as Prelude

import Control.Category
import Control.Monad (foldM)
import Control.Monad.Reader.Class
import Control.Monad.Writer.Class
import Control.Concurrent (MVar)

import GHC.Stack (HasCallStack)
import Data.Binary (Binary)
Expand Down Expand Up @@ -100,27 +102,35 @@ data BoxedValue =
, lastChange :: UTCTime
}

data Env = Env (IntMap BoxedValue) {-# UNPACK #-} !Int
data Env = Env (IntMap (MVar (Maybe BoxedValue))) {-# UNPACK #-} !Int

emptyEnv :: Env
emptyEnv = Env IntMap.empty 0

lookupEnv :: Env -> Int -> Maybe a
lookupEnv (Env env _) k = env !? k <&> \(Boxed v _) -> unsafeCoerce v
lookupEnv :: (Monad m, AchilleIO m) => Env -> Int -> m (Maybe a)
lookupEnv (Env env _) k
| Just var <- env !? k = AIO.readMVar var >>= \case
Just (Boxed v _) -> pure (Just (unsafeCoerce v))
Nothing -> pure Nothing
lookupEnv _ _ = pure Nothing

bindEnv :: Env -> UTCTime -> Value a -> Env
bindEnv (Env env n) t x = Env (IntMap.insert n (Boxed x t) env) (n + 1)
bindEnv :: (Monad m, AchilleIO m) => Env -> m (MVar (Maybe BoxedValue), Env)
bindEnv (Env env n) = do
var <- AIO.newEmptyMVar
pure (var, Env (IntMap.insert n var env) (n + 1))

envChanged :: Env -> UTCTime -> IntSet -> Bool
envChanged (Env env _) lastTime = IntSet.foldr' op False
envChanged :: (Monad m, AchilleIO m) => Env -> UTCTime -> IntSet -> m Bool
envChanged (Env env _) lastTime vars = foldM op False (IntSet.elems vars)
-- NOTE(flupe): maybe we can early return once we reach True
-- TODO(flupe): we shouldn't ever fail looking up the env,
-- so we're not filtering enough variables...
where op :: Int -> Bool -> Bool
op k b =
where op :: (Monad m, AchilleIO m) => Bool -> Int -> m Bool
op b k =
case env IntMap.!? k of
Just (Boxed _ t) -> lastTime < t || b
Nothing -> b
Just var -> AIO.readMVar var >>= \case
Just (Boxed _ t) -> pure (lastTime < t || b)
Nothing -> pure b
Nothing -> pure b -- NOTE(flupe): shouldn't ever fail

depsClean :: Map Path UTCTime -> UTCTime -> DynDeps -> Bool
depsClean edits lastT (getFileDeps -> fdeps) = getAll $ foldMap (All . isClean) fdeps
Expand All @@ -135,12 +145,25 @@ runProgramIn
=> Env -> Program m a -> PrimTask m (Value a)
runProgramIn env t = case t of

Var k -> maybe halt pure $ lookupEnv env k
Var k -> maybe halt pure =<< lift (lookupEnv env k)

Seq x y -> do
(cx, cy) <- splitCache
(_, cx') <- withCache cx $ runProgramIn env x
(cx, cy) <- splitCache
ctx <- ask
mvx <- lift AIO.newEmptyMVar

-- run x in seperate thread
lift $ AIO.fork do
(_, cx', deps) <- runPrimTask (runProgramIn env x) ctx cx
AIO.putMVar mvx (cx', deps)

-- run y without waiting for x
(vy, cy') <- withCache cy $ runProgramIn env y

-- now we dow wait for x
(cx', deps) <- lift $ AIO.readMVar mvx
tell deps

joinCache cx' cy'
forward vy

Expand All @@ -150,15 +173,30 @@ runProgramIn env t = case t of
-- it is important to know if it changed *since the last time* a task has been executed.
Bind x f -> do
cached :: Maybe (UTCTime, Cache) <- fromCache
Context{currentTime} <- ask
ctx@Context{currentTime} <- ask
let (cx, cf) = maybe (Cache.emptyCache, Cache.emptyCache)
(Cache.splitCache . Prelude.snd)
cached
lastChange = maybe zeroTime Prelude.fst cached
(vx, cx') <- withCache cx $ runProgramIn env x
let lastChange' = if any hasChanged vx then currentTime else lastChange
let env' = maybe env (bindEnv env lastChange') vx

(var, env') <- lift (bindEnv env)
mvcx <- lift AIO.newEmptyMVar

-- fork and run x
lift $ AIO.fork do
res@(vx, _, _) <- runPrimTask (runProgramIn env x) ctx cx
let lastChange' = if any hasChanged vx then currentTime else lastChange
AIO.putMVar var (vx <&> \v -> Boxed v lastChange')
AIO.putMVar mvcx res

-- run f without waiting for x
(vy, cf') <- withCache cf $ runProgramIn env' f

-- now we do wait for x
(vx, cx', deps) <- lift (AIO.readMVar mvcx)
tell deps

let lastChange' = if any hasChanged vx then currentTime else lastChange
toCache (lastChange', Cache.joinCache cx' cf')
forward vy
-- TODO(flupe): propagate failure to environment
Expand All @@ -171,8 +209,9 @@ runProgramIn env t = case t of
case cached of
Just (t, _, d, c) -> (t , d , c )
_ -> (zeroTime, mempty, Cache.emptyCache)
dirtyEnv <- lift (envChanged env lastRun vs)
if isNothing cached
|| envChanged env lastRun vs
|| dirtyEnv
|| not (depsClean updatedFiles lastRun deps) then do
((v, cache'), deps) <- listen $ withCache cache $ local (\c -> c {lastTime = lastRun})
$ runProgramIn env p
Expand Down Expand Up @@ -239,22 +278,47 @@ runProgramIn env t = case t of
forChanges [] _ = pure (Just [], [])
forChanges (Deleted :vs) cs = forChanges vs (drop 1 cs) <&> first (fmap (Deleted:))
forChanges (Inserted x:vs) cs = do
Context{currentTime} <- ask
let env' = bindEnv env zeroTime (value False x)
(y, cy) <- withCache Cache.emptyCache $ runProgramIn env' f
ctx@Context{currentTime} <- ask

(var, env') <- lift (bindEnv env)
lift (AIO.putMVar var (Just (Boxed (value False x) zeroTime)))

mvy <- lift AIO.newEmptyMVar

lift $ AIO.fork do
res <- runPrimTask (runProgramIn env' f) ctx Cache.emptyCache
AIO.putMVar mvy res

(changes, caches) <- forChanges vs cs
(y, cy, deps) <- lift (AIO.readMVar mvy)
tell deps

case y of
Nothing -> pure (Nothing, (zeroTime, cy) : cs)
Just vy -> forChanges vs cs <&> bimap (fmap (Inserted (theVal vy):)) ((currentTime, cy):)
Nothing -> pure (Nothing, (zeroTime, cy) : caches)
Just vy -> pure (fmap (Inserted (theVal vy):) changes, (currentTime, cy):caches)

forChanges (Kept v:vs) cs = do
Context{currentTime} <- ask
let ((vlastChange, cv), cs') =
fromMaybe ((zeroTime, Cache.emptyCache), []) (uncons cs)
ctx@Context{currentTime} <- ask

let ((vlastChange, cv), cs') = fromMaybe ((zeroTime, Cache.emptyCache), []) (uncons cs)
let vtchange = if hasChanged v then currentTime else vlastChange
let env' = bindEnv env vtchange v
(y, cy) <- withCache cv $ runProgramIn env' f

(var, env') <- lift (bindEnv env)
lift (AIO.putMVar var (Just (Boxed v vtchange)))

mvy <- lift AIO.newEmptyMVar

lift $ AIO.fork do
res <- runPrimTask (runProgramIn env' f) ctx cv
AIO.putMVar mvy res

(changes, caches) <- forChanges vs cs'
(y, cy, deps) <- lift (AIO.readMVar mvy)
tell deps

case y of
Nothing -> pure (Nothing, [(vtchange, cy)])
Just vy -> forChanges vs cs' <&> bimap (fmap (Kept vy:)) ((vtchange, cy):)
Just vy -> pure (fmap (Kept vy:) changes, (vtchange, cy):caches)

Scoped x y -> do
(cx, cy) <- splitCache
Expand Down
12 changes: 12 additions & 0 deletions achille/Achille/IO.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-- | Exposes an IO interface used by core achille recipes
module Achille.IO (AchilleIO(..)) where

import Control.Concurrent (MVar)
import Data.Text (Text)
import Data.Time.Clock (UTCTime)
import Data.String (fromString)
Expand All @@ -14,10 +15,12 @@ import System.Directory qualified as Directory
import System.FilePath qualified as FilePath
import System.FilePath.Glob qualified as Glob
import System.Process qualified as Process
import Control.Concurrent qualified as Concurrent

import Achille.Path



-- | Interface for IO operations used by core recipes.
class AchilleIO m where
-- | Retrieve a file as a bytestring.
Expand Down Expand Up @@ -52,6 +55,10 @@ class AchilleIO m where
getModificationTime :: Path -> m UTCTime

getCurrentTime :: m UTCTime
newEmptyMVar :: m (MVar a)
putMVar :: MVar a -> a -> m ()
readMVar :: MVar a -> m a
fork :: m () -> m ()


ensureDirExists :: Path -> IO ()
Expand All @@ -73,3 +80,8 @@ instance AchilleIO IO where
glob dir pattern = map fromString <$> Glob.globDir1 pattern (toFilePath dir)
getModificationTime = Directory.getModificationTime . toFilePath
getCurrentTime = Time.getCurrentTime

newEmptyMVar = Concurrent.newEmptyMVar
putMVar = Concurrent.putMVar
readMVar = Concurrent.readMVar
fork = (() <$) . Concurrent.forkIO
8 changes: 7 additions & 1 deletion achille/Achille/Task/Prim.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@ instance (Monad m, AchilleIO m) => AchilleIO (PrimTask m) where
log = lift . AIO.log
readCommand cmd args = lift (AIO.readCommand cmd args)
glob root pat = lift (AIO.glob root pat) <* tell (dependsOnPattern pat)

-- NOTE(flupe): ^ maybe for AchilleIO (PrimTask m) we actually want to do
-- smart path transformation?

newEmptyMVar = lift AIO.newEmptyMVar
readMVar = lift . AIO.readMVar
putMVar v = lift . AIO.putMVar v
fork = error "shouldn't use fork inside stateful computation"



data LogType
= LogErr
| LogInfo
Expand Down
1 change: 1 addition & 0 deletions docs/docs.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ build-type: Simple
tested-with: GHC == 9.4.4
executable docs
main-is: Main.hs
ghc-options: -threaded -rtsopts -with-rtsopts=-N
default-language: GHC2021
default-extensions: BlockArguments
, DeriveAnyClass
Expand Down
6 changes: 6 additions & 0 deletions tests/Test/Achille/FakeIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import Control.Monad.Writer.Strict
import Control.Monad.State.Strict
import Control.Monad.Reader
import Control.Monad.Trans (lift)
import Control.Monad.IO.Class (liftIO)

import Data.Text (Text)
import Data.ByteString qualified as BS
Expand All @@ -21,6 +22,7 @@ import System.Directory qualified as Directory
import System.FilePath qualified as FilePath
import System.FilePath.Glob qualified as Glob
import Data.Map.Strict qualified as Map
import Control.Concurrent qualified as Concurrent


import Achille.CLI (processDeps)
Expand Down Expand Up @@ -71,6 +73,10 @@ instance AchilleIO FakeIO where
glob r pat = asks (globFS r pat)
getCurrentTime = undefined

newEmptyMVar = liftIO Concurrent.newEmptyMVar
putMVar v = liftIO . Concurrent.putMVar v
readMVar = liftIO . Concurrent.readMVar
fork m = m

runFakeIO :: FakeIO a -> FileSystem -> IO (a, [IOActions])
runFakeIO c fs = runWriterT (runReaderT c fs)
Expand Down