module State (
msgContent, numeric, ms,
Connection(..),
Copt(..),
oHeartBeat, oMaxRecv,
oAuth,
Transaction(..),
Topt(..), hasTopt, tmo,
TxState(..),
Receipt,
mkConnection,
logSend, logReceive,
addCon, rmCon, getCon,
addSub, addDest, getSub, getDest,
rmSub, rmDest,
mkTrn,
addTx, getTx, rmTx, rmThisTx,
getCurTx,
updTxState,
txPendingAck, txReceipts,
addAck, rmAck, addRec, rmRec,
forceRmRec,
checkReceipt)
where
import qualified Protocol as P
import Factory
import qualified Network.Mom.Stompl.Frame as F
import Network.Mom.Stompl.Client.Exception
import Control.Concurrent
import Control.Exception (throwIO)
import System.IO.Unsafe
import Data.List (find, deleteBy, delete)
import Data.Char (isDigit)
import Data.Time.Clock
msgContent :: P.Message a -> a
msgContent = P.msgCont
numeric :: String -> Bool
numeric = all isDigit
ms :: Int -> Int
ms u = 1000 * u
eq :: Eq a => (a, b) -> (a, b) -> Bool
eq x y = fst x == fst y
type Receipt = Rec
data Connection = Connection {
conId :: Con,
conCon :: P.Connection,
conOwner :: ThreadId,
conHisBeat :: UTCTime,
conMyBeat :: UTCTime,
conWait :: Int,
conSubs :: [SubEntry],
conDests :: [DestEntry],
conThrds :: [ThreadEntry],
conErrs :: [F.Frame],
conRecs :: [Receipt],
conAcks :: [P.MsgId]}
instance Eq Connection where
c1 == c2 = conId c1 == conId c2
data Copt =
OWaitBroker Int |
OMaxRecv Int |
OHeartBeat (F.Heart) |
OAuth String String
deriving (Eq, Show)
is :: Copt -> Copt -> Bool
is (OWaitBroker _) (OWaitBroker _) = True
is (OMaxRecv _) (OMaxRecv _) = True
is (OHeartBeat _) (OHeartBeat _) = True
is (OAuth _ _) (OAuth _ _) = True
is _ _ = False
noWait :: Int
noWait = 0
stdRecv :: Int
stdRecv = 1024
noBeat :: F.Heart
noBeat = (0,0)
noAuth :: (String, String)
noAuth = ("","")
oWaitBroker :: [Copt] -> Int
oWaitBroker os = case find (is $ OWaitBroker 0) os of
Just (OWaitBroker d) -> d
_ -> noWait
oMaxRecv :: [Copt] -> Int
oMaxRecv os = case find (is $ OMaxRecv 0) os of
Just (OMaxRecv i) -> i
_ -> stdRecv
oHeartBeat :: [Copt] -> F.Heart
oHeartBeat os = case find (is $ OHeartBeat (0,0)) os of
Just (OHeartBeat b) -> b
_ -> noBeat
oAuth :: [Copt] -> (String, String)
oAuth os = case find (is $ OAuth "" "") os of
Just (OAuth u p) -> (u, p)
_ -> noAuth
findCon :: Con -> [Connection] -> Maybe Connection
findCon cid = find (\c -> conId c == cid)
mkConnection :: Con -> P.Connection -> ThreadId -> UTCTime -> [Copt] -> Connection
mkConnection cid c myself t os = Connection cid c myself t t (oWaitBroker os)
[] [] [] [] [] []
addAckToCon :: P.MsgId -> Connection -> Connection
addAckToCon mid c = c {conAcks = mid : conAcks c}
rmAckFromCon :: P.MsgId -> Connection -> Connection
rmAckFromCon mid c = c {conAcks = delete mid $ conAcks c}
addRecToCon :: Receipt -> Connection -> Connection
addRecToCon r c = c {conRecs = r : conRecs c}
rmRecFromCon :: Receipt -> Connection -> Connection
rmRecFromCon r c = c {conRecs = delete r $ conRecs c}
checkReceiptCon :: Receipt -> Connection -> Bool
checkReceiptCon r c = case find (== r) $ conRecs c of
Nothing -> True
Just _ -> False
type SubEntry = (Sub, Chan F.Frame)
type DestEntry = (String, Chan F.Frame)
type ThreadEntry = (ThreadId, [Transaction])
addSubToCon :: SubEntry -> Connection -> Connection
addSubToCon s c = c {conSubs = s : conSubs c}
getSub :: Sub -> Connection -> Maybe (Chan F.Frame)
getSub sid c = lookup sid (conSubs c)
rmSubFromCon :: SubEntry -> Connection -> Connection
rmSubFromCon s c = c {conSubs = ss}
where ss = deleteBy eq s (conSubs c)
addDestToCon :: DestEntry -> Connection -> Connection
addDestToCon d c = c {conDests = d : conDests c}
getDest :: String -> Connection -> Maybe (Chan F.Frame)
getDest dst c = lookup dst (conDests c)
rmDestFromCon :: DestEntry -> Connection -> Connection
rmDestFromCon d c = c {conDests = ds}
where ds = deleteBy eq d (conDests c)
setHisTime :: UTCTime -> Connection -> Connection
setHisTime t c = c {conHisBeat = t}
setMyTime :: UTCTime -> Connection -> Connection
setMyTime t c = c {conMyBeat = t}
updCon :: Connection -> [Connection] -> [Connection]
updCon c cs = c : delete c cs
data Transaction = Trn {
txId :: Tx,
txState :: TxState,
txTmo :: Int,
txAbrtAck :: Bool,
txAbrtRc :: Bool,
txAcks :: [P.MsgId],
txRecs :: [Receipt]
}
instance Eq Transaction where
t1 == t2 = txId t1 == txId t2
findTx :: Tx -> [Transaction] -> Maybe Transaction
findTx tx = find (\x -> txId x == tx)
mkTrn :: Tx -> [Topt] -> Transaction
mkTrn tx os = Trn {
txId = tx,
txState = TxStarted,
txTmo = tmo os,
txAbrtAck = hasTopt OAbortMissingAcks os,
txAbrtRc = hasTopt OWithReceipts os,
txAcks = [],
txRecs = []}
data Topt =
OTimeout Int
| OWithReceipts
| OAbortMissingAcks
deriving (Eq, Show)
hasTopt :: Topt -> [Topt] -> Bool
hasTopt o os = o `elem` os
tmo :: [Topt] -> Int
tmo os = case find isTimeout os of
Just (OTimeout i) -> i
_ -> 0
where isTimeout o = case o of
OTimeout _ -> True
_ -> False
data TxState = TxStarted | TxEnded
deriving (Eq, Show)
setTxState :: TxState -> Transaction -> Transaction
setTxState st t = t {txState = st}
addAckToTx :: P.MsgId -> Transaction -> Transaction
addAckToTx mid t = t {txAcks = mid : txAcks t}
rmAckFromTx :: P.MsgId -> Transaction -> Transaction
rmAckFromTx mid t = t {txAcks = delete mid $ txAcks t}
addRecToTx :: Receipt -> Transaction -> Transaction
addRecToTx r t = t {txRecs = r : txRecs t}
rmRecFromTx :: Receipt -> Transaction -> Transaction
rmRecFromTx r t = t {txRecs = delete r $ txRecs t}
checkReceiptTx :: Receipt -> Transaction -> Bool
checkReceiptTx r = not . (elem r) . txRecs
txPendingAck :: Transaction -> Bool
txPendingAck t = txAbrtAck t && not (null $ txAcks t)
txReceipts :: Transaction -> Bool
txReceipts t = txAbrtRc t && not (null $ txRecs t)
con :: MVar [Connection]
con = unsafePerformIO $ newMVar []
addCon :: Connection -> IO ()
addCon c = modifyMVar_ con $ \cs -> return (c:cs)
getCon :: Con -> IO Connection
getCon cid = withCon cid $ \c -> return (c, c)
rmCon :: Con -> IO ()
rmCon cid = modifyMVar_ con $ \cs ->
case findCon cid cs of
Nothing -> return cs
Just c ->
return $ delete c cs
withCon :: Con -> (Connection -> IO (Connection, a)) -> IO a
withCon cid op = modifyMVar con (\cs ->
case findCon cid cs of
Nothing ->
throwIO $ ConnectException $
"No such Connection: " ++ show cid
Just c -> do
(c', x) <- op c
let cs' = updCon c' cs
return (cs', x))
logTime :: Con -> (UTCTime -> Connection -> Connection) -> IO ()
logTime cid f =
getCurrentTime >>= \t -> withCon cid (\c -> return (f t c, ()))
logSend :: Con -> IO ()
logSend cid = logTime cid setMyTime
logReceive :: Con -> IO ()
logReceive cid = logTime cid setHisTime
addSub :: Con -> SubEntry -> IO ()
addSub cid s = withCon cid $ \c -> return (addSubToCon s c, ())
addDest :: Con -> DestEntry -> IO ()
addDest cid d = withCon cid $ \c -> return (addDestToCon d c, ())
rmSub :: Con -> Sub -> IO ()
rmSub cid sid = withCon cid rm
where rm c = case getSub sid c of
Nothing -> return (c, ())
Just ch -> return (rmSubFromCon (sid, ch) c, ())
rmDest :: Con -> String -> IO ()
rmDest cid dst = withCon cid rm
where rm c = case getDest dst c of
Nothing -> return (c, ())
Just ch -> return (rmDestFromCon (dst, ch) c, ())
addTx :: Transaction -> Con -> IO ()
addTx t cid = withCon cid $ \c -> do
tid <- myThreadId
case lookup tid (conThrds c) of
Nothing ->
return (c {conThrds = [(tid, [t])]}, ())
Just ts ->
return (c {conThrds = addTx2Thrds t tid (conThrds c) ts}, ())
where addTx2Thrds tx tid ts trns =
(tid, tx : trns) : deleteBy eq (tid, trns) ts
getTx :: Tx -> Connection -> IO (Maybe Transaction)
getTx tx c = do
tid <- myThreadId
case lookup tid (conThrds c) of
Nothing -> return Nothing
Just ts -> return $ findTx tx ts
updTx :: Tx -> Con -> (Transaction -> Transaction) -> IO ()
updTx tx cid f = withCon cid $ \c -> do
tid <- myThreadId
case lookup tid (conThrds c) of
Nothing -> return (c, ())
Just ts ->
case findTx tx ts of
Nothing -> return (c, ())
Just t ->
let t' = f t
in return (c {conThrds =
updTxInThrds t' tid (conThrds c) ts},
())
where updTxInThrds t tid ts trns =
(tid, t : delete t trns) : deleteBy eq (tid, trns) ts
updTxState :: Tx -> Con -> TxState -> IO ()
updTxState tx cid st = updTx tx cid (setTxState st)
getCurTx :: Connection -> IO (Maybe Tx)
getCurTx c = do
tid <- myThreadId
case lookup tid (conThrds c) of
Nothing -> return Nothing
Just ts -> if null ts then return Nothing
else return $ Just $ (txId . head) ts
updCurTx :: (Transaction -> Transaction) ->
(Connection -> Connection) ->
Connection -> IO (Connection, ())
updCurTx onTx onCon c = do
tid <- myThreadId
case lookup tid $ conThrds c of
Nothing -> return (onCon c, ())
Just ts -> if null ts
then return (onCon c, ())
else do
let t = head ts
let t' = onTx t
let ts' = t' : tail ts
let c' = c {conThrds =
(tid, ts') :
deleteBy eq (tid, ts) (conThrds c)}
return (c', ())
addAck :: Con -> P.MsgId -> IO ()
addAck cid mid = do
let toTx = addAckToTx mid
let toCon = addAckToCon mid
withCon cid $ updCurTx toTx toCon
rmAck :: Con -> P.MsgId -> IO ()
rmAck cid mid = do
let fromTx = rmAckFromTx mid
let fromCon = rmAckFromCon mid
withCon cid $ updCurTx fromTx fromCon
addRec :: Con -> Receipt -> IO ()
addRec cid r = do
let toTx = addRecToTx r
let toCon = addRecToCon r
withCon cid $ updCurTx toTx toCon
rmRec :: Con -> Receipt -> IO ()
rmRec cid r = do
let fromTx = rmRecFromTx r
let fromCon = rmRecFromCon r
withCon cid $ updCurTx fromTx fromCon
forceRmRec :: Con -> Receipt -> IO ()
forceRmRec cid r = withCon cid doRmRec
where doRmRec c =
case find (== r) $ conRecs c of
Just _ -> return (rmRecFromCon r c, ())
Nothing ->
let thrds = map rmRecFromThrd $ conThrds c
in return (c {conThrds = thrds}, ())
rmRecFromThrd (thrd, ts) = (thrd, map (rmRecFromTx r) ts)
checkCurTx :: (Transaction -> Bool) ->
(Connection -> Bool) ->
Con -> IO Bool
checkCurTx onTx onCon cid = do
c <- getCon cid
tid <- myThreadId
case lookup tid $ conThrds c of
Nothing -> return $ onCon c
Just ts -> if null ts then return $ onCon c
else return $ onTx $ head ts
checkReceipt :: Con -> Receipt -> IO Bool
checkReceipt cid r = do
let onTx = checkReceiptTx r
let onCon = checkReceiptCon r
checkCurTx onTx onCon cid
rmThisTx :: Tx -> Con -> IO ()
rmThisTx tx cid = withCon cid $ \c -> do
tid <- myThreadId
case lookup tid (conThrds c) of
Nothing -> return (c, ())
Just ts ->
if null ts
then return (c {conThrds = deleteBy eq (tid, []) (conThrds c)}, ())
else
case findTx tx ts of
Nothing -> return (c, ())
Just t -> do
let ts' = delete t ts
if null ts'
then return (c {conThrds =
deleteBy eq (tid, ts) (conThrds c)}, ())
else return (c {conThrds = (tid, ts') :
deleteBy eq (tid, ts) (conThrds c)}, ())
rmTx :: Con -> IO ()
rmTx cid = withCon cid $ \c -> do
tid <- myThreadId
case lookup tid (conThrds c) of
Nothing -> return (c, ())
Just ts ->
if null ts
then return (c {conThrds = deleteBy eq (tid, []) (conThrds c)}, ())
else do
let ts' = tail ts
if null ts'
then return (c {conThrds =
deleteBy eq (tid, ts) (conThrds c)}, ())
else return (c {conThrds = (tid, ts') :
deleteBy eq (tid, ts) (conThrds c)}, ())