Skip to content

Commit

Permalink
add script filter
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterHajdu committed Nov 22, 2019
1 parent 8ba697c commit 9823d39
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 20 deletions.
7 changes: 5 additions & 2 deletions bobek.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@ test-suite test
build-depends: base >=4.10
, hspec
, mtl
, process
, safe
, containers
, bytestring
, text
other-modules: FakeEnvironment
, Source
, ScriptFilter
, Destination
, Filter
, Message
, Mover
, ReceiveId
hs-source-dirs: src, test
default-language: Haskell2010
extensions: OverloadedStrings
default-extensions: OverloadedStrings
ghc-options: -Weverything
-Wno-missing-exported-signatures
-Wno-implicit-prelude
Expand Down Expand Up @@ -58,6 +60,7 @@ executable bobek
build-depends: base >=4.10
, amqp
, bytestring
, process
, mtl
, transformers
, text
Expand All @@ -66,7 +69,7 @@ executable bobek
, optparse-applicative
hs-source-dirs: src
default-language: Haskell2010
extensions: OverloadedStrings
default-extensions: OverloadedStrings
ghc-options: -Weverything
-Wno-missing-exported-signatures
-Wno-implicit-prelude
Expand Down
2 changes: 1 addition & 1 deletion src/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data SourceFunctions = MkSourceFunctions (IO (Either NoMessageReason Message)) (
data Env = MkEnv
{ envPublish :: [Message] -> IO PublishResult
, sourceFunctions :: SourceFunctions
, envFilterAction :: Message -> IO FilterAction
, envFilterAction :: Message -> IO FilterActions
}

newtype App a = MkApp {run :: ReaderT Env IO a} deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO)
Expand Down
16 changes: 14 additions & 2 deletions src/Filter.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
module Filter(Filter(..), FilterAction(..), FilterActions) where
module Filter(Filter(..), FilterAction(..), FilterActions(..), shouldAck, shouldCopy) where

import Message

type FilterActions = [FilterAction]
newtype FilterActions = MkFilterActions [FilterAction]

instance Semigroup FilterActions where
(MkFilterActions l) <> (MkFilterActions r) = MkFilterActions (l <> r)

instance Monoid FilterActions where
mempty = MkFilterActions []

data FilterAction =
Ack
| Copy
deriving (Eq, Show)

shouldAck :: FilterActions -> Bool
shouldAck (MkFilterActions actions) = elem Ack actions

shouldCopy :: FilterActions -> Bool
shouldCopy (MkFilterActions actions) = elem Copy actions

class Monad m => Filter m where
filterAction :: Message -> m FilterActions

8 changes: 4 additions & 4 deletions src/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Main(main) where
import Env
import RabbitMqEnv
import FileEnv
import Filter(FilterAction(..))
import Filter(FilterActions(..), FilterAction(..))
import qualified Message as M
import Destination
import OptParse
Expand All @@ -19,7 +19,7 @@ import Data.Maybe (fromMaybe)
printError :: String -> IO ()
printError errorMsg = putStrLn $ "Unable to initialize rabbitmq environment: " ++ errorMsg

createEnv :: (M.Message -> IO FilterAction) -> ([M.Message] -> IO PublishResult) -> SourceFunctions -> Env
createEnv :: (M.Message -> IO FilterActions) -> ([M.Message] -> IO PublishResult) -> SourceFunctions -> Env
createEnv fltr pub sourceFuncs = MkEnv pub sourceFuncs fltr

type PublisherFunction = Either String ([M.Message] -> IO PublishResult)
Expand All @@ -35,8 +35,8 @@ createSource Stdin = return $ Right $ createStdinSource
createSource (Infile filePath) = createFileSource filePath
createSource (Queue uri queueName) = createRabbitMqSource (AMQP.fromURI uri) (pack queueName)

createFilter :: FilterOpt -> (M.Message -> IO FilterAction)
createFilter DontAck = const $ return Copy
createFilter :: FilterOpt -> (M.Message -> IO FilterActions)
createFilter DontAck = const $ return $ MkFilterActions [Copy]
createFilter (ScriptFilter scriptPath) = scriptFilter scriptPath

main :: IO ()
Expand Down
6 changes: 3 additions & 3 deletions src/Mover.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ runFilter msgs = do
splitUpMessagesByAction :: [(FilterActions, Message)] -> (Set.Set ReceiveId, [Message], Set.Set ReceiveId)
splitUpMessagesByAction = foldl' splitter (Set.empty, [], Set.empty)
where splitter (oldAck, oldPub, oldNoPub) (actions, msg) =
let newAck = if (Ack `elem` actions) then Set.insert (receiveId msg) oldAck else oldAck
newPub = if (Copy `elem` actions) then msg:oldPub else oldPub
newNoPub = if (Copy `elem` actions) then oldNoPub else Set.insert (receiveId msg) oldNoPub
let newAck = if (shouldAck actions) then Set.insert (receiveId msg) oldAck else oldAck
newPub = if (shouldCopy actions) then msg:oldPub else oldPub
newNoPub = if (shouldCopy actions) then oldNoPub else Set.insert (receiveId msg) oldNoPub
in (newAck, newPub, newNoPub)

