transient-universe-0.6.0.1: fully composable remote execution for the creation of distributed systems

Safe HaskellNone
LanguageHaskell2010

Transient.Move

Contents

Description

transient-universe extends the seamless composability of concurrent multi-threaded programs provided by transient to a multi-node cloud. Distributed concurrent programs are created and composed seamlessly and effortlessly as if they were written for a single node. transient-universe has diverse applications from simple distributed applications to massively parallel and distributed map-reduce problems. If you are considering Apache Spark or Cloud Haskell then transient might be a simpler yet better solution for you.

Transient makes it easy to write composable, distributed, event-driven, reactive UI applications with client-side and server-side code composed freely in the same application. For example, Axiom is a transient-based unified client- and server-side web application framework that provides a better programming model and composability than frameworks like ReactJS.

Overview

The Cloud monad adds the following facilities to complement the TransIO monad:

  • Create a distributed compute cluster of nodes
  • Move computations across nodes at any point during computation
  • Run computations on multiple nodes in parallel

Further Reading

Synopsis

Running the Monad

newtype Cloud a Source #

Constructors

Cloud 

Fields

Instances
Monad Cloud Source # 
Instance details

Defined in Transient.Move.Internals

Methods

(>>=) :: Cloud a -> (a -> Cloud b) -> Cloud b #

(>>) :: Cloud a -> Cloud b -> Cloud b #

return :: a -> Cloud a #

fail :: String -> Cloud a #

Functor Cloud Source # 
Instance details

Defined in Transient.Move.Internals

Methods

fmap :: (a -> b) -> Cloud a -> Cloud b #

(<$) :: a -> Cloud b -> Cloud a #

MonadFail Cloud Source # 
Instance details

Defined in Transient.Move.Internals

Methods

fail :: String -> Cloud a #

Applicative Cloud Source # 
Instance details

Defined in Transient.Move.Internals

Methods

pure :: a -> Cloud a #

(<*>) :: Cloud (a -> b) -> Cloud a -> Cloud b #

liftA2 :: (a -> b -> c) -> Cloud a -> Cloud b -> Cloud c #

(*>) :: Cloud a -> Cloud b -> Cloud b #

(<*) :: Cloud a -> Cloud b -> Cloud a #

Alternative Cloud Source # 
Instance details

Defined in Transient.Move.Internals

Methods

empty :: Cloud a #

(<|>) :: Cloud a -> Cloud a -> Cloud a #

some :: Cloud a -> Cloud [a] #

many :: Cloud a -> Cloud [a] #

AdditionalOperators Cloud Source # 
Instance details

Defined in Transient.Move.Internals

Methods

(**>) :: Cloud a -> Cloud b -> Cloud b #

(<**) :: Cloud a -> Cloud b -> Cloud a #

atEnd' :: Cloud a -> Cloud b -> Cloud a #

(<***) :: Cloud a -> Cloud b -> Cloud a #

atEnd :: Cloud a -> Cloud b -> Cloud a #

MonadState EventF Cloud Source # 
Instance details

Defined in Transient.Move.Internals

Methods

get :: Cloud EventF #

put :: EventF -> Cloud () #

state :: (EventF -> (a, EventF)) -> Cloud a #

(Eq a, Fractional a) => Fractional (Cloud a) Source # 
Instance details

Defined in Transient.Move.Internals

Methods

(/) :: Cloud a -> Cloud a -> Cloud a #

recip :: Cloud a -> Cloud a #

fromRational :: Rational -> Cloud a #

(Num a, Eq a) => Num (Cloud a) Source # 
Instance details

Defined in Transient.Move.Internals

Methods

(+) :: Cloud a -> Cloud a -> Cloud a #

(-) :: Cloud a -> Cloud a -> Cloud a #

(*) :: Cloud a -> Cloud a -> Cloud a #

negate :: Cloud a -> Cloud a #

abs :: Cloud a -> Cloud a #

signum :: Cloud a -> Cloud a #

fromInteger :: Integer -> Cloud a #

Monoid a => Semigroup (Cloud a) Source # 
Instance details

Defined in Transient.Move.Internals

Methods

(<>) :: Cloud a -> Cloud a -> Cloud a #

sconcat :: NonEmpty (Cloud a) -> Cloud a #

stimes :: Integral b => b -> Cloud a -> Cloud a #

Monoid a => Monoid (Cloud a) Source # 
Instance details

Defined in Transient.Move.Internals

Methods

mempty :: Cloud a #

mappend :: Cloud a -> Cloud a -> Cloud a #

mconcat :: [Cloud a] -> Cloud a #

runCloud :: Cloud a -> TransIO a Source #

Execute a distributed computation inside a TransIO computation. All the computations in the TransIO monad that enclose the cloud computation must be logged.

runCloudIO :: Typeable a => Cloud a -> IO (Maybe a) Source #

Run a distributed computation inside the IO monad. Enables asynchronous console input (see keep).

runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a) Source #

Run a distributed computation inside the IO monad with no console input.
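For illustration, a minimal single-node sketch: no cluster is involved, the Cloud computation simply runs locally and its result, if any, is returned.

-- a minimal sketch: run a Cloud computation locally and inspect its result
main = do
    r <- runCloudIO' $ do
            lliftIO $ putStrLn "hello from the Cloud monad"
            return (42 :: Int)
    print r    -- expected to print: Just 42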

Node & Cluster Management

To join the cluster, a node connects to a well-known node that is already part of the cluster.

import Transient.Move (runCloudIO, lliftIO, createNode, connect, getNodes, onAll)

main = runCloudIO $ do
    this   <- lliftIO (createNode "192.168.1.2" 8000)
    master <- lliftIO (createNode "192.168.1.1" 8000)
    connect this master
    onAll getNodes >>= lliftIO . putStrLn . show

data Node Source #

Constructors

Node 
Instances
Eq Node Source # 
Instance details

Defined in Transient.Move.Internals

Methods

(==) :: Node -> Node -> Bool #

(/=) :: Node -> Node -> Bool #

Ord Node Source # 
Instance details

Defined in Transient.Move.Internals

Methods

compare :: Node -> Node -> Ordering #

(<) :: Node -> Node -> Bool #

(<=) :: Node -> Node -> Bool #

(>) :: Node -> Node -> Bool #

(>=) :: Node -> Node -> Bool #

max :: Node -> Node -> Node #

min :: Node -> Node -> Node #

Read Node Source # 
Instance details

Defined in Transient.Move.Internals

Show Node Source # 
Instance details

Defined in Transient.Move.Internals

Methods

showsPrec :: Int -> Node -> ShowS #

show :: Node -> String #

showList :: [Node] -> ShowS #

Indexable Suscribed Source # 
Instance details

Defined in Transient.Move.PubSub

Serializable Suscribed Source # 
Instance details

Defined in Transient.Move.PubSub

Loggable Node Source # 
Instance details

Defined in Transient.Move.Internals

Creating nodes

type Service = [(SKey, SValue)] Source #

createNodeServ :: HostName -> Int -> [Service] -> IO Node Source #

Create a node from a hostname (or IP address), port number and a list of services.
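For example, a sketch of creating a node that advertises one service, assuming SKey and SValue are plain strings (the "name"/"type" entries are illustrative, not keys required by the API):

-- a sketch: each Service is a list of key/value pairs describing the node
workerNode :: IO Node
workerNode = createNodeServ "192.168.1.2" 8000 [[("name", "worker"), ("type", "compute")]]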

Joining the cluster

connect :: Node -> Node -> Cloud () Source #

Add a node (first parameter) to the cluster using a node that is already part of the cluster (second parameter). The added node starts listening for incoming connections and the rest of the computation is executed on this newly added node.

connect' :: Node -> Cloud () Source #

Reconcile the list of nodes in the cluster using a remote node that is already part of the cluster. Reconciliation ends with every node in the cluster having the same list of nodes.

listen :: Node -> Cloud () Source #

Set up the node to start listening for incoming connections.

addNodes :: [Node] -> TransIO () Source #

Add a list of nodes to the list of nodes known locally. If a node is already present, its services are added to those of the existing node; services whose first element (usually the "name" field) matches an existing one are substituted.

addThisNodeToRemote :: Cloud () Source #

Add this node to the list of known nodes in the remote node connected by a wormhole. This is useful when the node is called back by the remote node. In the case of web nodes with WebSocket connections, this is the way to add them to the list of known nodes in the server.

shuffleNodes :: MonadIO m => m [Node] Source #

Shuffle the list of cluster nodes and return the shuffled list.

Querying nodes

getMyNode :: TransIO Node Source #

Return the local node i.e. the node where this computation is running.

getNodes :: MonadIO m => m [Node] Source #

Return the list of nodes in the cluster.

getEqualNodes :: TransIO [Node] Source #

Get the nodes that have the same service definition as the calling node.

isBrowserInstance :: Bool Source #

Returns True if we are running in the browser.

Running Local Computations

local :: Loggable a => TransIO a -> Cloud a Source #

Means that this computation will be executed in the current node. The result will be logged so that the closure can be recovered if the computation is translated to another node by means of primitives like beamTo, forkTo, runAt, teleport, clustered, mclustered etc.

onAll :: TransIO a -> Cloud a Source #

Alternative to local. It means that if the computation is translated to another node, it will be executed again there, unless it has already been executed inside a local computation.

onAll foo
local foo'
local $ do
      bar
      runCloud $ do
              onAll baz
              runAt node ....
runAt node' .....

foo, bar and baz will be executed locally. But foo will also be executed remotely in node', while foo', bar and baz will not.

lazy :: TransIO a -> Cloud a Source #

Only executes if the result is demanded. It is useful when the computation result is only used in the remote node, but it is not serializable. All the state changes performed in the argument with setData, setState etc. are lost.

localFix :: Cloud () Source #

One problem of forwarding closures for streaming is that they may carry not only the data but also the extra information needed to reconstruct the closure in the destination node. In a single-in, single-out interaction this may not be a problem, but consider, for example, synchronizing N editors by forwarding small modifications, or, worse, transmitting packets of audio or video. The size of the closure, that is, the number of variables that have to be transported, grows as the code becomes more complex. However, transient builds closures upon closures, so it only has to send what has changed since the last interaction.

In one-to-one interactions within a wormhole this is automatic, but when different wormholes are involved it is necessary to state explicitly which closure will continue the execution. This is what localFix does; otherwise closure 0 is used.

main = do
     filename <- local input
     source <- atServer $ local $ liftIO $ readFile filename
     local $ render source inEditor
     -- send up to here one single time, so that only the deltas are streamed afterwards
     localFix
     delta <- react onEachChange
     forallNodes $ update delta

If each change were forwarded to all the nodes editing the document without localFix, the data necessary to reconstruct the closure would include even the source code of the file on each change. Fortunately it is possible to fix a closure that will not change in all the remote nodes, so after that only the one necessary variable, the delta, has to be sent. This is as efficient as a hand-made socket write / forkThread / readSocket loop for each node.

fixRemote :: TransIO b -> Cloud b Source #

Executes a non-serializable action in the remote node, whose result can be used by subsequent remote invocations.

lliftIO :: Loggable a => IO a -> Cloud a Source #

The Cloud monad has no MonadIO instance. `lliftIO = local . liftIO`

localIO :: Loggable a => IO a -> Cloud a Source #

`localIO = lliftIO`

Moving Computations

wormhole :: Loggable b => Node -> Cloud b -> Cloud b Source #

A wormhole opens a connection with another node anywhere in a computation. teleport uses this connection to translate the computation back and forth between the two connected nodes. If the connection fails, it searches the network for suitable relay nodes to reach the destination node.
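As a sketch of how wormhole composes with the primitives documented below (illustrative only: runThere is a made-up name, but runAt/callTo are essentially this combination of wormhole and atRemote):

-- open a wormhole to the destination node, run the action at the remote end
-- and return the result to the caller
runThere :: Loggable a => Node -> Cloud a -> Cloud a
runThere node remoteAction = wormhole node $ atRemote remoteAction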

copyData :: Loggable b => b -> Cloud b Source #

Copy a session data variable from the local node to the remote node. If none is set in the local node, the parameter is used as the default value; in that case, the default value is also set in the local node.

fixClosure :: Cloud () Source #

Subsequent remote invocations will send logs to this closure, therefore the logs will be shorter.

Also, non-serializable statements before it will not be re-executed.

Running at a Remote Node

beamTo :: Node -> Cloud () Source #

Continue the execution in a new node.

forkTo :: Node -> Cloud () Source #

Execute in the remote node a process with the same execution state.

callTo :: Loggable a => Node -> Cloud a -> Cloud a Source #

Open a wormhole to another node and execute an action on it. Currently, by default, it keeps the connection open to receive additional requests and responses (streaming).

runAt :: Loggable a => Node -> Cloud a -> Cloud a Source #

Synonym for callTo.

For reference, the source also sketches an atCallingNode combinator that executes a computation in the node that initiated the connection:

 atCallingNode :: Loggable a => Cloud a -> Cloud a
 atCallingNode proc = connectCaller $ atRemote proc

If the sequence of connections is n1 -> n2 -> n3, then `atCallingNode $ atCallingNode foo` in n3 would execute foo in n1, while `atRemote $ atRemote foo` would execute it in n3.

atRemote :: Loggable a => Cloud a -> Cloud a Source #

Within a connection to a node opened by wormhole, run the computation in the remote node and return the result back to the original node.

If atRemote is executed in the remote node, then the computation is executed in the original node.

wormhole node2 $ do
    t <- atRemote $ do
          r <- foo                 -- executed in node2
          s <- atRemote (bar r)    -- executed in the original node
          baz s                    -- in node2
    bat t                          -- in the original node

setSynchronous :: Bool -> TransIO () Source #

Set remote invocations to be synchronous. This is necessary when data is transferred very fast from node to node in a non-deterministic stream, in order to keep the continuation of the calling node unchanged until the response arrives, since all the calls share a single continuation in the calling node.

If there is no response from the remote node, the streaming is interrupted.

main = keep $ initNode $ onBrowser $ do
    local $ setSynchronous True
    line <- local $ threads 0 $ choose [1..10 :: Int]
    localIO $ print ("1", line)
    atRemote $ localIO $ print line
    localIO $ print ("2", line)

Running at Multiple Nodes

clustered :: Loggable a => Cloud a -> Cloud a Source #

Execute a Transient action in each of the connected nodes.

The response of each node is received by the invoking node and processed by the rest of the procedure. By default, each response is processed in a new thread. To restrict the number of threads use the thread control primitives.

This snippet receives a message from each of the simulated nodes:

main = keep $ runCloud $ do
    nodes <- onAll $ liftIO $ mapM createLocalNode [2000..2005]
    onAll $ addNodes nodes
    (foldl (<|>) empty $ map listen nodes) <|> return ()

    r <- clustered $ do
            me <- local getMyNode
            return $ "hi from " ++ show (nodePort me) ++ "\n"
    localIO $ putStrLn r
  where
    createLocalNode n = createNode "localhost" n

callNodes :: (Loggable a1, Loggable a2) => (Cloud a2 -> Cloud a1 -> Cloud a1) -> Cloud a1 -> Cloud a2 -> Cloud a1 Source #

callNodes' :: (Loggable a1, Loggable a2) => [Node] -> (Cloud a2 -> Cloud a1 -> Cloud a1) -> Cloud a1 -> Cloud a2 -> Cloud a1 Source #

foldNet :: Loggable a => (Cloud a -> Cloud a -> Cloud a) -> Cloud a -> Cloud a -> Cloud a Source #

Crawl the nodes, executing the same action in each node and accumulating the results with a binary operator.
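A sketch of a possible use, assuming the second argument acts as the neutral element of the fold: each reachable node contributes 1 and the results are summed, giving a node count.

-- illustrative only: count the nodes reached by the crawl
countNodes :: Cloud Int
countNodes = foldNet (liftA2 (+)) (return 0) (return 1)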

Messaging

putMailbox :: Typeable val => val -> TransIO () #

Write to the mailbox. Mailboxes are node-wide: they are visible to all processes that share the same connection data, that is, that are under the same listen or connect, while EVars are only visible to the process that initialized them and its children. Internally, the mailbox is a well-known EVar stored by listen in the Connection state.

putMailbox' :: (Typeable key, Ord key, Typeable val) => key -> val -> TransIO () #

Write to a mailbox identified by a key in addition to the type.

getMailbox :: Typeable val => TransIO val #

Get messages from the mailbox that match the expected type. The order of reading is defined by readTChan. This is reactive: each new message triggers the execution of the continuation, and each message wakes up all the getMailbox computations waiting for it.
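A minimal sketch of the reactive behaviour, run with keep' (the non-interactive variant of keep), assuming the reader subscribes before the writer publishes: the reader branch of <|> registers first, then the writer branch fires it.

main = keep' $ do
    msg <- getMailbox <|> (putMailbox ("hello" :: String) >> empty)
    liftIO $ putStrLn msg    -- prints "hello"; the subscription stays active for further messages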

getMailbox' :: (Typeable key, Ord key, Typeable val) => key -> TransIO val #

Read from a mailbox identified by a key in addition to the type.

deleteMailbox :: Typeable a => a -> TransIO () #

Delete all subscriptions for the mailbox expecting this kind of data.

deleteMailbox' :: (Typeable key, Ord key, Typeable a) => key -> a -> TransIO () #

Clean a mailbox identified by a key and the type.

Thread Control

single :: TransIO a -> TransIO a Source #

Run a single thread with that action for each connection created. When the same action is re-executed within that connection, all the threads generated by the previous execution are killed.

  box <- foo
  r <- runAt node . local . single $ getMailbox' box
  localIO $ print r

If foo returns different mailbox identifiers, the above code would print the messages of the last one only. Without single, it would print the messages of all of them, since each call would install a new getMailbox' for each one of them.

unique :: TransIO a -> TransIO a Source #

Run a unique continuation for each connection. The first thread that executes unique is run for that connection; the rest are ignored.

Buffering Control

REST API

api :: TransIO ByteString -> Cloud () Source #

Forward all the results of the Transient computation to the open connection.
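A minimal sketch, assuming the node is already listening for HTTP connections (for example via initNode or listen); the action ignores the request and streams back a fixed raw HTTP response, where a real handler would first inspect the method and path:

{-# LANGUAGE OverloadedStrings #-}

-- illustrative only: answer every request on the open connection with a fixed response
helloApi :: Cloud ()
helloApi = api $ return "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nhello"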

noHTTP :: Cloud () Source #

Filter out HTTP requests.