| Safe Haskell | None |
|---|---|
| Language | GHC2021 |
Jet.Internal
Description
Tampering with the internals lets you write invalid Jets that don't
respect stop signals from consumers, so be careful.
Also, the internals expose Line and ByteBundle as thin coats of paint
over lazy text and lazy bytestring, respectively.
Synopsis
- newtype Jet a = Jet {}
- run :: Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
- consume :: Jet a -> (s -> a -> IO s) -> s -> IO s
- for :: Jet a -> (a -> IO b) -> Jet b
- for_ :: Jet a -> (a -> IO b) -> IO ()
- traverse :: (a -> IO b) -> Jet a -> Jet b
- traverse_ :: (a -> IO b) -> Sink a
- drain :: Sink a
- each :: forall a f. Foldable f => f a -> Jet a
- repeat :: a -> Jet a
- repeatIO :: IO a -> Jet a
- replicate :: Int -> a -> Jet a
- replicateIO :: Int -> IO a -> Jet a
- iterate :: (a -> a) -> a -> Jet a
- iterateIO :: (a -> IO a) -> a -> Jet a
- unfold :: (b -> Maybe (a, b)) -> b -> Jet a
- unfoldIO :: (b -> IO (Maybe (a, b))) -> b -> Jet a
- untilEOF :: (handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
- untilNothing :: IO (Maybe a) -> Jet a
- toList :: Jet a -> IO [a]
- length :: Jet a -> IO Int
- data Pair a b = Pair !a !b
- pairExtract :: Pair a b -> b
- pairEnv :: Pair a b -> a
- data Triple a b c = Triple !a !b !c
- tripleExtract :: Triple a b c -> c
- drop :: Int -> Jet a -> Jet a
- data DropState
- dropWhile :: (a -> Bool) -> Jet a -> Jet a
- dropWhileIO :: (a -> IO Bool) -> Jet a -> Jet a
- take :: Int -> Jet a -> Jet a
- limit :: Int -> Jet a -> Jet a
- data TakeState
- takeWhile :: (a -> Bool) -> Jet a -> Jet a
- takeWhileIO :: (a -> IO Bool) -> Jet a -> Jet a
- filter :: (a -> Bool) -> Jet a -> Jet a
- filterIO :: (a -> IO Bool) -> Jet a -> Jet a
- mapAccum :: (a -> b -> (a, c)) -> a -> Jet b -> Jet c
- mapAccumIO :: (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
- data Touched
- intersperse :: a -> Jet a -> Jet a
- zip :: Foldable f => f a -> Jet b -> Jet (a, b)
- zipWith :: Foldable f => (a -> b -> c) -> f a -> Jet b -> Jet c
- zipIO :: Foldable f => f (IO a) -> Jet b -> Jet (a, b)
- zipWithIO :: Foldable f => (a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
- withFile :: FilePath -> IOMode -> Jet Handle
- bracket :: IO a -> (a -> IO b) -> Jet a
- bracket_ :: IO a -> IO b -> Jet ()
- bracketOnError :: IO a -> (a -> IO b) -> Jet a
- finally :: IO a -> Jet ()
- onException :: IO a -> Jet ()
- control :: (forall x. (resource -> IO x) -> IO x) -> Jet resource
- control_ :: (forall x. IO x -> IO x) -> Jet ()
- fold :: Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r
- foldIO :: Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r
- data ChunkSize
- chunkSize :: ChunkSize -> Int
- class JetSource a source where
- bytes :: ChunkSize -> Handle -> Jet ByteString
- accumByteLengths :: Jet ByteString -> Jet (Int, ByteString)
- data AmIContinuing
- bytesOverBuckets :: [Int] -> Splitter ByteString ByteString
- newtype ByteBundle = ByteBundle ByteString
- bundle :: Foldable f => f ByteString -> ByteBundle
- bundleLength :: ByteBundle -> Int
- bundleBytes :: ByteBundle -> Jet ByteString
- data BucketOverflow = BucketOverflow
- byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString
- decodeUtf8 :: Jet ByteString -> Jet Text
- encodeUtf8 :: Jet Text -> Jet ByteString
- newtype Line = Line_ Text
- pattern Line :: StrictText -> Line
- lineToText :: Line -> Text
- lineToUtf8 :: Line -> ByteBundle
- textToLine :: Text -> Line
- newline :: Text
- textToUtf8 :: Text -> ByteBundle
- lineContains :: Text -> Line -> Bool
- lineBeginsWith :: Text -> Line -> Bool
- prefixLine :: Text -> Line -> Line
- stringToLine :: String -> Line
- isEmptyLine :: Line -> Bool
- emptyLine :: Line
- data NewlineForbidden = NewlineForbidden
- removeTrailingCarriageReturn :: Text -> Text
- lines :: Jet Text -> Jet Line
- unlines :: Jet Line -> Jet Text
- downstream :: (s -> Bool) -> (s -> x -> IO s) -> [x] -> s -> IO s
- type Sink a = Jet a -> IO ()
- class JetSink a target where
- newtype File = File {}
- data BoundedSize x = BoundedSize Int x
- makeAllocator :: BoundedSize File -> IO Handle
- newtype DList a = DList {
- runDList :: [a] -> [a]
- makeDList :: [a] -> DList a
- closeDList :: DList a -> [a]
- singleton :: a -> DList a
- traverseConcurrently :: (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b
- data PoolConf = PoolConf {}
- defaultPoolConf :: PoolConf
- inputQueueSize :: Int -> PoolConf -> PoolConf
- numberOfWorkers :: Int -> PoolConf -> PoolConf
- outputQueueSize :: Int -> PoolConf -> PoolConf
- defaults :: a -> a
- throughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet ByteString -> Jet ByteString
- linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
- utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
- throughProcess_ :: ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
- type ProcConf = ProcConf_ ByteString ByteString
- data ProcConf_ a b = ProcConf_ {
- _bufferStdin :: Bool
- _writeToStdIn :: Handle -> a -> IO ()
- _readFromStdout :: Handle -> IO b
- _readFromStderr :: Handle -> IO ()
- _handleExitCode :: ExitCode -> IO ()
- defaultProcConf :: ProcConf
- bufferStdin :: Bool -> ProcConf -> ProcConf
- readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf
- handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf
- data AreWeInsideGroup foldState
- = OutsideGroup
- | InsideGroup !foldState
- data RecastState foldState = RecastState !(AreWeInsideGroup foldState) [IO foldState]
- recast :: Splitter a b -> Combiners b c -> Jet a -> Jet c
- data Combiners a b where
- combiners :: forall {k} s a b (r :: k). (s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
- withCombiners_ :: (h -> a -> IO ()) -> (h -> IO ()) -> [IO h] -> (Combiners a () -> IO r) -> IO r
- withCombiners :: (h -> s -> a -> IO s) -> (h -> s -> IO b) -> (h -> IO ()) -> [(IO h, h -> IO s)] -> (Combiners a b -> IO r) -> IO r
- combineIntoLists :: Combiners a [a]
- type Splitter a b = MealyIO a (SplitStepResult b)
- data MealyIO a b where
- data SplitStepResult b = SplitStepResult {
- continuationOfPreviouslyStartedGroup :: [b]
- entireGroups :: [[b]]
- startOfNewGroup :: [b]
Documentation
>>> :set -XTypeApplications
>>> :set -XImportQualifiedPost
>>> :set -XScopedTypeVariables
>>> :set -XLambdaCase
>>> :set -XNumDecimals
>>> import Jet (Jet, (&))
>>> import Jet qualified as J
>>> import Control.Foldl qualified as L
>>> import Control.Concurrent
>>> import Data.IORef
>>> import Data.Text qualified as T
A Jet is a sequence of values produced through IO effects.
It allows consuming the elements as they are produced and doesn't force them
to be present in memory all at the same time, unlike functions like
replicateM from base.
Instances
| MonadIO Jet Source # |
|
Defined in Jet.Internal | |
| Alternative Jet Source # | Same as |
| Applicative Jet Source # | Similar to the instance for pure lists, that generates combinations.
|
| Functor Jet Source # | Maps over the yielded elements.
|
| Monad Jet Source # | Similar to the instance for pure lists, that does search.
|
| MonadPlus Jet Source # | Same as |
| MonadFail Jet Source # | A failed pattern-match in a do-block produces
|
Defined in Jet.Internal | |
| Monoid (Jet a) Source # |
|
| Semigroup (Jet a) Source # |
|
run :: Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s Source #
Go through the elements produced by a Jet, while threading a
state s and possibly performing some effect.
The caller is the one who chooses the type of the state s, and must pass
an initial value for it. The state is kept in weak-head normal form.
The caller must also provide a predicate on the state that informs the Jet
when to stop producing values: whenever the predicate returns
True.
traverse :: (a -> IO b) -> Jet a -> Jet b Source #
Apply an effectful transformation to each element in a Jet.
>>> :{
J.each "abc"
& J.traverse (\c -> let c' = succ c in putStrLn ([c] ++ " -> " ++ [c']) *> pure c')
& J.toList
:}
a -> b
b -> c
c -> d
"bcd"
repeatIO :: IO a -> Jet a Source #
>>> J.repeatIO (putStrLn "hi" *> pure True) & J.take 2 & J.toList
hi
hi
[True,True]
replicateIO :: Int -> IO a -> Jet a Source #
>>> J.replicateIO 2 (putStrLn "hi" *> pure True) & J.toList
hi
hi
[True,True]
Don't confuse this with Control.Monad.replicateM :: Int -> Jet a -> Jet [a], which has combinatorial behavior.
iterateIO :: (a -> IO a) -> a -> Jet a Source #
>>> J.iterateIO (\x -> putStrLn "hi" *> pure (succ x)) (1 :: Int) & J.take 2 & J.toList
hi
[1,2]
unfold :: (b -> Maybe (a, b)) -> b -> Jet a Source #
>>> J.unfold (\case [] -> Nothing ; c : cs -> Just (c,cs)) "abc" & J.toList
"abc"
unfoldIO :: (b -> IO (Maybe (a, b))) -> b -> Jet a Source #
>>>:{J.unfoldIO (\x -> do putStrLn "hi" pure $ case x of [] -> Nothing c : cs -> Just (c,cs)) "abc" & J.toList :} hi hi hi hi "abc"
untilEOF :: (handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a Source #
>>>j = J.untilEOF System.IO.hIsEOF System.IO.hGetLine :: Handle -> Jet String
untilNothing :: IO (Maybe a) -> Jet a Source #
>>>:{do ref <- newIORef "abc" let pop = atomicModifyIORef ref (\case [] -> ([], Nothing) x : xs -> (xs, Just x)) J.untilNothing pop & J.toList :} "abc"
Constructors
| Pair !a !b |
pairExtract :: Pair a b -> b Source #
tripleExtract :: Triple a b c -> c Source #
Constructors
| StillDropping | |
| DroppingNoMore |
dropWhile :: (a -> Bool) -> Jet a -> Jet a Source #
>>> J.each [1..5] & J.dropWhile (<3) & J.toList
[3,4,5]
Constructors
| StillTaking | |
| TakingNoMore |
takeWhile :: (a -> Bool) -> Jet a -> Jet a Source #
>>> J.each [1..] & J.takeWhile (<5) & J.toList
[1,2,3,4]
mapAccum :: (a -> b -> (a, c)) -> a -> Jet b -> Jet c Source #
Behaves like a combination of fmap and foldl; it applies a function to
each element of a structure passing an accumulating parameter from left to right.
The resulting Jet has the same number of elements as the original one.
Unlike mapAccumL, it doesn't make the final state available.
>>> J.each [1,2,3,4] & J.mapAccum (\a b -> (a + b,a)) 0 & J.toList
[0,1,3,6]
Constructors
| NotYetTouched | |
| AlreadyTouched |
intersperse :: a -> Jet a -> Jet a Source #
>>> J.each "abc" & J.intersperse '-' & J.toList
"a-b-c"
zip :: Foldable f => f a -> Jet b -> Jet (a, b) Source #
>>> J.each "abc" & J.zip [1..] & J.toList
[(1,'a'),(2,'b'),(3,'c')]
>>> J.each [1..] & J.zip "abc" & J.toList
[('a',1),('b',2),('c',3)]
>>>:{do r <- J.bracket (putStrLn "allocating" *> pure "foo") (\r -> putStrLn $ "deallocating " ++ r) liftIO $ putStrLn $ "using resource " ++ r & drain :} allocating using resource foo deallocating foo
finally :: IO a -> Jet () Source #
Notice how the finalizer runs even when we limit the Jet:
>>>:{do J.finally (putStrLn "hi") -- protects statements below liftIO (putStrLn "hey") J.each "abc" & J.limit 2 & J.toList :} hey hi "ab"
But if the protected Jet is not consumed at all, the finalizer might not run.
>>>:{do J.finally (putStrLn "hi") -- protects statements below liftIO (putStrLn "hey") J.each "abc" & J.limit 0 & J.toList :} ""
onException :: IO a -> Jet () Source #
control :: (forall x. (resource -> IO x) -> IO x) -> Jet resource Source #
Lift a control operation (like bracket) for which the
callback uses the allocated resource.
BEWARE: the control operation shouldn't do weird things like executing the callback twice.
control_ :: (forall x. IO x -> IO x) -> Jet () Source #
Lift a control operation (like finally) for which the
callback doesn't use the allocated resource.
BEWARE: the control operation shouldn't do weird things like executing the callback twice.
fold :: Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r Source #
>>> L.purely (J.fold (J.each "abc")) ((,) <$> L.list <*> L.length)
("abc",3)
foldIO :: Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r Source #
>>> L.impurely (J.foldIO (J.each "abc")) (L.FoldM (\() c -> putStrLn [c]) (pure ()) pure *> L.generalize L.length)
a
b
c
3
class JetSource a source where Source #
Helper multi-parameter typeclass for creating Jet values out of a
variety of common sources.
Because there's no functional dependency, sometimes we need to use
TypeApplications to give the compiler a hint about the type of elements
we want to produce. For example, here we want Lines and not, say,
ByteStrings:
>>>action = J.jet @Line (File "foo.txt") & J.sink J.stdout
accumByteLengths :: Jet ByteString -> Jet (Int, ByteString) Source #
data AmIContinuing Source #
Constructors
| Continuing | |
| NotContinuing |
Instances
| Show AmIContinuing Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> AmIContinuing -> ShowS # show :: AmIContinuing -> String # showList :: [AmIContinuing] -> ShowS # | |
bytesOverBuckets :: [Int] -> Splitter ByteString ByteString Source #
Splits a stream of bytes into groups bounded by maximum byte sizes. When one group "fills up", the next one is started.
When the list of bucket sizes is exhausted, all incoming bytes are put into the same unbounded group.
Useful in combination with recast.
newtype ByteBundle Source #
A sequence of bytes that we might want to keep together.
Constructors
| ByteBundle ByteString |
Instances
bundle :: Foldable f => f ByteString -> ByteBundle Source #
Constructs a ByteBundle out of the bytes of some Foldable container.
bundleLength :: ByteBundle -> Int Source #
Length in bytes.
bundleBytes :: ByteBundle -> Jet ByteString Source #
data BucketOverflow Source #
Exception thrown when we try to write too much data in a size-bounded destination.
Constructors
| BucketOverflow |
Instances
| Exception BucketOverflow Source # | |
Defined in Jet.Internal Methods toException :: BucketOverflow -> SomeException # fromException :: SomeException -> Maybe BucketOverflow # displayException :: BucketOverflow -> String # backtraceDesired :: BucketOverflow -> Bool # | |
| Show BucketOverflow Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> BucketOverflow -> ShowS # show :: BucketOverflow -> String # showList :: [BucketOverflow] -> ShowS # | |
byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString Source #
Splits a stream of ByteBundles into groups bounded by maximum byte
sizes. Bytes belonging to the same ByteBundle are always put in the same
group. When one group "fills up", the next one is started.
When the list of bucket sizes is exhausted, all incoming bytes are put into the same unbounded group.
Useful in combination with recast.
THROWS:
BucketOverflow exception if the size bound of a group turns out to be too small for holding even a single ByteBundle value.
decodeUtf8 :: Jet ByteString -> Jet Text Source #
THROWS:
encodeUtf8 :: Jet Text -> Jet ByteString Source #
A line of text.
While it is guaranteed that the Lines coming out of the lines function
do not contain newlines, that invariant is not otherwise enforced.
Instances
| Monoid Line Source # | |
| Semigroup Line Source # | |
| IsString Line Source # | |
Defined in Jet.Internal Methods fromString :: String -> Line # | |
| Show Line Source # | |
| Eq Line Source # | |
| Ord Line Source # | |
| JetSink Line Handle Source # | Uses the default system locale. Adds newlines. |
| JetSource Line Handle Source # | Uses the default system locale. |
pattern Line :: StrictText -> Line Source #
lineToUtf8 :: Line -> ByteBundle Source #
Converts a Line to a UTF-8-encoded ByteBundle, without adding the newline.
textToLine :: Text -> Line Source #
textToUtf8 :: Text -> ByteBundle Source #
stringToLine :: String -> Line Source #
isEmptyLine :: Line -> Bool Source #
data NewlineForbidden Source #
Exception thrown when we find newlines in functions which don't accept them.
A direct copy of the NewlineForbidden exception from the turtle package.
Constructors
| NewlineForbidden |
Instances
| Exception NewlineForbidden Source # | |
Defined in Jet.Internal Methods toException :: NewlineForbidden -> SomeException # fromException :: SomeException -> Maybe NewlineForbidden # | |
| Show NewlineForbidden Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> NewlineForbidden -> ShowS # show :: NewlineForbidden -> String # showList :: [NewlineForbidden] -> ShowS # | |
type Sink a = Jet a -> IO () Source #
A function that consumes a Jet totally or partially, without returning a result.
class JetSink a target where Source #
Helper multi-parameter typeclass for creating Jet-consuming functions
out of a variety of common destinations.
>>> J.each ["aaa","bbb","ccc"] <&> J.stringToLine & J.sink J.stdout
aaa
bbb
ccc
Instances
| JetSink ByteString Handle Source # | |
Defined in Jet.Internal | |
| JetSink ByteBundle Handle Source # | |
Defined in Jet.Internal | |
| JetSink Line Handle Source # | Uses the default system locale. Adds newlines. |
| JetSink Text Handle Source # | Uses the default system locale. |
| JetSink a Handle => JetSink a File Source # | |
| JetSink ByteString [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. |
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteString Source # | |
| JetSink ByteBundle [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. Each |
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteBundle Source # | |
FilePaths are plain strings. This newtype provides a small measure of
safety over them.
Constructors
| File | |
Fields | |
Instances
| Show File Source # | |
| JetSink a Handle => JetSink a File Source # | |
| JetSource a Handle => JetSource a File Source # | |
| JetSink ByteString [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. |
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteString Source # | |
| JetSink ByteBundle [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. Each |
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteBundle Source # | |
data BoundedSize x Source #
The maximum size in bytes of some destination into which we write the
bytes produced by a Jet.
Constructors
| BoundedSize Int x |
Instances
| JetSink ByteString [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. |
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteString Source # | |
| JetSink ByteBundle [BoundedSize File] Source # | Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one. Each |
Defined in Jet.Internal Methods sink :: [BoundedSize File] -> Sink ByteBundle Source # | |
| Read x => Read (BoundedSize x) Source # | |
Defined in Jet.Internal Methods readsPrec :: Int -> ReadS (BoundedSize x) # readList :: ReadS [BoundedSize x] # readPrec :: ReadPrec (BoundedSize x) # readListPrec :: ReadPrec [BoundedSize x] # | |
| Show x => Show (BoundedSize x) Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> BoundedSize x -> ShowS # show :: BoundedSize x -> String # showList :: [BoundedSize x] -> ShowS # | |
makeAllocator :: BoundedSize File -> IO Handle Source #
closeDList :: DList a -> [a] Source #
traverseConcurrently :: (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b Source #
Process the values yielded by the upstream Jet in a concurrent way,
and return the results in the form of another Jet as they are produced.
NB: this function might scramble the order of the returned values. Right now there isn't a function for unscrambling them.
>>>:{J.each [(3,'a'), (2,'b'), (1,'c')] & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c) & J.toList :} "cba"
What happens if we limit the resulting Jet and we reach that limit, or
if we otherwise stop consuming the Jet before it gets exhausted? In those
cases, all pending IO b tasks are cancelled.
>>>:{J.each [(9999,'a'), (2,'b'), (1,'c')] & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c) & J.take 2 & J.toList :} "cb"
Configuration record for the worker pool.
Constructors
| PoolConf | |
Fields
| |
inputQueueSize :: Int -> PoolConf -> PoolConf Source #
Size of the waiting queue into the worker pool. The default is 1.
numberOfWorkers :: Int -> PoolConf -> PoolConf Source #
The size of the worker pool. The default is 1.
outputQueueSize :: Int -> PoolConf -> PoolConf Source #
Size of the queue holding results out of the working pool before they
are yielded downstream. The default is 1.
An alias for id. Useful with functions like traverseConcurrently and
throughProcess, for which it means "use the default configuration".
throughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet ByteString -> Jet ByteString Source #
Feeds the upstream Jet to an external process' stdin and returns the
process' stdout as another Jet. The feeding and reading of the standard
streams is done concurrently in order to avoid deadlocks.
What happens if we limit the resulting Jet and we reach that limit, or
if we otherwise stop consuming the Jet before it gets exhausted? In those
cases, the external process is promptly terminated.
linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line Source #
Like throughProcess, but feeding and reading Lines using the default
system encoding.
>>>:{J.each ["aaa","bbb","ccc"] <&> J.stringToLine & linesThroughProcess defaults (shell "cat") & J.toList :} ["aaa","bbb","ccc"]
An example of not reading all the lines from a long-lived process that gets cancelled:
>>>:{mempty & linesThroughProcess defaults (shell "{ printf \"aaa\\nbbb\\nccc\\n\" ; sleep infinity ; }") & J.limit 2 & J.toList :} ["aaa","bbb"]
utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line Source #
Like throughProcess, but feeding and reading Lines encoded in UTF8.
throughProcess_ :: ProcConf_ a b -> CreateProcess -> Jet a -> Jet b Source #
type ProcConf = ProcConf_ ByteString ByteString Source #
Configuration record with some extra options in addition to those in CreateProcess.
Constructors
| ProcConf_ | |
Fields
| |
readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf Source #
Sets the function that reads a single line of output from the process
stderr. It's called repeatedly until stderr is exhausted. The reads are
done concurrently with the reads from stdout.
By default, lines of text are read using the system's default encoding.
This is a good place to throw an exception if we don't like what comes out
of stderr.
data AreWeInsideGroup foldState Source #
Constructors
| OutsideGroup | |
| InsideGroup !foldState |
data RecastState foldState Source #
Constructors
| RecastState !(AreWeInsideGroup foldState) [IO foldState] |
recast :: Splitter a b -> Combiners b c -> Jet a -> Jet c Source #
This is a complex, unwieldy, yet versatile function. It can be used to define grouping operations, but also for decoding and other purposes.
Groups are delimited in the input Jet using the Splitter, and the
contents of those groups are then combined using Combiners. The result of
each combiner is yielded by the return Jet.
If the list of combiners is finite and becomes exhausted, we stop splitting
and the return Jet stops.
data Combiners a b where Source #
A Combiners value knows how to process a sequence of groups, while
keeping an (existentially hidden) state for each group.
Very much like a FoldM IO from the
foldl
library, but "restartable" with a list of starting states.
For converting one into the other, this function should do the trick:
\(L.FoldM step allocator coda) -> combiners step coda (Prelude.repeat allocator)
Arguments
| :: forall {k} s a b (r :: k). (s -> a -> IO s) | Step function that threads the state |
| -> (s -> IO b) | Coda invoked when a group closes. |
| -> [IO s] | Actions that produce the initial states |
| -> Combiners a b |
Constructor for Combiners values.
Arguments
| :: (h -> a -> IO ()) | Step function that accesses the resource |
| -> (h -> IO ()) | Finalizer to run after closing each group, and also in the case of an exception. |
| -> [IO h] | Actions that allocate a sequence of resources |
| -> (Combiners a () -> IO r) | The |
| -> IO r |
A simpler version of withCombiners that doesn't thread a state; it merely
allocates and deallocates the resource h.
Arguments
| :: (h -> s -> a -> IO s) | Step function that accesses the resource |
| -> (h -> s -> IO b) | Coda invoked when a group closes. |
| -> (h -> IO ()) | Finalizer to run after each coda, and also in the case of an exception. |
| -> [(IO h, h -> IO s)] | Actions that allocate a sequence of resources |
| -> (Combiners a b -> IO r) | The |
| -> IO r |
Combiners thread a state s while processing each group. Sometimes, in
addition to that, we want to allocate a resource h when we start
processing a group, and deallocate it after we finish processing the group
or an exception is thrown. The typical example is allocating a Handle for
writing the elements of the group as they arrive.
combineIntoLists :: Combiners a [a] Source #
Puts the elements of each group into a list that is kept in memory. This breaks streaming within the group.
Useful with recast.
type Splitter a b = MealyIO a (SplitStepResult b) Source #
Delimits groups in the values yielded by a Jet, and can also transform
those values.
data MealyIO a b where Source #
A Mealy machine with an existentially hidden state.
Very much like a FoldM IO from the
foldl
library, but it emits an output at each step, not only at the end.
Constructors
| MealyIO | |
data SplitStepResult b Source #
For each value coming from upstream, what has the Splitter learned?
- Perhaps we should continue some group we have already started in a previous step.
- Perhaps we have found entire groups that we should emit in one go, groups we know are already complete.
- Perhaps we should start a new group that will continue in the next steps.
Constructors
| SplitStepResult | |
Fields
| |
Instances
| Functor SplitStepResult Source # | |
Defined in Jet.Internal Methods fmap :: (a -> b) -> SplitStepResult a -> SplitStepResult b # (<$) :: a -> SplitStepResult b -> SplitStepResult a # | |
| Monoid (SplitStepResult b) Source # | |
Defined in Jet.Internal Methods mempty :: SplitStepResult b # mappend :: SplitStepResult b -> SplitStepResult b -> SplitStepResult b # mconcat :: [SplitStepResult b] -> SplitStepResult b # | |
| Semigroup (SplitStepResult b) Source # | |
Defined in Jet.Internal Methods (<>) :: SplitStepResult b -> SplitStepResult b -> SplitStepResult b # sconcat :: NonEmpty (SplitStepResult b) -> SplitStepResult b # stimes :: Integral b0 => b0 -> SplitStepResult b -> SplitStepResult b # | |
| Show b => Show (SplitStepResult b) Source # | |
Defined in Jet.Internal Methods showsPrec :: Int -> SplitStepResult b -> ShowS # show :: SplitStepResult b -> String # showList :: [SplitStepResult b] -> ShowS # | |