Skip to content

Commit

Permalink
add filter
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterHajdu committed Oct 31, 2019
1 parent 0fbfe6d commit 2e80af6
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 25 deletions.
3 changes: 3 additions & 0 deletions bobek.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ test-suite test
other-modules: FakeEnvironment
, Source
, Destination
, Filter
, Message
, Mover
, ReceiveId
Expand All @@ -47,6 +48,8 @@ executable bobek
, Destination
, Mover
, ReceiveId
, Filter
, ScriptFilter
, Env
, RabbitMqEnv
, FileEnv
Expand Down
7 changes: 7 additions & 0 deletions src/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import Source
import Destination
import ReceiveId
import Message
import Filter
import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Reader (MonadReader, liftIO, ask, MonadIO, runReaderT)

data Env = MkEnv
{ envPublish :: [Message] -> IO PublishResult
, envReceive :: IO (Either NoMessageReason Message)
, envAcknowledge :: [ReceiveId] -> IO ()
, envFilterAction :: Message -> IO FilterAction
}

newtype App a = MkApp {run :: ReaderT Env IO a} deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO)
Expand All @@ -31,5 +33,10 @@ instance Source App where
env <- ask
liftIO $ (envAcknowledge env) ids

instance Filter App where
filterAction msg = do
env <- ask
liftIO $ (envFilterAction env) msg

runMover :: Env -> IO ()
runMover = runReaderT (run moveMessages)
16 changes: 16 additions & 0 deletions src/Filter.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module Filter(Filter(..), FilterAction(..), shouldAck) where

import Message

data FilterAction =
Ack
| Copy
| CopyAndAck

shouldAck :: FilterAction -> Bool
shouldAck Copy = False
shouldAck _ = True

class Filter m where
filterAction :: Message -> m FilterAction

9 changes: 6 additions & 3 deletions src/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ module Main(main) where
import Env
import RabbitMqEnv
import FileEnv
import Filter(FilterAction)
import qualified Message as M
import Destination
import Source
import ReceiveId
import OptParse
import Data.Text
import ScriptFilter

import qualified Network.AMQP as AMQP(fromURI)

Expand All @@ -18,8 +20,8 @@ import Data.Semigroup ((<>))
printError :: String -> IO ()
printError errorMsg = putStrLn $ "Unable to initialize rabbitmq environment: " ++ errorMsg

createEnv :: ([M.Message] -> IO PublishResult) -> ((IO (Either NoMessageReason M.Message)), ([ReceiveId] -> IO ())) -> Env
createEnv pub (rec, ack) = MkEnv pub rec ack
createEnv :: (M.Message -> IO FilterAction) -> ([M.Message] -> IO PublishResult) -> ((IO (Either NoMessageReason M.Message)), ([ReceiveId] -> IO ())) -> Env
createEnv fltr pub (rec, ack) = MkEnv pub rec ack fltr

