module Control.Wire.Trans.Fork
(
WFork(..),
WireMgr,
startWireMgr,
stopWireMgr,
withWireMgr,
WireChan,
feedWireChan,
readWireChan,
WireThread,
killWireThread
)
where
import qualified Data.Map as M
import Control.Applicative
import Control.Arrow
import Control.Concurrent.Lifted
import Control.Concurrent.STM
import Control.Exception.Lifted
import Control.Monad
import Control.Monad.Fix
import Control.Monad.Trans.Control
import Control.Monad.Trans
import Control.Wire.Types
import Data.Map (Map)
import Data.Monoid
-- | Arrows over which wires can be run in separate threads and talked
-- to through a 'WireChan'.
class Arrow (>~) => WFork (>~) where
    -- | Send the given input value to the wire thread behind the given
    -- channel.
    feedWire
        :: Wire e (>~) (WireChan a b, a) ()

    -- | Start the given wire in a thread of its own, registered with
    -- the given manager.  Yields the channel used to communicate with
    -- the thread together with a handle for the thread itself.
    forkWire
        :: Wire e (>~) (Wire e (>~) a b, WireMgr)
                       (WireChan a b, WireThread)

    -- | Ask the channel for an output value of the wire thread.  The
    -- 'Monoid' constraint supplies the inhibition value ('mempty').
    queryWire
        :: Monoid e
        => Wire e (>~) (WireChan a b) b
