module Control.Concurrent.SCC.Foundation
(
ParallelizableMonad (parallelize),
Pipe, Source, Sink,
pipe, pipeD, pipeP, get, getSuccess, get', canPut, put,
liftPipe, runPipes,
cond, whenNull, pour, pourMap, pourMapMaybe, tee, getList, putList, putQueue, consumeAndSuppress)
where
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (liftM, liftM2, when)
import Control.Monad.Identity
import Control.Parallel (par, pseq)
import Data.Foldable (toList)
import Data.Maybe (maybe)
import Data.Sequence (Seq, viewl)
import Data.Typeable (Typeable, cast)
import Debug.Trace (trace)
-- | Monads whose computations can be paired up, potentially running in parallel.
class Monad m => ParallelizableMonad m where
   -- | Combine two computations into one that yields both results. The default
   -- implementation simply runs the first computation and then the second.
   parallelize :: m a -> m b -> m (a, b)
   parallelize ma mb = do a <- ma
                          b <- mb
                          return (a, b)
-- | Sparks the first value with 'par' while the second one is evaluated with 'pseq'.
instance ParallelizableMonad Identity where
   parallelize ma mb = first `par` (second `pseq` Identity (first, second))
      where first = runIdentity ma
            second = runIdentity mb
-- | Sparks the first computation, evaluates the second, and succeeds only if both do.
instance ParallelizableMonad Maybe where
   parallelize ma mb = ma `par` (mb `pseq` pairUp ma mb)
      where pairUp (Just a) (Just b) = Just (a, b)
            pairUp _ _ = Nothing
-- | Runs each action in its own thread started with 'forkIO' and collects both
-- results through a pair of 'MVar's.
--
-- NOTE(review): if either action throws, its 'MVar' is never filled, so the
-- matching 'takeMVar' below cannot complete normally and the original
-- exception is not re-raised in the calling thread.
instance ParallelizableMonad IO where
   parallelize ma mb = do va <- newEmptyMVar
                          vb <- newEmptyMVar
                          _ <- forkIO (putMVar va =<< ma)
                          _ <- forkIO (putMVar vb =<< mb)
                          -- wait for the first result, then for the second
                          liftM2 (,) (takeMVar va) (takeMVar vb)
-- | A computation in the underlying monad @m@ that, given the current
-- 'PipeState', either finishes with a result of type @r@ or suspends.
newtype Pipe context m r = Pipe
   {
    -- | Run the computation from the given state up to its next rendezvous.
    proceed :: PipeState context -> m (PipeRendezvous context m r)
   }
-- | The state threaded through a running 'Pipe'.
data PipeState context = PipeState
   {
    -- | Nesting level; 'runPipes' starts at 1 and every nested pipe runs its
    -- two sides one level deeper.
    level :: Int,
    -- | Logical clock, advanced by the monadic bind of 'Pipe'.
    clock :: Integer
   }
-- | The outcome of running a 'Pipe' one step.
data PipeRendezvous context m r
   = Suspend [Suspension context m r]  -- ^ blocked on the listed pending operations
   | Done Integer r                    -- ^ finished at the given clock value with a result
-- | A single pending operation of a suspended 'Pipe'.
data Suspension context m r = Suspension
   {
    -- | Level of the 'Source' or 'Sink' the operation is addressed to.
    targetLevel :: Int,
    -- | The pipe state captured when the operation was issued.
    state :: PipeState context,
    -- | Human-readable label, used only by the 'Show' instance and tracing.
    description :: String,
    -- | What was requested and how to resume once it is answered.
    continuation :: SuspendedContinuation context m r
   }
-- | The request a suspended 'Pipe' is waiting on, together with the way to resume it.
data SuspendedContinuation context m r
   = -- | Waiting for an item; resumed with 'Nothing' when none will arrive.
     forall x. Typeable x => Get (Maybe x -> Pipe context m r)
   | -- | Offering an item; resumed with a flag telling whether it was accepted.
     forall x. Typeable x => Put x (Bool -> Pipe context m r)
   | -- | Asking whether a put could succeed; resumed with the answer.
     CanPut (Bool -> Pipe context m r)
