module Control.Distributed.Process.Internal.Primitives
(
send
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, terminate
, ProcessTerminationException(..)
, getSelfPid
, getSelfNode
, link
, unlink
, monitor
, unmonitor
, say
, register
, unregister
, whereis
, nsend
, registerRemote
, unregisterRemote
, whereisRemote
, whereisRemoteAsync
, nsendRemote
, unClosure
, catch
, mask
, onException
, bracket
, bracket_
, finally
, expectTimeout
, spawnAsync
, linkNode
, linkPort
, unlinkNode
, unlinkPort
, monitorNode
, monitorPort
) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Data.Binary (decode)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (formatTime)
import System.Locale (defaultTimeLocale)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative ((<$>))
import Control.Exception (Exception, throwIO, SomeException)
import qualified Control.Exception as Ex (catch, mask)
import Control.Concurrent.MVar (modifyMVar)
import Control.Concurrent.Chan (writeChan)
import Control.Concurrent.STM
  ( STM
  , atomically
  , orElse
  , retry
  , newTChan
  , readTChan
  , newTVar
  , readTVar
  , writeTVar
  )
import Control.Distributed.Process.Internal.CQueue (dequeue, BlockSpec(..))
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Static (Closure)
import Data.Rank1Typeable (Typeable)
import qualified Control.Distributed.Static as Static (unclosure)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, Message(..)
, MonitorRef(..)
, SpawnRef(..)
, NCMsg(..)
, ProcessSignal(..)
, monitorCounter
, spawnCounter
, SendPort(..)
, ReceivePort(..)
, channelCounter
, typedChannelWithId
, TypedChannel(..)
, SendPortId(..)
, Identifier(..)
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, WhereIsReply(..)
, createMessage
, runLocalProcess
)
import Control.Distributed.Process.Internal.Node (sendMessage, sendBinary)
-- | Send @msg@ to the process identified by @them@.
--
-- The message is handed to 'sendMessage' on the calling process's node, with
-- the calling process recorded as the sender.
send :: Serializable a => ProcessId -> a -> Process ()
send them msg = do
  self <- ask
  let node      = processNode self
      sender    = ProcessIdentifier (processId self)
      recipient = ProcessIdentifier them
  liftIO (sendMessage node sender recipient msg)
-- | Wait (via 'receiveWait') for a message of type @a@ and return it.
--
-- The single matcher accepts every message whose type is @a@, without any
-- further condition.
expect :: forall a. Serializable a => Process a
expect = receiveWait [matchIf (const True) return]
-- | Create a new typed channel belonging to the calling process.
--
-- A fresh 'TChan' is allocated and stored in the process state under the
-- current value of 'channelCounter'; the counter is then incremented. The
-- returned send port carries the process id plus that local channel id, and
-- the returned receive port wraps the 'TChan' directly.
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
  self <- ask
  liftIO $ modifyMVar (processState self) $ \st -> do
    tchan <- atomically newTChan
    let localId = st ^. channelCounter
        portId  = SendPortId { sendPortProcessId = processId self
                             , sendPortLocalId   = localId
                             }
        -- Bump the counter and remember the channel under its local id.
        st'     = (channelCounter ^: (+ 1))
                . (typedChannelWithId localId ^= Just (TypedChannel tchan))
                $ st
    return (st', (SendPort portId, ReceivePortSingle tchan))
-- | Send @msg@ on the typed channel identified by the given send port.
--
-- The value is passed to 'sendBinary' on the calling process's node, with the
-- calling process as sender and the port's id as destination.
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort cid) msg = do
  self <- ask
  let node        = processNode self
      sender      = ProcessIdentifier (processId self)
      destination = SendPortIdentifier cid
  liftIO (sendBinary node sender destination msg)