-- | Forking wires over 'Kleisli' arrows of monads that can run 'IO'
-- and fork threads.
instance (MonadBaseControl IO m, MonadIO m) => WFork (Kleisli m) where
    -- Enqueue the input on the channel's input queue.  Never inhibits.
    feedWire =
        mkFixM $ \(wc, x') -> do
            let ichan = wcInputChan wc
            liftIO . atomically $ writeTChan ichan x'
            return (Right ())

    -- Create fresh input/output queues and control variables, fork the
    -- worker and register it in the manager's thread map.  Forking and
    -- registration both happen while holding the manager lock.
    forkWire =
        mkFixM $ \(thrW, mgr) -> do
            ichan   <- liftIO newTChanIO
            ochan   <- liftIO newTChanIO
            doneVar <- liftIO (newTVarIO False)
            quitVar <- liftIO (newTVarIO False)
            let wc = WireChan { wcInputChan  = ichan,
                                wcOutputChan = ochan }
            mgrOp mgr $ do
                -- The done flag is set in a finalizer so that it is
                -- raised on every exit path of the worker, including
                -- an exception thrown by the wire.  Otherwise
                -- 'killWireThread' and 'stopWireMgr' would wait for
                -- the flag forever while holding the manager lock.
                tid <- fork $
                    thread ichan ochan quitVar thrW
                        `finally` liftIO (atomically $ writeTVar doneVar True)
                let wt = WireThread { wtDoneVar  = doneVar,
                                      wtThreadId = tid,
                                      wtQuitVar  = quitVar }
                let thrsVar = wmThrsVar mgr
                liftIO . atomically $ do
                    thrs <- readTVar thrsVar
                    writeTVar thrsVar (M.insert tid wt thrs)
                return (Right (wc, wt))

      where
        -- Worker loop: step the wire once per input value and publish
        -- every non-inhibited result on the output queue.
        thread ichan ochan quitVar =
            fix $ \loop w' -> do
                -- The input read is tried first, so a quit request is
                -- only noticed once the input queue is empty.
                mx' <- liftIO . atomically $
                       Just <$> readTChan ichan <|>
                       Nothing <$ (readTVar quitVar >>= check)
                case mx' of
                  Just x' -> do
                      (mx, w) <- toGenM w' x'
                      -- Inhibited steps produce no output.
                      either (const $ return ())
                             (liftIO . atomically . writeTChan ochan)
                             mx
                      loop w
                  -- Quit requested: leave the loop; the finalizer
                  -- installed by 'forkWire' raises the done flag.
                  Nothing -> return ()

    -- Non-blocking read from the output queue: inhibits with 'mempty'
    -- when no output is available.
    queryWire =
        mkFixM $ \wc -> do
            let ochan = wcOutputChan wc
            liftIO . atomically $
                Right <$> readTChan ochan <|>
                return (Left mempty)
-- | Pair of queues connecting a forked wire thread with its clients.
data WireChan a b = WireChan
    { wcInputChan  :: !(TChan a)  -- ^ Inputs waiting to be consumed by the wire thread.
    , wcOutputChan :: !(TChan b)  -- ^ Outputs produced by the wire thread.
    }
-- | Manager keeping track of forked wire threads.
data WireMgr = WireMgr
    { wmFreeVar :: !(TVar Bool)
      -- ^ Lock flag: 'True' while no manager operation is running.
    , wmThrsVar :: !(TVar (Map ThreadId WireThread))
      -- ^ Registered wire threads, keyed by their thread id.
    }
-- | Handle for a forked wire thread.
data WireThread = WireThread
    { wtDoneVar  :: !(TVar Bool)  -- ^ Set to 'True' by the thread when it has stopped.
    , wtThreadId :: !ThreadId     -- ^ Id of the underlying thread.
    , wtQuitVar  :: !(TVar Bool)  -- ^ Set to 'True' to ask the thread to stop.
    }
-- | Enqueue an input value on the input queue of the given channel.
feedWireChan :: WireChan a b -> a -> IO ()
feedWireChan wc x =
    atomically (writeTChan (wcInputChan wc) x)
-- | Ask a wire thread to stop, wait until it has done so and remove it
-- from the manager's thread map.
--
-- Runs under the manager lock and blocks until the thread's done flag
-- is set.
killWireThread :: WireMgr -> WireThread -> IO ()
killWireThread mgr thr =
    mgrOp mgr $ do
        -- The quit request must be committed in a transaction of its
        -- own: the waiting transaction below retries until the done
        -- flag is set, and a retrying transaction publishes nothing.
        atomically (writeTVar quitVar True)
        atomically $ do
            readTVar doneVar >>= check
            -- Read and update the map inside the same transaction so
            -- the deletion is applied to the current map and not to a
            -- snapshot taken earlier.
            thrs <- readTVar thrsVar
            writeTVar thrsVar $! M.delete tid thrs
  where
    doneVar = wtDoneVar thr
    quitVar = wtQuitVar thr
    tid     = wtThreadId thr
    thrsVar = wmThrsVar mgr
-- | Run an action while holding the manager lock.
--
-- Blocks until 'wmFreeVar' is 'True', sets it to 'False' for the
-- duration of the action and sets it back to 'True' afterwards,
-- whether the action returns normally or throws.  The lock is not
-- reentrant: a nested 'mgrOp' on the same manager waits forever.
mgrOp :: (MonadBaseControl IO m, MonadIO m) => WireMgr -> m a -> m a
mgrOp mgr c =
    -- 'bracket_' takes the lock with asynchronous exceptions masked,
    -- so no exception can arrive between taking the lock and
    -- installing the release handler.  Waiting for the lock still
    -- blocks in STM and therefore stays interruptible.
    bracket_ acquire release c
  where
    freeVar = wmFreeVar mgr
    acquire = liftIO . atomically $ do
        readTVar freeVar >>= check
        writeTVar freeVar False
    release = liftIO . atomically $ writeTVar freeVar True
-- | Take the next value from the output queue of the given channel,
-- waiting until one is available.
readWireChan :: WireChan a b -> IO b
readWireChan wc =
    atomically (readTChan (wcOutputChan wc))
-- | Create a new wire manager with its lock free and no registered
-- threads.
startWireMgr :: IO WireMgr
startWireMgr =
    WireMgr
        <$> newTVarIO True     -- wmFreeVar: lock initially free
        <*> newTVarIO M.empty  -- wmThrsVar: no threads yet
-- | Stop every thread registered with the manager and empty its thread
-- map.
--
-- Runs under the manager lock.  All threads are asked to quit first;
-- only then is each one waited for and killed in turn.
stopWireMgr :: WireMgr -> IO ()
stopWireMgr mgr =
    mgrOp mgr $ do
        entries <- M.toList <$> readTVarIO registry
        mapM_ requestQuit entries
        mapM_ awaitAndKill entries
        atomically (writeTVar registry M.empty)
  where
    registry = wmThrsVar mgr

    -- Raise the quit flag of one registered thread.
    requestQuit (_, thr) =
        atomically (writeTVar (wtQuitVar thr) True)

    -- Wait for the thread's done flag, then kill the thread.
    awaitAndKill (tid, thr) = do
        atomically (readTVar (wtDoneVar thr) >>= check)
        killThread tid
-- | Run a computation with a freshly started wire manager.  The
-- manager is stopped when the computation finishes, whether it returns
-- normally or throws.
withWireMgr :: (MonadBaseControl IO m, MonadIO m) => (WireMgr -> m a) -> m a
withWireMgr k =
    liftIO startWireMgr >>= \mgr ->
        finally (k mgr) (liftIO (stopWireMgr mgr))