jet-stream-1.0.0.0: Yet another streaming library.
Safe HaskellNone
LanguageHaskell2010

Jet.Internal

Description

Tampering with the internals lets you write invalid Jets that don't respect stop signals from consumers, so be careful.

Also, the internals expose Line and ByteBundle as thin coats of paint over lazy text and lazy bytestring, respectively.

Synopsis

Documentation

>>> :set -XTypeApplications
>>> :set -XImportQualifiedPost
>>> :set -XScopedTypeVariables
>>> :set -XLambdaCase
>>> :set -XNumDecimals
>>> import Jet (Jet, (&))
>>> import Jet qualified as J
>>> import Control.Foldl qualified as L
>>> import Control.Concurrent
>>> import Data.IORef
>>> import Data.Text qualified as T

newtype Jet a Source #

A Jet is a sequence of values produced through IO effects.

It allows consuming the elements as they are produced and doesn't force them to be present in memory all at the same time, unlike functions like replicateM from base.

Constructors

Jet 

Fields

  • runJet :: forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
     

Instances

Instances details
Monad Jet Source #

Similar to the instance for pure lists, that does search.

>>> :{
do string <- J.each ["ab","cd"]
   J.each string
&
J.toList
:}
"abcd"
Instance details

Defined in Jet.Internal

Methods

(>>=) :: Jet a -> (a -> Jet b) -> Jet b

(>>) :: Jet a -> Jet b -> Jet b

return :: a -> Jet a

Functor Jet Source #

Maps over the yielded elements. (<&>) can be used to put the function last.

>>> J.each "aa" <&> succ & J.toList
"bb"
Instance details

Defined in Jet.Internal

Methods

fmap :: (a -> b) -> Jet a -> Jet b

(<$) :: a -> Jet b -> Jet a

MonadFail Jet Source #

A failed pattern-match in a do-block produces mzero.

>>> :{
do Just c <- J.each [Nothing, Just 'a', Nothing, Just 'b']
   pure c
& J.toList
:}
"ab"
Instance details

Defined in Jet.Internal

Methods

fail :: String -> Jet a

Applicative Jet Source #

Similar to the instance for pure lists, that generates combinations.

>>> (,) <$> J.each "ab" <*> J.each "cd" & J.toList
[('a','c'),('a','d'),('b','c'),('b','d')]
Instance details

Defined in Jet.Internal

Methods

pure :: a -> Jet a

(<*>) :: Jet (a -> b) -> Jet a -> Jet b

liftA2 :: (a -> b -> c) -> Jet a -> Jet b -> Jet c

(*>) :: Jet a -> Jet b -> Jet b

(<*) :: Jet a -> Jet b -> Jet a

Alternative Jet Source #

Same as Monoid.

Instance details

Defined in Jet.Internal

Methods

empty :: Jet a

(<|>) :: Jet a -> Jet a -> Jet a

some :: Jet a -> Jet [a]

many :: Jet a -> Jet [a]

MonadPlus Jet Source #

Same as Monoid

Instance details

Defined in Jet.Internal

Methods

mzero :: Jet a

mplus :: Jet a -> Jet a -> Jet a

MonadIO Jet Source #
>>> liftIO (putStrLn "foo") <> liftIO (putStrLn "bar") & J.toList
foo
bar
[(),()]
Instance details

Defined in Jet.Internal

Methods

liftIO :: IO a -> Jet a

Semigroup (Jet a) Source #

Jet concatenation.

>>> J.each "ab" <> J.each "cd" & J.toList
"abcd"
Instance details

Defined in Jet.Internal

Methods

(<>) :: Jet a -> Jet a -> Jet a

sconcat :: NonEmpty (Jet a) -> Jet a

stimes :: Integral b => b -> Jet a -> Jet a

Monoid (Jet a) Source #

mempty is the empty Jet.

>>> mempty <> J.each "ab" <> mempty & J.toList
"ab"
Instance details

Defined in Jet.Internal

Methods

mempty :: Jet a

mappend :: Jet a -> Jet a -> Jet a

mconcat :: [Jet a] -> Jet a

run :: forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s Source #

Go through the elements produced by a Jet, while threading an state s and possibly performing some effect.

The caller is the one who chooses the type of the state s, and must pass an initial value for it. The state is kept in weak-head normal form.

