>
This module exports a simple, idiomatic implementation of the Actor Model.
> module Control.Concurrent.Actors (
>
>
>
>
>
> Action()
>
> , Behavior(..)
>
> , (<.|>)
>
>
>
> , Mailbox()
> , send
> , received
> , guardReceived
>
>
>
> , spawn
> , spawn_
> , spawnReading
>
>
> , yield
> , receive
>
>
>
> , runBehavior_
> , runBehavior
>
>
> , printB
> , putStrB
> , signalB
> , constB
>
> ) where
>
> import Control.Monad
> import Control.Monad.Reader(ask)
> import qualified Data.Foldable as F
> import Control.Monad.IO.Class
> import Control.Concurrent(forkIO)
> import Data.Monoid
>
>
> import Data.Functor.Contravariant
>
> import Control.Concurrent.Chan.Split
>
>
> import Control.Concurrent.Actors.Behavior
------ CPP MACROS ------
These macros are only provided by cabal unfortunately.... makes it difficult to
work with GHCi:
#if !MIN_VERSION_base(4,3,0)
> void :: (Monad m)=> m a -> m ()
> void = (>> return ())
#endif
------------------------
TODO
-----
0.3.0:
- define natural transformation combinators (in IO unfortunately) a.la.
'categories' for Mailbox. So
- :: Mailbox (a,b) -> (Mailbox a, Mailbox b) -- divide?
- :: Mailbox a -> Mailbox b -> Mailbox (Either a b) -- add?
- etc...
put these in a separate sub-module, optionally import, mention how an
extension to actor model or something
- allow supplying the first input message for an actor during spawn. This is
awkward otherwise. Include in same sub-module as above?
- performance testing:
- take a look at threadscope for random tree test
- get complete code coverage into simple test module
- interesting solution to exit detection:
http://en.wikipedia.org/wiki/Huang%27s_algorithm
- better method for waiting for threads to complete. should probably use
actor message passing
- look into whether we should use Text lib instead of strings?
OverloadedStrings?
-import Data.String, make polymorphic over IsString
-test if this lets us use it in importing module w/ OverloadedStrings
extension
- structured declarative and unit tests
- some sort of exception handling technique via Actors
(look at enumerator package)
- strict send' function
Later:
- investigate ways of positively influencing thread scheduling based on
actor work agenda?
- export some more useful Actors and global thingies
- 'loop' which keeps consuming (is this provided by a class?)
- function returning an actor to "load balance" inputs over multiple
actors
- an actor that sends a random stream?
- a pre-declared Mailbox for IO?
Eventualy:
- provide an "adapter" for amazon SQS, allowing truly distributed message
passing
- investigate erlang-style selective receive (using Alternative?)
- consider: combining TChans, where values are popped off when available,
for chan-split?
- look at ways we can represent network IO as channels to interface with
this. E.g:
- https://github.com/ztellman/aleph
- http://akka.io/ (scala remote actors lib)
- http://www.zeromq.org/intro:read-the-manual
- interface to amazon SQS
- http://msgpack.org/
- "shared memory" approaches?
- cloudhaskell, haskell-mpi, etc. see:
http://stackoverflow.com/questions/8362998/distributed-haskell-state-of-the-art-in-2011
-Behavior -> enumeratee package translator (and vice versa)
(maybe letting us use useful enumerators)
...also now pipes, conduits, etc. etc.
CHAN TYPES
==========
>
>
> newtype Mailbox a = Mailbox { inChan :: InChan a }
> deriving (Contravariant)
>
We don't need to expose this thanks to the miracle of MonadFix and recursive do,
but this can be generated via the NewSplitChan class below if the user imports
the library:
> newtype Messages a = Messages { outChan :: OutChan a }
> deriving (Functor)
>
>
> instance SplitChan Mailbox Messages where
> readChan = readChan . outChan
> writeChan = writeChan . inChan
> writeList2Chan = writeList2Chan . inChan
>
> instance NewSplitChan Mailbox Messages where
> newSplitChan = fmap (\(i,o)-> (Mailbox i, Messages o)) newSplitChan
>
ACTIONS
=======
Functionality is based on our underlying type classes, but users shouldn't need
to import a bunch of libraries to get basic Behavior building functionality.
> infixl 3 <.|>
>
>
>
>
>
> (<.|>) :: Behavior i -> Behavior i -> Behavior i
> b <.|> b' = b `mappend` constB b'
The 'yield' function is so named because it is "relinquishing control", i.e. I
think the name reminds of the functionality of <|> and mappend (the last input
is passed along) and also has the meaning "quit".
Its similarity (or not) to the 'enumerator' function of the same same may be a
source of confusion (or the opposite)... I'm not sure.
>
>
>
>
> yield :: Action i a
> yield = mzero
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> receive :: Action i (Behavior i) -> Action i (Behavior i)
> receive = return . Receive
>
>
>
>
> received :: Action i i
> received = ask
>
>
>
> guardReceived :: (i -> Bool) -> Action i i
> guardReceived p = ask >>= \i-> guard (p i) >> return i
>
>
>
>
>
> send :: (MonadIO m, SplitChan c x)=> c a -> a -> m ()
> send b = liftIO . writeChan b
FORKING AND RUNNING ACTORS:
===========================
>
>
>
> spawnReading :: (MonadIO m, SplitChan x c)=> c i -> Behavior i -> m ()
> spawnReading str = liftIO . void . forkIO . actorRunner
> where actorRunner b =
> readChan str >>= runBehaviorStep b >>= F.mapM_ actorRunner
RUNNING ACTORS
--------------
These work in IO, returning () when the actor finishes with done/mzero:
>
>
> runBehavior_ :: Behavior () -> IO ()
> runBehavior_ b = runBehavior b [(),()..]
>
>
> runBehavior :: Behavior a -> [a] -> IO ()
> runBehavior b (a:as) = runBehaviorStep b a >>= F.mapM_ (`runBehavior` as)
> runBehavior _ _ = return ()
FORKING ACTORS
--------------
>
>
>
> spawn :: (MonadIO m)=> Behavior i -> m (Mailbox i)
> spawn b = do
> (m,s) <- liftIO newSplitChan
> spawnReading s b
> return m
>
>
>
>
> spawn_ :: (MonadIO m)=> Behavior () -> m ()
> spawn_ = liftIO . void . forkIO . runBehavior_
USEFUL GENERAL BEHAVIORS
========================
>
>
> printB :: (Show s, Eq n, Num n)=> n -> Behavior s
> printB = contramap (unlines . return . show) . putStrB
We want to yield right after printing the last input to print. This lets us
compose with signalB for instance:
write5ThenExit = putStrB 5 `mappend` signalB c
and the above will signal as soon as it has printed the last message. If we try
to define this in a more traditional recursive way the signal above would only
happen as soon as the sixth message was received.
For now we allow negative
>
> putStrB :: (Eq n, Num n)=> n -> Behavior String
> putStrB 0 = mempty
> putStrB n = Receive $ do
> s <- received
> liftIO $ putStr s
> guard (n /= 1)
> return $ putStrB (n1)
>
>
>
>
> signalB :: (SplitChan c x)=> c () -> Behavior i
> signalB c = Receive (send c () >> yield)
>
>
>
>
>
> constB :: Behavior i -> Behavior i
> constB = Receive . return