-- | Wait for a message on a typed channel (possibly a merged one).
--
-- The whole receive runs as a single STM transaction:
--
-- * a single port reads from its underlying 'TChan';
--
-- * a biased merge tries its ports left to right and takes the first one
--   that has a message;
--
-- * a round-robin merge does the same over its current port list and then
--   rotates that list by one position.
--
-- A merged port built from an empty list never yields a message: the
-- transaction retries instead of crashing on a partial fold.
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan = liftIO . atomically . receiveSTM
  where
    receiveSTM :: ReceivePort a -> STM a
    receiveSTM (ReceivePortSingle c)  = readTChan c
    receiveSTM (ReceivePortBiased ps) = firstAvailable ps
    receiveSTM (ReceivePortRR psVar)  = do
      ps <- readTVar psVar
      a  <- firstAvailable ps
      writeTVar psVar (rotate ps)
      return a

    -- Try each port in order. 'retry' is the identity of 'orElse', so for a
    -- non-empty list this equals chaining the ports with 'orElse', while an
    -- empty list blocks rather than hitting the partial 'foldr1'.
    firstAvailable :: [ReceivePort a] -> STM a
    firstAvailable = foldr (orElse . receiveSTM) retry

    -- Move the head of the list to the end.
    rotate :: [a] -> [a]
    rotate []     = []
    rotate (x:xs) = xs ++ [x]
-- | Merge a list of receive ports into one left-biased port.
--
-- This only wraps the list in 'ReceivePortBiased'; no other effect is
-- performed.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased ports = return (ReceivePortBiased ports)
-- | Merge a list of receive ports into one round-robin port.
--
-- The port list is stored in a fresh 'TVar', which the resulting
-- 'ReceivePortRR' wraps.
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR ports = liftIO $ atomically $ do
  portsVar <- newTVar ports
  return (ReceivePortRR portsVar)
-- | A message matcher for use with 'receiveWait' and 'receiveTimeout'.
--
-- Given a 'Message', it yields either 'Nothing' (no match) or the action to
-- run for that message.
newtype Match b = Match
  { unMatch :: Message -> Maybe (Process b)
  }
-- | Run the action of the first matcher that accepts a message from the
-- calling process's queue.
--
-- The queue is dequeued in 'Blocking' mode. A 'Nothing' result is not
-- expected in that mode; if it occurs, the process fails with an explicit
-- message rather than an anonymous pattern-match failure.
receiveWait :: [Match b] -> Process b
receiveWait ms = do
  queue <- processQueue <$> ask
  mProc <- liftIO $ dequeue queue Blocking (map unMatch ms)
  case mProc of
    Just proc -> proc
    Nothing   -> fail "receiveWait: blocking dequeue returned Nothing"
-- | Like 'receiveWait', but gives up instead of blocking indefinitely.
--
-- A timeout of @0@ dequeues in 'NonBlocking' mode; any other value is passed
-- on as @'Timeout' t@ (the time unit is whatever 'dequeue' expects, which is
-- not visible here). Returns 'Nothing' when no matcher fired, otherwise
-- 'Just' the result of the matched action.
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
receiveTimeout t ms = do
  queue  <- processQueue <$> ask
  result <- liftIO $ dequeue queue blockSpec (map unMatch ms)
  maybe (return Nothing) (fmap Just) result
  where
    blockSpec
      | t == 0    = NonBlocking
      | otherwise = Timeout t
-- | Match any message of type @a@ and handle it with the given action.
--
-- Equivalent to 'matchIf' with a condition that is always true.
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match handler = matchIf (\_ -> True) handler
-- | Match a message of type @a@ that also satisfies the predicate @c@.
--
-- The message's fingerprint is compared against the fingerprint of @a@
-- first; the payload is only decoded (lazily) once the fingerprints agree,
-- and is then fed to both the predicate and the handler @p@.
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf c p = Match attempt
  where
    attempt msg
      | hasExpectedType msg && c payload = Just (p payload)
      | otherwise                        = Nothing
      where
        payload :: a
        payload = decode (messageEncoding msg)

    -- 'undefined' is only used to select the 'fingerprint' instance.
    hasExpectedType msg =
      messageFingerprint msg == fingerprint (undefined :: a)
-- | Match every message, regardless of its type, and run the given action.
--
-- The message itself is ignored.
matchUnknown :: Process b -> Match b
matchUnknown action = Match (\_ -> Just action)
-- | Exception thrown by 'terminate'.
data ProcessTerminationException = ProcessTerminationException
  deriving (Typeable, Show)

instance Exception ProcessTerminationException
-- | Terminate the calling process by throwing
-- 'ProcessTerminationException' in 'IO'.
terminate :: Process a
terminate = liftIO (throwIO ProcessTerminationException)
-- | The process id of the calling process, read from its environment.
getSelfPid :: Process ProcessId
getSelfPid = fmap processId ask
-- | The id of the node the calling process runs on.
getSelfNode :: Process NodeId
getSelfNode = do
  self <- ask
  return (localNodeId (processNode self))
