-- | A small actor library.
--
-- Actors are spawned with 'actor' inside an 'ActorSystem' created by
-- 'actorSystem'. Each actor processes its inbox on its own thread and talks
-- to other actors by posting messages ('tell', '!', 'forward', 'schedule')
-- that are routed through the system's central message loop.
module Hakka.Actor (
ActorRef,
ActorIO,
noop,
liftIO,
actor,
tell,
(!),
forward,
schedule,
scheduleOnce,
sender,
self,
parent,
log,
Severity (Debug,Info,Warn,Error),
become,
stop,
actorSystem,
ActorSystem, terminate,
Cancellable, cancel
) where
import Prelude hiding (log)
import Data.List (partition)
import Data.Maybe (isNothing)
import Control.Monad (forM_,when)
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Concurrent (forkIO, threadDelay)
import Control.Exception
import Control.Monad.Trans.State.Strict
import Control.Monad.IO.Class
-- | Hierarchical address of an actor: either the root of a system or a
-- named child hanging below another path.
data ActorPath
  = RootActorPath
      { name :: String -- ^ name of this path segment
      }
  | ChildActorPath
      { parentPath :: ActorPath -- ^ enclosing path (not defined for roots)
      , name :: String -- ^ name of this path segment
      }
  deriving (Eq)
-- | The enclosing path of a child path; a root path has no parent and is
-- returned unchanged.
parentOrRoot :: ActorPath -> ActorPath
parentOrRoot path = case path of
  ChildActorPath up _ -> up
  _ -> path
