{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{- |
  Description: The runtime message channel and the messages it carries.

  This module defines 'RChan', an 'MVar'-backed channel of
  'RuntimeMessage' values, together with the message type itself and the
  'JoinRequest' payload. 'RChan' is exported without its constructor:
  channels are created with 'newRChan', read with 'readRChan', and
  written to through the 'Actor' instance.
-}
module OM.Legion.RChan (
RChan,
RuntimeMessage(..),
JoinRequest(..),
readRChan,
newRChan,
) where
import Control.Concurrent (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Binary (Binary)
import Data.ByteString.Lazy (ByteString)
import Data.CRDT.EventFold (Event(Output, State), Diff, EventFold,
EventId)
import Data.Map (Map)
import Data.Time (DiffTime)
import GHC.Generics (Generic)
import OM.Fork (Actor(Msg, actorChan), Responder)
import OM.Legion.Connection (JoinResponse)
import OM.Legion.MsgChan (ClusterName, MessageId, Peer)
import Prelude ((.), (<$>), Maybe, Show)
{- |
  The channel on which 'RuntimeMessage' values are delivered. It is a
  thin wrapper around an 'MVar', so it holds at most one pending message
  at a time.
-}
newtype RChan e = RChan
  { unRChan :: MVar (RuntimeMessage e)
    -- ^ The wrapped 'MVar'.
  }
{- |
  Lets an 'RChan' be used as an 'Actor' whose message type is
  'RuntimeMessage'. Sending a message is a 'putMVar' on the wrapped
  'MVar'.
-}
instance Actor (RChan e) where
  type Msg (RChan e) = RuntimeMessage e
  -- No instance signature here: it would need InstanceSigs, and 'IO' is
  -- not in scope under this module's explicit Prelude import list.
  actorChan = putMVar . unRChan
{- |
  Take the next message off the channel. This is 'takeMVar' on the
  wrapped 'MVar', lifted into any 'MonadIO'.
-}
readRChan
  :: (MonadIO m)
  => RChan e
  -> m (RuntimeMessage e)
readRChan = liftIO . takeMVar . unRChan
{- | Create a new 'RChan' backed by a freshly allocated, empty 'MVar'. -}
newRChan :: (MonadIO m) => m (RChan e)
newRChan = RChan <$> liftIO newEmptyMVar
{- |
  The messages that can be placed on an 'RChan'. The type parameter @e@
  is the event type; 'Output' and 'State' are the associated types
  imported from "Data.CRDT.EventFold".

  The comments on each constructor describe only the payload it carries.
  How each message is handled is decided by the consumer of the channel,
  which is not part of this module.
-}
data RuntimeMessage e
  -- An event plus a responder for that event's output.
  -- NOTE(review): the difference between the "fast" and "consistent"
  -- variants is defined by the handler, not here.
  = ApplyFast e (Responder (Output e))
  | ApplyConsistent e (Responder (Output e))
  -- A peer plus a responder that takes unit.
  | Eject Peer (Responder ())
  -- A diff of the cluster's event fold.
  | Merge (Diff ClusterName Peer e)
  -- A complete event fold.
  | FullMerge (EventFold ClusterName Peer e)
  -- Event outputs keyed by event id.
  | Outputs (Map (EventId Peer) (Output e))
  -- A join request plus a responder for the join response.
  | Join JoinRequest (Responder (JoinResponse e))
  -- A responder that takes the event fold.
  | ReadState (Responder (EventFold ClusterName Peer e))
  -- A target peer, a request payload, and a responder for the reply.
  | Call Peer ByteString (Responder ByteString)
  -- A target peer and a payload; no responder.
  | Cast Peer ByteString
  -- A duration, a request payload, and a responder taking one optional
  -- reply per peer.
  -- NOTE(review): the 'DiffTime' is presumably a timeout -- confirm
  -- against the handler.
  | Broadcall
      DiffTime
      ByteString
      (Responder (Map Peer (Maybe ByteString)))
  -- A payload only; no target peer and no responder.
  | Broadcast ByteString
  -- A peer, the id of the message being answered, and the reply payload.
  | SendCallResponse Peer MessageId ByteString
  | HandleCallResponse Peer MessageId ByteString
  -- A responder that takes unit.
  | Resend (Responder ())
  -- NOTE(review): carries exactly the same responder type as
  -- 'ReadState'; confirm that is intended.
  | GetStats (Responder (EventFold ClusterName Peer e))
-- Standalone deriving is used because the instance context mentions the
-- associated types @Output e@ and @State e@, which an ordinary deriving
-- clause cannot express. This is also why FlexibleContexts and
-- UndecidableInstances are enabled for the module.
deriving stock instance
    ( Show e
    , Show (Output e)
    , Show (State e)
    )
  =>
    Show (RuntimeMessage e)
{- |
  The payload of a 'Join' message: a wrapper around a single 'Peer'.
  NOTE(review): presumably this is the peer asking to join -- confirm
  against the code that builds and handles 'Join'.
-}
newtype JoinRequest = JoinRequest Peer
  deriving stock (Generic, Show)

-- No methods are given, so the class defaults of 'Binary' apply. Those
-- defaults are expected to be the 'Generic'-based ones, which is why
-- 'Generic' is derived for this type.
instance Binary JoinRequest