diff --git a/bobek.cabal b/bobek.cabal index 09e513e..3b9c56c 100644 --- a/bobek.cabal +++ b/bobek.cabal @@ -23,6 +23,7 @@ test-suite test other-modules: FakeEnvironment , Source , Destination + , Filter , Message , Mover , ReceiveId @@ -47,6 +48,8 @@ executable bobek , Destination , Mover , ReceiveId + , Filter + , ScriptFilter , Env , RabbitMqEnv , FileEnv diff --git a/src/Env.hs b/src/Env.hs index 51c112d..02de1fd 100644 --- a/src/Env.hs +++ b/src/Env.hs @@ -7,6 +7,7 @@ import Source import Destination import ReceiveId import Message +import Filter import Control.Monad.Trans.Reader (ReaderT) import Control.Monad.Reader (MonadReader, liftIO, ask, MonadIO, runReaderT) @@ -14,6 +15,7 @@ data Env = MkEnv { envPublish :: [Message] -> IO PublishResult , envReceive :: IO (Either NoMessageReason Message) , envAcknowledge :: [ReceiveId] -> IO () + , envFilterAction :: Message -> IO FilterAction } newtype App a = MkApp {run :: ReaderT Env IO a} deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO) @@ -31,5 +33,10 @@ instance Source App where env <- ask liftIO $ (envAcknowledge env) ids +instance Filter App where + filterAction msg = do + env <- ask + liftIO $ (envFilterAction env) msg + runMover :: Env -> IO () runMover = runReaderT (run moveMessages) diff --git a/src/Filter.hs b/src/Filter.hs new file mode 100644 index 0000000..0785458 --- /dev/null +++ b/src/Filter.hs @@ -0,0 +1,16 @@ +module Filter(Filter(..), FilterAction(..), shouldAck) where + +import Message + +data FilterAction = + Ack + | Copy + | CopyAndAck + +shouldAck :: FilterAction -> Bool +shouldAck Copy = False +shouldAck _ = True + +class Filter m where + filterAction :: Message -> m FilterAction + diff --git a/src/Main.hs b/src/Main.hs index af58df0..ea75b00 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -3,12 +3,14 @@ module Main(main) where import Env import RabbitMqEnv import FileEnv +import Filter(FilterAction) import qualified Message as M import Destination import Source import ReceiveId import OptParse import Data.Text +import ScriptFilter import qualified Network.AMQP as AMQP(fromURI) @@ -18,8 +20,8 @@ import Data.Semigroup ((<>)) printError :: String -> IO () printError errorMsg = putStrLn $ "Unable to initialize rabbitmq environment: " ++ errorMsg -createEnv :: ([M.Message] -> IO PublishResult) -> ((IO (Either NoMessageReason M.Message)), ([ReceiveId] -> IO ())) -> Env -createEnv pub (rec, ack) = MkEnv pub rec ack +createEnv :: (M.Message -> IO FilterAction) -> ([M.Message] -> IO PublishResult) -> ((IO (Either NoMessageReason M.Message)), ([ReceiveId] -> IO ())) -> Env +createEnv fltr pub (rec, ack) = MkEnv pub rec ack fltr type PublisherFunction = Either String ([M.Message] -> IO PublishResult) type SourceFunctions = Either String (IO (Either NoMessageReason M.Message), [ReceiveId] -> IO ()) @@ -39,4 +41,5 @@ main = do opts <- execParser (info optionParser (fullDesc <> progDesc "rabbitmq swiss army knife")) maybePublisher <- createDestination (destination opts) maybeSource <- createSource (source opts) - either printError runMover (createEnv <$> maybePublisher <*> maybeSource) + let fltr = maybe defaultFilter scriptFilter (OptParse.filter opts) + either printError runMover ((createEnv fltr) <$> maybePublisher <*> maybeSource) diff --git a/src/Mover.hs b/src/Mover.hs index 75366e2..46322ca 100644 --- a/src/Mover.hs +++ b/src/Mover.hs @@ -1,23 +1,35 @@ +{-# LANGUAGE ScopedTypeVariables #-} module Mover(moveMessages) where import Destination +import Message import Source import Message() +import Filter -import Control.Monad (unless) +import Control.Monad (unless, void) -moveMessages :: (Source m, Destination m) => m () -moveMessages = do - stuff +getMessages :: Source m => m (Either NoMessageReason [Message]) +getMessages = do + msg <- receive --todo: should use bulked publish + return $ pure <$> msg -stuff :: (Source m, Destination m) => m () -stuff = do - maybeMessage <- receive - case maybeMessage of - Left _ -> return () - Right message -> do - publishResult <- publish [message] - let ackable = succeeded publishResult - unless (null ackable) $ acknowledge ackable - stuff +publishAndAckMessages :: forall m.(Source m, Destination m, Filter m) => [Message] -> m () +publishAndAckMessages msgs = void $ traverse publishSingleMessage msgs --todo: should use bulked publish + where publishSingleMessage :: Message -> m () + publishSingleMessage msg = do + publishResult <- publish [msg] + needsAck <- shouldAck <$> (filterAction msg) + if needsAck then do + let ackable = succeeded publishResult + unless (null ackable) $ acknowledge ackable + else return () +moveMessages :: (Source m, Destination m, Filter m) => m () +moveMessages = do + maybeMessages <- getMessages + case maybeMessages of + Left _ -> return () + Right messages -> do + publishAndAckMessages messages + moveMessages diff --git a/src/OptParse.hs b/src/OptParse.hs index 2c624f5..4310349 100644 --- a/src/OptParse.hs +++ b/src/OptParse.hs @@ -18,6 +18,7 @@ newtype Path = MkPath Text deriving(Data.String.IsString, Show) data Opts = MkOpts { source :: SourceOpts , destination :: DestinationOpts + , filter :: Maybe FilePath } deriving(Show) data SourceOpts @@ -34,6 +35,15 @@ optionParser :: Parser Opts optionParser = MkOpts <$> (srcAmqpOpt <|> srcFileOpt) <*> (destAmqpOpt <|> destFileOpt) + <*> filterOpt + +filterOpt :: Parser (Maybe FilePath) +filterOpt = (optional $ strOption + ( long "filter" + <> short 'f' + <> metavar "SCRIPT" + <> help "Filter script path.") + ) srcAmqpOpt :: Parser SourceOpts srcAmqpOpt = AmqpSource diff --git a/test/FakeEnvironment.hs b/test/FakeEnvironment.hs index 612e0fd..640a92f 100644 --- a/test/FakeEnvironment.hs +++ b/test/FakeEnvironment.hs @@ -10,6 +10,7 @@ import Destination import Control.Monad.State.Strict (State, execState) import Control.Monad.State (MonadState, get, put) import Safe(tailSafe, headDef) +import Filter runMoveMessages :: Environment -> Environment runMoveMessages env = execState (run moveMessages) env @@ -19,6 +20,7 @@ data Environment = MkEnv { , acknowledgedMessages :: [[ReceiveId]] , published :: [[Message]] , publishResults :: [PublishResult] + , filterResults :: [FilterAction] } newtype FakeEnvironment a = MkFakeEnvironment {run :: State Environment a} deriving (Functor, Applicative, Monad, MonadState Environment) @@ -32,11 +34,19 @@ instance Source FakeEnvironment where return (headDef (Left NMREmptyQueue) toRec) acknowledge ackIds = do - env@(MkEnv _ acks _ _) <- get + env@(MkEnv _ acks _ _ _) <- get put $ env {acknowledgedMessages = acks++[ackIds]} instance Destination FakeEnvironment where publish publishedMessages = do - env@(MkEnv _ _ pubed pubRes) <- get + env@(MkEnv _ _ pubed pubRes _) <- get put $ env {published = pubed++[publishedMessages], publishResults = tailSafe pubRes} return $ head pubRes + +instance Filter FakeEnvironment where + filterAction _ = do + oldEnv@(MkEnv _ _ _ _ actions) <- get + let newEnv = oldEnv {filterResults = tailSafe actions} + put newEnv + return (head actions) + diff --git a/test/Test.hs b/test/Test.hs index ec37b1f..08b05c1 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -6,6 +6,7 @@ import FakeEnvironment import Message import Source import Destination +import Filter testMessages :: [Message] testMessages = (\rid -> (MkMessage (MkReceiveId rid) "routing key" "test message")) <$> [1..100] @@ -25,18 +26,27 @@ someSucceeds succeededIds = (\rid -> if elem rid succeededIds then MkPublishResult [] [rid] else MkPublishResult [rid] []) <$> testIds +bothFilter :: [FilterAction] +bothFilter = repeat CopyAndAck + main :: IO () main = hspec $ do describe "moveMessages" $ do it "should send message received from the source" $ do - let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses) - (published result) `shouldBe` (:[]) <$> testMessages + let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses bothFilter) + (published result) `shouldBe` pure <$> testMessages it "should acknowledge published messages" $ do - let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses) - (acknowledgedMessages result) `shouldBe` (:[]) <$> testIds + let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses bothFilter) + (acknowledgedMessages result) `shouldBe` pure <$> testIds it "should acknowledge messages only if publishing succeeds" $ do let succeededIds = [MkReceiveId 10, MkReceiveId 20] - let result = runMoveMessages (MkEnv testMessagesToReceive [] [] (someSucceeds succeededIds)) - (acknowledgedMessages result) `shouldBe` ((:[]) <$> succeededIds) + let result = runMoveMessages (MkEnv testMessagesToReceive [] [] (someSucceeds succeededIds) bothFilter) + (acknowledgedMessages result) `shouldBe` pure <$> succeededIds + + it "should acknowledge messages only if the filter asks for it" $ do + let onlyCopy = repeat Copy + let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses onlyCopy) + (acknowledgedMessages result) `shouldBe` [] + diff --git a/todo b/todo index b64e0d0..e87e5d0 100644 --- a/todo +++ b/todo @@ -1 +1,2 @@ - handle exceptions during configmwait + - add logging