streaming-0.1.4.5: an elementary streaming prelude and general stream type.

Streaming.Internal

The Stream data type is equivalent to FreeT and can represent any effectful succession of steps, where the form of the steps or commands is specified by the first (functor) parameter.

data Stream f m r = Step !(f (Stream f m r)) | Effect (m (Stream f m r)) | Return r

The producer concept uses the simple functor (a,_), or the stricter Of a _. Then the news at each step or layer is just an individual item of type a. Since Stream (Of a) m r is equivalent to Pipes.Producer a m r, much of the pipes Prelude can easily be mirrored in a streaming Prelude. Similarly, a simple Consumer a m r or Parser a m r concept arises when the base functor is (a -> _). Stream ((->) input) m result consumes input until it returns a result.

To avoid breaking reasoning principles, the constructors should not be used directly. A pattern match should go by way of inspect or, in the producer case, next.

The constructors are exported by the Internal module.
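
As a rough illustration of the advice above, the following sketch redefines the type locally (using only base, not the library's own module) and consumes a producer through inspect alone. The helpers countTo and collect are hypothetical names introduced for this example.

```haskell
-- A local, simplified mirror of the types on this page (base only).
-- It is NOT the library module; names are copied for illustration.
data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- The strict pair used as the producer functor: an item and the rest.
data Of a b = !a :> b
infixr 5 :>

-- Run effects until either the final result or the next layer appears.
inspect :: Monad m => Stream f m r -> m (Either r (f (Stream f m r)))
inspect (Return r) = return (Left r)
inspect (Effect m) = m >>= inspect
inspect (Step fs)  = return (Right fs)

-- Hypothetical helper: a producer yielding lo..hi, then returning ().
-- Each item is followed by a (trivial) effect layer.
countTo :: Monad m => Int -> Int -> Stream (Of Int) m ()
countTo lo hi
  | lo > hi   = Return ()
  | otherwise = Step (lo :> Effect (return (countTo (lo + 1) hi)))

-- Hypothetical helper: collect every item without matching on the
-- Stream constructors; only inspect is used.
collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect str = do
  e <- inspect str
  case e of
    Left r            -> return ([], r)
    Right (a :> rest) -> do
      (as, r) <- collect rest
      return (a : as, r)
```

Because collect never looks at Effect or Step itself, it cannot tell apart two streams that differ only in how their effect layers are grouped, which is the kind of reasoning principle the note above is protecting.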

data Stream f m r

Constructors

 Step !(f (Stream f m r))
 Effect (m (Stream f m r))
 Return r

Instances

 (MonadBase b m, Functor f) => MonadBase b (Stream f m)
   Methods
     liftBase :: b α -> Stream f m α

 (Functor f, MonadError e m) => MonadError e (Stream f m)
   Methods
     throwError :: e -> Stream f m a
     catchError :: Stream f m a -> (e -> Stream f m a) -> Stream f m a

 (Functor f, MonadReader r m) => MonadReader r (Stream f m)
   Methods
     ask :: Stream f m r
     local :: (r -> r) -> Stream f m a -> Stream f m a
     reader :: (r -> a) -> Stream f m a

 (Functor f, MonadState s m) => MonadState s (Stream f m)
   Methods
     get :: Stream f m s
     put :: s -> Stream f m ()
     state :: (s -> (a, s)) -> Stream f m a

 Functor f => MFunctor (Stream f)
   Methods
     hoist :: Monad m => (forall a. m a -> n a) -> Stream f m b -> Stream f n b

 Functor f => MMonad (Stream f)
   Methods
     embed :: Monad n => (forall a. m a -> Stream f n a) -> Stream f m b -> Stream f n b

 Functor f => MonadTrans (Stream f)
   Methods
     lift :: Monad m => m a -> Stream f m a

 (Functor f, Monad m) => Monad (Stream f m)
   Methods
     (>>=) :: Stream f m a -> (a -> Stream f m b) -> Stream f m b
     (>>) :: Stream f m a -> Stream f m b -> Stream f m b
     return :: a -> Stream f m a
     fail :: String -> Stream f m a

 (Functor f, Monad m) => Functor (Stream f m)
   Methods
     fmap :: (a -> b) -> Stream f m a -> Stream f m b
     (<$) :: a -> Stream f m b -> Stream f m a

 (Functor f, Monad m) => Applicative (Stream f m)
   Methods
     pure :: a -> Stream f m a
     (<*>) :: Stream f m (a -> b) -> Stream f m a -> Stream f m b
     (*>) :: Stream f m a -> Stream f m b -> Stream f m b
     (<*) :: Stream f m a -> Stream f m b -> Stream f m a

 (MonadIO m, Functor f) => MonadIO (Stream f m)
   Methods
     liftIO :: IO a -> Stream f m a

 (Applicative f, Monad m) => Alternative (Stream f m)
   The Alternative instance glues streams together stepwise.

     empty = never
     (<|>) = zipsWith (liftA2 (,))

   See also never, untilJust and delays.
   Methods
     empty :: Stream f m a
     (<|>) :: Stream f m a -> Stream f m a -> Stream f m a
     some :: Stream f m a -> Stream f m [a]
     many :: Stream f m a -> Stream f m [a]

 (Applicative f, Monad m) => MonadPlus (Stream f m)
   Methods
     mzero :: Stream f m a
     mplus :: Stream f m a -> Stream f m a -> Stream f m a

 (MonadThrow m, Functor f) => MonadThrow (Stream f m)
   Methods
     throwM :: Exception e => e -> Stream f m a

 (MonadCatch m, Functor f) => MonadCatch (Stream f m)
   Methods
     catch :: Exception e => Stream f m a -> (e -> Stream f m a) -> Stream f m a

 (MonadResource m, Functor f) => MonadResource (Stream f m)
   Methods
     liftResourceT :: ResourceT IO a -> Stream f m a

 (Eq r, Eq (m (Stream f m r)), Eq (f (Stream f m r))) => Eq (Stream f m r)
   Methods
     (==) :: Stream f m r -> Stream f m r -> Bool
     (/=) :: Stream f m r -> Stream f m r -> Bool

 (Typeable (* -> *) f, Typeable (* -> *) m, Data r, Data (m (Stream f m r)), Data (f (Stream f m r))) => Data (Stream f m r)
   Methods
     gfoldl :: (forall d b. Data d => c (d -> b) -> d -> c b) -> (forall g. g -> c g) -> Stream f m r -> c (Stream f m r)
     gunfold :: (forall b a. Data b => c (b -> a) -> c a) -> (forall a. a -> c a) -> Constr -> c (Stream f m r)
     toConstr :: Stream f m r -> Constr
     dataTypeOf :: Stream f m r -> DataType
     dataCast1 :: Typeable (* -> *) t => (forall d. Data d => c (t d)) -> Maybe (c (Stream f m r))
     dataCast2 :: Typeable (* -> * -> *) t => (forall d e. (Data d, Data e) => c (t d e)) -> Maybe (c (Stream f m r))
     gmapT :: (forall b. Data b => b -> b) -> Stream f m r -> Stream f m r
     gmapQl :: (r -> r' -> r) -> r -> (forall d. Data d => d -> r') -> Stream f m r -> r
     gmapQr :: (r' -> r -> r) -> r -> (forall d. Data d => d -> r') -> Stream f m r -> r
     gmapQ :: (forall d. Data d => d -> u) -> Stream f m r -> [u]
     gmapQi :: Int -> (forall d. Data d => d -> u) -> Stream f m r -> u
     gmapM :: Monad m => (forall d. Data d => d -> m d) -> Stream f m r -> m (Stream f m r)
     gmapMp :: MonadPlus m => (forall d. Data d => d -> m d) -> Stream f m r -> m (Stream f m r)
     gmapMo :: MonadPlus m => (forall d. Data d => d -> m d) -> Stream f m r -> m (Stream f m r)

 (Show r, Show (m (Stream f m r)), Show (f (Stream f m r))) => Show (Stream f m r)
   Methods
     showsPrec :: Int -> Stream f m r -> ShowS
     show :: Stream f m r -> String
     showList :: [Stream f m r] -> ShowS

 (Functor f, Monad m, Monoid w) => Monoid (Stream f m w)
   Methods
     mempty :: Stream f m w
     mappend :: Stream f m w -> Stream f m w -> Stream f m w
     mconcat :: [Stream f m w] -> Stream f m w

Introducing a stream

unfold :: (Monad m, Functor f) => (s -> m (Either r (f s))) -> s -> Stream f m r

Build a Stream by unfolding steps starting from a seed. See also the specialized unfoldr in the prelude.

unfold inspect = id -- modulo the quotient we work with
unfold Pipes.next :: Monad m => Producer a m r -> Stream ((,) a) m r
unfold (fmap (fmap (uncurry (:>))) . Pipes.next) :: Monad m => Producer a m r -> Stream (Of a) m r

replicates :: (Monad m, Functor f) => Int -> f () -> Stream f m ()

Repeat a functorial layer, command or instruction a fixed number of times.

replicates n = takes n . repeats

repeats :: (Monad m, Functor f) => f () -> Stream f m r

Repeat a functorial layer (a "command" or "instruction") forever.

repeatsM :: (Monad m, Functor f) => m (f ()) -> Stream f m r

Repeat an effect containing a functorial layer, command or instruction forever.

effect :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r

Wrap an effect that returns a stream.

effect = join . lift

wrap :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r

Wrap a new layer of a stream. So, e.g.

S.cons :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r
S.cons a str = wrap (a :> str)

and, recursively:

S.each :: (Monad m, Foldable t) => t a -> Stream (Of a) m ()
S.each = foldr (\a b -> wrap (a :> b)) (return ())

The two operations

wrap :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r
effect :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r

are fundamental. We can define the parallel operations yields and lift in terms of them:

yields :: (Monad m, Functor f) => f r -> Stream f m r
yields = wrap . fmap return
lift :: (Monad m, Functor f) => m r -> Stream f m r
lift = effect . fmap return

yields :: (Monad m, Functor f) => f r -> Stream f m r

yields is like lift for items in the streamed functor. It makes a singleton or one-layer succession.

lift :: (Monad m, Functor f) => m r -> Stream f m r
yields :: (Monad m, Functor f) => f r -> Stream f m r

Viewed in another light, it is like a functor-general version of yield:

S.yield a = yields (a :> ())

streamBuild :: (forall b. (r -> b) -> (m b -> b) -> (f b -> b) -> b) -> Stream f m r

Reflect a church-encoded stream; cp. GHC.Exts.build

streamFold return_ effect_ step_ (streamBuild psi) = psi return_ effect_ step_

cycles :: (Monad m, Functor f) => Stream f m () -> Stream f m r

Construct an infinite stream by cycling a finite one.

cycles = forever

never :: (Monad m, Applicative f) => Stream f m r

never interleaves the pure applicative action with the return of the monad forever. It is the empty of the Alternative instance, thus

never <|> a = a
a <|> never = a

and so on. If w is a monoid then never :: Stream (Of w) m r is the infinite sequence of mempty, and str1 <|> str2 appends the elements monoidally until one of the streams ends. Thus we have, e.g.

>>> S.stdoutLn $ S.take 2 $ S.stdinLn <|> S.repeat " " <|> S.stdinLn <|> S.repeat " " <|> S.stdinLn
1<Enter>
2<Enter>
3<Enter>
1 2 3
4<Enter>
5<Enter>
6<Enter>
4 5 6

This is equivalent to

>>> S.stdoutLn $ S.take 2 $ foldr (<|>) never [S.stdinLn, S.repeat " ", S.stdinLn, S.repeat " ", S.stdinLn]

Where f is a monad, (<|>) sequences the conjoined streams stepwise. See the definition of paste under interleaves, where the separate steps are bytestreams corresponding to the lines of a file.

Given, say,

data Branch r = Branch r r deriving Functor -- add obvious applicative instance

then never :: Stream Branch Identity r is the pure infinite binary tree with (inaccessible) rs in its leaves. Given two binary trees, tree1 <|> tree2 intersects them, preserving the leaves that came first, so tree1 <|> never = tree1

Stream Identity m r is an action in m that is indefinitely delayed. Such an action can be constructed with e.g. untilJust.

untilJust :: (Monad m, Applicative f) => m (Maybe r) -> Stream f m r

Given two such items, the <|> instance races them. It is thus the iterative monad transformer specially defined in Control.Monad.Trans.Iter

So, for example, we might write

>>> let justFour str = if length str == 4 then Just str else Nothing
>>> let four = untilJust (liftM justFour getLine)
>>> run four
one<Enter>
two<Enter>
three<Enter>
four<Enter>
"four"

The Alternative instance in Control.Monad.Trans.Free is avowedly wrong, though no explanation is given for this.

untilJust :: (Monad m, Applicative f) => m (Maybe r) -> Stream f m r

Repeat a

Eliminating a stream

intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m x -> Stream (t m) m r -> t m r

Interpolate a layer at each segment. This specializes to e.g.

intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r

concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r

Dissolves the segmentation into layers of Stream f m layers.
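
To make the behaviour of concats concrete, here is a minimal sketch on a locally redefined copy of the type (base only). It is written by explicit recursion and is not claimed to match the library's implementation; segments and drain are hypothetical helpers introduced for the example.

```haskell
-- Local mirror of the types on this page (base only), for illustration.
data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Splice each inner stream into the output, then continue with the
-- remainder that the inner stream returns.
concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r
concats (Return r)   = Return r
concats (Effect m)   = Effect (fmap concats m)
concats (Step inner) = splice inner
  where
    splice (Return rest) = concats rest
    splice (Effect m)    = Effect (fmap splice m)
    splice (Step fs)     = Step (fmap splice fs)

-- Hypothetical helper: a segmented stream with one segment per list.
segments :: Monad m => [[a]] -> Stream (Stream (Of a) m) m ()
segments = foldr segment (Return ())
  where
    segment xs rest = Step (foldr (\a s -> Step (a :> s)) (Return rest) xs)

-- Hypothetical helper: drain a producer into a list plus its result.
drain :: Monad m => Stream (Of a) m r -> m ([a], r)
drain (Return r)         = return ([], r)
drain (Effect m)         = m >>= drain
drain (Step (a :> rest)) = do
  (as, r) <- drain rest
  return (a : as, r)
```

In GHCi, drain (concats (segments [[1,2],[3]])) should list the items of both segments in order, with the segment boundary no longer visible.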

iterT :: (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a

Specialized fold following the usage of Control.Monad.Trans.Free

iterT alg = streamFold return join alg

iterTM :: (Functor f, Monad m, MonadTrans t, Monad (t m)) => (f (t m a) -> t m a) -> Stream f m a -> t m a

Specialized fold following the usage of Control.Monad.Trans.Free

iterTM alg = streamFold return (join . lift) alg

destroy :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b

Map a stream directly to its church encoding; compare Data.List.foldr

streamFold :: (Functor f, Monad m) => (r -> b) -> (m b -> b) -> (f b -> b) -> Stream f m r -> b

streamFold reorders the arguments of destroy to be more akin to foldr. It is more convenient to query in ghci to figure out what kind of 'algebra' you need to write.

>>> :t streamFold return join
(Monad m, Functor f) => (f (m a) -> m a) -> Stream f m a -> m a -- iterT

>>> :t streamFold return (join . lift)
(Monad m, Monad (t m), Functor f, MonadTrans t) => (f (t m a) -> t m a) -> Stream f m a -> t m a -- iterTM

>>> :t streamFold return effect
(Monad m, Functor f, Functor g) => (f (Stream g m r) -> Stream g m r) -> Stream f m r -> Stream g m r

>>> :t \f -> streamFold return effect (wrap . f)
(Monad m, Functor f, Functor g) => (f (Stream g m a) -> g (Stream g m a)) -> Stream f m a -> Stream g m a -- maps

>>> :t \f -> streamFold return effect (effect . liftM wrap . f)
(Monad m, Functor f, Functor g) => (f (Stream g m a) -> m (g (Stream g m a))) -> Stream f m a -> Stream g m a -- mapped

Inspecting a stream wrap by wrap

inspect :: (Functor f, Monad m) => Stream f m r -> m (Either r (f (Stream f m r)))

Inspect the first stage of a freely layered sequence. Compare Pipes.next and the replica Streaming.Prelude.next. This is the uncons for the general unfold.

unfold inspect = id
Streaming.Prelude.unfoldr Streaming.Prelude.next = id

Transforming streams

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r

Map layers of one functor to another with a transformation. Compare hoist, which has a similar effect on the monadic parameter.

maps id = id
maps f . maps g = maps (f . g)

mapsM :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r

Map layers of one functor to another with a transformation involving the base monad. maps is more fundamental than mapsM, which is best understood as a convenience for effecting this frequent composition:

mapsM phi = decompose . maps (Compose . phi)

The streaming prelude exports the same function under the better name mapped, which overlaps with the lens libraries.

decompose :: (Monad m, Functor f) => Stream (Compose m f) m r -> Stream f m r

Rearrange a succession of layers of the form Compose m (f x). We could as well define decompose by mapsM:

decompose = mapped getCompose

but mapped is best understood as:

mapped phi = decompose . maps (Compose . phi)

since maps and hoist are the really fundamental operations that preserve the shape of the stream:

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
hoist :: (Monad m, Functor f) => (forall a. m a -> n a) -> Stream f m r -> Stream f n r

mapsM_ :: (Functor f, Monad m) => (forall x. f x -> m x) -> Stream f m r -> m r

Map each layer to an effect, and run them all.

run :: Monad m => Stream m m r -> m r

Run the effects in a stream that merely layers effects.

distribute :: (Monad m, Functor f, MonadTrans t, MFunctor t, Monad (t (Stream f m))) => Stream f (t m) r -> t (Stream f m) r

Make it possible to 'run' the underlying transformed monad.
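
The relationship between maps, run and mapsM_ listed in this section can be sketched as follows, again on a local mirror of the type using only base. Defining mapsM_ as run after maps is an assumption consistent with the signatures here rather than a quotation of the library source; each' and logged are hypothetical names.

```haskell
{-# LANGUAGE RankNTypes #-}

-- Local mirror of the types on this page (base only), for illustration.
data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Change the functor of every layer; effects and result are untouched.
maps :: (Monad m, Functor f)
     => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
maps phi = go
  where
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go (Step fs)  = Step (phi (fmap go fs))

-- When the layers are themselves actions in m, simply run them all.
run :: Monad m => Stream m m r -> m r
run (Return r) = return r
run (Effect m) = m >>= run
run (Step ms)  = ms >>= run

-- Assumed composition: turn each layer into an effect, then run.
mapsM_ :: (Monad m, Functor f) => (forall x. f x -> m x) -> Stream f m r -> m r
mapsM_ phi = run . maps phi

-- Hypothetical helper mirroring S.each for lists.
each' :: Monad m => [a] -> Stream (Of a) m ()
each' = foldr (\a rest -> Step (a :> rest)) (Return ())

-- Usage: log every item in the writer-like monad ((,) [Int]).
logged :: ([Int], ())
logged = mapsM_ (\(a :> x) -> ([a], x)) (each' [1, 2, 3])
```

Using the pair monad keeps the example pure; replacing the lambda with \(a :> x) -> print a >> return x would give an IO version in the spirit of S.print.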

groups :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream (Sum (Stream f m) (Stream g m)) m r

Group layers in an alternating stream into adjoining sub-streams of one type or another.

Splitting streams

chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r

Break a stream into substreams each with n functorial layers.

>>> S.print $ mapped S.sum $ chunksOf 2 $ each [1,1,1,1,1]
2
2
1


splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)

Split a succession of layers after some number, returning a streaming or effectful pair.

>>> rest <- S.print $ S.splitAt 1 $ each [1..3]
1
>>> S.print rest
2
3

splitAt 0 = return
splitAt n >=> splitAt m = splitAt (m+n)

Thus, e.g.

>>> rest <- S.print $ splitsAt 2 >=> splitsAt 2 $ each [1..5]
1
2
3
4
>>> S.print rest
5
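
The following sketch shows one way splitsAt, and chunksOf on top of it, might be written against a local mirror of the type (base only). It is an illustration under that assumption, not the library source; each', drain and chunkLists are hypothetical helpers.

```haskell
-- Local mirror of the types on this page (base only), for illustration.
data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Map over the final result of a stream.
instance (Functor f, Monad m) => Functor (Stream f m) where
  fmap g = go
    where
      go (Return r) = Return (g r)
      go (Effect m) = Effect (fmap go m)
      go (Step fs)  = Step (fmap go fs)

-- Pass through n layers, then return the untouched remainder.
splitsAt :: (Monad m, Functor f)
         => Int -> Stream f m r -> Stream f m (Stream f m r)
splitsAt n str
  | n <= 0    = Return str
  | otherwise = case str of
      Return r -> Return (Return r)
      Effect m -> Effect (fmap (splitsAt n) m)
      Step fs  -> Step (fmap (splitsAt (n - 1)) fs)

-- Each chunk is one layer followed by up to n - 1 more.
chunksOf :: (Monad m, Functor f)
         => Int -> Stream f m r -> Stream (Stream f m) m r
chunksOf n = go
  where
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go (Step fs)  = Step (Step (fmap (fmap go . splitsAt (n - 1)) fs))

-- Hypothetical helpers for trying the functions out.
each' :: Monad m => [a] -> Stream (Of a) m ()
each' = foldr (\a rest -> Step (a :> rest)) (Return ())

drain :: Monad m => Stream (Of a) m r -> m ([a], r)
drain (Return r)         = return ([], r)
drain (Effect m)         = m >>= drain
drain (Step (a :> rest)) = do
  (as, r) <- drain rest
  return (a : as, r)

chunkLists :: Monad m => Stream (Stream (Of a) m) m r -> m [[a]]
chunkLists (Return _)   = return []
chunkLists (Effect m)   = m >>= chunkLists
chunkLists (Step chunk) = do
  (as, rest) <- drain chunk
  ass <- chunkLists rest
  return (as : ass)
```

Note that splitsAt never forces more of the stream than the layers it passes through, so the remainder it returns is still a live stream rather than a buffered copy.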


takes :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m ()

cutoff :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Maybe r)

Zipping and unzipping streams

zipsWith :: (Monad m, Functor f, Functor g, Functor h) => (forall x y. f x -> g y -> h (x, y)) -> Stream f m r -> Stream g m r -> Stream h m r

zips :: (Monad m, Functor f, Functor g) => Stream f m r -> Stream g m r -> Stream (Compose f g) m r

unzips :: (Monad m, Functor f, Functor g) => Stream (Compose f g) m r -> Stream f (Stream g m) r

interleaves :: (Monad m, Applicative h) => Stream h m r -> Stream h m r -> Stream h m r

Interleave functor layers, with the effects of the first preceding the effects of the second.

interleaves = zipsWith (liftA2 (,))

>>> let paste = \a b -> interleaves (Q.lines a) (maps (Q.cons' '\t') (Q.lines b))
>>> Q.stdout $ Q.unlines $ paste "hello\nworld\n" "goodbye\nworld\n"
hello	goodbye
world	world
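
For readers who want to see the stepwise pairing spelled out, here is a minimal sketch of zipsWith and interleaves on a local mirror of the type (base only). The clause order is chosen so that effects of the first stream run before those of the second; it is an assumption-laden illustration, not the library source, and pairUp, each' and drain are hypothetical helpers.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Applicative (liftA2)

-- Local mirror of the types on this page (base only), for illustration.
data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Pair up layers one at a time; stop as soon as either stream returns.
zipsWith :: (Monad m, Functor f, Functor g, Functor h)
         => (forall x y. f x -> g y -> h (x, y))
         -> Stream f m r -> Stream g m r -> Stream h m r
zipsWith phi = go
  where
    go (Return r) _          = Return r
    go (Effect m) t          = Effect (fmap (\s -> go s t) m)
    go _          (Return r) = Return r
    go s          (Effect n) = Effect (fmap (go s) n)
    go (Step fs)  (Step gs)  = Step (fmap (uncurry go) (phi fs gs))

-- The definition given on this page, for a single applicative functor.
interleaves :: (Monad m, Applicative h)
            => Stream h m r -> Stream h m r -> Stream h m r
interleaves = zipsWith (liftA2 (,))

-- Hypothetical layer combiner: pair the items, pair the remainders.
pairUp :: Of a x -> Of b y -> Of (a, b) (x, y)
pairUp (a :> x) (b :> y) = (a, b) :> (x, y)

-- Hypothetical helpers for trying the functions out.
each' :: Monad m => [a] -> Stream (Of a) m ()
each' = foldr (\a rest -> Step (a :> rest)) (Return ())

drain :: Monad m => Stream (Of a) m r -> m ([a], r)
drain (Return r)         = return ([], r)
drain (Effect m)         = m >>= drain
drain (Step (a :> rest)) = do
  (as, r) <- drain rest
  return (a : as, r)
```

With these definitions, drain (zipsWith pairUp (each' [1,2,3]) (each' "ab")) behaves like zip on lists: the result ends with the shorter stream.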


separate :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r

Given a stream on a sum of functors, make it a stream on the left functor, with the streaming on the other functor as the governing monad. This is useful for acting on one or the other functor with a fold, leaving the other material for another treatment. It generalizes partitionEithers, but actually streams properly.

>>> let odd_even = S.maps (S.distinguish even) $ S.each [1..10::Int]
>>> :t separate odd_even
separate odd_even :: Monad m => Stream (Of Int) (Stream (Of Int) m) ()

Now, for example, it is convenient to fold on the left and right values separately:

>>> S.toList $ S.toList $ separate odd_even
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())

Or we can write them to separate files or whatever:

>>> runResourceT $ S.writeFile "even.txt" . S.show $ S.writeFile "odd.txt" . S.show $ S.separate odd_even
>>> :! cat even.txt
2
4
6
8
10
>>> :! cat odd.txt
1
3
5
7
9
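
A minimal sketch of how separate can move the right-hand functor into the governing monad, written by explicit recursion on a local mirror of the type (base only). It is offered as an illustration rather than the library's implementation; tagged and partitionLists are hypothetical helpers.

```haskell
import Data.Functor.Sum (Sum (..))

-- Local mirror of the types on this page (base only), for illustration.
data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Left layers stay as layers of the outer stream; right layers and base
-- effects become effects in the inner stream, which acts as the monad.
separate :: (Monad m, Functor f, Functor g)
         => Stream (Sum f g) m r -> Stream f (Stream g m) r
separate (Return r)      = Return r
separate (Effect m)      = Effect (Effect (fmap (Return . separate) m))
separate (Step (InL fs)) = Step (fmap separate fs)
separate (Step (InR gs)) = Effect (Step (fmap (Return . separate) gs))

-- Hypothetical helper: tag evens on the right and odds on the left,
-- following the odd_even example above.
tagged :: Monad m => [Int] -> Stream (Sum (Of Int) (Of Int)) m ()
tagged = foldr layer (Return ())
  where
    layer a rest
      | even a    = Step (InR (a :> rest))
      | otherwise = Step (InL (a :> rest))

-- Hypothetical helper: run both levels, listing outer and inner items.
partitionLists :: Monad m
               => Stream (Of a) (Stream (Of b) m) r -> m ([a], [b], r)
partitionLists (Return r)         = return ([], [], r)
partitionLists (Step (a :> rest)) = do
  (as, bs, r) <- partitionLists rest
  return (a : as, bs, r)
partitionLists (Effect inner)     = goInner inner
  where
    goInner (Return s)         = partitionLists s
    goInner (Effect m)         = m >>= goInner
    goInner (Step (b :> more)) = do
      (as, bs, r) <- goInner more
      return (as, b : bs, r)
```

The helper partitionLists collapses both levels at once only to keep the example free of a Monad instance for Stream; with the real library each level would be folded independently, as in the S.toList example above.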


Of course, in the special case of Stream (Of a) m r, we can achieve the above effects more simply by using copy:

>>> S.toList . S.filter even $ S.toList . S.filter odd $ S.copy $ each [1..10::Int]
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())

But separate and unseparate are functor-general.

unseparate :: (Monad m, Functor f, Functor g) => Stream f (Stream g m) r -> Stream (Sum f g) m r

Assorted Data.Functor.x help

switch :: Sum f g r -> Sum g f r

Swap the order of functors in a sum of functors.

>>> S.toList $ S.print $ separate $ maps S.switch $ maps (S.distinguish (=='a')) $ S.each "banana"
'a'
'a'
'a'
"bnn" :> ()
>>> S.toList $ S.print $ separate $ maps (S.distinguish (=='a')) $ S.each "banana"
'b'
'n'
'n'
"aaa" :> ()


ResourceT help

bracketStream :: (Functor f, MonadResource m) => IO a -> (a -> IO ()) -> (a -> Stream f m b) -> Stream f m b

For use in implementation

unexposed :: (Functor f, Monad m) => Stream f m r -> Stream f m r

This is akin to the observe of Pipes.Internal. It re-effects the layering in instances of Stream f m r so that it replicates that of FreeT.

hoistExposed :: (Functor f, Monad m1) => (m1 (Stream f m r) -> m (Stream f m r)) -> Stream f m1 r -> Stream f m r

mapsExposed :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r

mapsMExposed :: (Functor f1, Monad m) => (f1 (Stream f m r) -> m (f (Stream f m r))) -> Stream f1 m r -> Stream f m r

destroyExposed :: (Functor f, Monad m) => Stream f m t -> (f b -> b) -> (m b -> b) -> (t -> b) -> b