| Safe Haskell | None | 
|---|---|
| Language | Haskell2010 | 
Transient.Move
Contents
Description
transient-universe extends the seamless composability of concurrent
 multi-threaded programs provided by
 transient
 to a multi-node cloud.  Distributed concurrent programs are created and
 composed seamlessly and effortlessly as if they were written for a single
 node.  transient-universe has diverse applications from simple distributed
 applications to massively parallel and distributed map-reduce problems.  If
 you are considering Apache Spark or Cloud Haskell then transient might be a
 simpler yet better solution for you.
Transient makes it easy to write composable, distributed event driven reactive UI applications with client side and server side code composed freely in the same application. For example, Axiom is a transient based unified client and server side web application framework that provides a better programming model and composability compared to frameworks like ReactJS.
Overview
The Cloud monad adds the following facilities to complement the TransIO
 monad:
- Create a distributed compute cluster of nodes
- Move computations across nodes at any point during computation
- Run computations on multiple nodes in parallel
Further Reading
- newtype Cloud a = Cloud {- runCloud' :: TransIO a
 
- runCloud :: Cloud a -> TransIO a
- runCloudIO :: Typeable a => Cloud a -> IO (Maybe a)
- runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a)
- data Node = Node {- nodeHost :: HostName
- nodePort :: Int
- connection :: Maybe (MVar Pool)
- nodeServices :: Service
 
- type Service = [(Package, Program)]
- createNodeServ :: HostName -> Int -> Service -> IO Node
- createNode :: HostName -> Int -> IO Node
- createWebNode :: IO Node
- connect :: Node -> Node -> Cloud ()
- connect' :: Node -> Cloud ()
- listen :: Node -> Cloud ()
- addNodes :: [Node] -> TransIO ()
- shuffleNodes :: MonadIO m => m [Node]
- getMyNode :: TransIO Node
- getWebServerNode :: TransIO Node
- getNodes :: MonadIO m => m [Node]
- nodeList :: TVar [Node]
- isBrowserInstance :: Bool
- local :: Loggable a => TransIO a -> Cloud a
- onAll :: TransIO a -> Cloud a
- lazy :: TransIO a -> Cloud a
- fixRemote :: TransIO b -> Cloud b
- loggedc :: Loggable a => Cloud a -> Cloud a
- lliftIO :: Loggable a => IO a -> Cloud a
- localIO :: Loggable a => IO a -> Cloud a
- fullStop :: TransIO stop
- wormhole :: Loggable a => Node -> Cloud a -> Cloud a
- teleport :: Cloud ()
- copyData :: (Typeable * b, Read b, Show b) => b -> Cloud b
- fixClosure :: Cloud ()
- beamTo :: Node -> Cloud ()
- forkTo :: Node -> Cloud ()
- callTo :: Loggable a => Node -> Cloud a -> Cloud a
- runAt :: Loggable a => Node -> Cloud a -> Cloud a
- atRemote :: Loggable a => Cloud a -> Cloud a
- clustered :: Loggable a => Cloud a -> Cloud a
- mclustered :: (Monoid a, Loggable a) => Cloud a -> Cloud a
- callNodes :: (Show a2, Show a1, Read a2, Read a1, Typeable * a2, Typeable * a1) => (Cloud a2 -> Cloud a1 -> Cloud a1) -> Cloud a1 -> Cloud a2 -> Cloud a1
- putMailbox :: Typeable val => val -> TransIO ()
- putMailbox' :: (Typeable key, Ord key, Typeable val) => key -> val -> TransIO ()
- getMailbox :: Typeable val => TransIO val
- getMailbox' :: (Typeable key, Ord key, Typeable val) => key -> TransIO val
- cleanMailbox :: Typeable a => a -> TransIO ()
- cleanMailbox' :: Typeable a => Int -> a -> TransIO ()
- single :: TransIO a -> TransIO a
- unique :: a -> TransIO ()
- setBuffSize :: Int -> TransIO ()
- getBuffSize :: TransIO BuffSize
- api :: TransIO ByteString -> Cloud ()
- data HTTPMethod
- type PostParams = [(ByteString, String)]
Running the Monad
runCloud :: Cloud a -> TransIO a Source #
Execute a distributed computation inside a TransIO computation.
 All the  computations in the TransIO monad that enclose the cloud computation must be logged
runCloudIO :: Typeable a => Cloud a -> IO (Maybe a) Source #
Run a distributed computation inside the IO monad. Enables asynchronous
 console input (see keep).
runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a) Source #
Run a distributed computation inside the IO monad with no console input.
Node & Cluster Management
To join the cluster a node connects to a well known node already part of
 the cluster.
import Transient.Move (runCloudIO, lliftIO, createNode, connect, getNodes, onAll)
main = runCloudIO $ do
    this   <- lliftIO (createNode "192.168.1.2" 8000)
    master <- lliftIO (createNode "192.168.1.1" 8000)
    connect this master
    onAll getNodes >>= lliftIO . putStrLn . show
Creating nodes
createNodeServ :: HostName -> Int -> Service -> IO Node Source #
Create a node from a hostname (or IP address), port number and a list of services.
createWebNode :: IO Node Source #
Joining the cluster
connect :: Node -> Node -> Cloud () Source #
Add a node (first parameter) to the cluster using a node that is already part of the cluster (second parameter). The added node starts listening for incoming connections and the rest of the computation is executed on this newly added node.
connect' :: Node -> Cloud () Source #
Reconcile the list of nodes in the cluster using a remote node already part of the cluster. Reconciliation end up in each node in the cluster having the same list of nodes.
shuffleNodes :: MonadIO m => m [Node] Source #
Shuffle the list of cluster nodes and return the shuffled list.
Querying nodes
getMyNode :: TransIO Node Source #
Return the local node i.e. the node where this computation is running.
getWebServerNode :: TransIO Node Source #
isBrowserInstance :: Bool Source #
Returns True if we are running in the browser.
Running Local Computations
onAll :: TransIO a -> Cloud a Source #
alternative to local It means that if the computation is translated to other node
 this will be executed again if this has not been executed inside a local computation.
