module Control.TimeWarp.Rpc.MonadDialog
(
MonadDialog (..)
, Dialog (..)
, runDialog
, ForkStrategy (..)
, send
, sendH
, sendR
, listen
, listenH
, listenR
, reply
, replyH
, replyR
, Listener (..)
, ListenerH (..)
, ListenerR
, getListenerName
, getListenerNameH
, MonadListener
) where
import Control.Lens (at, iso, (.~), (^.), _2)
import Control.Monad (void, when)
import Control.Monad.Base (MonadBase (..))
import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow,
handleAll)
import Control.Monad.Morph (hoist)
import Control.Monad.Reader (MonadReader (ask, local),
ReaderT (..))
import Control.Monad.State (MonadState)
import Control.Monad.Trans (MonadIO, MonadTrans (..))
import Control.Monad.Trans.Control (ComposeSt, MonadBaseControl (..),
MonadTransControl (..), StM,
defaultLiftBaseWith, defaultLiftWith,
defaultRestoreM, defaultRestoreT)
import Data.ByteString (ByteString)
import Data.Conduit (Conduit, yield, (=$=))
import qualified Data.Conduit.List as CL
import Data.Map as M
import Data.Proxy (Proxy (..))
import Formatting (sformat, shown, stext, (%))
import System.Wlog (CanLog, HasLoggerName,
LoggerNameBox (..), WithLogger,
logDebug, logError, logWarning)
import Serokell.Util.Lens (WrappedM (..))
import Control.TimeWarp.Rpc.Message (ContentData (..), Message (..),
MessageName, NameData (..),
Packable (..), RawData (..),
Unpackable (..), WithHeaderData (..),
unpackMsg)
import Control.TimeWarp.Rpc.MonadTransfer (Binding,
MonadResponse (peerAddr, replyRaw),
MonadTransfer (..), NetworkAddress,
ResponseT (..), commLog)
import Control.TimeWarp.Timed (MonadTimed, ThreadId, fork_)
data ForkStrategy s = ForkStrategy
{ withForkStrategy :: forall m . (MonadIO m, MonadTimed m)
=> s -> m () -> m ()
}
class MonadTransfer s m => MonadDialog s p m | m -> p, m -> s where
packingType :: m p
forkStrategy :: m (ForkStrategy MessageName)
setForkStrategy :: ForkStrategy MessageName -> m a -> m a
packMsg' :: (Packable p r, MonadDialog s p m, MonadThrow m) => Conduit r m ByteString
packMsg' = lift packingType >>= packMsg
doSend :: (Packable p r, MonadDialog s p m, MonadThrow m)
=> NetworkAddress -> r -> m ()
doSend addr r = sendRaw addr $ yield r =$= packMsg'
send :: (Packable p (WithHeaderData () (ContentData r)), MonadDialog s p m, MonadThrow m)
=> NetworkAddress -> r -> m ()
send addr msg = doSend addr $ WithHeaderData () (ContentData msg)
sendH :: (Packable p (WithHeaderData h (ContentData r)), MonadDialog s p m, MonadThrow m)
=> NetworkAddress -> h -> r -> m ()
sendH addr h msg = doSend addr $ WithHeaderData h (ContentData msg)
sendR :: (Packable p (WithHeaderData h RawData), MonadDialog s p m, MonadThrow m)
=> NetworkAddress -> h -> RawData -> m ()
sendR addr h raw = doSend addr $ WithHeaderData h raw
doReply :: (Packable p r, MonadDialog s p m, MonadResponse s m, MonadThrow m)
=> r -> m ()
doReply r = replyRaw $ yield r =$= packMsg'
reply :: (Packable p (WithHeaderData () (ContentData r)), MonadDialog s p m,
MonadResponse s m, MonadThrow m)
=> r -> m ()
reply msg = doReply $ WithHeaderData () (ContentData msg)
replyH :: (Packable p (WithHeaderData h (ContentData r)), MonadDialog s p m,
MonadResponse s m, MonadThrow m)
=> h -> r -> m ()
replyH h msg = doReply $ WithHeaderData h (ContentData msg)
replyR :: (Packable p (WithHeaderData h RawData), MonadDialog s p m,
MonadResponse s m, MonadThrow m)
=> h -> RawData -> m ()
replyR h raw = doReply $ WithHeaderData h raw
type MonadListener s m =
( MonadIO m
, MonadMask m
, MonadTimed m
, MonadTransfer s m
, WithLogger m
)
listen :: (Unpackable p (WithHeaderData () RawData),Unpackable p NameData,
MonadListener s m, MonadDialog s p m)
=> Binding -> [Listener s p m] -> m (m ())
listen binding listeners = listenH binding $ convert <$> listeners
where
convert :: Listener s p m -> ListenerH s p () m
convert (Listener f) = ListenerH $ f . second
second ((), r) = r
listenH :: (Unpackable p (WithHeaderData h RawData), Unpackable p NameData,
MonadListener s m, MonadDialog s p m)
=> Binding -> [ListenerH s p h m] -> m (m ())
listenH binding listeners =
listenR binding listeners (const $ return True)
listenR :: (Unpackable p (WithHeaderData h RawData),Unpackable p NameData,
MonadListener s m, MonadDialog s p m)
=> Binding -> [ListenerH s p h m] -> ListenerR s h m -> m (m ())
listenR binding listeners rawListener = do
packing <- packingType
forking <- forkStrategy
listenRaw binding $ handleAll handleE $
unpackMsg packing =$= CL.mapM (processContent packing)
=$= CL.mapM_ (uncurry $ withForkStrategy forking)
where
processContent packing msg = do
WithHeaderData header raw <- extractMsgPart packing msg
NameData name <- extractMsgPart packing msg
case listenersMap ^. at name of
Nothing -> do
commLog . logWarning $
sformat ("No listener with name "%stext%" defined") name
putDownstream name . void $
invokeRawListenerSafe $ rawListener (header, raw)
Just (ListenerH f) -> do
ContentData r <- extractMsgPart packing msg
putDownstream name $ do
cont <- invokeRawListenerSafe $ rawListener (header, raw)
peer <- peerAddr
when cont $ do
commLog . logDebug $
sformat ("Got message from "%stext%": "%stext)
peer (formatMessage r)
invokeListenerSafe name $ f (header, r)
handleE e = lift . commLog . logWarning $
sformat ("Error parsing message: " % shown) e
listenersMap = M.fromList [(getListenerNameH li, li) | li <- listeners]
invokeRawListenerSafe = handleAll $ \e -> do
commLog . logError $ sformat ("Uncaught error in raw listener: "%shown) e
return False
invokeListenerSafe name = handleAll $
commLog . logError . sformat ("Uncaught error in listener "%shown%": "%shown) name
putDownstream = curry return
data Listener s p m =
forall r . (Unpackable p (ContentData r), Message r)
=> Listener (r -> ResponseT s m ())
data ListenerH s p h m =
forall r . (Unpackable p (ContentData r), Message r)
=> ListenerH ((h, r) -> ResponseT s m ())
type ListenerR s h m = (h, RawData) -> ResponseT s m Bool
getListenerName :: Listener s p m -> MessageName
getListenerName (Listener f) = messageName $ proxyOfArg f
where
proxyOfArg :: (a -> b) -> Proxy a
proxyOfArg _ = Proxy
getListenerNameH :: ListenerH s p h m -> MessageName
getListenerNameH (ListenerH f) = messageName $ proxyOfArg f
where
proxyOfArg :: ((h, a) -> b) -> Proxy a
proxyOfArg _ = Proxy
newtype Dialog p m a = Dialog
{ getDialog :: ReaderT (p, ForkStrategy MessageName) m a
} deriving (Functor, Applicative, Monad, MonadIO, MonadTrans,
MonadThrow, MonadCatch, MonadMask,
MonadState s, CanLog, HasLoggerName, MonadTimed)
runDialog :: p -> Dialog p m a -> m a
runDialog p = flip runReaderT (p, ForkStrategy $ const fork_) . getDialog
instance MonadBase IO m => MonadBase IO (Dialog p m) where
liftBase = lift . liftBase
instance MonadTransControl (Dialog p) where
type StT (Dialog p) a = StT (ReaderT p) a
liftWith = defaultLiftWith Dialog getDialog
restoreT = defaultRestoreT Dialog
instance MonadBaseControl IO m => MonadBaseControl IO (Dialog p m) where
type StM (Dialog p m) a = ComposeSt (Dialog p) m a
liftBaseWith = defaultLiftBaseWith
restoreM = defaultRestoreM
type instance ThreadId (Dialog p m) = ThreadId m
instance Monad m => WrappedM (Dialog p m) where
type UnwrappedM (Dialog p m) = ReaderT (p, ForkStrategy MessageName) m
_WrappedM = iso getDialog Dialog
instance MonadTransfer s m => MonadTransfer s (Dialog p m) where
instance MonadTransfer s m => MonadDialog s p (Dialog p m) where
packingType = fst <$> Dialog ask
forkStrategy = snd <$> Dialog ask
setForkStrategy fs = Dialog . local (_2 .~ fs) . getDialog
instance MonadDialog s p m => MonadDialog s p (ReaderT r m) where
packingType = lift packingType
forkStrategy = lift forkStrategy
setForkStrategy fs = hoist $ setForkStrategy fs
deriving instance MonadDialog s p m => MonadDialog s p (LoggerNameBox m)
deriving instance MonadDialog s p m => MonadDialog s p (ResponseT s0 m)