{-# LANGUAGE NoMonomorphismRestriction, PatternGuards, CPP #-}

module General.Conduit(
    module Data.Conduit, MonadIO, liftIO,
    sourceList, sinkList, sourceLStr,
    mapC, mapAccumC, filterC,
    mapMC, mapAccumMC,
    (|$|), pipelineC, groupOnLastC,
    zipFromC, linesCR
    ) where

import Data.Void
import Data.Conduit
import Data.Conduit.List as C
import Data.Conduit.Binary as C
import Data.Maybe
import Control.Applicative
import Control.Monad.Extra
import Control.Exception
import qualified Data.ByteString.Char8 as BS
import Control.Concurrent.Extra hiding (yield)
import Control.Monad.IO.Class
import General.Str
import Prelude


-- | Apply a pure function to every value passing through the conduit.
--   Thin alias for 'C.map' from "Data.Conduit.List".
mapC :: Monad m => (a -> b) -> ConduitM a b m ()
mapC = C.map
-- | Apply a monadic action to every value passing through the conduit.
--   Thin alias for 'C.mapM' from "Data.Conduit.List".
mapMC :: Monad m => (a -> m b) -> ConduitM a b m ()
mapMC = C.mapM
-- | Map over the stream while threading an accumulator.
--
--   The step function receives the accumulator first and the stream element
--   second (the reverse of 'C.mapAccum'), and returns the new accumulator
--   together with the value to emit. The accumulator is forced with 'seq'
--   before every step so that it does not build up as a chain of thunks.
--   The conduit's result is the final accumulator.
mapAccumC :: Monad m => (s -> a -> (s, b)) -> s -> ConduitM a b m s
mapAccumC f = C.mapAccum (\x a -> a `seq` f a x)
-- | Monadic variant of accumulator-threading map.
--
--   The step action receives the accumulator first and the stream element
--   second (the reverse of 'C.mapAccumM'), and produces the new accumulator
--   together with the value to emit. The accumulator is forced with 'seq'
--   before every step so that it does not build up as a chain of thunks.
--   The conduit's result is the final accumulator.
mapAccumMC :: Monad m => (s -> a -> m (s, b)) -> s -> ConduitM a b m s
mapAccumMC f = C.mapAccumM (\x a -> a `seq` f a x)
-- | Keep only the values that satisfy the predicate.
--   Thin alias for 'C.filter' from "Data.Conduit.List".
filterC :: Monad m => (a -> Bool) -> ConduitM a a m ()
filterC = C.filter

-- | Pair every input value with a counter. The counter starts at the given
--   value and is advanced with 'succ' after each element; the final counter
--   value is discarded.
zipFromC :: (Monad m, Enum i) => i -> ConduitM a (i, a) m ()
zipFromC = void . mapAccumC (\i x -> (succ i, (i,x)))

-- | Combine two conduits that share the same input and output types into a
--   single conduit returning both results as a pair. The combination goes
--   through the 'Applicative' instance of 'ZipConduit'.
(|$|) :: Monad m => ConduitM i o m r1 -> ConduitM i o m r2 -> ConduitM i o m (r1,r2)
(|$|) a b = getZipConduit $ (,) <$> ZipConduit a <*> ZipConduit b

-- | Collect the incoming values into a list.
--   Thin alias for 'consume' from "Data.Conduit.List".
sinkList :: Monad m => ConduitM a o m [a]
sinkList = consume

-- | Group things while they have the same function result, only return the last value.
--   Conduit version of @groupOnLast f = map last . groupOn f@.
--
--   Consecutive inputs are compared by the key @op@ computes for them; for
--   every run of inputs with equal keys only the final input of the run is
--   yielded. An empty input stream yields nothing.
groupOnLastC :: (Monad m, Eq b) => (a -> b) -> ConduitM a a m ()
groupOnLastC op = do
    first <- await
    whenJust first $ \v -> f (op v) v
    where
        -- @f k v@ holds back the most recent input @v@ (with key @k@) until the
        -- next input, or the end of the stream, shows that its run has ended.
        f k v = await >>= \next -> case next of
            Nothing -> yield v
            Just v2 -> do
                let k2 = op v2
                -- a different key means @v@ was the last element of its run
                when (k /= k2) $ yield v
                f k2 v2


-- | Split the incoming 'BStr' stream into lines with 'C.lines', then drop a
--   single trailing @\'\\r\'@ from every line that ends in one.
linesCR :: Monad m => ConduitM BStr BStr m ()
linesCR = C.lines .| mapC stripCR
    where
        -- remove at most one carriage return from the end of the line
        stripCR x
            | Just (body, '\r') <- BS.unsnoc x = body
            | otherwise = x

-- | Stream the chunks of an 'LBStr', as produced by 'lbstrToChunks', one
--   'BStr' at a time.
sourceLStr :: Monad m => LBStr -> ConduitM i BStr m ()
sourceLStr = sourceList . lbstrToChunks


-- | Run @sink@ on a separate thread, feeding it every value received by the
--   returned conduit through an in-memory channel.
--
--   * @buffer@ is the initial count of the semaphore that is waited on before
--     each value is written to the channel and signalled after each value is
--     read from it, bounding how many values are in flight at once.
--   * @sink@ is executed with 'runConduit' on a thread started by 'forkFinally'.
--
--   The result is the value returned by @sink@. If the sink thread finishes
--   with an exception, that exception is thrown to the thread that called
--   'pipelineC' using 'throwTo'.
--
--   NOTE(review): if @sink@ stops consuming before the input is exhausted,
--   nothing signals the semaphore any more and the feeding loop looks like it
--   would block in 'waitQSem' -- confirm callers only pass sinks that drain
--   their input.
pipelineC :: Int -> ConduitM o Void IO r -> ConduitM o Void IO r
pipelineC buffer sink = do
    sem <- liftIO $ newQSem buffer  -- how many are in flow, to avoid memory leaks
    chan <- liftIO newChan          -- the items in flow (type o)
    bar <- liftIO newBarrier        -- the result type (type r)
    me <- liftIO myThreadId
    -- Worker thread: replay the channel contents as a source for the sink.
    -- On completion either forward the exception to the caller or publish the result.
    _ <- liftIO $ flip forkFinally (either (throwTo me) (signalBarrier bar)) $
        runConduit $
            (whileM $ do
                x <- liftIO $ readChan chan
                liftIO $ signalQSem sem
                whenJust x yield
                -- 'Nothing' is the end-of-input marker, so stop looping on it
                pure $ isJust x) .|
            sink
    -- Caller thread: push every upstream value into the channel, respecting the bound.
    awaitForever $ \x -> liftIO $ do
        waitQSem sem
        writeChan chan $ Just x
    -- Upstream is exhausted: tell the worker to finish, then wait for its result.
    liftIO $ writeChan chan Nothing
    liftIO $ waitBarrier bar