> {-# LANGUAGE TypeFamilies, MultiParamTypeClasses, GeneralizedNewtypeDeriving #-}
This module exports a simple, idiomatic implementation of the Actor Model.
> module Control.Concurrent.Actors (
>
> Action()
> , Behavior(..)
>
> , (<.|>)
>
> , Mailbox()
> , out
> , send , send' , (<->)
> , received
> , guardReceived
>
> , Sources(), Joined
>
> , spawn
>
> , yield
> , receive
>
> , coproductMb
> , contraProduct
> , zipMb
> , contraFanin
> , contraFanout
>
> , runBehavior_
> , runBehavior
>
>
> , printB
> , putStrB
> , signalB
> , constB
>
> ) where
>
> import Control.Monad
> import Control.Applicative
> import Control.Monad.Reader(ask)
> import qualified Data.Foldable as F
> import Control.Monad.IO.Class
> import Control.Concurrent(forkIO)
> import Data.Monoid
> import Control.Arrow((***),(&&&),(|||))
>
>
> import Data.Functor.Contravariant
>
> import Control.Concurrent.Chan.Split
>
> import Control.Concurrent.Actors.Behavior
TODO
-----
0.4.1
- performance tuning / benchmarking:
    - first optimize TreeExample, by way of Benchmark.hs
    - criterion and profiling w/r/t lib.:
    - play with underlying Behavior Monad stack?
    - be more controlled about the source lists (do once before defaultMain), use 'evaluate'
    - run with +RTS -s and make sure everything is 0
    - see if case-based nil is better
    - try storing the same chan (observable sharing) in each node, and use for streaming;
      send an MVar with messages for the query operation
    - get accurate baseline comparison between actors and set
    - use INLINABLE
    - test again with SPECIALIZE instead
    - try adding INLINE to all with higher-order args (or higher-order newtype wrappers)
      and make sure our LHS looks good for inlining
    - specialize `Action i (Behavior i)` or allow lots of unfolding... ? Optimize those loops, somehow. Rewrite rules?
    - look at "let floating" and INLINABLE to get functions with "fully-applied (syntactically) LHS"
    - split-chan ChItem in heap profile -hy
    - take a look at threadscope for random tree test
    - forkOnIO to keep communicating actors on same HEC?
    - compare with previous version (cp to /tmp to use previous version)
Later:
- make that also work with Behaviors of arbitrary input types using new GHC generics?
- can we make joins work with arbitrary types using Generics?
- can we support Either in Sources?
- get complete code coverage into simple test module
- interesting solution to exit detection:
  http://en.wikipedia.org/wiki/Huang%27s_algorithm
- dynamically bounded chans, based on number of writers, to control
  producer/consumer issues? Possibly add more goodies to chan-split;
  see: http://hackage.haskell.org/package/stm-chans
- look at what the Functor/Contravariant instances for the read/write ends, and
  the natural transformations they allow, suggest about the limits of the Actor
  model, and investigate the inverse of Actors (Reducers?)
- create an experimental Collectors sub-module
- investigate ways of positively influencing thread scheduling based on
  actor work agenda?
- export some more useful Actors and global thingies:
    - 'loop' which keeps consuming (is this provided by a class?)
    - function returning an actor to "load balance" inputs over multiple
      actors
    - an actor that sends a random stream?
    - a pre-declared Mailbox for IO?
Eventually:
- some sort of exception handling technique à la Erlang
- ability to launch an actor that automatically "replicates" if its chan needs more
  consumers. This should probably be restricted to an `Action i ()` that we
  repeat.
- can we automatically throttle producers on an Actor system level,
  optimizing message flow with some algorithm?
- provide an "adapter" for Amazon SQS, allowing truly distributed message
  passing
- play w/ distributed-process (cloud haskell)
- consider: combining TChans, where values are popped off when available,
  for chan-split?
- look at ways we can represent network IO as channels to interface with
  this. E.g.:
    - https://github.com/ztellman/aleph
    - http://akka.io/ (scala remote actors lib)
    - http://www.zeromq.org/intro:read-the-manual
    - interface to Amazon SQS
    - http://msgpack.org/
    - "shared memory" approaches?
    - cloudhaskell, haskell-mpi, etc. see:
      http://stackoverflow.com/questions/8362998/distributed-haskell-state-of-the-art-in-2011
- Behavior -> enumeratee package translator (and vice versa), maybe letting us
  use useful enumerators ...also now pipes, conduits, etc.
- study ambient/join/fusion calculi for clues to where it's really at
CHAN TYPES
==========
By defining our Mailbox as the bare "send" operation we get a very convenient
way of defining a Contravariant instance, without all the overhead we had before,
while ALSO now supporting some great natural transformations on Mailboxes &
Messages.
We use the 'Op' newtype from Data.Functor.Contravariant to get 'Contravariant'
for free, possibly revealing other insights:
> type Sender a = Op (IO ()) a
>
> mailbox :: (a -> IO ()) -> Mailbox a
> mailbox = Mailbox . Op
>
> runMailbox :: Mailbox a -> a -> IO ()
> runMailbox = getOp . sender
>
> mkMessages :: OutChan a -> Messages a
> mkMessages = Messages . readChan
>
> newtype Mailbox a = Mailbox { sender :: Sender a }
>     deriving (Contravariant)
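To make the "Mailbox is just a send operation" idea concrete, here is a minimal sketch using only base. It re-declares a stand-in `Mailbox` (with a hand-written `Contravariant` instance instead of newtype deriving) and a hypothetical `logTo` helper that appends to an `IORef`; neither is this library's API.

```haskell
import Data.Functor.Contravariant (Contravariant (..), Op (..))
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Stand-in for the Mailbox above: nothing but a send operation,
-- wrapped in Op so that contramap is inherited from Op's instance.
newtype Mailbox a = Mailbox { sender :: Op (IO ()) a }

instance Contravariant Mailbox where
  contramap f (Mailbox s) = Mailbox (contramap f s)

mailbox :: (a -> IO ()) -> Mailbox a
mailbox = Mailbox . Op

runMailbox :: Mailbox a -> a -> IO ()
runMailbox = getOp . sender

-- Hypothetical helper: a Mailbox that appends every message to a log.
logTo :: IORef [String] -> Mailbox String
logTo ref = mailbox (\s -> modifyIORef ref (++ [s]))
```

With this, `contramap show (logTo ref) :: Mailbox Int` is a mailbox for `Int`s that forwards their `show`n form to the same log; no new channel is involved.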
Previously we were polymorphic in SplitChan in many places. Now that spawn
has a polymorphic result type, we simply export a function to convert from
any SplitChan type; otherwise we'd have to provide type annotations everywhere.
I liked the previous version, since a send within an actor is semantically
identical regardless of the channel type.
>
>
> out :: (SplitChan i x)=> i a -> Mailbox a
> out = mailbox . writeChan
We don't need to expose this thanks to the miracle of MonadFix and recursive do,
but it can be generated via the NewSplitChan class below if the user imports
chan-split (Control.Concurrent.Chan.Split):
> newtype Messages a = Messages { readMsg :: IO a }
>     deriving (Functor)
>
>
> instance SplitChan Mailbox Messages where
>     readChan = readMsg
>     writeChan = runMailbox
>
> instance NewSplitChan Mailbox Messages where
>     newSplitChan = (out *** mkMessages) `fmap` newSplitChan
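A rough model of the split: a single channel provides both ends, the write end being contravariant and the read end a Functor. This sketch assumes a plain `Control.Concurrent.Chan` in place of chan-split, and `newMailboxPair` is a hypothetical name standing in for the `NewSplitChan` instance.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)

-- Stand-ins for this module's types: a write end and a read end.
newtype Mailbox a = Mailbox { runMailbox :: a -> IO () }
newtype Messages a = Messages { readMsg :: IO a }

-- The read end is a Functor: fmap post-processes each message as it is read.
instance Functor Messages where
  fmap f (Messages m) = Messages (fmap f m)

-- Hypothetical analogue of newSplitChan for (Mailbox, Messages):
-- both ends share one unbounded Chan.
newMailboxPair :: IO (Mailbox a, Messages a)
newMailboxPair = do
  c <- newChan
  return (Mailbox (writeChan c), Messages (readChan c))
```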
For Mailboxes we can define all transformations associated with Cartesian and
CoCartesian (from 'categories') but where the category is Dual (->), i.e. the
order of the transformation is flipped.
I don't know if/how these precisely fit into an existing class, but for now here
are a handful of useful combinators:
> coproductMb :: Mailbox a -> Mailbox b -> Mailbox (Either a b)
> coproductMb m1 m2 = mailbox $ either (writeChan m1) (writeChan m2)
>
> zipMb :: Mailbox a -> Mailbox b -> Mailbox (a,b)
> zipMb m1 m2 = mailbox $ \(a,b) -> writeChan m1 a >> writeChan m2 b
>
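A quick check of how the two combinators route messages, using a stand-in `Mailbox` that is just a send function (as in this module) and a hypothetical `logMailbox` helper that records what it receives:

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Stand-in Mailbox: just the send operation.
newtype Mailbox a = Mailbox { runMailbox :: a -> IO () }

-- Route Left values to m1 and Right values to m2.
coproductMb :: Mailbox a -> Mailbox b -> Mailbox (Either a b)
coproductMb m1 m2 = Mailbox (either (runMailbox m1) (runMailbox m2))

-- Send the first component to m1, then the second to m2.
zipMb :: Mailbox a -> Mailbox b -> Mailbox (a, b)
zipMb m1 m2 = Mailbox (\(a, b) -> runMailbox m1 a >> runMailbox m2 b)

-- Hypothetical test helper: a Mailbox that appends a rendering of each message.
logMailbox :: IORef [String] -> (a -> String) -> Mailbox a
logMailbox ref f = Mailbox (\a -> modifyIORef ref (++ [f a]))
```

Note that `zipMb` performs two separate sends, first component first; nothing makes the pair arrive atomically.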
The naming here doesn't make much sense now that these are general. Keep for
now and hope we can deprecate in favor of functionality in one of E.K.'s
libs?
>
> contraProduct :: Contravariant f => f (Either a b) -> (f a, f b)
> contraProduct = contramap Left &&& contramap Right
>
>
> contraFanin :: Contravariant f => (b -> a) -> (c -> a) -> f a -> f (Either b c)
> contraFanin f g = contramap (f ||| g)
>
>
> contraFanout :: Contravariant f=> (a -> b) -> (a -> c) -> f (b,c) -> f a
> contraFanout f g = contramap (f &&& g)
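Since these three only need `Contravariant`, they work for any contravariant functor, not just `Mailbox`. As an illustration (the example predicates are made up), here they are applied to base's `Predicate`:

```haskell
import Control.Arrow ((&&&), (|||))
import Data.Functor.Contravariant (Contravariant (..), Predicate (..))

contraProduct :: Contravariant f => f (Either a b) -> (f a, f b)
contraProduct = contramap Left &&& contramap Right

contraFanin :: Contravariant f => (b -> a) -> (c -> a) -> f a -> f (Either b c)
contraFanin f g = contramap (f ||| g)

contraFanout :: Contravariant f => (a -> b) -> (a -> c) -> f (b, c) -> f a
contraFanout f g = contramap (f &&& g)

-- Example predicate on a sum: small Ints, or short Strings.
isSmall :: Predicate (Either Int String)
isSmall = Predicate (either (< 10) ((< 3) . length))

-- Split it into one predicate per summand.
smallInt :: Predicate Int
smallStr :: Predicate String
(smallInt, smallStr) = contraProduct isSmall
```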
ACTIONS
=======
Functionality is based on our underlying type classes, but users shouldn't need
to import a bunch of libraries to get basic Behavior building functionality.
> infixl 3 <.|>
>
> (<.|>) :: Behavior i -> Behavior i -> Behavior i
> b <.|> b' = b `mappend` constB b'
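To see how this differs from plain `mappend`: `mappend` hands the input on which the first behavior yielded to the second one, while `<.|>` lets the second behavior start on the *next* input. The sketch below uses a simplified `Behavior` (a step function returning `Maybe` the next behavior), which is an assumption for illustration and not the type from Control.Concurrent.Actors.Behavior; `tagB` is a hypothetical logging behavior.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Simplified model (assumption): consume one input, then either continue
-- with a new Behavior (Just) or yield (Nothing).
newtype Behavior i = Receive { step :: i -> IO (Maybe (Behavior i)) }

-- mappend-style sequencing: when b yields, the same input goes to b'.
instance Semigroup (Behavior i) where
  b <> b' = Receive $ \i -> do
    r <- step b i
    case r of
      Just next -> return (Just (next <> b'))
      Nothing   -> step b' i

instance Monoid (Behavior i) where
  mempty = Receive (\_ -> return Nothing)

-- Ignore one input, then become b.
constB :: Behavior i -> Behavior i
constB b = Receive (\_ -> return (Just b))

infixl 3 <.|>
(<.|>) :: Behavior i -> Behavior i -> Behavior i
b <.|> b' = b <> constB b'

-- Hypothetical: log n inputs tagged with a label, yielding right after the n-th.
tagB :: IORef [String] -> String -> Int -> Behavior Int
tagB ref tag n = Receive $ \i -> do
  modifyIORef ref (++ [tag ++ show i])
  return (if n <= 1 then Nothing else Just (tagB ref tag (n - 1)))

runBehavior :: Behavior a -> [a] -> IO ()
runBehavior b (a:as) = step b a >>= mapM_ (`runBehavior` as)
runBehavior _ []     = return ()
```

Fed `[1,2,3,4]`, `tagB r "a" 2 <> tagB r "b" 10` logs `a1 a2 b2 b3 b4`, while `tagB r "a" 2 <.|> tagB r "b" 10` logs `a1 a2 b3 b4`: the input that made the first behavior yield is not seen twice.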
The 'yield' function is so named because it is "relinquishing control"; I
think the name evokes the functionality of <|> and mappend (the last input
is passed along) and also has the meaning "quit".
Its similarity (or not) to the function of the same name in the 'enumerator'
package may be a source of confusion (or the opposite)... I'm not sure.
>
> yield :: Action i a
> yield = mzero
>
> receive :: Action i (Behavior i) -> Action i (Behavior i)
> receive = return . Receive
>
> received :: Action i i
> received = ask
>
> guardReceived :: (i -> Bool) -> Action i i
> guardReceived p = ask >>= \i-> guard (p i) >> return i
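Here is a rough, self-contained model of how these pieces fit together. It assumes `Action i a` behaves like a reader of the current input that may fail: the module's use of `ask`, `mzero` and `guard` suggests a Reader-over-Maybe-style stack, but the real definition lives in Control.Concurrent.Actors.Behavior and may differ. `liftIOA`, `evensB` and `runBehavior` here are illustrative stand-ins.

```haskell
import Control.Applicative (Alternative (..))
import Control.Monad (MonadPlus (..), ap, guard, liftM)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Simplified model (assumption): an Action reads the current input and may
-- fail; failing is what 'yield' means here.
newtype Action i a = Action { runAction :: i -> IO (Maybe a) }

instance Functor (Action i) where
  fmap = liftM

instance Applicative (Action i) where
  pure a = Action (\_ -> return (Just a))
  (<*>) = ap

instance Monad (Action i) where
  Action m >>= k = Action $ \i -> do
    r <- m i
    case r of
      Nothing -> return Nothing
      Just a  -> runAction (k a) i

-- Failure, and fallback on the same input, so 'guard' and 'mzero' work.
instance Alternative (Action i) where
  empty = Action (\_ -> return Nothing)
  Action m <|> Action n = Action $ \i -> m i >>= maybe (n i) (return . Just)

instance MonadPlus (Action i)

newtype Behavior i = Receive { headAction :: Action i (Behavior i) }

-- Hypothetical stand-in for liftIO inside an Action.
liftIOA :: IO a -> Action i a
liftIOA io = Action (\_ -> fmap Just io)

yield :: Action i a
yield = mzero

received :: Action i i
received = Action (return . Just)

guardReceived :: (i -> Bool) -> Action i i
guardReceived p = received >>= \i -> guard (p i) >> return i

-- Hypothetical behavior: log inputs while they are even; yield on the first odd one.
evensB :: IORef [Int] -> Behavior Int
evensB ref = Receive $ do
  i <- guardReceived even
  liftIOA (modifyIORef ref (++ [i]))
  return (evensB ref)

-- Feed inputs one at a time until the behavior yields or inputs run out.
runBehavior :: Behavior i -> [i] -> IO ()
runBehavior (Receive a) (x:xs) = runAction a x >>= maybe (return ()) (`runBehavior` xs)
runBehavior _ [] = return ()
```

In this model a failed `guardReceived` is exactly a `yield`: running `evensB` on `[2,4,5,6]` logs `2` and `4`, then stops at `5`.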
>
> send :: (MonadIO m)=> Mailbox a -> a -> m ()
> send b = liftIO . writeChan b
>
> send' :: (MonadIO m)=> Mailbox a -> a -> m ()
> send' b a = a `seq` send b a
> infixr 1 <->
>
> (<->) :: (MonadIO m)=> a -> m (Mailbox a) -> m (Mailbox a)
> a <-> mmb = mmb >>= \mb-> send mb a >> return mb
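A small sketch of `<->` with a stand-in `Mailbox` over a plain `Chan` (the `chanMailbox` helper is hypothetical, playing the role of `out`). Because `<->` is `infixr 1` and runs its right-hand action before sending, in a chain like `1 <-> 2 <-> m` the rightmost message is sent first.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- Stand-in Mailbox: just the send operation.
newtype Mailbox a = Mailbox { runMailbox :: a -> IO () }

-- Hypothetical helper playing the role of 'out' for a plain Chan.
chanMailbox :: Chan a -> Mailbox a
chanMailbox = Mailbox . writeChan

send :: MonadIO m => Mailbox a -> a -> m ()
send mb = liftIO . runMailbox mb

-- Like send, but forces the message to WHNF before sending it.
send' :: MonadIO m => Mailbox a -> a -> m ()
send' mb a = a `seq` send mb a

infixr 1 <->
-- Run the action producing a Mailbox, send it a message, return the Mailbox.
(<->) :: MonadIO m => a -> m (Mailbox a) -> m (Mailbox a)
a <-> mmb = mmb >>= \mb -> send mb a >> return mb
```

So `mb <- 1 <-> 2 <-> return (chanMailbox c)` leaves `2` and then `1` in the channel.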
FORKING AND RUNNING ACTORS:
===========================
The strict Actor Model is limited in expressiveness, in that it doesn't allow
for a method of synchronization, e.g. we cannot have an actor that pairs up
incoming messages from two different channels. I think this leads to nonsense
like "selective receive" in Erlang (disclaimer: IANA erlang-xpert).
I've realized that I can keep all the nice semantics of actors (i.e. this
change doesn't affect Behaviors), while supporting synchronization and
simplifying the API all at the same time! This method is inspired by the "join
calculus", and I'm sure this isn't a new idea.
To support this elegantly in the API, we define a class with an associated type,
and make 'spawn' polymorphic in that class. This allows the pattern of joins to
be determined polymorphically, based on the user's pattern match!
NOTE: My original goal was to use GHC.Generic to support arbitrary joins on
any Generic a=> Behavior a ...but it wasn't coming together. Let me know
if you can figure it out.
>
> class Sources s where
>     type Joined s
>     newJoinedChan :: IO (s, Messages (Joined s))
'spawn' uses the unexported newJoinedChan where we previously used newSplitChan:
>
> spawn :: (MonadIO m, Sources s)=> Behavior (Joined s) -> m s
> spawn b = liftIO $ do
>     (srcs, msgs) <- newJoinedChan
>     let runner b' = readChan msgs >>= runBehaviorStep b' >>= F.mapM_ runner
>     void $ forkIO (runner b)
>     return srcs
...and our instance for Mailbox recovers the previous, simple spawn functionality:
> instance Sources (Mailbox a) where
>     type Joined (Mailbox a) = a
>     newJoinedChan = newSplitChan
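Stripped of the `Sources` machinery, this single-mailbox `spawn` amounts to: create a channel, fork a thread that feeds each message to the current behavior until it yields, and hand back the write end. A base-only sketch, assuming a simplified step-function `Behavior` (not the library's type) and a hypothetical `sumB` behavior:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (void)

-- Simplified model (assumption): one step per message; Nothing means yield.
newtype Behavior i = Receive { step :: i -> IO (Maybe (Behavior i)) }

-- Single-mailbox spawn: the forked runner loops until the behavior yields.
spawnChan :: Behavior i -> IO (i -> IO ())
spawnChan b = do
  c <- newChan
  let runner b' = readChan c >>= step b' >>= mapM_ runner
  void (forkIO (runner b))
  return (writeChan c)

-- Hypothetical behavior: add up n messages, report the total, then yield.
sumB :: MVar Int -> Int -> Int -> Behavior Int
sumB done n acc = Receive $ \x ->
  if n <= 1
    then putMVar done (acc + x) >> return Nothing
    else return (Just (sumB done (n - 1) (acc + x)))
```

Once the behavior yields, the runner thread simply ends; later sends still succeed but just accumulate in the unbounded channel.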
By adding an instance for (,), synchronization and wonderful new things become
possible!
> instance (Sources a, Sources b)=> Sources (a,b) where
>     type Joined (a,b) = (Joined a, Joined b)
>     newJoinedChan = do
>         (sa, ma) <- newJoinedChan
>         (sb, mb) <- newJoinedChan
>         let m' = Messages $ liftM2 (,) (readMsg ma) (readMsg mb)
>         return ((sa,sb), m')
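This is where the synchronization comes from: the joined read end blocks until it has one message from *each* source, so messages are paired up by arrival order per channel, however the sends interleave. A base-only sketch with plain `Chan`s; `newJoinedPair` is a hypothetical, monomorphic stand-in for the `(Mailbox a, Mailbox b)` case of this instance.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (liftM2)

-- Stand-in for this module's read end.
newtype Messages a = Messages { readMsg :: IO a }

-- Hypothetical analogue of the (,) instance: two channels, two write ends,
-- and one read end that waits for a message on each.
newJoinedPair :: IO ((a -> IO (), b -> IO ()), Messages (a, b))
newJoinedPair = do
  ca <- newChan
  cb <- newChan
  -- liftM2 reads from ca first, then cb, blocking on each as needed.
  let joined = Messages (liftM2 (,) (readChan ca) (readChan cb))
  return ((writeChan ca, writeChan cb), joined)
```

Sending `'x'` to the second end before anything reaches the first still yields `(1,'x')` and then `(2,'y')` once `1`, `2` and `'y'` arrive.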
We add instances up to 7-tuples, since that seems to be standard; beyond that,
people can use nested tuples:
> instance (Sources a, Sources b, Sources c, Sources d, Sources e, Sources f, Sources g)=> Sources (a,b,c,d,e,f,g) where
>     type Joined (a,b,c,d,e,f,g) = (Joined a, Joined b, Joined c, Joined d, Joined e, Joined f, Joined g)
>     newJoinedChan = do
>         (sa, ma) <- newJoinedChan
>         (sb, mb) <- newJoinedChan
>         (sc, mc) <- newJoinedChan
>         (sd, md) <- newJoinedChan
>         (se, me) <- newJoinedChan
>         (sf, mf) <- newJoinedChan
>         (sg, mg) <- newJoinedChan
>         let m' = Messages $ (,,,,,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc <*> readMsg md <*> readMsg me <*> readMsg mf <*> readMsg mg
>         return ((sa,sb,sc,sd,se,sf,sg), m')
>
> instance (Sources a, Sources b, Sources c, Sources d, Sources e, Sources f)=> Sources (a,b,c,d,e,f) where
>     type Joined (a,b,c,d,e,f) = (Joined a, Joined b, Joined c, Joined d, Joined e, Joined f)
>     newJoinedChan = do
>         (sa, ma) <- newJoinedChan
>         (sb, mb) <- newJoinedChan
>         (sc, mc) <- newJoinedChan
>         (sd, md) <- newJoinedChan
>         (se, me) <- newJoinedChan
>         (sf, mf) <- newJoinedChan
>         let m' = Messages $ (,,,,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc <*> readMsg md <*> readMsg me <*> readMsg mf
>         return ((sa,sb,sc,sd,se,sf), m')
>
> instance (Sources a, Sources b, Sources c, Sources d, Sources e)=> Sources (a,b,c,d,e) where
>     type Joined (a,b,c,d,e) = (Joined a, Joined b, Joined c, Joined d, Joined e)
>     newJoinedChan = do
>         (sa, ma) <- newJoinedChan
>         (sb, mb) <- newJoinedChan
>         (sc, mc) <- newJoinedChan
>         (sd, md) <- newJoinedChan
>         (se, me) <- newJoinedChan
>         let m' = Messages $ (,,,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc <*> readMsg md <*> readMsg me
>         return ((sa,sb,sc,sd,se), m')
>
> instance (Sources a, Sources b, Sources c, Sources d)=> Sources (a,b,c,d) where
>     type Joined (a,b,c,d) = (Joined a, Joined b, Joined c, Joined d)
>     newJoinedChan = do
>         (sa, ma) <- newJoinedChan
>         (sb, mb) <- newJoinedChan
>         (sc, mc) <- newJoinedChan
>         (sd, md) <- newJoinedChan
>         let m' = Messages $ (,,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc <*> readMsg md
>         return ((sa,sb,sc,sd), m')
>
> instance (Sources a, Sources b, Sources c)=> Sources (a,b,c) where
>     type Joined (a,b,c) = (Joined a, Joined b, Joined c)
>     newJoinedChan = do
>         (sa, ma) <- newJoinedChan
>         (sb, mb) <- newJoinedChan
>         (sc, mc) <- newJoinedChan
>         let m' = Messages $ (,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc
>         return ((sa,sb,sc), m')
I give up for now on defining an instance for sums; this probably requires a
different formulation of the class. Here is the unfinished draft, left as a
(non-compiled) comment:

 ...and we also support Either as a source, since this is the only way to get a joined
 product of sums; otherwise users could just use 'contraProduct', a pure operation.

 > -- | > type Joined (a :-: b) = Either (Joined a) (Joined b)
 > --
 > -- A product of 'Sources' corresponding to a @Behavior (Either a b)@. Allows
 > -- 'spawn'-ing a @Behavior@ which receives a sum of perhaps-'Joined' products.
 > --
 > -- See also: 'contraProduct'
 > data a :-: b = (:-:) { sourceLeft :: a
 >                      , sourceRight :: b }
 >
 > instance (Sources a, Sources b)=> Sources (a :-: b) where
 >     type Joined (a :-: b) = Either (Joined a) (Joined b)
 >     --newJoinedChan :: IO (a :-: b, Messages (Either (Joined a) (Joined b)))
 >     newJoinedChan = do
 >         (src, msgs) <- newSplitChan
 >         let (s1, s2) = contraProduct src
 >         return (decompose s1 :-: decompose s2, msgs)

 class Sources s where
     type Joined s :: *
     newJoinedChan :: IO (s, Messages (Joined s))
     decomp :: Mailbox (a,b) -> (Mailbox a, Mailbox b)
     decomp :: Mailbox a -> Mailbox a
     decomp :: Mailbox (Either a b) -> (Mailbox a :-: Mailbox b)
We can subsume the old 'spawn_' functionality in our class as well, and imagine
returning an infinite source of ()s:
>
> instance Sources () where
>     type Joined () = ()
>     newJoinedChan =
>         return ((), Messages $ return ())
Where we can, replace the polymorphic craziness with the old spawn_ function:
>
> spawn_ :: (MonadIO m)=> Behavior () -> m ()
> spawn_ = liftIO . void . forkIO . runBehavior_
NOTE: spawnReading was removed in 0.4, since it was unused (by me), exposed
confusing implementation details, supported e.g. launching an actor on a
bounded channel (which violates the Model), and didn't provide an effective
way to do much cool stuff like reading from a network socket.
Instead, I guess we should expose enough internals in a separate module to
support future cool stuff.
RUNNING ACTORS
--------------
These work in IO, returning () when the actor finishes with yield/mzero
(runBehavior also returns when its input list runs out):
>
>
> runBehavior_ :: Behavior () -> IO ()
> runBehavior_ b = runBehavior b $ repeat ()
>
>
> runBehavior :: Behavior a -> [a] -> IO ()
> runBehavior b (a:as) = runBehaviorStep b a >>= F.mapM_ (`runBehavior` as)
> runBehavior _ _ = return ()
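With `repeat ()` as input, `runBehavior_` is driven only by the behavior itself, while `runBehavior` on a finite list also stops when the list runs out. A sketch under a simplified step-function `Behavior` model (an assumption, not the library's type), with a hypothetical `tickB` that counts its inputs:

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Simplified model (assumption): one step per input; Nothing means yield.
newtype Behavior i = Receive { step :: i -> IO (Maybe (Behavior i)) }

runBehavior :: Behavior a -> [a] -> IO ()
runBehavior b (a:as) = step b a >>= mapM_ (`runBehavior` as)
runBehavior _ _      = return ()

runBehavior_ :: Behavior () -> IO ()
runBehavior_ b = runBehavior b (repeat ())

-- Hypothetical: count inputs in an IORef, yielding after the n-th.
tickB :: IORef Int -> Int -> Behavior ()
tickB ref n = Receive $ \() -> do
  modifyIORef ref (+ 1)
  return (if n <= 1 then Nothing else Just (tickB ref (n - 1)))
```

`runBehavior_ (tickB ref 3)` terminates after three ticks despite the infinite input list; `runBehavior (tickB ref 10) [(), ()]` stops after two.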
USEFUL GENERAL BEHAVIORS
========================
>
>
> printB :: (Show s, Eq n, Num n)=> n -> Behavior s
> printB = contramap (unlines . return . show) . putStrB
We want to yield right after printing the last input to print. This lets us
compose with signalB, for instance:

    write5ThenExit = putStrB 5 `mappend` signalB c

and the above will signal as soon as it has printed the last message. If we
defined this in a more traditional recursive way, the signal would only happen
once the sixth message was received.
For now we allow a negative count, in which case the behavior never stops
printing on its own.
>
> putStrB :: (Eq n, Num n)=> n -> Behavior String
> putStrB 0 = mempty
> putStrB n = Receive $ do
>     s <- received
>     liftIO $ putStr s
>     guard (n /= 1)
>     return $ putStrB (n-1)
>
> signalB :: Mailbox () -> Behavior i
> signalB c = Receive (send c () >> yield)
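The claim about `write5ThenExit` can be checked in a model. This sketch assumes a simplified step-function `Behavior` whose `mappend` hands the yielding input to the second behavior, and replaces printing with a hypothetical `logB` writing to an `IORef` and the signal `Mailbox` with a plain `IO ()` action, so the order of events can be inspected:

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Simplified model (assumption): one step per input; Nothing means yield.
newtype Behavior i = Receive { step :: i -> IO (Maybe (Behavior i)) }

-- mappend-style sequencing: when b yields, the same input is passed to b'.
instance Semigroup (Behavior i) where
  b <> b' = Receive $ \i -> do
    r <- step b i
    case r of
      Just next -> return (Just (next <> b'))
      Nothing   -> step b' i

instance Monoid (Behavior i) where
  mempty = Receive (\_ -> return Nothing)

-- Hypothetical stand-in for putStrB: log n inputs, yielding right after the n-th.
logB :: IORef [String] -> Int -> Behavior String
logB _   0 = mempty
logB ref n = Receive $ \s -> do
  modifyIORef ref (++ [s])
  return (if n == 1 then Nothing else Just (logB ref (n - 1)))

-- Stand-in for signalB, with the Mailbox replaced by an IO action.
signalB :: IO () -> Behavior i
signalB signal = Receive (\_ -> signal >> return Nothing)

runBehavior :: Behavior a -> [a] -> IO ()
runBehavior b (a:as) = step b a >>= mapM_ (`runBehavior` as)
runBehavior _ []     = return ()
```

Running `logB ref 5 <> signalB ...` over ten inputs logs the first five and then the signal, all while handling the fifth input; the sixth is never consumed.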
>
> constB :: Behavior i -> Behavior i
> constB = Receive . return