From 9823d3926c7c9d4e53c2271ecd9df0c641aa65b3 Mon Sep 17 00:00:00 2001 From: Peter Hajdu Date: Fri, 15 Nov 2019 14:53:37 +0100 Subject: [PATCH] add script filter --- bobek.cabal | 7 +++++-- src/Env.hs | 2 +- src/Filter.hs | 16 ++++++++++++++-- src/Main.hs | 8 ++++---- src/Mover.hs | 6 +++--- src/ScriptFilter.hs | 36 +++++++++++++++++++++++++++++++----- test/Test.hs | 26 +++++++++++++++++++++++--- 7 files changed, 81 insertions(+), 20 deletions(-) diff --git a/bobek.cabal b/bobek.cabal index 129663b..4ce65dd 100644 --- a/bobek.cabal +++ b/bobek.cabal @@ -17,12 +17,14 @@ test-suite test build-depends: base >=4.10 , hspec , mtl + , process , safe , containers , bytestring , text other-modules: FakeEnvironment , Source + , ScriptFilter , Destination , Filter , Message @@ -30,7 +32,7 @@ test-suite test , ReceiveId hs-source-dirs: src, test default-language: Haskell2010 - extensions: OverloadedStrings + default-extensions: OverloadedStrings ghc-options: -Weverything -Wno-missing-exported-signatures -Wno-implicit-prelude @@ -58,6 +60,7 @@ executable bobek build-depends: base >=4.10 , amqp , bytestring + , process , mtl , transformers , text @@ -66,7 +69,7 @@ executable bobek , optparse-applicative hs-source-dirs: src default-language: Haskell2010 - extensions: OverloadedStrings + default-extensions: OverloadedStrings ghc-options: -Weverything -Wno-missing-exported-signatures -Wno-implicit-prelude diff --git a/src/Env.hs b/src/Env.hs index d4c9c91..f0c8b23 100644 --- a/src/Env.hs +++ b/src/Env.hs @@ -16,7 +16,7 @@ data SourceFunctions = MkSourceFunctions (IO (Either NoMessageReason Message)) ( data Env = MkEnv { envPublish :: [Message] -> IO PublishResult , sourceFunctions :: SourceFunctions - , envFilterAction :: Message -> IO FilterAction + , envFilterAction :: Message -> IO FilterActions } newtype App a = MkApp {run :: ReaderT Env IO a} deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO) diff --git a/src/Filter.hs b/src/Filter.hs index 9f6f69a..1a9c315 100644 --- a/src/Filter.hs +++ b/src/Filter.hs @@ -1,14 +1,26 @@ -module Filter(Filter(..), FilterAction(..), FilterActions) where +module Filter(Filter(..), FilterAction(..), FilterActions(..), shouldAck, shouldCopy) where import Message -type FilterActions = [FilterAction] +newtype FilterActions = MkFilterActions [FilterAction] + +instance Semigroup FilterActions where + (MkFilterActions l) <> (MkFilterActions r) = MkFilterActions (l <> r) + +instance Monoid FilterActions where + mempty = MkFilterActions [] data FilterAction = Ack | Copy deriving (Eq, Show) +shouldAck :: FilterActions -> Bool +shouldAck (MkFilterActions actions) = elem Ack actions + +shouldCopy :: FilterActions -> Bool +shouldCopy (MkFilterActions actions) = elem Copy actions + class Monad m => Filter m where filterAction :: Message -> m FilterActions diff --git a/src/Main.hs b/src/Main.hs index aa57079..244e8e7 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -3,7 +3,7 @@ module Main(main) where import Env import RabbitMqEnv import FileEnv -import Filter(FilterAction(..)) +import Filter(FilterActions(..), FilterAction(..)) import qualified Message as M import Destination import OptParse @@ -19,7 +19,7 @@ import Data.Maybe (fromMaybe) printError :: String -> IO () printError errorMsg = putStrLn $ "Unable to initialize rabbitmq environment: " ++ errorMsg -createEnv :: (M.Message -> IO FilterAction) -> ([M.Message] -> IO PublishResult) -> SourceFunctions -> Env +createEnv :: (M.Message -> IO FilterActions) -> ([M.Message] -> IO PublishResult) -> SourceFunctions -> Env createEnv fltr pub sourceFuncs = MkEnv pub sourceFuncs fltr type PublisherFunction = Either String ([M.Message] -> IO PublishResult) @@ -35,8 +35,8 @@ createSource Stdin = return $ Right $ createStdinSource createSource (Infile filePath) = createFileSource filePath createSource (Queue uri queueName) = createRabbitMqSource (AMQP.fromURI uri) (pack queueName) -createFilter :: FilterOpt -> (M.Message -> IO FilterAction) -createFilter DontAck = const $ return Copy +createFilter :: FilterOpt -> (M.Message -> IO FilterActions) +createFilter DontAck = const $ return $ MkFilterActions [Copy] createFilter (ScriptFilter scriptPath) = scriptFilter scriptPath main :: IO () diff --git a/src/Mover.hs b/src/Mover.hs index 87d549f..db7dcb4 100644 --- a/src/Mover.hs +++ b/src/Mover.hs @@ -34,9 +34,9 @@ runFilter msgs = do splitUpMessagesByAction :: [(FilterActions, Message)] -> (Set.Set ReceiveId, [Message], Set.Set ReceiveId) splitUpMessagesByAction = foldl' splitter (Set.empty, [], Set.empty) where splitter (oldAck, oldPub, oldNoPub) (actions, msg) = - let newAck = if (Ack `elem` actions) then Set.insert (receiveId msg) oldAck else oldAck - newPub = if (Copy `elem` actions) then msg:oldPub else oldPub - newNoPub = if (Copy `elem` actions) then oldNoPub else Set.insert (receiveId msg) oldNoPub + let newAck = if (shouldAck actions) then Set.insert (receiveId msg) oldAck else oldAck + newPub = if (shouldCopy actions) then msg:oldPub else oldPub + newNoPub = if (shouldCopy actions) then oldNoPub else Set.insert (receiveId msg) oldNoPub in (newAck, newPub, newNoPub) publishMessages :: Destination m => [Message] -> m (Set.Set ReceiveId) diff --git a/src/ScriptFilter.hs b/src/ScriptFilter.hs index fee94dc..2cde54a 100644 --- a/src/ScriptFilter.hs +++ b/src/ScriptFilter.hs @@ -1,10 +1,36 @@ -module ScriptFilter(scriptFilter, defaultFilter) where +module ScriptFilter(scriptFilter, defaultFilter, parseAction) where import Filter import Message +import System.Process -scriptFilter :: FilePath -> Message -> IO FilterAction -scriptFilter _ _ = return CopyAndAck +import qualified Data.ByteString.Char8 as BSC(null, breakSubstring, hGetLine, hPutStrLn, ByteString) +import System.IO(hFlush) -defaultFilter :: Message -> IO FilterAction -defaultFilter _ = return CopyAndAck +contains :: BSC.ByteString -> BSC.ByteString -> Bool +contains text pattern = let (_, after) = BSC.breakSubstring pattern text + in not $ BSC.null after + +actionChecker :: BSC.ByteString -> FilterAction -> BSC.ByteString -> FilterActions +actionChecker pattern action line = let actions = if line `contains` pattern then [action] else [] + in MkFilterActions actions + +actionCheckers :: [BSC.ByteString -> FilterActions] +actionCheckers = [actionChecker "ack" Ack, actionChecker "copy" Copy] + +parseAction :: BSC.ByteString -> FilterActions +parseAction line = mconcat $ actionCheckers <*> [line] + +serializeMessage :: Message -> BSC.ByteString +serializeMessage (MkMessage _ _ msg) = msg + +scriptFilter :: FilePath -> Message -> IO FilterActions +scriptFilter path msg = do + (Just inHandle, Just outHandle, _, _) <- createProcess (proc path []){ std_in = CreatePipe, std_out = CreatePipe } + _ <- BSC.hPutStrLn inHandle (serializeMessage msg) + hFlush inHandle + line <- BSC.hGetLine outHandle + return $ parseAction line + +defaultFilter :: Message -> IO FilterActions +defaultFilter _ = return $ MkFilterActions [Copy, Ack] diff --git a/test/Test.hs b/test/Test.hs index 8083380..ee8d03f 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -4,8 +4,10 @@ import ReceiveId(ReceiveId(..)) import Test.Hspec import FakeEnvironment import Message +import Source() import Destination import Filter +import ScriptFilter import Data.Bifoldable (biList) makeId :: Integral a => a -> ReceiveId @@ -15,7 +17,7 @@ makeMessages :: Int -> [Message] makeMessages n = (\rid -> (MkMessage (makeId rid) "routing key" "test message")) <$> [1..n] bothFilter :: Message -> FilterActions -bothFilter = const [Copy, Ack] +bothFilter = const $ MkFilterActions [Copy, Ack] twoBulkMessages :: [Message] twoBulkMessages = makeMessages 1500 @@ -58,12 +60,30 @@ main = hspec $ do (acknowledgedMessages result) `shouldBe` [toBeAcked] it "should acknowledge messages only if the filter asks for it" $ do - let onlyCopy = const [Copy] + let onlyCopy = const $ MkFilterActions [Copy] let result = runMoveMessages (MkEnv (Right <$> onePageMessages) [] [] allSucceeds onlyCopy) (acknowledgedMessages result) `shouldBe` [] it "should copy the messages only if the filter asks for it" $ do - let onlyAck = const [Ack] + let onlyAck = const $ MkFilterActions [Ack] let result = runMoveMessages (MkEnv (Right <$> twoBulkMessages) [] [] allSucceeds onlyAck) (acknowledgedMessages result) `shouldBe` twoBulksOfIds (null $ published result) `shouldBe` True + + describe "script filters" $ do + it "should parse ack from response line" $ do + (shouldAck $ parseAction "ack") `shouldBe` True + (shouldAck $ parseAction " ack ") `shouldBe` True + (shouldCopy $ parseAction "ack") `shouldBe` False + (shouldCopy $ parseAction " ack ") `shouldBe` False + + it "should parse copy from response line" $ do + (shouldCopy $ parseAction "copy") `shouldBe` True + (shouldCopy $ parseAction " copy ") `shouldBe` True + (shouldAck $ parseAction "copy") `shouldBe` False + (shouldAck $ parseAction " copy ") `shouldBe` False + + it "should combine copy and ack" $ do + let ackcopy = "ackcopy" + (shouldAck $ parseAction ackcopy) `shouldBe` True + (shouldCopy $ parseAction ackcopy) `shouldBe` True