module Control.Pipe (
  -- * Tutorial
  --
  -- | This library provides a single data type: 'Pipe'.
  --
  -- 'Pipe' is a monad transformer that extends the base monad with the ability
  -- to 'await' input from or 'yield' output to other 'Pipe's. 'Pipe's resemble
  -- enumeratees in other libraries because they receive an input stream and
  -- transform it into a new stream.
  --
  -- Let's introduce our first 'Pipe', which is a verbose version of the Prelude's
  -- 'take' function:
  --
  -- > take' :: Int -> Pipe a a IO ()
  -- > take' n = do
  -- >     replicateM_ n $ do
  -- >         x <- await
  -- >         yield x
  -- >     lift $ putStrLn "You shall not pass!"
  --
  -- This 'Pipe' allows the first @n@ values it receives to pass through
  -- undisturbed, then it outputs a cute message and shuts down. Shutdown is
  -- automatic when you reach the end of the monad. You don't need to send a
  -- special signal to connected 'Pipe's to let them know you are done handling
  -- input or generating output.
  --
  -- Let's dissect the above 'Pipe''s type to learn a bit about how 'Pipe's work:
  --
  -- >      | Input Type | Output Type | Base monad | Return value
  -- > Pipe   a            a             IO           ()
  --
  -- So @take'@ 'await's input of type @a@ from upstream 'Pipe's and 'yield's
  -- output of type @a@ to downstream 'Pipe's. @take'@ uses 'IO' as its base
  -- monad because it invokes the 'putStrLn' function. If we remove the call to
  -- 'putStrLn' the compiler infers the following type instead, which is
  -- polymorphic in the base monad:
  --
  -- > take' :: (Monad m) => Int -> Pipe a a m ()
  --
  -- 'Pipe's are conservative about using the base monad. In fact, you can only
  -- invoke the base monad by using the 'lift' function. If you never use
  -- 'lift', your 'Pipe' will translate into pure code.
  --
  -- Now let's create a function that converts a list into a 'Pipe' by
  -- 'yield'ing each element of the list:
  --
  -- > fromList :: (Monad m) => [a] -> Pipe () a m ()
  -- > fromList = mapM_ yield
  --
  -- We use @()@ as the input type of the 'Pipe' since it doesn't need any input
  -- from an upstream 'Pipe'. You can think of @fromList@ as a one-way 'Pipe'
  -- that can only deliver output, which makes it suitable for the first stage in
  -- a 'Pipeline'. We provide a type synonym for this common case:
  --
  -- > type Producer b m r = Pipe () b m r
  --
  -- You can then rewrite the type signature for @fromList@ as:
  --
  -- > fromList :: (Monad m) => [a] -> Producer a m ()
  --
  -- The compiler would be ok with a polymorphic input type, since without any
  -- calls to 'await' it doesn't need to constrain it. However, using @()@ makes
  -- it clear in the types that this 'Pipe' is designed to be used as a 'Producer',
  -- and statically prevents a number of mistakes when 'Pipe's are combined.
  --
  -- 'Producer's resemble enumerators in other libraries because they are a data
  -- source. It is not illegal to use 'await' in a 'Producer'; it just returns
  -- @()@ immediately without blocking.
  --
  -- Now let's create a 'Pipe' that prints every value delivered to it and never
  -- terminates:
  --
  -- > printer :: (Show a) => Pipe a Void IO b
  -- > printer = forever $ do
  -- >     x <- await
  -- >     lift $ print x
  --
  -- The 'Void' in @printer@'s type signature indicates that it never delivers
  -- output downstream, so it represents the final stage in a 'Pipeline'. Again,
  -- we provide a type synonym for this common case:
  --
  -- > type Consumer a m r = Pipe a Void m r
  --
  -- So we could instead write @printer@'s type as:
  --
  -- > printer :: (Show a) => Consumer a IO b
  --
  -- 'Consumer's resemble iteratees in other libraries because they are a data
  -- sink. 'Consumer's never use 'yield' statements.
  --
  -- What makes 'Pipe's useful is the ability to compose them into 'Pipeline's.
  -- For that, we provide a '>+>' operator (and its right-to-left counterpart
  -- '<+<'):
  --
  -- > (>+>) :: Pipe a b m r -> Pipe b c m r -> Pipe a c m r
  -- > (<+<) :: Pipe b c m r -> Pipe a b m r -> Pipe a c m r
  --
  -- For example, here is how you can compose the above 'Pipe's:
  --
  -- > pipeline :: Pipe () Void IO ()
  -- > pipeline = fromList [1..] >+> take' 3 >+> printer
  --
  -- This represents a self-contained 'Pipeline' and we provide a type synonym
  -- for this common case:
  --
  -- > type Pipeline m r = Pipe () Void m r
  --
  -- Like many other monad transformers, you convert the 'Pipe' monad back to the
  -- base monad using some sort of \"@run...@\" function. In this case, we
  -- provide a 'runPipe' function:
  --
  -- > runPipe :: Pipeline IO r -> IO r
  --
  -- 'runPipe' is actually more general, since it works with any
  -- 'MonadBaseControl', but we will work with the above simplified signature in
  -- this tutorial.
  --
  -- There are also more general versions of 'runPipe' which work in
  -- any monad, but don't have any exception-safety guarantees, so they should
  -- only be used for 'Pipe's that don't allocate any scarce resources.
  --
  -- > runPurePipe :: (Monad m) => Pipeline m r -> m (Either SomeException r)
  -- > runPurePipe_ :: (Monad m) => Pipeline m r -> m r
  --
  -- 'runPipe', 'runPurePipe' and 'runPurePipe_' only work on self-contained
  -- 'Pipeline's. We explicitly require @()@ as input type and 'Void' as output
  -- type to ensure that the pipeline doesn't 'await' or 'yield' any value. If a
  -- 'Pipe' is polymorphic in its input type (for example because it never uses
  -- 'await'), then it can always be used as the first stage of a 'Pipeline'.
  -- Similarly, a 'Pipe' that is polymorphic in its output type can be used as
  -- the final stage.
  --
  -- It is generally good practice to use @()@ (resp. 'Void') explicitly as the
  -- input (resp. output) type of a producer (resp.
  -- consumer), since it gives the
  -- compiler more information on the intent of the 'Pipe', and makes some common
  -- errors detectable at compile time.
  --
  -- Let's try using 'runPipe':
  --
  -- >>> runPipe pipeline
  -- 1
  -- 2
  -- 3
  -- You shall not pass!
  --
  -- Fascinating! Our 'Pipe' terminated even though @printer@ never terminates
  -- and @fromList@ never terminates when given an infinite list. To illustrate
  -- why our 'Pipe' terminated, let's outline the flow control rules for 'Pipe'
  -- composition.
  --
  -- * Execution begins at the most downstream 'Pipe' (@printer@ in our example).
  --
  -- * If a 'Pipe' 'await's input, it blocks and transfers control to the next
  --   'Pipe' upstream until that 'Pipe' 'yield's back a value.
  --
  -- * If a 'Pipe' 'yield's output, it restores control to the original
  --   downstream 'Pipe' that was 'await'ing its input and binds its result to
  --   the return value of the 'await' command.
  --
  -- * If a 'Pipe' terminates, it terminates every other 'Pipe' composed with it.
  --
  -- The last rule is crucial. If a 'Pipe' terminates then every downstream
  -- 'Pipe' depending on its output cannot proceed, and upstream 'Pipe's are
  -- never evaluated because the terminated 'Pipe' will not request values from
  -- them any longer.
  --
  -- So in our previous example, the 'Pipeline' terminated because @take' 3@
  -- terminated and brought down the entire 'Pipeline' with it.
  --
  -- 'Pipe's promote loose coupling, allowing you to mix and match them
  -- transparently using composition. For example, we can define a new
  -- 'Producer' pipe that indefinitely prompts the user for integers:
  --
  -- > prompt :: Producer Int IO a
  -- > prompt = forever $ do
  -- >     lift $ putStrLn "Enter a number: "
  -- >     n <- read <$> lift getLine
  -- >     yield n
  --
  -- Now we can compose it with any of our previous 'Pipe's:
  --
  -- >>> runPipe $ prompt >+> take' 3 >+> printer
  -- Enter a number:
  -- 1
  -- 1
  -- Enter a number:
  -- 2
  -- 2
  -- Enter a number:
  -- 3
  -- 3
  -- You shall not pass!
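  --
  -- To make the flow control rules listed earlier concrete, here is a minimal,
  -- self-contained sketch of a toy pipe type. It is only an illustrative model
  -- and not this library's implementation: the names @P@, @awaitP@, @yieldP@,
  -- @liftP@, @>->@ and @runP@ are hypothetical, and the model ignores
  -- 'tryAwait', exceptions and finalization entirely.
  --
  -- > import Control.Monad (ap, forever, liftM, replicateM_)
  -- >
  -- > -- Hypothetical toy model, not the library's actual 'Pipe' type.
  -- > data P a b m r
  -- >     = Done r                  -- terminated with a result
  -- >     | Eff (m (P a b m r))     -- run an action in the base monad
  -- >     | Await (a -> P a b m r)  -- blocked, waiting for upstream input
  -- >     | Yield b (P a b m r)     -- output ready for downstream
  -- >
  -- > instance Monad m => Functor (P a b m) where
  -- >     fmap = liftM
  -- >
  -- > instance Monad m => Applicative (P a b m) where
  -- >     pure  = Done
  -- >     (<*>) = ap
  -- >
  -- > instance Monad m => Monad (P a b m) where
  -- >     Done r    >>= f = f r
  -- >     Eff m     >>= f = Eff (liftM (>>= f) m)
  -- >     Await k   >>= f = Await (\a -> k a >>= f)
  -- >     Yield b p >>= f = Yield b (p >>= f)
  -- >
  -- > awaitP :: P a b m a
  -- > awaitP = Await Done
  -- >
  -- > yieldP :: b -> P a b m ()
  -- > yieldP b = Yield b (Done ())
  -- >
  -- > liftP :: Monad m => m r -> P a b m r
  -- > liftP = Eff . liftM Done
  -- >
  -- > -- Composition inspects the downstream (right) pipe first.
  -- > (>->) :: Monad m => P a b m r -> P b c m r -> P a c m r
  -- > _          >-> Done r       = Done r                       -- downstream done: stop everything
  -- > up         >-> Eff m        = Eff (liftM (up >->) m)       -- downstream effect runs first
  -- > up         >-> Yield c dn   = Yield c (up >-> dn)          -- pass downstream output along
  -- > Done r     >-> Await _      = Done r                       -- upstream done: stop everything
  -- > Eff m      >-> dn@(Await _) = Eff (liftM (>-> dn) m)       -- downstream blocked: run upstream
  -- > Await k    >-> dn@(Await _) = Await (\a -> k a >-> dn)     -- both blocked: ask further upstream
  -- > Yield b up >-> Await k      = up >-> k b                   -- hand the value to the waiting pipe
  -- >
  -- > -- Run a self-contained pipe: feed () to stray awaits, drop stray yields.
  -- > runP :: Monad m => P () c m r -> m r
  -- > runP (Done r)    = return r
  -- > runP (Eff m)     = m >>= runP
  -- > runP (Await k)   = runP (k ())
  -- > runP (Yield _ p) = runP p
  -- >
  -- > takeP :: Int -> P a a IO ()
  -- > takeP n = do
  -- >     replicateM_ n (awaitP >>= yieldP)
  -- >     liftP (putStrLn "You shall not pass!")
  -- >
  -- > printerP :: Show a => P a c IO r
  -- > printerP = forever (awaitP >>= liftP . print)
  -- >
  -- > main :: IO ()
  -- > main = runP (mapM_ yieldP [1 :: Int ..] >-> takeP 3 >-> printerP)
  --
  -- In this toy model @main@ prints @1@, @2@, @3@ and then the message, because
  -- the first and fourth clauses of @>->@ let whichever pipe finishes first end
  -- the whole composition.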
  --
  -- You can easily \"vertically\" concatenate 'Pipe's, 'Producer's, and
  -- 'Consumer's, all using simple monad sequencing: ('>>'). For example, here
  -- is how you concatenate 'Producer's:
  --
  -- >>> runPipe $ (fromList [1..3] >> fromList [10..12]) >+> printer
  -- 1
  -- 2
  -- 3
  -- 10
  -- 11
  -- 12
  --
  -- Here's how you would concatenate 'Consumer's:
  --
  -- > print' :: (Show a) => Int -> Consumer a IO ()
  -- > print' n = take' n >+> printer
  --
  -- >>> runPipe $ fromList [1..] >+> (print' 3 >> print' 4)
  -- 1
  -- 2
  -- 3
  -- You shall not pass!
  -- 4
  -- 5
  -- 6
  -- 7
  -- You shall not pass!
  --
  -- ... but the above example is gratuitous because we could have just
  -- concatenated the intermediate @take'@ 'Pipe':
  --
  -- >>> runPipe $ fromList [1..] >+> (take' 3 >> take' 4) >+> printer
  -- 1
  -- 2
  -- 3
  -- You shall not pass!
  -- 4
  -- 5
  -- 6
  -- 7
  -- You shall not pass!
  --
  -- Pipe composition imposes an important limitation: You can only compose
  -- 'Pipe's that have the same return type. For example, we could write the
  -- following function:
  --
  -- > deliver :: (Monad m) => Int -> Consumer a m [a]
  -- > deliver n = replicateM n await
  --
  -- ... and we might try to compose it with @fromList@:
  --
  -- >>> runPipe $ fromList [1..10] >+> deliver 3 -- wrong!
  --
  -- ... but this wouldn't type-check, because @fromList@ has a return type of
  -- @()@ and @deliver@ has a return type of @[Int]@. All 'Pipe's in a
  -- composition need to have the same return type, since the return value of the
  -- composed 'Pipe' is taken from the 'Pipe' that terminates first, and there's
  -- no general way to determine which one it is statically.
  --
  -- Fortunately, we don't have to rewrite the @fromList@ function because we can
  -- add a return value using vertical concatenation:
  --
  -- >>> runPipe $ (fromList [1..10] >> return []) >+> deliver 3
  -- [1,2,3]
  --
  -- ...
  -- although a more idiomatic Haskell version would be:
  --
  -- >>> runPipe $ (fromList [1..10] *> pure Nothing) >+> (Just <$> deliver 3)
  -- Just [1,2,3]
  --
  -- which can be written using the 'Control.Pipe.Combinators.$$' operator:
  --
  -- >>> runPipe $ fromList [1..10] $$ deliver 3
  -- Just [1,2,3]
  --
  -- This forces you to cover all code paths by thinking about what return value
  -- you would provide if something were to go wrong. For example, let's say I
  -- make a mistake and request more input than @fromList@ can deliver:
  --
  -- >>> runPipe $ fromList [1..10] $$ deliver 99
  -- Nothing
  --
  -- The type system saved me by forcing me to handle all possible ways my
  -- program could terminate.
  --
  -- Now what if you want to write a 'Pipe' that only reads from its input end
  -- (i.e. a 'Consumer') and returns a list of every value delivered to it when
  -- its input 'Pipe' terminates? In 'Control.Pipe.Combinators' we find:
  --
  -- > consume :: (Monad m) => Consumer a m [a]
  --
  -- but it turns out that it's not possible to write such a 'Pipe' using only
  -- the primitives introduced so far, since we need a way to intercept upstream
  -- termination and return the current accumulated list of input values before
  -- terminating ourselves.
  --
  -- So we need to introduce a new primitive operation:
  --
  -- > tryAwait :: (Monad m) => Pipe a b m (Maybe a)
  --
  -- 'tryAwait' works very similarly to 'await', with two key differences:
  --
  -- 1. When upstream 'yield's some value @x@, 'tryAwait' returns @Just x@.
  --
  -- 2. When upstream terminates, 'tryAwait' returns @Nothing@ instead of
  --    terminating the current 'Pipe' immediately.
  --
  -- When 'tryAwait' returns @Nothing@, the current 'Pipe' has a chance to
  -- perform some final actions (typically 'yield' a final value or terminate
  -- with a result) before being forcefully shut down. At that stage, further
  -- invocations of 'tryAwait' will keep returning @Nothing@, while using 'await'
  -- will terminate the pipe immediately.
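  --
  -- As an illustration, a pipe along the lines of @consume@ could be sketched
  -- with 'tryAwait' as follows. The name @consume'@ is hypothetical and this is
  -- not necessarily how 'Control.Pipe.Combinators' defines it; the sketch only
  -- relies on the signature of 'tryAwait' given above:
  --
  -- > consume' :: (Monad m) => Consumer a m [a]
  -- > consume' = go []
  -- >   where
  -- >     -- Accumulate inputs in reverse, then restore their order at the end.
  -- >     go acc = do
  -- >         mx <- tryAwait
  -- >         case mx of
  -- >             Just x  -> go (x : acc)          -- upstream yielded a value
  -- >             Nothing -> return (reverse acc)  -- upstream terminated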
  --
  -- Note that 'Pipe' termination only propagates through composition. To
  -- illustrate this, let's use the following example:
  --
  -- > p = do a <+< b
  -- >        c
  --
  -- @a@, @b@, and @c@ are 'Pipe's, and @c@ shares the same input and output as
  -- @a <+< b@, otherwise we cannot combine them within the same monad. In the
  -- above example, either @a@ or @b@ could terminate and bring down the other
  -- one since they are composed, but @c@ is guaranteed to continue after
  -- @a <+< b@ terminates because it is not composed with them. Conceptually,
  -- we can think of this as @c@ automatically taking over the 'Pipe''s
  -- channeling responsibilities when @a <+< b@ can no longer continue. There
  -- is no need to \"restart\" the input or output manually as in some other
  -- iteratee libraries.
  --
  -- We now turn our attention to a very important feature of pipes: resource
  -- finalization.
  --
  -- Say we have the file \"test.txt\" with the following contents:
  --
  -- > This is a test.
  -- > Don't panic!
  -- > Calm down, please!
  --
  -- ... and we wish to lazily read a line at a time from it:
  --
  -- > handleReader' :: Handle -> Producer Text IO ()
  -- > handleReader' h = do
  -- >     eof <- lift $ hIsEOF h
  -- >     unless eof $ do
  -- >         s <- lift $ pack <$> hGetLine h
  -- >         yield s
  -- >         handleReader' h
  --
  -- Suppose, for the sake of example, that we know in advance how many lines we
  -- need to read from the file. We can then use composition and the 'Monad'
  -- instance to try to build a resource-efficient version that only reads as
  -- many lines as we request:
  --
  -- > read' :: Int -> Producer Text IO ()
  -- > read' n = do
  -- >     lift $ putStrLn "Opening file ..."
  -- >     h <- lift $ openFile "test.txt" ReadMode
  -- >     take' n <+< handleReader' h
  -- >     lift $ putStrLn "Closing file ..."
  -- >     lift $ hClose h
  --
  -- Now compose!
  --
  -- >>> runPipe $ read' 2 >+> printer
  -- Opening file ...
  -- "This is a test."
  -- "Don't panic!"
  -- You shall not pass!
  -- Closing file ...
  --
  -- >>> runPipe $ read' 99 >+> printer
  -- Opening file ...
  -- "This is a test."
  -- "Don't panic!"
  -- "Calm down, please!"
  -- Closing file ...
  --
  -- In the first example, the pipeline terminates because @take'@ only requested
  -- 2 lines. In the second example, it terminates because @handleReader'@ ran
  -- out of input. However, in both cases the 'Pipe' never reads more lines than
  -- we request and frees \"test.txt\" immediately when it is no longer needed.
  --
  -- Even more importantly, the file is never opened if we replace @printer@
  -- with a 'Pipe' that never demands input:
  --
  -- >>> runPipe $ read' 2 >+> lift (putStrLn "I don't need input")
  -- I don't need input
  --
  -- However, this @read'@ is not resource-safe in certain situations. For
  -- example, take the following pipe:
  --
  -- >>> runPipe $ read' 3 >+> take' 1 >+> printer
  -- Opening file ...
  -- "This is a test."
  -- You shall not pass!
  --
  -- Oh no! Our 'Pipe' didn't properly close our file! @take' 1@ terminated
  -- before @read' 3@, preventing @read' 3@ from properly closing \"test.txt\".
  --
  -- Similarly, any exception thrown during execution of the 'Pipeline' can cause
  -- the @hClose@ statement to be skipped, leaking an open handle.
  --
  -- We can force the @read' 3@ 'Pipe' to always close the file handle regardless
  -- of exceptions or premature termination by using the 'bracket' function:
  --
  -- > safeRead' :: Int -> Producer Text IO ()
  -- > safeRead' n = bracket
  -- >     (putStrLn "Opening file..." >> openFile "test.txt" ReadMode)
  -- >     (\h -> putStrLn "Closing file..."
  -- >         >> hClose h)
  -- >     (\h -> handleReader' h >+> take' n)
  --
  -- 'bracket' is similar to the homonymous function in 'Control.Exception': it
  -- takes a function that creates some \"resource\", a function that disposes of
  -- the created resource, and a function which takes the resource and returns a
  -- 'Pipe':
  --
  -- > bracket :: Monad m
  -- >         => m r                  -- create resource
  -- >         -> (r -> m y)           -- destroy resource
  -- >         -> (r -> Pipe a b m x)  -- use resource in a 'Pipe'
  -- >         -> Pipe a b m x
  --
  -- Note that the \"create\" and \"destroy\" actions operate within the base
  -- monad, so it's not possible to use 'yield' and 'await' there.
  --
  -- Using @safeRead'@ instead of @read'@ will now produce the desired behavior:
  --
  -- >>> runPipe $ safeRead' 3 >+> take' 1 >+> printer
  -- Opening file...
  -- "This is a test."
  -- You shall not pass!
  -- Closing file...
  --
  -- We also provide exception-handling primitives like 'catch' and
  -- 'onException'. See 'Control.Pipe.Exception' for more details on exception
  -- handling and a complete list of primitives.
  --
  -- Resource finalization and exception handling functionalities work in any
  -- base monad, so we provide a 'Pipe'-specific mechanism for throwing
  -- exceptions which does not suffer from the limitation of only being catchable
  -- in the @IO@ monad:
  --
  -- > throw :: (Monad m, Exception e) => e -> Pipe a b m r
  --
  -- However, exceptions thrown by other means (like 'error' or @throw@ in
  -- 'Control.Exception') can only be caught when the 'Pipeline' is run with
  -- 'runPipe'. If you use 'runPurePipe', such an exception will abruptly
  -- terminate the whole 'Pipeline', and resource finalization will not be
  -- guaranteed.

  -- * Implementation
  module Control.Pipe.Common,
  module Control.Pipe.Monoidal
  ) where

import Control.Pipe.Common
import Control.Pipe.Monoidal