{-# LANGUAGE FlexibleContexts, MultiParamTypeClasses #-}
{- |
Module : Main
Description : Benchmark how to concurrently map a function
Copyright : Ivan Lazar Miljenovic
License : MIT
Maintainer : Ivan.Miljenovic@gmail.com
This duplicates a lot of functionality from Streaming.Concurrent to
help verify that the right design choices were made (we don't
import existing definitions in case of locality\/inlining\/etc.).
To be also able to better match the behaviour of the non-concurrent
variants, the type signatures of all functions have been
specialised slightly.
Note that for the cases of @n = 1@, the concurrent variants have a
slight advantage in that - despite the overhead of concurrency -
three threads (one for writing, one for mapping and one for
reading) are used. If @replicateConcurrently_ n@ is removed and
the benchmarks manually re-run, one of these threads are removed
and the overhead becomes more apparent.
Suffixes used:
[@B@] Uses bounded buffers (buffers are the same size as the number of
concurrent tasks).
[@D@] As with @B@ but uses buffers that are double the size of the
number of concurrent tasks.
[@U@] Uses unbounded buffers.
[@S@] Uses 'Stream's between the two buffers.
[@I@] Does not use 'Stream's between the two buffers (read a value,
operates, immediately writes to the next buffer).
[@N@] Non-concurrent (i.e. uses definition from "Streaming.Prelude")
-}
module Main (main) where
import Streaming.Concurrent (Buffer, InBasket(..), OutBasket(..), bounded,
joinBuffers, joinBuffersM, joinBuffersStream,
unbounded, withBuffer, withStreamBasket,
writeStreamBasket)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted (replicateConcurrently_)
import Control.Monad.Catch (MonadMask)
import Control.Monad.Trans.Control (MonadBaseControl)
import Streaming
import qualified Streaming.Prelude as S
import Test.HUnit ((@?))
import TestBench
--------------------------------------------------------------------------------
main :: IO ()
main = testBench $ do
collection "Pure maps" $ do
compareFuncAllIO "show" (pureMap 10 show inputs S.toList_) normalFormIO
collection "Fibonacci" $
mapM_ compFib numThreads
collection "Monadic maps" $ do
collection "Fibonacci (return'ed)" $
mapM_ compFibM numThreads
collection "Identical sleep" $
mapM_ compDelaySame numThreads
collection "Different sleep" $
mapM_ compDelayDiffer numThreads
where
compFib n = compareFuncAllIO (show n ++ " tasks")
(pureMap n fib inputs S.toList_)
normalFormIO
compFibM n = compareFuncAllIO (show n ++ " tasks")
(monadicMap n (return . fib) inputs S.toList_)
normalFormIO
compDelaySame n = compareFuncAllIO (show n ++ " tasks")
(monadicMap n delayReturn inputs S.toList_)
normalFormIO
compDelayDiffer n = compareFuncAllIO (show n ++ " tasks")
(monadicMap n ensureDelay mixedInputs S.length_)
normalFormIO
numThreads = [1, 5, 10]
-- | We use the same value repeated to avoid having to sort the
-- results, as that would give the non-concurrent variants an
-- advantage.
inputs :: Stream (Of Int) IO ()
inputs = S.replicate 1000 20
delayReturn :: Int -> IO Int
delayReturn n = threadDelay n `seq` return n
-- For when ordering can't be enforced.
ensureDelay :: Int -> IO ()
ensureDelay n = threadDelay n `seq` return ()
mixedInputs :: Stream (Of Int) IO ()
mixedInputs = S.concat (S.replicate 123 [10..17])
--------------------------------------------------------------------------------
data MapType = NonConcurrent
| BoundedImmediate
| BoundedStreamed
| DoubleImmediate
| DoubleStreamed
| UnboundedImmediate
| UnboundedStreamed
deriving (Eq, Ord, Show, Read, Bounded, Enum)
pureMap :: Int -> (a -> b) -> Stream (Of a) IO () -> (Stream (Of b) IO () -> IO r)
-> MapType -> IO r
pureMap n f inp cont pm = go pm n f inp cont
where
go NonConcurrent = withStreamMapN
go BoundedImmediate = withStreamMapBI
go BoundedStreamed = withStreamMapBS
go DoubleImmediate = withStreamMapDI
go DoubleStreamed = withStreamMapDS
go UnboundedImmediate = withStreamMapUI
go UnboundedStreamed = withStreamMapUS
monadicMap :: Int -> (a -> IO b) -> Stream (Of a) IO () -> (Stream (Of b) IO () -> IO r)
-> MapType -> IO r
monadicMap n f inp cont pm = go pm n f inp cont
where
go NonConcurrent = withStreamMapMN
go BoundedImmediate = withStreamMapMBI
go BoundedStreamed = withStreamMapMBS
go DoubleImmediate = withStreamMapMDI
go DoubleStreamed = withStreamMapMDS
go UnboundedImmediate = withStreamMapMUI
go UnboundedStreamed = withStreamMapMUS
-- Naive fibonacci implementation
fib :: Int -> Int
fib 0 = 1
fib 1 = 1
fib n = fib (n-1) + fib (n-2)
--------------------------------------------------------------------------------
-- Non-concurrent variants. Take an unused Int just to match the types.
withStreamMapN :: (Monad m) => Int -> (a -> b) -> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapN _ f str cont = cont (S.map f str)
withStreamMapMN :: (Monad m) => Int -> (a -> m b) -> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapMN _ f str cont = cont (S.mapM f str)
--------------------------------------------------------------------------------
-- Bounded buffer variants
withStreamMapBI :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapBI n f inp cont =
withBufferedTransformB n (joinBuffers f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withStreamMapMBI :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> m b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapMBI n f inp cont =
withBufferedTransformB n (joinBuffersM f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withStreamMapBS :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapBS n = withStreamTransformB n . S.map
withStreamMapMBS :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> m b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapMBS n = withStreamTransformB n . S.mapM
withStreamTransformB :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (Stream (Of a) m () -> Stream (Of b) m t)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamTransformB n f inp cont =
withBufferedTransformB n (joinBuffersStream f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withBufferedTransformB :: (MonadMask m, MonadBaseControl IO m)
=> Int
-- ^ How many concurrent computations to run.
-> (OutBasket a -> InBasket b -> m ab)
-- ^ What to do with each individual concurrent
-- computation; result is ignored.
-> (InBasket a -> m ())
-- ^ Provide initial data; result is ignored.
-> (OutBasket b -> m r) -> m r
withBufferedTransformB n transform feed consume =
withBuffer buff feed $ \obA ->
withBuffer buff (replicateConcurrently_ n . transform obA)
consume
where
buff :: Buffer v
buff = bounded n
--------------------------------------------------------------------------------
-- Double-Bounded buffer variants
withStreamMapDI :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapDI n f inp cont =
withBufferedTransformD n (joinBuffers f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withStreamMapMDI :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> m b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapMDI n f inp cont =
withBufferedTransformD n (joinBuffersM f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withStreamMapDS :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapDS n = withStreamTransformD n . S.map
withStreamMapMDS :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> m b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapMDS n = withStreamTransformD n . S.mapM
withStreamTransformD :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (Stream (Of a) m () -> Stream (Of b) m t)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamTransformD n f inp cont =
withBufferedTransformD n (joinBuffersStream f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withBufferedTransformD :: (MonadMask m, MonadBaseControl IO m)
=> Int
-- ^ How many concurrent computations to run.
-> (OutBasket a -> InBasket b -> m ab)
-- ^ What to do with each individual concurrent
-- computation; result is ignored.
-> (InBasket a -> m ())
-- ^ Provide initial data; result is ignored.
-> (OutBasket b -> m r) -> m r
withBufferedTransformD n transform feed consume =
withBuffer buff feed $ \obA ->
withBuffer buff (replicateConcurrently_ n . transform obA)
consume
where
buff :: Buffer v
buff = bounded (2*n)
--------------------------------------------------------------------------------
withStreamMapUI :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapUI n f inp cont =
withBufferedTransformU n (joinBuffers f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withStreamMapMUI :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> m b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapMUI n f inp cont =
withBufferedTransformU n (joinBuffersM f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withStreamMapUS :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapUS n = withStreamTransformU n . S.map
withStreamMapMUS :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (a -> m b)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamMapMUS n = withStreamTransformU n . S.mapM
withStreamTransformU :: (MonadMask m, MonadBaseControl IO m)
=> Int -- ^ How many concurrent computations to run.
-> (Stream (Of a) m () -> Stream (Of b) m t)
-> Stream (Of a) m ()
-> (Stream (Of b) m () -> m r) -> m r
withStreamTransformU n f inp cont =
withBufferedTransformU n (joinBuffersStream f) feed consume
where
feed = writeStreamBasket inp
consume = flip withStreamBasket cont
withBufferedTransformU :: (MonadMask m, MonadBaseControl IO m)
=> Int
-- ^ How many concurrent computations to run.
-> (OutBasket a -> InBasket b -> m ab)
-- ^ What to do with each individual concurrent
-- computation; result is ignored.
-> (InBasket a -> m ())
-- ^ Provide initial data; result is ignored.
-> (OutBasket b -> m r) -> m r
withBufferedTransformU n transform feed consume =
withBuffer buff feed $ \obA ->
withBuffer buff (replicateConcurrently_ n . transform obA)
consume
where
buff :: Buffer v
buff = unbounded