{-# Language BangPatterns #-}
module Registry 
where

  import           Types
  import           Network.Mom.Stompl.Client.Queue 
  import           System.Timeout
  import           Data.Time
  import           Data.Char (isDigit, toUpper)
  import           Data.List (nub)
  import           Data.Maybe (fromMaybe)
  import           Data.Map (Map)
  import qualified Data.Map as M
  import           Data.Sequence (Seq, (|>), (<|), ViewL(..))
  import qualified Data.Sequence as S
  import           Data.Foldable (toList)
  import           Codec.MIME.Type (nullType)
  import           Control.Exception (throwIO, catches)
  import           Control.Concurrent 
  import           Control.Monad (forever)
  import           Control.Applicative ((<$>))

  -----------------------------------------------------------------------
  -- | JobType: Service, Task or Topic
  -----------------------------------------------------------------------
  data JobType = Service | Task | Topic
    deriving (JobType -> JobType -> Bool
(JobType -> JobType -> Bool)
-> (JobType -> JobType -> Bool) -> Eq JobType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: JobType -> JobType -> Bool
$c/= :: JobType -> JobType -> Bool
== :: JobType -> JobType -> Bool
$c== :: JobType -> JobType -> Bool
Eq, Int -> JobType -> ShowS
[JobType] -> ShowS
JobType -> String
(Int -> JobType -> ShowS)
-> (JobType -> String) -> ([JobType] -> ShowS) -> Show JobType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [JobType] -> ShowS
$cshowList :: [JobType] -> ShowS
show :: JobType -> String
$cshow :: JobType -> String
showsPrec :: Int -> JobType -> ShowS
$cshowsPrec :: Int -> JobType -> ShowS
Show)

  -----------------------------------------------------------------------
  -- | Safe read method for JobType
  -----------------------------------------------------------------------
  readJobType :: String -> Maybe JobType
  readJobType :: String -> Maybe JobType
readJobType String
s = 
    case (Char -> Char) -> ShowS
forall a b. (a -> b) -> [a] -> [b]
map Char -> Char
toUpper String
s of
      String
"SERVICE" -> JobType -> Maybe JobType
forall a. a -> Maybe a
Just JobType
Service
      String
"TASK"    -> JobType -> Maybe JobType
forall a. a -> Maybe a
Just JobType
Task
      String
"TOPIC"   -> JobType -> Maybe JobType
forall a. a -> Maybe a
Just JobType
Topic
      String
_         -> Maybe JobType
forall a. Maybe a
Nothing

  ------------------------------------------------------------------------
  -- | A helper that shall ease the use of the registers.
  --   A registry to which a call wants to connect is described as
  --   
  --   * The 'QName' through which the registry receives requests;
  --
  --   * The 'Timeout' in microseconds, /i.e./ the time the caller
  --                   will wait before the request fails;
  --
  --   * A triple of heartbeat specifications:
  --     the /best/ value, /i.e./ 
  --          the rate at which the caller 
  --                   prefers to send heartbeats,
  --     the /minimum/ rate at which the caller 
  --                   can accept to send heartbeats,
  --     the /maximum/ rate at which the caller 
  --                   can accept to send heartbeats.
  --     Note that all these values are in milliseconds!
  ------------------------------------------------------------------------
  type RegistryDesc = (QName, Int, (Int, Int, Int))

  ------------------------------------------------------------------------
  -- | Connect to a registry:
  --   The caller registers itself at the registry.
  --   The owner of the registry will then
  --   use the caller depending on its purpose.
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * 'JobName': The name of the job provided by the caller;
  --
  --   * 'JobType': The type of the job provided by the caller;
  --
  --   * 'QName': The registry's registration queue;
  --
  --   * 'QName': The queue to register;
  --              this is the queue the register will actually
  --              use (for forwarding requests or whatever
  --              it does in this specific case).
  --              The registry, internally,
  --              uses 'JobName' together with this queue
  --              as a /key/ to identify the provider. 
  --
  --   * Int: Timeout in microseconds;
  --
  --   * Int: Preferred heartbeat in milliseconds
  --          (0 for no heartbeats).
  --
  -- The function returns a tuple of 'StatusCode' 
  -- and the heartbeat proposed by the registry
  -- (which may differ from the preferred heartbeat of the caller).
  -- Whenever the 'StatusCode' is not 'OK', 
  -- the heartbeat is 0.
  -- If the 'JobName' is null, the 'StatusCode' will be 'BadRequest'.
  -- If the timeout expires, register throws 'TimeoutX'.
  ------------------------------------------------------------------------
  register :: Con -> JobName -> JobType -> 
                     QName   -> QName   -> 
                     Int -> Int -> IO (StatusCode, Int)
  register :: Con
-> String
-> JobType
-> String
-> String
-> Int
-> Int
-> IO (StatusCode, Int)
register Con
c String
j JobType
t String
o String
i Int
to Int
me | String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
j    = (StatusCode, Int) -> IO (StatusCode, Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (StatusCode
BadRequest,Int
0)
                           | Bool
otherwise =
      let i' :: String
i' = String
o String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"/" String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
j String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"/" String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
i
          hs :: [(String, String)]
hs = [(String
"__type__",    String
"register"),
                (String
"__job-type__",    JobType -> String
forall a. Show a => a -> String
show JobType
t),
                (String
"__job__",              String
j),
                (String
"__queue__",            String
i),
                (String
"__hb__",         Int -> String
forall a. Show a => a -> String
show Int
me),
                (String
"__channel__",         String
i')]
       in Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> OutBound ()
-> (Writer () -> IO (StatusCode, Int))
-> IO (StatusCode, Int)
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
c String
"RegistryW" String
o  [] [] OutBound ()
nobody     ((Writer () -> IO (StatusCode, Int)) -> IO (StatusCode, Int))
-> (Writer () -> IO (StatusCode, Int)) -> IO (StatusCode, Int)
forall a b. (a -> b) -> a -> b
$ \Writer ()
w -> 
          Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> InBound ()
-> (Reader () -> IO (StatusCode, Int))
-> IO (StatusCode, Int)
forall i r.
Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
c String
"RegistryR" String
i' [] [] InBound ()
ignorebody ((Reader () -> IO (StatusCode, Int)) -> IO (StatusCode, Int))
-> (Reader () -> IO (StatusCode, Int)) -> IO (StatusCode, Int)
forall a b. (a -> b) -> a -> b
$ \Reader ()
r -> do
            Writer () -> Type -> [(String, String)] -> () -> IO ()
forall a. Writer a -> Type -> [(String, String)] -> a -> IO ()
writeQ Writer ()
w Type
nullType [(String, String)]
hs ()
            Maybe (Message ())
mbF <- Int -> IO (Message ()) -> IO (Maybe (Message ()))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
to (IO (Message ()) -> IO (Maybe (Message ())))
-> IO (Message ()) -> IO (Maybe (Message ()))
forall a b. (a -> b) -> a -> b
$ Reader () -> IO (Message ())
forall a. Reader a -> IO (Message a)
readQ Reader ()
r 
            case Maybe (Message ())
mbF of
              Maybe (Message ())
Nothing -> PatternsException -> IO (StatusCode, Int)
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO (StatusCode, Int))
-> PatternsException -> IO (StatusCode, Int)
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
TimeoutX
                                    String
"No response from registry"
              Just Message ()
m  -> do Either String StatusCode
eiS <- Message () -> IO (Either String StatusCode)
forall m. Message m -> IO (Either String StatusCode)
getSC Message ()
m
                            case Either String StatusCode
eiS of 
                              Left  String
s  -> PatternsException -> IO (StatusCode, Int)
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO (StatusCode, Int))
-> PatternsException -> IO (StatusCode, Int)
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
BadStatusCodeX String
s
                              Right StatusCode
OK -> do Int
h <- Message () -> IO Int
forall m. Message m -> IO Int
getHB Message ()
m
                                             (StatusCode, Int) -> IO (StatusCode, Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (StatusCode
OK, Int
h)
                              Right StatusCode
sc ->    (StatusCode, Int) -> IO (StatusCode, Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (StatusCode
sc, Int
0)

  ------------------------------------------------------------------------
  -- | Disconnect from a registry:
  --   The caller disconnects from a registry
  --   to which it has registered before.
  --   For the case that the registry is not receiving heartbeats
  --   from the caller,
  --   it is essential to unregister, when
  --   the service is no longer provided.
  --   Otherwise, the registry has no way to know
  --   that it should not send requests to this provider anymore.
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * 'JobName': The 'JobName' to unregister;
  --
  --   * 'QName': The registry's registration queue ;
  --
  --   * 'QName': The queue to unregister;
  --
  --   * Int: The timeout in microseconds.
  --
  --  The function returns a 'StatusCode'. 
  --  If 'JobName' is null, the 'StatusCode' will be 'BadRequest'.
  --  If the timeout expires, the function will throw 'TimeoutX'.
  ------------------------------------------------------------------------
  unRegister :: Con -> JobName -> 
                       QName   -> QName -> 
                       Int     -> IO StatusCode
  unRegister :: Con -> String -> String -> String -> Int -> IO StatusCode
unRegister Con
c String
j String
o String
i Int
tmo | String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
j    = StatusCode -> IO StatusCode
forall (m :: * -> *) a. Monad m => a -> m a
return StatusCode
BadRequest
                         | Bool
otherwise = 
      let i' :: String
i' = String
o String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"/" String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
j String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
i
          hs :: [(String, String)]
hs = [(String
"__type__", String
"unreg"),
                (String
"__job__",        String
j),
                (String
"__queue__",      String
i),
                (String
"__channel__",   String
i')]
       in Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> OutBound ()
-> (Writer () -> IO StatusCode)
-> IO StatusCode
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
c String
"RegistryW" String
o  [] [] OutBound ()
nobody     ((Writer () -> IO StatusCode) -> IO StatusCode)
-> (Writer () -> IO StatusCode) -> IO StatusCode
forall a b. (a -> b) -> a -> b
$ \Writer ()
w -> 
          Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> InBound ()
-> (Reader () -> IO StatusCode)
-> IO StatusCode
forall i r.
Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
c String
"RegistryR" String
i' [] [] InBound ()
ignorebody ((Reader () -> IO StatusCode) -> IO StatusCode)
-> (Reader () -> IO StatusCode) -> IO StatusCode
forall a b. (a -> b) -> a -> b
$ \Reader ()
r -> do
            Writer () -> Type -> [(String, String)] -> () -> IO ()
forall a. Writer a -> Type -> [(String, String)] -> a -> IO ()
writeQ Writer ()
w Type
nullType [(String, String)]
hs ()
            Maybe (Message ())
mbF <- Int -> IO (Message ()) -> IO (Maybe (Message ()))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
tmo (IO (Message ()) -> IO (Maybe (Message ())))
-> IO (Message ()) -> IO (Maybe (Message ()))
forall a b. (a -> b) -> a -> b
$ Reader () -> IO (Message ())
forall a. Reader a -> IO (Message a)
readQ Reader ()
r 
            case Maybe (Message ())
mbF of
              Maybe (Message ())
Nothing -> PatternsException -> IO StatusCode
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO StatusCode)
-> PatternsException -> IO StatusCode
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
TimeoutX String
"No response from register"
              Just Message ()
m  -> do Either String StatusCode
eiS <- Message () -> IO (Either String StatusCode)
forall m. Message m -> IO (Either String StatusCode)
getSC Message ()
m
                            case Either String StatusCode
eiS of
                              Left String
s   -> PatternsException -> IO StatusCode
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO StatusCode)
-> PatternsException -> IO StatusCode
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
BadStatusCodeX String
s
                              Right StatusCode
sc -> StatusCode -> IO StatusCode
forall (m :: * -> *) a. Monad m => a -> m a
return StatusCode
sc

  ------------------------------------------------------------------------
  -- | Send heartbeats:
  --
  --   * 'MVar' 'HB': An MVar of type 'HB', this MVar will be used
  --                  to keep track of when the heartbeat has actually to
  --                  be sent.
  --
  --   * 'Writer' (): The writer through which to send the heartbeat;
  --                  The queue name of the writer is the registration queue
  --                  of the registry; note that its type is ():
  --                  heartbeats are empty messages.
  --
  --   * 'JobName': The 'JobName' for which to send heartbeats;
  --
  --   * 'QName': The queue for which to send heartbeats.
  ------------------------------------------------------------------------
  heartbeat :: MVar HB -> Writer () -> JobName -> QName -> IO ()
  heartbeat :: MVar HB -> Writer () -> String -> String -> IO ()
heartbeat MVar HB
m Writer ()
w String
j String
q | String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
q    = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                    | Bool
otherwise = 
    let hs :: [(String, String)]
hs = [(String
"__type__", String
"hb"),
              (String
"__job__",     String
j),
              (String
"__queue__",   String
q)]
     in do UTCTime
now <- IO UTCTime
getCurrentTime 
           MVar HB -> (HB -> IO HB) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar HB
m (UTCTime -> [(String, String)] -> HB -> IO HB
go UTCTime
now [(String, String)]
hs)
    where go :: UTCTime -> [(String, String)] -> HB -> IO HB
go UTCTime
now [(String, String)]
hs hb :: HB
hb@(HB Int
me UTCTime
nxt)
            | Int
me Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 Bool -> Bool -> Bool
&& UTCTime
nxt UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
< UTCTime
now = do Writer () -> Type -> [(String, String)] -> () -> IO ()
forall a. Writer a -> Type -> [(String, String)] -> a -> IO ()
writeQ Writer ()
w Type
nullType [(String, String)]
hs ()
                                       HB -> IO HB
forall (m :: * -> *) a. Monad m => a -> m a
return HB
hb{hbMeNext :: UTCTime
hbMeNext = UTCTime -> Int -> UTCTime
timeAdd UTCTime
now Int
me}
            | Bool
otherwise           =    HB -> IO HB
forall (m :: * -> *) a. Monad m => a -> m a
return HB
hb

  ------------------------------------------------------------------------
  -- | A provider is an opaque data type;
  --   most of its attributes are used only internally by the registry.
  --   Interesting for user applications, however, is the queue
  --   that identifies the provider.
  ------------------------------------------------------------------------
  data Provider = Provider {
                    -- | Queue through which the job is provided 
                    Provider -> String
prvQ   :: QName,
                    Provider -> Int
prvHb  :: Int,
                    Provider -> UTCTime
prvNxt :: UTCTime
                  }
    deriving Int -> Provider -> ShowS
[Provider] -> ShowS
Provider -> String
(Int -> Provider -> ShowS)
-> (Provider -> String) -> ([Provider] -> ShowS) -> Show Provider
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Provider] -> ShowS
$cshowList :: [Provider] -> ShowS
show :: Provider -> String
$cshow :: Provider -> String
showsPrec :: Int -> Provider -> ShowS
$cshowsPrec :: Int -> Provider -> ShowS
Show

  ------------------------------------------------------------------------
  -- Two providers are identical if they have the same queue name
  ------------------------------------------------------------------------
  instance Eq Provider where
    Provider
x == :: Provider -> Provider -> Bool
== Provider
y = Provider -> String
prvQ Provider
x String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== Provider -> String
prvQ Provider
y

  ------------------------------------------------------------------------
  -- Add provider to seq or, if already in, 
  -- update according to the values of the new node.
  ------------------------------------------------------------------------
  updOrAddProv :: Bool -> (Provider -> Provider) -> Provider -> 
                  Seq Provider -> Seq Provider
  updOrAddProv :: Bool
-> (Provider -> Provider)
-> Provider
-> Seq Provider
-> Seq Provider
updOrAddProv Bool
add Provider -> Provider
upd Provider
p Seq Provider
s = 
    case Seq Provider -> ViewL Provider
forall a. Seq a -> ViewL a
S.viewl Seq Provider
s of
      ViewL Provider
S.EmptyL -> if Bool
add then Provider -> Seq Provider
forall a. a -> Seq a
S.singleton Provider
p else Seq Provider
forall a. Seq a
S.empty
      Provider
x :< Seq Provider
ss  -> if Provider -> String
prvQ Provider
x String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== Provider -> String
prvQ Provider
p 
                    then Provider -> Provider
upd Provider
x Provider -> Seq Provider -> Seq Provider
forall a. a -> Seq a -> Seq a
<| Seq Provider
ss
                    else     Provider
x Provider -> Seq Provider -> Seq Provider
forall a. a -> Seq a -> Seq a
<| Bool
-> (Provider -> Provider)
-> Provider
-> Seq Provider
-> Seq Provider
updOrAddProv Bool
add Provider -> Provider
upd Provider
p Seq Provider
ss

  ------------------------------------------------------------------------
  -- Remove one provider from the seq
  ------------------------------------------------------------------------
  remProv :: QName -> Seq Provider -> Seq Provider
  remProv :: String -> Seq Provider -> Seq Provider
remProv String
q Seq Provider
s =
    case Seq Provider -> ViewL Provider
forall a. Seq a -> ViewL a
S.viewl Seq Provider
s of
      ViewL Provider
S.EmptyL -> Seq Provider
forall a. Seq a
S.empty
      Provider
x :< Seq Provider
ss  -> if Provider -> String
prvQ Provider
x String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
q then Seq Provider
ss
                                 else Provider
x Provider -> Seq Provider -> Seq Provider
forall a. a -> Seq a -> Seq a
<| String -> Seq Provider -> Seq Provider
remProv String
q Seq Provider
ss

  ------------------------------------------------------------------------
  -- Get head of seq and add to end of sequence;
  -- remove all "dead" nodes on the way
  ------------------------------------------------------------------------
  getHeads :: UTCTime -> Seq Provider -> ([Provider], Seq Provider)
  getHeads :: UTCTime -> Seq Provider -> ([Provider], Seq Provider)
getHeads UTCTime
now Seq Provider
s = 
    case Seq Provider -> ViewL Provider
forall a. Seq a -> ViewL a
S.viewl Seq Provider
s of
      ViewL Provider
S.EmptyL -> ([], Seq Provider
forall a. Seq a
S.empty)
      Provider
x :< Seq Provider
ss  -> if Provider -> Int
prvHb  Provider
x Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 Bool -> Bool -> Bool
&&
                     Provider -> UTCTime
prvNxt Provider
x UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
< UTCTime
now then UTCTime -> Seq Provider -> ([Provider], Seq Provider)
getHeads UTCTime
now Seq Provider
ss
                                    else ([Provider
x], Seq Provider
ss Seq Provider -> Provider -> Seq Provider
forall a. Seq a -> a -> Seq a
|> Provider
x)

  ------------------------------------------------------------------------
  -- Job: 'JobType' plus 'Sequence' of 'Provider's
  ------------------------------------------------------------------------
  data JobNode = JobNode {
                    JobNode -> JobType
jobType  :: JobType,
                    JobNode -> Seq Provider
jobProvs :: Seq Provider
                  }

  ------------------------------------------------------------------------
  -- The inner heart of the registry: 
  -- a 'Map' of 'JobName', 'JobNode'
  ------------------------------------------------------------------------
  data Reg = Reg {
               Reg -> String
regName :: String,
               Reg -> Map String JobNode
regWork :: Map JobName JobNode
             }

  ------------------------------------------------------------------------
  -- | Registry: An opaque data type
  ------------------------------------------------------------------------
  data Registry = Registry {
                    Registry -> MVar Reg
regM :: MVar Reg
                  }

  ------------------------------------------------------------------------
  -- Use registry (with return value)
  ------------------------------------------------------------------------
  useRegistry :: Registry -> (Reg -> IO (Reg, r)) -> IO r
  useRegistry :: Registry -> (Reg -> IO (Reg, r)) -> IO r
useRegistry Registry
r = MVar Reg -> (Reg -> IO (Reg, r)) -> IO r
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (Registry -> MVar Reg
regM Registry
r)

  ------------------------------------------------------------------------
  -- Use registry (without return value)
  ------------------------------------------------------------------------
  useRegistry_ :: Registry -> (Reg -> IO Reg) -> IO ()
  useRegistry_ :: Registry -> (Reg -> IO Reg) -> IO ()
useRegistry_ Registry
r = MVar Reg -> (Reg -> IO Reg) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Registry -> MVar Reg
regM Registry
r)

  ------------------------------------------------------------------------
  -- Add provider to job
  ------------------------------------------------------------------------
  insertR :: Registry -> JobName -> JobType -> QName -> Int -> IO ()
  insertR :: Registry -> String -> JobType -> String -> Int -> IO ()
insertR Registry
r String
jn JobType
w String
qn Int
i = 
    Registry -> (Reg -> IO Reg) -> IO ()
useRegistry_ Registry
r ((Reg -> IO Reg) -> IO ()) -> (Reg -> IO Reg) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Reg
reg -> do UTCTime
now <- IO UTCTime
getCurrentTime
                                Reg -> IO Reg
forall (m :: * -> *) a. Monad m => a -> m a
return Reg
reg{regWork :: Map String JobNode
regWork = UTCTime -> Map String JobNode -> Map String JobNode
ins UTCTime
now (Map String JobNode -> Map String JobNode)
-> Map String JobNode -> Map String JobNode
forall a b. (a -> b) -> a -> b
$ Reg -> Map String JobNode
regWork Reg
reg}
    where ins :: UTCTime -> Map String JobNode -> Map String JobNode
ins UTCTime
now Map String JobNode
m = 
            let j :: JobNode
j  = JobNode -> Maybe JobNode -> JobNode
forall a. a -> Maybe a -> a
fromMaybe (JobType -> Seq Provider -> JobNode
JobNode JobType
w Seq Provider
forall a. Seq a
S.empty) (Maybe JobNode -> JobNode) -> Maybe JobNode -> JobNode
forall a b. (a -> b) -> a -> b
$ String -> Map String JobNode -> Maybe JobNode
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup String
jn Map String JobNode
m
                p :: Provider
p  = String -> Int -> UTCTime -> Provider
Provider String
qn Int
i (UTCTime -> Provider) -> UTCTime -> Provider
forall a b. (a -> b) -> a -> b
$ UTCTime -> Bool -> Int -> UTCTime
nextHB UTCTime
now Bool
True Int
i
                ps :: Seq Provider
ps = Bool
-> (Provider -> Provider)
-> Provider
-> Seq Provider
-> Seq Provider
updOrAddProv Bool
True (Provider -> Provider -> Provider
forall p p. p -> p -> p
upd Provider
p) Provider
p (Seq Provider -> Seq Provider) -> Seq Provider -> Seq Provider
forall a b. (a -> b) -> a -> b
$ JobNode -> Seq Provider
jobProvs JobNode
j
             in String -> JobNode -> Map String JobNode -> Map String JobNode
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert String
jn JobNode
j{jobProvs :: Seq Provider
jobProvs = Seq Provider
ps} Map String JobNode
m
          upd :: p -> p -> p
upd p
n p
_ = p
n

  ------------------------------------------------------------------------
  -- Update heartbeat of provider 
  ------------------------------------------------------------------------
  updR :: Registry -> JobName -> QName -> IO ()
  updR :: Registry -> String -> String -> IO ()
updR Registry
r String
jn String
qn  = 
    Registry -> (Reg -> IO Reg) -> IO ()
useRegistry_ Registry
r ((Reg -> IO Reg) -> IO ()) -> (Reg -> IO Reg) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Reg
reg -> do UTCTime
now <- IO UTCTime
getCurrentTime
                                Reg -> IO Reg
forall (m :: * -> *) a. Monad m => a -> m a
return Reg
reg{regWork :: Map String JobNode
regWork = UTCTime -> Map String JobNode -> Map String JobNode
ins UTCTime
now (Map String JobNode -> Map String JobNode)
-> Map String JobNode -> Map String JobNode
forall a b. (a -> b) -> a -> b
$ Reg -> Map String JobNode
regWork Reg
reg}
    where ins :: UTCTime -> Map String JobNode -> Map String JobNode
ins UTCTime
now Map String JobNode
m = 
            case String -> Map String JobNode -> Maybe JobNode
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup String
jn Map String JobNode
m of
              Maybe JobNode
Nothing -> Map String JobNode
m
              Just JobNode
j  -> let p :: Provider
p  = String -> Int -> UTCTime -> Provider
Provider String
qn Int
0 UTCTime
now
                             ps :: Seq Provider
ps = Bool
-> (Provider -> Provider)
-> Provider
-> Seq Provider
-> Seq Provider
updOrAddProv Bool
False (UTCTime -> Provider -> Provider
upd UTCTime
now) Provider
p 
                                                     (JobNode -> Seq Provider
jobProvs JobNode
j)
                          in String -> JobNode -> Map String JobNode -> Map String JobNode
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert String
jn JobNode
j{jobProvs :: Seq Provider
jobProvs = Seq Provider
ps} Map String JobNode
m
          upd :: UTCTime -> Provider -> Provider
upd UTCTime
now Provider
o = Provider
o{prvNxt :: UTCTime
prvNxt = UTCTime -> Bool -> Int -> UTCTime
nextHB UTCTime
now Bool
True (Int -> UTCTime) -> Int -> UTCTime
forall a b. (a -> b) -> a -> b
$ Int
tolerance Int -> Int -> Int
forall a. Num a => a -> a -> a
* Provider -> Int
prvHb Provider
o}
      
  ------------------------------------------------------------------------
  -- Remove 'Provider' from the job
  ------------------------------------------------------------------------
  removeR :: Registry -> JobName -> QName -> IO ()
  removeR :: Registry -> String -> String -> IO ()
removeR Registry
r String
jn String
qn = 
    Registry -> (Reg -> IO Reg) -> IO ()
useRegistry_ Registry
r ((Reg -> IO Reg) -> IO ()) -> (Reg -> IO Reg) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Reg
reg -> Reg -> IO Reg
forall (m :: * -> *) a. Monad m => a -> m a
return Reg
reg{regWork :: Map String JobNode
regWork = Map String JobNode -> Map String JobNode
ins (Map String JobNode -> Map String JobNode)
-> Map String JobNode -> Map String JobNode
forall a b. (a -> b) -> a -> b
$ Reg -> Map String JobNode
regWork Reg
reg}
    where ins :: Map String JobNode -> Map String JobNode
ins Map String JobNode
m = 
            case String -> Map String JobNode -> Maybe JobNode
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup String
jn Map String JobNode
m of
              Maybe JobNode
Nothing -> Map String JobNode
m
              Just JobNode
j  -> 
                let ps :: Seq Provider
ps = String -> Seq Provider -> Seq Provider
remProv String
qn (Seq Provider -> Seq Provider) -> Seq Provider -> Seq Provider
forall a b. (a -> b) -> a -> b
$ JobNode -> Seq Provider
jobProvs JobNode
j
                 in if Seq Provider -> Bool
forall a. Seq a -> Bool
S.null Seq Provider
ps then String -> Map String JobNode -> Map String JobNode
forall k a. Ord k => k -> Map k a -> Map k a
M.delete String
jn Map String JobNode
m
                                 else String -> JobNode -> Map String JobNode -> Map String JobNode
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert String
jn JobNode
j{jobProvs :: Seq Provider
jobProvs = Seq Provider
ps} Map String JobNode
m

  ------------------------------------------------------------------------
  -- | Map action to 'Provider's of job 'JobName';
  --   mapping means different things for:
  --
  --   * Serice, Task: action is applied to the first
  --                   active provider of a list of providers
  --                   and this provider
  --                   is then sent to the back of the list,
  --                   hence, implementing a balancer.
  --
  --   * Topic: action is applied to all providers,
  --            hence, implementing a publisher.
  --
  --   Parameters:
  --
  --   * 'Registry': The registry to use;
  --
  --   * 'JobName': The job to which to apply the action;
  --
  --   * ('Provider' -> IO ()): The action to apply.
  --
  --   The function returns False iff the requested job is not available
  --   and True otherwise. (Note that a job without providers is removed;
  --   when the function returns True, the job, thus, 
  --   was applied at least once.
  ------------------------------------------------------------------------
  mapR :: Registry -> JobName -> (Provider -> IO ()) -> IO Bool
  mapR :: Registry -> String -> (Provider -> IO ()) -> IO Bool
mapR Registry
r String
jn Provider -> IO ()
f = 
    Registry -> (Reg -> IO (Reg, Bool)) -> IO Bool
forall r. Registry -> (Reg -> IO (Reg, r)) -> IO r
useRegistry Registry
r ((Reg -> IO (Reg, Bool)) -> IO Bool)
-> (Reg -> IO (Reg, Bool)) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \Reg
reg -> IO UTCTime
getCurrentTime        IO UTCTime -> (UTCTime -> IO (Reg, Bool)) -> IO (Reg, Bool)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \UTCTime
now ->
                            UTCTime -> Map String JobNode -> IO (Map String JobNode, Bool)
ins UTCTime
now (Reg -> Map String JobNode
regWork Reg
reg) IO (Map String JobNode, Bool)
-> ((Map String JobNode, Bool) -> IO (Reg, Bool)) -> IO (Reg, Bool)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \(Map String JobNode
js,Bool
t) ->
                            (Reg, Bool) -> IO (Reg, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Reg
reg{regWork :: Map String JobNode
regWork = Map String JobNode
js},Bool
t)
    where ins :: UTCTime -> Map String JobNode -> IO (Map String JobNode, Bool)
ins UTCTime
now Map String JobNode
m = 
            case String -> Map String JobNode -> Maybe JobNode
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup String
jn Map String JobNode
m of
              Maybe JobNode
Nothing -> (Map String JobNode, Bool) -> IO (Map String JobNode, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Map String JobNode
m, Bool
False)
              Just JobNode
j  -> 
                let ([Provider]
xs, Seq Provider
ps) = if JobNode -> JobType
jobType JobNode
j JobType -> [JobType] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [JobType
Service, JobType
Task]
                                 then UTCTime -> Seq Provider -> ([Provider], Seq Provider)
getHeads UTCTime
now (Seq Provider -> ([Provider], Seq Provider))
-> Seq Provider -> ([Provider], Seq Provider)
forall a b. (a -> b) -> a -> b
$ JobNode -> Seq Provider
jobProvs JobNode
j
                                 else (Seq Provider -> [Provider]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Provider -> [Provider]) -> Seq Provider -> [Provider]
forall a b. (a -> b) -> a -> b
$ JobNode -> Seq Provider
jobProvs JobNode
j, 
                                                JobNode -> Seq Provider
jobProvs JobNode
j)
                 in (Provider -> IO ()) -> [Provider] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Provider -> IO ()
f [Provider]
xs IO ()
-> IO (Map String JobNode, Bool) -> IO (Map String JobNode, Bool)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> 
                    (Map String JobNode, Bool) -> IO (Map String JobNode, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> JobNode -> Map String JobNode -> Map String JobNode
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert String
jn JobNode
j{jobProvs :: Seq Provider
jobProvs = Seq Provider
ps} Map String JobNode
m, Bool
True)

  ------------------------------------------------------------------------
  -- | Map function of type 
  --
  --   > 'Provider' -> 'Provider'
  --
  --   to all 'Provider's of job 'JobName'
  --   (independent of 'JobType')
  ------------------------------------------------------------------------
  mapAllR :: Registry -> JobName -> (Provider -> Provider) -> IO ()
  mapAllR :: Registry -> String -> (Provider -> Provider) -> IO ()
mapAllR Registry
r String
jn Provider -> Provider
f = 
    Registry -> (Reg -> IO Reg) -> IO ()
useRegistry_ Registry
r ((Reg -> IO Reg) -> IO ()) -> (Reg -> IO Reg) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Reg
reg -> Map String JobNode -> IO (Map String JobNode)
forall (m :: * -> *).
Monad m =>
Map String JobNode -> m (Map String JobNode)
ins (Reg -> Map String JobNode
regWork Reg
reg) IO (Map String JobNode) -> (Map String JobNode -> IO Reg) -> IO Reg
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Map String JobNode
m ->
                             Reg -> IO Reg
forall (m :: * -> *) a. Monad m => a -> m a
return Reg
reg{regWork :: Map String JobNode
regWork = Map String JobNode
m}
    where ins :: Map String JobNode -> m (Map String JobNode)
ins Map String JobNode
m = 
            case String -> Map String JobNode -> Maybe JobNode
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup String
jn Map String JobNode
m of
              Maybe JobNode
Nothing -> Map String JobNode -> m (Map String JobNode)
forall (m :: * -> *) a. Monad m => a -> m a
return Map String JobNode
m
              Just JobNode
j  -> Map String JobNode -> m (Map String JobNode)
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> JobNode -> Map String JobNode -> Map String JobNode
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert String
jn JobNode
j{jobProvs :: Seq Provider
jobProvs = Seq Provider -> Seq Provider
go (Seq Provider -> Seq Provider) -> Seq Provider -> Seq Provider
forall a b. (a -> b) -> a -> b
$ JobNode -> Seq Provider
jobProvs JobNode
j} Map String JobNode
m)
          go :: Seq Provider -> Seq Provider
go Seq Provider
s = case Seq Provider -> ViewL Provider
forall a. Seq a -> ViewL a
S.viewl Seq Provider
s of
                   ViewL Provider
S.EmptyL -> Seq Provider
forall a. Seq a
S.empty
                   Provider
x :< Seq Provider
ss  -> Provider -> Provider
f Provider
x Provider -> Seq Provider -> Seq Provider
forall a. a -> Seq a -> Seq a
<| Seq Provider -> Seq Provider
go Seq Provider
ss
              
  ------------------------------------------------------------------------
  -- | Retrieves /n/ 'Provider's of a certain job;
  --   getProvider works, for all 'JobType's
  --   according to the work balancer logic, /i.e./:
  --   it returns the first n providers of the list for this job
  --   and moves them to the end of the list.
  --   'getProvider' is used, for instance, in the Desk pattern. 
  --
  --   * 'Registry': The registry in use;
  --
  --   * 'JobName': The job for which the caller needs a provider;
  --
  --   * Int: The number /n/ of providers to retrieve; 
  --          if less than /n/ providers are available for this job,
  --          all available providers will be returned,
  --          but no error event is created.
  ------------------------------------------------------------------------
  getProvider :: Registry -> JobName -> Int -> IO [Provider]
  getProvider :: Registry -> String -> Int -> IO [Provider]
getProvider Registry
r String
jn Int
n = 
    Registry -> (Reg -> IO (Reg, [Provider])) -> IO [Provider]
forall r. Registry -> (Reg -> IO (Reg, r)) -> IO r
useRegistry Registry
r ((Reg -> IO (Reg, [Provider])) -> IO [Provider])
-> (Reg -> IO (Reg, [Provider])) -> IO [Provider]
forall a b. (a -> b) -> a -> b
$ \Reg
reg -> do UTCTime
now <- IO UTCTime
getCurrentTime
                               let ([Provider]
x,Map String JobNode
m) = UTCTime -> Map String JobNode -> ([Provider], Map String JobNode)
ins UTCTime
now (Map String JobNode -> ([Provider], Map String JobNode))
-> Map String JobNode -> ([Provider], Map String JobNode)
forall a b. (a -> b) -> a -> b
$ Reg -> Map String JobNode
regWork Reg
reg
                               (Reg, [Provider]) -> IO (Reg, [Provider])
forall (m :: * -> *) a. Monad m => a -> m a
return (Reg
reg{regWork :: Map String JobNode
regWork = Map String JobNode
m}, [Provider]
x)
    where ins :: UTCTime -> Map String JobNode -> ([Provider], Map String JobNode)
ins UTCTime
now Map String JobNode
m   = case String -> Map String JobNode -> Maybe JobNode
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup String
jn Map String JobNode
m of
                          Maybe JobNode
Nothing -> ([], Map String JobNode
m)
                          Just JobNode
j  -> 
                            let ([Provider]
x,Seq Provider
ps) = UTCTime -> Seq Provider -> Int -> ([Provider], Seq Provider)
forall a.
(Ord a, Num a) =>
UTCTime -> Seq Provider -> a -> ([Provider], Seq Provider)
go UTCTime
now (JobNode -> Seq Provider
jobProvs JobNode
j) Int
n
                             in ([Provider]
x, String -> JobNode -> Map String JobNode -> Map String JobNode
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert String
jn JobNode
j{jobProvs :: Seq Provider
jobProvs = Seq Provider
ps} Map String JobNode
m)
          go :: UTCTime -> Seq Provider -> a -> ([Provider], Seq Provider)
go  UTCTime
now Seq Provider
ps a
i | a
i a -> a -> Bool
forall a. Ord a => a -> a -> Bool
<= a
0    = ([],Seq Provider
ps)
                       | Bool
otherwise = let (![Provider]
x ,Seq Provider
ps1) = UTCTime -> Seq Provider -> ([Provider], Seq Provider)
getHeads UTCTime
now Seq Provider
ps
                                         (![Provider]
x',Seq Provider
ps2) = UTCTime -> Seq Provider -> a -> ([Provider], Seq Provider)
go UTCTime
now Seq Provider
ps1 (a
ia -> a -> a
forall a. Num a => a -> a -> a
-a
1)
                                      in ([Provider] -> [Provider]
forall a. Eq a => [a] -> [a]
nub ([Provider]
x[Provider] -> [Provider] -> [Provider]
forall a. [a] -> [a] -> [a]
++[Provider]
x'), Seq Provider
ps2)

  ------------------------------------------------------------------------
  -- | This function shows all jobs with all their providers
  --   in a registry; the function is intended for debugging only.
  ------------------------------------------------------------------------
  showRegistry :: Registry -> IO ()
  showRegistry :: Registry -> IO ()
showRegistry Registry
r = 
    Registry -> (Reg -> IO Reg) -> IO ()
useRegistry_ Registry
r ((Reg -> IO Reg) -> IO ()) -> (Reg -> IO Reg) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Reg
reg -> let l :: [String]
l  = ((String, JobNode) -> String) -> [(String, JobNode)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (String, JobNode) -> String
forall a b. (a, b) -> a
fst ([(String, JobNode)] -> [String])
-> [(String, JobNode)] -> [String]
forall a b. (a -> b) -> a -> b
$ Map String JobNode -> [(String, JobNode)]
forall k a. Map k a -> [(k, a)]
M.toList (Reg -> Map String JobNode
regWork Reg
reg)
                                 p :: [[Provider]]
p  = (String -> [Provider]) -> [String] -> [[Provider]]
forall a b. (a -> b) -> [a] -> [b]
map (Reg -> String -> [Provider]
getProvs Reg
reg) [String]
l
                                 lp :: [(String, [Provider])]
lp = [String] -> [[Provider]] -> [(String, [Provider])]
forall a b. [a] -> [b] -> [(a, b)]
zip [String]
l [[Provider]]
p
                              in [(String, [Provider])] -> IO ()
forall a. Show a => a -> IO ()
print [(String, [Provider])]
lp IO () -> IO Reg -> IO Reg
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Reg -> IO Reg
forall (m :: * -> *) a. Monad m => a -> m a
return Reg
reg
    where getProvs :: Reg -> String -> [Provider]
getProvs Reg
reg String
jn = case String -> Map String JobNode -> Maybe JobNode
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup String
jn (Map String JobNode -> Maybe JobNode)
-> Map String JobNode -> Maybe JobNode
forall a b. (a -> b) -> a -> b
$ Reg -> Map String JobNode
regWork Reg
reg of
                              Maybe JobNode
Nothing -> []
                              Just JobNode
ps -> Seq Provider -> [Provider]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Provider -> [Provider]) -> Seq Provider -> [Provider]
forall a b. (a -> b) -> a -> b
$ JobNode -> Seq Provider
jobProvs JobNode
ps

  ------------------------------------------------------------------------
  -- | A registry is used through a function 
  --   that, internally, creates a registry
  --   and defines its lifetime in terms of the scope of an action
  --   passed in to the function:
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Name of the registry used for error handling;
  --
  --   * 'QName': Name of the registration queue.
  --              It is this queue to which 'register'
  --              sends a registration request;
  --
  --   * (Int, Int): Minimal and maximal accepted heartbeat interval;
  --
  --   * 'OnError': Error handler;
  --
  --   * ('Registry' -> IO r): The action that defines 
  --                           the registry's lifetime;
  --                           the result of this action, /r/, 
  --                           is also the result of /withRegistry/.
  ------------------------------------------------------------------------
  withRegistry :: Con -> String -> QName -> (Int, Int)
                      -> OnError -> (Registry -> IO r) -> IO r
  withRegistry :: Con
-> String
-> String
-> (Int, Int)
-> OnError
-> (Registry -> IO r)
-> IO r
withRegistry Con
c String
n String
rq (Int
mn, Int
mx) OnError
onErr Registry -> IO r
action = 
    -- always start the reader in the main thread -------------
    -- for if started in the background thread    -------------
    -- the action may send a message              -------------
    -- without the reader having subscribed to its queue ------
    Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> InBound ()
-> (Reader () -> IO r)
-> IO r
forall i r.
Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
c (String
n String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"Reader") String
rq [] [] InBound ()
ignorebody ((Reader () -> IO r) -> IO r) -> (Reader () -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Reader ()
r -> do
      let nm :: String
nm  = String
n String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"Registry"
      Registry
reg <- MVar Reg -> Registry
Registry (MVar Reg -> Registry) -> IO (MVar Reg) -> IO Registry
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Reg -> IO (MVar Reg)
forall a. a -> IO (MVar a)
newMVar (String -> Map String JobNode -> Reg
Reg String
nm Map String JobNode
forall k a. Map k a
M.empty)
      IO () -> IO r -> IO r
forall r. IO () -> IO r -> IO r
withThread (Registry -> Reader () -> String -> IO ()
forall m r. Registry -> Reader m -> String -> IO r
startReg Registry
reg Reader ()
r String
nm) (Registry -> IO r
action Registry
reg)
    where startReg :: Registry -> Reader m -> String -> IO r
startReg Registry
reg Reader m
r String
nm = 
            Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> OutBound ()
-> (Writer () -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [(String, String)]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
c (String
n String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"Writer") String
"unknown" [] [] OutBound ()
nobody ((Writer () -> IO r) -> IO r) -> (Writer () -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Writer ()
w -> 
              IO () -> IO r
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO r) -> IO () -> IO r
forall a b. (a -> b) -> a -> b
$ IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
catches 
                (do Message m
m <- Reader m -> IO (Message m)
forall a. Reader a -> IO (Message a)
readQ    Reader m
r
                    String
t <- Message m -> IO String
forall m. Message m -> IO String
getMType Message m
m
                    case String
t of
                      String
"register" -> Registry -> Message m -> Writer () -> (Int, Int) -> IO ()
forall m. Registry -> Message m -> Writer () -> (Int, Int) -> IO ()
handleRegister   Registry
reg Message m
m Writer ()
w (Int
mn,Int
mx)
                      String
"unreg"    -> Registry -> Message m -> Writer () -> IO ()
forall m. Registry -> Message m -> Writer () -> IO ()
handleUnRegister Registry
reg Message m
m Writer ()
w
                      String
"hb"       -> Registry -> Message m -> IO ()
forall m. Registry -> Message m -> IO ()
handleHeartbeat  Registry
reg Message m
m
                      String
x          -> PatternsException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO ()) -> PatternsException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String -> PatternsException
HeaderX String
"__type__" (String -> PatternsException) -> String -> PatternsException
forall a b. (a -> b) -> a -> b
$
                                                String
"Unknown type: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
x)
                (String -> OnError -> [Handler ()]
ignoreHandler String
nm OnError
onErr)

  ------------------------------------------------------------------------
  -- Handle registration request
  ------------------------------------------------------------------------
  handleRegister :: Registry -> Message m -> Writer () -> (Int, Int) -> IO ()
  handleRegister :: Registry -> Message m -> Writer () -> (Int, Int) -> IO ()
handleRegister Registry
r Message m
m Writer ()
w (Int
mn,Int
mx) = do
    (String
j,String
q) <- Message m -> IO (String, String)
forall m. Message m -> IO (String, String)
getJobQueue Message m
m
    String
ch    <- Message m -> IO String
forall m. Message m -> IO String
getChannel Message m
m
    JobType
t     <- Message m -> IO JobType
forall m. Message m -> IO JobType
getJobType Message m
m 
    Int
hb    <- Message m -> IO Int
forall m. Message m -> IO Int
getHB Message m
m 
    let h :: Int
h | Int
hb Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
mn Bool -> Bool -> Bool
|| Int
hb Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
mx = if (Int
mn Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
hb) Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< (Int
hb Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
mx) then Int
mn else Int
mx
          | Bool
otherwise          = Int
hb
    Registry -> String -> JobType -> String -> Int -> IO ()
insertR Registry
r String
j JobType
t String
q Int
h
    let hs :: [(String, String)]
hs = [(String
"__sc__", StatusCode -> String
forall a. Show a => a -> String
show StatusCode
OK),
              (String
"__hb__", Int -> String
forall a. Show a => a -> String
show Int
h)]
    Writer () -> String -> Type -> [(String, String)] -> () -> IO ()
forall a.
Writer a -> String -> Type -> [(String, String)] -> a -> IO ()
writeAdHoc Writer ()
w String
ch Type
nullType [(String, String)]
hs ()

  ------------------------------------------------------------------------
  -- Handle unRegister request
  ------------------------------------------------------------------------
  handleUnRegister :: Registry -> Message m -> Writer () -> IO ()
  handleUnRegister :: Registry -> Message m -> Writer () -> IO ()
handleUnRegister Registry
r Message m
m Writer ()
w = do
    (String
j,String
q) <- Message m -> IO (String, String)
forall m. Message m -> IO (String, String)
getJobQueue Message m
m
    String
ch    <- Message m -> IO String
forall m. Message m -> IO String
getChannel Message m
m
    Registry -> String -> String -> IO ()
removeR Registry
r String
j String
q 
    let hs :: [(String, String)]
hs=[(String
"__sc__", StatusCode -> String
forall a. Show a => a -> String
show StatusCode
OK)]
    Writer () -> String -> Type -> [(String, String)] -> () -> IO ()
forall a.
Writer a -> String -> Type -> [(String, String)] -> a -> IO ()
writeAdHoc Writer ()
w String
ch Type
nullType [(String, String)]
hs ()

  ------------------------------------------------------------------------
  -- Handle heartbeat
  ------------------------------------------------------------------------
  handleHeartbeat :: Registry -> Message m -> IO ()
  handleHeartbeat :: Registry -> Message m -> IO ()
handleHeartbeat Registry
r Message m
m = do
    (String
j,String
q) <- Message m -> IO (String, String)
forall m. Message m -> IO (String, String)
getJobQueue Message m
m
    Registry -> String -> String -> IO ()
updR Registry
r String
j String
q
    -- print $ msgHdrs m -- test

  ------------------------------------------------------------------------
  -- | Get JobQueue
  --   (and throw an exception if at least 
  --    one of the headers does not exist)
  ------------------------------------------------------------------------
  getJobQueue :: Message m -> IO (String, String)
  getJobQueue :: Message m -> IO (String, String)
getJobQueue Message m
m = Message m -> IO String
forall m. Message m -> IO String
getJobName Message m
m IO String -> (String -> IO (String, String)) -> IO (String, String)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \String
j -> Message m -> IO String
forall m. Message m -> IO String
getQueue Message m
m IO String -> (String -> IO (String, String)) -> IO (String, String)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \String
q -> (String, String) -> IO (String, String)
forall (m :: * -> *) a. Monad m => a -> m a
return (String
j,String
q)

  ------------------------------------------------------------------------
  -- | Get Message Type from headers
  --   (and throw an exception if the header does not exist)
  ------------------------------------------------------------------------
  getMType :: Message m -> IO String
  getMType :: Message m -> IO String
getMType = String -> String -> Message m -> IO String
forall m. String -> String -> Message m -> IO String
getHeader String
"__type__" String
"No message type in headers"

  ------------------------------------------------------------------------
  -- | Get Job name from headers
  --   (and throw an exception if the header does not exist)
  ------------------------------------------------------------------------
  getJobName :: Message m -> IO String
  getJobName :: Message m -> IO String
getJobName = String -> String -> Message m -> IO String
forall m. String -> String -> Message m -> IO String
getHeader String
"__job__" String
"No job name in headers" 

  ------------------------------------------------------------------------
  -- | Get Reply queue (channel) from headers
  --   (and throw an exception if the header does not exist)
  ------------------------------------------------------------------------
  getChannel :: Message m -> IO String
  getChannel :: Message m -> IO String
getChannel = String -> String -> Message m -> IO String
forall m. String -> String -> Message m -> IO String
getHeader String
"__channel__" String
"No response q in headers" 

  ------------------------------------------------------------------------
  -- | Get Queue name from headers
  --   (and throw an exception if the header does not exist)
  ------------------------------------------------------------------------
  getQueue :: Message m -> IO String
  getQueue :: Message m -> IO String
getQueue = String -> String -> Message m -> IO String
forall m. String -> String -> Message m -> IO String
getHeader String
"__queue__" String
"No queue q in headers" 

  ------------------------------------------------------------------------
  -- | Get Job type from headers
  --   (and throw an exception if the header does not exist
  --        or contains an invalid value)
  ------------------------------------------------------------------------
  getJobType :: Message m -> IO JobType
  getJobType :: Message m -> IO JobType
getJobType Message m
m = 
    String -> String -> Message m -> IO String
forall m. String -> String -> Message m -> IO String
getHeader String
"__job-type__" String
"No job type in headers"  Message m
m IO String -> (String -> IO JobType) -> IO JobType
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \String
x ->
      case String -> Maybe JobType
readJobType String
x of
        Maybe JobType
Nothing -> PatternsException -> IO JobType
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO JobType)
-> PatternsException -> IO JobType
forall a b. (a -> b) -> a -> b
$ String -> String -> PatternsException
HeaderX String
"__job-type__" (String -> PatternsException) -> String -> PatternsException
forall a b. (a -> b) -> a -> b
$
                                     String
"unknown type: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
x 
        Just JobType
t  -> JobType -> IO JobType
forall (m :: * -> *) a. Monad m => a -> m a
return JobType
t  

  ------------------------------------------------------------------------
  -- | Get Heartbeat specification from headers
  --   (and throw an exception if the header does not exist
  --        or if its value is not numeric)
  ------------------------------------------------------------------------
  getHB :: Message m -> IO Int
  getHB :: Message m -> IO Int
getHB Message m
m = 
    case String -> [(String, String)] -> Maybe String
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup String
"__hb__" ([(String, String)] -> Maybe String)
-> [(String, String)] -> Maybe String
forall a b. (a -> b) -> a -> b
$ Message m -> [(String, String)]
forall a. Message a -> [(String, String)]
msgHdrs Message m
m of
      Maybe String
Nothing -> Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return Int
0 
      Just String
v  -> if (Char -> Bool) -> String -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all Char -> Bool
isDigit String
v 
                   then Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> IO Int) -> Int -> IO Int
forall a b. (a -> b) -> a -> b
$ String -> Int
forall a. Read a => String -> a
read String
v
                   else PatternsException -> IO Int
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO Int) -> PatternsException -> IO Int
forall a b. (a -> b) -> a -> b
$ String -> String -> PatternsException
HeaderX String
"__hb__" (String -> PatternsException) -> String -> PatternsException
forall a b. (a -> b) -> a -> b
$
                          String
