transient-universe-0.4.4.1: Distributed computing with algebraic/monadic composability, map-reduce

Safe HaskellNone
LanguageHaskell2010

Transient.Move

Contents

Description

transient-universe extends the seamless composability of concurrent, multi-threaded programs provided by transient to a multi-node cloud. Distributed concurrent programs are created and composed as if they were written for a single node. Its applications range from simple distributed programs to massively parallel and distributed map-reduce problems. If you are considering Apache Spark or Cloud Haskell, transient might be a simpler yet better solution for you.

Transient makes it easy to write composable, distributed, event-driven, reactive UI applications with client-side and server-side code composed freely in the same application. For example, Axiom is a transient-based unified client- and server-side web application framework that provides a better programming model and composability than frameworks like ReactJS.

Overview

The Cloud monad adds the following facilities to complement the TransIO monad:

  • Create a distributed compute cluster of nodes
  • Move computations across nodes at any point during computation
  • Run computations on multiple nodes in parallel
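
A minimal sketch putting these facilities together (the host names, ports and messages are illustrative, not part of the API):

import Control.Monad (void)
import Transient.Move

main :: IO ()
main = void . runCloudIO $ do
    me   <- lliftIO $ createNode "192.168.1.2" 8000   -- this node
    seed <- lliftIO $ createNode "192.168.1.1" 8000   -- a node already in the cluster
    connect me seed                                   -- join the cluster through the seed node
    msg  <- runAt seed $ localIO $ return "hello from the seed node"
    localIO $ putStrLn msg                            -- the result travels back and is printed here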

Further Reading

Synopsis

Running the Monad

newtype Cloud a Source #

Constructors

Cloud 

Fields

Instances

Monad Cloud Source # 

Methods

(>>=) :: Cloud a -> (a -> Cloud b) -> Cloud b #

(>>) :: Cloud a -> Cloud b -> Cloud b #

return :: a -> Cloud a #

fail :: String -> Cloud a #

Functor Cloud Source # 

Methods

fmap :: (a -> b) -> Cloud a -> Cloud b #

(<$) :: a -> Cloud b -> Cloud a #

Applicative Cloud Source # 

Methods

pure :: a -> Cloud a #

(<*>) :: Cloud (a -> b) -> Cloud a -> Cloud b #

(*>) :: Cloud a -> Cloud b -> Cloud b #

(<*) :: Cloud a -> Cloud b -> Cloud a #

Alternative Cloud Source # 

Methods

empty :: Cloud a #

(<|>) :: Cloud a -> Cloud a -> Cloud a #

some :: Cloud a -> Cloud [a] #

many :: Cloud a -> Cloud [a] #

MonadState EventF Cloud Source # 

Methods

get :: Cloud EventF #

put :: EventF -> Cloud () #

state :: (EventF -> (a, EventF)) -> Cloud a #

(Eq a, Num a) => Num (Cloud a) Source # 

Methods

(+) :: Cloud a -> Cloud a -> Cloud a #

(-) :: Cloud a -> Cloud a -> Cloud a #

(*) :: Cloud a -> Cloud a -> Cloud a #

negate :: Cloud a -> Cloud a #

abs :: Cloud a -> Cloud a #

signum :: Cloud a -> Cloud a #

fromInteger :: Integer -> Cloud a #

Monoid a => Monoid (Cloud a) Source # 

Methods

mempty :: Cloud a #

mappend :: Cloud a -> Cloud a -> Cloud a #

mconcat :: [Cloud a] -> Cloud a #

runCloud :: Cloud b -> TransIO b Source #

Execute a distributed computation in the TransIO monad. Note that all the computations inside the TransIO monad that enclose the cloud computation must be logged.

runCloudIO :: Typeable a => Cloud a -> IO (Maybe a) Source #

Run a distributed computation inside the IO monad. Enables asynchronous console input (see keep).

runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a) Source #

Run a distributed computation inside the IO monad with no console input.

Node & Cluster Management

To join the cluster, a node connects to a well-known node that is already part of the cluster.

import Transient.Move (runCloudIO, lliftIO, createNode, connect, getNodes, onAll)

main = runCloudIO $ do
    this   <- lliftIO (createNode "192.168.1.2" 8000)
    master <- lliftIO (createNode "192.168.1.1" 8000)
    connect this master
    onAll getNodes >>= lliftIO . putStrLn . show

data Node Source #

Constructors

Node 

Instances

Creating nodes

createNodeServ :: HostName -> Integer -> [Service] -> IO Node Source #

Create a node from a hostname (or IP address), port number and a list of services.

createNode :: HostName -> Integer -> IO Node Source #

Create a node from a hostname (or IP address) and port number. The node is created without any services.

Joining the cluster

connect :: Node -> Node -> Cloud () Source #

Add a node (first parameter) to the cluster using a node that is already part of the cluster (second parameter). The added node starts listening for incoming connections and the rest of the computation is executed on this newly added node.

connect' :: Node -> Cloud () Source #

Reconcile the list of nodes in the cluster using a remote node already part of the cluster. Reconciliation results in each node in the cluster having exactly the same list of nodes.

listen :: Node -> Cloud () Source #

Set up the node to start listening for incoming connections.
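
For example, a node whose only job is to serve requests from other nodes can create itself and listen (a sketch; the bind address and port are illustrative):

import Control.Monad (void)
import Transient.Move (runCloudIO, lliftIO, createNode, listen)

main :: IO ()
main = void . runCloudIO $ do
    me <- lliftIO $ createNode "localhost" 8000   -- the node this process represents
    listen me                                     -- accept and serve incoming computations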

addNodes :: [Node] -> TransIO () Source #

Add a list of nodes to the list of existing cluster nodes.

shuffleNodes :: MonadIO m => m [Node] Source #

Shuffle the list of cluster nodes and return the shuffled list.

Querying nodes

getMyNode :: TransIO Node Source #

Return the local node, i.e. the node where this computation is running.

getNodes :: MonadIO m => m [Node] Source #

Return the list of nodes in the cluster.

isBrowserInstance :: Bool Source #

True if the program is running in the browser.

Running Local Computations

local :: Loggable a => TransIO a -> Cloud a Source #

Run the computation in the current node. The result is logged so that the closure can be recovered if the computation is moved to another node by primitives such as beamTo, forkTo, runAt, teleport, clustered, mclustered, etc.

onAll :: TransIO a -> Cloud a Source #

An alternative to local: if the computation is moved to another node, the action is executed there again, unless it has already been executed inside a local computation.

onAll foo
local foo'
local $ do
      bar
      runCloud $ do
              onAll baz
              runAt node ....
callTo node' .....

Here foo will be executed in node', but foo', bar and baz will not.

However, foo, bar and baz will be executed in node.

lliftIO :: Loggable a => IO a -> Cloud a Source #

The Cloud monad has no MonadIO instance; lliftIO lifts an IO action into Cloud. lliftIO = local . liftIO

localIO :: Loggable a => IO a -> Cloud a Source #

Perform IO locally. localIO = lliftIO

fullStop :: Cloud stop Source #

Stop the current computation without executing any alternative computation.

Moving Computations

wormhole :: Loggable a => Node -> Cloud a -> Cloud a Source #

A wormhole opens a connection to another node at any point in a computation. teleport uses this connection to move the computation back and forth between the two connected nodes.

teleport :: Cloud () Source #

Moves the computation back and forth between two nodes, reusing a connection opened by wormhole.

Each teleport transports to the other node only what is new in the log since the last teleport.

It is used through other primitives such as runAt, which involves two teleports:

runAt node proc = wormhole node $ loggedc $ do
     teleport
     r <- Cloud $ runCloud proc <** setData WasRemote
     teleport
     return r

copyData :: (Typeable b, Read b, Show b) => b -> Cloud b Source #

Copy a session data variable from the local node to the remote node. The parameter is the default value: if no value is set in the local node, the default is used, and in that case it is also set in the local node.
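
A sketch of the intended use, assuming a hypothetical session data type Config (not part of the library):

import Control.Monad.IO.Class (liftIO)
import Transient.Base (getSData)
import Transient.Move

data Config = Config{ verbose :: Bool } deriving (Read, Show)

job :: Node -> Cloud ()
job node = do
    copyData (Config False)        -- copy the local Config (or the default) along with the computation
    runAt node $ local $ do
        Config v <- getSData       -- read it in the remote node
        liftIO $ print v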

Running at a Remote Node

beamTo :: Node -> Cloud () Source #

Continue the execution in a new node. All the previous actions, from listen up to this statement, must have been logged.
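
A sketch (the destination node is assumed to come from getNodes or createNode):

import Transient.Move

hop :: Node -> Cloud ()
hop node = do
    localIO $ putStrLn "starting here"
    beamTo node                                  -- the rest of the computation continues in node
    localIO $ putStrLn "now running on the other node"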

forkTo :: Node -> Cloud () Source #

Execute a process in the remote node with the same execution state.

callTo :: Loggable a => Node -> Cloud a -> Cloud a Source #

Open a wormhole to another node and execute an action on it. Currently, by default, the connection is kept open to receive additional requests and responses (streaming).

runAt :: Loggable a => Node -> Cloud a -> Cloud a Source #

A synonym for callTo.
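
For example, fetching a value computed on a remote node (a sketch; greet is an illustrative name):

import Transient.Move

greet :: Node -> Cloud ()
greet node = do
    msg <- runAt node $ localIO $ return "hello from the remote node"
    localIO $ putStrLn msg        -- the String result travels back and is printed locally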

atRemote :: Loggable a => Cloud a -> Cloud a Source #

Within a connection to another node opened by wormhole, run the computation in the remote node and return the result back to the original node.
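
This makes it possible to open the connection once and hop to the remote node several times over it (a sketch; twice is an illustrative name):

import Transient.Move

twice :: Node -> Cloud ()
twice node = wormhole node $ do
    a <- atRemote $ localIO $ return "first remote result"
    b <- atRemote $ localIO $ return "second remote result"
    localIO $ putStrLn (a ++ ", " ++ b)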

Running at Multiple Nodes

clustered :: Loggable a => Cloud a -> Cloud a Source #

Execute a Transient action in each of the connected nodes.

The response of each node is received by the invoking node and processed by the rest of the procedure. By default, each response is processed in a new thread. To restrict the number of threads, use the thread control primitives.

This snippet receives a message from each of the simulated nodes:

import Control.Applicative
import Control.Monad.IO.Class (liftIO)
import Transient.Base (keep)
import Transient.Move

main = keep . runCloud $ do
    nodes <- onAll . liftIO $ mapM createLocalNode [2000..2005]
    onAll $ addNodes nodes
    (foldl (<|>) empty $ map listen nodes) <|> return ()

    r <- clustered $ local $ do
             node <- getMyNode
             return $ "hi from " ++ show node ++ "\n"
    localIO $ putStrLn r
  where
    createLocalNode p = createNode "localhost" p

callNodes :: (Typeable a, Typeable a1, Read a, Read a1, Show a, Show a1) => (Cloud a1 -> Cloud a -> Cloud a) -> Cloud a -> Cloud a1 -> Cloud a Source #
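
callNodes appears to generalize clustered: presumably the first argument combines the per-node results, the second is the neutral computation, and the third is run at every node (so clustered would correspond to callNodes (<|>) empty). A sketch under that assumption, combining a String from every node with the Monoid instance of Cloud:

import Transient.Move

allGreetings :: Cloud String
allGreetings = callNodes mappend mempty (localIO $ return "hi ")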

Messaging

putMailbox :: Typeable a => a -> TransIO () Source #

Write to the mailbox. Mailboxes are node-wide: they are visible to all processes that share the same connection data, that is, those under the same listen or connect, whereas EVars are only visible to the process that created them and its children. Internally, the mailbox is a well-known EVar stored by listen in the Connection state.

putMailbox' :: (Typeable b, Ord b, Typeable a) => b -> a -> TransIO () Source #

Write to a mailbox identified by a key, in addition to the type of the message.

getMailbox :: Typeable a => TransIO a Source #

Get messages from the mailbox that match the expected type. The order of reading is defined by readTChan. This is reactive: each new message triggers the execution of the continuation, and each message wakes up all the getMailbox computations waiting for it.
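
A sketch of the publish/subscribe pattern within one node (the delay only gives the reader time to subscribe; it is not required by the API):

import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Transient.Base
import Transient.Move (getMailbox, putMailbox)

mailboxDemo :: TransIO ()
mailboxDemo = reader <|> writer
  where
    reader = do
        msg <- getMailbox :: TransIO String     -- reactive: runs once per message of this type
        liftIO $ putStrLn ("received: " ++ msg)
    writer = do
        liftIO $ threadDelay 100000             -- let the reader subscribe first
        putMailbox ("hello" :: String)

The demo can be run with keep mailboxDemo.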

getMailbox' :: (Typeable b, Ord b, Typeable a) => b -> TransIO a Source #

Read from a mailbox identified by a key, in addition to the type.

cleanMailbox :: Typeable a => a -> TransIO () Source #

Delete all subscriptions for the mailbox expecting this type of data.

cleanMailbox' :: Typeable a => Int -> a -> TransIO () Source #

Clean a mailbox identified by an Int key and the type.

Thread Control

single :: TransIO a -> TransIO a Source #

Run a single thread for that action per connection. When the same action is re-executed within that connection, all the threads generated by the previous execution are killed.

  box <- foo
  r <- runAt node . local . single $ getMailbox' box
  localIO $ print r

If foo returns different mailbox identifiers, the above code would print only the messages of the last one. Without single, it would print the messages of all of them.

unique :: a -> TransIO () Source #

Run a unique continuation for each connection. The first thread that executes unique is executed for that connection; the rest are ignored.

Buffering Control

REST API

Low Level APIs

data ParseContext a Source #

Constructors

IsString a => ParseContext (IO a) a