The caller must also provide a predicate on the state that informs the Jet when to stop producing values: whenever the predicate returns True.

consume :: forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s Source #

Like run, but always goes through all elements produced by the Jet.

Equivalent to run (const False).

for :: Jet a -> (a -> IO b) -> Jet b Source #

for_ :: Jet a -> (a -> IO b) -> IO () Source #

traverse :: (a -> IO b) -> Jet a -> Jet b Source #

Apply an effectful transformation to each element in a Jet.

>>> :{
J.each "abc" 
& J.traverse (\c -> let c' = succ c in putStrLn ([c] ++ " -> " ++ [c']) *> pure c')
& J.toList
:}
a -> b
b -> c
c -> d
"bcd"

traverse_ :: (a -> IO b) -> Sink a Source #

drain :: Sink a Source #

Go through the Jet only for the IO effects, discarding all yielded elements.

each :: forall a f. Foldable f => f a -> Jet a Source #

Build a Jet from any Foldable container

>>> J.each [True,False] & J.toList
[True,False]

repeat :: a -> Jet a Source #

>>> J.repeat True & J.take 2 & J.toList
[True,True]

repeatIO :: IO a -> Jet a Source #

>>> J.repeatIO (putStrLn "hi" *> pure True) & J.take 2 & J.toList
hi
hi
[True,True]

replicate :: Int -> a -> Jet a Source #

>>> J.replicate 2 True & J.toList
[True,True]

replicateIO :: Int -> IO a -> Jet a Source #

>>> J.replicateIO 2 (putStrLn "hi" *> pure True) & J.toList
hi
hi
[True,True]

Don't confuse this with Control.Monad.replicateM :: Int -> Jet a -> Jet [a] which has a combinatorial behavior.

iterate :: (a -> a) -> a -> Jet a Source #

>>> J.iterate succ (1 :: Int) & J.take 2 & J.toList
[1,2]

iterateIO :: (a -> IO a) -> a -> Jet a Source #

>>> J.iterateIO (\x -> putStrLn "hi" *> pure (succ x)) (1 :: Int) & J.take 2 & J.toList
hi
[1,2]

unfold :: (b -> Maybe (a, b)) -> b -> Jet a Source #

>>> J.unfold (\case [] -> Nothing ; c : cs -> Just (c,cs)) "abc" & J.toList
"abc"

unfoldIO :: (b -> IO (Maybe (a, b))) -> b -> Jet a Source #

>>> :{
J.unfoldIO (\x -> do putStrLn "hi" 
                     pure $ case x of 
                        [] -> Nothing 
                        c : cs -> Just (c,cs)) 
           "abc" 
& J.toList
:}                             
hi
hi
hi
hi
"abc"

