module Control.Concurrent.SCC.Foundation
(
Pipe, Source, Sink, Consumer, Producer,
pipe, pipeD, get, getSuccess, canPut, put,
liftPipe, runPipes,
cond, whenNull, pour, tee, getList, putList, consumeAndSuppress)
where
import Control.Exception (assert)
import Control.Monad (liftM, when)
import Data.Maybe (maybe)
import Data.Typeable (Typeable, cast)
import Debug.Trace (trace)
newtype Monad m => Pipe context m r = Pipe {proceed :: context -> Int -> Integer -> m (PipeState context m r)}
data PipeState context m r = Suspend context [Suspension context m r]
| Done Integer r
data Suspension context m r = Suspension {pid :: Int,
clock :: Integer,
description :: String,
continuation :: SuspendedContinuation context m r}
data SuspendedContinuation context m r = forall x. Typeable x => Get (Maybe x -> Pipe context m r)
| forall x. Typeable x => Put x (Bool -> Pipe context m r)
| CanPut (Bool -> Pipe context m r)
data Source context x = Source context Int String
data Sink context x = Sink context Int String
type Consumer m x r = forall c. Source c x -> Pipe c m r
type Producer m x r = forall c. Sink c x -> Pipe c m r
liftPipe :: forall context m r. Monad m => m r -> Pipe context m r
liftPipe mr = Pipe (\context pid clock-> liftM (Done clock) mr)
runPipes :: forall m r. Monad m => (forall context. Pipe context m r) -> m r
runPipes c = proceed c undefined 1 0 >>= \s-> case s of Done _ r -> return r
instance Monad m => Monad (Pipe context m) where
return r = Pipe (\context pid clock-> return (Done clock r))
Pipe p >>= f = Pipe (\context pid clock-> p context pid clock >>= apply f context pid)
where apply :: forall r1 r2. (r1 -> Pipe context m r2) -> context -> Int -> PipeState context m r1 -> m (PipeState context m r2)
apply f context pid (Done clock r) = proceed (f r) context pid (succ clock)
apply f _ pid (Suspend context suspensions) = return $ Suspend context (map suspendApplied suspensions)
where suspendApplied s@Suspension{description= desc, clock= clock', continuation= Get cont}
= s{description= "applied " ++ desc, continuation= Get ((f =<<) . cont)}
suspendApplied s@Suspension{description= desc, clock= clock', continuation= Put x cont}
= s{description= "applied " ++ desc, continuation= Put x ((f =<<) . cont)}
suspendApplied s@Suspension{description= desc, clock= clock', continuation= CanPut cont}
= s{description= "applied " ++ desc, continuation= CanPut ((f =<<) . cont)}
instance Show (Suspension context m r) where
show Suspension{pid= pid, description = desc, continuation= c} = (case c of Put{} -> "(Put)"
CanPut{} -> "(CanPut)"
Get{} -> "(Get)")
++ desc ++ " -> " ++ show pid
pipe :: forall context x m r1 r2. Monad m => Producer m x r1 -> Consumer m x r2 -> Pipe context m (r1, r2)
pipe = pipeD ""
pipeD :: forall context x m r1 r2. Monad m => String -> Producer m x r1 -> Consumer m x r2 -> Pipe context m (r1, r2)
pipeD description producer consumer
= Pipe (\context pid clock-> let producerPid = 2*pid
consumerPid = 2*pid+1
context' = undefined
description' = description ++ ':' : show pid
in assert (track (indent pid ++ "pipe " ++ description')) $
do ps <- proceed (producer (Sink context' producerPid description')) context' producerPid clock
cs <- proceed (consumer (Source context' consumerPid description')) context' consumerPid clock
reduce context' producerPid ps consumerPid cs)
reduce :: forall c m r1 r2. Monad m => c -> Int -> PipeState c m r1 -> Int -> PipeState c m r2 -> m (PipeState c m (r1, r2))
reduce context pid1 (Done t1 r1) pid2 (Done t2 r2)
= assert (track (indent pid1 ++ "Done " ++ show pid1 ++ " -> " ++ show pid2)) $
return (Done (max t1 t2) (r1, r2))
reduce context pid1 (Suspend c1 ps@(Suspension{pid= pid1', clock= t, continuation= pCont} : _)) pid2 consumer@Done{}
| pid1' == pid1, Put _ cont <- pCont
= assert (track (indent pid1 ++ "Failed producer put " ++ show ps ++ " from " ++ show pid1)) $
proceed (cont False) context pid1 t >>= \p'-> reduce context pid1 p' pid2 consumer
| pid1' == pid1, CanPut cont <- pCont
= assert (track (indent pid1 ++ "Finish producer " ++ show ps ++ " from " ++ show pid1)) $
proceed (cont False) context pid1 t >>= \p'-> reduce context pid1 p' pid2 consumer
| pid1' < pid1 = assert (track (indent pid1 ++ "Suspend producer " ++ show ps ++ " from " ++ show pid1)) $
return $ Suspend context $ map (delay (\ps'-> reduce context pid1 ps' pid2 consumer)) ps
| otherwise = error (show pid1' ++ ">" ++ show pid1 ++ " | producer : " ++ show ps)
reduce context pid1 producer@Done{} pid2 (Suspend c2 cs@(Suspension{pid= pid2', clock= t, continuation= cCont} : _))
| pid2' == pid2, Get cont <- cCont
= assert (track (indent pid1 ++ "Finish consumer " ++ show cs ++ " from " ++ show pid2)) $
proceed (cont Nothing) context pid2 t >>= reduce context pid1 producer pid2
| pid2' < pid2 = assert (track (indent pid1 ++ "Suspend consumer " ++ show cs ++ " from " ++ show pid2)) $
return $ Suspend context $ map (delay (reduce context pid1 producer pid2)) cs
| otherwise = error (show pid2' ++ ">" ++ show pid2 ++ " | consumer : " ++ show cs)
reduce context pid1 producer@(Suspend _ ps@(Suspension{pid= pid1', clock=t1, continuation= pc} : _))
pid2 consumer@(Suspend _ cs@(Suspension{pid= pid2', clock=t2, continuation= Get cCont} : _))
| pid1' == pid1 && pid2' == pid2, CanPut pCont <- pc
= assert (track (indent pid1 ++ "CanPut Match at " ++ show pid1 ++ "/" ++ show pid2 ++ " : " ++ show ps ++ " -> " ++ show cs)) $
proceed (pCont True) context pid1 t1 >>= \p'-> reduce context pid1 p' pid2 consumer
| pid1' == pid1 && pid2' == pid2, Put x pCont <- pc
= assert (track (indent pid1 ++ "Match at " ++ show pid1 ++ "/" ++ show pid2 ++ " : " ++ show ps ++ " -> " ++ show cs)) $
let t' = max t1 t2
in do p' <- assert (track "producer (") $ proceed (pCont True) context pid1 t'
c' <- assert (track ") consumer (") $ proceed (cCont (cast x)) context pid2 t'
assert (track ") combined ->") reduce context pid1 p' pid2 c'
reduce context pid1 producer@(Suspend c1 ps) pid2 consumer@(Suspend c2 cs) = assert (track (indent pid1 ++ "Suspend producer & consumer, "
++ show ps ++ " from " ++ show pid1 ++ " & "
++ show cs ++ " from " ++ show pid2)) $
keepSuspending ps cs
where keepSuspending (Suspension{pid=pid1'} : pTail) cs | pid1' == pid1 = keepSuspending pTail cs
keepSuspending ps (Suspension{pid= pid2'} : cTail) | pid2' == pid2 = keepSuspending ps cTail
keepSuspending ps cs = assert (track (indent pid1 ++ "Suspend' producer & consumer, "
++ show ps ++ " from " ++ show pid1 ++ " & "
++ show cs ++ " from " ++ show pid2)) $
return $ Suspend context $
merge (map (\p-> delay (\p'-> reduce context pid1 p' pid2 consumer) p) ps)
(map (delay (reduce context pid1 producer pid2)) cs)
merge :: Monad m => [Suspension context m r] -> [Suspension context m r] -> [Suspension context m r]
merge [] l = l
merge l [] = l
merge l1@(h1@Suspension{pid= pid1, clock= c1} : tail1) l2@(h2@Suspension{pid= pid2, clock= c2} : tail2)
| pid1 > pid2 = h1 : merge tail1 l2
| pid1 < pid2 = h2 : merge l1 tail2
| c1 < c2 = h1 : merge tail1 l2
| otherwise = h2 : merge l1 tail2
delay :: Monad m => (PipeState context m r1 -> m (PipeState context m r2)) -> Suspension context m r1 -> Suspension context m r2
delay f = delay' (\p-> Pipe $ \context pid clock-> proceed p context pid clock >>= f)
delay' :: Monad m => (Pipe context m r1 -> Pipe context m r2) -> Suspension context m r1 -> Suspension context m r2
delay' f s@Suspension{description= desc, continuation= Get cont}
= s{description= "delayed " ++ desc, continuation= Get (f . cont)}
delay' f s@Suspension{description= desc, continuation= Put x cont}
= s{description= "delayed " ++ desc, continuation= Put x (f . cont)}
delay' f s@Suspension{description= desc, continuation= CanPut cont}
= s{description= "delayed " ++ desc, continuation= CanPut (f . cont)}
indent 0 = ""
indent n = ' ' : indent (n `div` 2)
get :: forall context context' x m r. (Monad m, Typeable x)
=> Source context' x -> Pipe context m (Maybe x)
get (Source _ pid desc) = assert (track (indent pid ++ "Get from " ++ desc ++ "@" ++ show pid)) $
Pipe (\context pid' clock->
assert (track (indent pid ++ "Get<- " ++ desc ++ "@" ++ show pid ++ ":" ++ show clock)) $
return $ Suspend context $
[Suspension pid clock ("get from " ++ desc ++ "@" ++ show pid ++ ":" ++ show clock) $ Get return])
getSuccess :: forall context context' x m. (Monad m, Typeable x)
=> Source context' x
-> (x -> Pipe context m ())
-> Pipe context m ()
getSuccess source succeed = get source >>= maybe (return ()) succeed
put :: forall context context' x m r. (Monad m, Typeable x) => Sink context' x -> x -> Pipe context m Bool
put (Sink _ pid desc) x = assert (track (indent pid ++ "Put into " ++ desc ++ "@" ++ show pid)) $
Pipe (\context pid' clock->
assert (track (indent pid ++ "Put-> " ++ desc ++ "@" ++ show pid ++ ":" ++ show clock)) $
return $ Suspend context $
[Suspension pid clock ("put into " ++ desc ++ "@" ++ show pid ++ ":" ++ show clock)
(Put x return)])
canPut :: forall context context' x m r. (Monad m, Typeable x) => Sink context' x -> Pipe context m Bool
canPut (Sink _ pid desc) = assert (track (indent pid ++ "CanPut into " ++ desc ++ "@" ++ show pid)) $
Pipe (\context pid' clock->
assert (track (indent pid ++ "CanPut-> " ++ desc ++ "@" ++ show pid ++ ":" ++ show clock)) $
return $ Suspend context $
[Suspension pid clock ("canPut into " ++ desc ++ "@" ++ show pid ++ ":" ++ show clock)
(CanPut return)])
pour :: forall c c1 c2 x m. (Monad m, Typeable x) => Source c1 x -> Sink c2 x -> Pipe c m ()
pour source sink = fill'
where fill' = canPut sink >>= flip when (getSuccess source (\x-> put sink x >> fill'))
tee :: (Monad m, Typeable x) => Source c1 x -> Sink c2 x -> Sink c3 x -> Pipe c m ()
tee source sink1 sink2 = distribute
where distribute = do c1 <- canPut sink1
c2 <- canPut sink2
when (c1 && c2) (getSuccess source $ \x-> put sink1 x >> put sink2 x >> distribute)
putList :: forall x c c1 m. (Monad m, Typeable x) => [x] -> Sink c1 x -> Pipe c m [x]
putList [] sink = return []
putList l@(x:rest) sink = put sink x >>= cond (putList rest sink) (return l)
getList :: forall x c c1 m. (Monad m, Typeable x) => Source c1 x -> Pipe c m [x]
getList source = get source >>= maybe (return []) (\x-> liftM (x:) (getList source))
consumeAndSuppress :: forall x c c1 m. (Monad m, Typeable x) => Source c1 x -> Pipe c m ()
consumeAndSuppress source = getSuccess source (\x-> consumeAndSuppress source)
cond :: a -> a -> Bool -> a
cond x y test = if test then x else y
whenNull :: forall a m. Monad m => m [a] -> [a] -> m [a]
whenNull action list = if null list then action else return list
track :: String -> Bool
track message = True