-- | @a \`isChildOf\` b@ holds when @a@ is @b@ itself or a descendant of @b@
-- at any depth.
--
-- Equality is tested before the constructor is inspected so that the walk
-- up the parent chain can also succeed when it arrives at a root path;
-- previously reaching a root always answered 'False', so nothing was ever
-- considered a child of a root.
isChildOf :: ActorPath -> ActorPath -> Bool
isChildOf a b
  | a == b = True
isChildOf (ChildActorPath up _) b = isChildOf up b
isChildOf _ _ = False
-- | Renders a path as its segment names joined by @/@, root first.
instance Show ActorPath where
  show path = case path of
    RootActorPath uri -> uri
    ChildActorPath up segment -> show up ++ "/" ++ segment
-- | True for characters that are not allowed inside a path segment.
isReserved :: Char -> Bool
isReserved c = c `elem` "[]?:#@!$()&%*+,;=/' "
-- | Returns the segment unchanged when it contains no reserved character
-- (see 'isReserved'); otherwise calls 'error' with a message naming the
-- offending segment.
validateSegment :: String -> String
validateSegment segment
  | any isReserved segment =
      error $ "segment contains reserved characters: " ++ segment
  | otherwise = segment
-- | Appends a child segment to a path. The segment is passed through
-- 'validateSegment' first.
(//) :: ActorPath -> String -> ActorPath
base // segment = ChildActorPath base (validateSegment segment)
-- | Handle used to address an actor. It wraps only the actor's path; the
-- type parameter @m@ does not occur in the stored value.
newtype ActorRef m = ActorRef
  { actorPath :: ActorPath -- ^ path of the referenced actor
  }
  deriving (Eq)
-- | Shows a reference as its path prefixed with @*@.
instance Show (ActorRef m) where
  show ref = '*' : show (actorPath ref)
-- | Control messages that are dispatched to a behavior's signal handler
-- instead of its message handler.
data Signal
  = PoisonPill -- ^ request to stop
  | ChildFailed ActorPath String -- ^ path of the failed actor and the error text
  deriving Show
-- | Payload of an 'Envelope': a user message or a 'Signal'.
data Message m
  = Message m
  | Signal Signal
  deriving Show
-- | A payload together with the references of its sender and recipient.
data Envelope m = Envelope
  { from :: ActorRef m -- ^ sender
  , to :: ActorRef m -- ^ recipient
  , message :: Message m -- ^ payload
  }
  deriving Show
-- | Requests processed by the central loop started in 'actorSystem'.
data SystemMessage m
  = -- | Register a newly spawned actor's inbox under its path.
    NewActor
      { path :: ActorPath
      , inbox :: Inbox m
      }
  | -- | The actor at this path has terminated.
    ActorTerminated ActorPath
  | -- | Route an envelope to its recipient.
    Deliver (Envelope m)
  | -- | Emit a log line on behalf of the actor at the given path.
    Log ActorPath Severity String
  | -- | Timer request: initial duration, repeat duration, cancellation
    -- flag, and the envelope to deliver.
    Schedule Int Int (MVar ()) (Envelope m)
-- | The monad in which actor handlers run: 'IO' with an 'ActorContext'
-- threaded through as state.
--
-- NOTE(review): deriving these classes for a newtype presumably relies on
-- GeneralizedNewtypeDeriving being enabled outside this view - confirm.
newtype ActorIO m a
  = ActorIO (StateT (ActorContext m) IO a)
  deriving (Functor, Applicative, Monad, MonadIO)
-- | An actor's mailbox: a channel of envelopes addressed to it.
type Inbox m = Chan (Envelope m)
-- | State available to a handler while it processes one envelope.
data ActorContext m = ActorContext
  { ctxBehavior :: Behavior m
    -- ^ behavior the actor continues with once the handler returns
  , ctxSender :: ActorRef m
    -- ^ sender of the envelope being processed
  , ctxSelf :: ActorRef m
    -- ^ reference of the actor running the handler
  , ctxSysMessage :: SystemMessage m -> IO ()
    -- ^ posts a request to the system loop
  , ctxActor :: String -> Behavior m -> IO (ActorRef m)
    -- ^ spawns a child with the given name and behavior
  }
-- | How an actor reacts to its inbox. 'Stopped' carries no handlers and
-- ends the actor's processing loop.
data Behavior m
  = Behavior
      { handleMessage :: m -> ActorIO m () -- ^ reaction to user messages
      , handleSignal :: Signal -> ActorIO m () -- ^ reaction to signals
      }
  | Stopped
-- | Baseline behavior: every user message is logged as unhandled, a
-- 'PoisonPill' stops the actor, and a 'ChildFailed' signal is logged as an
-- error.
defaultBehavior :: Show m => Behavior m
defaultBehavior = Behavior onMessage onSignal
  where
    onMessage _ = log Warn "unhandled message received"
    onSignal PoisonPill = stop
    onSignal (ChildFailed path e) =
      log Error $ "child " ++ show path ++ " failed: " ++ e
-- | Posts a user message to the system loop for delivery, recording an
-- explicitly chosen sender in the envelope.
tell :: ActorRef m -- ^ recipient
     -> m -- ^ message
     -> ActorRef m -- ^ reference recorded as the sender
     -> ActorIO m ()
tell other msg sender = ActorIO $ do
  post <- gets ctxSysMessage
  liftIO . post . Deliver $ Envelope sender other (Message msg)
-- | Sends a message with the current actor recorded as the sender.
(!) :: ActorRef m -- ^ recipient
    -> m -- ^ message
    -> ActorIO m ()
ref ! msg = do
  me <- self
  tell ref msg me
-- | Sends a message while keeping the sender of the envelope currently
-- being processed as the sender of the new one.
forward :: ActorRef m -- ^ recipient
        -> m -- ^ message
        -> ActorIO m ()
forward ref msg = do
  origin <- sender
  tell ref msg origin
-- | Replaces the message handler the actor uses from the next message on.
-- The signal handler is left untouched.
--
-- If the behavior is already 'Stopped' (after 'stop', or in the root
-- context) the call is a no-op. A record update on 'Stopped' would raise
-- a runtime error because that constructor has no 'handleMessage' field.
become :: (m -> ActorIO m ()) -- ^ new message handler
       -> ActorIO m ()
become msgHandler = ActorIO $ modify install
  where
    install ctx = case ctxBehavior ctx of
      Stopped -> ctx
      Behavior _ sigHandler ->
        ctx { ctxBehavior = Behavior msgHandler sigHandler }
-- | Marks the current actor's behavior as 'Stopped'.
stop :: ActorIO m ()
stop = ActorIO (modify halt)
  where
    halt ctx = ctx { ctxBehavior = Stopped }
-- | Handle returned by 'schedule' / 'scheduleOnce'; running 'cancel' sets
-- the timer's cancellation flag.
--
-- NOTE(review): @m@ is existentially quantified here, which may prevent
-- using 'cancel' as an ordinary selector function at call sites - confirm
-- whether @cancel :: forall m. ActorIO m ()@ was intended.
data Cancellable = forall m. Cancellable
  { cancel :: ActorIO m ()
  }
-- | Asks the system loop to deliver a message on a timer, with the current
-- actor as sender. Both durations are forwarded unchanged in a 'Schedule'
-- request together with a fresh, empty 'MVar' that serves as the
-- cancellation flag; how the durations are interpreted is decided by the
-- system loop. The returned 'Cancellable' fills that flag.
schedule :: Int -- ^ initial duration
         -> Int -- ^ repeat duration
         -> ActorRef m -- ^ recipient
         -> m -- ^ message
         -> ActorIO m Cancellable
schedule initialDuration duration ref msg = ActorIO $ do
  ctx <- get
  cancelFlag <- liftIO newEmptyMVar
  let envelope = Envelope (ctxSelf ctx) ref (Message msg)
      request = Schedule initialDuration duration cancelFlag envelope
  liftIO $ ctxSysMessage ctx request
  return . Cancellable . liftIO $ () <$ tryPutMVar cancelFlag ()
-- | Schedules a single delivery of a message.
--
-- The repeat duration handed to 'schedule' is negative: the system loop
-- only re-arms a timer when its repeat duration is @>= 0@, so any
-- non-negative value (such as the previous @1@) makes the message repeat
-- indefinitely instead of being delivered once.
scheduleOnce :: Int -- ^ duration passed to 'schedule' as the initial duration
             -> ActorRef m -- ^ recipient
             -> m -- ^ message
             -> ActorIO m Cancellable
scheduleOnce duration ref msg = schedule duration (-1) ref msg
-- | Reference of the actor running the current handler.
self :: ActorIO m (ActorRef m)
self = ActorIO (gets ctxSelf)
-- | Reference of the current actor's parent.
--
-- Uses the total 'parentOrRoot' instead of a failable pattern bind: the
-- pattern needed a 'MonadFail' instance that 'ActorIO' does not derive and
-- failed at runtime in the root context. In the root context the root's
-- own reference is returned.
parent :: ActorIO m (ActorRef m)
parent = ActorRef . parentOrRoot . actorPath <$> self
-- | An action that does nothing.
noop :: ActorIO m ()
noop = return ()
-- | Reference of the sender of the envelope currently being processed.
sender :: ActorIO m (ActorRef m)
sender = ActorIO (gets ctxSender)
-- | Spawns a child of the current actor. The child starts from
-- 'defaultBehavior' with its message handler replaced by the given one.
actor :: Show m => String -- ^ name of the child (one path segment)
      -> (m -> ActorIO m ()) -- ^ message handler
      -> ActorIO m (ActorRef m)
actor name b = ActorIO $ do
  spawn <- gets ctxActor
  liftIO $ spawn name (defaultBehavior { handleMessage = b })
-- | Level attached to a log entry.
data Severity
  = Debug
  | Info
  | Warn
  | Error
  deriving Show
-- | Posts a log entry, tagged with the current actor's path, to the system
-- loop.
log :: Severity -- ^ level
    -> String -- ^ text
    -> ActorIO m ()
log s msg = ActorIO $ do
  post <- gets ctxSysMessage
  origin <- gets (actorPath . ctxSelf)
  liftIO . post $ Log origin s msg
-- | Dispatches a payload to the matching handler of a behavior: user
-- messages go to the message handler, signals to the signal handler.
--
-- A 'Stopped' behavior has no handlers, so any payload is ignored; this
-- equation makes the function total instead of failing with a
-- non-exhaustive pattern error.
handleMsg :: Behavior m -> Message m -> ActorIO m ()
handleMsg (Behavior h _) (Message msg) = h msg
handleMsg (Behavior _ h) (Signal sig) = h sig
handleMsg Stopped _ = return ()
-- | Starts an actor: creates its inbox, forks a thread that processes the
-- inbox one envelope at a time, and returns the inbox so the caller can
-- register it with the system.
--
-- The loop ends when the behavior becomes 'Stopped'. If a handler throws,
-- a 'ChildFailed' signal carrying the exception text is posted to the
-- actor's parent and the actor stops.
--
-- NOTE(review): an actor that stops is not reported to the system loop
-- from here (no 'ActorTerminated' is posted), so its inbox presumably
-- stays registered - confirm whether that is intended.
runActor :: Show m => Chan (SystemMessage m) -- ^ channel of the system loop
         -> Behavior m -- ^ initial behavior
         -> IO (Inbox m)
runActor system b = do
  inbox <- newChan
  let loop Stopped = return ()
      loop bhv = do
        Envelope sender self payload <- readChan inbox
        let spawn childName childBehavior = do
              childInbox <- runActor system childBehavior
              let childPath = actorPath self // childName
              writeChan system $ NewActor childPath childInbox
              return $ ActorRef childPath
            -- The context must start from the *current* behavior. Seeding
            -- it with the initial one made any handler installed through
            -- 'become' revert as soon as a message was processed without
            -- calling 'become' again.
            context = ActorContext bhv sender self (writeChan system) spawn
            onFailure e = do
              let path = actorPath self
                  reason = displayException (e :: SomeException)
                  signal = Signal $ ChildFailed path reason
              -- 'parentOrRoot' is total, unlike the 'parentPath' selector.
              writeChan system . Deliver $
                Envelope self (ActorRef (parentOrRoot path)) signal
              return Stopped
            step = do
              let exec (ActorIO x) = execStateT x
              ctx <- exec (handleMsg bhv payload) context
              -- Force the behavior while still inside 'catch' so that an
              -- error hidden in the lazy state is reported as a failure
              -- instead of silently killing the actor thread.
              evaluate (ctxBehavior ctx)
        newBehavior <- catch step onFailure
        loop newBehavior
  _ <- forkIO $ loop b
  return inbox
-- | Handle to a running actor system.
data ActorSystem = ActorSystem
  { terminate :: IO () -- ^ posts a 'PoisonPill' addressed to the root
  }
-- | Creates an actor system and runs an initialisation action in its root
-- context.
--
-- A background thread processes 'SystemMessage's: it keeps the table of
-- registered actors, routes envelopes, prints log entries with 'putStrLn'
-- and runs timers. The initialisation action runs on the calling thread
-- and its result is discarded. If processing a system message throws, the
-- error is printed and the loop ends.
--
-- Changes compared with the previous implementation:
--
-- * A timer now waits the /initial/ duration before its first delivery and
--   the repeat duration before every later one (the initial duration used
--   to be ignored). Durations are in milliseconds; a negative repeat
--   duration means "deliver once".
-- * A 'PoisonPill' addressed to the root (what 'terminate' sends) now
--   really shuts the system down: every registered actor receives a
--   'PoisonPill' and the loop ends. Previously only "System Stopped" was
--   printed while everything kept running.
actorSystem :: Show m => String -- ^ name of the root; must be a valid path segment
            -> ActorIO m a -- ^ initialisation action, run in the root context
            -> IO ActorSystem
actorSystem name init = do
  system <- newChan
  let rootRef = ActorRef (RootActorPath $ validateSegment name)
      poison p = Envelope rootRef (ActorRef p) (Signal PoisonPill)

      -- Route one envelope. The result is the actor table to continue
      -- with, or Nothing when the loop has to end.
      deliver actors env@(Envelope _ to payload)
        | to == rootRef = case payload of
            Signal PoisonPill -> do
              forM_ actors $ \(p, box) -> writeChan box (poison p)
              putStrLn "System Stopped"
              return Nothing
            Signal (ChildFailed p e) -> do
              putStrLn $ "Actor Failed: " ++ show p ++ ": " ++ e
              writeChan system $ ActorTerminated p
              return $ Just actors
            Message m -> do
              putStrLn $ "plain message sent to root actor: " ++ show m
              return $ Just actors
        | otherwise = do
            case lookup (actorPath to) actors of
              Just recipient -> writeChan recipient env
              Nothing ->
                putStrLn $ "message could not be delivered to " ++ show (actorPath to)
            return $ Just actors

      -- Process one system message; same result convention as 'deliver'.
      step actors sysMsg = case sysMsg of
        NewActor path inbox ->
          return $ Just ((path, inbox) : actors)
        ActorTerminated path -> do
          -- Drop the actor and all of its descendants, poisoning each.
          let (remove, retain) = partition ((`isChildOf` path) . fst) actors
          forM_ remove $ \(p, box) -> writeChan box (poison p)
          return $ Just retain
        Schedule initialDuration duration smvar env -> do
          _ <- forkIO $ do
            -- threadDelay takes microseconds.
            threadDelay $ max 0 initialDuration * 1000
            cancelled <- tryTakeMVar smvar
            when (isNothing cancelled) $ do
              writeChan system $ Deliver env
              -- Re-arm with the repeat duration as the next delay.
              when (duration >= 0) $
                writeChan system $ Schedule duration duration smvar env
          return $ Just actors
        Log path s text -> do
          putStrLn $ "[" ++ show s ++ "] [" ++ show path ++ "]: " ++ text
          return $ Just actors
        Deliver env -> deliver actors env

      handler e = do
        putStrLn $ "Actor system '" ++ name ++ "' failed: " ++ displayException (e :: SomeException)
        return Nothing

      loop actors = do
        sysMsg <- readChan system
        next <- catch (step actors sysMsg) handler
        maybe (return ()) loop next
  _ <- forkIO $ loop []
  let spawn childName behavior = do
        inbox <- runActor system behavior
        let path = actorPath rootRef // childName
        writeChan system $ NewActor path inbox
        return $ ActorRef path
      -- The root has no handlers of its own, hence the 'Stopped' behavior.
      rootContext = ActorContext Stopped rootRef rootRef (writeChan system) spawn
      shutdown = writeChan system . Deliver $ Envelope rootRef rootRef (Signal PoisonPill)
      runInit (ActorIO setup) = evalStateT setup rootContext
  _ <- runInit init
  return $ ActorSystem shutdown