-- | Handle for the reading end of a pipe: the pipe's level and a descriptive label.
-- The item type @x@ is a phantom parameter.
data Source context x = Source Int String

-- | Handle for the writing end of a pipe: the pipe's level and a descriptive label.
-- The item type @x@ is a phantom parameter.
data Sink context x = Sink Int String
-- | A pipe computation that reads items of type @x@ from the 'Source' it is given.
type Consumer context m x r = Source context x -> Pipe context m r

-- | A pipe computation that writes items of type @x@ into the 'Sink' it is given.
type Producer context m x r = Sink context x -> Pipe context m r
-- | Lift a computation of the underlying monad into a 'Pipe' that completes
-- immediately, at the clock value it was started with.
liftPipe :: forall context m r. Monad m => m r -> Pipe context m r
liftPipe action = Pipe run
   where run st = do result <- action
                     return (Done (clock st) result)
-- | Run a complete pipe network in the underlying monad, starting at level 1
-- and clock 0, and return its result.
--
-- A top-level computation has no enclosing pipe that could answer a pending
-- operation, so a suspension reaching this point cannot be resumed. It is
-- reported with the list of pending suspensions instead of failing with an
-- anonymous pattern-match error.
runPipes :: forall m r. Monad m => (forall context. Pipe context m r) -> m r
runPipes c = proceed c (PipeState 1 0) >>= finish
   where finish (Done _ r) = return r
         finish (Suspend pending)
            = error ("runPipes: top-level pipe suspended with nothing to resume it: " ++ show pending)
-- | Sequencing of pipe computations. A finished first step hands its result to
-- the rest with the clock advanced by one; a suspended first step stays
-- suspended, with the rest appended to each of its continuations.
instance Monad m => Monad (Pipe context m) where
   return result = Pipe (\st-> return (Done (clock st) result))
   Pipe step >>= next = Pipe (\st-> step st >>= resume next st)
      where resume :: forall r1 r2. (r1 -> Pipe context m r2) -> PipeState context -> PipeRendezvous context m r1
                      -> m (PipeRendezvous context m r2)
            resume k st rendezvous
               = case rendezvous of
                    Done t r -> proceed (k r) st{clock= succ t}
                    Suspend pending -> return (Suspend (map relabel pending))
               where relabel s = postApply (>>= k) s{description= "applied " ++ description s}
-- | Transform the pipe that a suspension resumes into, whatever kind of
-- request the suspension is waiting on.
postApply :: (Pipe context m r1 -> Pipe context m r2) -> Suspension context m r1 -> Suspension context m r2
postApply wrap suspension = suspension{continuation= rewire (continuation suspension)}
   where rewire (Get k) = Get (wrap . k)
         rewire (Put item k) = Put item (wrap . k)
         rewire (CanPut k) = CanPut (wrap . k)
-- | Two pipes are run in parallel by running their next steps in parallel in
-- the underlying monad and then combining the two outcomes.
instance ParallelizableMonad m => ParallelizableMonad (Pipe context m) where
   parallelize left right = Pipe (\st-> liftM combine (parallelize (proceed left st) (proceed right st)))
      where combine :: forall r1 r2. (PipeRendezvous context m r1, PipeRendezvous context m r2) -> PipeRendezvous context m (r1, r2)
            -- both finished: pair the results at the later of the two clocks
            combine (Done t1 a, Done t2 b) = Done (max t1 t2) (a, b)
            -- one side finished: keep the other side's suspensions, pairing its
            -- eventual result with the one already at hand
            combine (Suspend pending, Done t b) = Suspend (map (adjustSuspension t (liftM (\a-> (a, b)))) pending)
            combine (Done t a, Suspend pending) = Suspend (map (adjustSuspension t (liftM (\b-> (a, b)))) pending)
            -- both suspended: each suspension resumes in parallel with the
            -- other side's unchanged outcome
            combine (lhs@(Suspend pendingL), rhs@(Suspend pendingR))
               = Suspend (merge (map (postApply (\l-> parallelize l (rewrap rhs))) pendingL)
                                (map (postApply (\r-> parallelize (rewrap lhs) r)) pendingR))
            -- a pipe that ignores its state and yields the given outcome
            rewrap :: PipeRendezvous context m r -> Pipe context m r
            rewrap = Pipe . const . return
            -- move the suspension's clock forward to at least the given value
            adjustSuspension :: Integer -> (Pipe context m r1 -> Pipe context m r2)
                                -> Suspension context m r1 -> Suspension context m r2
            adjustSuspension t wrap s = postApply wrap s{state= advanced}
               where advanced = (state s){clock= clock (state s) `max` t}
