Skip to content

Commit

Permalink
extract source functions to separate type
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterHajdu committed Nov 15, 2019
1 parent 3f0d7b7 commit 52c9e4b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 22 deletions.
17 changes: 9 additions & 8 deletions src/Env.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Env(Env(..), App(..), runMover) where
module Env(Env(..), App(..), runMover, SourceFunctions(..)) where

import Mover
import Source
Expand All @@ -9,12 +9,13 @@ import ReceiveId
import Message
import Filter
import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Reader (MonadReader, liftIO, ask, MonadIO, runReaderT)
import Control.Monad.Reader (MonadReader, liftIO, ask, asks, MonadIO, runReaderT)

data SourceFunctions = MkSourceFunctions (IO (Either NoMessageReason Message)) ([ReceiveId] -> IO ())

data Env = MkEnv
{ envPublish :: [Message] -> IO PublishResult
, envReceive :: IO (Either NoMessageReason Message)
, envAcknowledge :: [ReceiveId] -> IO ()
, sourceFunctions :: SourceFunctions
, envFilterAction :: Message -> IO FilterAction
}

Expand All @@ -27,11 +28,11 @@ instance Destination App where

instance Source App where
receive = do
env <- ask
liftIO $ envReceive env
(MkSourceFunctions recv _) <- asks sourceFunctions
liftIO recv
acknowledge ids = do
env <- ask
liftIO $ (envAcknowledge env) ids
(MkSourceFunctions _ ack) <- asks sourceFunctions
liftIO $ ack ids

instance Filter App where
filterAction msg = do
Expand Down
11 changes: 6 additions & 5 deletions src/FileEnv.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import Control.Arrow(left)

import Data.Text()

import Env(SourceFunctions(..))

routingKeySeparator :: Char
routingKeySeparator = ' '

Expand All @@ -38,14 +40,13 @@ recv handle = do
return $ maybeBody >>= parseRoutingKeyAndMessage
where mapError e = if isEOFError e then NMREmptyQueue else NMRError $ show e

createStdinSource :: (IO (Either NoMessageReason Message), [ReceiveId] -> IO ())
createStdinSource = (recv stdin, const $ return ())
createStdinSource :: SourceFunctions
createStdinSource = MkSourceFunctions (recv stdin) (const $ return ())

--todo: extract source functions to a named data structure
createFileSource :: FilePath -> IO (Either String (IO (Either NoMessageReason Message), [ReceiveId] -> IO ()))
createFileSource :: FilePath -> IO (Either String SourceFunctions)
createFileSource filePath = do
maybeHandle <- catchIO $ openFile filePath ReadMode
return $ bimap show (\handle -> (recv handle, const $ return ())) maybeHandle
return $ bimap show (\handle -> MkSourceFunctions (recv handle) (const $ return ())) maybeHandle

serializeMessage :: Message -> BSC.ByteString
serializeMessage (MkMessage _ routingK msg) = (encodeUtf8 routingK) `BSC.append` (BSC.cons routingKeySeparator msg)
Expand Down
10 changes: 3 additions & 7 deletions src/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import FileEnv
import Filter(FilterAction)
import qualified Message as M
import Destination
import Source
import ReceiveId
import OptParse
import Data.Text
import ScriptFilter
Expand All @@ -21,8 +19,8 @@ import Data.Function ((&))
printError :: String -> IO ()
printError errorMsg = putStrLn $ "Unable to initialize rabbitmq environment: " ++ errorMsg

createEnv :: (M.Message -> IO FilterAction) -> ([M.Message] -> IO PublishResult) -> ((IO (Either NoMessageReason M.Message)), ([ReceiveId] -> IO ())) -> Env
createEnv fltr pub (rec, ack) = MkEnv pub rec ack fltr
createEnv :: (M.Message -> IO FilterAction) -> ([M.Message] -> IO PublishResult) -> SourceFunctions -> Env
createEnv fltr pub sourceFuncs = MkEnv pub sourceFuncs fltr

type PublisherFunction = Either String ([M.Message] -> IO PublishResult)

Expand All @@ -32,9 +30,7 @@ createDestination (Outfile filePath) = createFileDestination filePath
createDestination (Exchange uri ex maybeRk) =
createRabbitMqDestination (AMQP.fromURI uri) (pack ex) (pack <$> maybeRk)

type SourceFunctions = Either String (IO (Either NoMessageReason M.Message), [ReceiveId] -> IO ())

createSource :: SourceOpts -> IO SourceFunctions
createSource :: SourceOpts -> IO (Either String SourceFunctions)
createSource Stdin = return $ Right $ createStdinSource
createSource (Infile filePath) = createFileSource filePath
createSource (Queue uri queueName) = createRabbitMqSource (AMQP.fromURI uri) (pack queueName)
Expand Down
5 changes: 3 additions & 2 deletions src/RabbitMqEnv.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module RabbitMqEnv(createRabbitMqSource, createRabbitMqDestination) where

import Env(SourceFunctions(..))
import qualified Message as M
import Source
import Destination
Expand Down Expand Up @@ -75,10 +76,10 @@ createChannel connOpts = runExceptT $ do
maybeChan <- liftIO $ catchAmqp $ AMQP.openChannel conn
except $ left show maybeChan

createRabbitMqSource :: AMQP.ConnectionOpts -> T.Text -> IO (Either String (IO (Either NoMessageReason M.Message), [ReceiveId] -> IO ()))
createRabbitMqSource :: AMQP.ConnectionOpts -> T.Text -> IO (Either String SourceFunctions)
createRabbitMqSource connOpts queue = do
maybeChan <- createChannel connOpts
return $ bimap show (\chan -> (rabbitReceive chan queue, rabbitAcknowledge chan)) maybeChan
return $ bimap show (\chan -> MkSourceFunctions (rabbitReceive chan queue) (rabbitAcknowledge chan)) maybeChan

createRabbitMqDestination :: AMQP.ConnectionOpts -> T.Text -> Maybe T.Text -> IO (Either String ([M.Message] -> IO PublishResult))
createRabbitMqDestination connOpts exchange routingKey = runExceptT $ do
Expand Down

0 comments on commit 52c9e4b

Please sign in to comment.