module Control.Distributed.Process.Internal.Primitives
(
send
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, terminate
, ProcessTerminationException(..)
, getSelfPid
, getSelfNode
, link
, unlink
, monitor
, unmonitor
, say
, register
, unregister
, whereis
, nsend
, registerRemote
, unregisterRemote
, whereisRemote
, whereisRemoteAsync
, nsendRemote
, unClosure
, catch
, expectTimeout
, spawnAsync
, linkNode
, linkPort
, unlinkNode
, unlinkPort
, monitorNode
, monitorPort
) where
import Prelude hiding (catch)
import Data.Binary (decode)
import Data.Typeable (Typeable, typeOf)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (formatTime)
import System.Locale (defaultTimeLocale)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative ((<$>))
import Control.Exception (Exception, throw)
import qualified Control.Exception as Exception (catch)
import Control.Concurrent.MVar (modifyMVar)
import Control.Concurrent.Chan (writeChan)
import Control.Concurrent.STM
( STM
, atomically
, orElse
, newTChan
, readTChan
, newTVar
, readTVar
, writeTVar
)
import Control.Distributed.Process.Internal.CQueue (dequeue, BlockSpec(..))
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, Closure(..)
, Message(..)
, MonitorRef(..)
, SpawnRef(..)
, NCMsg(..)
, ProcessSignal(..)
, monitorCounter
, spawnCounter
, Closure(..)
, SendPort(..)
, ReceivePort(..)
, channelCounter
, typedChannelWithId
, TypedChannel(..)
, SendPortId(..)
, Identifier(..)
, procMsg
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, WhereIsReply(..)
, createMessage
, Static(..)
)
import Control.Distributed.Process.Internal.MessageT
( sendMessage
, sendBinary
, getLocalNode
)
import Control.Distributed.Process.Internal.Node (runLocalProcess)
import Control.Distributed.Process.Internal.Closure.Resolution (resolveClosure)
import Control.Distributed.Process.Internal.Dynamic (fromDyn, dynTypeRep)
send :: Serializable a => ProcessId -> a -> Process ()
send them msg = procMsg $ sendMessage (ProcessIdentifier them) msg
expect :: forall a. Serializable a => Process a
expect = receiveWait [match return]
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
proc <- ask
liftIO . modifyMVar (processState proc) $ \st -> do
chan <- liftIO . atomically $ newTChan
let lcid = st ^. channelCounter
cid = SendPortId { sendPortProcessId = processId proc
, sendPortLocalId = lcid
}
sport = SendPort cid
rport = ReceivePortSingle chan
tch = TypedChannel chan
return ( (channelCounter ^: (+ 1))
. (typedChannelWithId lcid ^= Just tch)
$ st
, (sport, rport)
)
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort cid) msg = procMsg $ sendBinary (SendPortIdentifier cid) msg
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan = liftIO . atomically . receiveSTM
where
receiveSTM :: ReceivePort a -> STM a
receiveSTM (ReceivePortSingle c) =
readTChan c
receiveSTM (ReceivePortBiased ps) =
foldr1 orElse (map receiveSTM ps)
receiveSTM (ReceivePortRR psVar) = do
ps <- readTVar psVar
a <- foldr1 orElse (map receiveSTM ps)
writeTVar psVar (rotate ps)
return a
rotate :: [a] -> [a]
rotate [] = []
rotate (x:xs) = xs ++ [x]
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased = return . ReceivePortBiased
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR ps = liftIO . atomically $ ReceivePortRR <$> newTVar ps
newtype Match b = Match { unMatch :: Message -> Maybe (Process b) }
receiveWait :: [Match b] -> Process b
receiveWait ms = do
queue <- processQueue <$> ask
Just proc <- liftIO $ dequeue queue Blocking (map unMatch ms)
proc
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
receiveTimeout t ms = do
queue <- processQueue <$> ask
let blockSpec = if t == 0 then NonBlocking else Timeout t
mProc <- liftIO $ dequeue queue blockSpec (map unMatch ms)
case mProc of
Nothing -> return Nothing
Just proc -> Just <$> proc
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match = matchIf (const True)
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf c p = Match $ \msg ->
let decoded :: a
decoded = decode . messageEncoding $ msg in
if messageFingerprint msg == fingerprint (undefined :: a) && c decoded
then Just $ p decoded
else Nothing
matchUnknown :: Process b -> Match b
matchUnknown = Match . const . Just
data ProcessTerminationException = ProcessTerminationException
deriving (Show, Typeable)
instance Exception ProcessTerminationException
terminate :: Process a
terminate = liftIO $ throw ProcessTerminationException
getSelfPid :: Process ProcessId
getSelfPid = processId <$> ask
getSelfNode :: Process NodeId
getSelfNode = localNodeId <$> procMsg getLocalNode
link :: ProcessId -> Process ()
link = sendCtrlMsg Nothing . Link . ProcessIdentifier
monitor :: ProcessId -> Process MonitorRef
monitor = monitor' . ProcessIdentifier
unlink :: ProcessId -> Process ()
unlink pid = do
unlinkAsync pid
receiveWait [ matchIf (\(DidUnlinkProcess pid') -> pid' == pid)
(\_ -> return ())
]
unlinkNode :: NodeId -> Process ()
unlinkNode nid = do
unlinkNodeAsync nid
receiveWait [ matchIf (\(DidUnlinkNode nid') -> nid' == nid)
(\_ -> return ())
]
unlinkPort :: SendPort a -> Process ()
unlinkPort sport = do
unlinkPortAsync sport
receiveWait [ matchIf (\(DidUnlinkPort cid) -> cid == sendPortId sport)
(\_ -> return ())
]
unmonitor :: MonitorRef -> Process ()
unmonitor ref = do
unmonitorAsync ref
receiveWait [ matchIf (\(DidUnmonitor ref') -> ref' == ref)
(\_ -> return ())
]
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch p h = do
node <- procMsg getLocalNode
lproc <- ask
let run :: Process a -> IO a
run proc = runLocalProcess node proc lproc
liftIO $ Exception.catch (run p) (run . h)
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout timeout = receiveTimeout timeout [match return]
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync nid proc = do
spawnRef <- getSpawnRef
sendCtrlMsg (Just nid) $ Spawn proc spawnRef
return spawnRef
monitorNode :: NodeId -> Process MonitorRef
monitorNode =
monitor' . NodeIdentifier
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort (SendPort cid) =
monitor' (SendPortIdentifier cid)
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync =
sendCtrlMsg Nothing . Unmonitor
linkNode :: NodeId -> Process ()
linkNode = link' . NodeIdentifier
linkPort :: SendPort a -> Process ()
linkPort (SendPort cid) =
link' (SendPortIdentifier cid)
unlinkAsync :: ProcessId -> Process ()
unlinkAsync =
sendCtrlMsg Nothing . Unlink . ProcessIdentifier
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync =
sendCtrlMsg Nothing . Unlink . NodeIdentifier
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync (SendPort cid) =
sendCtrlMsg Nothing . Unlink $ SendPortIdentifier cid
say :: String -> Process ()
say string = do
now <- liftIO getCurrentTime
us <- getSelfPid
nsend "logger" (formatTime defaultTimeLocale "%c" now, us, string)
register :: String -> ProcessId -> Process ()
register label pid =
sendCtrlMsg Nothing (Register label (Just pid))
registerRemote :: NodeId -> String -> ProcessId -> Process ()
registerRemote nid label pid =
sendCtrlMsg (Just nid) (Register label (Just pid))
unregister :: String -> Process ()
unregister label =
sendCtrlMsg Nothing (Register label Nothing)
unregisterRemote :: NodeId -> String -> Process ()
unregisterRemote nid label =
sendCtrlMsg (Just nid) (Register label Nothing)
whereis :: String -> Process (Maybe ProcessId)
whereis label = do
sendCtrlMsg Nothing (WhereIs label)
receiveWait [ matchIf (\(WhereIsReply label' _) -> label == label')
(\(WhereIsReply _ mPid) -> return mPid)
]
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
whereisRemote nid label = do
whereisRemoteAsync nid label
receiveWait [ matchIf (\(WhereIsReply label' _) -> label == label')
(\(WhereIsReply _ mPid) -> return mPid)
]
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync nid label =
sendCtrlMsg (Just nid) (WhereIs label)
nsend :: Serializable a => String -> a -> Process ()
nsend label msg =
sendCtrlMsg Nothing (NamedSend label (createMessage msg))
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote nid label msg =
sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
unClosure :: forall a. Typeable a => Closure a -> Process a
unClosure (Closure (Static label) env) = do
rtable <- remoteTable <$> procMsg getLocalNode
case resolveClosure rtable label env of
Nothing -> throw . userError $ "Unregistered closure " ++ show label
Just dyn -> return $ fromDyn dyn (throw (typeError dyn))
where
typeError dyn = userError $ "lookupStatic type error: "
++ "cannot match " ++ show (dynTypeRep dyn)
++ " against " ++ show (typeOf (undefined :: a))
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor ident = do
proc <- ask
liftIO $ modifyMVar (processState proc) $ \st -> do
let counter = st ^. monitorCounter
return ( monitorCounter ^: (+ 1) $ st
, MonitorRef ident counter
)
getSpawnRef :: Process SpawnRef
getSpawnRef = do
proc <- ask
liftIO $ modifyMVar (processState proc) $ \st -> do
let counter = st ^. spawnCounter
return ( spawnCounter ^: (+ 1) $ st
, SpawnRef counter
)
monitor' :: Identifier -> Process MonitorRef
monitor' ident = do
monitorRef <- getMonitorRefFor ident
sendCtrlMsg Nothing $ Monitor monitorRef
return monitorRef
link' :: Identifier -> Process ()
link' = sendCtrlMsg Nothing . Link
sendCtrlMsg :: Maybe NodeId
-> ProcessSignal
-> Process ()
sendCtrlMsg mNid signal = do
us <- getSelfPid
let msg = NCMsg { ctrlMsgSender = ProcessIdentifier us
, ctrlMsgSignal = signal
}
case mNid of
Nothing -> do
ctrlChan <- localCtrlChan <$> procMsg getLocalNode
liftIO $ writeChan ctrlChan msg
Just nid ->
procMsg $ sendBinary (NodeIdentifier nid) msg