-- | Shows the kind of pending request, the suspension's description and its target level.
instance Show (Suspension context m r) where
   show Suspension{targetLevel= lvl, description= desc, continuation= c} = tag c ++ desc ++ " -> " ++ show lvl
      where tag Put{} = "(Put)"
            tag CanPut{} = "(CanPut)"
            tag Get{} = "(Get)"
-- | Connect a producer to a consumer through a pipe with an empty description.
pipe :: forall context x m r1 r2. Monad m => Producer context m x r1 -> Consumer context m x r2 -> Pipe context m (r1, r2)
pipe producer consumer = pipeD "" producer consumer
-- | Like 'pipe', with a description that labels the pipe in trace messages.
-- The two sides are run one after the other in the underlying monad.
pipeD :: forall c x m r1 r2. Monad m => String -> Producer c m x r1 -> Consumer c m x r2 -> Pipe c m (r1, r2)
pipeD description = pipePrim description (liftM2 (,))
-- | Like 'pipe', except the two sides are combined with 'parallelize'.
pipeP :: forall c x m r1 r2. ParallelizableMonad m => Producer c m x r1 -> Consumer c m x r2 -> Pipe c m (r1, r2)
pipeP = pipePrim "" parallelize
-- | The primitive behind 'pipe', 'pipeD' and 'pipeP'. Creates a 'Sink' and a
-- 'Source' tagged with the current level, runs the producer and the consumer
-- one level deeper using the supplied pairing function, and hands both
-- outcomes to 'reduce'.
pipePrim :: forall c m x r1 r2. Monad m =>
            String -> (forall a b. m a -> m b -> m (a, b)) -> Producer c m x r1 -> Consumer c m x r2 -> Pipe c m (r1, r2)
pipePrim description pairMonads producer consumer = Pipe run
   where run (PipeState level clock)
            = assert (track (indent level ++ "pipe " ++ label)) $
              do (ps, cs) <- pairMonads (proceed (producer (Sink level label)) inner)
                                        (proceed (consumer (Source level label)) inner)
                 reduce pairMonads level ps cs
            where -- the description is suffixed with the level of this pipe
                  label = description ++ ':' : show level
                  -- both sides start one level down, at the current clock
                  inner = PipeState (succ level) clock
-- | Resolve the outcomes of the producer and the consumer of the pipe at the
-- given level against each other, until both are done or whatever remains
-- pending has to be answered by an enclosing pipe.
--
-- Only the head of each suspension list is inspected. 'merge' keeps the
-- suspensions with the highest target level in front, so the head tells
-- whether a side is waiting on this pipe at all.
--
-- Arguments: the function that runs the two sides together, the level of this
-- pipe, the producer's outcome and the consumer's outcome.
reduce :: forall c m r1 r2. Monad m =>
          (m (PipeRendezvous c m r1) -> m (PipeRendezvous c m r2) -> m (PipeRendezvous c m r1, PipeRendezvous c m r2))
          -> Int -> PipeRendezvous c m r1 -> PipeRendezvous c m r2 -> m (PipeRendezvous c m (r1, r2))
-- Both sides have finished: pair the results at the later clock.
reduce pairMonads level (Done t1 r1) (Done t2 r2)
   = assert (track (indent level ++ "Done " ++ show level ++ " -> " ++ show level)) $
     return (Done (max t1 t2) (r1, r2))
