module Control.Concurrent.STM.Promise.Tree
(
Tree(..), Label(..),
requireAll,requireAny,tryAll,
evalTree,watchTree,
interleave, interleave',
showTree
) where
import Control.Monad hiding (mapM_)
import Prelude hiding (mapM_, foldr1, concatMap, concat)
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.DTVar
import Control.Concurrent.STM.Promise
import Data.Monoid
import Data.Typeable
import Data.Traversable
import Data.Foldable
import Data.Function
import Data.Maybe
import Data.List (transpose)
-- | Tag stored in every 'Node', selecting how its two subtrees are combined.
data Label
  = Both
  -- ^ Label used by 'requireAll' and 'tryAll' when chaining trees.
  | Either
  -- ^ Label used by 'requireAny' when chaining trees.
  deriving (Eq, Ord, Show, Typeable)
-- | A binary tree with values of type @a@ at the leaves.
--
-- Inner nodes carry a 'Label', and any subtree may additionally be wrapped
-- in 'Recoverable'.
data Tree a
  = Node Label (Tree a) (Tree a)
  -- ^ Two subtrees combined according to the given 'Label'.
  | Leaf a
  -- ^ A single value.
  | Recoverable (Tree a)
  -- ^ A wrapped subtree; 'evalTree' and 'watchTree' replace a cancelled
  -- result coming out of it with an empty ('mempty') success.
  deriving (Eq, Ord, Show, Typeable, Traversable, Foldable, Functor)
-- | 'pure' builds a single 'Leaf'; '<*>' is derived from the 'Monad'
-- instance via 'ap'.
--
-- 'pure' is the primary definition and 'return' is defined in terms of it
-- (the canonical direction), so the pair does not trigger GHC's
-- @-Wnoncanonical-monad-instances@ warning.
instance Applicative Tree where
  pure = Leaf
  (<*>) = ap

-- | Bind substitutes a whole tree for every leaf: the function is applied to
-- each 'Leaf' value and the resulting tree is grafted in its place, while the
-- surrounding 'Node' labels and 'Recoverable' wrappers are kept as they are.
instance Monad Tree where
  return = pure
  Leaf x >>= f = f x
  Node l u v >>= f = Node l (u >>= f) (v >>= f)
  Recoverable t >>= f = Recoverable (t >>= f)
-- | Return the list unchanged, or abort with 'error' when it is empty.
--
-- The first argument is a label (the callers in this module pass their own
-- name) that is prepended to the error message so the failing call site can
-- be identified.
ensureNonempty :: String -> [a] -> [a]
-- The message states the expectation explicitly; the bare phrase
-- "non-empty list!" read as if the offending list had been non-empty.
ensureNonempty s [] = error $ s ++ ": expected a non-empty list!"
ensureNonempty _ xs = xs
-- | Chain a non-empty list of trees with 'Both' nodes.
--
-- The chain nests to the right: @[a, b, c]@ becomes
-- @Node Both a (Node Both b c)@. An empty list is rejected by
-- 'ensureNonempty'.
requireAll :: [Tree a] -> Tree a
requireAll trees = foldr1 (Node Both) (ensureNonempty "requireAll" trees)
-- | Chain a non-empty list of trees with 'Either' nodes.
--
-- The chain nests to the right: @[a, b, c]@ becomes
-- @Node Either a (Node Either b c)@. An empty list is rejected by
-- 'ensureNonempty'.
requireAny :: [Tree a] -> Tree a
requireAny trees = foldr1 (Node Either) (ensureNonempty "requireAny" trees)
-- | Wrap every tree of a non-empty list in 'Recoverable' and chain the
-- wrapped trees with right-nested 'Both' nodes.
--
-- An empty list is rejected by 'ensureNonempty'.
tryAll :: [Tree a] -> Tree a
tryAll trees =
  foldr1 (Node Both) [Recoverable t | t <- ensureNonempty "tryAll" trees]
-- | Render a tree as a one-line expression.
--
-- Leaves are printed with 'show', 'Both' nodes as @l & r@, 'Either' nodes as
-- @l | r@ and 'Recoverable' subtrees with a leading @"? "@. Parentheses are
-- added only where the surrounding context binds tighter than the node being
-- printed.
showTree :: Show a => Tree a -> String
showTree = render (2 :: Int)
  where
    -- The numeric context is the binding level of the enclosing construct:
    -- 2 for top level / inside '|', 1 inside '&', 0 inside '?'.
    render _ (Leaf x) = show x
    render ctx (Recoverable t) =
      parensIf (ctx < 0) ("? " ++ render 0 t)
    render ctx (Node Both l r) =
      parensIf (ctx < 1) (render 1 l ++ " & " ++ render 1 r)
    render ctx (Node Either l r) =
      parensIf (ctx < 2) (render 2 l ++ " | " ++ render 2 r)

    -- Wrap the string in parentheses when the flag is set.
    parensIf True str = '(' : str ++ ")"
    parensIf False str = str
