{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE LinearTypes #-}
{-# LANGUAGE QualifiedDo #-}
{-# LANGUAGE RebindableSyntax #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# OPTIONS_GHC -Wno-name-shadowing #-}
{-# OPTIONS_HADDOCK hide #-}

-- | This module provides all functions which produce a
-- 'Stream (Of a) m r' from some given non-stream inputs.
module Streaming.Linear.Internal.Produce
  ( -- * Constructing Finite 'Stream's
    yield,
    each',
    unfoldr,
    fromHandle,
    readFile,
    replicate,
    replicateM,
    replicateZip,
    untilRight,

    -- * Working with infinite 'Stream's
    stdinLnN,
    stdinLnUntil,
    stdinLnUntilM,
    stdinLnZip,
    readLnN,
    readLnUntil,
    readLnUntilM,
    readLnZip,
    iterateN,
    iterateZip,
    iterateMN,
    iterateMZip,
    cycleN,
    cycleZip,
    enumFromN,
    enumFromZip,
    enumFromThenN,
    enumFromThenZip,
  )
where

import qualified Control.Functor.Linear as Control
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Unrestricted.Linear
import GHC.Stack
import Prelude.Linear (($), (&))
import Streaming.Linear.Internal.Consume (effects)
import Streaming.Linear.Internal.Process
import Streaming.Linear.Internal.Type
import qualified System.IO as System
import System.IO.Linear
import System.IO.Resource.Linear
import Prelude
  ( Bool (..),
    Either (..),
    Enum,
    Eq (..),
    FilePath,
    Int,
    Num (..),
    Ord (..),
    Read,
    fromEnum,
    otherwise,
    toEnum,
  )
import qualified Prelude

-- # The Finite Stream Constructors
-------------------------------------------------------------------------------

-- | A singleton stream: one 'Step' carrying the element, followed by
-- @Return ()@.
--
-- @
-- \>\>\> stdoutLn $ yield "hello"
-- hello
-- @
--
-- @
-- \>\>\> S.sum $ do {yield 1; yield 2; yield 3}
-- 6 :> ()
-- @
yield :: Control.Monad m => a -> Stream (Of a) m ()
yield x = Step $ x :> Return ()
{-# INLINE yield #-}

-- | Stream the elements of a pure, foldable container.
--
-- @
-- \>\>\> S.print $ each' [1..3]
-- 1
-- 2
-- 3
-- @
each' :: Control.Monad m => [a] -> Stream (Of a) m ()
each' xs = Prelude.foldr (\a stream -> Step $ a :> stream) (Return ()) xs
{-# INLINEABLE each' #-}

-- | Build a @Stream@ by unfolding steps starting from a seed. In particular note
-- that @S.unfoldr S.next = id@.
--
-- Each step either finishes the stream with @Left r@ or yields an element
-- together with the next seed via @Right (Ur a, s')@.
unfoldr ::
  Control.Monad m =>
  (s %1 -> m (Either r (Ur a, s))) ->
  s %1 ->
  Stream (Of a) m r
unfoldr step s = unfoldr' step s
  where
    unfoldr' ::
      Control.Monad m =>
      (s %1 -> m (Either r (Ur a, s))) ->
      s %1 ->
      Stream (Of a) m r
    unfoldr' step s =
      Effect $
        step s Control.>>= \case
          Left r -> Control.return $ Return r
          Right (Ur a, s') -> Control.return $ Step $ a :> unfoldr step s'
{-# INLINEABLE unfoldr #-}

-- | Stream the lines of a handle. Before each line the handle is tested
-- with 'hIsEOF'; once that reports end-of-file the handle is closed with
-- 'hClose' and the stream ends.
--
-- Note: we use the RIO monad from linear base to enforce
-- the protocol of file handles and file I/O
fromHandle :: Handle %1 -> Stream (Of Text) RIO ()
fromHandle h = loop h
  where
    loop :: Handle %1 -> Stream (Of Text) RIO ()
    loop h = Control.do
      (Ur isEOF, h') <- Control.lift $ hIsEOF h
      case isEOF of
        True -> Control.do
          Control.lift $ hClose h'
          Control.return ()
        False -> Control.do
          (Ur text, h'') <- Control.lift $ hGetLine h'
          yield text
          -- Recurse through the local helper rather than the top-level name.
          loop h''
{-# INLINEABLE fromHandle #-}

-- | Read the lines of a file given the filename.
--
-- The file is opened in 'System.ReadMode' and handed to 'fromHandle'.
readFile :: FilePath -> Stream (Of Text) RIO ()
readFile path = Control.do
  handle <- Control.lift $ openFile path System.ReadMode
  fromHandle handle

-- | Repeat an element several times.
--
-- @replicate n a@ yields @a@ exactly @n@ times. A negative @n@ calls
-- 'Prelude.error'.
replicate :: (HasCallStack, Control.Monad m) => Int -> a -> Stream (Of a) m ()
replicate n a
  | n < 0 = Prelude.error "Cannot replicate a stream of negative length"
  | otherwise = loop n a
  where
    -- Only reached with a non-negative count, so counting down to 0 ends.
    loop :: Control.Monad m => Int -> a -> Stream (Of a) m ()
    loop n a
      | n == 0 = Return ()
      | otherwise = Effect $ Control.return $ Step $ a :> loop (n - 1) a
{-# INLINEABLE replicate #-}

-- | Repeat an action several times, streaming its results.
--
-- @
-- \>\>\> import qualified Unsafe.Linear as Unsafe
-- \>\>\> import qualified Data.Time as Time
-- \>\>\> let getCurrentTime = fromSystemIO (Unsafe.coerce Time.getCurrentTime)
-- \>\>\> S.print $ S.replicateM 2 getCurrentTime
-- 2015-08-18 00:57:36.124508 UTC
-- 2015-08-18 00:57:36.124785 UTC
-- @
--
-- A negative count calls 'Prelude.error', mirroring 'replicate'.
replicateM ::
  (HasCallStack, Control.Monad m) =>
  Int ->
  m (Ur a) ->
  Stream (Of a) m ()
replicateM n ma
  | n < 0 = Prelude.error "Cannot replicate a stream of negative length"
  | otherwise = loop n ma
  where
    -- Only reached with a non-negative count. Recursing through @loop@
    -- (not the top-level name) avoids re-running the negative check and
    -- avoids pushing a call-stack frame on every element.
    loop :: Control.Monad m => Int -> m (Ur a) -> Stream (Of a) m ()
    loop n ma
      | n == 0 = Return ()
      | otherwise = Effect $ Control.do
          Ur a <- ma
          Control.return $ Step $ a :> loop (n - 1) ma
{-# INLINEABLE replicateM #-}

-- | Replicate a constant element and zip it with the finite stream which
-- is the first argument.
replicateZip ::
  Control.Monad m =>
  Stream (Of x) m r ->
  a ->
  Stream (Of (a, x)) m r
replicateZip stream a = map ((,) a) stream
{-# INLINEABLE replicateZip #-}

-- | Run the given action repeatedly, yielding the @a@ of every
-- @Left (Ur a)@ it produces, until it produces @Right r@; that @r@ becomes
-- the result of the stream.
untilRight ::
  forall m a r.
  Control.Monad m =>
  m (Either (Ur a) r) ->
  Stream (Of a) m r
untilRight mEither = Effect loop
  where
    loop :: m (Stream (Of a) m r)
    loop = Control.do
      either <- mEither
      either & \case
        Left (Ur a) ->
          Control.return $ Step $ a :> (untilRight mEither)
        Right r -> Control.return $ Return r
{-# INLINEABLE untilRight #-}

-- # The \"Affine\" 'Stream'
-------------------------------------------------------------------------------

-- | An *affine stream is represented with a state of type @x@,
-- a possibly terminating step function of type @(x %1-> m (Either (f x) r))@,
-- and a stop-short function @(x %1-> m r)@.
--
-- This mirrors the unfold of a normal stream:
--
-- > data Stream f m r where
-- >   Stream :: x %1-> (x %1-> m (Either (f x) r)) -> Stream f m r
--
-- *Though referred to as an \"affine stream\" this might not be the correct
-- definition for affine streams. Sorting this out requires a bit more
-- careful thought.
data AffineStream f m r where
  AffineStream ::
    -- The current state, held linearly.
    x %1 ->
    -- The step function: produce the next layer or terminate with @r@.
    (x %1 -> m (Either (f x) r)) ->
    -- The stop-short function: give up the state and produce @r@.
    (x %1 -> m r) ->
    AffineStream f m r

-- | Take @n@ number of elements from the affine stream, for non-negative
-- @n@. (Negative @n@ is treated as 0.)
--
-- Once the count is exhausted the stop-short function is run on the
-- remaining state to obtain the result.
take ::
  forall f m r.
  (Control.Monad m, Control.Functor f) =>
  Int ->
  AffineStream f m r %1 ->
  Stream f m r
take = loop
  where
    loop :: Int -> AffineStream f m r %1 -> Stream f m r
    loop n (AffineStream s step end)
      | n <= 0 = Effect $ Control.fmap Control.return $ end s
      | otherwise = Effect $ Control.do
          next <- step s
          next & \case
            Right r -> Control.return (Return r)
            Left fx ->
              Control.return $
                Step $
                  Control.fmap (\x -> loop (n - 1) (AffineStream x step end)) fx
{-# INLINEABLE take #-}

-- | Run an affine stream until it ends or a monadic test succeeds.
-- Drop the element it succeeds on.
untilM ::
  forall a m r.
  Control.Monad m =>
  (a -> m Bool) ->
  AffineStream (Of a) m r %1 ->
  Stream (Of a) m r
untilM = loop
  where
    loop :: (a -> m Bool) -> AffineStream (Of a) m r %1 -> Stream (Of a) m r
    loop test (AffineStream s step end) = Effect $ Control.do
      next <- step s
      next & \case
        Right r -> Control.return (Return r)
        Left (a :> next) -> Control.do
          testResult <- test a
          testResult & \case
            False ->
              Control.return $
                Step $
                  a :> loop test (AffineStream next step end)
            -- Test succeeded: stop short; @a@ is not emitted.
            True -> Control.fmap Control.return $ end next
{-# INLINEABLE untilM #-}

-- | Like 'untilM' but without the monadic test.
until ::
  forall a m r.
  Control.Monad m =>
  (a -> Bool) ->
  AffineStream (Of a) m r %1 ->
  Stream (Of a) m r
until = loop
  where
    loop :: (a -> Bool) -> AffineStream (Of a) m r %1 -> Stream (Of a) m r
    loop test (AffineStream s step end) = Effect $ Control.do
      next <- step s
      next & \case
        Right r -> Control.return (Return r)
        Left (a :> next) -> case test a of
          -- Test succeeded: stop short; @a@ is not emitted.
          True -> Control.fmap Control.return $ end next
          False ->
            Control.return $
              Step $
                a :> loop test (AffineStream next step end)
{-# INLINEABLE until #-}

-- | Zip a finite stream with an affine stream.
--
-- When the finite stream returns, the affine stream is stopped short with
-- its stop-short function. If instead the affine stream terminates first,
-- the remaining effects of the finite stream are run (via 'effects') and the
-- pending element of the finite stream is not emitted.
zip ::
  forall a x m r1 r2.
  Control.Monad m =>
  Stream (Of x) m r1 %1 ->
  AffineStream (Of a) m r2 %1 ->
  Stream (Of (x, a)) m (r1, r2)
zip = loop
  where
    loop ::
      Stream (Of x) m r1 %1 ->
      AffineStream (Of a) m r2 %1 ->
      Stream (Of (x, a)) m (r1, r2)
    loop stream (AffineStream s step end) =
      stream & \case
        Return r1 ->
          Effect $
            Control.fmap (\r2 -> Control.return $ (r1, r2)) $
              end s
        Effect m ->
          Effect $
            Control.fmap (\str -> loop str (AffineStream s step end)) m
        Step (x :> rest) -> Effect $ Control.do
          next <- step s
          next & \case
            Right r2 -> Control.do
              r1 <- effects rest
              Control.return (Return (r1, r2))
            Left (a :> rest') ->
              Control.return $
                Step $
                  (x, a) :> loop rest (AffineStream rest' step end)
{-# INLINEABLE zip #-}

-- | An affine stream of standard input lines.
stdinLn :: AffineStream (Of Text) IO ()
stdinLn = AffineStream () getALine Control.pure
  where
    getALine :: () %1 -> IO (Either (Of Text ()) ())
    getALine () = Control.do
      Ur line <- fromSystemIOU System.getLine
      Control.return $ Left (Text.pack line :> ())

-- | An affine stream of reading lines, crashing on failed parse.
readLn :: Read a => AffineStream (Of a) IO ()
readLn = AffineStream () readALine Control.pure
  where
    readALine :: Read a => () %1 -> IO (Either (Of a ()) ())
    readALine () = Control.do
      Ur line <- fromSystemIOU System.getLine
      -- 'Prelude.read' is partial: this is where a failed parse crashes.
      Control.return $ Left (Prelude.read line :> ())

-- | An affine stream iterating an initial state forever.
--
-- The first element is the seed itself; each later element is the step
-- function applied to the previous one.
iterate ::
  forall a m.
  Control.Monad m =>
  a ->
  (a -> a) ->
  AffineStream (Of a) m ()
iterate a step =
  AffineStream (Ur a) stepper (\x -> Control.return $ consume x)
  where
    stepper :: Ur a %1 -> m (Either (Of a (Ur a)) ())
    stepper (Ur a) = Control.return $ Left $ a :> Ur (step a)

-- | An affine stream monadically iterating an initial state forever.
--
-- The state is the /pending action/ for the next element. Stopping short
-- therefore runs that pending action once and discards its result, since the
-- stop-short function is @Control.fmap consume@.
iterateM ::
  forall a m.
  Control.Monad m =>
  m (Ur a) ->
  (a -> m (Ur a)) ->
  AffineStream (Of a) m ()
iterateM ma step =
  AffineStream ma stepper (Control.fmap consume)
  where
    stepper :: m (Ur a) %1 -> m (Either (Of a (m (Ur a))) ())
    stepper ma = Control.do
      Ur a <- ma
      Control.return $ Left $ a :> (step a)

-- Remark. In order to implement the affine break function, which is the third
-- argument of the constructor, we need to specify the functor as @Of@.
-- Approaches to keeping it functor general seem messy.

-- | An affine stream cycling through a given finite stream forever.
--
-- Stopping short runs the remaining effects of the current cycle and returns
-- that cycle's result.
--
-- NOTE(review): if the given stream never reaches a 'Step', @stepStream@
-- restarts the cycle on every 'Return' and so never produces an element.
cycle ::
  forall a m r.
  (Control.Monad m, Consumable r) =>
  Stream (Of a) m r ->
  AffineStream (Of a) m r
cycle stream =
  -- Note. The state is (original stream, stream_in_current_cycle)
  AffineStream (Ur stream, stream) stepStream leftoverEffects
  where
    leftoverEffects ::
      (Ur (Stream (Of a) m r), Stream (Of a) m r) %1 -> m r
    leftoverEffects (Ur _, str) = effects str

    stepStream ::
      Control.Functor f =>
      (Ur (Stream f m r), Stream f m r) %1 ->
      m (Either (f (Ur (Stream f m r), Stream f m r)) r)
    stepStream (Ur s, str) =
      str & \case
        -- End of a cycle: discard its result and start over.
        Return r -> lseq r $ stepStream (Ur s, s)
        Effect m -> m Control.>>= (\stream -> stepStream (Ur s, stream))
        Step f -> Control.return $ Left $ Control.fmap ((,) (Ur s)) f

-- | An affine stream iterating an enumerated stream forever.
enumFrom :: (Control.Monad m, Enum e) => e -> AffineStream (Of e) m ()
enumFrom e = iterate e Prelude.succ

-- | An affine stream iterating an enumerated stream forever, using the
-- first two elements to determine the gap to skip by.
-- E.g., @enumFromThen 3 5@ is like @[3,5..]@.
enumFromThen ::
  forall e m.
  (Control.Monad m, Enum e) =>
  e ->
  e ->
  AffineStream (Of e) m ()
enumFromThen e e' = iterate e enumStep
  where
    -- Think: \enum -> enum + stepSize where stepSize = (e1 - e0)
    enumStep :: e -> e
    enumStep enum = toEnum $ (fromEnum enum) + ((fromEnum e') - (fromEnum e))

-- # Working with infinite 'Stream's
-------------------------------------------------------------------------------

-- | @stdinLnN n@ is a stream of @n@ lines from standard input
stdinLnN :: Int -> Stream (Of Text) IO ()
stdinLnN n = take n stdinLn
{-# INLINE stdinLnN #-}

-- | Provides a stream of standard input and omits the first line
-- that satisfies the predicate, possibly requiring IO
stdinLnUntilM :: (Text -> IO Bool) -> Stream (Of Text) IO ()
stdinLnUntilM test = untilM test stdinLn
{-# INLINE stdinLnUntilM #-}

-- | Provides a stream of standard input and omits the first line
-- that satisfies the predicate
stdinLnUntil :: (Text -> Bool) -> Stream (Of Text) IO ()
stdinLnUntil test = until test stdinLn
{-# INLINE stdinLnUntil #-}

-- | Given a finite stream, provide a stream of lines of standard input
-- zipped with that finite stream
stdinLnZip :: Stream (Of x) IO r %1 -> Stream (Of (x, Text)) IO r
stdinLnZip stream = Control.fmap (\(r, ()) -> r) $ zip stream stdinLn
{-# INLINE stdinLnZip #-}

-- | @readLnN n@ is a stream of @n@ values, each parsed from one line of
-- standard input. A line that fails to parse crashes (see 'readLn').
readLnN :: Read a => Int -> Stream (Of a) IO ()
readLnN n = take n readLn
{-# INLINE readLnN #-}

-- | Stream values parsed from standard input lines, stopping at (and
-- omitting) the first value that satisfies the monadic test.
readLnUntilM :: Read a => (a -> IO Bool) -> Stream (Of a) IO ()
readLnUntilM test = untilM test readLn
{-# INLINE readLnUntilM #-}

-- | Stream values parsed from standard input lines, stopping at (and
-- omitting) the first value that satisfies the predicate.
readLnUntil :: Read a => (a -> Bool) -> Stream (Of a) IO ()
readLnUntil test = until test readLn
{-# INLINE readLnUntil #-}

-- | Given a finite stream, zip it with values parsed from lines of
-- standard input.
readLnZip :: Read a => Stream (Of x) IO r %1 -> Stream (Of (x, a)) IO r
readLnZip stream = Control.fmap (\(r, ()) -> r) $ zip stream readLn
{-# INLINE readLnZip #-}

-- | Iterate a pure function from a seed value, streaming the first @n@
-- results. The seed itself is the first element.
iterateN :: Control.Monad m => Int -> (a -> a) -> a -> Stream (Of a) m ()
iterateN n step a = take n $ iterate a step
{-# INLINE iterateN #-}

-- | Zip a finite stream with the iterates of a pure function from a seed
-- value; the length is limited by the finite stream.
--
-- NOTE(review): unlike 'iterateMZip' and the other @*Zip@ functions here,
-- the stream argument is taken with an unrestricted arrow. Left as is to
-- keep the interface unchanged; confirm whether this is intentional.
iterateZip ::
  Control.Monad m =>
  Stream (Of x) m r ->
  (a -> a) ->
  a ->
  Stream (Of (x, a)) m r
iterateZip stream step a =
  Control.fmap (\(r, ()) -> r) $ zip stream $ iterate a step
{-# INLINE iterateZip #-}

-- | Iterate a monadic function from a seed action, streaming the first @n@
-- results.
--
-- Note that stopping after @n@ elements runs the pending next action once
-- more and discards its result, because that action is held linearly and
-- the stop-short function of 'iterateM' consumes it by running it.
iterateMN ::
  Control.Monad m =>
  Int ->
  (a -> m (Ur a)) ->
  m (Ur a) ->
  Stream (Of a) m ()
iterateMN n step ma = take n $ iterateM ma step
{-# INLINE iterateMN #-}

-- | Zip a finite stream with the monadic iterates from a seed action; the
-- length is limited by the finite stream.
iterateMZip ::
  Control.Monad m =>
  Stream (Of x) m r %1 ->
  (a -> m (Ur a)) ->
  m (Ur a) ->
  Stream (Of (x, a)) m r
iterateMZip stream step ma =
  Control.fmap (\(r, ()) -> r) $ zip stream $ iterateM ma step
{-# INLINE iterateMZip #-}

-- | @cycleN n s@ streams the first @n@ elements obtained by cycling through
-- @s@ repeatedly. Note that @n@ counts elements, not whole cycles. After the
-- @n@th element the remaining effects of the current cycle are run and that
-- cycle's result is returned.
cycleN ::
  (Control.Monad m, Consumable r) =>
  Int ->
  Stream (Of a) m r ->
  Stream (Of a) m r
cycleN n stream = take n $ cycle stream
{-# INLINE cycleN #-}

-- | @cycleZip s1 s2@ will cycle @s2@ just enough to zip with the given finite
-- stream @s1@. Note that we consume all the effects of the remainder of the
-- cycled stream @s2@. That is, we consume @s2@ the smallest natural number of
-- times we need to zip.
cycleZip ::
  (Control.Monad m, Consumable s) =>
  Stream (Of a) m r %1 ->
  Stream (Of b) m s ->
  Stream (Of (a, b)) m (r, s)
cycleZip str stream = zip str $ cycle stream
{-# INLINE cycleZip #-}

-- | A finite sequence of enumerable values at a fixed distance, determined
-- by the first and second values.
--
-- @
-- \>\>\> S.print $ S.enumFromThenN 3 100 200
-- 100
-- 200
-- 300
-- @
enumFromThenN ::
  (Control.Monad m, Enum e) =>
  Int ->
  e ->
  e ->
  Stream (Of e) m ()
enumFromThenN n e e' = take n $ enumFromThen e e'
{-# INLINE enumFromThenN #-}

-- | A finite sequence of enumerable values at a fixed distance determined
-- by the first and second values. The length is limited by zipping
-- with a given finite stream, i.e., the first argument.
enumFromThenZip ::
  (Control.Monad m, Enum e) =>
  Stream (Of a) m r %1 ->
  e ->
  e ->
  Stream (Of (a, e)) m r
enumFromThenZip stream e e' =
  Control.fmap (\(r, ()) -> r) $ zip stream $ enumFromThen e e'
{-# INLINE enumFromThenZip #-}

-- | Like 'enumFromThenN' but where the next element in the enumeration is just
-- the successor @succ n@ for a given enum @n@.
enumFromN :: (Control.Monad m, Enum e) => Int -> e -> Stream (Of e) m ()
enumFromN n e = take n $ enumFrom e
{-# INLINE enumFromN #-}

-- | Like 'enumFromThenZip' but where the next element in the enumeration is just
-- the successor @succ n@ for a given enum @n@.
enumFromZip ::
  (Control.Monad m, Enum e) =>
  Stream (Of a) m r %1 ->
  e ->
  Stream (Of (a, e)) m r
enumFromZip str e = Control.fmap (\(r, ()) -> r) $ zip str $ enumFrom e
{-# INLINE enumFromZip #-}