-- | Link to a process by sending a 'Link' signal for it to the local node
-- controller.
link :: ProcessId -> Process ()
link pid = sendCtrlMsg Nothing (Link (ProcessIdentifier pid))
-- | Start monitoring a process and return the reference identifying this
-- monitor.
monitor :: ProcessId -> Process MonitorRef
monitor pid = monitor' (ProcessIdentifier pid)
-- | Remove a link to a process and wait for the confirmation.
--
-- After sending the request with 'unlinkAsync', this blocks until a
-- 'DidUnlinkProcess' message for the same process id is received.
unlink :: ProcessId -> Process ()
unlink pid = do
  unlinkAsync pid
  receiveWait [matchIf isOurReply acknowledge]
  where
    isOurReply (DidUnlinkProcess pid') = pid' == pid
    acknowledge _ = return ()
-- | Remove a link to a node and wait for the confirmation.
--
-- After sending the request with 'unlinkNodeAsync', this blocks until a
-- 'DidUnlinkNode' message for the same node id is received.
unlinkNode :: NodeId -> Process ()
unlinkNode nid = do
  unlinkNodeAsync nid
  receiveWait [matchIf isOurReply acknowledge]
  where
    isOurReply (DidUnlinkNode nid') = nid' == nid
    acknowledge _ = return ()
-- | Remove a link to a typed channel and wait for the confirmation.
--
-- After sending the request with 'unlinkPortAsync', this blocks until a
-- 'DidUnlinkPort' message carrying this port's id is received.
unlinkPort :: SendPort a -> Process ()
unlinkPort sport = do
  unlinkPortAsync sport
  receiveWait [matchIf isOurReply acknowledge]
  where
    isOurReply (DidUnlinkPort cid) = cid == sendPortId sport
    acknowledge _ = return ()
-- | Remove a monitor and wait for the confirmation.
--
-- After sending the request with 'unmonitorAsync', this blocks until a
-- 'DidUnmonitor' message for the same reference is received.
unmonitor :: MonitorRef -> Process ()
unmonitor ref = do
  unmonitorAsync ref
  receiveWait [matchIf isOurReply acknowledge]
  where
    isOurReply (DidUnmonitor ref') = ref' == ref
    acknowledge _ = return ()
-- | 'Control.Exception.catch' lifted to the 'Process' monad.
--
-- Both the protected action and the handler are run with the calling
-- process's environment.
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch p h = do
  lproc <- ask
  let run = runLocalProcess lproc
  liftIO $ run p `Ex.catch` (run . h)
-- | 'Control.Exception.mask' lifted to the 'Process' monad.
--
-- The body @p@ receives a @restore@ function that works on 'Process'
-- actions; it is built from the 'IO'-level restore function handed out by
-- 'Ex.mask'.
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask p = do
  self <- ask
  liftIO $ Ex.mask $ \restoreIO ->
    runLocalProcess self (p (liftRestore self restoreIO))
  where
    -- Run the process action in IO under the IO-level restore, then lift the
    -- result back into 'Process'.
    liftRestore :: LocalProcess
                -> (forall a. IO a -> IO a)
                -> (forall a. Process a -> Process a)
    liftRestore self restoreIO action =
      liftIO (restoreIO (runLocalProcess self action))
-- | Run @p@; if it throws any exception, run @what@ (discarding its result)
-- and then rethrow the original exception.
onException :: Process a -> Process b -> Process a
onException p what = p `catch` handler
  where
    handler :: SomeException -> Process a
    handler e = what >> liftIO (throwIO e)
-- | Acquire a resource, use it, and always run the release action.
--
-- Acquisition (@before@) and release (@after@) run masked; only the use
-- (@thing@) runs with the previous masking state restored. The release
-- action runs both when @thing@ throws and when it returns normally; its
-- result is discarded.
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket before after thing =
  mask $ \restore -> do
    resource <- before
    result   <- restore (thing resource) `onException` after resource
    after resource >> return result
-- | Variant of 'bracket' in which neither the release action nor the body
-- receives the value produced by the acquire action.
bracket_ :: Process a -> Process b -> Process c -> Process c
bracket_ before after thing = bracket before (\_ -> after) (\_ -> thing)
-- | Run @a@ and then @sequel@, whether or not @a@ threw an exception.
--
-- Expressed as a 'bracket' with a no-op acquire step.
finally :: Process a -> Process b -> Process a
finally a sequel = bracket (return ()) (const sequel) (const a)
-- | Like 'expect', but bounded by a timeout (interpreted as in
-- 'receiveTimeout').
--
-- Returns 'Nothing' if no message of type @a@ was matched.
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout timeout = receiveTimeout timeout [matchIf (const True) return]
-- | Ask node @nid@ to spawn the given closure, without waiting for a reply.
--
-- A fresh 'SpawnRef' is allocated, sent along with the 'Spawn' signal, and
-- returned to the caller.
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync nid proc = getSpawnRef >>= \ref -> do
  sendCtrlMsg (Just nid) (Spawn proc ref)
  return ref
-- | Start monitoring a node and return the reference identifying this
-- monitor.
monitorNode :: NodeId -> Process MonitorRef
monitorNode nid = monitor' (NodeIdentifier nid)
-- | Start monitoring a typed channel (identified by its send port) and
-- return the reference identifying this monitor.
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort sport = case sport of
  SendPort cid -> monitor' (SendPortIdentifier cid)
-- | Send an 'Unmonitor' signal for the given reference to the local node
-- controller, without waiting for any confirmation.
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync ref = sendCtrlMsg Nothing (Unmonitor ref)
-- | Link to a node.
linkNode :: NodeId -> Process ()
linkNode nid = link' (NodeIdentifier nid)
-- | Link to a typed channel, identified by its send port.
linkPort :: SendPort a -> Process ()
linkPort sport = case sport of
  SendPort cid -> link' (SendPortIdentifier cid)
-- | Send an 'Unlink' signal for a process to the local node controller,
-- without waiting for any confirmation.
unlinkAsync :: ProcessId -> Process ()
unlinkAsync pid = sendCtrlMsg Nothing (Unlink (ProcessIdentifier pid))
-- | Send an 'Unlink' signal for a node to the local node controller,
-- without waiting for any confirmation.
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync nid = sendCtrlMsg Nothing (Unlink (NodeIdentifier nid))
-- | Send an 'Unlink' signal for a typed channel to the local node
-- controller, without waiting for any confirmation.
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync sport = case sport of
  SendPort cid -> sendCtrlMsg Nothing (Unlink (SendPortIdentifier cid))
-- | Log a string.
--
-- Sends a triple of (formatted current time, calling process id, string) to
-- whatever is registered under the name @"logger"@ on the local node.
say :: String -> Process ()
say string = do
  now <- liftIO getCurrentTime
  us  <- getSelfPid
  let timestamp = formatTime defaultTimeLocale "%c" now
  nsend "logger" (timestamp, us, string)
-- | Register a process under a name on the local node.
--
-- Sends a 'Register' signal carrying @'Just' pid@ to the local node
-- controller; no reply is awaited.
register :: String -> ProcessId -> Process ()
register label pid = sendCtrlMsg Nothing $ Register label (Just pid)
-- | Register a process under a name on the node @nid@.
--
-- Sends a 'Register' signal carrying @'Just' pid@ to that node; no reply is
-- awaited.
registerRemote :: NodeId -> String -> ProcessId -> Process ()
registerRemote nid label pid = sendCtrlMsg (Just nid) $ Register label (Just pid)
-- | Remove a name from the local node's registry.
--
-- Implemented as a 'Register' signal carrying 'Nothing'.
unregister :: String -> Process ()
unregister label = sendCtrlMsg Nothing $ Register label Nothing
-- | Remove a name from the registry of node @nid@.
--
-- Implemented as a 'Register' signal carrying 'Nothing', sent to that node.
unregisterRemote :: NodeId -> String -> Process ()
unregisterRemote nid label = sendCtrlMsg (Just nid) $ Register label Nothing
-- | Look up a name in the local node's registry.
--
-- Sends a 'WhereIs' signal to the local node controller and blocks until a
-- 'WhereIsReply' for the same label arrives; the process id it carries (if
-- any) is returned.
whereis :: String -> Process (Maybe ProcessId)
whereis label = do
  sendCtrlMsg Nothing (WhereIs label)
  receiveWait [matchIf isOurReply extractPid]
  where
    isOurReply (WhereIsReply label' _) = label == label'
    extractPid (WhereIsReply _ mPid)   = return mPid
-- | Look up a name in the registry of node @nid@.
--
-- Issues the query with 'whereisRemoteAsync' and blocks until a
-- 'WhereIsReply' for the same label arrives; the process id it carries (if
-- any) is returned.
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
whereisRemote nid label = do
  whereisRemoteAsync nid label
  receiveWait [matchIf isOurReply extractPid]
  where
    isOurReply (WhereIsReply label' _) = label == label'
    extractPid (WhereIsReply _ mPid)   = return mPid
-- | Send a 'WhereIs' query for @label@ to node @nid@ without waiting for the
-- reply.
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync nid label = sendCtrlMsg (Just nid) $ WhereIs label
-- | Named send: deliver @msg@ to whatever is registered under @label@ on the
-- local node.
--
-- The value is wrapped with 'createMessage' and sent as a 'NamedSend' signal
-- to the local node controller.
nsend :: Serializable a => String -> a -> Process ()
nsend label msg = sendCtrlMsg Nothing $ NamedSend label (createMessage msg)
-- | Named send to a remote node: deliver @msg@ to whatever is registered
-- under @label@ on node @nid@.
--
-- The value is wrapped with 'createMessage' and sent as a 'NamedSend' signal
-- to that node.
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote nid label msg =
  sendCtrlMsg (Just nid) $ NamedSend label (createMessage msg)
-- | Resolve a closure using the remote table of the calling process's node.
--
-- Fails (via 'fail') with a message prefixed by
-- @"Could not resolve closure: "@ when resolution returns an error.
unClosure :: forall a. Typeable a => Closure a -> Process a
unClosure closure = do
  node <- processNode <$> ask
  either unresolved return (Static.unclosure (remoteTable node) closure)
  where
    unresolved err = fail ("Could not resolve closure: " ++ err)
-- | Allocate a fresh 'MonitorRef' for the given identifier.
--
-- The reference carries the current value of the process's
-- 'monitorCounter'; the counter is incremented in the same 'modifyMVar'
-- step.
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor ident = do
  self <- ask
  liftIO . modifyMVar (processState self) $ \st ->
    return ( (monitorCounter ^: (+ 1)) st
           , MonitorRef ident (st ^. monitorCounter)
           )
-- | Allocate a fresh 'SpawnRef'.
--
-- The reference carries the current value of the process's 'spawnCounter';
-- the counter is incremented in the same 'modifyMVar' step.
getSpawnRef :: Process SpawnRef
getSpawnRef = do
  self <- ask
  liftIO . modifyMVar (processState self) $ \st ->
    return ( (spawnCounter ^: (+ 1)) st
           , SpawnRef (st ^. spawnCounter)
           )
-- | Start monitoring the entity behind an arbitrary 'Identifier'.
--
-- Allocates a monitor reference, sends a 'Monitor' signal for it to the
-- local node controller, and returns the reference.
monitor' :: Identifier -> Process MonitorRef
monitor' ident = getMonitorRefFor ident >>= \ref ->
  sendCtrlMsg Nothing (Monitor ref) >> return ref
-- | Link to the entity behind an arbitrary 'Identifier' by sending a 'Link'
-- signal to the local node controller.
link' :: Identifier -> Process ()
link' ident = sendCtrlMsg Nothing (Link ident)
-- | Send a control signal to a node controller.
--
-- The signal is wrapped in an 'NCMsg' whose sender is the calling process.
-- With 'Nothing' as target, the message is written directly to the local
-- node's control channel; with @'Just' nid@ it is sent to that node with
-- 'sendBinary'.
sendCtrlMsg :: Maybe NodeId  -- ^ Target node ('Nothing' for the local node)
            -> ProcessSignal -- ^ Signal to send
            -> Process ()
sendCtrlMsg mNid signal = do
  proc <- ask
  let node = processNode proc
      self = ProcessIdentifier (processId proc)
      msg  = NCMsg { ctrlMsgSender = self
                   , ctrlMsgSignal = signal
                   }
  liftIO $ case mNid of
    Nothing  -> writeChan (localCtrlChan node) msg
    Just nid -> sendBinary node self (NodeIdentifier nid) msg