type PublisherFunction = Either String ([M.Message] -> IO PublishResult)
type SourceFunctions = Either String (IO (Either NoMessageReason M.Message), [ReceiveId] -> IO ())
Expand All @@ -39,4 +41,5 @@ main = do
opts <- execParser (info optionParser (fullDesc <> progDesc "rabbitmq swiss army knife"))
maybePublisher <- createDestination (destination opts)
maybeSource <- createSource (source opts)
either printError runMover (createEnv <$> maybePublisher <*> maybeSource)
let fltr = maybe defaultFilter scriptFilter (OptParse.filter opts)
either printError runMover ((createEnv fltr) <$> maybePublisher <*> maybeSource)
40 changes: 26 additions & 14 deletions src/Mover.hs
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
{-# LANGUAGE ScopedTypeVariables #-}
module Mover(moveMessages) where

import Destination
import Message
import Source
import Message()
import Filter

import Control.Monad (unless)
import Control.Monad (unless, void)

moveMessages :: (Source m, Destination m) => m ()
moveMessages = do
stuff
getMessages :: Source m => m (Either NoMessageReason [Message])
getMessages = do
msg <- receive --todo: should use bulked publish
return $ pure <$> msg

stuff :: (Source m, Destination m) => m ()
stuff = do
maybeMessage <- receive
case maybeMessage of
Left _ -> return ()
Right message -> do
publishResult <- publish [message]
let ackable = succeeded publishResult
unless (null ackable) $ acknowledge ackable
stuff
publishAndAckMessages :: forall m.(Source m, Destination m, Filter m) => [Message] -> m ()
publishAndAckMessages msgs = void $ traverse publishSingleMessage msgs --todo: should use bulked publish
where publishSingleMessage :: Message -> m ()
publishSingleMessage msg = do
publishResult <- publish [msg]
needsAck <- shouldAck <$> (filterAction msg)
if needsAck then do
let ackable = succeeded publishResult
unless (null ackable) $ acknowledge ackable
else return ()

moveMessages :: (Source m, Destination m, Filter m) => m ()
moveMessages = do
maybeMessages <- getMessages
case maybeMessages of
Left _ -> return ()
Right messages -> do
publishAndAckMessages messages
moveMessages
10 changes: 10 additions & 0 deletions src/OptParse.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ newtype Path = MkPath Text deriving(Data.String.IsString, Show)
data Opts = MkOpts
{ source :: SourceOpts
, destination :: DestinationOpts
, filter :: Maybe FilePath
} deriving(Show)

data SourceOpts
Expand All @@ -34,6 +35,15 @@ optionParser :: Parser Opts
optionParser = MkOpts
<$> (srcAmqpOpt <|> srcFileOpt)
<*> (destAmqpOpt <|> destFileOpt)
<*> filterOpt

filterOpt :: Parser (Maybe FilePath)
filterOpt = (optional $ strOption
( long "filter"
<> short 'f'
<> metavar "SCRIPT"
<> help "Filter script path.")
)

srcAmqpOpt :: Parser SourceOpts
srcAmqpOpt = AmqpSource
Expand Down
14 changes: 12 additions & 2 deletions test/FakeEnvironment.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Destination
import Control.Monad.State.Strict (State, execState)
import Control.Monad.State (MonadState, get, put)
import Safe(tailSafe, headDef)
import Filter

runMoveMessages :: Environment -> Environment
runMoveMessages env = execState (run moveMessages) env
Expand All @@ -19,6 +20,7 @@ data Environment = MkEnv {
, acknowledgedMessages :: [[ReceiveId]]
, published :: [[Message]]
, publishResults :: [PublishResult]
, filterResults :: [FilterAction]
}

newtype FakeEnvironment a = MkFakeEnvironment {run :: State Environment a} deriving (Functor, Applicative, Monad, MonadState Environment)
Expand All @@ -32,11 +34,19 @@ instance Source FakeEnvironment where
return (headDef (Left NMREmptyQueue) toRec)

acknowledge ackIds = do
env@(MkEnv _ acks _ _) <- get
env@(MkEnv _ acks _ _ _) <- get
put $ env {acknowledgedMessages = acks++[ackIds]}

instance Destination FakeEnvironment where
publish publishedMessages = do
env@(MkEnv _ _ pubed pubRes) <- get
env@(MkEnv _ _ pubed pubRes _) <- get
put $ env {published = pubed++[publishedMessages], publishResults = tailSafe pubRes}
return $ head pubRes

instance Filter FakeEnvironment where
filterAction _ = do
oldEnv@(MkEnv _ _ _ _ actions) <- get
let newEnv = oldEnv {filterResults = tailSafe actions}
put newEnv
return (head actions)

22 changes: 16 additions & 6 deletions test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import FakeEnvironment
import Message
import Source
import Destination
import Filter

testMessages :: [Message]
testMessages = (\rid -> (MkMessage (MkReceiveId rid) "routing key" "test message")) <$> [1..100]
Expand All @@ -25,18 +26,27 @@ someSucceeds succeededIds = (\rid -> if elem rid succeededIds
then MkPublishResult [] [rid]
else MkPublishResult [rid] []) <$> testIds

bothFilter :: [FilterAction]
bothFilter = repeat CopyAndAck

main :: IO ()
main = hspec $ do
describe "moveMessages" $ do
it "should send message received from the source" $ do
let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses)
(published result) `shouldBe` (:[]) <$> testMessages
let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses bothFilter)
(published result) `shouldBe` pure <$> testMessages

it "should acknowledge published messages" $ do
let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses)
(acknowledgedMessages result) `shouldBe` (:[]) <$> testIds
let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses bothFilter)
(acknowledgedMessages result) `shouldBe` pure <$> testIds

it "should acknowledge messages only if publishing succeeds" $ do
let succeededIds = [MkReceiveId 10, MkReceiveId 20]
let result = runMoveMessages (MkEnv testMessagesToReceive [] [] (someSucceeds succeededIds))
(acknowledgedMessages result) `shouldBe` ((:[]) <$> succeededIds)
let result = runMoveMessages (MkEnv testMessagesToReceive [] [] (someSucceeds succeededIds) bothFilter)
(acknowledgedMessages result) `shouldBe` pure <$> succeededIds

it "should acknowledge messages only if the filter asks for it" $ do
let onlyCopy = repeat Copy
let result = runMoveMessages (MkEnv testMessagesToReceive [] [] publishSuccesses onlyCopy)
(acknowledgedMessages result) `shouldBe` []

1 change: 1 addition & 0 deletions todo
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- handle exceptions during configmwait
- add logging

0 comments on commit 2e80af6

Please sign in to comment.