{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}

{- |
   Module      : Streaming.Concurrent
   Description : Concurrency support for the streaming ecosystem
   Copyright   : Ivan Lazar Miljenovic
   License     : MIT
   Maintainer  : Ivan.Miljenovic@gmail.com

   Consider a physical desk for someone that has to deal with
   correspondence.

   A typical system is to have two baskets\/trays: one for incoming
   papers that still need to be processed, and another for outgoing
   papers that have already been processed.

   We use this metaphor for dealing with 'Buffer's: data is fed into
   one using the 'InBasket' (until the buffer indicates that it has
   had enough) and taken out from the 'OutBasket'.

 -}
module Streaming.Concurrent
  ( -- * Buffers
    Buffer
  , unbounded
  , bounded
  , latest
  , newest
    -- * Using a buffer
  , withBuffer
  , withBufferedTransform
  , InBasket(..)
  , OutBasket(..)
    -- * Stream support
  , writeStreamBasket
  , withStreamBasket
  , withMergedStreams
    -- ** Mapping
    -- $mapping
  , withStreamMap
  , withStreamMapM
  , withStreamTransform
    -- *** Primitives
  , joinBuffers
  , joinBuffersM
  , joinBuffersStream
  ) where

import           Streaming         (Of, Stream)
import qualified Streaming.Prelude as S

import           Control.Applicative             ((<|>))
import           Control.Concurrent.Async.Lifted (concurrently,
                                                  forConcurrently_,
                                                  replicateConcurrently_)
import qualified Control.Concurrent.STM          as STM
import           Control.Monad                   (when)
import           Control.Monad.Base              (MonadBase, liftBase)
import           Control.Monad.Catch             (MonadMask, bracket, finally)
import           Control.Monad.Trans.Control     (MonadBaseControl)
import           Data.Foldable                   (forM_)

--------------------------------------------------------------------------------

-- | Concurrently merge multiple streams together.
--
-- The resulting order is unspecified.
--
-- Note that the monad of the resultant Stream can be different from
-- the final result.
--
-- @since 0.2.0.0
withMergedStreams :: (MonadMask m, MonadBaseControl IO m, MonadBase IO n, Foldable t)
                     => Buffer a -> t (Stream (Of a) m v)
                     -> (Stream (Of a) n () -> m r) -> m r
withMergedStreams buff strs f = withBuffer buff feedAll readMerged
  where
    -- Every source stream gets its own concurrent writer, all of them
    -- sharing the one basket.
    feedAll inBasket = forConcurrently_ strs (`writeStreamBasket` inBasket)

    -- Hand whatever arrives in the buffer to the continuation as a
    -- single stream.
    readMerged outBasket = withStreamBasket outBasket f

-- | Write a single stream to a buffer.
--
-- Type written to make it easier if this is the only stream being
-- written to the buffer.
--
-- Writing stops as soon as either the stream runs out or the basket
-- reports (by returning 'False') that it will not accept more values.
writeStreamBasket :: (MonadBase IO m) => Stream (Of a) m r -> InBasket a -> m ()
writeStreamBasket stream (InBasket send) = loop stream
  where
    -- 'S.next' is used rather than uncons, as uncons requires r ~ ()
    loop str = S.next str >>= either (const (pure ())) push

    push (a, rest) = do
      accepted <- liftBase (STM.atomically (send a))
      when accepted (loop rest)

-- | Read the output of a buffer into a stream.
--
-- The stream ends once the basket yields 'Nothing'.
--
-- @since 0.2.0.0
withStreamBasket :: (MonadBase IO m) => OutBasket a
                    -> (Stream (Of a) m () -> r) -> r
withStreamBasket (OutBasket receive) consume = consume (S.untilRight step)
  where
    -- 'Left' yields another element; 'Right' terminates the stream.
    step = do
      mNext <- liftBase (STM.atomically receive)
      pure $ case mNext of
        Nothing -> Right ()
        Just a  -> Left a