untilEOF :: (handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a Source #

>>> j = J.untilEOF System.IO.hIsEOF System.IO.hGetLine :: Handle -> Jet String

untilNothing :: IO (Maybe a) -> Jet a Source #

>>> :{
do ref <- newIORef "abc"
   let pop = atomicModifyIORef ref (\case [] -> ([], Nothing)
                                          x : xs -> (xs, Just x)) 
   J.untilNothing pop & J.toList                                       
:}
"abc"

toList :: Jet a -> IO [a] Source #

Convert to a regular list. This breaks streaming.

>>> J.each "abc" & J.toList
"abc"

Alternatively, we can use fold in combination with list form the foldl library:

>>> L.purely (J.fold (J.each "abc")) L.list
"abc"

which is more verbose, but more composable.

length :: Jet a -> IO Int Source #

Returns the number of elements yielded by the Jet, exhausting it in the process.

>>> J.each "abc" & J.length
3

Alternatively, we can use fold in combination with length form the foldl library:

>>> L.purely (J.fold (J.each "abc")) L.length
3

which is more verbose, but more composable.

data Pair a b Source #

Constructors

Pair !a !b 

Instances

Instances details
(Show a, Show b) => Show (Pair a b) Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> Pair a b -> ShowS

show :: Pair a b -> String

showList :: [Pair a b] -> ShowS

pairExtract :: Pair a b -> b Source #

pairEnv :: Pair a b -> a Source #

data Triple a b c Source #

Constructors

Triple !a !b !c 

drop :: Int -> Jet a -> Jet a Source #

>>> J.each "abc" & J.drop 2 & J.toList
"c"

dropWhile :: (a -> Bool) -> Jet a -> Jet a Source #

>>> J.each [1..5] & J.dropWhile (<3) & J.toList
[3,4,5]

dropWhileIO :: (a -> IO Bool) -> Jet a -> Jet a Source #

take :: Int -> Jet a -> Jet a Source #

>>> J.each "abc" & J.take 2 & J.toList
"ab"

limit :: Int -> Jet a -> Jet a Source #

Synonym for take.

takeWhile :: (a -> Bool) -> Jet a -> Jet a Source #

>>> J.each [1..] & J.takeWhile (<5) & J.toList
[1,2,3,4]

takeWhileIO :: (a -> IO Bool) -> Jet a -> Jet a Source #

filter :: (a -> Bool) -> Jet a -> Jet a Source #

>>> J.each "abc" & J.filter (=='a') & J.toList
"a"

filterIO :: (a -> IO Bool) -> Jet a -> Jet a Source #

mapAccum :: (a -> b -> (a, c)) -> a -> Jet b -> Jet c Source #

Behaves like a combination of fmap and foldl; it applies a function to each element of a structure passing an accumulating parameter from left to right.

The resulting Jet has the same number of elements as the original one.

Unlike mapAccumL, it doesn't make the final state available.

>>> J.each [1,2,3,4] & J.mapAccum (\a b -> (a + b,a)) 0 & J.toList
[0,1,3,6]

mapAccumIO :: (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c Source #

intersperse :: a -> Jet a -> Jet a Source #

>>> J.each "abc" & J.intersperse '-' & J.toList
"a-b-c"

zip :: Foldable f => f a -> Jet b -> Jet (a, b) Source #

>>> J.each "abc" & J.zip [1..] & J.toList
[(1,'a'),(2,'b'),(3,'c')]
>>> J.each [1..] & J.zip "abc" & J.toList
[('a',1),('b',2),('c',3)]

zipWith :: Foldable f => (a -> b -> c) -> f a -> Jet b -> Jet c Source #

zipIO :: Foldable f => f (IO a) -> Jet b -> Jet (a, b) Source #

zipWithIO :: Foldable f => (a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c Source #

Zips a list of IO actions with a Jet, where the combining function can also have effects.

If the list of actions is exhausted, the Jet stops:

>>> J.each [1..] <&> show & zipWithIO (\c1 c2 -> putStrLn (c1 ++ c2)) [pure "a", pure "b"] & J.toList
a1
b2
[(),()]

withFile :: FilePath -> IOMode -> Jet Handle Source #

Opens a file and makes the Handle available to all following statements in the do-block.

Notice that it's often simpler to use the JetSource (for reading) and JetSink (for writing) instances of File.

bracket Source #

Arguments

:: forall a b. IO a

allocator

-> (a -> IO b)

finalizer

-> Jet a 
>>> :{
do r <- J.bracket (putStrLn "allocating" *> pure "foo") (\r -> putStrLn $ "deallocating " ++ r)
   liftIO $ putStrLn $ "using resource " ++ r
& drain
:}
allocating
using resource foo
deallocating foo

bracket_ Source #

Arguments

:: forall a b. IO a

allocator

-> IO b

finalizer

-> Jet () 

bracketOnError Source #

Arguments

:: forall a b. IO a

allocator

-> (a -> IO b)

finalizer

-> Jet a 

finally :: IO a -> Jet () Source #

Notice how the finalizer runs even when we limit the Jet:

>>> :{
do J.finally (putStrLn "hi") -- protects statements below
   liftIO (putStrLn "hey")
   J.each "abc" 
& J.limit 2 
& J.toList
:}
hey
hi
"ab"

But if the protected Jet is not consumed at all, the finalizer might not run.

>>> :{
do J.finally (putStrLn "hi") -- protects statements below 
   liftIO (putStrLn "hey") 
   J.each "abc" 
& J.limit 0 
& J.toList
:}
""

onException :: IO a -> Jet () Source #

control :: forall resource. (forall x. (resource -> IO x) -> IO x) -> Jet resource Source #

Lift a control operation (like bracket) for which the callback uses the allocated resource.

control_ :: (forall x. IO x -> IO x) -> Jet () Source #

Lift a control operation (like finally) for which the callback doesn't use the allocated resource.

unsafeCoerceControl :: forall resource. (forall x. (resource -> IO x) -> IO x) -> forall x. (resource -> IO x) -> IO x Source #

"morally", all control operations compatible with this library should execute the callback only once, which means that they should have a linear type. But because linear types are not widespread, they usually are given a less precise non-linear type. If you know what you are doing, use this function to give them a linear type.

unsafeCoerceControl_ :: (forall x. IO x -> IO x) -> forall x. IO x -> IO x Source #

Line unsafeCoerceControl, for when the callback doesn't use the allocated resource.

fold :: Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r Source #

>>> L.purely (J.fold (J.each "abc")) ((,) <$> L.list <*> L.length)
("abc",3)

foldIO :: Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r Source #

>>> L.impurely (J.foldIO (J.each "abc")) (L.FoldM (\() c -> putStrLn [c]) (pure ()) pure *> L.generalize L.length)
a
b
c
3

data ChunkSize Source #

Instances

Instances details
Show ChunkSize Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> ChunkSize -> ShowS

show :: ChunkSize -> String

showList :: [ChunkSize] -> ShowS

class JetSource a source where Source #

Helper multi-parameter typeclass for creating Jet values out of a variety of common sources.

Because there's no functional dependency, sometimes we need to use TypeApplications to give the compiler a hint about the type of elements we want to produce. For example, here we want Lines and not, say, ByteStrings:

>>> action = J.jet @Line (File "foo.txt") & J.sink J.stdout

Methods

jet :: source -> Jet a Source #

Instances

Instances details
JetSource a Handle => JetSource a File Source # 
Instance details

Defined in Jet.Internal

Methods

jet :: File -> Jet a Source #

JetSource ByteString Handle Source # 
Instance details

Defined in Jet.Internal

Methods

jet :: Handle -> Jet ByteString Source #

JetSource Line Handle Source #

Uses the default system locale.

Instance details

Defined in Jet.Internal

Methods

jet :: Handle -> Jet Line Source #

bytes :: ChunkSize -> Handle -> Jet ByteString Source #

accumByteLengths :: Jet ByteString -> Jet (Int, ByteString) Source #

data AmIContinuing Source #

Constructors

Continuing 
NotContinuing 

Instances

Instances details
Show AmIContinuing Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> AmIContinuing -> ShowS

show :: AmIContinuing -> String

showList :: [AmIContinuing] -> ShowS

bytesOverBuckets :: [Int] -> Splitter ByteString ByteString Source #

Splits a stream of bytes into groups bounded by maximum byte sizes. When one group "fills up", the next one is started.

When the list of buckets sizes is exhausted, all incoming bytes are put into the same unbounded group.

Useful in combination with recast.

newtype ByteBundle Source #

A sequence of bytes that we might want to keep together.

Constructors

ByteBundle ByteString 

Instances

Instances details
Show ByteBundle Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> ByteBundle -> ShowS

show :: ByteBundle -> String

showList :: [ByteBundle] -> ShowS

Semigroup ByteBundle Source # 
Instance details

Defined in Jet.Internal

Methods

(<>) :: ByteBundle -> ByteBundle -> ByteBundle

sconcat :: NonEmpty ByteBundle -> ByteBundle

stimes :: Integral b => b -> ByteBundle -> ByteBundle

Monoid ByteBundle Source # 
Instance details

Defined in Jet.Internal

JetSink ByteBundle Handle Source # 
Instance details

Defined in Jet.Internal

Methods

sink :: Handle -> Sink ByteBundle Source #

JetSink ByteBundle [BoundedSize File] Source #

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Each ByteBundle value is garanteed to be written to a single file. If a file turns out to be too small for even a single ByteBundle value, a BucketOverflow exception is thrown.

Instance details

Defined in Jet.Internal

bundle :: Foldable f => f ByteString -> ByteBundle Source #

Constructs a ByteBundle out of the bytes of some Foldable container.

bundleLength :: ByteBundle -> Int Source #

Length in bytes.

bundleBytes :: ByteBundle -> Jet ByteString Source #

data BucketOverflow Source #

Exception thrown when we try to write too much data in a size-bounded destination.

Constructors

BucketOverflow 

Instances

Instances details
Show BucketOverflow Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> BucketOverflow -> ShowS

show :: BucketOverflow -> String

showList :: [BucketOverflow] -> ShowS

Exception BucketOverflow Source # 
Instance details

Defined in Jet.Internal

Methods

toException :: BucketOverflow -> SomeException

fromException :: SomeException -> Maybe BucketOverflow

displayException :: BucketOverflow -> String

byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString Source #

Splits a stream of ByteBundles into groups bounded by maximum byte sizes. Bytes belonging to the same ByteBundle are always put in the same group. When one group "fills up", the next one is started.

When the list of buckets sizes is exhausted, all incoming bytes are put into the same unbounded group.

Useful in combination with recast.

THROWS:

  • BucketOverflow exception if the size bound of a group turns out to be too small for holding even a single ByteBundle value.

decodeUtf8 :: Jet ByteString -> Jet Text Source #

encodeUtf8 :: Jet Text -> Jet ByteString Source #

newtype Line Source #

A line of text.

While it is guaranteed that the Lines coming out of the lines function do not contain newlines, that invariant is not otherwise enforced.

Constructors

Line_ Text 

Instances

Instances details
Eq Line Source # 
Instance details

Defined in Jet.Internal

Methods

(==) :: Line -> Line -> Bool

(/=) :: Line -> Line -> Bool

Ord Line Source # 
Instance details

Defined in Jet.Internal

Methods

compare :: Line -> Line -> Ordering

(<) :: Line -> Line -> Bool

(<=) :: Line -> Line -> Bool

(>) :: Line -> Line -> Bool

(>=) :: Line -> Line -> Bool

max :: Line -> Line -> Line

min :: Line -> Line -> Line

Show Line Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> Line -> ShowS

show :: Line -> String

showList :: [Line] -> ShowS

IsString Line Source # 
Instance details

Defined in Jet.Internal

Methods

fromString :: String -> Line

Semigroup Line Source # 
Instance details

Defined in Jet.Internal

Methods

(<>) :: Line -> Line -> Line

sconcat :: NonEmpty Line -> Line

stimes :: Integral b => b -> Line -> Line

Monoid Line Source # 
Instance details

Defined in Jet.Internal

Methods

mempty :: Line

mappend :: Line -> Line -> Line

mconcat :: [Line] -> Line

JetSink Line Handle Source #

Uses the default system locale. Adds newlines.

Instance details

Defined in Jet.Internal

Methods

sink :: Handle -> Sink Line Source #

JetSource Line Handle Source #

Uses the default system locale.

Instance details

Defined in Jet.Internal

Methods

jet :: Handle -> Jet Line Source #

pattern Line :: Text -> Line Source #

Unidirectional pattern that allows converting a Line into a Text during pattern-matching.

lineToText :: Line -> Text Source #

Converts a Line back to text, without adding the newline.

lineToUtf8 :: Line -> ByteBundle Source #

Converts a Line to an utf8-encdoed ByteBundle, without adding the newline.

textToLine :: Text -> Line Source #

newline :: Text Source #

Data.Text.singleton '\n'

lineContains :: Text -> Line -> Bool Source #

lineBeginsWith :: Text -> Line -> Bool Source #

prefixLine :: Text -> Line -> Line Source #

Adds the Text to the beginning of the Line.

stringToLine :: String -> Line Source #

data NewlineForbidden Source #

Exception thrown when we find newlines in functions which don't accept them.

A direct copy of the NewlineForbidden exception from the turtle package.

Constructors

NewlineForbidden 

Instances

Instances details
Show NewlineForbidden Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> NewlineForbidden -> ShowS

show :: NewlineForbidden -> String

showList :: [NewlineForbidden] -> ShowS

Exception NewlineForbidden Source # 
Instance details

Defined in Jet.Internal

Methods

toException :: NewlineForbidden -> SomeException

fromException :: SomeException -> Maybe NewlineForbidden

displayException :: NewlineForbidden -> String

lines :: Jet Text -> Jet Line Source #

unlines :: Jet Line -> Jet Text Source #

downstream :: (s -> Bool) -> (s -> x -> IO s) -> [x] -> s -> IO s Source #

type Sink a = Jet a -> IO () Source #

A function that consumes a Jet totally or partially, without returning a result.

class JetSink a target where Source #

Helper multi-parameter typeclass for creating Jet-consuming functions out of a variety of common destinations.

>>> J.each ["aaa","bbb","ccc"] <&> J.stringToLine & J.sink J.stdout
aaa
bbb
ccc

Methods

sink :: target -> Sink a Source #

Instances

Instances details
JetSink a Handle => JetSink a File Source # 
Instance details

Defined in Jet.Internal

Methods

sink :: File -> Sink a Source #

JetSink Text Handle Source #

Uses the default system locale.

Instance details

Defined in Jet.Internal

Methods

sink :: Handle -> Sink Text Source #

JetSink ByteString Handle Source # 
Instance details

Defined in Jet.Internal

Methods

sink :: Handle -> Sink ByteString Source #

JetSink Line Handle Source #

Uses the default system locale. Adds newlines.

Instance details

Defined in Jet.Internal

Methods

sink :: Handle -> Sink Line Source #

JetSink ByteBundle Handle Source # 
Instance details

Defined in Jet.Internal

Methods

sink :: Handle -> Sink ByteBundle Source #

JetSink ByteString [BoundedSize File] Source #

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Instance details

Defined in Jet.Internal

Methods

sink :: [BoundedSize File] -> Sink ByteString Source #

JetSink ByteBundle [BoundedSize File] Source #

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Each ByteBundle value is garanteed to be written to a single file. If a file turns out to be too small for even a single ByteBundle value, a BucketOverflow exception is thrown.

Instance details

Defined in Jet.Internal

newtype File Source #

FilePaths are plain strings. This newtype provides a small measure of safety over them.

Constructors

File 

Fields

Instances

Instances details
Show File Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> File -> ShowS

show :: File -> String

showList :: [File] -> ShowS

JetSink a Handle => JetSink a File Source # 
Instance details

Defined in Jet.Internal

Methods

sink :: File -> Sink a Source #

JetSource a Handle => JetSource a File Source # 
Instance details

Defined in Jet.Internal

Methods

jet :: File -> Jet a Source #

JetSink ByteString [BoundedSize File] Source #

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Instance details

Defined in Jet.Internal

Methods

sink :: [BoundedSize File] -> Sink ByteString Source #

JetSink ByteBundle [BoundedSize File] Source #

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Each ByteBundle value is garanteed to be written to a single file. If a file turns out to be too small for even a single ByteBundle value, a BucketOverflow exception is thrown.

Instance details

Defined in Jet.Internal

data BoundedSize x Source #

The maximum size in bytes of some destination into which we write the bytes produced by a Jet.

Constructors

BoundedSize Int x 

Instances

Instances details
JetSink ByteString [BoundedSize File] Source #

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Instance details

Defined in Jet.Internal

Methods

sink :: [BoundedSize File] -> Sink ByteString Source #

JetSink ByteBundle [BoundedSize File] Source #

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Each ByteBundle value is garanteed to be written to a single file. If a file turns out to be too small for even a single ByteBundle value, a BucketOverflow exception is thrown.

Instance details

Defined in Jet.Internal

Read x => Read (BoundedSize x) Source # 
Instance details

Defined in Jet.Internal

Methods

readsPrec :: Int -> ReadS (BoundedSize x)

readList :: ReadS [BoundedSize x]

readPrec :: ReadPrec (BoundedSize x)

readListPrec :: ReadPrec [BoundedSize x]

Show x => Show (BoundedSize x) Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> BoundedSize x -> ShowS

show :: BoundedSize x -> String

showList :: [BoundedSize x] -> ShowS

newtype DList a Source #

Constructors

DList 

Fields

Instances

Instances details
Semigroup (DList a) Source # 
Instance details

Defined in Jet.Internal

Methods

(<>) :: DList a -> DList a -> DList a

sconcat :: NonEmpty (DList a) -> DList a

stimes :: Integral b => b -> DList a -> DList a

Monoid (DList a) Source # 
Instance details

Defined in Jet.Internal

Methods

mempty :: DList a

mappend :: DList a -> DList a -> DList a

mconcat :: [DList a] -> DList a

makeDList :: [a] -> DList a Source #

closeDList :: DList a -> [a] Source #

traverseConcurrently :: (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b Source #

Process the values yielded by the upstream Jet in a concurrent way, and return the results in the form of another Jet as they are produced.

NB: this function might scramble the order of the returned values. Right now there isn't a function for unscrambling them.

>>> :{
 J.each [(3,'a'), (2,'b'), (1,'c')]
 & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c)
 & J.toList
:}
"cba"

What happens if we limit the resulting Jet and we reach that limit, or if we otherwise stop consuming the Jet before it gets exhausted? In those cases, all pending IO b tasks are cancelled.

>>> :{
 J.each [(9999,'a'), (2,'b'), (1,'c')]
 & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c)
 & J.take 2
 & J.toList
:}
"cb"

data PoolConf Source #

Configuration record for the worker pool.

Constructors

PoolConf 

Fields

Instances

Instances details
Show PoolConf Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> PoolConf -> ShowS

show :: PoolConf -> String

showList :: [PoolConf] -> ShowS

inputQueueSize :: Int -> PoolConf -> PoolConf Source #

Size of the waiting queue into the worker pool. The default is 1.

numberOfWorkers :: Int -> PoolConf -> PoolConf Source #

The size of the worker pool. The default is 1.

outputQueueSize :: Int -> PoolConf -> PoolConf Source #

Size of the queue holding results out of the working pool before they are yielded downstream. The default is 1.

defaults :: a -> a Source #

An alias for id. Useful with functions like traverseConcurrently and throughProcess, for which it means "use the default configuration".

throughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet ByteString -> Jet ByteString Source #

Feeds the upstream Jet to an external process' stdin and returns the process' stdout as another Jet. The feeding and reading of the standard streams is done concurrently in order to avoid deadlocks.

What happens if we limit the resulting Jet and we reach that limit, or if we otherwise stop consuming the Jet before it gets exhausted? In those cases, the external process is promptly terminated.

linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line Source #

Like throughProcess, but feeding and reading Lines using the default system encoding.

>>> :{
J.each ["aaa","bbb","ccc"]
<&> J.stringToLine
& linesThroughProcess defaults (shell "cat")
& J.toList
:}
["aaa","bbb","ccc"]

An example of not reading all the lines from a long-lived process that gets cancelled:

>>> :{
mempty
& linesThroughProcess defaults (shell "{ printf \"aaa\\nbbb\\nccc\\n\" ; sleep infinity ; }")
& J.limit 2
& J.toList
:}
["aaa","bbb"]

utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line Source #

Like throughProcess, but feeding and reading Lines encoded in UTF8.

throughProcess_ :: forall a b. ProcConf_ a b -> CreateProcess -> Jet a -> Jet b Source #

type ProcConf = ProcConf_ ByteString ByteString Source #

Configuration record with some extra options in addition to those in CreateProcess.

data ProcConf_ a b Source #

Constructors

ProcConf_ 

Fields

bufferStdin :: Bool -> ProcConf -> ProcConf Source #

Should we buffer the process' stdin? Usually should be True for interactive scenarios.

By default, False.

readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf Source #

Sets the function that reads a single line of output from the process stderr. It's called repeatedly until stderr is exhausted. The reads are done concurrently with the reads from stdout.

By default, lines of text are read using the system's default encoding.

This is a good place to throw an exception if we don't like what comes out of stderr.

handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf Source #

Sets the function that handles the final ExitCode of the process.

The default behavior is to throw the ExitCode as an exception if it's not a success.

data AreWeInsideGroup foldState Source #

Constructors

OutsideGroup 
InsideGroup !foldState 

data RecastState foldState Source #

Constructors

RecastState !(AreWeInsideGroup foldState) [IO foldState] 

recast :: forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c Source #

This is a complex, unwieldly, yet versatile function. It can be used to define grouping operations, but also for decoding and other purposes.

Groups are delimited in the input Jet using the Splitter, and the contents of those groups are then combined using Combiners. The result of each combiner is yielded by the return Jet.

If the list of combiners is finite and becomes exhausted, we stop splitting and the return Jet stops.

data Combiners a b where Source #

A Combiners value knows how to process a sequence of groups, while keeping a (existentially hidden) state for each group.

Very much like a FoldM IO from the foldl library, but "restartable" with a list of starting states.

For converting one into the other, this function should do the trick:

\(L.FoldM step allocator coda) -> combiners step coda (Prelude.repeat allocator)

Constructors

Combiners :: (s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b 

Instances

Instances details
Functor (Combiners a) Source # 
Instance details

Defined in Jet.Internal

Methods

fmap :: (a0 -> b) -> Combiners a a0 -> Combiners a b

(<$) :: a0 -> Combiners a b -> Combiners a a0

combiners Source #

Arguments

:: forall s a b r. (s -> a -> IO s)

Step function that threads the state s.

-> (s -> IO b)

Coda invoked when a group closes.

-> [IO s]

Actions that produce the initial states s for processing each group.

-> Combiners a b 

Constructor for Combiners values.

withCombiners_ Source #

Arguments

:: forall h a r. (h -> a -> IO ())

Step function that accesses the resource h.

-> (h -> IO ())

Finalizer to run after closing each group, and also in the case of an exception.

-> [IO h]

Actions that allocate a sequence of resources h.

-> (Combiners a () -> IO r)

The Combiners value should be consumed linearly.

-> IO r 

A simpler version of withCombiners that doen't thread a state; it merely allocates and deallocates the resource h.

withCombiners Source #

Arguments

:: forall h s a b r. (h -> s -> a -> IO s)

Step function that accesses the resource h and threads the state s.

-> (h -> s -> IO b)

Coda invoked when a group closes.

-> (h -> IO ())

Finalizer to run after each coda, and also in the case of an exception.

-> [(IO h, h -> IO s)]

Actions that allocate a sequence of resources h and produce initial states s for processing each group.

-> (Combiners a b -> IO r)

The Combiners value should be consumed linearly.

-> IO r 

Combiners thread a state s while processing each group. Sometimes, in addition to that, we want to allocate a resource h when we start processing a group, and deallocate it after we finish processing the group or an exception is thrown. The typical example is allocating a Handle for writing the elements of the group as they arrive.

combineIntoLists :: Combiners a [a] Source #

Puts the elements of each group into a list that is kept in memory. This breaks streaming within the group.

Useful with recast.

type Splitter a b = MealyIO a (SplitStepResult b) Source #

Delimits groups in the values yielded by a Jet, and can also transform those values.

data MealyIO a b where Source #

A Mealy machine with an existentially hidden state.

Very much like a FoldM IO from the foldl library, but it emits an output at each step, not only at the end.

Constructors

MealyIO 

Fields

  • :: (s -> a -> IO (b, s))

    The step function which threads the state.

  • -> (s -> IO b)

    The final output, produced from the final state.

  • -> IO s

    An action that produces the initial state.

  • -> MealyIO a b
     

Instances

Instances details
Functor (MealyIO a) Source # 
Instance details

Defined in Jet.Internal

Methods

fmap :: (a0 -> b) -> MealyIO a a0 -> MealyIO a b

(<$) :: a0 -> MealyIO a b -> MealyIO a a0

data SplitStepResult b Source #

For each value coming from upstream, what has the Splitter learned?

  • Perhaps we should continue some group we have already started in a previous step.
  • Perhaps we have found entire groups that we should emit in one go, groups we know are already complete.
  • Perhaps we should start a new group that will continue in the next steps.

Constructors

SplitStepResult 

Fields

  • continuationOfPreviouslyStartedGroup :: [b]

    The continued group will be "closed" if in the current step we emit an entire group or we begin a new group.

    INVARIANT: we should only continue a group if we have already opened a "new one" with one or more elements in an earlier step.

  • entireGroups :: [[b]]

    It's ok if the groups we find are empty.

  • startOfNewGroup :: [b]

    INVARIANT: when we are in the final step, we should not yield elements for the beginning of a new one.

Instances

Instances details
Functor SplitStepResult Source # 
Instance details

Defined in Jet.Internal

Methods

fmap :: (a -> b) -> SplitStepResult a -> SplitStepResult b

(<$) :: a -> SplitStepResult b -> SplitStepResult a

Show b => Show (SplitStepResult b) Source # 
Instance details

Defined in Jet.Internal

Methods

showsPrec :: Int -> SplitStepResult b -> ShowS

show :: SplitStepResult b -> String

showList :: [SplitStepResult b] -> ShowS

Semigroup (SplitStepResult b) Source # 
Instance details

Defined in Jet.Internal

Monoid (SplitStepResult b) Source # 
Instance details

Defined in Jet.Internal