{-# Language BangPatterns #-}
module Network.Mom.Stompl.Patterns.Balancer (
withBalancer,
withRouter)
where
import Registry
import Types
import Network.Mom.Stompl.Client.Queue
import Network.Mom.Stompl.Patterns.Basic
import Codec.MIME.Type (nullType)
import Control.Exception (throwIO, catches)
import Control.Monad (forever, unless)
-- | Run @action@ while a background thread forwards incoming requests
--   to registered providers.
--
--   A registry is opened with 'withRegistry', then a reader on the
--   request queue and a writer are opened with 'withPair'. The balancing
--   loop is started with 'withThread' and @action@ is run alongside it.
--
--   For every message read from the request queue the loop obtains the
--   job name with 'getJobName' and hands the message to 'mapR', which is
--   given a callback that writes the message to a provider's queue.
--   If 'mapR' returns 'False', a 'NoProviderX' exception carrying the
--   job name is thrown. Exceptions raised in one iteration are handled
--   by the handlers built with 'ignoreHandler', after which the loop
--   continues with the next message.
withBalancer
  :: Con         -- ^ Connection passed to 'withRegistry' and 'withPair'
  -> String      -- ^ Name passed to 'withRegistry', 'withPair' and
                 --   'ignoreHandler'
  -> QName       -- ^ Queue name passed to 'withRegistry'
  -> (Int, Int)  -- ^ Passed unchanged to 'withRegistry'
                 --   (NOTE(review): presumably heartbeat bounds; confirm
                 --   against 'withRegistry')
  -> QName       -- ^ Queue from which requests are read
  -> OnError     -- ^ Error callback passed to 'withRegistry' and
                 --   'ignoreHandler'
  -> IO r        -- ^ Action to run while the balancer thread is active
  -> IO r
withBalancer c n qn (mn, mx) rq onErr action =
  withRegistry c n qn (mn, mx) onErr $ \reg ->
    -- The writer's destination is a placeholder: every message is sent
    -- with 'writeAdHoc', which is given the target queue explicitly.
    withPair c n (rq,        [], [], bytesIn)
                 ("unknown", [], [], bytesOut) $ \(r, w) ->
      withThread (balance reg r w) action
  where
    -- balance :: Registry -> Reader a -> Writer a -> IO b
    -- Endless forwarding loop; 'catches' wraps a single iteration, so a
    -- handled exception does not terminate the 'forever'.
    balance reg r w =
      forever $ catches
        (do m  <- readQ r
            jn <- getJobName m
            t  <- mapR reg jn (send2Prov w m)
            unless t $ throwIO $ NoProviderX jn)
        (ignoreHandler n onErr)

    -- send2Prov :: Writer a -> Message a -> Provider -> IO ()
    -- Forward the message to the provider's queue ('prvQ'), keeping the
    -- original headers and content and using 'nullType' as MIME type.
    send2Prov w m p =
      writeAdHoc w (prvQ p) nullType (msgHdrs m) $ msgContent m
-- | Run @action@ while a background subscriber republishes every
--   message it receives.
--
--   A publisher is opened with 'withPub'. Inside its scope
--   'withSubThread' is started with a callback that passes each
--   received message to 'publish' on that publisher, keeping the
--   message's headers and content and using 'nullType' as MIME type.
withRouter
  :: Con      -- ^ Connection passed to 'withPub' and 'withSubThread'
  -> String   -- ^ Name passed to 'withPub' and 'withSubThread'
  -> JobName  -- ^ Job name passed to both 'withPub' and 'withSubThread'
  -> QName    -- ^ Queue passed to 'withSubThread' before the timeout
              --   (NOTE(review): presumably the source's registration
              --   queue; confirm against 'withSubThread')
  -> QName    -- ^ Queue on which the subscriber thread reads messages
  -> QName    -- ^ Queue passed to 'withPub'
              --   (NOTE(review): presumably the registration queue of
              --   the router's own publisher; confirm against 'withPub')
  -> Int      -- ^ Timeout passed to 'withSubThread'
  -> OnError  -- ^ Error callback passed to 'withPub' and 'withSubThread'
  -> IO r     -- ^ Action to run while the router is active
  -> IO r
withRouter c n jn srq ssq trq tmo onErr action =
  -- The writer description passed to 'withPub' uses the placeholder
  -- queue name "unknown".
  withPub c n jn trq onErr ("unknown", [], [], bytesOut) $ \p ->
    withSubThread c n jn srq tmo
                  (ssq, [], [], bytesIn) (pub p) onErr action
  where
    -- pub :: PubA o -> Message o -> IO ()
    -- Republish a received message with unchanged headers and content.
    pub p m = publish p nullType (msgHdrs m) $ msgContent m