-- | -- Module : StreamKOps -- Copyright : (c) 2018 Harendra Kumar -- -- License : BSD3 -- Maintainer : harendra.kumar@gmail.com {-# LANGUAGE FlexibleContexts #-} module StreamKOps where import Prelude (Monad, Int, (+), ($), (.), return, fmap, even, (>), (<=), subtract, undefined, Maybe(..), not) import qualified Streamly.Streams.StreamK as S hiding (runStream) -- import qualified Streamly.Streams.Serial as S import qualified Streamly as S import qualified Streamly.Prelude as P value, maxValue :: Int value = 1000000 maxValue = value + 1000 ------------------------------------------------------------------------------- -- Benchmark ops ------------------------------------------------------------------------------- {-# INLINE toNull #-} {-# INLINE uncons #-} {-# INLINE nullHeadTail #-} {-# INLINE scan #-} {-# INLINE map #-} {-# INLINE filterEven #-} {-# INLINE filterAllOut #-} {-# INLINE filterAllIn #-} {-# INLINE takeOne #-} {-# INLINE takeAll #-} {-# INLINE takeWhileTrue #-} {-# INLINE dropAll #-} {-# INLINE dropWhileTrue #-} {-# INLINE zip #-} {-# INLINE concat #-} {-# INLINE composeAllInFilters #-} {-# INLINE composeAllOutFilters #-} {-# INLINE composeMapAllInFilter #-} toNull, uncons, nullHeadTail, scan, map, filterEven, filterAllOut, filterAllIn, takeOne, takeAll, takeWhileTrue, dropAll, dropWhileTrue, zip, concat, composeAllInFilters, composeAllOutFilters, composeMapAllInFilter :: Monad m => Stream m Int -> m () {-# INLINE composeMapM #-} composeMapM :: S.MonadAsync m => Stream m Int -> m () {-# INLINE toList #-} toList :: Monad m => Stream m Int -> m [Int] {-# INLINE foldl #-} foldl :: Monad m => Stream m Int -> m Int {-# INLINE last #-} last :: Monad m => Stream m Int -> m (Maybe Int) {-# INLINE mapM #-} mapM :: S.MonadAsync m => Stream m Int -> m () ------------------------------------------------------------------------------- -- Stream generation and elimination ------------------------------------------------------------------------------- type Stream m a = S.SerialT m a {-# INLINE sourceUnfoldr #-} sourceUnfoldr :: Int -> Stream m Int sourceUnfoldr n = S.unfoldr step n where step cnt = if cnt > n + value then Nothing else (Just (cnt, cnt + 1)) {-# INLINE sourceUnfoldrM #-} sourceUnfoldrM :: S.MonadAsync m => Int -> Stream m Int sourceUnfoldrM n = S.unfoldrM step n where step cnt = if cnt > n + value then return Nothing else return (Just (cnt, cnt + 1)) {- {-# INLINE sourceFromEnum #-} sourceFromEnum :: Monad m => Int -> Stream m Int sourceFromEnum n = S.enumFromStepN n 1 value -} {-# INLINE sourceFromFoldable #-} sourceFromFoldable :: Int -> Stream m Int sourceFromFoldable n = S.fromFoldable [n..n+value] {-# INLINE sourceFromFoldableM #-} sourceFromFoldableM :: S.MonadAsync m => Int -> Stream m Int sourceFromFoldableM n = P.fromFoldableM (Prelude.fmap return [n..n+value]) {-# INLINE sourceFoldMapWith #-} sourceFoldMapWith :: Monad m => Int -> Stream m Int sourceFoldMapWith n = S.foldMapWith (S.<>) return [n..n+value] {-# INLINE sourceFoldMapWithM #-} sourceFoldMapWithM :: Monad m => Int -> Stream m Int sourceFoldMapWithM n = S.foldMapWith (S.<>) (S.yieldM . return) [n..n+value] {-# INLINE source #-} source :: S.MonadAsync m => Int -> Stream m Int source n = sourceUnfoldrM n ------------------------------------------------------------------------------- -- Elimination ------------------------------------------------------------------------------- {-# INLINE runStream #-} runStream :: Monad m => Stream m a -> m () runStream = S.runStream toNull = runStream uncons s = do r <- S.uncons s case r of Nothing -> return () Just (_, t) -> uncons t nullHeadTail s = do r <- S.null s if not r then do _ <- S.head s t <- S.tail s case t of Nothing -> return () Just x -> nullHeadTail x else return () toList = S.toList foldl = S.foldl' (+) 0 last = S.last ------------------------------------------------------------------------------- -- Transformation ------------------------------------------------------------------------------- {-# INLINE transform #-} transform :: Monad m => Stream m a -> m () transform = runStream scan = transform . S.scanl' (+) 0 map = transform . fmap (+1) mapM = transform . S.mapM return filterEven = transform . S.filter even filterAllOut = transform . S.filter (> maxValue) filterAllIn = transform . S.filter (<= maxValue) takeOne = transform . S.take 1 takeAll = transform . S.take maxValue takeWhileTrue = transform . P.takeWhile (<= maxValue) dropAll = transform . P.drop maxValue dropWhileTrue = transform . P.dropWhile (<= maxValue) ------------------------------------------------------------------------------- -- Zipping and concat ------------------------------------------------------------------------------- zip src = transform $ (P.zipWith (,) src src) concat _n = return () ------------------------------------------------------------------------------- -- Composition ------------------------------------------------------------------------------- {-# INLINE compose #-} compose :: Monad m => (Stream m Int -> Stream m Int) -> Stream m Int -> m () compose f = transform . f . f . f . f composeMapM = compose (S.mapM return) composeAllInFilters = compose (S.filter (<= maxValue)) composeAllOutFilters = compose (S.filter (> maxValue)) composeMapAllInFilter = compose (S.filter (<= maxValue) . fmap (subtract 1)) {-# INLINABLE composeScaling #-} composeScaling :: Monad m => Int -> Stream m Int -> m () composeScaling m = case m of 1 -> transform . f 2 -> transform . f . f 3 -> transform . f . f . f 4 -> transform . f . f . f . f _ -> undefined where f = S.filter (<= maxValue)