{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FunctionalDependencies #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE ImpredicativeTypes #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE UndecidableInstances #-} module Pipes.Fluid.ImpulseIO ( ImpulseIO(..) , module Pipes.Fluid.Merge ) where import Control.Applicative import qualified Control.Concurrent.Async.Lifted.Safe as A import qualified Control.Concurrent.STM as S import Control.Lens import Control.Monad.Base import Control.Monad.Trans.Class import Control.Monad.Trans.Control import Data.Constraint.Forall (Forall) import Data.Semigroup import Data.These import qualified Pipes as P import Pipes.Fluid.Merge import qualified Pipes.Prelude as PP -- | The applicative instance of this combines multiple Producers reactively -- ie, yields a value as soon as either or both of the input producers yields a value. -- This creates two threads each time this combinator is used. -- Warning: This means that the monadic effects are run in isolation from each other -- so if the monad is something like (StateT s IO), then the state will alternate -- between the two input producers, which is most likely not what you want. newtype ImpulseIO m a = ImpulseIO { impulsivelyIO :: P.Producer a m () } makeWrapped ''ImpulseIO instance (MonadBaseControl IO m, Forall (A.Pure m), Semigroup a) => Semigroup (ImpulseIO m a) where (<>) = mergeDiscrete instance (MonadBaseControl IO m, Forall (A.Pure m), Semigroup a) => Monoid (ImpulseIO m a) where mempty = ImpulseIO $ pure () mappend = mergeDiscrete instance Monad m => Functor (ImpulseIO m) where fmap f (ImpulseIO as) = ImpulseIO $ as P.>-> PP.map f instance (MonadBaseControl IO m, Forall (A.Pure m)) => Applicative (ImpulseIO m) where pure = ImpulseIO . P.yield -- 'ap' doesn't know about initial values fs <*> as = ImpulseIO $ P.for (impulsivelyIO $ merge fs as) $ \r -> case r of Coupled _ f a -> P.yield $ f a -- never got anything from one of the signals, can't do anything yet. -- drop the event LeftOnly _ _ -> pure () RightOnly _ _-> pure () -- | Reactively combines two producers, given initial values to use when the produce hasn't produced anything yet -- Combine two signals, and returns a signal that emits -- @Either bothfired (Either (leftFired, previousRight) (previousLeft, rightFired))@. -- This creates two threads each time this combinator is used. -- Warning: This means that the monadic effects are run in isolation from each other -- so if the monad is something like (StateT s IO), then the state will alternate -- between the two input producers, which is most likely not what you want. -- This will be detect as a compile error due to use of Control.Concurrent.Async.Lifted.Safe instance (MonadBaseControl IO m, Forall (A.Pure m)) => Merge (ImpulseIO m) where merge' px_ py_ (ImpulseIO xs_) (ImpulseIO ys_) = ImpulseIO $ do ax <- lift $ A.async $ P.next xs_ ay <- lift $ A.async $ P.next ys_ doMergeIO px_ py_ ax ay where doMergeIO :: (MonadBaseControl IO m, Forall (A.Pure m)) => Maybe x -> Maybe y -> A.Async (Either () (x, P.Producer x m ())) -> A.Async (Either () (y, P.Producer y m ())) -> P.Producer (Merged x y) m () doMergeIO px py ax ay = do r <- lift $ liftBase . S.atomically $ bothOrEither (A.waitSTM ax) (A.waitSTM ay) case r of -- both @ax@ and @ay@ have ended These (Left _) (Left _) -> pure () -- @ax@ ended, @ay@ still waiting This (Left _) -> do ry <- lift $ A.wait ay -- wait for @ay@ to return and then -- only use @ys@ case ry of Left _ -> pure () Right (y, ys') -> case px of Nothing -> do P.yield $ RightOnly OtherDead y ys' P.>-> PP.map (RightOnly OtherDead) Just x -> do P.yield $ Coupled (FromRight OtherDead) x y ys' P.>-> PP.map (Coupled (FromRight OtherDead) x) -- @ax@ still waiting, @ay@ ended That (Left _) -> do rx <- lift $ A.wait ax -- wait for @ax@ to retrun and then -- only use @xs@ case rx of Left _ -> pure () Right (x, xs') -> case py of Nothing -> do P.yield $ LeftOnly OtherDead x xs' P.>-> PP.map (LeftOnly OtherDead) Just y -> do P.yield $ Coupled (FromLeft OtherDead) x y xs' P.>-> PP.map (\x' -> Coupled (FromLeft OtherDead) x' y) -- @ax@ produced something, @ay@ still waiting This (Right (x, xs')) -> do case py of Nothing -> P.yield $ LeftOnly OtherLive x Just y -> P.yield $ Coupled (FromLeft OtherLive) x y ax' <- lift $ A.async $ P.next xs' doMergeIO (Just x) py ax' ay -- @ax@ still waiting, @ay@ produced something That (Right (y, ys')) -> do case px of Nothing -> P.yield $ RightOnly OtherLive y Just x -> P.yield $ Coupled (FromRight OtherLive) x y ay' <- lift $ A.async $ P.next ys' doMergeIO px (Just y) ax ay' -- @ax@ produced something, @ay@ ended These (Right (x, xs')) (Left _) -> case py of Nothing -> do P.yield $ LeftOnly OtherDead x xs' P.>-> PP.map (LeftOnly OtherDead) Just y -> do P.yield $ Coupled (FromLeft OtherDead) x y xs' P.>-> PP.map (\x' -> Coupled (FromLeft OtherDead) x' y) -- @af@ ended, @aa@ produced something These (Left _) (Right (y, ys')) -> case px of Nothing -> do P.yield $ RightOnly OtherDead y ys' P.>-> PP.map (RightOnly OtherDead) Just x -> do P.yield $ Coupled (FromRight OtherDead) x y ys' P.>-> PP.map (Coupled (FromRight OtherDead) x) -- both @fs@ and @as@ produced something These (Right (x, xs')) (Right (y, ys')) -> do P.yield $ Coupled FromBoth x y ax' <- lift $ A.async $ P.next xs' ay' <- lift $ A.async $ P.next ys' doMergeIO (Just x) (Just y) ax' ay' -- | Used internally by Impulse and ImpulseIO identifying which side (or both) returned values bothOrEither :: Alternative f => f a -> f b -> f (These a b) bothOrEither left right = (These <$> left <*> right) <|> (This <$> left) <|> (That <$> right)