-- | Run 'cancel' on every promise stored in the tree, visiting the leaves in
-- the tree's 'Foldable' order and discarding the results.
cancelTree :: Tree (Promise a) -> IO ()
cancelTree = traverse_ cancel
-- | Flatten a tree into a list of its leaf values.
--
-- The two sides of a 'Both' node are concatenated, the two sides of an
-- 'Either' node are merged alternately with '/\/', and a 'Recoverable'
-- wrapper is transparent.
interleave :: Tree a -> [a]
interleave tree = case tree of
  Leaf x -> [x]
  Recoverable inner -> interleave inner
  Node lbl l r ->
    let ls = interleave l
        rs = interleave r
     in case lbl of
          Either -> ls /\/ rs
          Both -> ls ++ rs
-- | Merge two lists by taking elements alternately, starting with the left
-- list. Once either list runs out, the remainder of the other one follows
-- unchanged.
(/\/) :: [a] -> [a] -> [a]
left /\/ right = case left of
  [] -> right
  -- Swap the arguments on the recursive call so the next element comes
  -- from the other list.
  x : rest -> x : (right /\/ rest)
-- | Flatten a tree into a list of its leaf values, treating chains of equally
-- labelled nodes as one n-ary node.
--
-- A chain of 'Either' nodes is gathered with 'collect' and its alternatives
-- are interleaved round-robin (via 'transpose'), so every alternative
-- contributes its first element before any contributes its second. A chain
-- of 'Both' nodes is gathered the same way and its parts are concatenated.
-- 'Recoverable' wrappers are transparent.
--
-- Subtrees are flattened with @interleave'@ itself. Recursing into the binary
-- 'interleave' instead (as this function previously did) applied the
-- round-robin scheme at the root only, and not at all when the root was a
-- 'Recoverable' wrapper.
interleave' :: Tree a -> [a]
interleave' (Leaf a) = [a]
-- 'collect' never returns a subtree whose root carries the collected label,
-- so each recursive call works on a strictly smaller tree.
interleave' t@(Node Either _ _) = concat . transpose . map interleave' $ collect Either t
interleave' t@(Node Both _ _) = concatMap interleave' $ collect Both t
interleave' (Recoverable t) = interleave' t
-- | Gather the maximal subtrees hanging below a chain of nodes that all carry
-- the given label, in left-to-right order.
--
-- Descent stops at the first subtree that is not a 'Node' with that label;
-- such a subtree is returned as a single list element.
collect :: Label -> Tree a -> [Tree a]
collect lbl tree = case tree of
  Node nodeLbl l r
    | nodeLbl == lbl -> collect lbl l ++ collect lbl r
  _ -> [tree]