--------------------------------------------------------------------------------

{- $mapping

These functions provide (concurrency-based rather than
parallelism-based) pseudo-equivalents to a parallel map.

Note however that in practice, these seem to be no better than - and
indeed often worse than - using 'S.map' and 'S.mapM'.  A benchmarking
suite is available with this library that tries to compare different
scenarios.

These implementations try to be relatively conservative in terms of
memory usage; it is possible to get better performance by using an
'unbounded' 'Buffer' but if you feed elements into a 'Buffer' much
faster than you can consume them then memory usage will increase.

The \"Primitives\" available below can assist you with defining your
own custom mapping function in conjunction with
'withBufferedTransform'.

-}

-- | Use buffers to concurrently transform the provided data.
--
-- In essence, this is a @demultiplexer -> multiplexer@
-- transformation: the incoming data is split into @n@ individual
-- segments, the results of which are then merged back together
-- again.
--
-- Both the input and the output side use a 'bounded' buffer of
-- size @n@.
--
-- Note: ordering of elements in the output is nondeterministic.
--
-- @since 0.2.0.0
withBufferedTransform :: (MonadMask m, MonadBaseControl IO m)
                         => Int
                            -- ^ How many concurrent computations to run.
                         -> (OutBasket a -> InBasket b -> m ab)
                            -- ^ What to do with each individual concurrent
                            --   computation; result is ignored.
                         -> (InBasket a -> m i)
                            -- ^ Provide initial data; result is ignored.
                         -> (OutBasket b -> m r)
                         -> m r
withBufferedTransform n transform feed consume =
  withBuffer buff feed runWorkers
  where
    -- Polymorphic so the same description serves both stages.
    buff :: Buffer v
    buff = bounded n

    -- @n@ copies of the transformation all drain the same source
    -- basket and all fill the same sink basket.
    runWorkers source =
      withBuffer buff
                 (\sink -> replicateConcurrently_ n (transform source sink))
                 consume

-- | Concurrently map a function over all elements of a 'Stream'.
--
-- Note: ordering of elements in the output is nondeterministic.
--
-- @since 0.2.0.0
withStreamMap :: (MonadMask m, MonadBaseControl IO m, MonadBase IO n)
                 => Int -- ^ How many concurrent computations to run.
                 -> (a -> b)
                 -> Stream (Of a) m i
                 -> (Stream (Of b) n () -> m r) -> m r
withStreamMap n f inp cont =
  withBufferedTransform n
                        (joinBuffers f)
                        (writeStreamBasket inp)
                        (`withStreamBasket` cont)

-- | Concurrently map a monadic function over all elements of a
-- 'Stream'.
--
-- Note: ordering of elements in the output is nondeterministic.
--
-- @since 0.2.0.0
withStreamMapM :: (MonadMask m, MonadBaseControl IO m, MonadBase IO n)
                  => Int -- ^ How many concurrent computations to run.
                  -> (a -> m b)
                  -> Stream (Of a) m i
                  -> (Stream (Of b) n () -> m r) -> m r
withStreamMapM n f inp cont =
  withBufferedTransform n
                        (joinBuffersM f)
                        (writeStreamBasket inp)
                        (`withStreamBasket` cont)

-- | Concurrently split the provided stream into @n@ streams and
-- transform them all using the provided function.
--
-- Note: ordering of elements in the output is nondeterministic.
--
-- @since 0.2.0.0
withStreamTransform :: (MonadMask m, MonadBaseControl IO m, MonadBase IO n)
                       => Int -- ^ How many concurrent computations to run.
                       -> (Stream (Of a) m () -> Stream (Of b) m t)
                       -> Stream (Of a) m i
                       -> (Stream (Of b) n () -> m r) -> m r
withStreamTransform n f inp cont =
  withBufferedTransform n
                        (joinBuffersStream f)
                        (writeStreamBasket inp)
                        (`withStreamBasket` cont)

