{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE EmptyDataDecls #-}
{-# LANGUAGE CPP #-}
module Pipes.Transduce.Internal where
import Data.Bifunctor
import qualified Data.Semigroup as S
import Data.Void
import Data.Foldable
import Control.Applicative.Lift
import Control.Monad
import Control.Monad.Trans.Except
import Control.Monad.Trans.Free hiding (Pure)
import qualified Control.Foldl as Foldl
import Control.Concurrent (newMVar,withMVar)
import Control.Concurrent.Conceit
import Control.Exception
import Pipes
import Pipes.Lift (distribute)
import Pipes.Prelude (drain)
import qualified Pipes.Prelude as Pipes
import qualified Pipes.Group as Pipes
import qualified Pipes.Parse
import Pipes.Concurrent
import Pipes.Safe (SafeT, runSafeT)
import Streaming (Stream,Of)
import qualified Streaming.Prelude as Streaming
import Lens.Micro
newtype Fold1 b e a = Fold1 { runFold1 :: Lift (Fold1_ b e) a } deriving (Functor)
data Fold1_ b e a =
TrueFold (Foldl.FoldM (ExceptT e IO) b a)
| ExhaustiveCont (forall r. Producer b IO r -> IO (Either e (a,r)))
| NonexhaustiveCont (Producer b IO () -> IO (Either e a))
deriving (Functor)
instance Applicative (Fold1 b e) where
pure a = Fold1 (pure a)
Fold1 fa <*> Fold1 a = Fold1 (fa <*> a)
instance Applicative (Fold1_ b e) where
pure a = ExhaustiveCont (\producer -> do
r <- runEffect (producer >-> Pipes.drain)
pure (Right (a,r)))
TrueFold f1 <*> TrueFold f2 = TrueFold (f1 <*> f2)
s1 <*> s2 = bifurcate (nonexhaustiveCont s1) (nonexhaustiveCont s2)
where
bifurcate fs as = ExhaustiveCont (\producer -> do
(outbox1,inbox1,seal1) <- spawn' (bounded 1)
(outbox2,inbox2,seal2) <- spawn' (bounded 1)
runConceit $
(\f x r -> (f x,r))
<$>
Conceit (fs (fromInput inbox1) `finally` atomically seal1)
<*>
Conceit (as (fromInput inbox2) `finally` atomically seal2)
<*>
(_Conceit $
(runEffect (producer >-> Pipes.tee (toOutput outbox1 *> Pipes.drain)
>-> (toOutput outbox2 *> Pipes.drain)))
`finally` atomically seal1
`finally` atomically seal2))
instance Bifunctor (Fold1_ b) where
bimap f g s = case s of
TrueFold (Foldl.FoldM step start done) -> TrueFold (Foldl.FoldM
(\previous input -> withExceptT f (step previous input))
(withExceptT f start)
(\final -> withExceptT f (fmap g (done final))))
ExhaustiveCont u -> ExhaustiveCont (fmap (liftM (bimap f (bimap g id))) u)
NonexhaustiveCont h -> NonexhaustiveCont (fmap (liftM (bimap f g)) h)
instance Bifunctor (Fold1 b) where
bimap f g (Fold1 s) = Fold1 (case s of
Pure a -> Pure (g a)
Other o -> Other (bimap f g o))
instance (S.Semigroup a) => S.Semigroup (Fold1 b e a) where
s1 <> s2 = (S.<>) <$> s1 <*> s2
#if !(MIN_VERSION_base(4,11,0))
instance (Monoid a,S.Semigroup a) => Monoid (Fold1 b e a) where
#else
instance (Monoid a) => Monoid (Fold1 b e a) where
#endif
mempty = pure mempty
#if !(MIN_VERSION_base(4,11,0))
mappend = (S.<>)
#endif
nonexhaustiveCont :: Fold1_ b e a -> Producer b IO () -> IO (Either e a)
nonexhaustiveCont (TrueFold e) = \producer -> runExceptT (Foldl.impurely Pipes.foldM e (hoist lift producer))
nonexhaustiveCont (ExhaustiveCont e) = \producer -> liftM (fmap fst) (e producer)
nonexhaustiveCont (NonexhaustiveCont u) = u
exhaustiveCont :: Fold1_ b e a -> Producer b IO r -> IO (Either e (a,r))
exhaustiveCont s = case s of
TrueFold e -> \producer ->
runExceptT (Foldl.impurely Pipes.foldM' e (hoist lift producer))
ExhaustiveCont e -> e
NonexhaustiveCont activity -> \producer -> do
(outbox,inbox,seal) <- spawn' (bounded 1)
runConceit $
(,)
<$>
Conceit (activity (fromInput inbox) `finally` atomically seal)
<*>
(_Conceit $
(runEffect (producer >-> (toOutput outbox *> Pipes.drain))
`finally` atomically seal))
withFallibleCont
:: (Producer b IO () -> IO (Either e a))
-> Fold1 b e a
withFallibleCont f = Fold1 (Other (NonexhaustiveCont f))
withFallibleCont'
:: (forall r. Producer b IO r -> IO (Either e (a,r)))
-> Fold1 b e a
withFallibleCont' f = Fold1 (Other (ExhaustiveCont f))
withCont
:: (Producer b IO () -> IO a)
-> Fold1 b e a
withCont aFold = withFallibleCont $ fmap (fmap pure) $ aFold
withCont'
:: (forall r. Producer b IO r -> IO (a,r))
-> Fold1 b e a
withCont' aFold = withFallibleCont' $ fmap (fmap pure) aFold
withStreamCont
:: (Stream (Of b) IO () -> IO a)
-> Fold1 b e a
withStreamCont c = withCont $ c . Streaming.unfoldr Pipes.next
withStreamCont'
:: (forall r. Stream (Of b) IO r -> IO (a, r))
-> Fold1 b e a
withStreamCont' c = withCont' $ c . Streaming.unfoldr Pipes.next
withFallibleStreamCont
:: (Stream (Of b) IO () -> IO (Either e a))
-> Fold1 b e a
withFallibleStreamCont c = withFallibleCont $ c . Streaming.unfoldr Pipes.next
withFallibleStreamCont'
:: (forall r. Stream (Of b) IO r -> IO (Either e (a, r)))
-> Fold1 b e a
withFallibleStreamCont' c = withFallibleCont' $ c . Streaming.unfoldr Pipes.next
withFold :: Foldl.Fold b a -> Fold1 b e a
withFold aFold = Fold1 (Other (TrueFold (Foldl.generalize aFold)))
withFoldIO :: Foldl.FoldM IO b a -> Fold1 b e a
withFoldIO aFold = Fold1 (Other (TrueFold (hoistFold lift aFold)))
hoistFold :: Monad m => (forall a. m a -> n a) -> Foldl.FoldM m i r -> Foldl.FoldM n i r
hoistFold g (Foldl.FoldM step begin done) = Foldl.FoldM (\s i -> g (step s i)) (g begin) (g . done)
withFallibleFold :: Foldl.FoldM (ExceptT e IO) b a -> Fold1 b e a
withFallibleFold aFold = Fold1 (Other (TrueFold aFold))
withConsumer :: Consumer b IO () -> Fold1 b e ()
withConsumer consumer = withCont $ \producer -> runEffect $ producer >-> consumer
withConsumer' :: Consumer b IO Void -> Fold1 b e ()
withConsumer' consumer = withCont' $ \producer -> fmap ((,) ()) $ runEffect $ producer >-> fmap absurd consumer
withConsumerM :: MonadIO m
=> (m () -> IO (Either e a))
-> Consumer b m ()
-> Fold1 b e a
withConsumerM whittle consumer = withFallibleCont $ \producer -> whittle $ runEffect $ (hoist liftIO producer) >-> consumer
withConsumerM' :: MonadIO m
=> (forall r. m r -> IO (Either e (a,r)))
-> Consumer b m Void
-> Fold1 b e a
withConsumerM' whittle consumer = withFallibleCont' $ \producer -> whittle $ runEffect $ (hoist liftIO producer) >-> fmap absurd consumer
withSafeConsumer
:: Consumer b (SafeT IO) Void
-> Fold1 b e ()
withSafeConsumer = withConsumerM' (fmap (\r -> Right ((),r)) . runSafeT)
withFallibleConsumer
:: Consumer b (ExceptT e IO) Void
-> Fold1 b e ()
withFallibleConsumer = withConsumerM' (fmap (fmap (\r -> ((), r))) . runExceptT)
withParser
:: Pipes.Parse.Parser b IO (Either e a)
-> Fold1 b e a
withParser parser = withFallibleCont' $ \producer -> drainage $ Pipes.Parse.runStateT parser producer
where
drainage m = do
(a,leftovers) <- m
r <- runEffect (leftovers >-> Pipes.Prelude.drain)
case a of
Left e -> return (Left e)
Right a' -> return (Right (a',r))
withParserM :: MonadIO m
=> (forall r. m (a,r) -> IO (Either e (c,r)))
-> Pipes.Parse.Parser b m a -> Fold1 b e c
withParserM f parser = withFallibleCont' $ \producer -> f $ drainage $ (Pipes.Parse.runStateT parser) (hoist liftIO producer)
where
drainage m = do
(a,leftovers) <- m
r <- runEffect (leftovers >-> Pipes.Prelude.drain)
return (a,r)
fold1Fallibly :: Fold1 b e a -> Producer b IO r -> IO (Either e (a,r))
fold1Fallibly (Fold1 (unLift -> s)) = exhaustiveCont s
fold1 :: Fold1 b Void a -> Producer b IO r -> IO (a,r)
fold1 (Fold1 (unLift -> s)) = liftM (either absurd id) . exhaustiveCont s
data Transducer x b e a =
M (b -> a)
| F (b -> [a])
| P (forall r. Producer b IO r -> Producer a IO r)
| PE (forall r. Producer b IO r -> Producer a IO (Either e r))
| S (forall r. Producer b IO r -> FreeT (Producer a IO) IO r)
| SE (forall r. Producer b IO r -> FreeT (Producer a IO) IO (Either e r))
instance Functor (Transducer x b e) where
fmap = second
instance Bifunctor (Transducer x b) where
bimap f g s = case s of
M x -> M (g . x)
F x -> F (fmap g . x)
P x -> P (\producer -> for (x producer) (Pipes.yield . g))
PE x -> PE (\producer -> liftM (first f) (for (x producer) (Pipes.yield . g)))
S x -> S (\producer -> transFreeT (\p -> for p (Pipes.yield . g)) (x producer))
SE x -> SE (\producer -> liftM (first f) (transFreeT (\p -> (for p (Pipes.yield . g))) (x producer)))
mapper
:: (a -> b)
-> Transducer Continuous a e b
mapper = M
fallibleMapper
:: (a -> Either e b)
-> Transducer Continuous a e b
fallibleMapper fallible = PE (\producer -> (runExceptT . distribute) (for (hoist lift producer) (\a -> do
case fallible a of
Left e -> lift (throwE e)
Right b -> Pipes.yield b)))
mapperFoldable
:: Foldable f
=> (a -> f b)
-> Transducer Continuous a e b
mapperFoldable f = F (Data.Foldable.toList . f)
mapperEnumerable
:: Enumerable f
=> (a -> f IO b)
-> Transducer Continuous a e b
mapperEnumerable enumerable = P (\producer -> for producer (enumerate . toListT . enumerable))
transducer
:: (forall r. Producer b IO r -> Producer a IO r)
-> Transducer Continuous b e a
transducer = P
fallibleTransducer
:: (forall r. Producer b IO r -> Producer a IO (Either e r))
-> Transducer Continuous b e a
fallibleTransducer = PE
delimit
:: (forall r. Producer a IO r -> FreeT (Producer a' IO) IO r)
-> Transducer Continuous b e a
-> Transducer Delimited b e a'
delimit f t = case t of
M func -> S (\producer -> f (producer >-> Pipes.map func))
F func -> S (\producer -> f (producer >-> Pipes.mapFoldable func))
P g -> S (f . g)
PE g -> SE (f . g)
S g -> S (f . Pipes.concats . g)
SE g -> SE (f . Pipes.concats . g)
transduce1 :: Transducer Continuous b e a -> Fold1 a e r -> Fold1 b e r
transduce1 (M _) (Fold1 (Pure x)) =
Fold1 (Pure x)
transduce1 (M f) (Fold1 (Other s)) = (Fold1 (Other (case s of
TrueFold x -> TrueFold (Foldl.premapM f x)
ExhaustiveCont x -> ExhaustiveCont (\producer -> x (producer >-> Pipes.map f))
NonexhaustiveCont x -> NonexhaustiveCont (\producer -> x (producer >-> Pipes.map f)))))
transduce1 (F _) (Fold1 (Pure x)) =
Fold1 (Pure x)
transduce1 (F f) (Fold1 (Other s)) = (Fold1 (Other (case s of
TrueFold x -> TrueFold (Foldl.handlesM (folding f) x)
ExhaustiveCont x -> ExhaustiveCont (\producer -> x (producer >-> Pipes.mapFoldable f))
NonexhaustiveCont x -> NonexhaustiveCont (\producer -> x (producer >-> Pipes.mapFoldable f)))))
transduce1 (P f) (Fold1 (unLift -> s)) = case s of
NonexhaustiveCont x -> Fold1 (Other (NonexhaustiveCont (x . f)))
_ -> Fold1 (Other (ExhaustiveCont (exhaustiveCont s . f)))
transduce1 (PE f) (Fold1 (exhaustiveCont . unLift -> s)) = do
Fold1 (Other (ExhaustiveCont (\producer -> do
(outbox,inbox,seal) <- spawn' (bounded 1)
runConceit $
(\(r,()) r' -> (r,r'))
<$>
Conceit (s (fromInput inbox) `finally` atomically seal)
<*>
(Conceit $
(runEffect (f producer >-> (toOutput outbox *> Pipes.drain))
`finally` atomically seal)))))
transduce1 (S f) somefold = transduce1 (P (Pipes.concats . f)) somefold
transduce1 (SE f) somefold = transduce1 (PE (Pipes.concats . f)) somefold
groups
:: (forall r. Producer b IO r -> Producer b' IO r)
-> Transducer Delimited a e b
-> Transducer Delimited a e b'
groups f t = case t of
M func -> P (f . (\producer -> producer >-> Pipes.map func))
F func -> P (f . (\producer -> producer >-> Pipes.mapFoldable func))
P g -> P (f . g)
PE g -> PE (f . g)
S g -> S (Pipes.maps f . g)
SE g -> SE (Pipes.maps f . g)
folds
:: Fold1 b Void b'
-> Transducer Delimited a e b
-> Transducer Continuous a e b'
folds somefold t = case t of
M func -> folds somefold (P (\producer -> producer >-> Pipes.map func))
F func -> folds somefold (P (\producer -> producer >-> Pipes.mapFoldable func))
P g -> folds somefold (S (liftF . g))
PE g -> folds somefold (SE (liftF . g))
S g -> P (Pipes.concats . transFreeT ((\action -> lift action >>= (\(b',r) -> Pipes.yield b' >> return r)) . fold1 somefold) . g)
SE g -> PE (Pipes.concats . transFreeT ((\action -> lift action >>= (\(b',r) -> Pipes.yield b' >> return r)) . fold1 somefold) . g)
data Delimited
data Continuous
concats
:: Transducer Delimited a e b
-> Transducer Continuous a e b
concats t = case t of
M func -> M func
F func -> F func
P g -> P g
PE g -> PE g
S g -> P (Pipes.concats . g)
SE g -> PE (Pipes.concats . g)
intercalates
:: Producer b IO ()
-> Transducer Delimited a e b
-> Transducer Continuous a e b
intercalates p t = case t of
M func -> M func
F func -> F func
P g -> P g
PE g -> PE g
S g -> P (Pipes.intercalates p . g)
SE g -> PE (Pipes.intercalates p . g)
newtype Fold2 b1 b2 e a = Fold2 (Lift (Fold2_ b1 b2 e) a) deriving (Functor)
data Fold2_ b1 b2 e a =
First (Fold1_ b1 e a)
| Second (Fold1_ b2 e a)
| Both (forall r1 r2. Producer b1 IO r1 -> Producer b2 IO r2 -> IO (Either e (a,r1,r2))) deriving (Functor)
fold2Fallibly_ :: Fold2_ b1 b2 e a -> Producer b1 IO r1 -> Producer b2 IO r2 -> IO (Either e (a,r1,r2))
fold2Fallibly_ theFold producer1 producer2 = case theFold of
Both f -> f producer1 producer2
First f -> runConceit $
(\(r1,x1) (_,x2) -> (r1,x1,x2))
<$>
Conceit (exhaustiveCont f producer1)
<*>
Conceit (fold1Fallibly (pure ()) producer2)
Second f -> runConceit $
(\(_,x1) (r2,x2) -> (r2,x1,x2))
<$>
Conceit (fold1Fallibly (pure ()) producer1)
<*>
Conceit (exhaustiveCont f producer2)
instance Bifunctor (Fold2 b1 b2) where
bimap f g (Fold2 x) = Fold2 (case x of
Pure a -> Pure (g a)
Other o -> Other (bimap f g o))
instance Bifunctor (Fold2_ b1 b2) where
bimap f g (First s) = First (bimap f g s)
bimap f g (Second s) = Second (bimap f g s)
bimap f g (Both s) = Both (fmap (fmap (fmap (bimap f (\(x1,x2,x3) -> (g x1,x2,x3))))) s)
instance Applicative (Fold2 b1 b2 e) where
pure a = Fold2 (pure a)
Fold2 fa <*> Fold2 a = Fold2 (fa <*> a)
instance Applicative (Fold2_ b1 b2 e) where
pure a = fmap (const a) (separated_ (pure ()) (pure ()))
Both fs <*> Both as = Both (\producer1 producer2 -> do
(outbox1a,inbox1a,seal1a) <- spawn' (bounded 1)
(outbox2a,inbox2a,seal2a) <- spawn' (bounded 1)
(outbox1b,inbox1b,seal1b) <- spawn' (bounded 1)
(outbox2b,inbox2b,seal2b) <- spawn' (bounded 1)
runConceit $
(\(f,(),()) (x,(),()) r1 r2 -> (f x,r1,r2))
<$>
Conceit (fs (fromInput inbox1a) (fromInput inbox1b) `finally` atomically seal1a `finally` atomically seal1b)
<*>
Conceit (as (fromInput inbox2a) (fromInput inbox2b) `finally` atomically seal2a `finally` atomically seal2b)
<*>
(_Conceit $
(runEffect (producer1 >-> Pipes.tee (toOutput outbox1a *> Pipes.drain)
>-> (toOutput outbox2a *> Pipes.drain)))
`finally` atomically seal1a
`finally` atomically seal2a)
<*>
(_Conceit $
(runEffect (producer2 >-> Pipes.tee (toOutput outbox1b *> Pipes.drain)
>-> (toOutput outbox2b *> Pipes.drain)))
`finally` atomically seal1b
`finally` atomically seal2b))
First fs <*> First as = First (fs <*> as)
Second fs <*> Second as = Second (fs <*> as)
First fs <*> Second as = uncurry ($) <$> separated_ fs as
Second fs <*> First as = uncurry (flip ($)) <$> separated_ as fs
First fs <*> Both as = (\(f,()) x -> f x) <$> separated_ fs (pure ()) <*> Both as
Both fs <*> First as = (\f (x,()) -> f x) <$> Both fs <*> separated_ as (pure ())
Second fs <*> Both as = (\((),f) x -> f x) <$> separated_ (pure ()) fs <*> Both as
Both fs <*> Second as = (\f ((),x) -> f x) <$> Both fs <*> separated_ (pure ()) as
instance (S.Semigroup a) => S.Semigroup (Fold2 b1 b2 e a) where
s1 <> s2 = (S.<>) <$> s1 <*> s2
#if !(MIN_VERSION_base(4,11,0))
instance (Monoid a,S.Semigroup a) => Monoid (Fold2 b1 b2 e a) where
#else
instance (Monoid a) => Monoid (Fold2 b1 b2 e a) where
#endif
mempty = pure mempty
#if !(MIN_VERSION_base(4,11,0))
mappend = (S.<>)
#endif
fold2Fallibly :: Fold2 b1 b2 e a -> Producer b1 IO r1 -> Producer b2 IO r2 -> IO (Either e (a,r1,r2))
fold2Fallibly (Fold2 (fold2Fallibly_ . unLift -> s)) = s
fold2 :: Fold2 b1 b2 Void a -> Producer b1 IO r1 -> Producer b2 IO r2 -> IO (a,r1,r2)
fold2 s producer1 producer2 = liftM (either absurd id) (fold2Fallibly s producer1 producer2)
liftFirst :: Fold1 b1 e r1 -> Fold2 b1 b2 e r1
liftFirst (unLift . runFold1 -> f1) = Fold2 (Other (First f1))
liftSecond :: Fold1 b2 e r1 -> Fold2 b1 b2 e r1
liftSecond (unLift . runFold1 -> f1) = Fold2 (Other (Second f1))
separated_ :: Fold1_ b1 e r1 -> Fold1_ b2 e r2 -> Fold2_ b1 b2 e (r1,r2)
separated_ f1 f2 = Both (\producer1 producer2 ->
runConceit $
(\(r1,x1) (r2,x2) -> ((r1,r2),x1,x2))
<$>
Conceit (exhaustiveCont f1 producer1)
<*>
Conceit (exhaustiveCont f2 producer2))
separated :: Fold1 b1 e r1 -> Fold1 b2 e r2 -> Fold2 b1 b2 e (r1,r2)
separated f1 f2 = Fold2 (Other (separated_ (unLift . runFold1 $ f1) (unLift . runFold1 $ f2)))
combined :: Transducer Delimited b1 e x -> Transducer Delimited b2 e x -> Fold1 x e a -> Fold2 b1 b2 e a
combined t1 t2 f = Fold2 (Other (Both (\producer1 producer2 -> do
(outbox, inbox, seal) <- spawn' (bounded 1)
lock <- newMVar outbox
runConceit $
(\(((),r1),((),r2)) (a,()) -> (a,r1,r2))
<$>
Conceit
((runConceit $
(,)
<$>
Conceit (fold1Fallibly (transduce1 (folds (withCont' (iterTLines lock)) t1) (pure ())) producer1)
<*>
Conceit (fold1Fallibly (transduce1 (folds (withCont' (iterTLines lock)) t2) (pure ())) producer2)
) `finally` atomically seal)
<*>
Conceit (fold1Fallibly f (fromInput inbox) `finally` atomically seal))))
where
iterTLines mvar = \textProducer -> fmap (\x -> ((),x)) $ do
withMVar mvar $ \output -> do
runEffect $ textProducer >-> (toOutput output >> Pipes.drain)