module Transient.EVars where
import Transient.Base
import Transient.Internals(runTransState,onNothing, EventF(..), killChildren)
import qualified Data.Map as M
import Data.Typeable
import Control.Concurrent
import Control.Applicative
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Control.Exception(SomeException)
import Data.List(nub)
import Control.Monad.State
-- | An event variable. The constructor carries, in order:
--
-- * an 'Int' identifier ('newEVar' obtains it from 'genId');
-- * a 'TVar' holding a pair of 'Int' counters that 'readEVar' updates and
--   'cleanEVar' resets to @(0,0)@;
-- * a 'TChan' of 'StreamData' items, written by 'writeEVar',
--   'lastWriteEVar' and 'cleanEVar' and consumed by 'readEVar'.
data EVar a= EVar Int (TVar (Int,Int)) (TChan (StreamData a)) deriving Typeable
-- | Create a fresh 'EVar': a new identifier from 'genId', a counter pair
-- initialised to @(0, 0)@ and an empty channel.
newEVar :: TransIO (EVar a)
newEVar = Transient $ do
  ident <- genId
  counters <- liftIO (newTVarIO (0, 0))
  chan <- liftIO newTChanIO
  return (Just (EVar ident counters chan))
-- | Reset an 'EVar': in a single STM transaction, write 'SDone' to its
-- channel and set both counters back to zero.
cleanEVar :: EVar a -> TransIO ()
cleanEVar (EVar _ counters chan) =
  liftIO . atomically $
    writeTChan chan SDone >> writeTVar counters (0, 0)
-- | Subscribe to an 'EVar' and yield the values published on its channel.
--
-- The reader first registers itself by incrementing both counters held in
-- the 'TVar'. The first counter is the number of registered readers; the
-- second is the number of readers that still have to see the item currently
-- at the head of the channel.
--
-- Items are then obtained through 'parallel' (defined in Transient;
-- presumably it re-runs the given action for every item -- confirm). Each
-- reader only /peeks/ the head item so that the other readers can see it
-- too, and decrements the pending counter. The reader that finds the pending
-- counter at 1 or below is the last one: it removes the item from the channel
-- and resets the pending counter to the number of registered readers.
--
-- Result handling:
--
-- * 'SMore' and 'SLast' yield their payload;
-- * 'SDone' yields 'empty';
-- * 'SError' unregisters this reader (both counters decremented) and then
--   raises with 'error'.
readEVar (EVar _ rn ref1) = do
  -- Register this reader.
  liftIO $ atomically $
    readTVar rn >>= \(n, n') -> writeTVar rn (n + 1, n' + 1)
  r <- parallel $ atomically $ do
    item <- peekTChan ref1
    (n, n') <- readTVar rn
    if n' > 1
      then
        -- Other readers are still pending: leave the item in the channel.
        writeTVar rn (n, n' - 1)
      else
        -- Last pending reader: consume the item and re-arm the counter.
        readTChan ref1 >> writeTVar rn (n, n)
    return item
  case r of
    SDone -> empty
    SMore x -> return x
    SLast x -> return x
    SError e -> do
      -- Undo the registration made at the top before failing.
      liftIO $ atomically $
        readTVar rn >>= \(n, n') -> writeTVar rn (n - 1, n' - 1)
      error $ "readEVar: " ++ show e
-- | Publish a value on an 'EVar' by writing @'SMore' x@ to its channel.
writeEVar (EVar _ _ chan) x =
  liftIO . atomically $ writeTChan chan (SMore x)
-- | Publish a value on an 'EVar' marked as the last one, by writing
-- @'SLast' x@ to its channel.
lastWriteEVar (EVar _ _ chan) x =
  liftIO . atomically $ writeTChan chan (SLast x)
-- | The value carried by a finish event: passed to 'finish' and handed to
-- the callbacks registered with 'onFinish'. Presumably 'Nothing' means a
-- normal finish and 'Just' an exceptional one -- confirm against callers.
type FinishReason= Maybe SomeException
-- | Unwrap a 'StreamData' item.
--
-- 'SMore' and 'SLast' return their payload. 'SDone' triggers
-- @'finish' Nothing@ and then stops the computation. 'SError' prints the
-- error first and then does the same as 'SDone'.
checkFinalize (SMore x) = return x
checkFinalize (SLast x) = return x
checkFinalize SDone = finish Nothing >> stop
checkFinalize (SError e) = liftIO (print e) >> finish Nothing >> stop
-- | Wrapper around the 'EVar' that carries the finish event. It is stored
-- with 'setData' by 'initFinish' and looked up with 'getSData' by
-- 'onFinish', 'finish' and 'unFinish'.
data Finish= Finish (EVar FinishReason) deriving Typeable
-- | Create a new finish 'EVar', wrap it in 'Finish', store it with
-- 'setData' and return it.
initFinish :: TransIO Finish
initFinish =
  newEVar >>= \evar ->
    let fin = Finish evar
     in setData fin >> return fin
-- | Register @close@ to run when a finish event is read.
--
-- The handler branch looks up the current 'Finish' (creating one with
-- 'initFinish' if none is stored), reads its 'EVar', passes the reason to
-- @close@ and then stops. The alternative branch returns @()@.
onFinish :: (FinishReason -> TransIO ()) -> TransIO ()
onFinish close = handler <|> return ()
  where
    handler = do
      Finish evar <- getSData <|> initFinish
      reason <- readEVar evar
      close reason
      stop
-- | Trigger the finish event: print the reason, look up the current
-- 'Finish' (creating one with 'initFinish' if none is stored) and write the
-- reason to its 'EVar' with 'lastWriteEVar'.
finish :: FinishReason -> TransIO ()
finish e = do
  liftIO (putStr "finish: " >> print e)
  Finish evar <- getSData <|> initFinish
  lastWriteEVar evar e
-- | Apply 'cleanEVar' to the finish 'EVar' of the stored 'Finish', if there
-- is one; otherwise do nothing.
unFinish =
  (getSData >>= \(Finish fin) -> cleanEVar fin)
    <|> return ()
-- | Run @comp@ with a finish handler that calls 'killChildren' on a freshly
-- created children list, and return the result of @comp@.
--
-- NOTE(review): the @children@ field of the state is replaced with the new
-- list only after @comp@ has returned -- confirm this ordering is intended.
killOnFinish comp = do
  childrenVar <- liftIO (newTVarIO [])
  onFinish (\_ -> liftIO (killChildren childrenVar))
  result <- comp
  modify (\st -> st {children = childrenVar})
  return result