"heartbeat not numeric: "  String -> ShowS
forall a. [a] -> [a] -> [a]
++ ShowS
forall a. Show a => a -> String
show String
v

  ------------------------------------------------------------------------
  -- | Get Status code from headers
  --   (and throw an exception if the header does not exist)
  ------------------------------------------------------------------------
  getSC :: Message m -> IO (Either String StatusCode)
  getSC :: Message m -> IO (Either String StatusCode)
getSC Message m
m = String -> Either String StatusCode
readStatusCode (String -> Either String StatusCode)
-> IO String -> IO (Either String StatusCode)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> String -> String -> Message m -> IO String
forall m. String -> String -> Message m -> IO String
getHeader String
"__sc__"
                                 String
"No status code in message" Message m
m
                               
  ------------------------------------------------------------------------
  -- | Get Generic function to retrieve a header value
  --   (and throw an exception if the header does not exist):
  --
  --   * String: Key of the wanted header
  --
  --   * String: Error message in case there is no such header
  --
  --   * 'Message' m: The message whose headers we want to search
  ------------------------------------------------------------------------
  getHeader :: String -> String -> Message m -> IO String
  getHeader :: String -> String -> Message m -> IO String
getHeader String
h String
e Message m
m = case String -> [(String, String)] -> Maybe String
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup String
h ([(String, String)] -> Maybe String)
-> [(String, String)] -> Maybe String
forall a b. (a -> b) -> a -> b
$ Message m -> [(String, String)]
forall a. Message a -> [(String, String)]
msgHdrs Message m
m of
                      Maybe String
Nothing -> PatternsException -> IO String
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO String) -> PatternsException -> IO String
forall a b. (a -> b) -> a -> b
$ String -> String -> PatternsException
HeaderX String
h String
e
                      Just String
v  -> String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return String
v