{-# Language RankNTypes #-}
module Network.Mom.Stompl.Client.Conduit (
qSource, qSink,
qMultiSource, qMultiSink)
where
import qualified Data.Conduit as C
import Codec.MIME.Type (Type)
import Control.Monad.Trans (liftIO)
import Control.Monad.IO.Class (MonadIO)
import System.Timeout
import Network.Mom.Stompl.Client.Queue
import Network.Mom.Stompl.Frame
-- | Conduit source that streams the messages read from a queue.
--
-- Each iteration performs one 'readQ' on the given reader, bounded by
-- 'timeout'. A message that arrives in time is yielded downstream and the
-- source reads again; the first read that does not complete in time ends
-- the stream.
--
--   * @r@:   the reader to read messages from.
--
--   * @tmo@: passed unchanged to 'timeout', which interprets it as
--            microseconds; applied to every single read.
qSource :: MonadIO m =>
           Reader i -> Int -> C.ConduitT () (Message i) m ()
qSource r tmo = go
  where
    -- One bounded read; 'Nothing' means the read timed out.
    go = liftIO (timeout tmo $ readQ r) >>= mbYield

    -- Stop on a timeout, otherwise emit the message and keep reading.
    mbYield mbX = case mbX of
                    Nothing -> return ()
                    Just x  -> C.yield x >> go
-- | Conduit sink that writes every incoming value to a queue.
--
-- Each value received from upstream is sent with 'writeQ', using the same
-- type and header list for all of them. The sink runs until upstream is
-- exhausted.
--
--   * @w@:  the writer to send through.
--
--   * @t@:  the MIME 'Type' handed to 'writeQ' for every message.
--
--   * @hs@: the headers handed to 'writeQ' for every message.
qSink :: MonadIO m =>
         Writer o -> Type -> [Header] -> C.ConduitT o C.Void m ()
qSink w t hs = C.awaitForever $ \x -> liftIO (writeQ w t hs x)
-- | Header that marks the final message of a multi-message stream:
--   'qMultiSink' adds it to the last message it writes and
--   'qMultiSource' stops after yielding a message that carries it.
lastMHdr :: Header
lastMHdr = ("__last__", "true")
-- | Conduit source for a stream of messages terminated by an end marker.
--
-- Works like a timeout-bounded read loop over 'readQ', but additionally
-- inspects the headers of every message: a message whose headers contain
-- 'lastMHdr' is still yielded downstream, and then the stream ends
-- without another read. A read that does not complete within the timeout
-- also ends the stream.
--
--   * @r@:   the reader to read messages from.
--
--   * @tmo@: passed unchanged to 'timeout', which interprets it as
--            microseconds; applied to every single read.
qMultiSource :: MonadIO m =>
                Reader i -> Int -> C.ConduitT () (Message i) m ()
qMultiSource r tmo = loop
  where
    loop = do
      mbX <- liftIO (timeout tmo $ readQ r)
      case mbX of
        Nothing -> return ()             -- read timed out: end of stream
        Just x
          | isLast x  -> C.yield x       -- marked message is delivered too
          | otherwise -> C.yield x >> loop

    -- True when the message carries the end marker with its exact value.
    isLast x = lookup (fst lastMHdr) (msgHdrs x) == Just (snd lastMHdr)
-- | Conduit sink that writes a stream to a queue and marks its last message.
--
-- Every value received from upstream is sent with 'writeQ'. The sink
-- always holds back one value until it knows whether another one follows:
-- values followed by a successor are written with the headers @hs@, the
-- final value is written with 'lastMHdr' prepended to @hs@. If upstream
-- produces no value at all, nothing is written.
--
--   * @w@:  the writer to send through.
--
--   * @t@:  the MIME 'Type' handed to 'writeQ' for every message.
--
--   * @hs@: the headers handed to 'writeQ' for every message.
qMultiSink :: MonadIO m =>
              Writer o -> Type -> [Header] -> C.ConduitT o C.Void m ()
qMultiSink w t hs = do
  mbX <- C.await
  case mbX of
    Nothing -> return ()
    Just x  -> go x
  where
    -- Headers for the final message: the caller's headers plus the marker.
    hs' = lastMHdr : hs

    -- @x@ is the held-back value; it is only written once we know
    -- whether it is the last one.
    go x = do
      mbY <- C.await
      case mbY of
        Nothing -> liftIO (writeQ w t hs' x)
        Just y  -> liftIO (writeQ w t hs  x) >> go y