{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE LinearTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE GADTSyntax #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneKindSignatures #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ApplicativeDo #-}
{-# OPTIONS_GHC -Wno-partial-type-signatures  #-}
-- | Tampering with the internals lets you write invalid 'Jet's that don't

-- respect stop signals from consumers, so be careful.

--

-- Also, the internals expose 'Line' and 'ByteBundle' as thin coats of paint

-- over lazy text and lazy bytestring, respectively.

module Jet.Internal where

import Control.Applicative
import Control.Monad
import Control.Monad.IO.Class
import Control.Exception
import Data.Foldable qualified
import Prelude hiding (traverse_, for_, filter, drop, dropWhile, fold, take,
                       takeWhile, unfold, zip, zipWith, filterM, lines, intersperse, unlines)
import Prelude qualified
import Unsafe.Coerce qualified
import System.IO (Handle, IOMode(..), hClose, openBinaryFile)
import System.IO qualified
import Data.Function ((&))
import Data.Functor ((<&>))

import Data.Bifunctor
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.IO qualified as T
import Data.Text.Encoding qualified as T
import Data.Text.Encoding.Error qualified as T
import Data.Text.Lazy qualified as TL
import Data.Text.Lazy.Encoding qualified as TL
import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Data.ByteString.Lazy qualified as BL

import Control.Concurrent
import Data.IORef
import Control.Concurrent.STM
import Control.Concurrent.MVar
import Control.Concurrent.Conceit
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import System.Process
import System.Exit
import Data.String (IsString(..))
import Data.Typeable
import Data.Traversable qualified
import Data.Maybe
import Data.List qualified
import Data.Bifunctor (first)
-- import Debug.Trace


-- $setup

--

-- >>> :set -XTypeApplications

-- >>> :set -XImportQualifiedPost

-- >>> :set -XScopedTypeVariables

-- >>> :set -XLambdaCase

-- >>> :set -XNumDecimals

-- >>> import Jet (Jet, (&))

-- >>> import Jet qualified as J

-- >>> import Control.Foldl qualified as L

-- >>> import Control.Concurrent

-- >>> import Data.IORef

-- >>> import Data.Text qualified as T


-- | A 'Jet' is a sequence of values produced through 'IO' effects.

--

-- It allows consuming the elements as they are produced and doesn't force them

-- to be present in memory all at the same time, unlike functions like

-- 'Control.Monad.replicateM' from @base@.

-- 

newtype Jet a = Jet {
        forall a.
Jet a -> forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
runJet :: forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
    } 

-- | Maps over the yielded elements. '(<&>)' can be used to put the function last.

--

-- >>> J.each "aa" <&> succ & J.toList

-- "bb"

deriving stock instance Functor Jet

-- | Go through the elements produced by a 'Jet', while threading an

-- state @s@ and possibly performing some effect.

--

-- The caller is the one who chooses the type of the state @s@, and must pass

-- an initial value for it. The state is kept in [weak-head normal form](https://en.wikibooks.org/wiki/Haskell/Graph_reduction#Weak_Head_Normal_Form).

--

-- The caller must also provide a predicate on the state that informs the `Jet`

-- when to stop producing values: whenever the predicate returns

-- @True@.

run :: forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run :: forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run Jet a
j = Jet a -> forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall a.
Jet a -> forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
runJet Jet a
j

-- | Like 'run', but always goes through all elements produced by the 'Jet'.

--

-- Equivalent to @run (const False)@.

consume :: forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s
consume :: forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s
consume Jet a
j = Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run Jet a
j (Bool -> s -> Bool
forall a b. a -> b -> a
const Bool
False)

for :: Jet a -> (a -> IO b) -> Jet b
for :: forall a b. Jet a -> (a -> IO b) -> Jet b
for Jet a
j a -> IO b
k = (() -> a -> IO b) -> [IO ()] -> Jet a -> Jet b
forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO (\() -> a -> IO b
k) (IO () -> [IO ()]
forall a. a -> [a]
Prelude.repeat (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())) Jet a
j

for_ :: Jet a -> (a -> IO b) -> IO ()
for_ :: forall a b. Jet a -> (a -> IO b) -> IO ()
for_ Jet a
j a -> IO b
k = Jet a -> (() -> a -> IO ()) -> () -> IO ()
forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s
consume Jet a
j (\() -> IO b -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO b -> IO ()) -> (a -> IO b) -> a -> IO ()
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> a -> IO b
k) () 

-- | Apply an effectful transformation to each element in a 'Jet'.

--

-- >>> :{

-- J.each "abc" 

-- & J.traverse (\c -> let c' = succ c in putStrLn ([c] ++ " -> " ++ [c']) *> pure c')

-- & J.toList

-- :}

-- a -> b

-- b -> c

-- c -> d

-- "bcd"

--

traverse :: (a -> IO b) -> Jet a -> Jet b
traverse :: forall a b. (a -> IO b) -> Jet a -> Jet b
traverse =  (Jet a -> (a -> IO b) -> Jet b) -> (a -> IO b) -> Jet a -> Jet b
forall a b c. (a -> b -> c) -> b -> a -> c
flip Jet a -> (a -> IO b) -> Jet b
forall a b. Jet a -> (a -> IO b) -> Jet b
for

traverse_ :: (a -> IO b) -> Sink a
traverse_ :: forall a b. (a -> IO b) -> Sink a
traverse_  = (Jet a -> (a -> IO b) -> IO ()) -> (a -> IO b) -> Jet a -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Jet a -> (a -> IO b) -> IO ()
forall a b. Jet a -> (a -> IO b) -> IO ()
for_

-- | Go through the 'Jet' only for the 'IO' effects, discarding all yielded elements.

drain :: Sink a
drain :: forall a. Sink a
drain = (a -> IO a) -> Sink a
forall a b. (a -> IO b) -> Sink a
traverse_ a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure

-- | Similar to the instance for pure lists, that generates combinations.

--

-- >>> (,) <$> J.each "ab" <*> J.each "cd" & J.toList

-- [('a','c'),('a','d'),('b','c'),('b','d')]

--

instance Applicative Jet where
  pure :: forall a. a -> Jet a
pure a
i = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial ->
    if
        | s -> Bool
stop s
initial -> s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
        | Bool
otherwise -> s -> a -> IO s
step s
initial a
i
  Jet forall s. (s -> Bool) -> (s -> (a -> b) -> IO s) -> s -> IO s
left <*> :: forall a b. Jet (a -> b) -> Jet a -> Jet b
<*> Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
right = (forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s) -> Jet b
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> b -> IO s
step s
initial ->
    -- Here we assume that the first Jet correctly handles the stop signal.

    let step' :: (t -> b) -> s -> t -> IO s
step' t -> b
f s
s t
a = s -> b -> IO s
step s
s (t -> b
f t
a)
     in (s -> Bool) -> (s -> (a -> b) -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> (a -> b) -> IO s) -> s -> IO s
left s -> Bool
stop (\s
s a -> b
f -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
right s -> Bool
stop ((a -> b) -> s -> a -> IO s
forall {t}. (t -> b) -> s -> t -> IO s
step' a -> b
f) s
s) s
initial

-- | Similar to the instance for pure lists, that does search.

--

-- >>> :{

-- do string <- J.each ["ab","cd"]

--    J.each string

-- &

-- J.toList

-- :}

-- "abcd"

instance Monad Jet where
  return :: forall a. a -> Jet a
return = a -> Jet a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
  Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
m >>= :: forall a b. Jet a -> (a -> Jet b) -> Jet b
>>= a -> Jet b
k = (forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s) -> Jet b
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> b -> IO s
step s
initial ->
    (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
m s -> Bool
stop (\s
s a
a -> Jet b -> forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
forall a.
Jet a -> forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
runJet (a -> Jet b
k a
a) s -> Bool
stop s -> b -> IO s
step s
s) s
initial

-- |

-- >>> liftIO (putStrLn "foo") <> liftIO (putStrLn "bar") & J.toList

-- foo

-- bar

-- [(),()]

instance MonadIO Jet where
  liftIO :: forall a. IO a -> Jet a
liftIO IO a
action = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial ->
    if
        | s -> Bool
stop s
initial -> s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
        | Bool
otherwise -> do
          a
a <- IO a
action
          s -> a -> IO s
step s
initial a
a

-- | 'Jet' concatenation.

-- 

-- >>> J.each "ab" <> J.each "cd" & J.toList

-- "abcd"

instance Semigroup (Jet a) where
  Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f1 <> :: Jet a -> Jet a -> Jet a
<> Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f2 = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
s0 -> do
    -- perhaps some of the stop checks are redundant, the first one in particular?

    if
        | s -> Bool
stop s
s0 ->
          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s0
        | Bool
otherwise -> do
          !s
s1 <- (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f1 s -> Bool
stop s -> a -> IO s
step s
s0
          if
              | s -> Bool
stop s
s1 ->
                s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s1
              | Bool
otherwise -> do
                !s
s2 <- (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f2 s -> Bool
stop s -> a -> IO s
step s
s1
                s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s2

-- | 'mempty' is the empty 'Jet'.

--

-- >>> mempty <> J.each "ab" <> mempty & J.toList

-- "ab"

instance Monoid (Jet a) where
  mempty :: Jet a
mempty = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
_ s -> a -> IO s
_ s
initial -> s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial

-- | Same as 'Monoid'.

instance Alternative Jet where
  <|> :: forall a. Jet a -> Jet a -> Jet a
(<|>) = Jet a -> Jet a -> Jet a
forall a. Semigroup a => a -> a -> a
(<>)
  empty :: forall a. Jet a
empty = Jet a
forall a. Monoid a => a
mempty

-- | Same as 'Monoid'

instance MonadPlus Jet where
  mzero :: forall a. Jet a
mzero = Jet a
forall a. Monoid a => a
mempty
  mplus :: forall a. Jet a -> Jet a -> Jet a
mplus = Jet a -> Jet a -> Jet a
forall a. Semigroup a => a -> a -> a
(<>)

-- | A failed pattern-match in a do-block produces 'mzero'. 

--

-- >>> :{

-- do Just c <- J.each [Nothing, Just 'a', Nothing, Just 'b']

--    pure c

-- & J.toList

-- :}

-- "ab"

--

instance MonadFail Jet where
  fail :: forall a. String -> Jet a
fail String
_ = Jet a
forall (m :: * -> *) a. MonadPlus m => m a
mzero

-- | Build a 'Jet' from any 'Foldable' container

--

-- >>> J.each [True,False] & J.toList

-- [True,False]

--

each :: forall a f . Foldable f => f a -> Jet a
each :: forall a (f :: * -> *). Foldable f => f a -> Jet a
each (f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Data.Foldable.toList -> [a]
seed) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step ->
  -- This could be done with Jet.unfold, but let's leave as it is.

  let go :: [a] -> s -> IO s
go [a]
b s
s =
        if
            | s -> Bool
stop s
s ->
              s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
            | Bool
otherwise ->
              case [a]
b of
                [] ->
                  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
                -- see corresponding comment in unfold.

                a
x : [a]
xs -> do
                  !s
s' <- s -> a -> IO s
step s
s a
x
                  [a] -> s -> IO s
go [a]
xs s
s'
   in [a] -> s -> IO s
go [a]
seed

-- |

--

-- >>> J.repeat True & J.take 2 & J.toList

-- [True,True]

--

repeat :: a -> Jet a
repeat :: forall a. a -> Jet a
repeat a
a = IO a -> Jet a
forall a. IO a -> Jet a
repeatIO (a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a)


-- |

--

-- >>> J.repeatIO (putStrLn "hi" *> pure True) & J.take 2 & J.toList

-- hi

-- hi

-- [True,True]

--

repeatIO :: IO a -> Jet a
repeatIO :: forall a. IO a -> Jet a
repeatIO IO a
action = IO (Maybe a) -> Jet a
forall a. IO (Maybe a) -> Jet a
untilNothing ((a -> Maybe a) -> IO a -> IO (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just IO a
action)

-- |

--

-- >>> J.replicate 2 True & J.toList

-- [True,True]

--

replicate :: Int -> a -> Jet a
replicate :: forall a. Int -> a -> Jet a
replicate Int
n a
a = Int -> IO a -> Jet a
forall a. Int -> IO a -> Jet a
replicateIO Int
n (a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a)

-- |

-- >>> J.replicateIO 2 (putStrLn "hi" *> pure True) & J.toList

-- hi

-- hi

-- [True,True]

--

-- Don't confuse this with @Control.Monad.replicateM :: Int -> Jet a -> Jet [a]@ which has a combinatorial behavior.

--

replicateIO :: Int -> IO a -> Jet a
replicateIO :: forall a. Int -> IO a -> Jet a
replicateIO Int
n IO a
ioa = Int -> Jet a -> Jet a
forall a. Int -> Jet a -> Jet a
take Int
n (IO a -> Jet a
forall a. IO a -> Jet a
repeatIO IO a
ioa)

-- |

--

-- >>> J.iterate succ (1 :: Int) & J.take 2 & J.toList

-- [1,2]

--

iterate :: (a -> a) -> a -> Jet a
iterate :: forall a. (a -> a) -> a -> Jet a
iterate a -> a
h = (a -> IO a) -> a -> Jet a
forall a. (a -> IO a) -> a -> Jet a
iterateIO ((a -> IO a) -> (a -> a) -> a -> IO a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> a
h)

-- |

--

-- >>> J.iterateIO (\x -> putStrLn "hi" *> pure (succ x)) (1 :: Int) & J.take 2 & J.toList

-- hi

-- [1,2]

--

iterateIO :: (a -> IO a) -> a -> Jet a
iterateIO :: forall a. (a -> IO a) -> a -> Jet a
iterateIO a -> IO a
h a
a = a -> Jet a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a Jet a -> Jet a -> Jet a
forall a. Semigroup a => a -> a -> a
<> (a -> IO (Maybe (a, a))) -> a -> Jet a
forall b a. (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO ((IO a -> IO (Maybe (a, a)))
-> (a -> IO a) -> a -> IO (Maybe (a, a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((a -> Maybe (a, a)) -> IO a -> IO (Maybe (a, a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\a
x -> (a, a) -> Maybe (a, a)
forall a. a -> Maybe a
Just (a
x,a
x))) a -> IO a
h) a
a     

-- |

-- >>> J.unfold (\case [] -> Nothing ; c : cs -> Just (c,cs)) "abc" & J.toList

-- "abc"

--

unfold :: (b -> Maybe (a, b)) -> b -> Jet a
unfold :: forall b a. (b -> Maybe (a, b)) -> b -> Jet a
unfold b -> Maybe (a, b)
h = (b -> IO (Maybe (a, b))) -> b -> Jet a
forall b a. (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO ((Maybe (a, b) -> IO (Maybe (a, b)))
-> (b -> Maybe (a, b)) -> b -> IO (Maybe (a, b))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (a, b) -> IO (Maybe (a, b))
forall (f :: * -> *) a. Applicative f => a -> f a
pure b -> Maybe (a, b)
h)

-- |

-- >>> :{ 

-- J.unfoldIO (\x -> do putStrLn "hi" 

--                      pure $ case x of 

--                         [] -> Nothing 

--                         c : cs -> Just (c,cs)) 

--            "abc" 

-- & J.toList

-- :}                             

-- hi

-- hi

-- hi

-- hi

-- "abc"

--

unfoldIO :: (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO :: forall b a. (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO b -> IO (Maybe (a, b))
h b
seed = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step ->
  let go :: b -> s -> IO s
go b
b s
s =
        if
            | s -> Bool
stop s
s ->
              s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
            | Bool
otherwise -> do
              Maybe (a, b)
next <- b -> IO (Maybe (a, b))
h b
b
              case Maybe (a, b)
next of
                Maybe (a, b)
Nothing ->
                  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
                -- strictness only on the states. Good idea, or bad?

                Just (a
a, !b
b') -> do
                  !s
s' <- s -> a -> IO s
step s
s a
a
                  b -> s -> IO s
go b
b' s
s'
   in b -> s -> IO s
go b
seed

-- |

-- >>> j = J.untilEOF System.IO.hIsEOF System.IO.hGetLine :: Handle -> Jet String

--

untilEOF :: (handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF :: forall handle a.
(handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF handle -> IO Bool
hIsEOF' handle -> IO a
hGetLine' handle
handle = IO (Maybe a) -> Jet a
forall a. IO (Maybe a) -> Jet a
untilNothing do
      Bool
eof <- handle -> IO Bool
hIsEOF' handle
handle
      if
          | Bool
eof -> 
            Maybe a -> IO (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
forall a. Maybe a
Nothing
          | Bool
otherwise ->
            a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> IO a -> IO (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> handle -> IO a
hGetLine' handle
handle

-- | 

--

-- >>> :{ 

-- do ref <- newIORef "abc"

--    let pop = atomicModifyIORef ref (\case [] -> ([], Nothing)

--                                           x : xs -> (xs, Just x)) 

--    J.untilNothing pop & J.toList                                       

-- :}

-- "abc"

--

untilNothing :: IO (Maybe a) -> Jet a
untilNothing :: forall a. IO (Maybe a) -> Jet a
untilNothing IO (Maybe a)
action = (() -> IO (Maybe (a, ()))) -> () -> Jet a
forall b a. (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO (\() -> (Maybe a -> Maybe (a, ())) -> IO (Maybe a) -> IO (Maybe (a, ()))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((a -> (a, ())) -> Maybe a -> Maybe (a, ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (,())) IO (Maybe a)
action) ()

-- | Convert to a regular list. This breaks streaming.

--

-- >>> J.each "abc" & J.toList

-- "abc"

--

-- Alternatively, we can use 'fold' in combination with 'Control.Foldl.list' form the [foldl](https://hackage.haskell.org/package/foldl) library:

--

-- >>> L.purely (J.fold (J.each "abc")) L.list 

-- "abc"

--

-- which is more verbose, but more composable.

toList :: Jet a -> IO [a]
toList :: forall a. Jet a -> IO [a]
toList (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = do
    [a]
as <- ([a] -> Bool) -> ([a] -> a -> IO [a]) -> [a] -> IO [a]
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f (Bool -> [a] -> Bool
forall a b. a -> b -> a
const Bool
False) (\[a]
xs a
x -> [a] -> IO [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
xs)) []
    pure ([a] -> [a]
forall a. [a] -> [a]
reverse [a]
as)

-- | Returns the number of elements yielded by the 'Jet', exhausting it in the process.

--

-- >>> J.each "abc" & J.length

-- 3

--

-- Alternatively, we can use 'fold' in combination with 'Control.Foldl.length' form the [foldl](https://hackage.haskell.org/package/foldl) library:

--

-- >>> L.purely (J.fold (J.each "abc")) L.length

-- 3

--

-- which is more verbose, but more composable.

length :: Jet a -> IO Int
length :: forall a. Jet a -> IO Int
length (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = do
    Int
l <- (Int -> Bool) -> (Int -> a -> IO Int) -> Int -> IO Int
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f (Bool -> Int -> Bool
forall a b. a -> b -> a
const Bool
False) (\Int
s a
_ -> Int -> IO Int
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> Int
forall a. Enum a => a -> a
succ Int
s)) Int
0
    pure Int
l

data Pair a b = Pair !a !b deriving Int -> Pair a b -> ShowS
[Pair a b] -> ShowS
Pair a b -> String
(Int -> Pair a b -> ShowS)
-> (Pair a b -> String) -> ([Pair a b] -> ShowS) -> Show (Pair a b)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall a b. (Show a, Show b) => Int -> Pair a b -> ShowS
forall a b. (Show a, Show b) => [Pair a b] -> ShowS
forall a b. (Show a, Show b) => Pair a b -> String
showList :: [Pair a b] -> ShowS
$cshowList :: forall a b. (Show a, Show b) => [Pair a b] -> ShowS
show :: Pair a b -> String
$cshow :: forall a b. (Show a, Show b) => Pair a b -> String
showsPrec :: Int -> Pair a b -> ShowS
$cshowsPrec :: forall a b. (Show a, Show b) => Int -> Pair a b -> ShowS
Show

pairExtract :: Pair a b -> b
pairExtract (Pair a
_ b
b) = b
b

pairEnv :: Pair a b -> a
pairEnv (Pair a
a b
_) = a
a

data Triple a b c = Triple !a !b !c

tripleExtract :: Triple a b c -> c
tripleExtract (Triple a
_ b
_ c
c) = c
c

-- fromTuple :: (a, b) -> Pair a b

-- fromTuple (a, b) -> Pair a b


-- | >>> J.each "abc" & J.drop 2 & J.toList

-- "c"

--

drop :: Int -> Jet a -> Jet a
drop :: forall a. Int -> Jet a -> Jet a
drop Int
limit (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair Int s -> a -> IO (Pair Int s)
step' (Pair Int
count s
s) a
a =
        if
            | Int
count Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
limit -> do
              Pair Int s -> IO (Pair Int s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> s -> Pair Int s
forall a b. a -> b -> Pair a b
Pair (Int -> Int
forall a. Enum a => a -> a
succ Int
count) s
s)
            | Bool
otherwise -> do
              !s
s' <- s -> a -> IO s
step s
s a
a
              Pair Int s -> IO (Pair Int s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> s -> Pair Int s
forall a b. a -> b -> Pair a b
Pair Int
count s
s')
      initial' :: Pair Int s
initial' = Int -> s -> Pair Int s
forall a b. a -> b -> Pair a b
Pair Int
0 s
initial
  Pair Int
_ s
final <- (Pair Int s -> Bool)
-> (Pair Int s -> a -> IO (Pair Int s))
-> Pair Int s
-> IO (Pair Int s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f Pair Int s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair Int s -> a -> IO (Pair Int s)
step' Pair Int s
initial'
  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final

data DropState = StillDropping | DroppingNoMore

-- | >>> J.each [1..5] & J.dropWhile (<3) & J.toList

-- [3,4,5]

--

dropWhile :: (a -> Bool) -> Jet a -> Jet a
dropWhile :: forall a. (a -> Bool) -> Jet a -> Jet a
dropWhile a -> Bool
p = (a -> IO Bool) -> Jet a -> Jet a
forall a. (a -> IO Bool) -> Jet a -> Jet a
dropWhileIO ((Bool -> IO Bool) -> (a -> Bool) -> a -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> IO Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> Bool
p)

dropWhileIO :: (a -> IO Bool) -> Jet a -> Jet a
dropWhileIO :: forall a. (a -> IO Bool) -> Jet a -> Jet a
dropWhileIO a -> IO Bool
p (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair DropState s -> a -> IO (Pair DropState s)
step' (Pair DropState
DroppingNoMore s
s) a
a = do
        !s
s' <- s -> a -> IO s
step s
s a
a
        Pair DropState s -> IO (Pair DropState s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DropState -> s -> Pair DropState s
forall a b. a -> b -> Pair a b
Pair DropState
DroppingNoMore s
s')
      step' (Pair DropState
StillDropping s
s) a
a = do
        Bool
keepDropping <- a -> IO Bool
p a
a
        if
            | Bool
keepDropping ->
              Pair DropState s -> IO (Pair DropState s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DropState -> s -> Pair DropState s
forall a b. a -> b -> Pair a b
Pair DropState
StillDropping s
s)
            | Bool
otherwise -> do
              !s
s' <- s -> a -> IO s
step s
s a
a
              Pair DropState s -> IO (Pair DropState s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DropState -> s -> Pair DropState s
forall a b. a -> b -> Pair a b
Pair DropState
DroppingNoMore s
s')
      initial' :: Pair DropState s
initial' = (DropState -> s -> Pair DropState s
forall a b. a -> b -> Pair a b
Pair DropState
StillDropping s
initial)
  Pair DropState
_ s
final <- (Pair DropState s -> Bool)
-> (Pair DropState s -> a -> IO (Pair DropState s))
-> Pair DropState s
-> IO (Pair DropState s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f Pair DropState s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair DropState s -> a -> IO (Pair DropState s)
step' Pair DropState s
initial'
  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final

-- | >>> J.each "abc" & J.take 2 & J.toList

-- "ab"

--

take :: Int -> Jet a -> Jet a
take :: forall a. Int -> Jet a -> Jet a
take Int
limit (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair Int s -> Bool
stop' (Pair Int
count s
s) =
        Int
count Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
limit Bool -> Bool -> Bool
|| s -> Bool
stop s
s
      step' :: Pair a s -> a -> IO (Pair a s)
step' (Pair a
count s
s) a
a = do
        !s
s' <- s -> a -> IO s
step s
s a
a
        Pair a s -> IO (Pair a s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a -> s -> Pair a s
forall a b. a -> b -> Pair a b
Pair (a -> a
forall a. Enum a => a -> a
succ a
count) s
s')
      initial' :: Pair Int s
initial' = Int -> s -> Pair Int s
forall a b. a -> b -> Pair a b
Pair Int
0 s
initial
  Pair Int
_ s
final <- (Pair Int s -> Bool)
-> (Pair Int s -> a -> IO (Pair Int s))
-> Pair Int s
-> IO (Pair Int s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f Pair Int s -> Bool
stop' Pair Int s -> a -> IO (Pair Int s)
forall {a}. Enum a => Pair a s -> a -> IO (Pair a s)
step' Pair Int s
initial'
  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final

-- | Synonym for 'take'.

limit :: Int -> Jet a -> Jet a
limit :: forall a. Int -> Jet a -> Jet a
limit = Int -> Jet a -> Jet a
forall a. Int -> Jet a -> Jet a
take

data TakeState = StillTaking | TakingNoMore

-- | >>> J.each [1..] & J.takeWhile (<5) & J.toList

-- [1,2,3,4]

--

takeWhile :: (a -> Bool) -> Jet a -> Jet a
takeWhile :: forall a. (a -> Bool) -> Jet a -> Jet a
takeWhile a -> Bool
p = (a -> IO Bool) -> Jet a -> Jet a
forall a. (a -> IO Bool) -> Jet a -> Jet a
takeWhileIO ((Bool -> IO Bool) -> (a -> Bool) -> a -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> IO Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> Bool
p)

takeWhileIO :: (a -> IO Bool) -> Jet a -> Jet a
takeWhileIO :: forall a. (a -> IO Bool) -> Jet a -> Jet a
takeWhileIO a -> IO Bool
p (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair TakeState s -> Bool
stop' (Pair TakeState
TakingNoMore s
_) =
        Bool
True
      stop' (Pair TakeState
StillTaking s
s) =
        s -> Bool
stop s
s
      step' :: Pair TakeState s -> a -> IO (Pair TakeState s)
step' (Pair TakeState
internal s
s) a
a = do
        Bool
keepTaking <- a -> IO Bool
p a
a
        if
            | Bool
keepTaking -> do
              !s
s' <- s -> a -> IO s
step s
s a
a
              Pair TakeState s -> IO (Pair TakeState s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TakeState -> s -> Pair TakeState s
forall a b. a -> b -> Pair a b
Pair TakeState
internal s
s')
            | Bool
otherwise ->
              Pair TakeState s -> IO (Pair TakeState s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TakeState -> s -> Pair TakeState s
forall a b. a -> b -> Pair a b
Pair TakeState
TakingNoMore s
s)
      initial' :: Pair TakeState s
initial' = TakeState -> s -> Pair TakeState s
forall a b. a -> b -> Pair a b
Pair TakeState
StillTaking s
initial
  Pair TakeState
_ s
final <- (Pair TakeState s -> Bool)
-> (Pair TakeState s -> a -> IO (Pair TakeState s))
-> Pair TakeState s
-> IO (Pair TakeState s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f Pair TakeState s -> Bool
stop' Pair TakeState s -> a -> IO (Pair TakeState s)
step' Pair TakeState s
initial'
  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final

-- | 

-- >>> J.each "abc" & J.filter (=='a') & J.toList

-- "a"

--

filter :: (a -> Bool) -> Jet a -> Jet a
filter :: forall a. (a -> Bool) -> Jet a -> Jet a
filter a -> Bool
p = (a -> IO Bool) -> Jet a -> Jet a
forall a. (a -> IO Bool) -> Jet a -> Jet a
filterIO ((Bool -> IO Bool) -> (a -> Bool) -> a -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> IO Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> Bool
p)

filterIO :: (a -> IO Bool) -> Jet a -> Jet a
filterIO :: forall a. (a -> IO Bool) -> Jet a -> Jet a
filterIO a -> IO Bool
p (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let step' :: s -> a -> IO s
step' s
s a
a = do
        Bool
shouldPass <- a -> IO Bool
p a
a
        if
            | Bool
shouldPass -> do
              !s
s' <- s -> a -> IO s
step s
s a
a
              s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s'
            | Bool
otherwise ->
              s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
  (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f s -> Bool
stop s -> a -> IO s
step' s
initial

-- | Behaves like a combination of 'fmap' and 'foldl'; it applies a function to

-- each element of a structure passing an accumulating parameter from left to right.

--

-- The resulting 'Jet' has the same number of elements as the original one.

--

-- Unlike 'Data.Traversable.mapAccumL', it doesn't make the final state available. 

--

-- >>> J.each [1,2,3,4] & J.mapAccum (\a b -> (a + b,a)) 0 & J.toList

-- [0,1,3,6]

--

mapAccum :: (a -> b -> (a, c)) -> a -> Jet b -> Jet c
mapAccum :: forall a b c. (a -> b -> (a, c)) -> a -> Jet b -> Jet c
mapAccum a -> b -> (a, c)
stepAcc = (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
forall a b c. (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
mapAccumIO (((b -> (a, c)) -> b -> IO (a, c))
-> (a -> b -> (a, c)) -> a -> b -> IO (a, c)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (((a, c) -> IO (a, c)) -> (b -> (a, c)) -> b -> IO (a, c)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a, c) -> IO (a, c)
forall (f :: * -> *) a. Applicative f => a -> f a
pure) a -> b -> (a, c)
stepAcc)

mapAccumIO :: (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
mapAccumIO :: forall a b c. (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
mapAccumIO a -> b -> IO (a, c)
stepAcc a
initialAcc (Jet forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> c -> IO s) -> s -> IO s) -> Jet c
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> c -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair a s -> b -> IO (Pair a s)
step' (Pair a
acc s
s) b
b = do
        (a
acc', c
c) <- a -> b -> IO (a, c)
stepAcc a
acc b
b
        !s
s' <- s -> c -> IO s
step s
s c
c
        Pair a s -> IO (Pair a s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a -> s -> Pair a s
forall a b. a -> b -> Pair a b
Pair a
acc' s
s')
      initial' :: Pair a s
initial' = a -> s -> Pair a s
forall a b. a -> b -> Pair a b
Pair a
initialAcc s
initial
  Pair a
_ s
final <- (Pair a s -> Bool)
-> (Pair a s -> b -> IO (Pair a s)) -> Pair a s -> IO (Pair a s)
forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
f Pair a s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair a s -> b -> IO (Pair a s)
step' Pair a s
initial'
  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final

data Touched = 
      NotYetTouched
    | AlreadyTouched

-- TODO: there's a bug here!!!!


-- | 

-- >>> J.each "abc" & J.intersperse '-' & J.toList

-- "a-b-c"

--

intersperse :: a -> Jet a -> Jet a
intersperse :: forall a. a -> Jet a -> Jet a
intersperse a
intrusion (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
upstream) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair Touched s -> a -> IO (Pair Touched s)
step' (Pair Touched
AlreadyTouched s
s) a
a = do
        !s
s' <- s -> a -> IO s
step s
s a
intrusion
        if 
            | s -> Bool
stop s
s' ->
                Pair Touched s -> IO (Pair Touched s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Touched -> s -> Pair Touched s
forall a b. a -> b -> Pair a b
Pair Touched
AlreadyTouched s
s')
            | Bool
otherwise -> do
                !s
s'' <- s -> a -> IO s
step s
s' a
a
                Pair Touched s -> IO (Pair Touched s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Touched -> s -> Pair Touched s
forall a b. a -> b -> Pair a b
Pair Touched
AlreadyTouched s
s'')
      step' (Pair Touched
NotYetTouched s
s) a
a = do
        !s
s' <- s -> a -> IO s
step s
s a
a
        Pair Touched s -> IO (Pair Touched s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Touched -> s -> Pair Touched s
forall a b. a -> b -> Pair a b
Pair Touched
AlreadyTouched s
s')
      initial' :: Pair Touched s
initial' = Touched -> s -> Pair Touched s
forall a b. a -> b -> Pair a b
Pair Touched
NotYetTouched s
initial
  Pair Touched
_ s
final <- (Pair Touched s -> Bool)
-> (Pair Touched s -> a -> IO (Pair Touched s))
-> Pair Touched s
-> IO (Pair Touched s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
upstream Pair Touched s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair Touched s -> a -> IO (Pair Touched s)
step' Pair Touched s
initial'
  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final

-- | 

-- >>> J.each "abc" & J.zip [1..] & J.toList

-- [(1,'a'),(2,'b'),(3,'c')]

--

-- >>> J.each [1..] & J.zip "abc" & J.toList

-- [('a',1),('b',2),('c',3)]

--

zip :: Foldable f => f a -> Jet b -> Jet (a, b)
zip :: forall (f :: * -> *) a b. Foldable f => f a -> Jet b -> Jet (a, b)
zip = (a -> b -> (a, b)) -> f a -> Jet b -> Jet (a, b)
forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> c) -> f a -> Jet b -> Jet c
zipWith (,)

zipWith :: Foldable f => (a -> b -> c) -> f a -> Jet b -> Jet c
zipWith :: forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> c) -> f a -> Jet b -> Jet c
zipWith a -> b -> c
zf (f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Data.Foldable.toList -> [a]
as0) = (a -> b -> IO c) -> [IO a] -> Jet b -> Jet c
forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO (((b -> c) -> b -> IO c) -> (a -> b -> c) -> a -> b -> IO c
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((c -> IO c) -> (b -> c) -> b -> IO c
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap c -> IO c
forall (f :: * -> *) a. Applicative f => a -> f a
pure) a -> b -> c
zf) ((a -> IO a) -> [a] -> [IO a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [a]
as0)

zipIO :: Foldable f => f (IO a) -> Jet b -> Jet (a, b)
zipIO :: forall (f :: * -> *) a b.
Foldable f =>
f (IO a) -> Jet b -> Jet (a, b)
zipIO = (a -> b -> IO (a, b)) -> f (IO a) -> Jet b -> Jet (a, b)
forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO (\a
x b
y -> (a, b) -> IO (a, b)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a
x, b
y))

-- |

-- Zips a list of 'IO' actions with a 'Jet', where the combining function can also have effects.

--

-- If the list of actions is exhausted, the 'Jet' stops:

--

-- >>> J.each [1..] <&> show & zipWithIO (\c1 c2 -> putStrLn (c1 ++ c2)) [pure "a", pure "b"] & J.toList

-- a1

-- b2

-- [(),()]

--

zipWithIO :: Foldable f => (a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO :: forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO a -> b -> IO c
zf (f (IO a) -> [IO a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Data.Foldable.toList -> [IO a]
ioas0) (Jet forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> c -> IO s) -> s -> IO s) -> Jet c
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> c -> IO s
step s
initial -> do
  let stop' :: Pair [a] s -> Bool
stop' (Pair [] s
_) = Bool
True
      stop' (Pair [a]
_ s
s) = s -> Bool
stop s
s
      step' :: Pair [IO a] s -> b -> IO (Pair [IO a] s)
step' (Pair (IO a
ioa : [IO a]
ioas) s
s) b
b = do
        a
a <- IO a
ioa
        c
z <- a -> b -> IO c
zf a
a b
b
        !s
s' <- s -> c -> IO s
step s
s c
z
        Pair [IO a] s -> IO (Pair [IO a] s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([IO a] -> s -> Pair [IO a] s
forall a b. a -> b -> Pair a b
Pair [IO a]
ioas s
s')
      step' (Pair [] s
_) b
_ = String -> IO (Pair [IO a] s)
forall a. HasCallStack => String -> a
error String
"never happens"
      initial' :: Pair [IO a] s
initial' = [IO a] -> s -> Pair [IO a] s
forall a b. a -> b -> Pair a b
Pair [IO a]
ioas0 s
initial
  Pair [IO a]
_ s
final <- (Pair [IO a] s -> Bool)
-> (Pair [IO a] s -> b -> IO (Pair [IO a] s))
-> Pair [IO a] s
-> IO (Pair [IO a] s)
forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
f Pair [IO a] s -> Bool
forall {a}. Pair [a] s -> Bool
stop' Pair [IO a] s -> b -> IO (Pair [IO a] s)
step' Pair [IO a] s
initial'
  s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final


-- | Opens a file and makes the 'Handle' available to all following statements

-- in the do-block.

--

-- Notice that it's often simpler to use the 'JetSource' (for reading) and

-- 'JetSink' (for writing) instances of 'File'.

withFile :: FilePath -> IOMode -> Jet Handle
withFile :: String -> IOMode -> Jet Handle
withFile String
path IOMode
iomode = forall resource.
(forall x. (resource -> IO x) %1 -> IO x) -> Jet resource
control @Handle (forall resource.
(forall x. (resource -> IO x) -> IO x)
-> forall x. (resource -> IO x) %1 -> IO x
unsafeCoerceControl @Handle (String -> IOMode -> (Handle -> IO x) -> IO x
forall r. String -> IOMode -> (Handle -> IO r) -> IO r
System.IO.withFile String
path IOMode
iomode))

-- |

--

-- >>> :{

-- do r <- J.bracket (putStrLn "allocating" *> pure "foo") (\r -> putStrLn $ "deallocating " ++ r)

--    liftIO $ putStrLn $ "using resource " ++ r

-- & drain

-- :}

-- allocating

-- using resource foo

-- deallocating foo

--

bracket :: forall a b . IO a -- ^ allocator

        -> (a -> IO b) -- ^ finalizer

        -> Jet a
bracket :: forall a b. IO a -> (a -> IO b) -> Jet a
bracket IO a
allocate a -> IO b
free = forall resource.
(forall x. (resource -> IO x) %1 -> IO x) -> Jet resource
control @a (forall resource.
(forall x. (resource -> IO x) -> IO x)
-> forall x. (resource -> IO x) %1 -> IO x
unsafeCoerceControl @a (IO a -> (a -> IO b) -> (a -> IO x) -> IO x
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
Control.Exception.bracket IO a
allocate a -> IO b
free))

bracket_ :: forall a b . IO a -- ^ allocator

         -> IO b -- ^ finalizer 

         -> Jet ()
bracket_ :: forall a b. IO a -> IO b -> Jet ()
bracket_ IO a
allocate IO b
free = (forall x. IO x %1 -> IO x) -> Jet ()
control_ ((forall x. IO x -> IO x) -> forall x. IO x %1 -> IO x
unsafeCoerceControl_ (IO a -> IO b -> IO x -> IO x
forall a b c. IO a -> IO b -> IO c -> IO c
Control.Exception.bracket_ IO a
allocate IO b
free))

bracketOnError :: forall a b . IO a -- ^ allocator

               -> (a -> IO b) -- ^ finalizer

               -> Jet a
bracketOnError :: forall a b. IO a -> (a -> IO b) -> Jet a
bracketOnError IO a
allocate a -> IO b
free = forall resource.
(forall x. (resource -> IO x) %1 -> IO x) -> Jet resource
control @a (forall resource.
(forall x. (resource -> IO x) -> IO x)
-> forall x. (resource -> IO x) %1 -> IO x
unsafeCoerceControl @a (IO a -> (a -> IO b) -> (a -> IO x) -> IO x
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
Control.Exception.bracketOnError IO a
allocate a -> IO b
free))

-- | 

--

-- Notice how the finalizer runs even when we limit the 'Jet':

--

-- >>> :{ 

-- do J.finally (putStrLn "hi") -- protects statements below

--    liftIO (putStrLn "hey")

--    J.each "abc" 

-- & J.limit 2 

-- & J.toList

-- :}

-- hey

-- hi

-- "ab"

--

-- But if the protected 'Jet' is not consumed at all, the finalizer might not run.

--

-- >>> :{ 

-- do J.finally (putStrLn "hi") -- protects statements below 

--    liftIO (putStrLn "hey") 

--    J.each "abc" 

-- & J.limit 0 

-- & J.toList

-- :}

-- ""

--

finally :: IO a -> Jet ()
finally :: forall a. IO a -> Jet ()
finally IO a
afterward =
    (forall x. IO x %1 -> IO x) -> Jet ()
control_ ((forall x. IO x -> IO x) -> forall x. IO x %1 -> IO x
unsafeCoerceControl_ ((IO x -> IO a -> IO x) -> IO a -> IO x -> IO x
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO x -> IO a -> IO x
forall a b. IO a -> IO b -> IO a
Control.Exception.finally IO a
afterward))

onException :: IO a -> Jet ()
onException :: forall a. IO a -> Jet ()
onException IO a
afterward =
    (forall x. IO x %1 -> IO x) -> Jet ()
control_ ((forall x. IO x -> IO x) -> forall x. IO x %1 -> IO x
unsafeCoerceControl_ ((IO x -> IO a -> IO x) -> IO a -> IO x -> IO x
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO x -> IO a -> IO x
forall a b. IO a -> IO b -> IO a
Control.Exception.onException IO a
afterward))

-- | Lift a control operation (like 'Control.Exception.bracket') for which the

-- callback uses the allocated resource.

control :: forall resource. (forall x. (resource -> IO x) %1 -> IO x) -> Jet resource
control :: forall resource.
(forall x. (resource -> IO x) %1 -> IO x) -> Jet resource
control forall x. (resource -> IO x) %1 -> IO x
f =
  (forall s. (s -> Bool) -> (s -> resource -> IO s) -> s -> IO s)
-> Jet resource
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> resource -> IO s
step s
initial ->
    if
        | s -> Bool
stop s
initial ->
          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
        | Bool
otherwise -> do
          (resource -> IO s) %1 -> IO s
forall x. (resource -> IO x) %1 -> IO x
f (s -> resource -> IO s
step s
initial)

-- | Lift a control operation (like 'Control.Exception.finally') for which the

-- callback doesn't use the allocated resource.

control_ :: (forall x. IO x %1-> IO x) -> Jet ()
control_ :: (forall x. IO x %1 -> IO x) -> Jet ()
control_ forall x. IO x %1 -> IO x
f =
  (forall s. (s -> Bool) -> (s -> () -> IO s) -> s -> IO s) -> Jet ()
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> () -> IO s
step s
initial ->
    if
        | s -> Bool
stop s
initial -> do
          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
        | Bool
otherwise -> do
          IO s %1 -> IO s
forall x. IO x %1 -> IO x
f (s -> () -> IO s
step s
initial ())

-- | \"morally\", all control operations compatible with this library should

-- execute the callback only once, which means that they should have a linear

-- type. But because linear types are not widespread, they usually are given a

-- less precise non-linear type. If you know what you are doing, use this

-- function to give them a linear type.

unsafeCoerceControl :: forall resource . (forall x. (resource -> IO x) -> IO x) -> (forall x. (resource -> IO x) %1 -> IO x)
unsafeCoerceControl :: forall resource.
(forall x. (resource -> IO x) -> IO x)
-> forall x. (resource -> IO x) %1 -> IO x
unsafeCoerceControl forall x. (resource -> IO x) -> IO x
f = ((resource -> IO Any) -> IO Any) -> (resource -> IO x) %1 -> IO x
forall a b. a -> b
Unsafe.Coerce.unsafeCoerce (resource -> IO Any) -> IO Any
forall x. (resource -> IO x) -> IO x
f

-- | Line 'unsafeCoerceControl', for when the callback doesn't use the

-- allocated resource.

unsafeCoerceControl_ :: (forall x. IO x -> IO x) -> (forall x. IO x %1 -> IO x)
unsafeCoerceControl_ :: (forall x. IO x -> IO x) -> forall x. IO x %1 -> IO x
unsafeCoerceControl_ forall x. IO x -> IO x
f = (IO Any -> IO Any) -> IO x %1 -> IO x
forall a b. a -> b
Unsafe.Coerce.unsafeCoerce IO Any -> IO Any
forall x. IO x -> IO x
f

-- | 

--

-- >>> L.purely (J.fold (J.each "abc")) ((,) <$> L.list <*> L.length)

-- ("abc",3)

--

fold :: Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r
fold :: forall a s r. Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r
fold (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) s -> a -> s
step s
initial s -> r
coda = do
  s
r <- (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f (Bool -> s -> Bool
forall a b. a -> b -> a
const Bool
False) (((a -> s) -> a -> IO s) -> (s -> a -> s) -> s -> a -> IO s
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((s -> IO s) -> (a -> s) -> a -> IO s
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure) s -> a -> s
step) s
initial
  pure $ s -> r
coda s
r

-- |

-- >>> L.impurely (J.foldIO (J.each "abc")) (L.FoldM (\() c -> putStrLn [c]) (pure ()) pure *> L.generalize L.length)

-- a

-- b

-- c

-- 3

--

foldIO :: Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r
foldIO :: forall a s r.
Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r
foldIO (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) s -> a -> IO s
step IO s
initialIO s -> IO r
coda = do
  s
initial <- IO s
initialIO
  s
r <- (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f (Bool -> s -> Bool
forall a b. a -> b -> a
const Bool
False) s -> a -> IO s
step s
initial
  s -> IO r
coda s
r


-- Byte Jets


-- https://stackoverflow.com/questions/49852060/how-to-choose-chunk-size-when-reading-a-large-file

-- https://askubuntu.com/questions/641900/how-file-system-block-size-works

-- https://stackoverflow.com/questions/1111661/8192-bytes-when-creating-file

data ChunkSize =
      DefaultChunkSize
    | ChunkSize Int
    | ChunkSize1K
    | ChunkSize4K
    | ChunkSize8K
    | ChunkSize16K
    | ChunkSize1M
    | ChunkSize2M
    deriving Int -> ChunkSize -> ShowS
[ChunkSize] -> ShowS
ChunkSize -> String
(Int -> ChunkSize -> ShowS)
-> (ChunkSize -> String)
-> ([ChunkSize] -> ShowS)
-> Show ChunkSize
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ChunkSize] -> ShowS
$cshowList :: [ChunkSize] -> ShowS
show :: ChunkSize -> String
$cshow :: ChunkSize -> String
showsPrec :: Int -> ChunkSize -> ShowS
$cshowsPrec :: Int -> ChunkSize -> ShowS
Show

chunkSize :: ChunkSize -> Int
chunkSize :: ChunkSize -> Int
chunkSize = \case
    ChunkSize
DefaultChunkSize -> Int
8192
    ChunkSize Int
c -> Int
c 
    ChunkSize
ChunkSize1K -> Int
1024
    ChunkSize
ChunkSize4K -> Int
4096
    ChunkSize
ChunkSize8K -> Int
8192
    ChunkSize
ChunkSize16K -> Int
16384 
    ChunkSize
ChunkSize1M -> Int
1048576
    ChunkSize
ChunkSize2M -> Int
2097152

-- | Helper multi-parameter typeclass for creating 'Jet' values out of a

--   variety of common sources.

--

--   Because there's no functional dependency, sometimes we need to use

--   @TypeApplications@ to give the compiler a hint about the type of elements

--   we want to produce. For example, here we want 'Line's and not, say,

--   'ByteString's:

--

-- >>> action = J.jet @Line (File "foo.txt") & J.sink J.stdout

--

--

class JetSource a source where
    jet :: source -> Jet a 

bytes :: ChunkSize -> Handle -> Jet ByteString
bytes :: ChunkSize -> Handle -> Jet ByteString
bytes (ChunkSize -> Int
chunkSize -> Int
count) Handle
handle =
    (Handle -> IO Bool)
-> (Handle -> IO ByteString) -> Handle -> Jet ByteString
forall handle a.
(handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF Handle -> IO Bool
System.IO.hIsEOF ((Handle -> Int -> IO ByteString) -> Int -> Handle -> IO ByteString
forall a b c. (a -> b -> c) -> b -> a -> c
flip Handle -> Int -> IO ByteString
B.hGetSome Int
count) Handle
handle

instance JetSource ByteString Handle where
    jet :: Handle -> Jet ByteString
jet = ChunkSize -> Handle -> Jet ByteString
bytes ChunkSize
DefaultChunkSize

instance JetSource a Handle => JetSource a File where
    jet :: File -> Jet a
jet (File String
path) = do
        Handle
handle <- String -> IOMode -> Jet Handle
withFile String
path IOMode
ReadMode
        Handle -> Jet a
forall a source. JetSource a source => source -> Jet a
jet Handle
handle

accumByteLengths :: Jet ByteString -> Jet (Int,ByteString)
accumByteLengths :: Jet ByteString -> Jet (Int, ByteString)
accumByteLengths = (Int -> ByteString -> (Int, (Int, ByteString)))
-> Int -> Jet ByteString -> Jet (Int, ByteString)
forall a b c. (a -> b -> (a, c)) -> a -> Jet b -> Jet c
mapAccum (\Int
acc ByteString
bytes -> let acc' :: Int
acc' = Int
acc Int -> Int -> Int
forall a. Num a => a -> a -> a
+ ByteString -> Int
B.length ByteString
bytes in (Int
acc',(Int
acc',ByteString
bytes))) (Int
0 :: Int)

data AmIContinuing = Continuing
                   | NotContinuing deriving Int -> AmIContinuing -> ShowS
[AmIContinuing] -> ShowS
AmIContinuing -> String
(Int -> AmIContinuing -> ShowS)
-> (AmIContinuing -> String)
-> ([AmIContinuing] -> ShowS)
-> Show AmIContinuing
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [AmIContinuing] -> ShowS
$cshowList :: [AmIContinuing] -> ShowS
show :: AmIContinuing -> String
$cshow :: AmIContinuing -> String
showsPrec :: Int -> AmIContinuing -> ShowS
$cshowsPrec :: Int -> AmIContinuing -> ShowS
Show

-- | Splits a stream of bytes into groups bounded by maximum byte sizes. When

-- one group \"fills up\", the next one is started.

--

-- When the list of buckets sizes is exhausted, all incoming bytes are put into

-- the same unbounded group.

--

-- Useful in combination with 'recast'.

bytesOverBuckets :: [Int] -> Splitter ByteString ByteString
bytesOverBuckets :: [Int] -> Splitter ByteString ByteString
bytesOverBuckets [Int]
buckets0 = (Pair AmIContinuing [Int]
 -> ByteString
 -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int]))
-> (Pair AmIContinuing [Int] -> IO (SplitStepResult ByteString))
-> IO (Pair AmIContinuing [Int])
-> Splitter ByteString ByteString
forall s a b.
(s -> a -> IO (b, s)) -> (s -> IO b) -> IO s -> MealyIO a b
MealyIO Pair AmIContinuing [Int]
-> ByteString
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step Pair AmIContinuing [Int] -> IO (SplitStepResult ByteString)
forall a. Monoid a => a
mempty (Pair AmIContinuing [Int] -> IO (Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets0))
    where
    -- logStep s@(Pair c zzz) a = do

    --     putStrLn "foooo!"

    --     System.IO.hFlush System.IO.stdout

    --     traceIO ("state: " ++ show c)

    --     traceIO ("bucket: " ++ show (Prelude.take 2 zzz))

    --     traceIO ("input: " ++ show a)

    --     r@(nexts, _) <- step s a

    --     traceIO ("output: " ++ show nexts)

    --     pure r

    step :: Pair AmIContinuing [Int] -> ByteString -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
    step :: Pair AmIContinuing [Int]
-> ByteString
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step Pair AmIContinuing [Int]
splitterState ByteString
b = do
        (SplitStepResult ByteString
continueResult, Pair AmIContinuing
continuing' [Int]
buckets', ByteString
b') <- Pair AmIContinuing [Int]
-> ByteString
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
continue Pair AmIContinuing [Int]
splitterState ByteString
b
        if | ByteString -> Bool
B.null ByteString
b' -> 
             (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SplitStepResult ByteString
continueResult, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
continuing' [Int]
buckets') 
           | Bool
otherwise ->  do
             (SplitStepResult ByteString
entiresResult, Pair AmIContinuing [Int]
splitterState') <- DList ByteString
-> ByteString
-> [Int]
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
makeEntires DList ByteString
forall a. Monoid a => a
mempty ByteString
b' [Int]
buckets'
             (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SplitStepResult ByteString
continueResult SplitStepResult ByteString
-> SplitStepResult ByteString -> SplitStepResult ByteString
forall a. Semigroup a => a -> a -> a
<> SplitStepResult ByteString
entiresResult, Pair AmIContinuing [Int]
splitterState')
    continue :: Pair AmIContinuing [Int] -> ByteString -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
    continue :: Pair AmIContinuing [Int]
-> ByteString
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
continue (Pair AmIContinuing
NotContinuing []) ByteString
b = (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ( ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
nextWith ByteString
b , AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [] , ByteString
B.empty)
    continue (Pair AmIContinuing
Continuing []) ByteString
b =    (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ( ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
continueWith ByteString
b , AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing [] , ByteString
B.empty)
    continue (Pair AmIContinuing
NotContinuing (Int
bucket : [Int]
buckets)) ByteString
b = do
        let blen :: Int
blen = ByteString -> Int
B.length ByteString
b
        -- traceIO ("b = " ++ show b ++ " bucket size= " ++ show bucket)

        (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
forall (f :: * -> *) a. Applicative f => a -> f a
pure case Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
blen Int
bucket of
            Ordering
LT -> (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
nextWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing (Int
bucket Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
blen Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: [Int]
buckets), ByteString
B.empty)
            Ordering
EQ -> (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith (ByteString -> DList ByteString
forall a. a -> DList a
singleton ByteString
b), AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets, ByteString
B.empty)
            Ordering
GT -> let (ByteString
left,ByteString
right) = Int -> ByteString -> (ByteString, ByteString)
B.splitAt Int
bucket ByteString
b
                   in (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith (ByteString -> DList ByteString
forall a. a -> DList a
singleton ByteString
left), AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets, ByteString
right)  
    continue (Pair AmIContinuing
Continuing (Int
bucket : [Int]
buckets)) ByteString
b = do
        let blen :: Int
blen = ByteString -> Int
B.length ByteString
b
        (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
forall (f :: * -> *) a. Applicative f => a -> f a
pure case Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
blen Int
bucket of
            Ordering
LT -> (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
continueWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing (Int
bucket Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
blen Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: [Int]
buckets), ByteString
B.empty)
            Ordering
EQ -> (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
continueWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets, ByteString
B.empty)
            Ordering
GT -> let (ByteString
left,ByteString
right) = Int -> ByteString -> (ByteString, ByteString)
B.splitAt Int
bucket ByteString
b
                   in (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
continueWith ByteString
left, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets, ByteString
right)  
    makeEntires :: DList ByteString -> ByteString -> [Int] -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
    makeEntires :: DList ByteString
-> ByteString
-> [Int]
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
makeEntires DList ByteString
acc ByteString
b []                 = (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith DList ByteString
acc SplitStepResult ByteString
-> SplitStepResult ByteString -> SplitStepResult ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
nextWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing [])
    makeEntires DList ByteString
acc ByteString
b (Int
bucket : [Int]
buckets) = do
        let blen :: Int
blen = ByteString -> Int
B.length ByteString
b
        case Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
blen Int
bucket of
            Ordering
LT -> (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith DList ByteString
acc SplitStepResult ByteString
-> SplitStepResult ByteString -> SplitStepResult ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
nextWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing (Int
bucket Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
blen Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: [Int]
buckets))
            Ordering
EQ -> (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith (DList ByteString
acc DList ByteString -> DList ByteString -> DList ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString -> DList ByteString
forall a. a -> DList a
singleton ByteString
b), AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets)
            Ordering
GT -> do let (ByteString
left,ByteString
right) = Int -> ByteString -> (ByteString, ByteString)
B.splitAt Int
bucket ByteString
b
                     DList ByteString
-> ByteString
-> [Int]
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
makeEntires (DList ByteString
acc DList ByteString -> DList ByteString -> DList ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString -> DList ByteString
forall a. a -> DList a
singleton ByteString
left) ByteString
right [Int]
buckets -- non-terminal

    continueWith :: b -> SplitStepResult b
continueWith b
b = SplitStepResult b
forall a. Monoid a => a
mempty { continuationOfPreviouslyStartedGroup :: [b]
continuationOfPreviouslyStartedGroup = [b
b] }
    entireWith :: DList b -> SplitStepResult b
entireWith DList b
bdf = SplitStepResult b
forall a. Monoid a => a
mempty { entireGroups :: [[b]]
entireGroups = (b -> [b]) -> [b] -> [[b]]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap b -> [b]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList b -> [b]
forall a. DList a -> [a]
closeDList DList b
bdf) }
    nextWith :: b -> SplitStepResult b
nextWith b
b = SplitStepResult b
forall a. Monoid a => a
mempty { startOfNewGroup :: [b]
startOfNewGroup = [b
b] }

-- | A sequence of bytes that we might want to keep together.

newtype ByteBundle = ByteBundle BL.ByteString deriving newtype (Int -> ByteBundle -> ShowS
[ByteBundle] -> ShowS
ByteBundle -> String
(Int -> ByteBundle -> ShowS)
-> (ByteBundle -> String)
-> ([ByteBundle] -> ShowS)
-> Show ByteBundle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ByteBundle] -> ShowS
$cshowList :: [ByteBundle] -> ShowS
show :: ByteBundle -> String
$cshow :: ByteBundle -> String
showsPrec :: Int -> ByteBundle -> ShowS
$cshowsPrec :: Int -> ByteBundle -> ShowS
Show, NonEmpty ByteBundle -> ByteBundle
ByteBundle -> ByteBundle -> ByteBundle
(ByteBundle -> ByteBundle -> ByteBundle)
-> (NonEmpty ByteBundle -> ByteBundle)
-> (forall b. Integral b => b -> ByteBundle -> ByteBundle)
-> Semigroup ByteBundle
forall b. Integral b => b -> ByteBundle -> ByteBundle
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
stimes :: forall b. Integral b => b -> ByteBundle -> ByteBundle
$cstimes :: forall b. Integral b => b -> ByteBundle -> ByteBundle
sconcat :: NonEmpty ByteBundle -> ByteBundle
$csconcat :: NonEmpty ByteBundle -> ByteBundle
<> :: ByteBundle -> ByteBundle -> ByteBundle
$c<> :: ByteBundle -> ByteBundle -> ByteBundle
Semigroup, Semigroup ByteBundle
ByteBundle
Semigroup ByteBundle
-> ByteBundle
-> (ByteBundle -> ByteBundle -> ByteBundle)
-> ([ByteBundle] -> ByteBundle)
-> Monoid ByteBundle
[ByteBundle] -> ByteBundle
ByteBundle -> ByteBundle -> ByteBundle
forall a.
Semigroup a -> a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
mconcat :: [ByteBundle] -> ByteBundle
$cmconcat :: [ByteBundle] -> ByteBundle
mappend :: ByteBundle -> ByteBundle -> ByteBundle
$cmappend :: ByteBundle -> ByteBundle -> ByteBundle
mempty :: ByteBundle
$cmempty :: ByteBundle
Monoid)

-- | Constructs a 'ByteBundle' out of the bytes of some 'Foldable' container.

bundle :: Foldable f => f ByteString -> ByteBundle
bundle :: forall (f :: * -> *). Foldable f => f ByteString -> ByteBundle
bundle = ByteString -> ByteBundle
ByteBundle (ByteString -> ByteBundle)
-> (f ByteString -> ByteString) -> f ByteString -> ByteBundle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ByteString] -> ByteString
BL.fromChunks ([ByteString] -> ByteString)
-> (f ByteString -> [ByteString]) -> f ByteString -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. f ByteString -> [ByteString]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Data.Foldable.toList

-- | Length in bytes.

bundleLength :: ByteBundle -> Int
bundleLength :: ByteBundle -> Int
bundleLength (ByteBundle ByteString
value) = Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteString -> Int64
BL.length ByteString
value) -- Int64, but unlikely we'll reach the limit


bundleBytes :: ByteBundle -> Jet ByteString
bundleBytes :: ByteBundle -> Jet ByteString
bundleBytes (ByteBundle ByteString
value) = [ByteString] -> Jet ByteString
forall a (f :: * -> *). Foldable f => f a -> Jet a
each (ByteString -> [ByteString]
BL.toChunks ByteString
value)

-- | Exception thrown when we try to write too much data in a size-bounded destination.

data BucketOverflow = BucketOverflow
  deriving (Int -> BucketOverflow -> ShowS
[BucketOverflow] -> ShowS
BucketOverflow -> String
(Int -> BucketOverflow -> ShowS)
-> (BucketOverflow -> String)
-> ([BucketOverflow] -> ShowS)
-> Show BucketOverflow
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BucketOverflow] -> ShowS
$cshowList :: [BucketOverflow] -> ShowS
show :: BucketOverflow -> String
$cshow :: BucketOverflow -> String
showsPrec :: Int -> BucketOverflow -> ShowS
$cshowsPrec :: Int -> BucketOverflow -> ShowS
Show, Typeable)

instance Exception BucketOverflow

-- | Splits a stream of 'ByteBundles' into groups bounded by maximum byte

-- sizes.  Bytes belonging to the same 'ByteBundle' are always put in the same

-- group. When one group \"fills up\", the next one is started.

--

-- When the list of buckets sizes is exhausted, all incoming bytes are put into

-- the same unbounded group.

--

-- Useful in combination with 'recast'.

--

-- __THROWS__: 

--

-- * 'BucketOverflow' exception if the size bound of a group turns out to be

-- too small for holding even a single 'ByteBundle' value.

--

--

byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString
byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString
byteBundlesOverBuckets [Int]
buckets0 = (Pair AmIContinuing [Int]
 -> ByteBundle
 -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int]))
-> (Pair AmIContinuing [Int] -> IO (SplitStepResult ByteString))
-> IO (Pair AmIContinuing [Int])
-> Splitter ByteBundle ByteString
forall s a b.
(s -> a -> IO (b, s)) -> (s -> IO b) -> IO s -> MealyIO a b
MealyIO Pair AmIContinuing [Int]
-> ByteBundle
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step Pair AmIContinuing [Int] -> IO (SplitStepResult ByteString)
forall a. Monoid a => a
mempty (Pair AmIContinuing [Int] -> IO (Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets0))
    where
    step :: Pair AmIContinuing [Int] -> ByteBundle -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
    step :: Pair AmIContinuing [Int]
-> ByteBundle
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step (Pair AmIContinuing
splitterState []) (ByteBundle ByteString
pieces) = 
        -- We assume [] means "infinite bucket" so once we enter it we'll only be able to continue. 

        (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure ( case AmIContinuing
splitterState of
                 AmIContinuing
Continuing -> ByteString -> SplitStepResult ByteString
continueWith ByteString
pieces
                 AmIContinuing
NotContinuing -> ByteString -> SplitStepResult ByteString
nextWith ByteString
pieces 
             , AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing [])
    step (Pair AmIContinuing
splitterState (Int
bucket : [Int]
buckets)) e :: ByteBundle
e@(ByteBundle ByteString
pieces) = do
        let elen :: Int
elen = ByteBundle -> Int
bundleLength ByteBundle
e
        case Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
elen Int
bucket of
            Ordering
LT -> (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure ( case AmIContinuing
splitterState of
                             AmIContinuing
Continuing -> ByteString -> SplitStepResult ByteString
continueWith ByteString
pieces
                             AmIContinuing
NotContinuing -> ByteString -> SplitStepResult ByteString
nextWith ByteString
pieces
                       , AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing (Int
bucket Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
elen Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: [Int]
buckets) )
            Ordering
EQ -> (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall (f :: * -> *) a. Applicative f => a -> f a
pure ( case AmIContinuing
splitterState of
                            AmIContinuing
Continuing -> ByteString -> SplitStepResult ByteString
continueWith ByteString
pieces
                            AmIContinuing
NotContinuing -> ByteString -> SplitStepResult ByteString
entireWith ByteString
pieces 
                       ,  AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets )
            -- NB: It's possible to close a bucket and open the next one in the same iteration.

            Ordering
GT -> case AmIContinuing
splitterState of
                AmIContinuing
Continuing -> Pair AmIContinuing [Int]
-> ByteBundle
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step (AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets) ByteBundle
e
                -- If we are not continuing, that means that the brand-new bucket hasn't 

                -- enough space to hold a single entity. 

                AmIContinuing
NotContinuing -> BucketOverflow
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall e a. Exception e => e -> IO a
throwIO BucketOverflow
BucketOverflow
    continueWith :: ByteString -> SplitStepResult ByteString
continueWith ByteString
bs = SplitStepResult ByteString
forall a. Monoid a => a
mempty { continuationOfPreviouslyStartedGroup :: [ByteString]
continuationOfPreviouslyStartedGroup = ByteString -> [ByteString]
BL.toChunks ByteString
bs }
    entireWith :: ByteString -> SplitStepResult ByteString
entireWith ByteString
pieces = SplitStepResult ByteString
forall a. Monoid a => a
mempty { entireGroups :: [[ByteString]]
entireGroups = [ByteString -> [ByteString]
BL.toChunks ByteString
pieces] }
    nextWith :: ByteString -> SplitStepResult ByteString
nextWith ByteString
bs = SplitStepResult ByteString
forall a. Monoid a => a
mempty { startOfNewGroup :: [ByteString]
startOfNewGroup = ByteString -> [ByteString]
BL.toChunks ByteString
bs }

-- | Uses the default system locale.

instance JetSource Line Handle where
    jet :: Handle -> Jet Line
jet Handle
handle = 
        Text -> Line
textToLine (Text -> Line) -> Jet Text -> Jet Line
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Handle -> IO Bool) -> (Handle -> IO Text) -> Handle -> Jet Text
forall handle a.
(handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF Handle -> IO Bool
System.IO.hIsEOF Handle -> IO Text
T.hGetLine Handle
handle

--

--

-- Text Jets


-- | 

-- __THROWS__: 

--

-- * 'T.UnicodeException'

decodeUtf8 :: Jet ByteString -> Jet Text
decodeUtf8 :: Jet ByteString -> Jet Text
decodeUtf8 (Jet forall s. (s -> Bool) -> (s -> ByteString -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> Text -> IO s) -> s -> IO s)
-> Jet Text
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> Text -> IO s
step s
initial -> do
    let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
        step' :: Pair a s -> ByteString -> IO (Pair (ByteString -> Decoding) s)
step' (Pair a
leftovers s
s) ByteString
bytes = do
            T.Some !Text
text !ByteString
_ !ByteString -> Decoding
leftovers' <- Decoding -> IO Decoding
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Decoding -> IO Decoding) -> Decoding -> IO Decoding
forall a b. (a -> b) -> a -> b
$ ByteString -> Decoding
T.streamDecodeUtf8 ByteString
bytes
            !s
s' <- s -> Text -> IO s
step s
s Text
text
            Pair (ByteString -> Decoding) s
-> IO (Pair (ByteString -> Decoding) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((ByteString -> Decoding) -> s -> Pair (ByteString -> Decoding) s
forall a b. a -> b -> Pair a b
Pair ByteString -> Decoding
leftovers' s
s')
        initial' :: Pair (ByteString -> Decoding) s
initial' = (ByteString -> Decoding) -> s -> Pair (ByteString -> Decoding) s
forall a b. a -> b -> Pair a b
Pair ByteString -> Decoding
leftovers0 s
initial
    Pair ByteString -> Decoding
leftovers s
final <-  (Pair (ByteString -> Decoding) s -> Bool)
-> (Pair (ByteString -> Decoding) s
    -> ByteString -> IO (Pair (ByteString -> Decoding) s))
-> Pair (ByteString -> Decoding) s
-> IO (Pair (ByteString -> Decoding) s)
forall s. (s -> Bool) -> (s -> ByteString -> IO s) -> s -> IO s
f Pair (ByteString -> Decoding) s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair (ByteString -> Decoding) s
-> ByteString -> IO (Pair (ByteString -> Decoding) s)
forall {a}.
Pair a s -> ByteString -> IO (Pair (ByteString -> Decoding) s)
step' Pair (ByteString -> Decoding) s
initial'  
    T.Some !Text
_ !ByteString
bytes !ByteString -> Decoding
_ <- Decoding -> IO Decoding
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Decoding -> IO Decoding) -> Decoding -> IO Decoding
forall a b. (a -> b) -> a -> b
$ ByteString -> Decoding
T.streamDecodeUtf8 ByteString
B.empty
    if | Bool -> Bool
not (ByteString -> Bool
B.null ByteString
bytes) -> 
         UnicodeException -> IO s
forall e a. Exception e => e -> IO a
throwIO (String -> Maybe Word8 -> UnicodeException
T.DecodeError String
"Unconsumed leftovers at end." Maybe Word8
forall a. Maybe a
Nothing)
       | Bool
otherwise -> 
         s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final
  where 
    leftovers0 :: ByteString -> Decoding
leftovers0 = 
        let T.Some Text
_ ByteString
_ ByteString -> Decoding
g = ByteString -> Decoding
T.streamDecodeUtf8 ByteString
B.empty
         in ByteString -> Decoding
g

encodeUtf8 :: Jet Text -> Jet ByteString
encodeUtf8 :: Jet Text -> Jet ByteString
encodeUtf8 = (Text -> ByteString) -> Jet Text -> Jet ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> ByteString
T.encodeUtf8

-- | A line of text.

--

-- While it is guaranteed that the 'Line's coming out of the 'lines' function

-- do not contain newlines, that invariant is not otherwise enforced. 

newtype Line = Line_ TL.Text
    deriving newtype (Line -> Line -> Bool
(Line -> Line -> Bool) -> (Line -> Line -> Bool) -> Eq Line
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Line -> Line -> Bool
$c/= :: Line -> Line -> Bool
== :: Line -> Line -> Bool
$c== :: Line -> Line -> Bool
Eq,Eq Line
Eq Line
-> (Line -> Line -> Ordering)
-> (Line -> Line -> Bool)
-> (Line -> Line -> Bool)
-> (Line -> Line -> Bool)
-> (Line -> Line -> Bool)
-> (Line -> Line -> Line)
-> (Line -> Line -> Line)
-> Ord Line
Line -> Line -> Bool
Line -> Line -> Ordering
Line -> Line -> Line
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Line -> Line -> Line
$cmin :: Line -> Line -> Line
max :: Line -> Line -> Line
$cmax :: Line -> Line -> Line
>= :: Line -> Line -> Bool
$c>= :: Line -> Line -> Bool
> :: Line -> Line -> Bool
$c> :: Line -> Line -> Bool
<= :: Line -> Line -> Bool
$c<= :: Line -> Line -> Bool
< :: Line -> Line -> Bool
$c< :: Line -> Line -> Bool
compare :: Line -> Line -> Ordering
$ccompare :: Line -> Line -> Ordering
Ord,NonEmpty Line -> Line
Line -> Line -> Line
(Line -> Line -> Line)
-> (NonEmpty Line -> Line)
-> (forall b. Integral b => b -> Line -> Line)
-> Semigroup Line
forall b. Integral b => b -> Line -> Line
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
stimes :: forall b. Integral b => b -> Line -> Line
$cstimes :: forall b. Integral b => b -> Line -> Line
sconcat :: NonEmpty Line -> Line
$csconcat :: NonEmpty Line -> Line
<> :: Line -> Line -> Line
$c<> :: Line -> Line -> Line
Semigroup,Semigroup Line
Line
Semigroup Line
-> Line
-> (Line -> Line -> Line)
-> ([Line] -> Line)
-> Monoid Line
[Line] -> Line
Line -> Line -> Line
forall a.
Semigroup a -> a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
mconcat :: [Line] -> Line
$cmconcat :: [Line] -> Line
mappend :: Line -> Line -> Line
$cmappend :: Line -> Line -> Line
mempty :: Line
$cmempty :: Line
Monoid,Int -> Line -> ShowS
[Line] -> ShowS
Line -> String
(Int -> Line -> ShowS)
-> (Line -> String) -> ([Line] -> ShowS) -> Show Line
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Line] -> ShowS
$cshowList :: [Line] -> ShowS
show :: Line -> String
$cshow :: Line -> String
showsPrec :: Int -> Line -> ShowS
$cshowsPrec :: Int -> Line -> ShowS
Show,String -> Line
(String -> Line) -> IsString Line
forall a. (String -> a) -> IsString a
fromString :: String -> Line
$cfromString :: String -> Line
IsString)

-- https://ghc.gitlab.haskell.org/ghc/doc/users_guide/exts/pattern_synonyms.html


-- | Unidirectional pattern that allows converting a 'Line' into a 'Text'

-- during pattern-matching.

pattern $mLine :: forall {r}. Line -> (Text -> r) -> (Void# -> r) -> r
Line text <- Line_ (TL.toStrict -> text)

-- | Converts a 'Line' back to text, without adding the newline.

lineToText :: Line -> Text
lineToText :: Line -> Text
lineToText (Line_ Text
text) = Text -> Text
TL.toStrict Text
text

-- | Converts a 'Line' to an utf8-encdoed 'ByteBundle', without adding the newline.

lineToUtf8 :: Line -> ByteBundle
lineToUtf8 :: Line -> ByteBundle
lineToUtf8 (Line_ Text
l) = Text -> [Text]
TL.toChunks Text
l [Text] -> (Text -> ByteString) -> [ByteString]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> Text -> ByteString
T.encodeUtf8 [ByteString] -> ([ByteString] -> ByteBundle) -> ByteBundle
forall a b. a -> (a -> b) -> b
& [ByteString] -> ByteBundle
forall (f :: * -> *). Foldable f => f ByteString -> ByteBundle
bundle

textToLine :: Text -> Line
textToLine :: Text -> Line
textToLine = Text -> Line
Line_ (Text -> Line) -> (Text -> Text) -> Text -> Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
TL.fromStrict

-- | @Data.Text.singleton '\\n'@

newline :: Text
newline :: Text
newline = Char -> Text
T.singleton Char
'\n'

textToUtf8 :: Text -> ByteBundle
textToUtf8 :: Text -> ByteBundle
textToUtf8 Text
t = ByteString -> ByteBundle
ByteBundle (Text
t Text -> (Text -> ByteString) -> ByteString
forall a b. a -> (a -> b) -> b
& Text -> ByteString
T.encodeUtf8 ByteString -> (ByteString -> ByteString) -> ByteString
forall a b. a -> (a -> b) -> b
& ByteString -> ByteString
BL.fromStrict)

lineContains :: Text -> Line -> Bool 
lineContains :: Text -> Line -> Bool
lineContains Text
t (Line_ Text
l)  = Text -> Text -> Bool
TL.isInfixOf (Text -> Text
TL.fromStrict Text
t) Text
l

lineBeginsWith :: Text -> Line -> Bool
lineBeginsWith :: Text -> Line -> Bool
lineBeginsWith Text
t (Line_ Text
l) = Text -> Text -> Bool
TL.isPrefixOf (Text -> Text
TL.fromStrict Text
t) Text
l

-- | Adds the 'Text' to the beginning of the 'Line'.

prefixLine :: Text -> Line -> Line
prefixLine :: Text -> Line -> Line
prefixLine Text
t (Line_ Text
l) = Text -> Line
Line_ ([Text] -> Text
TL.fromChunks (Text
t Text -> [Text] -> [Text]
forall a. a -> [a] -> [a]
: Text -> [Text]
TL.toChunks Text
l))

-- textToLine :: Text -> Line

-- textToLine text 

--     | Just _ <- T.find (=='\n') text = throw NewlineForbidden

--     | otherwise = Line_ (removeTrailingCarriageReturn text)


stringToLine :: String -> Line
stringToLine :: String -> Line
stringToLine = Text -> Line
Line_ (Text -> Line) -> (String -> Text) -> String -> Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
TL.pack

-- withLineText :: (Text -> r) -> Line -> r

-- withLineText f (Line text) = f text 


isEmptyLine :: Line -> Bool
isEmptyLine :: Line -> Bool
isEmptyLine (Line_ Text
text) = Text -> Bool
TL.null Text
text 

emptyLine :: Line
emptyLine :: Line
emptyLine = Text -> Line
Line_ Text
TL.empty

-- | Exception thrown when we find newlines in functions which don't accept them.

--

-- A direct copy of the @NewlineForbidden@ exception from the [turtle](https://hackage.haskell.org/package/turtle) package.

data NewlineForbidden = NewlineForbidden
  deriving (Int -> NewlineForbidden -> ShowS
[NewlineForbidden] -> ShowS
NewlineForbidden -> String
(Int -> NewlineForbidden -> ShowS)
-> (NewlineForbidden -> String)
-> ([NewlineForbidden] -> ShowS)
-> Show NewlineForbidden
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [NewlineForbidden] -> ShowS
$cshowList :: [NewlineForbidden] -> ShowS
show :: NewlineForbidden -> String
$cshow :: NewlineForbidden -> String
showsPrec :: Int -> NewlineForbidden -> ShowS
$cshowsPrec :: Int -> NewlineForbidden -> ShowS
Show, Typeable)

instance Exception NewlineForbidden

removeTrailingCarriageReturn :: Text -> Text
removeTrailingCarriageReturn :: Text -> Text
removeTrailingCarriageReturn Text
text 
    | Text -> Bool
T.null Text
text = Text
text
    | Text -> Char
T.last Text
text Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
== Char
'\r' = Text -> Text
T.init Text
text
    | Bool
otherwise = Text
text

lines :: Jet Text -> Jet Line
lines :: Jet Text -> Jet Line
lines (Jet forall s. (s -> Bool) -> (s -> Text -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> Line -> IO s) -> s -> IO s)
-> Jet Line
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> Line -> IO s
step s
initial -> do
    let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
        findLinesInCurrentBlock :: Text -> [Line]
findLinesInCurrentBlock Text
text  
            | Text -> Bool
T.null Text
text =
              []
            | Bool
otherwise =
              (Text -> Line) -> [Text] -> [Line]
forall a b. (a -> b) -> [a] -> [b]
map (Text -> Line
textToLine (Text -> Line) -> (Text -> Text) -> Text -> Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
removeTrailingCarriageReturn) (Text -> [Text]
T.lines Text
text)
              [Line] -> [Line] -> [Line]
forall a. [a] -> [a] -> [a]
++ 
              if
                  | Text -> Char
T.last Text
text Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
== Char
'\n' -> 
                      [Line
forall a. Monoid a => a
mempty]
                  | Bool
otherwise -> 
                      []
        step' :: Pair (DList Line) s -> Text -> IO (Pair (DList Line) s)
step' (Pair DList Line
lineUnderConstruction s
s) (Text -> [Line]
findLinesInCurrentBlock -> [Line]
linesInCurrentBlock) = do
            case [Line]
linesInCurrentBlock of
                [] -> do
                    Pair (DList Line) s -> IO (Pair (DList Line) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList Line -> s -> Pair (DList Line) s
forall a b. a -> b -> Pair a b
Pair DList Line
lineUnderConstruction s
s)
                [Line
l] -> do
                    Pair (DList Line) s -> IO (Pair (DList Line) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList Line -> s -> Pair (DList Line) s
forall a b. a -> b -> Pair a b
Pair (DList Line
lineUnderConstruction DList Line -> DList Line -> DList Line
forall a. Semigroup a => a -> a -> a
<> Line -> DList Line
forall a. a -> DList a
singleton Line
l) s
s)
                Line
l : rest :: [Line]
rest@(Line
x : [Line]
xs) -> do
                    -- Ineficcient mconcat, better strictify a lazy text here?

                    let completedLine :: Line
completedLine = [Line] -> Line
forall a. Monoid a => [a] -> a
mconcat ([Line] -> Line) -> [Line] -> Line
forall a b. (a -> b) -> a -> b
$ DList Line -> [Line] -> [Line]
forall a. DList a -> [a] -> [a]
runDList DList Line
lineUnderConstruction [Line
l]
                    s
s' <- (s -> Bool) -> (s -> Line -> IO s) -> [Line] -> s -> IO s
forall s x. (s -> Bool) -> (s -> x -> IO s) -> [x] -> s -> IO s
downstream s -> Bool
stop s -> Line -> IO s
step (Line
completedLine Line -> [Line] -> [Line]
forall a. a -> [a] -> [a]
: [Line] -> [Line]
forall a. [a] -> [a]
init [Line]
rest) s
s
                    pure (DList Line -> s -> Pair (DList Line) s
forall a b. a -> b -> Pair a b
Pair (Line -> DList Line
forall a. a -> DList a
singleton ([Line] -> Line
forall a. [a] -> a
last [Line]
linesInCurrentBlock)) s
s')
        initial' :: Pair (DList Line) s
initial' = DList Line -> s -> Pair (DList Line) s
forall a b. a -> b -> Pair a b
Pair DList Line
forall a. Monoid a => a
mempty s
initial
    Pair ([Line] -> Line
forall a. Monoid a => [a] -> a
mconcat ([Line] -> Line) -> (DList Line -> [Line]) -> DList Line -> Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DList Line -> [Line]
forall a. DList a -> [a]
closeDList -> Line
lineUnderConstruction) s
final <- (Pair (DList Line) s -> Bool)
-> (Pair (DList Line) s -> Text -> IO (Pair (DList Line) s))
-> Pair (DList Line) s
-> IO (Pair (DList Line) s)
forall s. (s -> Bool) -> (s -> Text -> IO s) -> s -> IO s
f Pair (DList Line) s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair (DList Line) s -> Text -> IO (Pair (DList Line) s)
step' Pair (DList Line) s
initial'  
    if
        | s -> Bool
stop s
final -> 
          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final
        | Line -> Bool
isEmptyLine Line
lineUnderConstruction -> 
          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final
        | Bool
otherwise ->
          s -> Line -> IO s
step s
final Line
lineUnderConstruction
        
unlines :: Jet Line -> Jet Text
unlines :: Jet Line -> Jet Text
unlines Jet Line
j = do
    Line Text
text <- Jet Line
j
    Text -> Jet Text
forall (f :: * -> *) a. Applicative f => a -> f a
pure Text
text Jet Text -> Jet Text -> Jet Text
forall a. Semigroup a => a -> a -> a
<> Text -> Jet Text
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Char -> Text
T.singleton Char
'\n') 

downstream :: (s -> Bool) -> (s -> x -> IO s) -> [x] -> s -> IO s
downstream :: forall s x. (s -> Bool) -> (s -> x -> IO s) -> [x] -> s -> IO s
downstream s -> Bool
stop s -> x -> IO s
step = [x] -> s -> IO s
go
  where
    go :: [x] -> s -> IO s
go [] s
s = 
        s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
    go (x
x : [x]
xs) s
s 
        | s -> Bool
stop s
s =
          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
        | Bool
otherwise = do
            !s
s' <- s -> x -> IO s
step s
s x
x
            [x] -> s -> IO s
go [x]
xs s
s'

-- General sinks


-- | A function that consumes a 'Jet' totally or partially, without returning a result.

type Sink a = Jet a -> IO ()

-- | Helper multi-parameter typeclass for creating 'Jet'-consuming functions

-- out of a variety of common destinations.

--

-- >>> J.each ["aaa","bbb","ccc"] <&> J.stringToLine & J.sink J.stdout

-- aaa

-- bbb

-- ccc

--

class JetSink a target where
    sink :: target -> Sink a

instance JetSink ByteString Handle where
    sink :: Handle -> Sink ByteString
sink Handle
handle Jet ByteString
j = Jet ByteString -> (ByteString -> IO ()) -> IO ()
forall a b. Jet a -> (a -> IO b) -> IO ()
for_ Jet ByteString
j (Handle -> ByteString -> IO ()
B.hPut Handle
handle)

instance JetSink a Handle => JetSink a File where
    sink :: File -> Sink a
sink (File String
path) Jet a
j = String -> IOMode -> (Handle -> IO ()) -> IO ()
forall r. String -> IOMode -> (Handle -> IO r) -> IO r
System.IO.withFile String
path IOMode
System.IO.WriteMode \Handle
handle ->
        Handle -> Sink a
forall a target. JetSink a target => target -> Sink a
sink Handle
handle Jet a
j

-- | Uses the default system locale. Adds newlines.

instance JetSink Line Handle where
    sink :: Handle -> Sink Line
sink Handle
handle = (Line -> IO ()) -> Sink Line
forall a b. (a -> IO b) -> Sink a
traverse_ (Handle -> Text -> IO ()
T.hPutStrLn Handle
handle (Text -> IO ()) -> (Line -> Text) -> Line -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Line -> Text
lineToText)

-- | Uses the default system locale.

instance JetSink Text Handle where
    sink :: Handle -> Sink Text
sink Handle
handle = (Text -> IO ()) -> Sink Text
forall a b. (a -> IO b) -> Sink a
traverse_ (Handle -> Text -> IO ()
T.hPutStr Handle
handle)

-- | 'FilePaths' are plain strings. This newtype provides a small measure of

-- safety over them.

newtype File = File { File -> String
getFilePath :: FilePath } deriving Int -> File -> ShowS
[File] -> ShowS
File -> String
(Int -> File -> ShowS)
-> (File -> String) -> ([File] -> ShowS) -> Show File
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [File] -> ShowS
$cshowList :: [File] -> ShowS
show :: File -> String
$cshow :: File -> String
showsPrec :: Int -> File -> ShowS
$cshowsPrec :: Int -> File -> ShowS
Show

-- | The maximum size in bytes of some destination into which we write the

-- bytes produced by a 'Jet'.

data BoundedSize x = BoundedSize Int x deriving stock (Int -> BoundedSize x -> ShowS
[BoundedSize x] -> ShowS
BoundedSize x -> String
(Int -> BoundedSize x -> ShowS)
-> (BoundedSize x -> String)
-> ([BoundedSize x] -> ShowS)
-> Show (BoundedSize x)
forall x. Show x => Int -> BoundedSize x -> ShowS
forall x. Show x => [BoundedSize x] -> ShowS
forall x. Show x => BoundedSize x -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BoundedSize x] -> ShowS
$cshowList :: forall x. Show x => [BoundedSize x] -> ShowS
show :: BoundedSize x -> String
$cshow :: forall x. Show x => BoundedSize x -> String
showsPrec :: Int -> BoundedSize x -> ShowS
$cshowsPrec :: forall x. Show x => Int -> BoundedSize x -> ShowS
Show,ReadPrec [BoundedSize x]
ReadPrec (BoundedSize x)
Int -> ReadS (BoundedSize x)
ReadS [BoundedSize x]
(Int -> ReadS (BoundedSize x))
-> ReadS [BoundedSize x]
-> ReadPrec (BoundedSize x)
-> ReadPrec [BoundedSize x]
-> Read (BoundedSize x)
forall x. Read x => ReadPrec [BoundedSize x]
forall x. Read x => ReadPrec (BoundedSize x)
forall x. Read x => Int -> ReadS (BoundedSize x)
forall x. Read x => ReadS [BoundedSize x]
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [BoundedSize x]
$creadListPrec :: forall x. Read x => ReadPrec [BoundedSize x]
readPrec :: ReadPrec (BoundedSize x)
$creadPrec :: forall x. Read x => ReadPrec (BoundedSize x)
readList :: ReadS [BoundedSize x]
$creadList :: forall x. Read x => ReadS [BoundedSize x]
readsPrec :: Int -> ReadS (BoundedSize x)
$creadsPrec :: forall x. Read x => Int -> ReadS (BoundedSize x)
Read)

instance JetSink ByteBundle Handle where
    sink :: Handle -> Sink ByteBundle
sink Handle
handle Jet ByteBundle
j = (ByteString -> IO ()) -> Sink ByteString
forall a b. (a -> IO b) -> Sink a
traverse_ (Handle -> ByteString -> IO ()
B.hPut Handle
handle) do
        ByteBundle
s <- Jet ByteBundle
j
        ByteBundle -> Jet ByteString
bundleBytes ByteBundle
s

-- | Distributes incoming bytes through a sequence of files. Once a file is

-- full, we start writing the next one.

instance JetSink ByteString [BoundedSize File] where
    sink :: [BoundedSize File] -> Sink ByteString
sink [BoundedSize File]
bucketFiles Jet ByteString
j = 
        (Handle -> ByteString -> IO ())
-> (Handle -> IO ())
-> [IO Handle]
-> (Combiners ByteString () -> IO ())
-> IO ()
forall h a r.
(h -> a -> IO ())
-> (h -> IO ()) -> [IO h] -> (Combiners a () -> IO r) -> IO r
withCombiners_ 
               (\Handle
handle ByteString
b -> Handle -> ByteString -> IO ()
B.hPut Handle
handle ByteString
b)
               Handle -> IO ()
hClose
               (BoundedSize File -> IO Handle
makeAllocator (BoundedSize File -> IO Handle)
-> [BoundedSize File] -> [IO Handle]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [BoundedSize File]
bucketFiles)
               (\Combiners ByteString ()
combiners -> Sink ()
forall a. Sink a
drain Sink () -> Sink ()
forall a b. (a -> b) -> a -> b
$ Splitter ByteString ByteString
-> Combiners ByteString () -> Jet ByteString -> Jet ()
forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c
recast ([Int] -> Splitter ByteString ByteString
bytesOverBuckets [Int]
bucketSizes) Combiners ByteString ()
combiners Jet ByteString
j)
      where
        bucketSizes :: [Int]
bucketSizes = (BoundedSize File -> Int) -> [BoundedSize File] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (\(BoundedSize Int
size File
_) -> Int
size) [BoundedSize File]
bucketFiles

-- | Distributes incoming bytes through a sequence of files. Once a file is

-- full, we start writing the next one.

--

-- Each 'ByteBundle' value is garanteed to be written to a single file. If a

-- file turns out to be too small for even a single 'ByteBundle' value, a

-- 'BucketOverflow' exception is thrown.

instance JetSink ByteBundle [BoundedSize File] where
    sink :: [BoundedSize File] -> Sink ByteBundle
sink [BoundedSize File]
bucketFiles Jet ByteBundle
j = 
        (Handle -> ByteString -> IO ())
-> (Handle -> IO ())
-> [IO Handle]
-> (Combiners ByteString () -> IO ())
-> IO ()
forall h a r.
(h -> a -> IO ())
-> (h -> IO ()) -> [IO h] -> (Combiners a () -> IO r) -> IO r
withCombiners_ 
               (\Handle
handle ByteString
b -> Handle -> ByteString -> IO ()
B.hPut Handle
handle ByteString
b)
               Handle -> IO ()
hClose
               (BoundedSize File -> IO Handle
makeAllocator (BoundedSize File -> IO Handle)
-> [BoundedSize File] -> [IO Handle]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [BoundedSize File]
bucketFiles)
               (\Combiners ByteString ()
combiners -> Sink ()
forall a. Sink a
drain Sink () -> Sink ()
forall a b. (a -> b) -> a -> b
$ Splitter ByteBundle ByteString
-> Combiners ByteString () -> Jet ByteBundle -> Jet ()
forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c
recast ([Int] -> Splitter ByteBundle ByteString
byteBundlesOverBuckets [Int]
bucketSizes) Combiners ByteString ()
combiners Jet ByteBundle
j)
      where
        bucketSizes :: [Int]
bucketSizes = (BoundedSize File -> Int) -> [BoundedSize File] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (\(BoundedSize Int
size File
_) -> Int
size) [BoundedSize File]
bucketFiles

makeAllocator :: BoundedSize File -> IO Handle
makeAllocator :: BoundedSize File -> IO Handle
makeAllocator (BoundedSize Int
_ (File String
path)) = String -> IOMode -> IO Handle
openBinaryFile String
path IOMode
WriteMode

-- DList helper

newtype DList a = DList { forall a. DList a -> [a] -> [a]
runDList :: [a] -> [a] }

instance Semigroup (DList a) where
    DList [a] -> [a]
a1 <> :: DList a -> DList a -> DList a
<> DList [a] -> [a]
a2 = ([a] -> [a]) -> DList a
forall a. ([a] -> [a]) -> DList a
DList ([a] -> [a]
a1 ([a] -> [a]) -> ([a] -> [a]) -> [a] -> [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a] -> [a]
a2)

instance Monoid (DList a) where
    mempty :: DList a
mempty = ([a] -> [a]) -> DList a
forall a. ([a] -> [a]) -> DList a
DList [a] -> [a]
forall a. a -> a
id

makeDList :: [a] -> DList a
makeDList :: forall a. [a] -> DList a
makeDList [a]
as = ([a] -> [a]) -> DList a
forall a. ([a] -> [a]) -> DList a
DList \[a]
xs -> [a]
as [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
++ [a]
xs

closeDList :: DList a -> [a]
closeDList :: forall a. DList a -> [a]
closeDList (DList [a] -> [a]
f) = [a] -> [a]
f [] 

singleton :: a -> DList a
singleton :: forall a. a -> DList a
singleton a
a = ([a] -> [a]) -> DList a
forall a. ([a] -> [a]) -> DList a
DList (([a] -> [a]) -> DList a) -> ([a] -> [a]) -> DList a
forall a b. (a -> b) -> a -> b
$ (a
a a -> [a] -> [a]
forall a. a -> [a] -> [a]
:) 

--

-- concurrency


-- | Process the values yielded by the upstream 'Jet' in a concurrent way,

-- and return the results in the form of another 'Jet' as they are produced.

--

-- __NB__: this function might scramble the order of the returned values. Right

-- now there isn't a function for unscrambling them.

--

-- >>> :{

--  J.each [(3,'a'), (2,'b'), (1,'c')]

--  & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c)

--  & J.toList

-- :}

-- "cba"

--

-- What happens if we 'limit' the resulting 'Jet' and we reach that limit, or

-- if we otherwise stop consuming the 'Jet' before it gets exhausted? In those

-- cases, all pending @IO b@ tasks are cancelled.

--

-- >>> :{

--  J.each [(9999,'a'), (2,'b'), (1,'c')]

--  & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c)

--  & J.take 2

--  & J.toList

-- :}

-- "cb"

--

traverseConcurrently :: (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b
-- TODO:

-- It would be nice to have 0-lengh channels for which one side blocks until

-- the other side takes the job.

traverseConcurrently :: forall a b. (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b
traverseConcurrently PoolConf -> PoolConf
adaptConf a -> IO b
makeTask Jet a
upstream = (forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s) -> Jet b
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> b -> IO s
step s
initial -> do
    if 
        -- If we know we aren't going to do any work, don't bother starting the

        -- whole boondoggle.

        | s -> Bool
stop s
initial ->
          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
        | Bool
otherwise -> do
          -- At this point we know we should do at least one step.

          let PoolConf {Int
_inputQueueSize :: PoolConf -> Int
_inputQueueSize :: Int
_inputQueueSize,Int
_numberOfWorkers :: PoolConf -> Int
_numberOfWorkers :: Int
_numberOfWorkers,Int
_outputQueueSize :: PoolConf -> Int
_outputQueueSize :: Int
_outputQueueSize} = PoolConf -> PoolConf
adaptConf PoolConf
defaultPoolConf
          TBMQueue (IO b)
input <- Int -> IO (TBMQueue (IO b))
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
_inputQueueSize
          IORef Bool
inputQueueWriterShouldStop <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
          IORef Int
aliveWorkers <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
_numberOfWorkers
          TBMQueue b
output <- Int -> IO (TBMQueue b)
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
_outputQueueSize
          let 
              -- The inputQueueWriter should *not* be interrupted aynchronously.

              -- After each iteration, it reads the IORef to see if it should stop.

              -- Once it stops, it closes the input queue.

              inputQueueWriter :: IO ()
inputQueueWriter = do
                  Jet a
-> (Bool -> Bool) -> (Bool -> a -> IO Bool) -> Bool -> IO Bool
forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run 
                    Jet a
upstream 
                    Bool -> Bool
forall a. a -> a
id 
                    (\Bool
_ a
a -> do
                        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue (IO b) -> IO b -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue (IO b)
input (a -> IO b
makeTask a
a)
                        IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
inputQueueWriterShouldStop) 
                    Bool
False
                  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue (IO b) -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue (IO b)
input
              -- Workers *can* be interrupted asynchronously.

              worker :: IO ()
worker = do
                  Maybe (IO b)
mtask <- STM (Maybe (IO b)) -> IO (Maybe (IO b))
forall a. STM a -> IO a
atomically (STM (Maybe (IO b)) -> IO (Maybe (IO b)))
-> STM (Maybe (IO b)) -> IO (Maybe (IO b))
forall a b. (a -> b) -> a -> b
$ TBMQueue (IO b) -> STM (Maybe (IO b))
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue (IO b)
input
                  case Maybe (IO b)
mtask of
                      Maybe (IO b)
Nothing -> do
                        Int
remaining <- do
                            IORef Int -> (Int -> (Int, Int)) -> IO Int
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Int
aliveWorkers \Int
count -> 
                                let count' :: Int
count' = Int -> Int
forall a. Enum a => a -> a
pred Int
count 
                                 in (Int
count', Int
count')
                        if 
                            | Int
remaining Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 -> do
                              STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue b -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue b
output
                            | Bool
otherwise -> do
                              () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                      Just IO b
task -> do
                        b
result <- IO b
task
                        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue b -> b -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue b
output b
result
                        IO ()
worker
              outputQueueReader :: s -> IO s
outputQueueReader s
s = do
                  if
                      | s -> Bool
stop s
s -> do
                        -- tell the inserter from upstream that it should stop. is this enough?

                        IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
inputQueueWriterShouldStop Bool
True
                        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue (IO b) -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue (IO b)
input -- perhaps unnecessary?

                        pure s
s
                      | Bool
otherwise -> do
                        Maybe b
mresult <- STM (Maybe b) -> IO (Maybe b)
forall a. STM a -> IO a
atomically (STM (Maybe b) -> IO (Maybe b)) -> STM (Maybe b) -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ TBMQueue b -> STM (Maybe b)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue b
output
                        case Maybe b
mresult of
                            Maybe b
Nothing -> do
                              s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
                            Just b
result -> do
                              !s
s' <- s -> b -> IO s
step s
s b
result
                              s -> IO s
outputQueueReader s
s'
          Concurrently s -> IO s
forall a. Concurrently a -> IO a
runConcurrently (Concurrently s -> IO s) -> Concurrently s -> IO s
forall a b. (a -> b) -> a -> b
$
              IO () -> Concurrently ()
forall a. IO a -> Concurrently a
Concurrently do
                  IO ()
inputQueueWriter
              Concurrently () -> Concurrently s -> Concurrently s
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*>
              IO s -> Concurrently s
forall a. IO a -> Concurrently a
Concurrently do
                  Either s ()
finalLeft <- do
                      Conceit s () -> IO (Either s ())
forall e a. Conceit e a -> IO (Either e a)
runConceit (Conceit s () -> IO (Either s ()))
-> Conceit s () -> IO (Either s ())
forall a b. (a -> b) -> a -> b
$ 
                          -- The worker pool is always killed when the output reader finishes,

                          -- but for the "happy path" the workers will already be dead.

                          IO (Either s ()) -> Conceit s ()
forall e a. IO (Either e a) -> Conceit e a
Conceit (() -> Either s ()
forall a b. b -> Either a b
Right (() -> Either s ()) -> IO () -> IO (Either s ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> IO () -> IO ()
forall a. Int -> IO a -> IO ()
replicateConcurrently_ Int
_numberOfWorkers IO ()
worker)
                          Conceit s () -> Conceit s () -> Conceit s ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> 
                          -- This Left is what kills the worker pool.

                          IO (Either s ()) -> Conceit s ()
forall e a. IO (Either e a) -> Conceit e a
Conceit (s -> Either s ()
forall a b. a -> Either a b
Left (s -> Either s ()) -> IO s -> IO (Either s ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> s -> IO s
outputQueueReader s
initial)
                  case Either s ()
finalLeft of
                      Right () -> do
                          String -> IO s
forall a. HasCallStack => String -> a
error String
"never happens, the Left always wins"
                      Left s
final -> do
                          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final

-- | Configuration record for the worker pool.

data PoolConf = PoolConf {
        PoolConf -> Int
_inputQueueSize :: Int,
        PoolConf -> Int
_numberOfWorkers :: Int,
        PoolConf -> Int
_outputQueueSize :: Int
    } deriving Int -> PoolConf -> ShowS
[PoolConf] -> ShowS
PoolConf -> String
(Int -> PoolConf -> ShowS)
-> (PoolConf -> String) -> ([PoolConf] -> ShowS) -> Show PoolConf
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PoolConf] -> ShowS
$cshowList :: [PoolConf] -> ShowS
show :: PoolConf -> String
$cshow :: PoolConf -> String
showsPrec :: Int -> PoolConf -> ShowS
$cshowsPrec :: Int -> PoolConf -> ShowS
Show

defaultPoolConf :: PoolConf
defaultPoolConf = PoolConf :: Int -> Int -> Int -> PoolConf
PoolConf {
        _inputQueueSize :: Int
_inputQueueSize = Int
1,
        _numberOfWorkers :: Int
_numberOfWorkers = Int
1,
        _outputQueueSize :: Int
_outputQueueSize = Int
1
 }

-- | Size of the waiting queue into the worker pool. The default is @1@.

inputQueueSize :: Int -> PoolConf -> PoolConf
inputQueueSize :: Int -> PoolConf -> PoolConf
inputQueueSize Int
size PoolConf
poolConf = PoolConf
poolConf { _inputQueueSize :: Int
_inputQueueSize = Int
size }

-- | The size of the worker pool. The default is @1@.

numberOfWorkers :: Int -> PoolConf -> PoolConf
numberOfWorkers :: Int -> PoolConf -> PoolConf
numberOfWorkers Int
number PoolConf
poolConf = PoolConf
poolConf { _numberOfWorkers :: Int
_numberOfWorkers = Int
number }

-- | Size of the queue holding results out of the working pool before they

-- are yielded downstream. The default is @1@.

outputQueueSize :: Int -> PoolConf -> PoolConf 
outputQueueSize :: Int -> PoolConf -> PoolConf
outputQueueSize Int
size PoolConf
poolConf = PoolConf
poolConf { _outputQueueSize :: Int
_outputQueueSize = Int
size }

-- | An alias for 'id'. Useful with functions like 'traverseConcurrently' and

-- 'throughProcess', for which it means \"use the default configuration\".

defaults :: a -> a
defaults :: forall a. a -> a
defaults = a -> a
forall a. a -> a
id

-- 

-- process invocation


-- | Feeds the upstream 'Jet' to an external process' @stdin@ and returns the

-- process' @stdout@ as another @Jet@. The feeding and reading of the standard

-- streams is done concurrently in order to avoid deadlocks.

--

-- What happens if we 'limit' the resulting 'Jet' and we reach that limit, or

-- if we otherwise stop consuming the 'Jet' before it gets exhausted? In those

-- cases, the external process is promptly terminated.

throughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet ByteString -> Jet ByteString
throughProcess :: (ProcConf -> ProcConf)
-> CreateProcess -> Jet ByteString -> Jet ByteString
throughProcess ProcConf -> ProcConf
adaptConf = ProcConf -> CreateProcess -> Jet ByteString -> Jet ByteString
forall a b. ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
throughProcess_ (ProcConf -> ProcConf
adaptConf ProcConf
defaultProcConf)

-- | Like 'throughProcess', but feeding and reading 'Line's using the default

-- system encoding.

--

-- >>> :{

-- J.each ["aaa","bbb","ccc"]

-- <&> J.stringToLine

-- & linesThroughProcess defaults (shell "cat")

-- & J.toList

-- :}

-- ["aaa","bbb","ccc"]

--

-- An example of not reading all the lines from a long-lived process that gets cancelled:

--

-- >>> :{

-- mempty

-- & linesThroughProcess defaults (shell "{ printf \"aaa\\nbbb\\nccc\\n\" ; sleep infinity ; }")

-- & J.limit 2

-- & J.toList

-- :}

-- ["aaa","bbb"]

--

linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
linesThroughProcess ProcConf -> ProcConf
adaptConf CreateProcess
procSpec = do
    let textLinesProcConf :: ProcConf_ Text Text
textLinesProcConf = (ProcConf -> ProcConf
adaptConf ProcConf
defaultProcConf) {
                _writeToStdIn :: Handle -> Text -> IO ()
_writeToStdIn = Handle -> Text -> IO ()
T.hPutStrLn,
                _readFromStdout :: Handle -> IO Text
_readFromStdout = Handle -> IO Text
T.hGetLine
            }
    (Text -> Line) -> Jet Text -> Jet Line
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> Line
textToLine (Jet Text -> Jet Line)
-> (Jet Line -> Jet Text) -> Jet Line -> Jet Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcConf_ Text Text -> CreateProcess -> Jet Text -> Jet Text
forall a b. ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
throughProcess_ ProcConf_ Text Text
textLinesProcConf CreateProcess
procSpec (Jet Text -> Jet Text)
-> (Jet Line -> Jet Text) -> Jet Line -> Jet Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Line -> Text) -> Jet Line -> Jet Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Line -> Text
lineToText

-- | Like 'throughProcess', but feeding and reading 'Line's encoded in UTF8.

utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
utf8LinesThroughProcess ProcConf -> ProcConf
adaptConf CreateProcess
procSpec = do
    Jet Text -> Jet Line
lines (Jet Text -> Jet Line)
-> (Jet Line -> Jet Text) -> Jet Line -> Jet Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Jet ByteString -> Jet Text
decodeUtf8 (Jet ByteString -> Jet Text)
-> (Jet Line -> Jet ByteString) -> Jet Line -> Jet Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ProcConf -> ProcConf)
-> CreateProcess -> Jet ByteString -> Jet ByteString
throughProcess ProcConf -> ProcConf
adaptConf CreateProcess
procSpec (Jet ByteString -> Jet ByteString)
-> (Jet Line -> Jet ByteString) -> Jet Line -> Jet ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Jet Text -> Jet ByteString
encodeUtf8 (Jet Text -> Jet ByteString)
-> (Jet Line -> Jet Text) -> Jet Line -> Jet ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Jet Line -> Jet Text
unlines

throughProcess_ :: forall a b . ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
throughProcess_ :: forall a b. ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
throughProcess_  ProcConf_ a b
procConf CreateProcess
procSpec Jet a
upstream = (forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s) -> Jet b
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> b -> IO s
step s
initial -> do
    let ProcConf_ {Bool
_bufferStdin :: forall a b. ProcConf_ a b -> Bool
_bufferStdin :: Bool
_bufferStdin, Handle -> a -> IO ()
_writeToStdIn :: Handle -> a -> IO ()
_writeToStdIn :: forall a b. ProcConf_ a b -> Handle -> a -> IO ()
_writeToStdIn, Handle -> IO b
_readFromStdout :: Handle -> IO b
_readFromStdout :: forall a b. ProcConf_ a b -> Handle -> IO b
_readFromStdout,Handle -> IO ()
_readFromStderr :: forall a b. ProcConf_ a b -> Handle -> IO ()
_readFromStderr :: Handle -> IO ()
_readFromStderr, ExitCode -> IO ()
_handleExitCode :: forall a b. ProcConf_ a b -> ExitCode -> IO ()
_handleExitCode :: ExitCode -> IO ()
_handleExitCode} = ProcConf_ a b
procConf
    if 
        -- If we know we aren't going to do any work, don't bother starting the

        -- whole boondoggle.

        | s -> Bool
stop s
initial ->
          s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
        | Bool
otherwise -> do
          let procSpec' :: CreateProcess
procSpec' = CreateProcess
procSpec {
                    std_in :: StdStream
std_in = StdStream
CreatePipe,
                    std_out :: StdStream
std_out = StdStream
CreatePipe,
                    std_err :: StdStream
std_err = StdStream
CreatePipe
                }
          TBMQueue a
input <- forall a. Int -> IO (TBMQueue a)
newTBMQueueIO @a Int
1
          IORef Bool
inputQueueWriterShouldStop <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
          -- remember to drain stderr concurrently with stdout...

          let inputQueueWriter :: IO ()
inputQueueWriter = do
                  Jet a
-> (Bool -> Bool) -> (Bool -> a -> IO Bool) -> Bool -> IO Bool
forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run 
                    Jet a
upstream 
                    Bool -> Bool
forall a. a -> a
id 
                    (\Bool
_ a
a -> do
                        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue a -> a -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue a
input a
a
                        IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
inputQueueWriterShouldStop) 
                    Bool
False
                  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue a -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue a
input
          Either s s
finalEither <- 
              Concurrently (Either s s) -> IO (Either s s)
forall a. Concurrently a -> IO a
runConcurrently (Concurrently (Either s s) -> IO (Either s s))
-> Concurrently (Either s s) -> IO (Either s s)
forall a b. (a -> b) -> a -> b
$
              IO () -> Concurrently ()
forall a. IO a -> Concurrently a
Concurrently do
                  IO ()
inputQueueWriter
              Concurrently ()
-> Concurrently (Either s s) -> Concurrently (Either s s)
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*>
              IO (Either s s) -> Concurrently (Either s s)
forall a. IO a -> Concurrently a
Concurrently do
                  CreateProcess
-> (Maybe Handle
    -> Maybe Handle
    -> Maybe Handle
    -> ProcessHandle
    -> IO (Either s s))
-> IO (Either s s)
forall a.
CreateProcess
-> (Maybe Handle
    -> Maybe Handle -> Maybe Handle -> ProcessHandle -> IO a)
-> IO a
withCreateProcess CreateProcess
procSpec' \(Just Handle
stdin') (Just Handle
stdout') (Just Handle
stderr') ProcessHandle
phandle -> do
                    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
_bufferStdin) (Handle -> BufferMode -> IO ()
System.IO.hSetBuffering Handle
stdin' BufferMode
System.IO.NoBuffering)
                    let stdinWriter :: IO ()
stdinWriter = do
                          Maybe a
ma <- STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (STM (Maybe a) -> IO (Maybe a)) -> STM (Maybe a) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ TBMQueue a -> STM (Maybe a)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue a
input
                          case Maybe a
ma of 
                              Maybe a
Nothing -> do
                                  Handle -> IO ()
hClose Handle
stdin'
                              Just a
a -> do
                                  Handle -> a -> IO ()
_writeToStdIn Handle
stdin' a
a
                                  IO ()
stdinWriter
                        stderrReader :: IO ()
stderrReader = do
                            (Handle -> IO Bool) -> (Handle -> IO b) -> Handle -> Jet b
forall handle a.
(handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF Handle -> IO Bool
System.IO.hIsEOF Handle -> IO b
_readFromStdout Handle
stderr' Jet b -> (Jet b -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
& Jet b -> IO ()
forall a. Sink a
drain
                        stdoutReader :: s -> IO (Either s s)
stdoutReader s
s = do
                          if | s -> Bool
stop s
s -> do
                               IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
inputQueueWriterShouldStop Bool
True
                               pure (s -> Either s s
forall a b. a -> Either a b
Left s
s)
                             | Bool
otherwise -> do
                               Bool
eof <- Handle -> IO Bool
System.IO.hIsEOF Handle
stdout'
                               if
                                   | Bool
eof -> do 
                                     IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
inputQueueWriterShouldStop Bool
True
                                     ExitCode
exitCode <- ProcessHandle -> IO ExitCode
waitForProcess ProcessHandle
phandle
                                     ExitCode -> IO ()
_handleExitCode ExitCode
exitCode
                                     pure (s -> Either s s
forall a b. b -> Either a b
Right s
s)
                                   | Bool
otherwise -> do
                                     b
b <- Handle -> IO b
_readFromStdout Handle
stdout'
                                     !s
s' <- s -> b -> IO s
step s
s b
b
                                     s -> IO (Either s s)
stdoutReader s
s'
                    Conceit s s -> IO (Either s s)
forall e a. Conceit e a -> IO (Either e a)
runConceit (Conceit s s -> IO (Either s s)) -> Conceit s s -> IO (Either s s)
forall a b. (a -> b) -> a -> b
$ 
                        IO () -> Conceit s ()
forall a e. IO a -> Conceit e a
_Conceit do IO ()
stdinWriter
                        Conceit s () -> Conceit s () -> Conceit s ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> 
                        IO () -> Conceit s ()
forall a e. IO a -> Conceit e a
_Conceit do IO ()
stderrReader
                        Conceit s () -> Conceit s s -> Conceit s s
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> 
                        IO (Either s s) -> Conceit s s
forall e a. IO (Either e a) -> Conceit e a
Conceit do s -> IO (Either s s)
stdoutReader s
initial
          pure ((s -> s) -> (s -> s) -> Either s s -> s
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either s -> s
forall a. a -> a
id s -> s
forall a. a -> a
id Either s s
finalEither) 

-- | Configuration record with some extra options in addition to those in "CreateProcess".

type ProcConf = ProcConf_ ByteString ByteString
data ProcConf_ a b = ProcConf_ {
        forall a b. ProcConf_ a b -> Bool
_bufferStdin :: Bool,
        forall a b. ProcConf_ a b -> Handle -> a -> IO ()
_writeToStdIn :: Handle -> a -> IO (),
        forall a b. ProcConf_ a b -> Handle -> IO b
_readFromStdout :: Handle -> IO b,
        forall a b. ProcConf_ a b -> Handle -> IO ()
_readFromStderr :: Handle -> IO (),
        forall a b. ProcConf_ a b -> ExitCode -> IO ()
_handleExitCode :: ExitCode -> IO ()
    }

defaultProcConf :: ProcConf 
defaultProcConf :: ProcConf
defaultProcConf = ProcConf_ :: forall a b.
Bool
-> (Handle -> a -> IO ())
-> (Handle -> IO b)
-> (Handle -> IO ())
-> (ExitCode -> IO ())
-> ProcConf_ a b
ProcConf_ {
        _bufferStdin :: Bool
_bufferStdin = Bool
False,
        _writeToStdIn :: Handle -> ByteString -> IO ()
_writeToStdIn = Handle -> ByteString -> IO ()
B.hPut,
        _readFromStdout :: Handle -> IO ByteString
_readFromStdout = (Handle -> Int -> IO ByteString) -> Int -> Handle -> IO ByteString
forall a b c. (a -> b -> c) -> b -> a -> c
flip Handle -> Int -> IO ByteString
B.hGetSome Int
8192,
        _readFromStderr :: Handle -> IO ()
_readFromStderr = IO Text -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Text -> IO ()) -> (Handle -> IO Text) -> Handle -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Handle -> IO Text
T.hGetLine ,
        _handleExitCode :: ExitCode -> IO ()
_handleExitCode = \ExitCode
exitCode -> case ExitCode
exitCode of
            ExitFailure Int
_ -> ExitCode -> IO ()
forall e a. Exception e => e -> IO a
throwIO ExitCode
exitCode 
            ExitCode
ExitSuccess -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    }

-- | Should we buffer the process' @stdin@? Usually should be 'True' for

-- interactive scenarios.

--

-- By default, 'False'.

bufferStdin :: Bool -> ProcConf -> ProcConf
bufferStdin :: Bool -> ProcConf -> ProcConf
bufferStdin Bool
doBuffering ProcConf
procConf = ProcConf
procConf { _bufferStdin :: Bool
_bufferStdin = Bool
doBuffering }

-- | Sets the function that reads a single line of output from the process

-- @stderr@.  It's called repeatedly until @stderr@ is exhausted. The reads are

-- done concurrently with the reads from @stdout@.

--

-- By default, lines of text are read using the system's default encoding.

--

-- This is a good place to throw an exception if we don't like what comes out

-- of @stderr@.

readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf
readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf
readFromStderr Handle -> IO ()
readFunc ProcConf
procConf = ProcConf
procConf { _readFromStderr :: Handle -> IO ()
_readFromStderr = Handle -> IO ()
readFunc } 

-- | Sets the function that handles the final `ExitCode` of the process.

--

-- The default behavior is to throw the `ExitCode` as an exception if it's not

-- a success.

handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf
handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf
handleExitCode ExitCode -> IO ()
handler ProcConf
procConf = ProcConf
procConf { _handleExitCode :: ExitCode -> IO ()
_handleExitCode = ExitCode -> IO ()
handler } 

--

--

-- complicated stufff


data AreWeInsideGroup foldState = OutsideGroup
                                | InsideGroup !foldState 
        
data RecastState foldState = RecastState !(AreWeInsideGroup foldState) [IO foldState] 

-- | This is a complex, unwieldly, yet versatile function. It can be used to

-- define grouping operations, but also for decoding and other purposes.

--

-- Groups are delimited in the input 'Jet' using the 'Splitter', and the

-- contents of those groups are then combined using 'Combiners'. The result of

-- each combiner is yielded by the return 'Jet'.

--

-- If the list of combiners is finite and becomes exhausted, we stop splitting

-- and the return 'Jet' stops.

recast :: forall a b c . Splitter a b -> Combiners b c -> Jet a -> Jet c
recast :: forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c
recast (MealyIO s -> a -> IO (SplitStepResult b, s)
splitterStep s -> IO (SplitStepResult b)
splitterCoda IO s
splitterAlloc) 
       (Combiners s -> b -> IO s
foldStep s -> IO c
foldCoda [IO s]
foldAllocs0) 
       (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
upstream) = (forall s. (s -> Bool) -> (s -> c -> IO s) -> s -> IO s) -> Jet c
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> c -> IO s
step s
initial -> do
  s
initialSplitterState <- IO s
splitterAlloc
  let -- When to stop? Either downstream says we need to stop,

      -- or we are outside a group and there isn't another group consumer we

      -- can use to process the next one.

      stop' :: Triple a (RecastState foldState) s -> Bool
stop' (Triple a
_ (RecastState AreWeInsideGroup foldState
OutsideGroup []) s
_) = Bool
True
      stop' (Triple a
_ RecastState foldState
_ s
s) = s -> Bool
stop s
s  

      step' :: Triple s (RecastState s) s -> a -> IO (Triple s (RecastState s) s)
step' (Triple s
splitterState RecastState s
recastState s
s) a
a = do
        (SplitStepResult b
splitResult,  s
splitterState') <- s -> a -> IO (SplitStepResult b, s)
splitterStep s
splitterState a
a 
        Pair RecastState s
recastState' s
s' <- SplitStepResult b
-> RecastState s -> s -> IO (Pair (RecastState s) s)
advanceRecast SplitStepResult b
splitResult RecastState s
recastState s
s 
        Triple s (RecastState s) s -> IO (Triple s (RecastState s) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (s -> RecastState s -> s -> Triple s (RecastState s) s
forall a b c. a -> b -> c -> Triple a b c
Triple s
splitterState' RecastState s
recastState' s
s')

      advanceRecast :: SplitStepResult b
-> RecastState s -> s -> IO (Pair (RecastState s) s)
advanceRecast ssr :: SplitStepResult b
ssr@(SplitStepResult {[b]
continuationOfPreviouslyStartedGroup :: [b]
continuationOfPreviouslyStartedGroup :: forall b. SplitStepResult b -> [b]
continuationOfPreviouslyStartedGroup, [[b]]
entireGroups :: [[b]]
entireGroups :: forall b. SplitStepResult b -> [[b]]
entireGroups, [b]
startOfNewGroup :: [b]
startOfNewGroup :: forall b. SplitStepResult b -> [b]
startOfNewGroup}) (RecastState AreWeInsideGroup s
areWeInside [IO s]
foldAllocs) s
s = do
        case (AreWeInsideGroup s
areWeInside, [[b]]
entireGroups, [b]
startOfNewGroup) of
            -- If there aren't any new groups and we don't start an incomplete one, just advance the current fold

            (InsideGroup s
foldState, [], []) -> do          
                -- traceIO $ "recast inside group just continuing"

                s
foldState' <- s -> [b] -> IO s
advanceGroupWithougClosing s
foldState [b]
continuationOfPreviouslyStartedGroup
                pure (RecastState s -> s -> Pair (RecastState s) s
forall a b. a -> b -> Pair a b
Pair (AreWeInsideGroup s -> [IO s] -> RecastState s
forall foldState.
AreWeInsideGroup foldState
-> [IO foldState] -> RecastState foldState
RecastState (s -> AreWeInsideGroup s
forall foldState. foldState -> AreWeInsideGroup foldState
InsideGroup s
foldState') [IO s]
foldAllocs) s
s) -- main state didn't change

            (InsideGroup s
foldState,  [[b]]
_, [b]
_) -> do          
                -- traceIO $ "recast inside group closing"

                !c
c <- s -> [b] -> IO c
processSingleGroup s
foldState [b]
continuationOfPreviouslyStartedGroup 
                !s
s' <- s -> c -> IO s
step s
s c
c
                if 
                    | s -> Bool
stop s
s' -> do
                        -- traceIO $ "recast inside group pure"

                        Pair (RecastState s) s -> IO (Pair (RecastState s) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (RecastState s -> s -> Pair (RecastState s) s
forall a b. a -> b -> Pair a b
Pair (AreWeInsideGroup s -> [IO s] -> RecastState s
forall foldState.
AreWeInsideGroup foldState
-> [IO foldState] -> RecastState foldState
RecastState AreWeInsideGroup s
forall foldState. AreWeInsideGroup foldState
OutsideGroup [IO s]
foldAllocs) s
s')
                    | Bool
otherwise -> do
                        -- traceIO $ "recast inside group advancing"

                        SplitStepResult b
-> RecastState s -> s -> IO (Pair (RecastState s) s)
advanceRecast SplitStepResult b
ssr (AreWeInsideGroup s -> [IO s] -> RecastState s
forall foldState.
AreWeInsideGroup foldState
-> [IO foldState] -> RecastState foldState
RecastState AreWeInsideGroup s
forall foldState. AreWeInsideGroup foldState
OutsideGroup [IO s]
foldAllocs) s
s'
            -- if we are outside of a group, the "continuationOfPreviouslyStartedGroup" is ignored.

            (AreWeInsideGroup s
OutsideGroup, [[b]]
_, [b]
_) -> do
                -- traceIO $ "recast outside group"

                -- doens't return foldState becasue we close the groups

                Pair [IO s]
foldAllocs' s
s' <- [IO s] -> s -> [[b]] -> IO (Pair [IO s] s)
processEntireGroups [IO s]
foldAllocs s
s [[b]]
entireGroups 
                Pair (RecastState s) s
bail <- Pair (RecastState s) s -> IO (Pair (RecastState s) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (RecastState s -> s -> Pair (RecastState s) s
forall a b. a -> b -> Pair a b
Pair (AreWeInsideGroup s -> [IO s] -> RecastState s
forall foldState.
AreWeInsideGroup foldState
-> [IO foldState] -> RecastState foldState
RecastState AreWeInsideGroup s
forall foldState. AreWeInsideGroup foldState
OutsideGroup [IO s]
foldAllocs') s
s')
                if 
                    | s -> Bool
stop s
s' -> do
                      Pair (RecastState s) s -> IO (Pair (RecastState s) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Pair (RecastState s) s
bail
                    | Bool
otherwise -> do
                        case [b]
startOfNewGroup of
                            [] -> do
                              Pair (RecastState s) s -> IO (Pair (RecastState s) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Pair (RecastState s) s
bail
                            (b
_ : [b]
_) -> do
                                case [IO s]
foldAllocs of
                                    [] -> do
                                        Pair (RecastState s) s -> IO (Pair (RecastState s) s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Pair (RecastState s) s
bail
                                    IO s
alloc : [IO s]
allocs -> do
                                        -- traceIO $ "recast we should be allocating here"

                                        -- there is a next group, so let's begin it

                                        !s
foldState0 <- IO s
alloc
                                        s
foldState <- s -> [b] -> IO s
processBeginNextGroup s
foldState0 [b]
startOfNewGroup
                                        pure (RecastState s -> s -> Pair (RecastState s) s
forall a b. a -> b -> Pair a b
Pair (AreWeInsideGroup s -> [IO s] -> RecastState s
forall foldState.
AreWeInsideGroup foldState
-> [IO foldState] -> RecastState foldState
RecastState (s -> AreWeInsideGroup s
forall foldState. foldState -> AreWeInsideGroup foldState
InsideGroup s
foldState) [IO s]
allocs) s
s')
      -- foldM ?

      advanceGroupWithougClosing :: _ -> [b] -> IO _
      advanceGroupWithougClosing :: s -> [b] -> IO s
advanceGroupWithougClosing s
foldState [] = 
        s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
foldState
      advanceGroupWithougClosing s
foldState (b
b:[b]
bs) = do
        !s
foldState' <- s -> b -> IO s
foldStep s
foldState b
b
        s -> [b] -> IO s
advanceGroupWithougClosing s
foldState' [b]
bs
      processEntireGroups :: [IO _] -> _ -> [[b]] -> IO (Pair [IO _] _)
      -- We can't go on if there aren't any more groups

      processEntireGroups :: [IO s] -> s -> [[b]] -> IO (Pair [IO s] s)
processEntireGroups [IO s]
allocs s
s [] = do
        Pair [IO s] s -> IO (Pair [IO s] s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([IO s] -> s -> Pair [IO s] s
forall a b. a -> b -> Pair a b
Pair [IO s]
allocs s
s)
      -- We can't go on if there aren't any more fold initial state allocs

      processEntireGroups [] s
s [[b]]
_ = do
        Pair [IO s] s -> IO (Pair [IO s] s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([IO s] -> s -> Pair [IO s] s
forall a b. a -> b -> Pair a b
Pair [] s
s)
      processEntireGroups (IO s
alloc : [IO s]
allocs) s
s ([b]
bs:[[b]]
bss) = do
        !s
foldState0 <- IO s
alloc
        !c
c <- s -> [b] -> IO c
processSingleGroup s
foldState0 [b]
bs -- a single step downstream

        !s
s' <- s -> c -> IO s
step s
s c
c
        if 
            | s -> Bool
stop s
s' -> do
              Pair [IO s] s -> IO (Pair [IO s] s)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([IO s] -> s -> Pair [IO s] s
forall a b. a -> b -> Pair a b
Pair [IO s]
allocs s
s')
            | Bool
otherwise -> do
              [IO s] -> s -> [[b]] -> IO (Pair [IO s] s)
processEntireGroups [IO s]
allocs s
s' [[b]]
bss 
      -- a whole fold is processed here

      processSingleGroup :: _ -> [b] -> IO c
      processSingleGroup :: s -> [b] -> IO c
processSingleGroup s
foldState [] = do
        s -> IO c
foldCoda s
foldState
      processSingleGroup s
foldState (b
b:[b]
bs) = do
        !s
foldState' <- s -> b -> IO s
foldStep s
foldState b
b
        s -> [b] -> IO c
processSingleGroup s
foldState' [b]
bs
      processBeginNextGroup :: _ -> [b] -> IO _
      processBeginNextGroup :: s -> [b] -> IO s
processBeginNextGroup s
foldState [] = do
        s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
foldState
      processBeginNextGroup s
foldState (b
b:[b]
bs) = do
        !s
foldState' <- s -> b -> IO s
foldStep s
foldState b
b
        s -> [b] -> IO s
processBeginNextGroup s
foldState' [b]
bs
      initial' :: Triple s (RecastState s) s
initial' = s -> RecastState s -> s -> Triple s (RecastState s) s
forall a b c. a -> b -> c -> Triple a b c
Triple s
initialSplitterState (AreWeInsideGroup s -> [IO s] -> RecastState s
forall foldState.
AreWeInsideGroup foldState
-> [IO foldState] -> RecastState foldState
RecastState AreWeInsideGroup s
forall foldState. AreWeInsideGroup foldState
OutsideGroup [IO s]
foldAllocs0) s
initial
  Triple s
splitterState RecastState s
recastState s
final <- (Triple s (RecastState s) s -> Bool)
-> (Triple s (RecastState s) s
    -> a -> IO (Triple s (RecastState s) s))
-> Triple s (RecastState s) s
-> IO (Triple s (RecastState s) s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
upstream Triple s (RecastState s) s -> Bool
forall {a} {foldState}. Triple a (RecastState foldState) s -> Bool
stop' Triple s (RecastState s) s -> a -> IO (Triple s (RecastState s) s)
step' Triple s (RecastState s) s
initial'
  -- What happens if there's a fold ongoing when we stop? Right now we always close it, which seems to be a reasonable

  -- action (because the fold coda might hide a finalizer).

  --

  -- Also, when can it happen that we reach this point with an ongoing fold? 

  -- If I understand correctly:

  --    - it can only happen when the upstream closes and leaves the fold open.

  --    - it can't (?) happen when the consumer stops early. 

  let closePendingFold :: RecastState s -> IO ()
closePendingFold = \case 
        RecastState AreWeInsideGroup s
OutsideGroup [IO s]
_ -> do
            () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        RecastState (InsideGroup s
foldState) [IO s]
_ -> do
            c
_ <- s -> IO c
foldCoda s
foldState
            pure ()
  if 
    | s -> Bool
stop s
final -> do
      RecastState s -> IO ()
closePendingFold RecastState s
recastState
      pure s
final
    | Bool
otherwise -> do
      SplitStepResult b
splitResult <- s -> IO (SplitStepResult b)
splitterCoda s
splitterState
      -- We discard the "begins next group"; it doesn't make sense in this final step.

      Pair RecastState s
recastState' s
final' <- SplitStepResult b
-> RecastState s -> s -> IO (Pair (RecastState s) s)
advanceRecast (SplitStepResult b
splitResult { startOfNewGroup :: [b]
startOfNewGroup = [] }) RecastState s
recastState s
final
      if | s -> Bool
stop s
final' -> do
           -- TODO:

           -- should we dealloc here? Maybe there's a fold reaminging... we should close it. See below.

           RecastState s -> IO ()
closePendingFold RecastState s
recastState'
           pure s
final'
         | Bool
otherwise -> do
              case RecastState s
recastState' of
                RecastState AreWeInsideGroup s
OutsideGroup [IO s]
_ -> do
                    -- traceIO $ "final! outside group"

                    s -> IO s
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final'
                RecastState (InsideGroup s
foldState) [IO s]
_ -> do
                    -- traceIO $ "final! inside group"

                    c
c <- s -> IO c
foldCoda s
foldState
                    s
final'' <- s -> c -> IO s
step s
final' c
c
                    pure s
final''

-- | A 'Combiners' value knows how to process a sequence of groups, while

-- keeping a (existentially hidden) state for each group.

--

-- Very much like a @FoldM IO@  from the

-- [foldl](https://hackage.haskell.org/package/foldl-1.4.12/docs/Control-Foldl.html#t:FoldM)

-- library, but \"restartable\" with a list of starting states.

--

-- For converting one into the other, this function should do the trick:

--

-- > \(L.FoldM step allocator coda) -> combiners step coda (Prelude.repeat allocator)

data Combiners a b where 
    Combiners :: (s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b

deriving stock instance Functor (Combiners a)

-- | Constructor for 'Combiners' values.

combiners :: forall s a b r -- ^ foo

     . (s -> a -> IO s) -- ^ Step function that threads the state @s@.

    -> (s -> IO b) -- ^ Coda invoked when a group closes.

    -> [IO s] -- ^ Actions that produce the initial states @s@ for processing each group.

    -> Combiners a b
combiners :: forall s a b r.
(s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
combiners = (s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
forall s a b.
(s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
Combiners

-- | A simpler version of 'withCombiners' that doen't thread a state; it merely

-- allocates and deallocates the resource @h@.

withCombiners_ :: forall h a r 
     . (h -> a -> IO ()) -- ^ Step function that accesses the resource @h@.

    -> (h -> IO ()) -- ^ Finalizer to run after closing each group, and also in the case of an exception. 

    -> [IO h] -- ^ Actions that allocate a sequence of resources @h@.

    -> (Combiners a () -> IO r) -- ^ The 'Combiners' value should be consumed linearly.

    -> IO r 
withCombiners_ :: forall h a r.
(h -> a -> IO ())
-> (h -> IO ()) -> [IO h] -> (Combiners a () -> IO r) -> IO r
withCombiners_ h -> a -> IO ()
step h -> IO ()
finalize [IO h]
allocators = do
    (h -> () -> a -> IO ())
-> (h -> () -> IO ())
-> (h -> IO ())
-> [(IO h, h -> IO ())]
-> (Combiners a () -> IO r)
-> IO r
forall h s a b r.
(h -> s -> a -> IO s)
-> (h -> s -> IO b)
-> (h -> IO ())
-> [(IO h, h -> IO s)]
-> (Combiners a b -> IO r)
-> IO r
withCombiners 
        (\h
h () a
a -> h -> a -> IO ()
step h
h a
a)
        (\h
_ () -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
        h -> IO ()
finalize
        (do IO h
allocator <- [IO h]
allocators
            pure (IO h
allocator, \h
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()))

-- | 'Combiners' thread a state @s@ while processing each group. Sometimes, in

-- addition to that, we want to allocate a resource @h@ when we start

-- processing a group, and deallocate it after we finish processing the group

-- or an exception is thrown. The typical example is allocating a 'Handle' for

-- writing the elements of the group as they arrive.

withCombiners 
    :: forall h s a b r .
       (h -> s -> a -> IO s) -- ^ Step function that accesses the resource @h@ and threads the state @s@.

    -> (h -> s -> IO b) -- ^ Coda invoked when a group closes.

    -> (h -> IO ()) -- ^ Finalizer to run after each coda, and also in the case of an exception. 

    -> [(IO h, h -> IO s)] -- ^ Actions that allocate a sequence of resources @h@ and produce initial states @s@ for processing each group.

    -> (Combiners a b -> IO r) -- ^ The 'Combiners' value should be consumed linearly.

    -> IO r 
withCombiners :: forall h s a b r.
(h -> s -> a -> IO s)
-> (h -> s -> IO b)
-> (h -> IO ())
-> [(IO h, h -> IO s)]
-> (Combiners a b -> IO r)
-> IO r
withCombiners h -> s -> a -> IO s
step h -> s -> IO b
coda h -> IO ()
finalize [(IO h, h -> IO s)]
allocators Combiners a b -> IO r
continuation = do
    MVar h
resourceRef <- forall a. IO (MVar a)
newEmptyMVar @h
    let  
        step' :: Pair h s -> a -> IO (Pair h s)
step' (Pair h
h s
s) a
a = do
            s
s' <- h -> s -> a -> IO s
step h
h s
s a
a
            pure (h -> s -> Pair h s
forall a b. a -> b -> Pair a b
Pair h
h s
s')
        tryFinalize :: IO ()
tryFinalize = do
            MVar h -> IO (Maybe h)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar h
resourceRef IO (Maybe h) -> (Maybe h -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                Maybe h
Nothing -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                Just h
resource -> h -> IO ()
finalize h
resource
        adaptAllocator :: (IO h, h -> IO s) -> IO (Pair h s)
        adaptAllocator :: (IO h, h -> IO s) -> IO (Pair h s)
adaptAllocator (IO h
allocate, h -> IO s
makeInitialState) = do
            h
h <- IO h -> IO h
forall x. IO x -> IO x
mask_ do
                h
h <- IO h
allocate
                MVar h -> h -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar h
resourceRef h
h
                pure h
h
            s
s <- h -> IO s
makeInitialState h
h 
            pure (h -> s -> Pair h s
forall a b. a -> b -> Pair a b
Pair h
h s
s)
        coda' :: Pair h s -> IO b
        coda' :: Pair h s -> IO b
coda' (Pair h
h s
s) = do
            b
b <- h -> s -> IO b
coda h
h s
s
            -- this always succeeds, we store the resource at the beginning!

            IO () -> IO ()
forall x. IO x -> IO x
mask_ IO ()
tryFinalize
            pure b
b
    r
r <- (Combiners a b -> IO r
continuation ((Pair h s -> a -> IO (Pair h s))
-> (Pair h s -> IO b) -> [IO (Pair h s)] -> Combiners a b
forall s a b r.
(s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
combiners Pair h s -> a -> IO (Pair h s)
step' Pair h s -> IO b
coda' ((IO h, h -> IO s) -> IO (Pair h s)
adaptAllocator ((IO h, h -> IO s) -> IO (Pair h s))
-> [(IO h, h -> IO s)] -> [IO (Pair h s)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(IO h, h -> IO s)]
allocators)))
         IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`Control.Exception.finally`
         IO ()
tryFinalize
    pure r
r

-- | Puts the elements of each group into a list that is kept in memory. This breaks streaming within the group.

--

-- Useful with 'recast'.

combineIntoLists :: Combiners a [a]
combineIntoLists :: forall a. Combiners a [a]
combineIntoLists = (DList a -> a -> IO (DList a))
-> (DList a -> IO [a]) -> [IO (DList a)] -> Combiners a [a]
forall s a b r.
(s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
combiners
    (\DList a
s a
a -> DList a -> IO (DList a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList a
s DList a -> DList a -> DList a
forall a. Semigroup a => a -> a -> a
<> a -> DList a
forall a. a -> DList a
singleton a
a))
    ([a] -> IO [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([a] -> IO [a]) -> (DList a -> [a]) -> DList a -> IO [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DList a -> [a]
forall a. DList a -> [a]
closeDList)
    (IO (DList a) -> [IO (DList a)]
forall a. a -> [a]
Prelude.repeat (DList a -> IO (DList a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure DList a
forall a. Monoid a => a
mempty))

-- | Delimits groups in the values yielded by a 'Jet', and can also transform

-- those values.

type Splitter a b = MealyIO a (SplitStepResult b)

-- | A [Mealy machine](https://en.wikipedia.org/wiki/Mealy_machine) with an

-- existentially hidden state.  

--

-- Very much like a @FoldM IO@  from the

-- [foldl](https://hackage.haskell.org/package/foldl-1.4.12/docs/Control-Foldl.html#t:FoldM)

-- library, but it emits an output at each step, not only at the end.

data MealyIO a b where
    MealyIO :: (s -> a -> IO (b,s)) -- ^ The step function which threads the state.

            -> (s -> IO b) -- ^ The final output, produced from the final state.

            -> IO s -- ^ An action that produces the initial state.

            -> MealyIO a b

deriving stock instance Functor (MealyIO a)

-- | For each value coming from upstream, what has the 'Splitter' learned?

--

-- * Perhaps we should continue some group we have already started in a previous step.

--

-- * Perhaps we have found entire groups that we should emit in one go, groups we know are already complete.

--

-- * Perhaps we should start a new group that will continue in the next steps. 

data SplitStepResult b = SplitStepResult {
     -- | The continued group will be \"closed"\ if in the current step we emit

     -- an entire group or we begin a new group.

     --

     -- __INVARIANT__: we should only continue a group if we have already

     -- opened a \"new one\" with one or more elements in an earlier step.

     forall b. SplitStepResult b -> [b]
continuationOfPreviouslyStartedGroup :: [b],
     -- | It's ok if the groups we find are empty.

     forall b. SplitStepResult b -> [[b]]
entireGroups :: [[b]],
     -- | __INVARIANT__: when we are in the final step, we should not yield elements

     -- for the beginning of a new one.

     forall b. SplitStepResult b -> [b]
startOfNewGroup :: [b]
  }
  deriving ((forall a b. (a -> b) -> SplitStepResult a -> SplitStepResult b)
-> (forall a b. a -> SplitStepResult b -> SplitStepResult a)
-> Functor SplitStepResult
forall a b. a -> SplitStepResult b -> SplitStepResult a
forall a b. (a -> b) -> SplitStepResult a -> SplitStepResult b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> SplitStepResult b -> SplitStepResult a
$c<$ :: forall a b. a -> SplitStepResult b -> SplitStepResult a
fmap :: forall a b. (a -> b) -> SplitStepResult a -> SplitStepResult b
$cfmap :: forall a b. (a -> b) -> SplitStepResult a -> SplitStepResult b
Functor, Int -> SplitStepResult b -> ShowS
[SplitStepResult b] -> ShowS
SplitStepResult b -> String
(Int -> SplitStepResult b -> ShowS)
-> (SplitStepResult b -> String)
-> ([SplitStepResult b] -> ShowS)
-> Show (SplitStepResult b)
forall b. Show b => Int -> SplitStepResult b -> ShowS
forall b. Show b => [SplitStepResult b] -> ShowS
forall b. Show b => SplitStepResult b -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SplitStepResult b] -> ShowS
$cshowList :: forall b. Show b => [SplitStepResult b] -> ShowS
show :: SplitStepResult b -> String
$cshow :: forall b. Show b => SplitStepResult b -> String
showsPrec :: Int -> SplitStepResult b -> ShowS
$cshowsPrec :: forall b. Show b => Int -> SplitStepResult b -> ShowS
Show)

instance Semigroup (SplitStepResult b) where
    SplitStepResult [b]
c1 [[b]]
e1 [b]
b1 <> :: SplitStepResult b -> SplitStepResult b -> SplitStepResult b
<> SplitStepResult [b]
c2 [[b]]
e2 [b]
b2 = 
        [b] -> [[b]] -> [b] -> SplitStepResult b
forall b. [b] -> [[b]] -> [b] -> SplitStepResult b
SplitStepResult ([b]
c1 [b] -> [b] -> [b]
forall a. Semigroup a => a -> a -> a
<> [b]
c2) ([[b]]
e1 [[b]] -> [[b]] -> [[b]]
forall a. Semigroup a => a -> a -> a
<> [[b]]
e2) ([b]
b1 [b] -> [b] -> [b]
forall a. Semigroup a => a -> a -> a
<> [b]
b2)

instance Monoid (SplitStepResult b) where
    mempty :: SplitStepResult b
mempty = [b] -> [[b]] -> [b] -> SplitStepResult b
forall b. [b] -> [[b]] -> [b] -> SplitStepResult b
SplitStepResult [] [] []

-- TODO: bring back some linear stuff? Perhaps adding a linearFmap ?

--