module Pipes.Fluid.ReactIO
( ReactIO(..)
, mergeIO
, mergeIO'
) where
import qualified Control.Concurrent.Async.Lifted.Safe as A
import qualified Control.Concurrent.STM as S
import Control.Lens
import Control.Monad.Base
import Control.Monad.Trans.Class
import Control.Monad.Trans.Control
import Data.Constraint.Forall (Forall)
import qualified Pipes as P
import qualified Pipes.Fluid.Alternative as PFA
import qualified Pipes.Prelude as PP
newtype ReactIO m a = ReactIO
{ reactivelyIO :: P.Producer a m ()
}
makeWrapped ''ReactIO
instance Monad m => Functor (ReactIO m) where
fmap f (ReactIO as) = ReactIO $ as P.>-> PP.map f
instance (MonadBaseControl IO m, Forall (A.Pure m)) => Applicative (ReactIO m) where
pure = ReactIO . P.yield
fs <*> as = ReactIO $ P.for (reactivelyIO $ mergeIO fs as) $ \r ->
case r of
Left (f, a) -> P.yield $ f a
Right (Left (f, Just a)) -> P.yield $ f a
Right (Right (Just f, a)) -> P.yield $ f a
Right (Left (_, Nothing)) -> pure ()
Right (Right (Nothing, _)) -> pure ()
mergeIO' :: (MonadBaseControl IO m, Forall (A.Pure m)) =>
Maybe x
-> Maybe y
-> ReactIO m x
-> ReactIO m y
-> ReactIO m (Either (x, y) (Either (x, Maybe y) (Maybe x, y)))
mergeIO' px_ py_ (ReactIO xs_) (ReactIO ys_) = ReactIO $ do
ax <- lift $ A.async $ P.next xs_
ay <- lift $ A.async $ P.next ys_
doMergeIO px_ py_ ax ay
where
doMergeIO :: (MonadBaseControl IO m, Forall (A.Pure m)) =>
Maybe x
-> Maybe y
-> A.Async (Either () (x, P.Producer x m ()))
-> A.Async (Either () (y, P.Producer y m ()))
-> P.Producer (Either (x, y) (Either (x, Maybe y) (Maybe x, y))) m ()
doMergeIO px py ax ay = do
r <-
lift $
liftBase . S.atomically $ PFA.bothOrEither (A.waitSTM ax) (A.waitSTM ay)
case r of
Left (Left _, Left _) -> pure ()
Right (Left (Left _)) -> do
ry <- lift $ A.wait ay
case ry of
Left _ -> pure ()
Right (y', ys') -> do
P.yield $ useRight y'
ys' P.>-> PP.map useRight
Right (Right (Left _)) -> do
rx <- lift $ A.wait ax
case rx of
Left _ -> pure ()
Right (x', xs') -> do
P.yield $ useLeft x'
xs' P.>-> PP.map useLeft
Right (Left (Right (x, xs'))) -> do
P.yield $ useLeft x
ax' <- lift $ A.async $ P.next xs'
doMergeIO (Just x) py ax' ay
Right (Right (Right (y, ys'))) -> do
P.yield $ useRight y
ay' <- lift $ A.async $ P.next ys'
doMergeIO px (Just y) ax ay'
Left (Right (x, xs'), Left _) -> do
P.yield $ useLeft x
xs' P.>-> PP.map useLeft
Left (Left _, Right (y, ys')) -> do
P.yield $ useRight y
ys' P.>-> PP.map useRight
Left (Right (x, xs'), Right (y, ys')) -> do
P.yield $ Left (x, y)
ax' <- lift $ A.async $ P.next xs'
ay' <- lift $ A.async $ P.next ys'
doMergeIO (Just x) (Just y) ax' ay'
where
useRight y = Right (Right (px, y))
useLeft x = Right (Left (x, py))
mergeIO :: (MonadBaseControl IO m, Forall (A.Pure m))
=> ReactIO m x
-> ReactIO m y
-> ReactIO m (Either (x, y) (Either (x, Maybe y) (Maybe x, y)))
mergeIO = mergeIO' Nothing Nothing