onAll foo
local foo'
local $ do
      bar
      runCloud $ do
              onAll baz
              runAt node ....
callTo node' .....Here foo will be executed in node' but foo' bar and baz don't.
However foo bar and baz will e executed in node.
lazy :: TransIO a -> Cloud a Source #
only executes if the result is demanded. It is useful when the conputation result is only used in the remote node, but it is not serializable.
fixRemote :: TransIO b -> Cloud b Source #
executes a non-serilizable action in the remote node, whose result can be used by subsequent remote invocations
fullStop :: TransIO stop Source #
stop the current computation and does not execute any alternative computation
Moving Computations
wormhole :: Loggable a => Node -> Cloud a -> Cloud a Source #
A wormhole opens a connection with another node anywhere in a computation.
 teleport uses this connection to translate the computation back and forth between the two nodes connected
copyData :: (Typeable * b, Read b, Show b) => b -> Cloud b Source #
copy a session data variable from the local to the remote node. If there is none set in the local node, The parameter is the default value. In this case, the default value is also set in the local node.
fixClosure :: Cloud () Source #
experimental: subsequent remote invocatioms will send logs to this closure. Therefore logs will be shorter.
Also, non serializable statements before it will not be re-executed
Running at a Remote Node
forkTo :: Node -> Cloud () Source #
execute in the remote node a process with the same execution state
callTo :: Loggable a => Node -> Cloud a -> Cloud a Source #
open a wormhole to another node and executes an action on it. currently by default it keep open the connection to receive additional requests and responses (streaming)
runAt :: Loggable a => Node -> Cloud a -> Cloud a Source #
Execute a computation in the node that initiated the connection.
if the sequence of connections is  n1 -> n2 -> n3 then  `atCallingNode $ atCallingNode foo` in n3 
 would execute foo in n1, -- while `atRemote $ atRemote foo` would execute it in n3
 atCallingNode :: Loggable a => Cloud a -> Cloud a
 atCallingNode proc=  connectCaller $ atRemote proc 
synonymous of callTo
atRemote :: Loggable a => Cloud a -> Cloud a Source #
Within a connection to a node opened by wormhole, it run the computation in the remote node and return
 the result back to the original node.
If atRemote is executed in the remote node, then the computation is executed in the original node
wormhole node2 $ do
    t <- atRemote $ do
          r <- foo              -- executed in node2
          s <- atRemote bar r   -- executed in the original node
          baz s                 -- in node2
    bat t                      -- in the original nodeRunning at Multiple Nodes
clustered :: Loggable a => Cloud a -> Cloud a Source #
execute a Transient action in each of the nodes connected.
The response of each node is received by the invoking node and processed by the rest of the procedure. By default, each response is processed in a new thread. To restrict the number of threads use the thread control primitives.
this snippet receive a message from each of the simulated nodes:
main = keep $ do
   let nodes= map createLocalNode [2000..2005]
   addNodes nodes
   (foldl (<|>) empty $ map listen nodes) <|> return ()
   r <- clustered $ do
              Connection (Just(PortNumber port, _, _, _)) _ <- getSData
              return $ "hi from " ++ show port++ "\n"
   liftIO $ putStrLn r
   where
   createLocalNode n= createNode "localhost" (PortNumber n)callNodes :: (Show a2, Show a1, Read a2, Read a1, Typeable * a2, Typeable * a1) => (Cloud a2 -> Cloud a1 -> Cloud a1) -> Cloud a1 -> Cloud a2 -> Cloud a1 Source #
Messaging
putMailbox :: Typeable val => val -> TransIO () Source #
write to the mailbox
 Mailboxes are node-wide, for all processes that share the same connection data, that is, are under the
 same listen  or connect
 while EVars are only visible by the process that initialized  it and his children.
 Internally, the mailbox is in a well known EVar stored by listen in the Connection state.
putMailbox' :: (Typeable key, Ord key, Typeable val) => key -> val -> TransIO () Source #
write to a mailbox identified by an identifier besides the type
getMailbox :: Typeable val => TransIO val Source #
get messages from the mailbox that matches with the type expected.
 The order of reading is defined by readTChan
 This is reactive. it means that each new message trigger the execution of the continuation
 each message wake up all the getMailbox computations waiting for it.
getMailbox' :: (Typeable key, Ord key, Typeable val) => key -> TransIO val Source #
read from a mailbox identified by an identifier besides the type
cleanMailbox :: Typeable a => a -> TransIO () Source #
delete all subscriptions for that mailbox expecting this kind of data
cleanMailbox' :: Typeable a => Int -> a -> TransIO () Source #
clean a mailbox identified by an Int and the type
Thread Control
single :: TransIO a -> TransIO a Source #
run a single thread with that action for each connection created. When the same action is re-executed within that connection, all the threads generated by the previous execution are killed
box <- foo r <- runAt node . local . single $ getMailbox box localIO $ print r
if foo return differnt mainbox indentifiers, the above code would print the messages of the last one. Without single, it would print the messages of all of them.
unique :: a -> TransIO () Source #
run an unique continuation for each connection. The first thread that execute unique is
 executed for that connection. The rest are ignored.
Buffering Control
setBuffSize :: Int -> TransIO () Source #
getBuffSize :: TransIO BuffSize Source #
REST API
api :: TransIO ByteString -> Cloud () Source #
data HTTPMethod Source #
Instances
type PostParams = [(ByteString, String)] Source #