{-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE LinearTypes #-} {-# LANGUAGE QualifiedDo #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# OPTIONS_GHC -Wno-name-shadowing #-} {-# OPTIONS_HADDOCK hide #-} -- | This module contains all functions that do something with -- multiple streams as input or output. This includes combining -- streams, splitting a stream, etc. module Streaming.Linear.Internal.Many ( -- * Operations that use or return multiple 'Stream's -- ** Zips and Unzip unzip, ZipResidual, ZipResidual3, zip, zipR, zipWith, zipWithR, zip3, zip3R, zipWith3, zipWith3R, Either3 (..), -- ** Merging -- $ merge, mergeOn, mergeBy, ) where import qualified Control.Functor.Linear as Control import Prelude.Linear (($), (&)) import Streaming.Linear.Internal.Consume import Streaming.Linear.Internal.Type import Prelude (Either (..), Ord (..), Ordering (..)) -- # Zips and Unzip ------------------------------------------------------------------------------- -- | The type -- -- > Data.List.unzip :: [(a,b)] -> ([a],[b]) -- -- might lead us to expect -- -- > Streaming.unzip :: Stream (Of (a,b)) m r -> Stream (Of a) m (Stream (Of b) m r) -- -- which would not stream, since it would have to accumulate the second stream (of @b@s). -- Of course, @Data.List@ 'Data.List.unzip' doesn't stream either. -- -- This @unzip@ does -- stream, though of course you can spoil this by using e.g. 'toList': -- -- @ -- \>\>\> let xs = Prelude.map (\x -> (x, Prelude.show x)) [1..5 :: Int] -- -- \>\>\> S.toList $ S.toList $ S.unzip (S.each' xs) -- ["1","2","3","4","5"] :> ([1,2,3,4,5] :> ()) -- -- \>\>\> Prelude.unzip xs -- ([1,2,3,4,5],["1","2","3","4","5"]) -- @ -- -- Note the difference of order in the results. It may be of some use to think why. -- The first application of 'toList' was applied to a stream of integers: -- -- @ -- \>\>\> :t S.unzip $ S.each' xs -- S.unzip $ S.each' xs :: Control.Monad m => Stream (Of Int) (Stream (Of String) m) () -- @ -- -- Like any fold, 'toList' takes no notice of the monad of effects. -- -- > toList :: Control.Monad m => Stream (Of a) m r %1-> m (Of [a] r) -- -- In the case at hand (since I am in @ghci@) @m = Stream (Of String) IO@. -- So when I apply 'toList', I exhaust that stream of integers, folding -- it into a list: -- -- @ -- \>\>\> :t S.toList $ S.unzip $ S.each' xs -- S.toList $ S.unzip $ S.each' xs -- :: Control.Monad m => Stream (Of String) m (Of [Int] ()) -- @ -- -- When I apply 'toList' to /this/, I reduce everything to an ordinary action in @IO@, -- and return a list of strings: -- -- @ -- \>\>\> S.toList $ S.toList $ S.unzip (S.each' xs) -- ["1","2","3","4","5"] :> ([1,2,3,4,5] :> ()) -- @ -- -- 'unzip' can be considered a special case of either 'unzips' or 'expand': -- -- @ -- unzip = 'unzips' . 'maps' (\((a,b) :> x) -> Compose (a :> b :> x)) -- unzip = 'expand' $ \p ((a,b) :> abs) -> b :> p (a :> abs) -- @ unzip :: Control.Monad m => Stream (Of (a, b)) m r %1 -> Stream (Of a) (Stream (Of b) m) r unzip = loop where loop :: Control.Monad m => Stream (Of (a, b)) m r %1 -> Stream (Of a) (Stream (Of b) m) r loop stream = stream & \case Return r -> Return r Effect m -> Effect $ Control.fmap loop $ Control.lift m Step ((a, b) :> rest) -> Step (a :> Effect (Step (b :> Return (loop rest)))) {-# INLINEABLE unzip #-} {- Remarks on the design of zip functions Zip functions have two design choices: (1) What do we do with the end-of-stream values of both streams? (2) If the streams are of different length, do we keep or throw out the remainder of the longer stream? * We are assuming not to take infinite streams as input and instead deal with reasonably small finite streams. * To avoid making choices for the user, we keep both end-of-stream payloads * The default zips (ones without a prime in the name) use @effects@ to consume the remainder stream after zipping. We include zip function variants that return no remainder (for equal length streams), or the remainder of the longer stream. -} data Either3 a b c where Left3 :: a %1 -> Either3 a b c Middle3 :: b %1 -> Either3 a b c Right3 :: c %1 -> Either3 a b c -- | The remainder of zipping two streams type ZipResidual a b m r1 r2 = Either3 (r1, r2) (r1, Stream (Of b) m r2) (Stream (Of a) m r1, r2) -- | @zipWithR@ zips two streams applying a function along the way, -- keeping the remainder of zipping if there is one. Note. If two streams have -- the same length, but one needs to perform some effects to obtain the -- end-of-stream result, that stream is treated as a residual. zipWithR :: Control.Monad m => (a -> b -> c) -> Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of c) m (ZipResidual a b m r1 r2) zipWithR = loop where loop :: Control.Monad m => (a -> b -> c) -> Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of c) m (ZipResidual a b m r1 r2) loop f st1 st2 = st1 & \case Effect ms -> Effect $ Control.fmap (\s -> loop f s st2) ms Return r1 -> st2 & \case Return r2 -> Return $ Left3 (r1, r2) st2' -> Return $ Middle3 (r1, st2') Step (a :> as) -> st2 & \case Effect ms -> Effect $ Control.fmap (\s -> loop f (Step (a :> as)) s) ms Return r2 -> Return $ Right3 (Step (a :> as), r2) Step (b :> bs) -> Step $ (f a b) :> loop f as bs {-# INLINEABLE zipWithR #-} zipWith :: Control.Monad m => (a -> b -> c) -> Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of c) m (r1, r2) zipWith f s1 s2 = Control.do result <- zipWithR f s1 s2 result & \case Left3 rets -> Control.return rets Middle3 (r1, s2') -> Control.do r2 <- Control.lift $ effects s2' Control.return (r1, r2) Right3 (s1', r2) -> Control.do r1 <- Control.lift $ effects s1' Control.return (r1, r2) {-# INLINEABLE zipWith #-} -- | @zip@ zips two streams exhausing the remainder of the longer -- stream and consuming its effects. zip :: Control.Monad m => Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of (a, b)) m (r1, r2) zip = zipWith (,) {-# INLINE zip #-} -- | @zipR@ zips two streams keeping the remainder if there is one. zipR :: Control.Monad m => Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of (a, b)) m (ZipResidual a b m r1 r2) zipR = zipWithR (,) {-# INLINE zipR #-} -- Remark. For simplicity, we do not create an @Either7@ which is the -- proper remainder type for 'zip3R'. Our type simply has one impossible -- case which is when all three streams have a remainder. -- | The (liberal) remainder of zipping three streams. -- This has the downside that the possibility of three remainders -- is allowed, though it will never occur. type ZipResidual3 a b c m r1 r2 r3 = ( Either r1 (Stream (Of a) m r1), Either r2 (Stream (Of b) m r2), Either r3 (Stream (Of c) m r3) ) -- | Like @zipWithR@ but with three streams. zipWith3R :: Control.Monad m => (a -> b -> c -> d) -> Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of c) m r3 %1 -> Stream (Of d) m (ZipResidual3 a b c m r1 r2 r3) zipWith3R = loop where loop :: Control.Monad m => (a -> b -> c -> d) -> Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of c) m r3 %1 -> Stream (Of d) m (ZipResidual3 a b c m r1 r2 r3) loop f s1 s2 s3 = s1 & \case Effect ms -> Effect $ Control.fmap (\s -> loop f s s2 s3) ms Return r1 -> (s2, s3) & \case (Return r2, Return r3) -> Return (Left r1, Left r2, Left r3) (s2', s3') -> Return (Left r1, Right s2', Right s3') Step (a :> as) -> s2 & \case Effect ms -> Effect $ Control.fmap (\s -> loop f (Step $ a :> as) s s3) ms Return r2 -> Return (Right (Step $ a :> as), Left r2, Right s3) Step (b :> bs) -> s3 & \case Effect ms -> Effect $ Control.fmap (\s -> loop f (Step $ a :> as) (Step $ b :> bs) s) ms Return r3 -> Return (Right (Step $ a :> as), Right (Step $ b :> bs), Left r3) Step (c :> cs) -> Step $ (f a b c) :> loop f as bs cs {-# INLINEABLE zipWith3R #-} -- | Like @zipWith@ but with three streams zipWith3 :: Control.Monad m => (a -> b -> c -> d) -> Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of c) m r3 %1 -> Stream (Of d) m (r1, r2, r3) zipWith3 f s1 s2 s3 = Control.do result <- zipWith3R f s1 s2 s3 result & \case (res1, res2, res3) -> Control.do r1 <- Control.lift $ extractResult res1 r2 <- Control.lift $ extractResult res2 r3 <- Control.lift $ extractResult res3 Control.return (r1, r2, r3) {-# INLINEABLE zipWith3 #-} -- | Like @zipR@ but with three streams. zip3 :: Control.Monad m => Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of c) m r3 %1 -> Stream (Of (a, b, c)) m (r1, r2, r3) zip3 = zipWith3 (,,) {-# INLINEABLE zip3 #-} -- | Like @zipR@ but with three streams. zip3R :: Control.Monad m => Stream (Of a) m r1 %1 -> Stream (Of b) m r2 %1 -> Stream (Of c) m r3 %1 -> Stream (Of (a, b, c)) m (ZipResidual3 a b c m r1 r2 r3) zip3R = zipWith3R (,,) {-# INLINEABLE zip3R #-} -- | Internal function to consume a stream remainder to -- get the payload extractResult :: Control.Monad m => Either r (Stream (Of a) m r) %1 -> m r extractResult (Left r) = Control.return r extractResult (Right s) = effects s -- # Merging ------------------------------------------------------------------------------- -- $merging -- These functions combine two sorted streams of orderable elements -- into one sorted stream. The elements of the merged stream are -- guaranteed to be in a sorted order if the two input streams are -- also sorted. -- -- The merge operation is /left-biased/: when merging two elements -- that compare as equal, the left element is chosen first. -- | Merge two streams of elements ordered with their 'Ord' instance. -- -- The return values of both streams are returned. -- -- @ -- \>\>\> S.print $ merge (each [1,3,5]) (each [2,4]) -- 1 -- 2 -- 3 -- 4 -- 5 -- ((), ()) -- @ merge :: (Control.Monad m, Ord a) => Stream (Of a) m r %1 -> Stream (Of a) m s %1 -> Stream (Of a) m (r, s) merge = mergeBy compare {-# INLINE merge #-} -- | Merge two streams, ordering them by applying the given function to -- each element before comparing. -- -- The return values of both streams are returned. mergeOn :: (Control.Monad m, Ord b) => (a -> b) -> Stream (Of a) m r %1 -> Stream (Of a) m s %1 -> Stream (Of a) m (r, s) mergeOn f = mergeBy (\x y -> compare (f x) (f y)) {-# INLINE mergeOn #-} -- | Merge two streams, ordering the elements using the given comparison function. -- -- The return values of both streams are returned. mergeBy :: forall m a r s. Control.Monad m => (a -> a -> Ordering) -> Stream (Of a) m r %1 -> Stream (Of a) m s %1 -> Stream (Of a) m (r, s) mergeBy comp s1 s2 = loop s1 s2 where loop :: Stream (Of a) m r %1 -> Stream (Of a) m s %1 -> Stream (Of a) m (r, s) loop s1 s2 = s1 & \case Return r -> Effect $ effects s2 Control.>>= \s -> Control.return $ Return (r, s) Effect ms -> Effect $ ms Control.>>= \s1' -> Control.return $ mergeBy comp s1' s2 Step (a :> as) -> s2 & \case Return s -> Effect $ effects as Control.>>= \r -> Control.return $ Return (r, s) Effect ms -> Effect $ ms Control.>>= \s2' -> Control.return $ mergeBy comp (Step (a :> as)) s2' Step (b :> bs) -> case comp a b of LT -> Step (a :> Step (b :> mergeBy comp as bs)) _ -> Step (b :> Step (a :> mergeBy comp as bs)) {-# INLINEABLE mergeBy #-}