publishMessages :: Destination m => [Message] -> m (Set.Set ReceiveId)
Expand Down
36 changes: 31 additions & 5 deletions src/ScriptFilter.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,36 @@
module ScriptFilter(scriptFilter, defaultFilter) where
module ScriptFilter(scriptFilter, defaultFilter, parseAction) where

import Filter
import Message
import System.Process

scriptFilter :: FilePath -> Message -> IO FilterAction
scriptFilter _ _ = return CopyAndAck
import qualified Data.ByteString.Char8 as BSC(null, breakSubstring, hGetLine, hPutStrLn, ByteString)
import System.IO(hFlush)

defaultFilter :: Message -> IO FilterAction
defaultFilter _ = return CopyAndAck
contains :: BSC.ByteString -> BSC.ByteString -> Bool
contains text pattern = let (_, after) = BSC.breakSubstring pattern text
in not $ BSC.null after

actionChecker :: BSC.ByteString -> FilterAction -> BSC.ByteString -> FilterActions
actionChecker pattern action line = let actions = if line `contains` pattern then [action] else []
in MkFilterActions actions

actionCheckers :: [BSC.ByteString -> FilterActions]
actionCheckers = [actionChecker "ack" Ack, actionChecker "copy" Copy]

parseAction :: BSC.ByteString -> FilterActions
parseAction line = mconcat $ actionCheckers <*> [line]

serializeMessage :: Message -> BSC.ByteString
serializeMessage (MkMessage _ _ msg) = msg

scriptFilter :: FilePath -> Message -> IO FilterActions
scriptFilter path msg = do
(Just inHandle, Just outHandle, _, _) <- createProcess (proc path []){ std_in = CreatePipe, std_out = CreatePipe }
_ <- BSC.hPutStrLn inHandle (serializeMessage msg)
hFlush inHandle
line <- BSC.hGetLine outHandle
return $ parseAction line

defaultFilter :: Message -> IO FilterActions
defaultFilter _ = return $ MkFilterActions [Copy, Ack]
26 changes: 23 additions & 3 deletions test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import ReceiveId(ReceiveId(..))
import Test.Hspec
import FakeEnvironment
import Message
import Source()
import Destination
import Filter
import ScriptFilter
import Data.Bifoldable (biList)

makeId :: Integral a => a -> ReceiveId
Expand All @@ -15,7 +17,7 @@ makeMessages :: Int -> [Message]
makeMessages n = (\rid -> (MkMessage (makeId rid) "routing key" "test message")) <$> [1..n]

bothFilter :: Message -> FilterActions
bothFilter = const [Copy, Ack]
bothFilter = const $ MkFilterActions [Copy, Ack]

twoBulkMessages :: [Message]
twoBulkMessages = makeMessages 1500
Expand Down Expand Up @@ -58,12 +60,30 @@ main = hspec $ do
(acknowledgedMessages result) `shouldBe` [toBeAcked]

it "should acknowledge messages only if the filter asks for it" $ do
let onlyCopy = const [Copy]
let onlyCopy = const $ MkFilterActions [Copy]
let result = runMoveMessages (MkEnv (Right <$> onePageMessages) [] [] allSucceeds onlyCopy)
(acknowledgedMessages result) `shouldBe` []

it "should copy the messages only if the filter asks for it" $ do
let onlyAck = const [Ack]
let onlyAck = const $ MkFilterActions [Ack]
let result = runMoveMessages (MkEnv (Right <$> twoBulkMessages) [] [] allSucceeds onlyAck)
(acknowledgedMessages result) `shouldBe` twoBulksOfIds
(null $ published result) `shouldBe` True

describe "script filters" $ do
it "should parse ack from response line" $ do
(shouldAck $ parseAction "ack") `shouldBe` True
(shouldAck $ parseAction " ack ") `shouldBe` True
(shouldCopy $ parseAction "ack") `shouldBe` False
(shouldCopy $ parseAction " ack ") `shouldBe` False

it "should parse copy from response line" $ do
(shouldCopy $ parseAction "copy") `shouldBe` True
(shouldCopy $ parseAction " copy ") `shouldBe` True
(shouldAck $ parseAction "copy") `shouldBe` False
(shouldAck $ parseAction " copy ") `shouldBe` False

it "should combine copy and ack" $ do
let ackcopy = "ackcopy"
(shouldAck $ parseAction ackcopy) `shouldBe` True
(shouldCopy $ parseAction ackcopy) `shouldBe` True

0 comments on commit 9823d39

Please sign in to comment.