-- | Take an item out of one 'Buffer', apply a function to it and then
-- place it into another 'Buffer'.
--
-- This repeats until either the source is exhausted or the
-- destination refuses a value.
--
-- @since 0.3.1.0
joinBuffers :: (MonadBase IO m) => (a -> b) -> OutBasket a -> InBasket b -> m ()
joinBuffers f obA ibB = liftBase pump
  where
    pump :: IO ()
    pump = STM.atomically (receiveMsg obA) >>= maybe (pure ()) forward

    forward a = do
      accepted <- STM.atomically (sendMsg ibB (f a))
      when accepted pump

-- | As with 'joinBuffers' but apply a monadic function.
--
-- @since 0.3.1.0
joinBuffersM :: (MonadBase IO m) => (a -> m b) -> OutBasket a -> InBasket b -> m ()
joinBuffersM f obA ibB = pump
  where
    pump = liftBase (STM.atomically (receiveMsg obA)) >>= maybe (pure ()) forward

    -- The monadic action runs outside of any STM transaction.
    forward a = do
      b <- f a
      accepted <- liftBase (STM.atomically (sendMsg ibB b))
      when accepted pump

-- | As with 'joinBuffers' but read and write the values as 'Stream's.
--
-- @since 0.3.1.0
joinBuffersStream :: (MonadBase IO m) => (Stream (Of a) m () -> Stream (Of b) m t)
                     -> OutBasket a -> InBasket b -> m ()
joinBuffersStream f obA ibB =
  withStreamBasket obA $ \inStream -> writeStreamBasket (f inStream) ibB

--------------------------------------------------------------------------------
-- This entire section is almost completely taken from
-- pipes-concurrent by Gabriel Gonzalez:
-- https://github.com/Gabriel439/Haskell-Pipes-Concurrency-Library

-- | 'Buffer' specifies how to buffer messages between our 'InBasket'
-- and our 'OutBasket'.
data Buffer a
  = Unbounded
  | Bounded Int
  | Single
  | Latest a
  | Newest Int
  | New

-- | Store an unbounded number of messages in a FIFO queue.
unbounded :: Buffer a
unbounded = Unbounded

-- | Store a bounded number of messages, specified by the 'Int'
-- argument.
--
-- A buffer size @<= 0@ will result in a permanently empty buffer,
-- which could result in a system that hangs.
bounded :: Int -> Buffer a
bounded n
  | n == 1    = Single
  | otherwise = Bounded n

