>
This module exports a simple, idiomatic implementation of the Actor Model.
> module Control.Concurrent.Actors (
>
> Actor
> , Loop
> , NextActor(..)
> , ActorM()
>
> , continue
> , continue_
> , done
> , aseq
>
> , send
>
> , receive
> , receiveList
>
> , Mailbox()
> , newMailbox
>
> , Action()
> , forkActor
> , forkActorUsing
> , forkLoop
> , runActorUsing
> , runLoop
> ) where
>
> import Control.Monad
> import Control.Monad.IO.Class
> import Control.Monad.Trans.Maybe
> import Control.Concurrent
> import Control.Applicative
TODO?:
- Function for combining mailboxes (make a monoid? Only allow doing this in IO?)
Alternately: what if we delegated a single 'mainActor' that is in IO, and which
any Actor can send a message to? This would be an IO event loop and we would run
this IO loop from 'main' which would block until... so tired.
RE: Mailbox Class:
- Having such a class is a decent idea anyway since we may want to have
a synchronous Mailbox type, in which case we should
Here we define the Actor environment, similar to IO, in which we can launch new
Actors and send messages to Actors in scope. The implementation is hidden from
the user to enforce these restrictions.
>
> newtype ActorM a = ActorM { actorM :: MaybeT IO a }
>     deriving (Monad, Functor, Applicative,
>               Alternative, MonadPlus, MonadIO)
>
> runActorM :: ActorM a -> IO (Maybe a)
> runActorM = runMaybeT . actorM
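To illustrate why MaybeT over IO is a convenient base for ActorM, here is a
minimal, self-contained sketch that uses MaybeT IO directly rather than the
ActorM newtype. The names step, stop and demoMaybeT are hypothetical and not
part of this module.

```haskell
import Control.Applicative ((<|>))
import Control.Monad (mzero)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)

-- A computation that performs some IO and yields a value.
step :: MaybeT IO Int
step = do
    liftIO (putStrLn "stepping")
    return 1

-- A computation that ends without a value (hypothetical name).
stop :: MaybeT IO Int
stop = mzero

-- Run three variations: success, termination, and termination with
-- a fallback supplied through the Alternative instance.
demoMaybeT :: IO [Maybe Int]
demoMaybeT = mapM runMaybeT [step, stop, stop <|> return 2]
```

Running demoMaybeT should yield [Just 1, Nothing, Just 2]: mzero short-circuits
to Nothing, and the fallback is used only when the left-hand side terminates.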
First we define an Actor: a function that takes an input and either returns a
new actor to handle the next input, or terminates:
TODO: Consider making Actor the newtype and eliminating NextActor
newtype Actor i = Actor { actor :: i -> ActorM (Actor i) }
continue :: (i -> ActorM (Actor i)) -> ActorM (Actor i)
> type Actor i = i -> ActorM (NextActor i)
> newtype NextActor i = NextActor { nextActor :: Actor i }
Now some functions for building Actor computations:
>
> continue :: Actor i -> ActorM (NextActor i)
> continue = return . NextActor
IMPLEMENTATION NOTE:
when an actor terminates, its mailbox persists and we
currently provide no functions to query an actor's status.
Signaling an actor's termination should be done with
message passing.
>
> done :: ActorM (NextActor i)
> done = mzero
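As a rough illustration of how continue and done shape an actor, the sketch
below defines an actor that sums its inputs and stops on 0. It is
self-contained, so it models ActorM as a plain synonym for MaybeT IO (an
assumption made for brevity) and feeds inputs from a list instead of a Mailbox.
The names summer, feed and demoSummer are hypothetical.

```haskell
import Control.Monad (mzero)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Simplified stand-ins for the module's types (assumption: a synonym
-- instead of the ActorM newtype).
type ActorM = MaybeT IO
newtype NextActor i = NextActor { nextActor :: Actor i }
type Actor i = i -> ActorM (NextActor i)

continue :: Actor i -> ActorM (NextActor i)
continue = return . NextActor

done :: ActorM (NextActor i)
done = mzero

-- Hypothetical actor: adds each input to a running total, stops on 0.
summer :: IORef Int -> Actor Int
summer _   0 = done
summer ref n = do
    liftIO (modifyIORef ref (+ n))
    continue (summer ref)

-- Hypothetical driver: feeds inputs from a list until the list is
-- exhausted or the actor terminates.
feed :: Actor i -> [i] -> IO ()
feed _ []       = return ()
feed a (x : xs) =
    runMaybeT (a x) >>= maybe (return ()) (\n -> feed (nextActor n) xs)

demoSummer :: IO Int
demoSummer = do
    ref <- newIORef 0
    feed (summer ref) [1, 2, 3, 0, 100]
    readIORef ref
```

Here demoSummer should return 6: the actor terminates on the 0, so the trailing
100 is never processed.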
IMPLEMENTATION NOTE:
We might find that we can use the monoid abstraction, or
that we should make Actor a newtype for other reasons. For
now we have this function for composing actors sequentially:
>
> aseq :: Actor i -> Actor i -> Actor i
> aseq f g i = NextActor <$> (nextf <|> return g)
>     where nextf = (`aseq` g) . nextActor <$> f i
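The following self-contained sketch shows one way to observe what aseq does.
It copies the definition above, models ActorM as a synonym for MaybeT IO (an
assumption), and drives the composed actor from a list. The names labelN, feed
and demoAseq are hypothetical.

```haskell
import Control.Applicative ((<|>))
import Control.Monad (mzero)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Simplified stand-ins for the module's types.
type ActorM = MaybeT IO
newtype NextActor i = NextActor { nextActor :: Actor i }
type Actor i = i -> ActorM (NextActor i)

-- Same definition as in this module.
aseq :: Actor i -> Actor i -> Actor i
aseq f g i = NextActor <$> (nextf <|> return g)
  where nextf = (`aseq` g) . nextActor <$> f i

-- Hypothetical actor: logs n inputs under a label, then terminates.
labelN :: IORef [String] -> String -> Int -> Actor Int
labelN _   _     0 _ = mzero
labelN ref label n i = do
    liftIO (modifyIORef ref (++ [label ++ show i]))
    return (NextActor (labelN ref label (n - 1)))

-- Hypothetical driver: feeds inputs from a list instead of a Mailbox.
feed :: Actor i -> [i] -> IO ()
feed _ []       = return ()
feed a (x : xs) =
    runMaybeT (a x) >>= maybe (return ()) (\n -> feed (nextActor n) xs)

demoAseq :: IO [String]
demoAseq = do
    ref <- newIORef []
    feed (labelN ref "a" 2 `aseq` labelN ref "b" 2) [1 .. 6]
    readIORef ref
```

In this sketch demoAseq should return ["a1","a2","b4","b5"]. Note that input 3
does not appear: the input on which the first actor terminates is consumed by
that terminating call, and the second actor only sees the inputs that follow.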
A Loop is just an Actor that ignores its input. We provide some useful
functions for building and running such computations:
>
> type Loop = ActorM (NextActor ())
>
>
> continue_ :: Loop -> ActorM (NextActor i)
> continue_ = fmap (NextActor . fixConst . nextActor)
>     where fixConst c = const $ continue_ $ c ()
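A small sketch of how a Loop might be written, run on its own, and handed to
an ordinary Actor through continue_. It is self-contained, with ActorM modelled
as a synonym for MaybeT IO (an assumption); runLoop and continue_ mirror the
module's definitions, while tickUntil, feed and demoLoop are hypothetical
names.

```haskell
import Control.Monad (mzero)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Simplified stand-ins for the module's types.
type ActorM = MaybeT IO
newtype NextActor i = NextActor { nextActor :: Actor i }
type Actor i = i -> ActorM (NextActor i)
type Loop = ActorM (NextActor ())

continue_ :: Loop -> ActorM (NextActor i)
continue_ = fmap (NextActor . fixConst . nextActor)
  where fixConst c = const $ continue_ $ c ()

runLoop :: Loop -> IO ()
runLoop l = runMaybeT l >>= maybe (return ()) (runLoop . ($ ()) . nextActor)

-- Hypothetical loop: bumps a counter until it reaches the limit.
tickUntil :: Int -> IORef Int -> Loop
tickUntil limit ref = do
    n <- liftIO (readIORef ref)
    if n >= limit
        then mzero
        else do
            liftIO (writeIORef ref (n + 1))
            return (NextActor (const (tickUntil limit ref)))

-- Hypothetical driver: feeds inputs from a list instead of a Mailbox.
feed :: Actor i -> [i] -> IO ()
feed _ []       = return ()
feed a (x : xs) =
    runMaybeT (a x) >>= maybe (return ()) (\n -> feed (nextActor n) xs)

demoLoop :: IO (Int, Int)
demoLoop = do
    r1 <- newIORef 0
    runLoop (tickUntil 3 r1)                        -- runs to completion
    r2 <- newIORef 0
    feed (\_ -> continue_ (tickUntil 5 r2)) "ab"    -- one step per input
    (,) <$> readIORef r1 <*> readIORef r2
```

demoLoop should return (3,2). Under runLoop the loop iterates freely until it
terminates, whereas a loop embedded with continue_ advances one step for each
input the surrounding actor receives, ignoring the input's value.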
Here we define the "mailbox" that an Actor collects messages from, and that
other actors send messages to. It is simply a Chan with a hidden implementation.
IMPLEMENTATION NOTE:
we make no attempt to ensure that only one actor is reading
from a given Chan. This means two Actors can share the work
reading from the same mailbox.
If we want to change this in the future, Mailbox will contain
a field of type TVar ThreadId
To implement synchronous chans (or singly-buffered chans),
we can use a SyncMailbox type containing an MVar and
possibly another Var for ensuring synchronicity. An MVar
writer will never block indefinitely. Use a class for writing
and reading these mailbox types.
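One possible shape for the class mentioned in the note above, offered only as
a sketch: every name here (SyncMailbox, MailboxLike, newBox, writeBox, readBox,
roundTrip, demoSync) is hypothetical, and the MVar version is singly-buffered
rather than fully synchronous, since a writer returns as soon as the slot is
filled.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (replicateM)

-- Asynchronous mailbox: an unbounded Chan, as in this module.
newtype Mailbox i = Mailbox (Chan i)

-- Hypothetical singly-buffered mailbox: holds at most one message.
newtype SyncMailbox i = SyncMailbox (MVar i)

-- Hypothetical class abstracting over both mailbox types.
class MailboxLike b where
    newBox   :: IO (b i)
    writeBox :: b i -> i -> IO ()
    readBox  :: b i -> IO i

instance MailboxLike Mailbox where
    newBox               = Mailbox <$> newChan
    writeBox (Mailbox c) = writeChan c
    readBox  (Mailbox c) = readChan c

instance MailboxLike SyncMailbox where
    newBox                   = SyncMailbox <$> newEmptyMVar
    writeBox (SyncMailbox v) = putMVar v    -- blocks while the slot is full
    readBox  (SyncMailbox v) = takeMVar v   -- blocks while the slot is empty

-- Send three messages from a forked thread and read them back.
roundTrip :: MailboxLike b => IO (b Int) -> IO [Int]
roundTrip mk = do
    b <- mk
    _ <- forkIO (mapM_ (writeBox b) [1, 2, 3])
    replicateM 3 (readBox b)

demoSync :: IO ([Int], [Int])
demoSync = do
    xs <- roundTrip (newBox :: IO (Mailbox Int))
    ys <- roundTrip (newBox :: IO (SyncMailbox Int))
    return (xs, ys)
```

Both round trips should yield [1,2,3]. The difference is in scheduling: with
the MVar-backed box the writer cannot run more than one message ahead of the
reader.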
>
> newtype Mailbox i = Mailbox { mailbox :: Chan i }
>
IMPLEMENTATION NOTE:
We allow sending of messages to Actors in IO, treating the
main thread as something of an Actor with special privileges:
it can launch actors and message them, but it can also read
as it pleases from Mailboxes.
>
>
> send :: (Action m)=> Mailbox a -> a -> m ()
> send b = liftIOtoA . writeChan (mailbox b)
>
>
> receive :: Mailbox o -> IO o
> receive = readChan . mailbox
>
> receiveList :: Mailbox o -> IO [o]
> receiveList = getChanContents . mailbox
>
>
> newMailbox :: (Action m)=> m (Mailbox a)
> newMailbox = liftIOtoA newChan >>= return . Mailbox
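Since a Mailbox is a thin wrapper around a Chan, its behaviour can be sketched
with Control.Concurrent.Chan alone. The example below is self-contained and
redefines a local Mailbox newtype. The name demoMailbox is hypothetical.

```haskell
import Control.Concurrent.Chan
    (Chan, getChanContents, newChan, readChan, writeChan)

-- Local copy of the wrapper, so the example stands alone.
newtype Mailbox i = Mailbox { mailbox :: Chan i }

demoMailbox :: IO (Int, [Int])
demoMailbox = do
    b <- Mailbox <$> newChan
    -- What 'send' does underneath: write to the channel.
    mapM_ (writeChan (mailbox b)) [1, 2, 3, 4]
    -- What 'receive' does underneath: block until a message arrives.
    first <- readChan (mailbox b)
    -- What 'receiveList' does underneath: a lazy stream of all
    -- remaining and future messages.
    rest <- getChanContents (mailbox b)
    return (first, take 3 rest)
```

demoMailbox should return (1,[2,3,4]). The list produced by getChanContents is
lazy and unbounded, so demanding more elements than have been sent blocks until
further messages arrive, which is why the sketch takes only three.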
The Action class represents environments in which we can operate on actors. That
is, we would like to be able to send messages and fork actors both from IO and
from within ActorM.
>
>
> class Monad m => Action m where
>     liftIOtoA :: IO a -> m a
>
>     forkA :: IO () -> m ()
>     forkA io = liftIOtoA $ forkIO io >> return ()
>
> instance Action IO where
>     liftIOtoA = id
>
> instance Action ActorM where
>     liftIOtoA = ActorM . liftIO
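To show what the class buys us, here is a self-contained sketch in which one
send function works from both environments. It makes two simplifying
assumptions: the actor environment is modelled by MaybeT directly (with an
instance for any MaybeT m over a MonadIO base) rather than by the ActorM
newtype, and send writes to a bare Chan. The name demoAction is hypothetical.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (replicateM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)

class Monad m => Action m where
    liftIOtoA :: IO a -> m a

instance Action IO where
    liftIOtoA = id

-- Assumption: stands in for the module's 'Action ActorM' instance.
instance MonadIO m => Action (MaybeT m) where
    liftIOtoA = liftIO

-- One definition, usable from either environment.
send :: Action m => Chan a -> a -> m ()
send c = liftIOtoA . writeChan c

demoAction :: IO [String]
demoAction = do
    c <- newChan
    send c "from IO"
    _ <- runMaybeT (send c "from actor context" :: MaybeT IO ())
    replicateM 2 (readChan c)
```

demoAction should return ["from IO","from actor context"]: both calls go
through the same send, with the class selecting how the underlying IO action
is lifted.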
>
> forkActor :: (Action m)=> Actor i -> m (Mailbox i)
> forkActor a = do
>     b <- newMailbox
>     forkActorUsing b a
>     return b
>
>
> forkActorUsing :: (Action m)=> Mailbox i -> Actor i -> m ()
> forkActorUsing b = forkA . actorHandler b
>
>
> forkLoop :: (Action m)=> Loop -> m ()
> forkLoop = forkA . runLoop
>
>
> runLoop :: Loop -> IO ()
> runLoop l = runActorM l >>=
>     maybe (return ()) (runLoop . ($ ()) . nextActor)
>
>
> runActorUsing :: Mailbox i -> Actor i -> IO ()
> runActorUsing = actorHandler
Internal function that feeds the actor computation its values. This may be
extended to support additional functionality in the future.
> actorHandler :: Mailbox i -> Actor i -> IO ()
> actorHandler (Mailbox c) = loop
>     where loop a = readChan c >>=
>                      runActorM . a >>=
>                      maybe (return ()) (loop . nextActor)
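The handler loop can be exercised end to end with a forked thread. The sketch
below is self-contained: it models ActorM as a synonym for MaybeT IO and uses
bare Chans in place of Mailboxes (both assumptions). The names handler,
doubler and demoHandler are hypothetical. Results come back through a second
channel, in keeping with the earlier note that termination and status should
be communicated by message passing.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (mzero, replicateM)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)

-- Simplified stand-ins for the module's types.
type ActorM = MaybeT IO
newtype NextActor i = NextActor { nextActor :: Actor i }
type Actor i = i -> ActorM (NextActor i)

-- Stand-in for actorHandler: read a message, run the actor on it,
-- and loop with the actor it returns, if any.
handler :: Chan i -> Actor i -> IO ()
handler c = loop
  where
    loop a = do
        msg  <- readChan c
        next <- runMaybeT (a msg)
        maybe (return ()) (loop . nextActor) next

-- Hypothetical actor: doubles each Just n onto an output channel and
-- terminates on Nothing.
doubler :: Chan Int -> Actor (Maybe Int)
doubler _   Nothing  = mzero
doubler out (Just n) = do
    liftIO (writeChan out (n * 2))
    return (NextActor (doubler out))

demoHandler :: IO [Int]
demoHandler = do
    inbox <- newChan
    out   <- newChan
    _ <- forkIO (handler inbox (doubler out))
    mapM_ (writeChan inbox) [Just 1, Just 2, Just 3, Nothing]
    replicateM 3 (readChan out)
```

demoHandler should return [2,4,6]. The main thread blocks on the output
channel until the forked handler has processed each message, and the final
Nothing ends the handler's loop.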