| Safe Haskell | None | 
|---|---|
| Language | Haskell2010 | 
Jet
Description
A streaming library build around the Jet type, which behaves as a kind of "effectful list".
For example, here's a way to print the first ten lines of a file to stdout:
>>>action = J.jet @Line (File "foo.txt") & J.limit 10 & J.sink stdout
The code is using the jet function to create a Jet of Line values
 (read using the default system encoding). jet is part of the
 JetSource helper typeclass. Meanwhile, sink is part of the
 complementary JetSink typeclass.
Note also the use of (&), which is simply a flipped ($). I've found it
 useful to define forward-chained pipelines.
If instead of printing to stdout we wanted to store the lines in a list:
>>>action = J.jet @Line (File "foo.txt") & J.limit 10 & J.toList
Imagine we wanted to print the combined lines of two files, excepting the first 10 lines of each:
>>>:{action = do file <- J.each [File "foo.txt", File "bar.txt"] jet @Line file & J.drop 10 & J.sink stdout :}
Here we are making use of the Monad instance of Jet, which resembles
 that of conventional lists. We are mixing monadic do-blocks and conventional
 function application. Also we use each, a function which creates a Jet
 out of any Foldable container. 
Jets are Monoids too, so we could have written:
>>>action = [File "foo.txt", File "bar.txt"] & foldMap (J.drop 10 . J.jet @Line) & J.sink stdout
Here's an interesting use of sink. Imagine we have a big utf8-encoded file
 and we want to split it into a number of files of no more than 100000 bytes
 each, with the extra condition that we don't want to split any line between
 two files. We could do it like this:
>>>:{action = let buckets = BoundedSize 100000 . File . ("result.txt." ++) . show <$> [1..] in jet (File "12999.txt.utf-8") & J.decodeUtf8 & J.lines <&> (\line -> J.lineToUtf8 line <> J.textToUtf8 J.newline) & J.sink buckets :}
In this example we aren't using the default system encoding: instead of
 that, we are reading bytes, explicity decoding them with decodeUtf8 and
 finding lines. Then we create a ByteBundle for each Line to signify
 that it shouldn't be broken, and end by writing to a sequence of
 BoundedSize Files.