-- The consumer has finished while the producer is suspended.
reduce pairMonads level (Suspend ps@(Suspension{targetLevel= l1, state= s1, continuation= pCont} : _)) consumer@Done{}
   -- nobody is left to take the item: the put fails
   | l1 == level, Put _ cont <- pCont
   = assert (track (indent level ++ "Failed producer put " ++ show ps ++ " from " ++ show level)) $
     proceed (cont False) s1 >>= \p'-> reduce pairMonads level p' consumer
   | l1 == level, CanPut cont <- pCont
   = assert (track (indent level ++ "Finish producer " ++ show ps ++ " from " ++ show level)) $
     proceed (cont False) s1 >>= \p'-> reduce pairMonads level p' consumer
   -- the producer waits on an enclosing pipe: pass its suspensions outwards
   | l1 < level = assert (track (indent level ++ "Suspend producer " ++ show ps ++ " from " ++ show level)) $
                  return $ Suspend $ map (delay (\ps'-> reduce pairMonads level ps' consumer)) ps
   | otherwise = error (show l1 ++ ">" ++ show level ++ " | producer : " ++ show ps)
-- The producer has finished while the consumer is suspended.
reduce pairMonads level producer@Done{} (Suspend cs@(Suspension{targetLevel= l2, state= s2, continuation= cCont} : _))
   -- no more items will arrive: the get yields Nothing
   | l2 == level, Get cont <- cCont
   = assert (track (indent level ++ "Finish consumer " ++ show cs ++ " from " ++ show level)) $
     proceed (cont Nothing) s2 >>= reduce pairMonads level producer
   -- the consumer waits on an enclosing pipe: pass its suspensions outwards
   | l2 < level
   = assert (track (indent level ++ "Suspend consumer " ++ show cs ++ " from " ++ show level)) $
     return $ Suspend $ map (delay (reduce pairMonads level producer)) cs
   | otherwise = error (show l2 ++ ">" ++ show level ++ " | consumer : " ++ show cs)
-- Both sides are suspended and the consumer's head suspension is a get.
reduce pairMonads level producer@(Suspend ps@(Suspension{targetLevel= l1, state= s1, continuation= pc} : _))
                        consumer@(Suspend cs@(Suspension{targetLevel= l2, state= s2, continuation= Get cCont} : _))
   | l1 == level && l2 == level, CanPut pCont <- pc
   = assert (track (indent level ++ "CanPut Match at " ++ show level ++ " : " ++ show ps ++ " -> " ++ show cs)) $
     proceed (pCont True) s1 >>= \p'-> reduce pairMonads level p' consumer
   -- The item may only be handed over when the consumer's get is addressed to
   -- this very pipe. Without the check on l2, a get aimed at an enclosing
   -- pipe's source would be answered with this producer's item.
   | l1 == level && l2 == level, Put x pCont <- pc
   = assert (track (indent level ++ "Match at " ++ show level ++ " : " ++ show ps ++ " -> " ++ show cs)) $
     do (p', c') <- pairMonads (assert (track "producer (") $ proceed (pCont True) (synchronizeState s1 s2))
                               (assert (track ") consumer (") $ proceed (cCont (cast x)) (synchronizeState s2 s1))
        assert (track ") combined ->") reduce pairMonads level p' c'
