{-# LANGUAGE BangPatterns #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE RankNTypes #-} -- | -- Module : Data.Conduino.Combinators -- Copyright : (c) Justin Le 2019 -- License : BSD3 -- -- Maintainer : justin@jle.im -- Stability : experimental -- Portability : non-portable -- -- A basic collection of base 'Pipe's that serve as a "prelude" for the -- package. This module is meant to be imported qualified. -- -- > import qualified Data.Conduino.Combinators as C -- module Data.Conduino.Combinators ( -- * Sources -- ** Pure -- *** Infinite unfold , iterate , repeat -- *** Finite , unfoldMaybe , unfoldEither , iterateMaybe , iterateEither , sourceList , replicate -- ** Monadic -- *** Infinite , repeatM -- *** Finite , repeatMaybeM , repeatEitherM , replicateM , sourceHandleLines , stdinLines , sourceHandle , stdin -- * Pipes , map , mapM , iterM , scan , mapAccum , take , takeWhile , filter , concatMap , concat , pairs , consecutive -- * Sinks -- ** Pure , drop , dropWhile , foldr , foldl , foldMap , fold , sinkNull , sinkList , last -- ** Monadic , sinkHandle , stdout , stderr ) where import Control.Applicative import Control.Exception import Control.Monad hiding (mapM, replicateM) import Control.Monad.IO.Class import Control.Monad.Trans.Class import Control.Monad.Trans.Maybe import Data.Conduino import Data.Either import Data.Foldable hiding (foldr, foldl, fold, concat, concatMap, foldMap) import Data.Maybe import Data.Semigroup import Prelude hiding (map, iterate, mapM, replicate, repeat, foldr, drop, foldl, last, take, concatMap, filter, concat, takeWhile, dropWhile, foldMap) import System.IO.Error import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy.Internal as BSL import qualified Data.Sequence as Seq import qualified System.IO as S -- | A version of 'unfoldMaybe' that can choose the "result" value by -- passing it in as 'Left'. unfoldEither :: (s -> Either a (o, s)) -> s -> Pipe i o u m a unfoldEither f = go where go z = case f z of Left r -> pure r Right (x, z') -> yield x *> go z' -- | A version of 'unfold' that can terminate and end by returning -- 'Nothing'. unfoldMaybe :: (s -> Maybe (o, s)) -> s -> Pipe i o u m () unfoldMaybe f = unfoldEither (maybe (Left ()) Right . f) -- | Repeatedly apply an "unfolding" function to a given initial state, -- yielding the first item in the tuple as output and updating the state as -- the second item in the tuple. Goes on forever. See 'unfoldMaybe' for -- a version that stops. unfold :: (s -> (o, s)) -> s -> Pipe i o u m a unfold f = go where go z = yield x *> go z' where (x, z') = f z -- | A version of 'iterateMaybe' that can specify a result value by -- providing it in the 'Left'. iterateEither :: (o -> Either a o) -> o -> Pipe i o u m a iterateEither f = unfoldEither (fmap (join (,)) . f) -- | A version of 'iterate' that can choose to terminate and stop by -- returning 'Nothing'. iterateMaybe :: (o -> Maybe o) -> o -> Pipe i o u m () iterateMaybe f = unfoldMaybe (fmap (join (,)) . f) -- | Repeatedly apply a function to a given starting value and yield each -- result forever. -- -- >>> runPipePure $ iterate succ 0 -- .| take 5 -- .| sinkList -- -- [1,2,3,4,5] -- -- This doesn't yield the original starting value. However, you can yield -- it iterate after: -- -- >>> runPipePure $ (yield 0 >> iterate succ 0) -- .| take 5 -- .| sinkList -- -- [0,1,2,3,4,5] iterate :: (o -> o) -> o -> Pipe i o u m a iterate f = unfold (join (,) . f) -- | Yield every item in a foldable container. sourceList :: Foldable t => t a -> Pipe i a u m () sourceList = traverse_ yield -- | Repeatedly yield a given item forever. repeat :: o -> Pipe i o u m a repeat = forever . yield -- | Yield a given item a certain number of times. replicate :: Int -> o -> Pipe i o u m () replicate n = replicateM_ n . yield -- | Like 'repeatMaybeM', but allow specification of a final result type. repeatEitherM :: Monad m => m (Either a o) -> Pipe i o u m a repeatEitherM x = go where go = lift x >>= \case Left r -> pure r Right y -> yield y *> go -- | Repeat a monadic action, yielding the item in the 'Just' every time. -- As soon as it sees 'Nothing', stop producing forever. -- -- Remember that each item will only be "executed" when something -- downstream requests output. repeatMaybeM :: Monad m => m (Maybe o) -> Pipe i o u m () repeatMaybeM = repeatEitherM . fmap (maybe (Left ()) Right) -- | Repeat a monadic action a given number of times, yielding each result, -- and then stop producing forever. -- -- Remember that each item will only be "executed" when something -- downstream requests output. replicateM :: Monad m => Int -> m o -> Pipe i o u m () replicateM n x = replicateM_ n $ lift x >>= yield -- | Source from each line received from 'stdin'. This stops as soon as -- end-of-file is reached, or an empty line is seen. stdinLines :: MonadIO m => Pipe i String u m () stdinLines = sourceHandleLines S.stdin -- | Source from stdin, yielding bytestrings as they are drawn. If you -- want to retrieve each line as a string, see 'stdinLines'. stdin :: MonadIO m => Pipe i BS.ByteString u m () stdin = sourceHandle S.stdin -- | Source from a given I/O handle, yielding each line drawn as a string. -- To draw raw bytes, use 'sourceHandle'. -- -- This stop as soon as end-of-file is reached, or an empty line is seen. sourceHandleLines :: MonadIO m => S.Handle -> Pipe i String u m () sourceHandleLines h = repeatMaybeM $ do d <- liftIO $ S.hIsEOF h if d then pure Nothing else liftIO . catchJust (guard . isEOFError) (mfilter (not . null) . Just <$> S.hGetLine h) $ \_ -> pure Nothing -- | Source from a given I/O handle, yielding bytestrings as they are -- pulled. If you want to retrieve each line as a string, see -- 'sourceHandleLines'. sourceHandle :: MonadIO m => S.Handle -> Pipe i BS.ByteString u m () sourceHandle h = repeatMaybeM . fmap (mfilter (not . BS.null) . Just) . liftIO $ BS.hGetSome h BSL.defaultChunkSize -- | Sink into a given I/O handle, writing each input to the handle. sinkHandle :: MonadIO m => S.Handle -> Pipe BS.ByteString o u m () sinkHandle h = mapM (liftIO . BS.hPut h) .| sinkNull -- | A sink into stdout. stdout :: MonadIO m => Pipe BS.ByteString o u m () stdout = sinkHandle S.stdout -- | A sink into stderr. stderr :: MonadIO m => Pipe BS.ByteString o u m () stderr = sinkHandle S.stderr -- | Repeat a monadic action forever, yielding each output. -- -- Remember that each item will only be "executed" when something -- downstream requests output. repeatM :: Monad m => m o -> Pipe i o u m a repeatM x = go where go = (yield =<< lift x) *> go -- | Process every incoming item with a pure function, and yield its -- output. map :: (i -> o) -> Pipe i o u m u map f = awaitForever (yield . f) -- | Map a monadic function to process every input, and yield its output. mapM :: Monad m => (i -> m o) -> Pipe i o u m u mapM f = awaitForever ((yield =<<) . lift . f) -- | Execute a monadic function to process every input, passing through the -- original value back downstream. -- -- @since 0.2.1.0 iterM :: Monad m => (i -> m ()) -> Pipe i i u m u iterM f = mapM (\x -> x <$ f x) -- | Map a pure "stateful" function over each incoming item. Give -- a function to update the state and return an output and an initial -- state. mapAccum :: (i -> s -> (s, o)) -- ^ update state and output -> s -- ^ initial state -> Pipe i o u m u mapAccum f = go where go !x = awaitWith $ \y -> let (!x', !z) = f y x in yield z *> go x' -- | Like 'foldl', but yields every accumulator value downstream. -- -- >>> runPipePure $ sourceList [1..10] -- .| scan (+) 0 -- .| sinkList -- [1,3,6,10,15,21,28,36,45,55] -- @ scan :: (o -> i -> o) -> o -> Pipe i o u m u scan f = go where go !x = awaitWith $ \y -> let x' = f x y in yield x' *> go x' -- | Yield consecutive pairs of values. -- -- >>> runPipePure $ sourceList [1..5] -- .| pairs -- .| sinkList -- [(1,2),(2,3),(3,4),(4,5)] pairs :: Pipe i (i, i) u m u pairs = awaitWith go where go x = awaitWith $ \y -> do yield (x, y) go y -- | Yield consecutive runs of at most @n@ of values, starting with an -- empty sequence. -- -- To get only "full" sequences, pipe with 'filter'. -- -- >>> runPipePure $ sourceList [1..6] -- .| consecutive 3 -- .| map toList -- .| sinkList -- [[],[1],[1,2],[1,2,3],[2,3,4],[3,4,5],[4,5,6]] -- -- >>> runPipePure $ sourceList [1..6] -- .| consecutive 3 -- .| filter ((== 3) . Seq.length) -- .| map toList -- .| sinkList -- [[1,2,3],[2,3,4],[3,4,5],[4,5,6]] consecutive :: Int -> Pipe i (Seq.Seq i) u m u consecutive n = go Seq.empty where go xs = do yield xs awaitWith $ \y -> go . Seq.drop (Seq.length xs - n + 1) $ (xs Seq.:|> y) -- | Let a given number of items pass through the stream uninhibited, and -- then stop producing forever. -- -- This is most useful if you sequence a second conduit after it. -- -- >>> runPipePure $ sourceList [1..8] -- .| (do take 3 .| map (*2) -- double the first 3 items -- map negate -- negate the rest -- ) -- .| sinkList -- [2,4,6,-4,-5,-6,-7,-8] take :: Int -> Pipe i i u m () take n = void . runMaybeT . replicateM_ n $ lift . yield =<< MaybeT await -- | Let elements pass until an element is received that does not satisfy -- the predicate, then stop producing forever. -- -- Like 'take', is most useful if you sequence a second conduit after it. takeWhile :: (i -> Bool) -> Pipe i i u m () takeWhile p = go where go = await >>= \case Nothing -> pure () Just x | p x -> yield x *> go | otherwise -> pure () -- | Only allow values satisfying a predicate to pass. filter :: (i -> Bool) -> Pipe i i u m u filter p = awaitForever $ \x -> when (p x) $ yield x -- | Map a function returning a container onto every incoming item, and -- yield all of the outputs from that function. concatMap :: Foldable t => (i -> t o) -> Pipe i o u m u concatMap f = awaitForever (sourceList . f) -- | Take an input of containers and output each of their elements -- successively. concat :: Foldable t => Pipe (t i) i u m u concat = awaitForever sourceList -- | Right-fold every input into an accumulated value. -- -- Essentially this builds up a giant continuation that will be run all at -- once on the final result. foldr :: (a -> b -> b) -> b -> Pipe a o u m b foldr f z = go where go = await >>= \case Nothing -> pure z Just x -> f x <$> go -- | Left-fold every input into an accumulated value. -- -- Essentially this maintains a state and modifies that state with every -- input, using the given accumulating function. foldl :: (b -> a -> b) -> b -> Pipe a o u m b foldl f = go where go !z = await >>= \case Nothing -> pure z Just !x -> go (f z x) -- | Fold every incoming item monoidally, and return the result once -- finished. fold :: Monoid a => Pipe a o u m a fold = foldl (<>) mempty -- | Fold every incoming item according to a monoidal projection, and -- return the result once finished. -- -- This can be used to implement many useful consumers, like ones that find -- the sum or the maximum item: -- -- @ -- sum :: Num i => Pipe i o u m i -- sum = getSum <$> foldMap Sum -- -- maximum :: Ord i => Pipe i o u m (Maybe i) -- maximum = fmap getMax <$> foldMap (Just . Max) -- @ foldMap :: Monoid a => (i -> a) -> Pipe i o u m a foldMap f = foldl (\x y -> x <> f y) mempty -- | Sink every incoming item into a list. -- -- Note that this keeps the entire list in memory until it is all -- eventually read. sinkList :: Pipe i o u m [i] sinkList = foldr (:) [] -- | Ignore a certain amount of items from the input stream, and then stop -- producing forever. -- -- This is most useful if you sequence a second consumer after it: -- -- >>> runPipePure $ sourceList [1..8] -- .| (drop 3 >> 'sinkList') -- [4,5,6,7,8] drop :: Int -> Pipe i o u m () drop n = replicateM_ n await -- | Ignore items from an input stream as long as they match a predicate. -- Afterwards, stop producing forever. -- -- Like for 'drop', is most useful of you sequence a second consumer after -- it. dropWhile :: (i -> Bool) -> Pipe i o u m () dropWhile p = go where go = await >>= \case Nothing -> pure () Just x | p x -> go | otherwise -> pure () -- | Consume an entire input stream and ignore all of its outputs. sinkNull :: Pipe i o u m () sinkNull = await >>= \case Nothing -> pure () Just _ -> sinkNull -- | Get the last item emitted by a stream. -- -- To get the first item ("head"), use 'await' or 'awaitSurely'. last :: Pipe i o u m (Maybe i) last = fmap getLast <$> foldMap (Just . Last)