-- | Only store the \"latest\" message, beginning with an initial
-- value.
--
-- This buffer is never empty nor full; as such, it is up to the
-- caller to ensure they only take as many values as they need
-- (e.g. using @(\`'withStreamBasket'\` 'S.print')@ as the final
-- parameter to 'withBuffer' will -- after all other values are
-- processed -- keep printing the last value over and over again).
latest :: a -> Buffer a
latest initial = Latest initial

-- | Like 'bounded', but 'sendMsg' never fails (the buffer is never
-- full).  Instead, old elements are discarded to make room for new
-- elements.
--
-- As with 'bounded', providing a size @<= 0@ will result in no
-- values being provided to the buffer, thus no values being read
-- and hence the system will most likely hang.
newest :: Int -> Buffer a
newest n = case n of
  1 -> New
  _ -> Newest n

-- | An exhaustible source of values.
--
-- 'receiveMsg' returns 'Nothing' if the source is exhausted.
newtype OutBasket a = OutBasket { receiveMsg :: STM.STM (Maybe a) }

-- | An exhaustible sink of values.
--
-- 'sendMsg' returns 'False' if the sink is exhausted.
newtype InBasket a = InBasket { sendMsg :: a -> STM.STM Bool }

-- | Use a buffer to asynchronously communicate.
--
-- Two functions are taken as parameters:
--
-- * How to provide input to the buffer (the result of this is
--   discarded)
--
-- * How to take values from the buffer
--
-- As soon as one function indicates that it is complete then the
-- other is terminated.  This is safe: trying to write data to a
-- closed buffer will not achieve anything.
--
-- However, reading a buffer that has not indicated that it is
-- closed (e.g.
-- waiting on an action to complete to be able to
-- provide the next value) but contains no values will block.
--
-- Sizes @<= 0@ carried by a 'Bounded' or 'Newest' buffer are treated
-- as a capacity of zero: a 'Bounded' buffer then never accepts a
-- value (writers wait until the buffer is sealed), whilst a 'Newest'
-- buffer discards every value sent to it.
--
-- The result is that of the reading function; the buffer is sealed
-- when either function finishes (normally or via an exception).
withBuffer :: (MonadMask m, MonadBaseControl IO m) => Buffer a
              -> (InBasket a -> m i) -> (OutBasket a -> m r) -> m r
withBuffer buffer sendIn readOut =
  bracket
    (liftBase openBasket)
    (\(_, _, _, seal) -> liftBase (STM.atomically seal)) $
    \(writeB, readB, sealed, seal) ->
      snd <$> concurrently (withIn writeB sealed seal)
                           (withOut readB sealed seal)
  where
    openBasket = do
      (writeB, readB) <- case buffer of
        Bounded n -> do
          -- Clamp before converting: the queue capacity may be an
          -- unsigned type, for which a negative size would throw
          -- rather than giving the documented empty buffer.
          q <- STM.newTBQueueIO (fromIntegral (max 0 n))
          return (STM.writeTBQueue q, STM.readTBQueue q)
        Unbounded -> do
          q <- STM.newTQueueIO
          return (STM.writeTQueue q, STM.readTQueue q)
        Single -> do
          m <- STM.newEmptyTMVarIO
          return (STM.putTMVar m, STM.takeTMVar m)
        Latest a -> do
          t <- STM.newTVarIO a
          return (STM.writeTVar t, STM.readTVar t)
        New -> do
          m <- STM.newEmptyTMVarIO
          -- Throw away any unread value so the write never blocks.
          return (\x -> STM.tryTakeTMVar m *> STM.putTMVar m x, STM.takeTMVar m)
        Newest n -> do
          q <- STM.newTBQueueIO (fromIntegral (max 0 n))
          let writeB x
                -- With no capacity the queue can neither accept a
                -- value nor give one up, so the evict-and-retry loop
                -- below would never terminate; drop the value instead.
                | n <= 0    = return ()
                -- Otherwise: if the queue is full, evict the oldest
                -- element and try again.
                | otherwise = STM.writeTBQueue q x
                              <|> (STM.tryReadTBQueue q *> writeB x)
          return (writeB, STM.readTBQueue q)

      -- We use this TVar as the communication mechanism between
      -- inputs and outputs as to whether either sub-continuation has
      -- finished.
      sealed <- STM.newTVarIO False
      let seal = STM.writeTVar sealed True

      return (writeB, readB, sealed, seal)

    withIn writeB sealed seal =
      sendIn (InBasket sendOrEnd)
      `finally` liftBase (STM.atomically seal)
      where
        -- Only write whilst the buffer is open; the result tells the
        -- producer whether it is worth continuing.
        sendOrEnd a = do
          canWrite <- not <$> STM.readTVar sealed
          when canWrite (writeB a)
          return canWrite

    withOut readB sealed seal =
      readOut (OutBasket readOrEnd)
      `finally` liftBase (STM.atomically seal)
      where
        -- Prefer a buffered value; if there is none, block until the
        -- buffer is sealed and then report exhaustion.
        readOrEnd = (Just <$> readB) <|> (do
          b <- STM.readTVar sealed
          STM.check b
          return Nothing )
{-# INLINABLE withBuffer #-}