-- |
-- Module:     FRP.NetWire.Concurrent
-- Copyright:  (c) 2011 Ertugrul Soeylemez
-- License:    BSD3
-- Maintainer: Ertugrul Soeylemez
--
-- Wire concurrency.  Send signals through multiple wires concurrently.
-- This module is *highly experimental* and subject to change entirely
-- in future revisions.  Please use it with care.

module FRP.NetWire.Concurrent
    ( -- * Combining wires
      (~*~),
      (~&~),
      (~+~)
    )
    where

import Control.Applicative
import Control.Arrow
import Control.Concurrent
import Control.Concurrent.STM
import Control.DeepSeq
import qualified Control.Exception as E
import FRP.NetWire.Tools
import FRP.NetWire.Wire


-- | Concurrent version of '(***)'.  Passes its input signals to both
-- argument wires concurrently.
--
-- Each argument wire is stepped in its own thread (see 'forkWire').
-- The calling thread waits for the first wire's result and then for
-- the second wire's result.  The two outputs are paired with
-- @'liftA2' (,)@, and the resulting wire combines the two successor
-- wires with '~*~' again.

(~*~) :: Wire IO a c -> Wire IO b d -> Wire IO (a, b) (c, d)
w1' ~*~ w2' =
    mkGen $ \ws (x', y') -> do
        (xVar, thr1) <- forkWire w1' ws x'
        (yVar, thr2) <- forkWire w2' ws y'

        -- Wait for both results, first wire first.
        let awaitBoth = liftA2 (,) (takeMVar xVar) (takeMVar yVar)

        -- Kill the workers with 'E.finally' rather than only after a
        -- successful wait: if waiting is aborted by an exception (for
        -- example an asynchronous exception delivered to this thread),
        -- no worker thread is left running.
        ((mx, w1), (my, w2)) <-
            awaitBoth `E.finally` mapM_ killThread [thr1, thr2]

        return (liftA2 (,) mx my, w1 ~*~ w2)

infixr 3 ~*~


-- | Concurrent version of '(&&&)'.  Passes its input signal to both
-- argument wires concurrently.
--
-- Implemented by duplicating the input with 'dup' and feeding the pair
-- to '~*~'.

(~&~) :: Wire IO a b -> Wire IO a c -> Wire IO a (b, c)
w1' ~&~ w2' = arr dup >>> w1' ~*~ w2'

infixr 3 ~&~


-- | Concurrent version of '(<+>)'.  Passes its input signal to both
-- argument wires concurrently, returning the result of the first wire
-- which does not inhibit.
--
-- Both wires are stepped in their own threads.  The outcome is chosen
-- by a single STM transaction that tries three alternatives in order:
--
-- 1. The first wire has finished with a 'Right' output: return it.
--    The successor keeps the /original/ second wire.
--
-- 2. The second wire has finished with a 'Right' output: return it.
--    The successor keeps the /original/ first wire.
--
-- 3. Both wires have finished with a 'Left' output: return the second
--    wire's 'Left' value.  The successor uses both successor wires.
--
-- If none of the alternatives applies yet, the transaction retries
-- until one of the result variables is filled.
--
-- NOTE: the 'NFData' constraint is not used by this definition; it is
-- retained so that the type signature stays unchanged.

(~+~) :: NFData b => Wire IO a b -> Wire IO a b -> Wire IO a b
w1' ~+~ w2' =
    mkGen $ \ws x' -> do
        x1Var <- newEmptyTMVarIO
        x2Var <- newEmptyTMVarIO
        thr1 <- forkIO (toGen w1' ws x' >>= atomically . putTMVar x1Var)
        thr2 <- forkIO (toGen w2' ws x' >>= atomically . putTMVar x2Var)

        let -- First wire produced a value.
            res1 = do
                (mx, w1) <- takeTMVar x1Var
                check (isRight mx)
                return (mx, w1 ~+~ w2')

            -- Second wire produced a value.
            res2 = do
                (mx, w2) <- takeTMVar x2Var
                check (isRight mx)
                return (mx, w1' ~+~ w2)

            -- Both wires finished without producing a value.
            noRes = do
                (mx1, w1) <- takeTMVar x1Var
                (mx2, w2) <- takeTMVar x2Var
                check (isLeft mx1 && isLeft mx2)
                return (mx2, w1 ~+~ w2)

        -- Kill the workers with 'E.finally' instead of '<*', so that
        -- they are also killed when the wait is aborted by an
        -- exception, not only when the transaction commits.
        atomically (res1 <|> res2 <|> noRes)
            `E.finally` mapM_ killThread [thr1, thr2]


-- | Pass the given input to the given wire concurrently.
--
-- Forks a thread that steps the wire with 'toGen' and puts the result
-- (the output together with the successor wire) into a fresh 'MVar'.
-- Returns that 'MVar' and the id of the forked thread.  If the step
-- throws, the 'MVar' is never filled.

forkWire ::
    Wire IO a b -> WireState IO -> a ->
    IO (MVar (Output b, Wire IO a b), ThreadId)
forkWire w' ws x' = do
    resultVar <- newEmptyMVar
    thr <- forkIO (toGen w' ws x' >>= putMVar resultVar)
    return (resultVar, thr)


-- | Is this a left value?

isLeft :: Either e a -> Bool
isLeft = either (const True) (const False)


-- | Is this a right value?

isRight :: Either e a -> Bool
isRight = either (const False) (const True)