Synopsis
- data Jet a
 - run :: forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
 - consume :: forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s
 - drain :: Sink a
 - each :: forall a f. Foldable f => f a -> Jet a
 - repeat :: a -> Jet a
 - repeatIO :: IO a -> Jet a
 - replicate :: Int -> a -> Jet a
 - replicateIO :: Int -> IO a -> Jet a
 - iterate :: (a -> a) -> a -> Jet a
 - iterateIO :: (a -> IO a) -> a -> Jet a
 - unfold :: (b -> Maybe (a, b)) -> b -> Jet a
 - unfoldIO :: (b -> IO (Maybe (a, b))) -> b -> Jet a
 - untilEOF :: (handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
 - untilNothing :: IO (Maybe a) -> Jet a
 - toList :: Jet a -> IO [a]
 - length :: Jet a -> IO Int
 - traverse :: (a -> IO b) -> Jet a -> Jet b
 - traverse_ :: (a -> IO b) -> Sink a
 - for :: Jet a -> (a -> IO b) -> Jet b
 - for_ :: Jet a -> (a -> IO b) -> IO ()
 - filter :: (a -> Bool) -> Jet a -> Jet a
 - filterIO :: (a -> IO Bool) -> Jet a -> Jet a
 - take :: Int -> Jet a -> Jet a
 - limit :: Int -> Jet a -> Jet a
 - takeWhile :: (a -> Bool) -> Jet a -> Jet a
 - takeWhileIO :: (a -> IO Bool) -> Jet a -> Jet a
 - drop :: Int -> Jet a -> Jet a
 - dropWhile :: (a -> Bool) -> Jet a -> Jet a
 - dropWhileIO :: (a -> IO Bool) -> Jet a -> Jet a
 - mapAccum :: (a -> b -> (a, c)) -> a -> Jet b -> Jet c
 - mapAccumIO :: (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
 - intersperse :: a -> Jet a -> Jet a
 - zip :: Foldable f => f a -> Jet b -> Jet (a, b)
 - zipWith :: Foldable f => (a -> b -> c) -> f a -> Jet b -> Jet c
 - zipIO :: Foldable f => f (IO a) -> Jet b -> Jet (a, b)
 - zipWithIO :: Foldable f => (a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
 - withFile :: FilePath -> IOMode -> Jet Handle
 - bracket :: forall a b. IO a -> (a -> IO b) -> Jet a
 - bracket_ :: forall a b. IO a -> IO b -> Jet ()
 - bracketOnError :: forall a b. IO a -> (a -> IO b) -> Jet a
 - finally :: IO a -> Jet ()
 - onException :: IO a -> Jet ()
 - control :: forall resource. (forall x. (resource -> IO x) -> IO x) -> Jet resource
 - unsafeCoerceControl :: forall resource. (forall x. (resource -> IO x) -> IO x) -> forall x. (resource -> IO x) -> IO x
 - control_ :: (forall x. IO x -> IO x) -> Jet ()
 - unsafeCoerceControl_ :: (forall x. IO x -> IO x) -> forall x. IO x -> IO x
 - fold :: Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r
 - foldIO :: Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r
 - bytes :: ChunkSize -> Handle -> Jet ByteString
 - data ChunkSize
 - data ByteBundle
 - bundle :: Foldable f => f ByteString -> ByteBundle
 - bundleLength :: ByteBundle -> Int
 - bundleBytes :: ByteBundle -> Jet ByteString
 - decodeUtf8 :: Jet ByteString -> Jet Text
 - encodeUtf8 :: Jet Text -> Jet ByteString
 - data Line where
 - lines :: Jet Text -> Jet Line
 - unlines :: Jet Line -> Jet Text
 - newline :: Text
 - lineToText :: Line -> Text
 - lineToUtf8 :: Line -> ByteBundle
 - textToLine :: Text -> Line
 - textToUtf8 :: Text -> ByteBundle
 - stringToLine :: String -> Line
 - lineContains :: Text -> Line -> Bool
 - lineBeginsWith :: Text -> Line -> Bool
 - prefixLine :: Text -> Line -> Line
 - traverseConcurrently :: (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b
 - data PoolConf
 - defaults :: a -> a
 - inputQueueSize :: Int -> PoolConf -> PoolConf
 - numberOfWorkers :: Int -> PoolConf -> PoolConf
 - outputQueueSize :: Int -> PoolConf -> PoolConf
 - throughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet ByteString -> Jet ByteString
 - linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
 - utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
 - type ProcConf = ProcConf_ ByteString ByteString
 - bufferStdin :: Bool -> ProcConf -> ProcConf
 - readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf
 - handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf
 - class JetSource a source where
 - class JetSink a target where
 - type Sink a = Jet a -> IO ()
 - newtype File = File {
- getFilePath :: FilePath
 
 - data BoundedSize x = BoundedSize Int x
 - data BucketOverflow = BucketOverflow
 - recast :: forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c
 - type Splitter a b = MealyIO a (SplitStepResult b)
 - data MealyIO a b where
 - data SplitStepResult b = SplitStepResult {
- continuationOfPreviouslyStartedGroup :: [b]
 - entireGroups :: [[b]]
 - startOfNewGroup :: [b]
 
 - bytesOverBuckets :: [Int] -> Splitter ByteString ByteString
 - byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString
 - data Combiners a b
 - combiners :: forall s a b r. (s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
 - withCombiners :: forall h s a b r. (h -> s -> a -> IO s) -> (h -> s -> IO b) -> (h -> IO ()) -> [(IO h, h -> IO s)] -> (Combiners a b -> IO r) -> IO r
 - withCombiners_ :: forall h a r. (h -> a -> IO ()) -> (h -> IO ()) -> [IO h] -> (Combiners a () -> IO r) -> IO r
 - combineIntoLists :: Combiners a [a]
 - (&) :: a -> (a -> b) -> b
 - (<&>) :: Functor f => f a -> (a -> b) -> f b
 - stdin :: Handle
 - stdout :: Handle
 - stderr :: Handle
 - data UnicodeException
 - proc :: FilePath -> [String] -> CreateProcess
 - shell :: String -> CreateProcess
 
The Jet type
A Jet is a sequence of values produced through IO effects.
It allows consuming the elements as they are produced and doesn't force them
 to be present in memory all at the same time, unlike functions like
 replicateM from base.
Instances
| Monad Jet Source # | Similar to the instance for pure lists, that does search. 
  | 
| Functor Jet Source # | Maps over the yielded elements.  
  | 
| MonadFail Jet Source # | A failed pattern-match in a do-block produces  
  | 
Defined in Jet.Internal  | |
| Applicative Jet Source # | Similar to the instance for pure lists, that generates combinations. 
  | 
| Alternative Jet Source # | Same as   | 
| MonadPlus Jet Source # | Same as   | 
| MonadIO Jet Source # | 
  | 
Defined in Jet.Internal  | |
| Semigroup (Jet a) Source # | 
 
  | 
| Monoid (Jet a) Source # | 
 
  | 
run :: forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s Source #
Go through the elements produced by a Jet, while threading an
 state s and possibly performing some effect.
The caller is the one who chooses the type of the state s, and must pass
 an initial value for it. The state is kept in weak-head normal form.
The caller must also provide a predicate on the state that informs the Jet
 when to stop producing values: whenever the predicate returns
 True.
Go through the Jet only for the IO effects, discarding all yielded elements.
Building Jets
each :: forall a f. Foldable f => f a -> Jet a Source #
Build a Jet from any Foldable container
>>>J.each [True,False] & J.toList[True,False]
repeatIO :: IO a -> Jet a Source #
>>>J.repeatIO (putStrLn "hi" *> pure True) & J.take 2 & J.toListhi hi [True,True]
replicateIO :: Int -> IO a -> Jet a Source #
>>>J.replicateIO 2 (putStrLn "hi" *> pure True) & J.toListhi hi [True,True]
Don't confuse this with Control.Monad.replicateM :: Int -> Jet a -> Jet [a] which has a combinatorial behavior.
iterateIO :: (a -> IO a) -> a -> Jet a Source #
>>>J.iterateIO (\x -> putStrLn "hi" *> pure (succ x)) (1 :: Int) & J.take 2 & J.toListhi [1,2]
unfold :: (b -> Maybe (a, b)) -> b -> Jet a Source #
>>>J.unfold (\case [] -> Nothing ; c : cs -> Just (c,cs)) "abc" & J.toList"abc"
unfoldIO :: (b -> IO (Maybe (a, b))) -> b -> Jet a Source #
>>>:{J.unfoldIO (\x -> do putStrLn "hi" pure $ case x of [] -> Nothing c : cs -> Just (c,cs)) "abc" & J.toList :} hi hi hi hi "abc"
untilEOF :: (handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a Source #
>>>j = J.untilEOF System.IO.hIsEOF System.IO.hGetLine :: Handle -> Jet String
untilNothing :: IO (Maybe a) -> Jet a Source #
>>>:{do ref <- newIORef "abc" let pop = atomicModifyIORef ref (\case [] -> ([], Nothing) x : xs -> (xs, Just x)) J.untilNothing pop & J.toList :} "abc"
List-like functions
In these functions, the Jet is working as a kind of "effectful list".
 The effects which produce the elements, and the effects with which we
 transform and consume the elements, are always IO effects.
Don't confuse these functions with similarly named functions from
 Traversable or Monad, for which Jet doesn't work as the
 "container", but as the Applicative/Monadic effect itself.
traverse :: (a -> IO b) -> Jet a -> Jet b Source #
Apply an effectful transformation to each element in a Jet.
>>>:{J.each "abc" & J.traverse (\c -> let c' = succ c in putStrLn ([c] ++ " -> " ++ [c']) *> pure c') & J.toList :} a -> b b -> c c -> d "bcd"
takeWhile :: (a -> Bool) -> Jet a -> Jet a Source #
>>>J.each [1..] & J.takeWhile (<5) & J.toList[1,2,3,4]
takeWhileIO :: (a -> IO Bool) -> Jet a -> Jet a Source #
dropWhile :: (a -> Bool) -> Jet a -> Jet a Source #
>>>J.each [1..5] & J.dropWhile (<3) & J.toList[3,4,5]
dropWhileIO :: (a -> IO Bool) -> Jet a -> Jet a Source #
mapAccum :: (a -> b -> (a, c)) -> a -> Jet b -> Jet c Source #
Behaves like a combination of fmap and foldl; it applies a function to
 each element of a structure passing an accumulating parameter from left to right.
The resulting Jet has the same number of elements as the original one.
Unlike mapAccumL, it doesn't make the final state available. 
>>>J.each [1,2,3,4] & J.mapAccum (\a b -> (a + b,a)) 0 & J.toList[0,1,3,6]
mapAccumIO :: (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c Source #
intersperse :: a -> Jet a -> Jet a Source #
>>>J.each "abc" & J.intersperse '-' & J.toList"a-b-c"
Zips
It's not possible to zip two Jets together. But Jets can be zipped with
 pure lists, or with lists of IO actions.
zip :: Foldable f => f a -> Jet b -> Jet (a, b) Source #
>>>J.each "abc" & J.zip [1..] & J.toList[(1,'a'),(2,'b'),(3,'c')]
>>>J.each [1..] & J.zip "abc" & J.toList[('a',1),('b',2),('c',3)]
Control operations
Some Jets must allocate resources to do its work. For example, opening a
 text file and yielding its lines. These resources must be promptly released
 when the Jet itself finishes or the consumers stops it (for example, by
 using limit on the Jet). They must also be released in the face of
 exceptions.
Here are various control operations like those from Control.Exception, but
 lifted to work on Jets.
When put in a do-block, these operations "protect" every statement in the do-block below the operation itself.
Arguments
| :: forall a b. IO a | allocator  | 
| -> (a -> IO b) | finalizer  | 
| -> Jet a | 
>>>:{do r <- J.bracket (putStrLn "allocating" *> pure "foo") (\r -> putStrLn $ "deallocating " ++ r) liftIO $ putStrLn $ "using resource " ++ r & drain :} allocating using resource foo deallocating foo
Arguments
| :: forall a b. IO a | allocator  | 
| -> (a -> IO b) | finalizer  | 
| -> Jet a | 
finally :: IO a -> Jet () Source #
Notice how the finalizer runs even when we limit the Jet:
>>>:{do J.finally (putStrLn "hi") -- protects statements below liftIO (putStrLn "hey") J.each "abc" & J.limit 2 & J.toList :} hey hi "ab"
But if the protected Jet is not consumed at all, the finalizer might not run.
>>>:{do J.finally (putStrLn "hi") -- protects statements below liftIO (putStrLn "hey") J.each "abc" & J.limit 0 & J.toList :} ""
onException :: IO a -> Jet () Source #
Building your own
These are for advanced usage.
Sometimes we want to lift some existing
 resource-handling operation not already covered, one that works with plain
 IO values. These functions help with that.
They have a linear type to statically forbid
 "funny"
 operations like \x -> x *> x that disrupt proper threading of the
 consumer state.
control :: forall resource. (forall x. (resource -> IO x) -> IO x) -> Jet resource Source #
Lift a control operation (like bracket) for which the
 callback uses the allocated resource.
unsafeCoerceControl :: forall resource. (forall x. (resource -> IO x) -> IO x) -> forall x. (resource -> IO x) -> IO x Source #
"morally", all control operations compatible with this library should execute the callback only once, which means that they should have a linear type. But because linear types are not widespread, they usually are given a less precise non-linear type. If you know what you are doing, use this function to give them a linear type.
control_ :: (forall x. IO x -> IO x) -> Jet () Source #
Lift a control operation (like finally) for which the
 callback doesn't use the allocated resource.
unsafeCoerceControl_ :: (forall x. IO x -> IO x) -> forall x. IO x -> IO x Source #
Line unsafeCoerceControl, for when the callback doesn't use the
 allocated resource.
Folding Jets
These functions can be used directly, but they're also useful for
 interfacing with the Applicative folds from the
 foldl library, with the help of
 functions like Control.Foldl.purely and Control.Foldl.impurely.
Applicative folds are useful because they let you run multiple
 "analyses" of a Jet while going through it only once.
fold :: Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r Source #
>>>L.purely (J.fold (J.each "abc")) ((,) <$> L.list <*> L.length)("abc",3)
foldIO :: Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r Source #
>>>L.impurely (J.foldIO (J.each "abc")) (L.FoldM (\() c -> putStrLn [c]) (pure ()) pure *> L.generalize L.length)a b c 3
Byte utils
Constructors
| DefaultChunkSize | |
| ChunkSize Int | |
| ChunkSize1K | |
| ChunkSize4K | |
| ChunkSize8K | |
| ChunkSize16K | |
| ChunkSize1M | |
| ChunkSize2M | 
data ByteBundle Source #
A sequence of bytes that we might want to keep together.
Instances
| Show ByteBundle Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> ByteBundle -> ShowS show :: ByteBundle -> String showList :: [ByteBundle] -> ShowS  | |
| Semigroup ByteBundle Source # | |
Defined in Jet.Internal Methods (<>) :: ByteBundle -> ByteBundle -> ByteBundle sconcat :: NonEmpty ByteBundle -> ByteBundle stimes :: Integral b => b -> ByteBundle -> ByteBundle  | |
| Monoid ByteBundle Source # | |
Defined in Jet.Internal Methods mempty :: ByteBundle mappend :: ByteBundle -> ByteBundle -> ByteBundle mconcat :: [ByteBundle] -> ByteBundle  | |
| JetSink ByteBundle Handle Source # | |
Defined in Jet.Internal Methods sink :: Handle -> Sink ByteBundle Source #  | |
| JetSink ByteBundle [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. Each   | 
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteBundle Source #  | |
bundle :: Foldable f => f ByteString -> ByteBundle Source #
Constructs a ByteBundle out of the bytes of some Foldable container.
bundleLength :: ByteBundle -> Int Source #
Length in bytes.
bundleBytes :: ByteBundle -> Jet ByteString Source #
Text and line utils
decodeUtf8 :: Jet ByteString -> Jet Text Source #
THROWS:
encodeUtf8 :: Jet Text -> Jet ByteString Source #
A line of text.
While it is guaranteed that the Lines coming out of the lines function
 do not contain newlines, that invariant is not otherwise enforced. 
Bundled Patterns
| pattern Line :: Text -> Line | Unidirectional pattern that allows converting a   | 
Instances
| Eq Line Source # | |
| Ord Line Source # | |
| Show Line Source # | |
| IsString Line Source # | |
Defined in Jet.Internal Methods fromString :: String -> Line  | |
| Semigroup Line Source # | |
| Monoid Line Source # | |
| JetSink Line Handle Source # | Uses the default system locale. Adds newlines.  | 
| JetSource Line Handle Source # | Uses the default system locale.  | 
lineToText :: Line -> Text Source #
Converts a Line back to text, without adding the newline.
lineToUtf8 :: Line -> ByteBundle Source #
Converts a Line to an utf8-encdoed ByteBundle, without adding the newline.
textToLine :: Text -> Line Source #
textToUtf8 :: Text -> ByteBundle Source #
stringToLine :: String -> Line Source #
lineContains :: Text -> Line -> Bool Source #
lineBeginsWith :: Text -> Line -> Bool Source #
Concurrency
traverseConcurrently :: (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b Source #
Process the values yielded by the upstream Jet in a concurrent way,
 and return the results in the form of another Jet as they are produced.
NB: this function might scramble the order of the returned values. Right now there isn't a function for unscrambling them.
>>>:{J.each [(3,'a'), (2,'b'), (1,'c')] & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c) & J.toList :} "cba"
What happens if we limit the resulting Jet and we reach that limit, or
 if we otherwise stop consuming the Jet before it gets exhausted? In those
 cases, all pending IO b tasks are cancelled.
>>>:{J.each [(9999,'a'), (2,'b'), (1,'c')] & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c) & J.take 2 & J.toList :} "cb"
Configuration record for the worker pool.
An alias for id. Useful with functions like traverseConcurrently and
 throughProcess, for which it means "use the default configuration".
inputQueueSize :: Int -> PoolConf -> PoolConf Source #
Size of the waiting queue into the worker pool. The default is 1.
numberOfWorkers :: Int -> PoolConf -> PoolConf Source #
The size of the worker pool. The default is 1.
outputQueueSize :: Int -> PoolConf -> PoolConf Source #
Size of the queue holding results out of the working pool before they
 are yielded downstream. The default is 1.
Process invocation
throughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet ByteString -> Jet ByteString Source #
Feeds the upstream Jet to an external process' stdin and returns the
 process' stdout as another Jet. The feeding and reading of the standard
 streams is done concurrently in order to avoid deadlocks.
What happens if we limit the resulting Jet and we reach that limit, or
 if we otherwise stop consuming the Jet before it gets exhausted? In those
 cases, the external process is promptly terminated.
linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line Source #
Like throughProcess, but feeding and reading Lines using the default
 system encoding.
>>>:{J.each ["aaa","bbb","ccc"] <&> J.stringToLine & linesThroughProcess defaults (shell "cat") & J.toList :} ["aaa","bbb","ccc"]
An example of not reading all the lines from a long-lived process that gets cancelled:
>>>:{mempty & linesThroughProcess defaults (shell "{ printf \"aaa\\nbbb\\nccc\\n\" ; sleep infinity ; }") & J.limit 2 & J.toList :} ["aaa","bbb"]
utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line Source #
Like throughProcess, but feeding and reading Lines encoded in UTF8.
type ProcConf = ProcConf_ ByteString ByteString Source #
Configuration record with some extra options in addition to those in CreateProcess.
bufferStdin :: Bool -> ProcConf -> ProcConf Source #
Should we buffer the process' stdin? Usually should be True for
 interactive scenarios.
By default, False.
readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf Source #
Sets the function that reads a single line of output from the process
 stderr.  It's called repeatedly until stderr is exhausted. The reads are
 done concurrently with the reads from stdout.
By default, lines of text are read using the system's default encoding.
This is a good place to throw an exception if we don't like what comes out
 of stderr.
handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf Source #
Sets the function that handles the final ExitCode of the process.
The default behavior is to throw the ExitCode as an exception if it's not
 a success.
Conversion helpers
class JetSource a source where Source #
Helper multi-parameter typeclass for creating Jet values out of a
   variety of common sources.
Because there's no functional dependency, sometimes we need to use
   TypeApplications to give the compiler a hint about the type of elements
   we want to produce. For example, here we want Lines and not, say,
   ByteStrings:
>>>action = J.jet @Line (File "foo.txt") & J.sink J.stdout
class JetSink a target where Source #
Helper multi-parameter typeclass for creating Jet-consuming functions
 out of a variety of common destinations.
>>>J.each ["aaa","bbb","ccc"] <&> J.stringToLine & J.sink J.stdoutaaa bbb ccc
Instances
| JetSink a Handle => JetSink a File Source # | |
| JetSink Text Handle Source # | Uses the default system locale.  | 
Defined in Jet.Internal  | |
| JetSink ByteString Handle Source # | |
Defined in Jet.Internal  | |
| JetSink Line Handle Source # | Uses the default system locale. Adds newlines.  | 
| JetSink ByteBundle Handle Source # | |
Defined in Jet.Internal Methods sink :: Handle -> Sink ByteBundle Source #  | |
| JetSink ByteString [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.  | 
Defined in Jet.Internal  | |
| JetSink ByteBundle [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. Each   | 
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteBundle Source #  | |
type Sink a = Jet a -> IO () Source #
A function that consumes a Jet totally or partially, without returning a result.
FilePaths are plain strings. This newtype provides a small measure of
 safety over them.
Constructors
| File | |
Fields 
  | |
Instances
| Show File Source # | |
| JetSink a Handle => JetSink a File Source # | |
| JetSource a Handle => JetSource a File Source # | |
| JetSink ByteString [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.  | 
Defined in Jet.Internal  | |
| JetSink ByteBundle [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. Each   | 
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteBundle Source #  | |
data BoundedSize x Source #
The maximum size in bytes of some destination into which we write the
 bytes produced by a Jet.
Constructors
| BoundedSize Int x | 
Instances
| JetSink ByteString [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.  | 
Defined in Jet.Internal  | |
| JetSink ByteBundle [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. Each   | 
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteBundle Source #  | |
| Read x => Read (BoundedSize x) Source # | |
Defined in Jet.Internal Methods readsPrec :: Int -> ReadS (BoundedSize x) readList :: ReadS [BoundedSize x] readPrec :: ReadPrec (BoundedSize x) readListPrec :: ReadPrec [BoundedSize x]  | |
| Show x => Show (BoundedSize x) Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> BoundedSize x -> ShowS show :: BoundedSize x -> String showList :: [BoundedSize x] -> ShowS  | |
data BucketOverflow Source #
Exception thrown when we try to write too much data in a size-bounded destination.
Constructors
| BucketOverflow | 
Instances
| Show BucketOverflow Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> BucketOverflow -> ShowS show :: BucketOverflow -> String showList :: [BucketOverflow] -> ShowS  | |
| Exception BucketOverflow Source # | |
Defined in Jet.Internal Methods toException :: BucketOverflow -> SomeException fromException :: SomeException -> Maybe BucketOverflow displayException :: BucketOverflow -> String  | |
Some complicated stuff
I didn't manage to make this stuff simpler.
recast :: forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c Source #
This is a complex, unwieldly, yet versatile function. It can be used to define grouping operations, but also for decoding and other purposes.
Groups are delimited in the input Jet using the Splitter, and the
 contents of those groups are then combined using Combiners. The result of
 each combiner is yielded by the return Jet.
If the list of combiners is finite and becomes exhausted, we stop splitting
 and the return Jet stops.
type Splitter a b = MealyIO a (SplitStepResult b) Source #
Delimits groups in the values yielded by a Jet, and can also transform
 those values.
data MealyIO a b where Source #
A Mealy machine with an existentially hidden state.
Very much like a FoldM IO  from the
 foldl
 library, but it emits an output at each step, not only at the end.
data SplitStepResult b Source #
For each value coming from upstream, what has the Splitter learned?
- Perhaps we should continue some group we have already started in a previous step.
 - Perhaps we have found entire groups that we should emit in one go, groups we know are already complete.
 - Perhaps we should start a new group that will continue in the next steps.
 
Constructors
| SplitStepResult | |
Fields 
  | |
Instances
| Functor SplitStepResult Source # | |
Defined in Jet.Internal Methods fmap :: (a -> b) -> SplitStepResult a -> SplitStepResult b (<$) :: a -> SplitStepResult b -> SplitStepResult a  | |
| Show b => Show (SplitStepResult b) Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> SplitStepResult b -> ShowS show :: SplitStepResult b -> String showList :: [SplitStepResult b] -> ShowS  | |
| Semigroup (SplitStepResult b) Source # | |
Defined in Jet.Internal Methods (<>) :: SplitStepResult b -> SplitStepResult b -> SplitStepResult b sconcat :: NonEmpty (SplitStepResult b) -> SplitStepResult b stimes :: Integral b0 => b0 -> SplitStepResult b -> SplitStepResult b  | |
| Monoid (SplitStepResult b) Source # | |
Defined in Jet.Internal Methods mempty :: SplitStepResult b mappend :: SplitStepResult b -> SplitStepResult b -> SplitStepResult b mconcat :: [SplitStepResult b] -> SplitStepResult b  | |
bytesOverBuckets :: [Int] -> Splitter ByteString ByteString Source #
Splits a stream of bytes into groups bounded by maximum byte sizes. When one group "fills up", the next one is started.
When the list of buckets sizes is exhausted, all incoming bytes are put into the same unbounded group.
Useful in combination with recast.
byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString Source #
Splits a stream of ByteBundles into groups bounded by maximum byte
 sizes.  Bytes belonging to the same ByteBundle are always put in the same
 group. When one group "fills up", the next one is started.
When the list of buckets sizes is exhausted, all incoming bytes are put into the same unbounded group.
Useful in combination with recast.
THROWS:
BucketOverflowexception if the size bound of a group turns out to be too small for holding even a singleByteBundlevalue.
A Combiners value knows how to process a sequence of groups, while
 keeping a (existentially hidden) state for each group.
Very much like a FoldM IO  from the
 foldl
 library, but "restartable" with a list of starting states.
For converting one into the other, this function should do the trick:
\(L.FoldM step allocator coda) -> combiners step coda (Prelude.repeat allocator)
Arguments
| :: forall s a b r. (s -> a -> IO s) | Step function that threads the state   | 
| -> (s -> IO b) | Coda invoked when a group closes.  | 
| -> [IO s] | Actions that produce the initial states   | 
| -> Combiners a b | 
Constructor for Combiners values.
Arguments
| :: forall h s a b r. (h -> s -> a -> IO s) | Step function that accesses the resource   | 
| -> (h -> s -> IO b) | Coda invoked when a group closes.  | 
| -> (h -> IO ()) | Finalizer to run after each coda, and also in the case of an exception.  | 
| -> [(IO h, h -> IO s)] | Actions that allocate a sequence of resources   | 
| -> (Combiners a b -> IO r) | The   | 
| -> IO r | 
Combiners thread a state s while processing each group. Sometimes, in
 addition to that, we want to allocate a resource h when we start
 processing a group, and deallocate it after we finish processing the group
 or an exception is thrown. The typical example is allocating a Handle for
 writing the elements of the group as they arrive.
Arguments
| :: forall h a r. (h -> a -> IO ()) | Step function that accesses the resource   | 
| -> (h -> IO ()) | Finalizer to run after closing each group, and also in the case of an exception.  | 
| -> [IO h] | Actions that allocate a sequence of resources   | 
| -> (Combiners a () -> IO r) | The   | 
| -> IO r | 
A simpler version of withCombiners that doen't thread a state; it merely
 allocates and deallocates the resource h.
combineIntoLists :: Combiners a [a] Source #
Puts the elements of each group into a list that is kept in memory. This breaks streaming within the group.
Useful with recast.
Re-exports
I've found that the & (reverse application) and <&> (reverse fmap)
 operators feel quite natural for building pipelines.
The standard streams, useful with functions like sink.
Thrown when decoding UTF8.
data UnicodeException #
Instances
| Eq UnicodeException | |
Defined in Data.Text.Encoding.Error Methods (==) :: UnicodeException -> UnicodeException -> Bool (/=) :: UnicodeException -> UnicodeException -> Bool  | |
| Show UnicodeException | |
Defined in Data.Text.Encoding.Error Methods showsPrec :: Int -> UnicodeException -> ShowS show :: UnicodeException -> String showList :: [UnicodeException] -> ShowS  | |
| Exception UnicodeException | |
Defined in Data.Text.Encoding.Error Methods toException :: UnicodeException -> SomeException fromException :: SomeException -> Maybe UnicodeException displayException :: UnicodeException -> String  | |
| NFData UnicodeException | |
Defined in Data.Text.Encoding.Error Methods rnf :: UnicodeException -> ()  | |
Functions that create process specs for use with throughProcess. For more control, import the whole of System.Process.