-- | Evaluate a tree of promises down to a single value, blocking until the
-- root of the tree has a result.
--
-- The predicate @failure@ classifies leaf values: a leaf that finishes with
-- @An a@ where @failure a@ holds has @a@ added to an error accumulator and is
-- treated as 'Cancelled' by the rest of the evaluation.
--
-- Returns @(errs, res)@. @errs@ is the content of the error accumulator read
-- after the root finished; @res@ is the root's value when the root finished
-- as @An@, and 'mempty' for any other outcome.
--
-- One thread is forked (with 'forkIO') per tree constructor visited.
evalTree :: forall a . Monoid a => (a -> Bool) -> Tree (Promise a) -> IO (a,a)
evalTree failure t00 = do
-- Accumulator for the leaf values rejected by @failure@.
err_var <- newTVarIO mempty
m <- go_intermediate err_var t00
-- Block until the thread evaluating the root has written its result.
res <- atomically $ takeTMVar m
errs <- readTVarIO err_var
return (errs,read_result res)
where
-- Extract the payload of a successful result; everything else is 'mempty'.
read_result :: PromiseResult a -> a
read_result (An a) = a
read_result _ = mempty
-- Evaluator closed over the shared error accumulator.
go_intermediate err_var = go
where
-- Start evaluating a subtree in the background and return the TMVar that
-- will receive its result.
go :: Tree (Promise a) -> IO (TMVar (PromiseResult a))
go t0 = case t0 of
-- Leaf: wait (via 'retry') until the promise is no longer Unfinished.
Leaf u -> forkWrapTMVar $ \ write ->
(write =<<) $ atomically $ do
r <- result u
case r of
Unfinished -> retry
-- A rejected value is mappended in front of the errors collected so
-- far and reported upwards as Cancelled.
An a | failure a -> do
modifyTVar' err_var (mappend a)
return Cancelled
_ -> return r
-- Recoverable: a Cancelled result of the wrapped subtree is turned into an
-- empty success; any other result is passed through unchanged.
Recoverable t -> forkWrapTMVar $ \ write -> do
m <- go t
res <- atomically (takeTMVar m)
case res of
Cancelled -> write (An mempty)
_ -> write res
-- Node: evaluate both children concurrently and combine their results.
Node lbl t1 t2 -> forkWrapTMVar $ \ write -> do
-- NOTE(review): bothResults and eitherResult come from
-- Control.Concurrent.STM.Promise; their exact semantics are not visible
-- here. For Both, the pair produced by bothResults is merged with mappend.
let combine = case lbl of
Both -> \ x y -> fmap (uncurry mappend) (bothResults x y)
Either -> eitherResult
m1 <- go t1
m2 <- go t2
res <- atomically $ do
-- Non-blocking takes: a child that has not written yet counts as
-- Unfinished.
r1 <- fromMaybe Unfinished `liftM` tryTakeTMVar m1
r2 <- fromMaybe Unfinished `liftM` tryTakeTMVar m2
case combine r1 r2 of
-- Not decided yet: retrying abandons this transaction, so a value taken
-- above stays in its TMVar for the next attempt.
Unfinished -> retry
res -> return res
-- The node is decided: run cancel on every promise in this subtree before
-- publishing the result.
cancelTree t0
write res
-- Create an empty TMVar, fork a thread that runs the given action with a
-- callback filling that TMVar, and return the TMVar without waiting.
forkWrapTMVar :: ((PromiseResult a -> IO ()) -> IO ()) -> IO (TMVar (PromiseResult a))
forkWrapTMVar write_ = do
v <- newEmptyTMVarIO
void $ forkIO $ write_ (atomically . putTMVar v)
return v
-- | Start evaluating a tree of promises in the background and return
-- immediately with two handles for observing it:
--
-- * a 'TChan' onto which every leaf value rejected by @failure@ is written;
--
-- * a 'DTVar' holding a tree of 'PromiseResult's that the forked threads
--   overwrite as evaluation progresses. A subtree that has been decided is
--   stored as a single 'Leaf' holding its result.
--
-- NOTE(review): DTVar, listenDTVarIO and listenDTVarsIO come from
-- Control.Concurrent.STM.DTVar; they presumably block until the variable is
-- written again -- confirm against that module.
watchTree :: Monoid a => (a -> Bool) -> Tree (Promise a) -> IO (TChan a,DTVar (Tree (PromiseResult a)))
watchTree failure t_init = do
ch <- newTChanIO
(,) ch `liftM` go_intermediate ch t_init
where
-- Watcher closed over the shared failure channel.
go_intermediate failure_chan = go
where
-- Start watching a subtree and return the DTVar tracking its state.
go t0 = case t0 of
-- Leaf: starts out as Unfinished; a single transaction waits (via 'retry')
-- for the promise to finish and then stores the final result.
Leaf u ->
forkWrapTVar (Leaf Unfinished) $ \ write -> atomically $ do
r <- result u
when (isUnfinished r) retry
write . Leaf =<< case r of
-- A rejected value is sent to the failure channel and recorded as
-- Cancelled.
An a | failure a -> do
writeTChan failure_chan a
return Cancelled
_ -> return r
-- Recoverable: mirror the wrapped subtree until it collapses to a result.
Recoverable t -> do
d <- go t
init_tree <- readDTVarIO d
forkWrapTVar (Recoverable init_tree) $ \ write -> fix $ \ loop -> do
r <- listenDTVarIO d
case r of
-- A cancelled subtree is recorded as an empty success; stop watching.
Leaf Cancelled -> atomically $ write (Leaf (An mempty))
-- A successful subtree is recorded as it is; stop watching.
Leaf An{} -> atomically $ write r
-- Anything else: publish the subtree's current state inside the
-- Recoverable wrapper and keep listening.
_ -> do
atomically $ write (Recoverable r)
loop
-- Node: watch both children and combine them once both are single leaves.
Node lbl t1 t2 -> do
-- For Both, the pair produced by bothResults is merged with mappend.
let combine = case lbl of
Both -> \ x y -> fmap (uncurry mappend) (bothResults x y)
Either -> eitherResult
d1 <- go t1
d2 <- go t2
init_tree <- liftM2 (Node lbl) (readDTVarIO d1) (readDTVarIO d2)
forkWrapTVar init_tree $ \ write -> fix $ \ loop -> do
-- The pattern expects exactly two results back, one per DTVar passed in.
[r1,r2] <- listenDTVarsIO [d1,d2]
-- The node can only be decided when both children are single leaves;
-- otherwise it counts as Unfinished.
let r = case (r1,r2) of
(Leaf a,Leaf b) -> combine a b
_ -> Unfinished
if isUnfinished r
then do
-- Still undecided: publish the children's current state and keep
-- listening.
atomically $ write (Node lbl r1 r2)
loop
else do
-- Decided: publish the result first, then run cancel on every promise in
-- this subtree.
atomically $ write (Leaf r)
cancelTree t0
-- Create a DTVar holding the given initial tree, fork a thread that runs the
-- given action with an STM writer for that DTVar, and return the DTVar
-- without waiting.
forkWrapTVar :: Tree (PromiseResult a) -> ((Tree (PromiseResult a) -> STM ()) -> IO ()) ->
IO (DTVar (Tree (PromiseResult a)))
forkWrapTVar init_tree mk = do
v <- newDTVarIO init_tree
void $ forkIO $ mk (writeDTVar v)
return v