-- |
-- Module:     FRP.NetWire.Concurrent
-- Copyright:  (c) 2011 Ertugrul Soeylemez
-- License:    BSD3
-- Maintainer: Ertugrul Soeylemez <es@ertes.de>
--
-- Wire concurrency.  Send signals through multiple wires concurrently.
-- This module is *highly experimental* and subject to change entirely
-- in future revisions.  Please use it with care.

module FRP.NetWire.Concurrent
    ( -- * Combining wires
      (~*~),
      (~&~),
      (~+~)
    )
    where

import Control.Applicative
import Control.Arrow
import Control.Concurrent
import Control.Concurrent.STM
import Control.DeepSeq
import FRP.NetWire.Tools
import FRP.NetWire.Wire


-- | Concurrent version of '***'.  Passes its input signals to both
-- argument wires concurrently.

(~*~) :: Wire IO a c -> Wire IO b d -> Wire IO (a, b) (c, d)
w1' ~*~ w2' =
    mkGen $ \ws (x', y') -> do
        (xVar, thr1) <- forkWire w1' ws x'
        (yVar, thr2) <- forkWire w2' ws y'
        (mx, w1) <- takeMVar xVar
        (my, w2) <- takeMVar yVar
        mapM_ killThread [thr1, thr2]
        return (liftA2 (,) mx my, w1 ~*~ w2)

infixr 3 ~*~


-- | Concurrent version of '&&&'.  Passes its input signal to both
-- argument wires concurrently.

(~&~) :: Wire IO a b -> Wire IO a c -> Wire IO a (b, c)
w1' ~&~ w2' = arr dup >>> w1' ~*~ w2'

infixr 3 ~&~


-- | Concurrent version of '<+>'.  Passes its input signal to both
-- argument wires concurrently, returning the result of the first wire
-- which does not inhibit.

(~+~) :: NFData b => Wire IO a b -> Wire IO a b -> Wire IO a b
w1' ~+~ w2' =
    mkGen $ \ws x' -> do
        x1Var <- newEmptyTMVarIO
        x2Var <- newEmptyTMVarIO
        thr1 <- forkIO (toGen w1' ws x' >>= atomically . putTMVar x1Var)
        thr2 <- forkIO (toGen w2' ws x' >>= atomically . putTMVar x2Var)
        let res1 = do (mx, w1) <- takeTMVar x1Var; check (isRight mx); return (mx, w1 ~+~ w2')
            res2 = do (mx, w2) <- takeTMVar x2Var; check (isRight mx); return (mx, w1' ~+~ w2)
            noRes = do (mx1, w1) <- takeTMVar x1Var
                       (mx2, w2) <- takeTMVar x2Var
                       check (isLeft mx1 && isLeft mx2)
                       return (mx2, w1 ~+~ w2)
        atomically (res1 <|> res2 <|> noRes) <* mapM_ killThread [thr1, thr2]


-- | Pass the given input to the given wire concurrently.

forkWire :: Wire IO a b -> WireState IO -> a -> IO (MVar (Output b, Wire IO a b), ThreadId)
forkWire w' ws x' = do
    resultVar <- newEmptyMVar
    thr <- forkIO (toGen w' ws x' >>= putMVar resultVar)
    return (resultVar, thr)


-- | Is this a left value?

isLeft :: Either e a -> Bool
isLeft = either (const True) (const False)


-- | Is this a right value?

isRight :: Either e a -> Bool
isRight = either (const False) (const True)