{-# Language BangPatterns #-}
-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Stompl/Patterns/Balancer.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: portable
--
-- This module provides a balancer for services and tasks
-- and a topic router.
-- Balancers for services and tasks improve scalability and reliability
-- of servers and workers. Workers should always be used with a balancer
-- (since balancing workload is the main idea of workers);
-- servers can very well be used without a balancer, but won't scale
-- with increasing numbers of clients.
--
-- A balancer consists of a registry to which 
-- servers and workers connect;
-- servers and workers are maintained in lists 
-- according to the job they provide.
-- Clients and pushers send requests to the balancer,
-- which then forwards the request to a server or worker.
-- The client will receive the reply not through the balancer,
-- but directly from the server (to which the reply queue
-- was forwarded as part of the request message -- 
-- see 'ClientA' for details).
-- 
-- With servers and workers sending heartbeats,
-- a balancer also improves reliability
-- in contrast to a topology
-- where a task is pushed to a single worker or 
-- a request is sent to only one server.
-- 
-- A router is a forwarder of a topic.
-- A router is very similar to a publisher ('PubA')
-- with the difference that the router
-- does not create new topic data, 
-- but uses topic data received from a publisher
-- (a router, hence, is a subscriber and a publisher).
-- Routers can be used to balance the workload of publishers:
-- Instead of one publisher serving thousands of subscribers,
-- the initial publisher would serve thousands of routers,
-- which, in their turn, serve thousands of subscribers 
-- (or even other routers).
-------------------------------------------------------------------------------
module Network.Mom.Stompl.Patterns.Balancer (
                                      -- * Balancer
                                      withBalancer,
                                      -- * Router 
                                      withRouter)
where

  import           Registry
  import           Types
  import           Network.Mom.Stompl.Client.Queue 
  import           Network.Mom.Stompl.Patterns.Basic
  import           Codec.MIME.Type (nullType)
  import           Control.Exception (throwIO, catches)
  import           Control.Monad (forever, unless)

  -----------------------------------------------------------------------
  -- | Create a Service and Task Balancer with the lifetime
  --   of the application-defined action passed in
  --   and start it in a background thread:
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Name of the balancer, used for error handling;
  --
  --   * 'QName': Registration queue -- this queue is used
  --              by providers to connect to the registry,
  --              it is not used for consumer requests;
  --
  --   * (Int, Int): Heartbeat range of the 'Registry' 
  --                 (see 'withRegistry' for details);
  --  
  --   * 'QName': Request queue -- this queue is used
  --              for consumer requests;
  --
  --   * 'OnError': Error handling;
  --
  --   * IO r: Action that defines the lifetime of the balancer;
  --           the result /r/ is also the result of /withBalancer/.
  -----------------------------------------------------------------------
  withBalancer :: Con     -> String  -> QName -> (Int, Int) -> 
                  QName   -> OnError -> IO r  -> IO r
  withBalancer :: Con
-> String
-> String
-> (Int, Int)
-> String
-> OnError
-> IO r
-> IO r
withBalancer Con
c String
n String
qn (Int
mn,Int
mx) String
rq OnError
onErr IO r
action =
    Con
-> String
-> String
-> (Int, Int)
-> OnError
-> (Registry -> IO r)
-> IO r
forall r.
Con
-> String
-> String
-> (Int, Int)
-> OnError
-> (Registry -> IO r)
-> IO r
withRegistry Con
c String
n String
qn (Int
mn,Int
mx) OnError
onErr ((Registry -> IO r) -> IO r) -> (Registry -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Registry
reg -> 
      Con
-> String
-> ReaderDesc ByteString
-> WriterDesc ByteString
-> ((Reader ByteString, Writer ByteString) -> IO r)
-> IO r
forall i o r.
Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> ((Reader i, Writer o) -> IO r)
-> IO r
withPair Con
c String
n (String
rq,        [], [], InBound ByteString
bytesIn)
                   (String
"unknown", [], [], OutBound ByteString
bytesOut) (((Reader ByteString, Writer ByteString) -> IO r) -> IO r)
-> ((Reader ByteString, Writer ByteString) -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \(Reader ByteString
r,Writer ByteString
w) -> 
        IO () -> IO r -> IO r
forall r. IO () -> IO r -> IO r
withThread (Registry -> Reader ByteString -> Writer ByteString -> IO ()
forall a b. Registry -> Reader a -> Writer a -> IO b
balance Registry
reg Reader ByteString
r Writer ByteString
w) IO r
action
    where balance :: Registry -> Reader a -> Writer a -> IO b
balance Registry
reg Reader a
r Writer a
w = 
              IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
catches (do
                Message a
m  <- Reader a -> IO (Message a)
forall a. Reader a -> IO (Message a)
readQ Reader a
r
                String
jn <- Message a -> IO String
forall m. Message m -> IO String
getJobName Message a
m
                Bool
t  <- Registry -> String -> (Provider -> IO ()) -> IO Bool
mapR Registry
reg String
jn (Writer a -> Message a -> Provider -> IO ()
forall a. Writer a -> Message a -> Provider -> IO ()
send2Prov Writer a
w Message a
m)
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
t (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ PatternsException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO ()) -> PatternsException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
NoProviderX String
jn)
              (String -> OnError -> [Handler ()]
ignoreHandler String
n OnError
onErr)
          send2Prov :: Writer a -> Message a -> Provider -> IO ()
send2Prov Writer a
w Message a
m Provider
p = Writer a -> String -> Type -> [Header] -> a -> IO ()
forall a. Writer a -> String -> Type -> [Header] -> a -> IO ()
writeAdHoc Writer a
w (Provider -> String
prvQ Provider
p) Type
nullType 
                                         (Message a -> [Header]
forall a. Message a -> [Header]
msgHdrs Message a
m) (a -> IO ()) -> a -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> a
forall a. Message a -> a
msgContent Message a
m

  -----------------------------------------------------------------------
  -- | Create a router with the lifetime of the 
  --   application-defined action passed in
  --   and start it in a background thread:
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Name of the router, used for error handling;
  --
  --   * 'JobName': Routed topic;
  --
  --   * 'QName': Registration queue of the source publisher;
  --
  --   * 'QName': Queue through which the internal subscriber
  --              will receive the topic data from the source publisher;
  --
  --   * 'QName': Registration queue of the target publisher
  --              to which subscribers will connect;
  --
  --   * Int: Registration timeout 
  --          (timeout to register at the source publisher);
  --  
  --   * 'QName': Request queue -- this queue is used
  --              for consumer requests;
  --
  --   * 'OnError': Error handling;
  --
  --   * IO r: Action that defines the lifetime of the router;
  --           the result /r/ is also the result of /withRouter/.
  -----------------------------------------------------------------------
  withRouter :: Con   -> String  -> JobName -> 
                QName -> QName   -> QName   -> 
                Int   -> OnError -> IO r    -> IO r
  withRouter :: Con
-> String
-> String
-> String
-> String
-> String
-> Int
-> OnError
-> IO r
-> IO r
withRouter Con
c String
n String
jn String
srq String
ssq String
trq Int
tmo OnError
onErr IO r
action = 
     Con
-> String
-> String
-> String
-> OnError
-> WriterDesc ByteString
-> (PubA ByteString -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> String
-> OnError
-> WriterDesc o
-> (PubA o -> IO r)
-> IO r
withPub Con
c String
n String
jn String
trq OnError
onErr 
               (String
"unknown", [], [], OutBound ByteString
bytesOut) ((PubA ByteString -> IO r) -> IO r)
-> (PubA ByteString -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \PubA ByteString
p ->
       Con
-> String
-> String
-> String
-> Int
-> ReaderDesc ByteString
-> (Message ByteString -> IO ())
-> OnError
-> IO r
-> IO r
forall i r.
Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (Message i -> IO ())
-> OnError
-> IO r
-> IO r
withSubThread Con
c String
n String
jn String
srq Int
tmo 
                     (String
ssq,       [], [], InBound ByteString
bytesIn) (PubA ByteString -> Message ByteString -> IO ()
forall o. PubA o -> Message o -> IO ()
pub PubA ByteString
p) OnError
onErr IO r
action
    where pub :: PubA o -> Message o -> IO ()
pub PubA o
p Message o
m = PubA o -> Type -> [Header] -> o -> IO ()
forall o. PubA o -> Type -> [Header] -> o -> IO ()
publish PubA o
p Type
nullType (Message o -> [Header]
forall a. Message a -> [Header]
msgHdrs Message o
m) (o -> IO ()) -> o -> IO ()
forall a b. (a -> b) -> a -> b
$ Message o -> o
forall a. Message a -> a
msgContent Message o
m