-- Both sides are suspended with nothing to match at this level.
reduce pairMonads level producer@(Suspend ps) consumer@(Suspend cs)
   = assert (track (indent level ++ "Suspend producer & consumer, "
                    ++ show ps ++ " from " ++ show level ++ " & "
                    ++ show cs ++ " from " ++ show level)) $
     keepSuspending ps cs
   where -- drop the leading suspensions addressed to this pipe and pass the
         -- rest outwards, each resuming back into 'reduce' at this level
         keepSuspending (Suspension{targetLevel=level'} : pTail) cs | level' == level = keepSuspending pTail cs
         keepSuspending ps (Suspension{targetLevel= level'} : cTail) | level' == level = keepSuspending ps cTail
         keepSuspending ps cs = assert (track (indent level ++ "Suspend' producer & consumer, "
                                               ++ show ps ++ " from " ++ show level ++ " & "
                                               ++ show cs ++ " from " ++ show level)) $
                                return $ Suspend $
                                merge (map (\p-> delay (\p'-> reduce pairMonads level p' consumer) p) ps)
                                      (map (delay (reduce pairMonads level producer)) cs)
-- | Merge two suspension lists, putting the higher target level first and,
-- between equal levels, the earlier clock first. On a complete tie the head
-- of the second list goes first.
merge :: [Suspension context m r] -> [Suspension context m r] -> [Suspension context m r]
merge [] rest = rest
merge rest [] = rest
merge l1@(h1@Suspension{targetLevel= level1, state= PipeState _ c1} : tail1)
      l2@(h2@Suspension{targetLevel= level2, state= PipeState _ c2} : tail2)
   = case compare level1 level2 of
        GT -> takeFirst
        LT -> takeSecond
        EQ | c1 < c2 -> takeFirst
           | otherwise -> takeSecond
   where takeFirst = h1 : merge tail1 l2
         takeSecond = h2 : merge l1 tail2
-- | Extend a suspension so that, once resumed, the outcome of its pipe is
-- passed through the given function.
delay :: Monad m =>
         (PipeRendezvous context m r1 -> m (PipeRendezvous context m r2)) -> Suspension context m r1 -> Suspension context m r2
delay after = delay' postpone
   where postpone pending = Pipe (\st-> proceed pending st >>= after)
-- | Transform the pipe a suspension resumes into and mark its description as delayed.
delay' :: (Pipe context m r1 -> Pipe context m r2) -> Suspension context m r1 -> Suspension context m r2
delay' f s = case continuation s of
                Get cont -> relabel (Get (f . cont))
                Put x cont -> relabel (Put x (f . cont))
                CanPut cont -> relabel (CanPut (f . cont))
   where relabel c = s{description= "delayed " ++ description s, continuation= c}
-- | Keep the level of the first state and take the later of the two clocks.
synchronizeState :: PipeState context -> PipeState context -> PipeState context
synchronizeState (PipeState pid ownClock) (PipeState _ otherClock) = PipeState pid (ownClock `max` otherClock)
-- | Indentation prefix for trace messages: one space for every time the
-- argument can be halved before reaching zero, so the prefix grows
-- logarithmically with the level.
--
-- Zero and negative arguments give the empty string. The guard matters for
-- negative numbers, which 'div' would never bring to zero.
indent :: Integral a => a -> String
indent n | n <= 0 = ""
         | otherwise = ' ' : indent (n `div` 2)
-- | Request the next item from the source. The pipe suspends with a 'Get'
-- request addressed to the source's level; it resumes with the item, or with
-- 'Nothing' if no item will arrive.
get :: forall context x m r. (Monad m, Typeable x) => Source context x -> Pipe context m (Maybe x)
get (Source pid desc) = assert (track (indent pid ++ "Get from " ++ origin)) (Pipe suspend)
   where origin = desc ++ "@" ++ show pid
         suspend state@(PipeState _ clock)
            = assert (track (indent pid ++ "Get<- " ++ stamp))
                     (return (Suspend [Suspension pid state ("get from " ++ stamp) (Get return)]))
            where stamp = origin ++ ":" ++ show clock
-- | Get an item from the source and pass it to the given continuation; do
-- nothing if the source has no item to give.
getSuccess :: forall context x m. (Monad m, Typeable x)
              => Source context x
              -> (x -> Pipe context m ())
              -> Pipe context m ()
getSuccess source succeed = do item <- get source
                               case item of
                                  Nothing -> return ()
                                  Just x -> succeed x
-- | Like 'get', but calls 'error' instead of returning 'Nothing' when the
-- source has no item to give.
get' :: forall context x m r. (Monad m, Typeable x) => Source context x -> Pipe context m x
get' source = do item <- get source
                 case item of
                    Just x -> return x
                    Nothing -> error "get' failed"
-- | Offer an item to the sink. The pipe suspends with a 'Put' request
-- addressed to the sink's level and resumes with a flag telling whether the
-- item was accepted.
put :: forall context x m r. (Monad m, Typeable x) => Sink context x -> x -> Pipe context m Bool
put (Sink pid desc) x = assert (track (indent pid ++ "Put into " ++ target)) (Pipe suspend)
   where target = desc ++ "@" ++ show pid
         suspend state@(PipeState _ clock)
            = assert (track (indent pid ++ "Put-> " ++ stamp))
                     (return (Suspend [Suspension pid state ("put into " ++ stamp) (Put x return)]))
            where stamp = target ++ ":" ++ show clock
-- | Ask whether the sink would accept an item. The pipe suspends with a
-- 'CanPut' request addressed to the sink's level and resumes with the answer.
canPut :: forall context x m r. (Monad m, Typeable x) => Sink context x -> Pipe context m Bool
canPut (Sink pid desc) = assert (track (indent pid ++ "CanPut into " ++ target)) (Pipe suspend)
   where target = desc ++ "@" ++ show pid
         suspend state@(PipeState _ clock)
            = assert (track (indent pid ++ "CanPut-> " ++ stamp))
                     (return (Suspend [Suspension pid state ("canPut into " ++ stamp) (CanPut return)]))
            where stamp = target ++ ":" ++ show clock
-- | Copy items from the source into the sink, stopping as soon as the sink
-- cannot take more or the source has nothing left.
pour :: forall c x m. (Monad m, Typeable x) => Source c x -> Sink c x -> Pipe c m ()
pour source sink = loop
   where loop = do ready <- canPut sink
                   when ready (getSuccess source forward)
         forward x = put sink x >> loop
-- | Like 'pour', applying the given function to every item on its way to the sink.
pourMap :: forall c x y m. (Monad m, Typeable x, Typeable y) => (x -> y) -> Source c x -> Sink c y -> Pipe c m ()
pourMap f source sink = loop
   where loop = do ready <- canPut sink
                   when ready $
                      do item <- get source
                         case item of
                            Nothing -> return ()
                            Just x -> put sink (f x) >> loop
-- | Like 'pourMap', except that items the function maps to 'Nothing' are dropped.
pourMapMaybe :: forall c x y m. (Monad m, Typeable x, Typeable y) => (x -> Maybe y) -> Source c x -> Sink c y -> Pipe c m ()
pourMapMaybe f source sink = loop
   where loop = do ready <- canPut sink
                   when ready (get source >>= maybe (return ()) step)
         step x = forward (f x) >> loop
         -- a dropped item counts as "not put"; the flag is ignored by 'step'
         forward Nothing = return False
         forward (Just y) = put sink y
-- | Copy every item from the source into both sinks, for as long as both
-- sinks can take more and the source has items left.
tee :: (Monad m, Typeable x) => Source c x -> Sink c x -> Sink c x -> Pipe c m ()
tee source sink1 sink2 = distribute
   where distribute = do ready1 <- canPut sink1
                         ready2 <- canPut sink2
                         when (ready1 && ready2) $
                            do item <- get source
                               case item of
                                  Nothing -> return ()
                                  Just x -> put sink1 x >> put sink2 x >> distribute
-- | Put the list's items into the sink one by one. Returns the items that
-- were not accepted, starting with the first one whose put failed; the result
-- is empty when the whole list went through.
putList :: forall x c m. (Monad m, Typeable x) => [x] -> Sink c x -> Pipe c m [x]
putList [] _ = return []
putList items@(x:rest) sink = do accepted <- put sink x
                                 if accepted then putList rest sink else return items
-- | Collect every item the source yields, in order, into a list.
getList :: forall x c m. (Monad m, Typeable x) => Source c x -> Pipe c m [x]
getList source = do item <- get source
                    case item of
                       Nothing -> return []
                       Just x -> do rest <- getList source
                                    return (x : rest)
-- | Read the source until it yields nothing more, discarding every item.
consumeAndSuppress :: forall x c m. (Monad m, Typeable x) => Source c x -> Pipe c m ()
consumeAndSuppress source = drain
   where drain = do item <- get source
                    case item of
                       Nothing -> return ()
                       Just _ -> drain
-- | A function form of @if@ taking the condition last: the first argument is
-- chosen when the condition holds, the second otherwise.
cond :: a -> a -> Bool -> a
cond x _ True = x
cond _ y False = y
-- | Run the action if the list is empty; otherwise return the list unchanged.
whenNull :: forall a m. Monad m => m [a] -> [a] -> m [a]
whenNull action [] = action
whenNull _ list = return list
-- | Tracing hook used as the condition of the module's @assert@ calls. As
-- written it ignores the message and always succeeds.
track :: String -> Bool
track _ = True
-- | Like 'putList', taking the items from a sequence, front to back.
putQueue :: forall c m x. (Monad m, Typeable x) => Seq x -> Sink c x -> Pipe c m [x]
putQueue q = putList (toList q)