{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE ViewPatterns #-}
{-# OPTIONS_GHC -Wno-partial-type-signatures #-}

-- | Tampering with the internals lets you write invalid 'Jet's that don't
-- respect stop signals from consumers, so be careful.
--
-- Also, the internals expose 'Line' and 'ByteBundle' as thin coats of paint
-- over lazy text and lazy bytestring, respectively.
module Jet.Internal where

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Bifunctor
import Data.Bifunctor (first)
import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Data.ByteString.Lazy qualified as BL
import Data.Foldable qualified
import Data.Function ((&))
import Data.Functor ((<&>))
import Data.IORef
import Data.List qualified
import Data.Maybe
import Data.String (IsString (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.Encoding qualified as T
import Data.Text.Encoding.Error qualified as T
import Data.Text.IO qualified as T
import Data.Text.Lazy qualified as TL
import Data.Text.Lazy.Encoding qualified as TL
import Data.Traversable qualified
import Data.Typeable
import System.Exit
import System.IO (Handle, IOMode (..), hClose, openBinaryFile)
import System.IO qualified
import System.Process
import Prelude hiding
  ( drop,
    dropWhile,
    filter,
    filterM,
    fold,
    for_,
    intersperse,
    lines,
    take,
    takeWhile,
    traverse_,
    unfold,
    unlines,
    zip,
    zipWith,
  )
import Prelude qualified

-- import Debug.Trace

-- $setup
--
-- >>> :set -XTypeApplications
-- >>> :set -XImportQualifiedPost
-- >>> :set -XScopedTypeVariables
-- >>> :set -XLambdaCase
-- >>> :set -XNumDecimals
-- >>> import Jet (Jet, (&))
-- >>> import Jet qualified as J
-- >>> import Control.Foldl qualified as L
-- >>> import Control.Concurrent
-- >>> import Data.IORef
-- >>> import Data.Text qualified as T

-- | A 'Jet' is a sequence of values produced through 'IO' effects.
--
-- It allows consuming the elements as they are produced and doesn't force them
-- to be present in memory all at the same time, unlike functions like
-- 'Control.Monad.replicateM' from @base@.
newtype Jet a = Jet
  { forall a.
Jet a -> forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
runJet :: forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
  }

-- | Maps over the yielded elements. '(<&>)' can be used to put the function last.
--
-- >>> J.each "aa" <&> succ & J.toList
-- "bb"
deriving stock instance Functor Jet

-- | Go through the elements produced by a 'Jet', while threading an
-- state @s@ and possibly performing some effect.
--
-- The caller is the one who chooses the type of the state @s@, and must pass
-- an initial value for it. The state is kept in [weak-head normal form](https://en.wikibooks.org/wiki/Haskell/Graph_reduction#Weak_Head_Normal_Form).
--
-- The caller must also provide a predicate on the state that informs the `Jet`
-- when to stop producing values: whenever the predicate returns
-- @True@.
run :: forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run :: forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run Jet a
j = Jet a -> forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall a.
Jet a -> forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
runJet Jet a
j

-- | Like 'run', but always goes through all elements produced by the 'Jet'.
--
-- Equivalent to @run (const False)@.
consume :: forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s
consume :: forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s
consume Jet a
j = Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run Jet a
j (Bool -> s -> Bool
forall a b. a -> b -> a
const Bool
False)

for :: Jet a -> (a -> IO b) -> Jet b
for :: forall a b. Jet a -> (a -> IO b) -> Jet b
for Jet a
j a -> IO b
k = (() -> a -> IO b) -> [IO ()] -> Jet a -> Jet b
forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO (\() -> a -> IO b
k) (IO () -> [IO ()]
forall a. a -> [a]
Prelude.repeat (() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())) Jet a
j

for_ :: Jet a -> (a -> IO b) -> IO ()
for_ :: forall a b. Jet a -> (a -> IO b) -> IO ()
for_ Jet a
j a -> IO b
k = Jet a -> (() -> a -> IO ()) -> () -> IO ()
forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s
consume Jet a
j (\() -> IO b -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO b -> IO ()) -> (a -> IO b) -> a -> IO ()
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> a -> IO b
k) ()

-- | Apply an effectful transformation to each element in a 'Jet'.
--
-- >>> :{
-- J.each "abc"
-- & J.traverse (\c -> let c' = succ c in putStrLn ([c] ++ " -> " ++ [c']) *> pure c')
-- & J.toList
-- :}
-- a -> b
-- b -> c
-- c -> d
-- "bcd"
traverse :: (a -> IO b) -> Jet a -> Jet b
traverse :: forall a b. (a -> IO b) -> Jet a -> Jet b
traverse = (Jet a -> (a -> IO b) -> Jet b) -> (a -> IO b) -> Jet a -> Jet b
forall a b c. (a -> b -> c) -> b -> a -> c
flip Jet a -> (a -> IO b) -> Jet b
forall a b. Jet a -> (a -> IO b) -> Jet b
for

traverse_ :: (a -> IO b) -> Sink a
traverse_ :: forall a b. (a -> IO b) -> Sink a
traverse_ = (Jet a -> (a -> IO b) -> IO ()) -> (a -> IO b) -> Jet a -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Jet a -> (a -> IO b) -> IO ()
forall a b. Jet a -> (a -> IO b) -> IO ()
for_

-- | Go through the 'Jet' only for the 'IO' effects, discarding all yielded elements.
drain :: Sink a
drain :: forall a. Sink a
drain = (a -> IO a) -> Sink a
forall a b. (a -> IO b) -> Sink a
traverse_ a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure

-- | Similar to the instance for pure lists, that generates combinations.
--
-- >>> (,) <$> J.each "ab" <*> J.each "cd" & J.toList
-- [('a','c'),('a','d'),('b','c'),('b','d')]
instance Applicative Jet where
  pure :: forall a. a -> Jet a
pure a
i = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial ->
    if
      | s -> Bool
stop s
initial -> s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
      | Bool
otherwise -> s -> a -> IO s
step s
initial a
i
  Jet forall s. (s -> Bool) -> (s -> (a -> b) -> IO s) -> s -> IO s
left <*> :: forall a b. Jet (a -> b) -> Jet a -> Jet b
<*> Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
right = (forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s) -> Jet b
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> b -> IO s
step s
initial ->
    -- Here we assume that the first Jet correctly handles the stop signal.
    let step' :: (t -> b) -> s -> t -> IO s
step' t -> b
f s
s t
a = s -> b -> IO s
step s
s (t -> b
f t
a)
     in (s -> Bool) -> (s -> (a -> b) -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> (a -> b) -> IO s) -> s -> IO s
left s -> Bool
stop (\s
s a -> b
f -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
right s -> Bool
stop ((a -> b) -> s -> a -> IO s
forall {t}. (t -> b) -> s -> t -> IO s
step' a -> b
f) s
s) s
initial

-- | Similar to the instance for pure lists, that does search.
--
-- >>> :{
-- do string <- J.each ["ab","cd"]
--    J.each string
-- &
-- J.toList
-- :}
-- "abcd"
instance Monad Jet where
  return :: forall a. a -> Jet a
return = a -> Jet a
forall a. a -> Jet a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
  Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
m >>= :: forall a b. Jet a -> (a -> Jet b) -> Jet b
>>= a -> Jet b
k = (forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s) -> Jet b
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> b -> IO s
step s
initial ->
    (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
m s -> Bool
stop (\s
s a
a -> Jet b -> forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
forall a.
Jet a -> forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
runJet (a -> Jet b
k a
a) s -> Bool
stop s -> b -> IO s
step s
s) s
initial

-- |
-- >>> liftIO (putStrLn "foo") <> liftIO (putStrLn "bar") & J.toList
-- foo
-- bar
-- [(),()]
instance MonadIO Jet where
  liftIO :: forall a. IO a -> Jet a
liftIO IO a
action = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial ->
    if
      | s -> Bool
stop s
initial -> s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
      | Bool
otherwise -> do
          a <- IO a
action
          step initial a

-- | 'Jet' concatenation.
--
-- >>> J.each "ab" <> J.each "cd" & J.toList
-- "abcd"
instance Semigroup (Jet a) where
  Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f1 <> :: Jet a -> Jet a -> Jet a
<> Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f2 = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
s0 -> do
    -- perhaps some of the stop checks are redundant, the first one in particular?
    if
      | s -> Bool
stop s
s0 ->
          s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s0
      | Bool
otherwise -> do
          !s1 <- (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f1 s -> Bool
stop s -> a -> IO s
step s
s0
          if
            | stop s1 ->
                pure s1
            | otherwise -> do
                !s2 <- f2 stop step s1
                pure s2

-- | 'mempty' is the empty 'Jet'.
--
-- >>> mempty <> J.each "ab" <> mempty & J.toList
-- "ab"
instance Monoid (Jet a) where
  mempty :: Jet a
mempty = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
_ s -> a -> IO s
_ s
initial -> s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial

-- | Same as 'Monoid'.
instance Alternative Jet where
  <|> :: forall a. Jet a -> Jet a -> Jet a
(<|>) = Jet a -> Jet a -> Jet a
forall a. Semigroup a => a -> a -> a
(<>)
  empty :: forall a. Jet a
empty = Jet a
forall a. Monoid a => a
mempty

-- | Same as 'Monoid'
instance MonadPlus Jet where
  mzero :: forall a. Jet a
mzero = Jet a
forall a. Monoid a => a
mempty
  mplus :: forall a. Jet a -> Jet a -> Jet a
mplus = Jet a -> Jet a -> Jet a
forall a. Semigroup a => a -> a -> a
(<>)

-- | A failed pattern-match in a do-block produces 'mzero'.
--
-- >>> :{
-- do Just c <- J.each [Nothing, Just 'a', Nothing, Just 'b']
--    pure c
-- & J.toList
-- :}
-- "ab"
instance MonadFail Jet where
  fail :: forall a. String -> Jet a
fail String
_ = Jet a
forall a. Jet a
forall (m :: * -> *) a. MonadPlus m => m a
mzero

-- | Build a 'Jet' from any 'Foldable' container
--
-- >>> J.each [True,False] & J.toList
-- [True,False]
each :: forall a f. (Foldable f) => f a -> Jet a
each :: forall a (f :: * -> *). Foldable f => f a -> Jet a
each (f a -> [a]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Data.Foldable.toList -> [a]
seed) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step ->
  -- This could be done with Jet.unfold, but let's leave as it is.
  let go :: [a] -> s -> IO s
go [a]
b s
s =
        if
          | s -> Bool
stop s
s ->
              s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
          | Bool
otherwise ->
              case [a]
b of
                [] ->
                  s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
                -- see corresponding comment in unfold.
                a
x : [a]
xs -> do
                  !s' <- s -> a -> IO s
step s
s a
x
                  go xs s'
   in [a] -> s -> IO s
go [a]
seed

-- |
--
-- >>> J.repeat True & J.take 2 & J.toList
-- [True,True]
repeat :: a -> Jet a
repeat :: forall a. a -> Jet a
repeat a
a = IO a -> Jet a
forall a. IO a -> Jet a
repeatIO (a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a)

-- |
--
-- >>> J.repeatIO (putStrLn "hi" *> pure True) & J.take 2 & J.toList
-- hi
-- hi
-- [True,True]
repeatIO :: IO a -> Jet a
repeatIO :: forall a. IO a -> Jet a
repeatIO IO a
action = IO (Maybe a) -> Jet a
forall a. IO (Maybe a) -> Jet a
untilNothing ((a -> Maybe a) -> IO a -> IO (Maybe a)
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just IO a
action)

-- |
--
-- >>> J.replicate 2 True & J.toList
-- [True,True]
replicate :: Int -> a -> Jet a
replicate :: forall a. Int -> a -> Jet a
replicate Int
n a
a = Int -> IO a -> Jet a
forall a. Int -> IO a -> Jet a
replicateIO Int
n (a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a)

-- |
-- >>> J.replicateIO 2 (putStrLn "hi" *> pure True) & J.toList
-- hi
-- hi
-- [True,True]
--
-- Don't confuse this with @Control.Monad.replicateM :: Int -> Jet a -> Jet [a]@ which has a combinatorial behavior.
replicateIO :: Int -> IO a -> Jet a
replicateIO :: forall a. Int -> IO a -> Jet a
replicateIO Int
n IO a
ioa = Int -> Jet a -> Jet a
forall a. Int -> Jet a -> Jet a
take Int
n (IO a -> Jet a
forall a. IO a -> Jet a
repeatIO IO a
ioa)

-- |
--
-- >>> J.iterate succ (1 :: Int) & J.take 2 & J.toList
-- [1,2]
iterate :: (a -> a) -> a -> Jet a
iterate :: forall a. (a -> a) -> a -> Jet a
iterate a -> a
h = (a -> IO a) -> a -> Jet a
forall a. (a -> IO a) -> a -> Jet a
iterateIO ((a -> IO a) -> (a -> a) -> a -> IO a
forall a b. (a -> b) -> (a -> a) -> a -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> a
h)

-- |
--
-- >>> J.iterateIO (\x -> putStrLn "hi" *> pure (succ x)) (1 :: Int) & J.take 2 & J.toList
-- hi
-- [1,2]
iterateIO :: (a -> IO a) -> a -> Jet a
iterateIO :: forall a. (a -> IO a) -> a -> Jet a
iterateIO a -> IO a
h a
a = a -> Jet a
forall a. a -> Jet a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a Jet a -> Jet a -> Jet a
forall a. Semigroup a => a -> a -> a
<> (a -> IO (Maybe (a, a))) -> a -> Jet a
forall b a. (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO ((IO a -> IO (Maybe (a, a)))
-> (a -> IO a) -> a -> IO (Maybe (a, a))
forall a b. (a -> b) -> (a -> a) -> a -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((a -> Maybe (a, a)) -> IO a -> IO (Maybe (a, a))
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\a
x -> (a, a) -> Maybe (a, a)
forall a. a -> Maybe a
Just (a
x, a
x))) a -> IO a
h) a
a

-- |
-- >>> J.unfold (\case [] -> Nothing ; c : cs -> Just (c,cs)) "abc" & J.toList
-- "abc"
unfold :: (b -> Maybe (a, b)) -> b -> Jet a
unfold :: forall b a. (b -> Maybe (a, b)) -> b -> Jet a
unfold b -> Maybe (a, b)
h = (b -> IO (Maybe (a, b))) -> b -> Jet a
forall b a. (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO ((Maybe (a, b) -> IO (Maybe (a, b)))
-> (b -> Maybe (a, b)) -> b -> IO (Maybe (a, b))
forall a b. (a -> b) -> (b -> a) -> b -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (a, b) -> IO (Maybe (a, b))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure b -> Maybe (a, b)
h)

-- |
-- >>> :{
-- J.unfoldIO (\x -> do putStrLn "hi"
--                      pure $ case x of
--                         [] -> Nothing
--                         c : cs -> Just (c,cs))
--            "abc"
-- & J.toList
-- :}
-- hi
-- hi
-- hi
-- hi
-- "abc"
unfoldIO :: (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO :: forall b a. (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO b -> IO (Maybe (a, b))
h b
seed = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step ->
  let go :: b -> s -> IO s
go b
b s
s =
        if
          | s -> Bool
stop s
s ->
              s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
          | Bool
otherwise -> do
              next <- b -> IO (Maybe (a, b))
h b
b
              case next of
                Maybe (a, b)
Nothing ->
                  s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
                -- strictness only on the states. Good idea, or bad?
                Just (a
a, !b
b') -> do
                  !s' <- s -> a -> IO s
step s
s a
a
                  go b' s'
   in b -> s -> IO s
go b
seed

-- |
-- >>> j = J.untilEOF System.IO.hIsEOF System.IO.hGetLine :: Handle -> Jet String
untilEOF :: (handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF :: forall handle a.
(handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF handle -> IO Bool
hIsEOF' handle -> IO a
hGetLine' handle
handle = IO (Maybe a) -> Jet a
forall a. IO (Maybe a) -> Jet a
untilNothing do
  eof <- handle -> IO Bool
hIsEOF' handle
handle
  if
    | eof ->
        pure Nothing
    | otherwise ->
        Just <$> hGetLine' handle

-- |
--
-- >>> :{
-- do ref <- newIORef "abc"
--    let pop = atomicModifyIORef ref (\case [] -> ([], Nothing)
--                                           x : xs -> (xs, Just x))
--    J.untilNothing pop & J.toList
-- :}
-- "abc"
untilNothing :: IO (Maybe a) -> Jet a
untilNothing :: forall a. IO (Maybe a) -> Jet a
untilNothing IO (Maybe a)
action = (() -> IO (Maybe (a, ()))) -> () -> Jet a
forall b a. (b -> IO (Maybe (a, b))) -> b -> Jet a
unfoldIO (\() -> (Maybe a -> Maybe (a, ())) -> IO (Maybe a) -> IO (Maybe (a, ()))
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((a -> (a, ())) -> Maybe a -> Maybe (a, ())
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (,())) IO (Maybe a)
action) ()

-- | Convert to a regular list. This breaks streaming.
--
-- >>> J.each "abc" & J.toList
-- "abc"
--
-- Alternatively, we can use 'fold' in combination with 'Control.Foldl.list' form the [foldl](https://hackage.haskell.org/package/foldl) library:
--
-- >>> L.purely (J.fold (J.each "abc")) L.list
-- "abc"
--
-- which is more verbose, but more composable.
toList :: Jet a -> IO [a]
toList :: forall a. Jet a -> IO [a]
toList (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = do
  as <- ([a] -> Bool) -> ([a] -> a -> IO [a]) -> [a] -> IO [a]
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f (Bool -> [a] -> Bool
forall a b. a -> b -> a
const Bool
False) (\[a]
xs a
x -> [a] -> IO [a]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
xs)) []
  pure (reverse as)

-- | Returns the number of elements yielded by the 'Jet', exhausting it in the process.
--
-- >>> J.each "abc" & J.length
-- 3
--
-- Alternatively, we can use 'fold' in combination with 'Control.Foldl.length' form the [foldl](https://hackage.haskell.org/package/foldl) library:
--
-- >>> L.purely (J.fold (J.each "abc")) L.length
-- 3
--
-- which is more verbose, but more composable.
length :: Jet a -> IO Int
length :: forall a. Jet a -> IO Int
length (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = do
  l <- (Int -> Bool) -> (Int -> a -> IO Int) -> Int -> IO Int
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f (Bool -> Int -> Bool
forall a b. a -> b -> a
const Bool
False) (\Int
s a
_ -> Int -> IO Int
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> Int
forall a. Enum a => a -> a
succ Int
s)) Int
0
  pure l

data Pair a b = Pair !a !b deriving (Int -> Pair a b -> ShowS
[Pair a b] -> ShowS
Pair a b -> String
(Int -> Pair a b -> ShowS)
-> (Pair a b -> String) -> ([Pair a b] -> ShowS) -> Show (Pair a b)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall a b. (Show a, Show b) => Int -> Pair a b -> ShowS
forall a b. (Show a, Show b) => [Pair a b] -> ShowS
forall a b. (Show a, Show b) => Pair a b -> String
$cshowsPrec :: forall a b. (Show a, Show b) => Int -> Pair a b -> ShowS
showsPrec :: Int -> Pair a b -> ShowS
$cshow :: forall a b. (Show a, Show b) => Pair a b -> String
show :: Pair a b -> String
$cshowList :: forall a b. (Show a, Show b) => [Pair a b] -> ShowS
showList :: [Pair a b] -> ShowS
Show)

pairExtract :: Pair a b -> b
pairExtract (Pair a
_ b
b) = b
b

pairEnv :: Pair a b -> a
pairEnv (Pair a
a b
_) = a
a

data Triple a b c = Triple !a !b !c

tripleExtract :: Triple a b c -> c
tripleExtract (Triple a
_ b
_ c
c) = c
c

-- fromTuple :: (a, b) -> Pair a b
-- fromTuple (a, b) -> Pair a b

-- | >>> J.each "abc" & J.drop 2 & J.toList
-- "c"
drop :: Int -> Jet a -> Jet a
drop :: forall a. Int -> Jet a -> Jet a
drop Int
limit (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair Int s -> a -> IO (Pair Int s)
step' (Pair Int
count s
s) a
a =
        if
          | Int
count Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
limit -> do
              Pair Int s -> IO (Pair Int s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> s -> Pair Int s
forall a b. a -> b -> Pair a b
Pair (Int -> Int
forall a. Enum a => a -> a
succ Int
count) s
s)
          | Bool
otherwise -> do
              !s' <- s -> a -> IO s
step s
s a
a
              pure (Pair count s')
      initial' :: Pair Int s
initial' = Int -> s -> Pair Int s
forall a b. a -> b -> Pair a b
Pair Int
0 s
initial
  Pair _ final <- (Pair Int s -> Bool)
-> (Pair Int s -> a -> IO (Pair Int s))
-> Pair Int s
-> IO (Pair Int s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f Pair Int s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair Int s -> a -> IO (Pair Int s)
step' Pair Int s
initial'
  pure final

data DropState = StillDropping | DroppingNoMore

-- | >>> J.each [1..5] & J.dropWhile (<3) & J.toList
-- [3,4,5]
dropWhile :: (a -> Bool) -> Jet a -> Jet a
dropWhile :: forall a. (a -> Bool) -> Jet a -> Jet a
dropWhile a -> Bool
p = (a -> IO Bool) -> Jet a -> Jet a
forall a. (a -> IO Bool) -> Jet a -> Jet a
dropWhileIO ((Bool -> IO Bool) -> (a -> Bool) -> a -> IO Bool
forall a b. (a -> b) -> (a -> a) -> a -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> IO Bool
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> Bool
p)

dropWhileIO :: (a -> IO Bool) -> Jet a -> Jet a
dropWhileIO :: forall a. (a -> IO Bool) -> Jet a -> Jet a
dropWhileIO a -> IO Bool
p (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair DropState s -> a -> IO (Pair DropState s)
step' (Pair DropState
DroppingNoMore s
s) a
a = do
        !s' <- s -> a -> IO s
step s
s a
a
        pure (Pair DroppingNoMore s')
      step' (Pair DropState
StillDropping s
s) a
a = do
        keepDropping <- a -> IO Bool
p a
a
        if
          | keepDropping ->
              pure (Pair StillDropping s)
          | otherwise -> do
              !s' <- step s a
              pure (Pair DroppingNoMore s')
      initial' :: Pair DropState s
initial' = (DropState -> s -> Pair DropState s
forall a b. a -> b -> Pair a b
Pair DropState
StillDropping s
initial)
  Pair _ final <- (Pair DropState s -> Bool)
-> (Pair DropState s -> a -> IO (Pair DropState s))
-> Pair DropState s
-> IO (Pair DropState s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f Pair DropState s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair DropState s -> a -> IO (Pair DropState s)
step' Pair DropState s
initial'
  pure final

-- | >>> J.each "abc" & J.take 2 & J.toList
-- "ab"
take :: Int -> Jet a -> Jet a
take :: forall a. Int -> Jet a -> Jet a
take Int
limit (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair Int s -> Bool
stop' (Pair Int
count s
s) =
        Int
count Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
limit Bool -> Bool -> Bool
|| s -> Bool
stop s
s
      step' :: Pair a s -> a -> IO (Pair a s)
step' (Pair a
count s
s) a
a = do
        !s' <- s -> a -> IO s
step s
s a
a
        pure (Pair (succ count) s')
      initial' :: Pair Int s
initial' = Int -> s -> Pair Int s
forall a b. a -> b -> Pair a b
Pair Int
0 s
initial
  Pair _ final <- (Pair Int s -> Bool)
-> (Pair Int s -> a -> IO (Pair Int s))
-> Pair Int s
-> IO (Pair Int s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f Pair Int s -> Bool
stop' Pair Int s -> a -> IO (Pair Int s)
forall {a}. Enum a => Pair a s -> a -> IO (Pair a s)
step' Pair Int s
initial'
  pure final

-- | Synonym for 'take'.
limit :: Int -> Jet a -> Jet a
limit :: forall a. Int -> Jet a -> Jet a
limit = Int -> Jet a -> Jet a
forall a. Int -> Jet a -> Jet a
take

data TakeState = StillTaking | TakingNoMore

-- | >>> J.each [1..] & J.takeWhile (<5) & J.toList
-- [1,2,3,4]
takeWhile :: (a -> Bool) -> Jet a -> Jet a
takeWhile :: forall a. (a -> Bool) -> Jet a -> Jet a
takeWhile a -> Bool
p = (a -> IO Bool) -> Jet a -> Jet a
forall a. (a -> IO Bool) -> Jet a -> Jet a
takeWhileIO ((Bool -> IO Bool) -> (a -> Bool) -> a -> IO Bool
forall a b. (a -> b) -> (a -> a) -> a -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> IO Bool
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> Bool
p)

takeWhileIO :: (a -> IO Bool) -> Jet a -> Jet a
takeWhileIO :: forall a. (a -> IO Bool) -> Jet a -> Jet a
takeWhileIO a -> IO Bool
p (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair TakeState s -> Bool
stop' (Pair TakeState
TakingNoMore s
_) =
        Bool
True
      stop' (Pair TakeState
StillTaking s
s) =
        s -> Bool
stop s
s
      step' :: Pair TakeState s -> a -> IO (Pair TakeState s)
step' (Pair TakeState
internal s
s) a
a = do
        keepTaking <- a -> IO Bool
p a
a
        if
          | keepTaking -> do
              !s' <- step s a
              pure (Pair internal s')
          | otherwise ->
              pure (Pair TakingNoMore s)
      initial' :: Pair TakeState s
initial' = TakeState -> s -> Pair TakeState s
forall a b. a -> b -> Pair a b
Pair TakeState
StillTaking s
initial
  Pair _ final <- (Pair TakeState s -> Bool)
-> (Pair TakeState s -> a -> IO (Pair TakeState s))
-> Pair TakeState s
-> IO (Pair TakeState s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f Pair TakeState s -> Bool
stop' Pair TakeState s -> a -> IO (Pair TakeState s)
step' Pair TakeState s
initial'
  pure final

-- |
-- >>> J.each "abc" & J.filter (=='a') & J.toList
-- "a"
filter :: (a -> Bool) -> Jet a -> Jet a
filter :: forall a. (a -> Bool) -> Jet a -> Jet a
filter a -> Bool
p = (a -> IO Bool) -> Jet a -> Jet a
forall a. (a -> IO Bool) -> Jet a -> Jet a
filterIO ((Bool -> IO Bool) -> (a -> Bool) -> a -> IO Bool
forall a b. (a -> b) -> (a -> a) -> a -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> IO Bool
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> Bool
p)

filterIO :: (a -> IO Bool) -> Jet a -> Jet a
filterIO :: forall a. (a -> IO Bool) -> Jet a -> Jet a
filterIO a -> IO Bool
p (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let step' :: s -> a -> IO s
step' s
s a
a = do
        shouldPass <- a -> IO Bool
p a
a
        if
          | shouldPass -> do
              !s' <- step s a
              pure s'
          | otherwise ->
              pure s
  (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f s -> Bool
stop s -> a -> IO s
step' s
initial

-- | Behaves like a combination of 'fmap' and 'foldl'; it applies a function to
-- each element of a structure passing an accumulating parameter from left to right.
--
-- The resulting 'Jet' has the same number of elements as the original one.
--
-- Unlike 'Data.Traversable.mapAccumL', it doesn't make the final state available.
--
-- >>> J.each [1,2,3,4] & J.mapAccum (\a b -> (a + b,a)) 0 & J.toList
-- [0,1,3,6]
mapAccum :: (a -> b -> (a, c)) -> a -> Jet b -> Jet c
mapAccum :: forall a b c. (a -> b -> (a, c)) -> a -> Jet b -> Jet c
mapAccum a -> b -> (a, c)
stepAcc = (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
forall a b c. (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
mapAccumIO (((b -> (a, c)) -> b -> IO (a, c))
-> (a -> b -> (a, c)) -> a -> b -> IO (a, c)
forall a b. (a -> b) -> (a -> a) -> a -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (((a, c) -> IO (a, c)) -> (b -> (a, c)) -> b -> IO (a, c)
forall a b. (a -> b) -> (b -> a) -> b -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a, c) -> IO (a, c)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure) a -> b -> (a, c)
stepAcc)

mapAccumIO :: (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
mapAccumIO :: forall a b c. (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c
mapAccumIO a -> b -> IO (a, c)
stepAcc a
initialAcc (Jet forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> c -> IO s) -> s -> IO s) -> Jet c
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> c -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair a s -> b -> IO (Pair a s)
step' (Pair a
acc s
s) b
b = do
        (acc', c) <- a -> b -> IO (a, c)
stepAcc a
acc b
b
        !s' <- step s c
        pure (Pair acc' s')
      initial' :: Pair a s
initial' = a -> s -> Pair a s
forall a b. a -> b -> Pair a b
Pair a
initialAcc s
initial
  Pair _ final <- (Pair a s -> Bool)
-> (Pair a s -> b -> IO (Pair a s)) -> Pair a s -> IO (Pair a s)
forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
f Pair a s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair a s -> b -> IO (Pair a s)
step' Pair a s
initial'
  pure final

data Touched
  = NotYetTouched
  | AlreadyTouched

-- TODO: there's a bug here!!!!

-- |
-- >>> J.each "abc" & J.intersperse '-' & J.toList
-- "a-b-c"
intersperse :: a -> Jet a -> Jet a
intersperse :: forall a. a -> Jet a -> Jet a
intersperse a
intrusion (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
upstream) = (forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> a -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair Touched s -> a -> IO (Pair Touched s)
step' (Pair Touched
AlreadyTouched s
s) a
a = do
        !s' <- s -> a -> IO s
step s
s a
intrusion
        if
          | stop s' ->
              pure (Pair AlreadyTouched s')
          | otherwise -> do
              !s'' <- step s' a
              pure (Pair AlreadyTouched s'')
      step' (Pair Touched
NotYetTouched s
s) a
a = do
        !s' <- s -> a -> IO s
step s
s a
a
        pure (Pair AlreadyTouched s')
      initial' :: Pair Touched s
initial' = Touched -> s -> Pair Touched s
forall a b. a -> b -> Pair a b
Pair Touched
NotYetTouched s
initial
  Pair _ final <- (Pair Touched s -> Bool)
-> (Pair Touched s -> a -> IO (Pair Touched s))
-> Pair Touched s
-> IO (Pair Touched s)
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
upstream Pair Touched s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair Touched s -> a -> IO (Pair Touched s)
step' Pair Touched s
initial'
  pure final

-- |
-- >>> J.each "abc" & J.zip [1..] & J.toList
-- [(1,'a'),(2,'b'),(3,'c')]
--
-- >>> J.each [1..] & J.zip "abc" & J.toList
-- [('a',1),('b',2),('c',3)]
zip :: (Foldable f) => f a -> Jet b -> Jet (a, b)
zip :: forall (f :: * -> *) a b. Foldable f => f a -> Jet b -> Jet (a, b)
zip = (a -> b -> (a, b)) -> f a -> Jet b -> Jet (a, b)
forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> c) -> f a -> Jet b -> Jet c
zipWith (,)

zipWith :: (Foldable f) => (a -> b -> c) -> f a -> Jet b -> Jet c
zipWith :: forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> c) -> f a -> Jet b -> Jet c
zipWith a -> b -> c
zf (f a -> [a]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Data.Foldable.toList -> [a]
as0) = (a -> b -> IO c) -> [IO a] -> Jet b -> Jet c
forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO (((b -> c) -> b -> IO c) -> (a -> b -> c) -> a -> b -> IO c
forall a b. (a -> b) -> (a -> a) -> a -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((c -> IO c) -> (b -> c) -> b -> IO c
forall a b. (a -> b) -> (b -> a) -> b -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap c -> IO c
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure) a -> b -> c
zf) ((a -> IO a) -> [a] -> [IO a]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [a]
as0)

zipIO :: (Foldable f) => f (IO a) -> Jet b -> Jet (a, b)
zipIO :: forall (f :: * -> *) a b.
Foldable f =>
f (IO a) -> Jet b -> Jet (a, b)
zipIO = (a -> b -> IO (a, b)) -> f (IO a) -> Jet b -> Jet (a, b)
forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO (\a
x b
y -> (a, b) -> IO (a, b)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a
x, b
y))

-- |
-- Zips a list of 'IO' actions with a 'Jet', where the combining function can also have effects.
--
-- If the list of actions is exhausted, the 'Jet' stops:
--
-- >>> J.each [1..] <&> show & zipWithIO (\c1 c2 -> putStrLn (c1 ++ c2)) [pure "a", pure "b"] & J.toList
-- a1
-- b2
-- [(),()]
zipWithIO :: (Foldable f) => (a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO :: forall (f :: * -> *) a b c.
Foldable f =>
(a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c
zipWithIO a -> b -> IO c
zf (f (IO a) -> [IO a]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Data.Foldable.toList -> [IO a]
ioas0) (Jet forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> c -> IO s) -> s -> IO s) -> Jet c
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> c -> IO s
step s
initial -> do
  let stop' :: Pair [a] s -> Bool
stop' (Pair [] s
_) = Bool
True
      stop' (Pair [a]
_ s
s) = s -> Bool
stop s
s
      step' :: Pair [IO a] s -> b -> IO (Pair [IO a] s)
step' (Pair (IO a
ioa : [IO a]
ioas) s
s) b
b = do
        a <- IO a
ioa
        z <- zf a b
        !s' <- step s z
        pure (Pair ioas s')
      step' (Pair [] s
_) b
_ = String -> IO (Pair [IO a] s)
forall a. HasCallStack => String -> a
error String
"never happens"
      initial' :: Pair [IO a] s
initial' = [IO a] -> s -> Pair [IO a] s
forall a b. a -> b -> Pair a b
Pair [IO a]
ioas0 s
initial
  Pair _ final <- (Pair [IO a] s -> Bool)
-> (Pair [IO a] s -> b -> IO (Pair [IO a] s))
-> Pair [IO a] s
-> IO (Pair [IO a] s)
forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s
f Pair [IO a] s -> Bool
forall {a}. Pair [a] s -> Bool
stop' Pair [IO a] s -> b -> IO (Pair [IO a] s)
step' Pair [IO a] s
initial'
  pure final

-- | Opens a file and makes the 'Handle' available to all following statements
-- in the do-block.
--
-- Notice that it's often simpler to use the 'JetSource' (for reading) and
-- 'JetSink' (for writing) instances of 'File'.
withFile :: FilePath -> IOMode -> Jet Handle
withFile :: String -> IOMode -> Jet Handle
withFile String
path IOMode
iomode = forall resource.
(forall x. (resource -> IO x) -> IO x) -> Jet resource
control @Handle (String -> IOMode -> (Handle -> IO x) -> IO x
forall r. String -> IOMode -> (Handle -> IO r) -> IO r
System.IO.withFile String
path IOMode
iomode)

-- |
--
-- >>> :{
-- do r <- J.bracket (putStrLn "allocating" *> pure "foo") (\r -> putStrLn $ "deallocating " ++ r)
--    liftIO $ putStrLn $ "using resource " ++ r
-- & drain
-- :}
-- allocating
-- using resource foo
-- deallocating foo
bracket ::
  forall a b.
  -- | allocator
  IO a ->
  -- | finalizer
  (a -> IO b) ->
  Jet a
bracket :: forall a b. IO a -> (a -> IO b) -> Jet a
bracket IO a
allocate a -> IO b
free = forall resource.
(forall x. (resource -> IO x) -> IO x) -> Jet resource
control @a (IO a -> (a -> IO b) -> (a -> IO x) -> IO x
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
Control.Exception.bracket IO a
allocate a -> IO b
free)

bracket_ ::
  forall a b.
  -- | allocator
  IO a ->
  -- | finalizer
  IO b ->
  Jet ()
bracket_ :: forall a b. IO a -> IO b -> Jet ()
bracket_ IO a
allocate IO b
free = (forall x. IO x -> IO x) -> Jet ()
control_ (IO a -> IO b -> IO x -> IO x
forall a b c. IO a -> IO b -> IO c -> IO c
Control.Exception.bracket_ IO a
allocate IO b
free)

bracketOnError ::
  forall a b.
  -- | allocator
  IO a ->
  -- | finalizer
  (a -> IO b) ->
  Jet a
bracketOnError :: forall a b. IO a -> (a -> IO b) -> Jet a
bracketOnError IO a
allocate a -> IO b
free = forall resource.
(forall x. (resource -> IO x) -> IO x) -> Jet resource
control @a (IO a -> (a -> IO b) -> (a -> IO x) -> IO x
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
Control.Exception.bracketOnError IO a
allocate a -> IO b
free)

-- |
--
-- Notice how the finalizer runs even when we limit the 'Jet':
--
-- >>> :{
-- do J.finally (putStrLn "hi") -- protects statements below
--    liftIO (putStrLn "hey")
--    J.each "abc"
-- & J.limit 2
-- & J.toList
-- :}
-- hey
-- hi
-- "ab"
--
-- But if the protected 'Jet' is not consumed at all, the finalizer might not run.
--
-- >>> :{
-- do J.finally (putStrLn "hi") -- protects statements below
--    liftIO (putStrLn "hey")
--    J.each "abc"
-- & J.limit 0
-- & J.toList
-- :}
-- ""
finally :: IO a -> Jet ()
finally :: forall a. IO a -> Jet ()
finally IO a
afterward =
  (forall x. IO x -> IO x) -> Jet ()
control_ ((IO x -> IO a -> IO x) -> IO a -> IO x -> IO x
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO x -> IO a -> IO x
forall a b. IO a -> IO b -> IO a
Control.Exception.finally IO a
afterward)

onException :: IO a -> Jet ()
onException :: forall a. IO a -> Jet ()
onException IO a
afterward =
  (forall x. IO x -> IO x) -> Jet ()
control_ ((IO x -> IO a -> IO x) -> IO a -> IO x -> IO x
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO x -> IO a -> IO x
forall a b. IO a -> IO b -> IO a
Control.Exception.onException IO a
afterward)

-- | Lift a control operation (like 'Control.Exception.bracket') for which the
-- callback uses the allocated resource.
--
-- __BEWARE__: the control operation shouldn't do weird things like executing
-- the callback twice.
control :: forall resource. (forall x. (resource -> IO x) -> IO x) -> Jet resource
control :: forall resource.
(forall x. (resource -> IO x) -> IO x) -> Jet resource
control forall x. (resource -> IO x) -> IO x
f =
  (forall s. (s -> Bool) -> (s -> resource -> IO s) -> s -> IO s)
-> Jet resource
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> resource -> IO s
step s
initial ->
    if
      | s -> Bool
stop s
initial ->
          s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
      | Bool
otherwise -> do
          (resource -> IO s) -> IO s
forall x. (resource -> IO x) -> IO x
f (s -> resource -> IO s
step s
initial)

-- | Lift a control operation (like 'Control.Exception.finally') for which the
-- callback doesn't use the allocated resource.
--
-- __BEWARE__: the control operation shouldn't do weird things like executing
-- the callback twice.
control_ :: (forall x. IO x -> IO x) -> Jet ()
control_ :: (forall x. IO x -> IO x) -> Jet ()
control_ forall x. IO x -> IO x
f =
  (forall s. (s -> Bool) -> (s -> () -> IO s) -> s -> IO s) -> Jet ()
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> () -> IO s
step s
initial ->
    if
      | s -> Bool
stop s
initial -> do
          s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
      | Bool
otherwise -> do
          IO s -> IO s
forall x. IO x -> IO x
f (s -> () -> IO s
step s
initial ())

-- |
--
-- >>> L.purely (J.fold (J.each "abc")) ((,) <$> L.list <*> L.length)
-- ("abc",3)
fold :: Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r
fold :: forall a s r. Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r
fold (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) s -> a -> s
step s
initial s -> r
coda = do
  r <- (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f (Bool -> s -> Bool
forall a b. a -> b -> a
const Bool
False) (((a -> s) -> a -> IO s) -> (s -> a -> s) -> s -> a -> IO s
forall a b. (a -> b) -> (s -> a) -> s -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((s -> IO s) -> (a -> s) -> a -> IO s
forall a b. (a -> b) -> (a -> a) -> a -> b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure) s -> a -> s
step) s
initial
  pure $ coda r

-- |
-- >>> L.impurely (J.foldIO (J.each "abc")) (L.FoldM (\() c -> putStrLn [c]) (pure ()) pure *> L.generalize L.length)
-- a
-- b
-- c
-- 3
foldIO :: Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r
foldIO :: forall a s r.
Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r
foldIO (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
f) s -> a -> IO s
step IO s
initialIO s -> IO r
coda = do
  initial <- IO s
initialIO
  r <- f (const False) step initial
  coda r

-- Byte Jets

-- https://stackoverflow.com/questions/49852060/how-to-choose-chunk-size-when-reading-a-large-file
-- https://askubuntu.com/questions/641900/how-file-system-block-size-works
-- https://stackoverflow.com/questions/1111661/8192-bytes-when-creating-file
data ChunkSize
  = DefaultChunkSize
  | ChunkSize Int
  | ChunkSize1K
  | ChunkSize4K
  | ChunkSize8K
  | ChunkSize16K
  | ChunkSize1M
  | ChunkSize2M
  deriving (Int -> ChunkSize -> ShowS
[ChunkSize] -> ShowS
ChunkSize -> String
(Int -> ChunkSize -> ShowS)
-> (ChunkSize -> String)
-> ([ChunkSize] -> ShowS)
-> Show ChunkSize
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ChunkSize -> ShowS
showsPrec :: Int -> ChunkSize -> ShowS
$cshow :: ChunkSize -> String
show :: ChunkSize -> String
$cshowList :: [ChunkSize] -> ShowS
showList :: [ChunkSize] -> ShowS
Show)

chunkSize :: ChunkSize -> Int
chunkSize :: ChunkSize -> Int
chunkSize = \case
  ChunkSize
DefaultChunkSize -> Int
8192
  ChunkSize Int
c -> Int
c
  ChunkSize
ChunkSize1K -> Int
1024
  ChunkSize
ChunkSize4K -> Int
4096
  ChunkSize
ChunkSize8K -> Int
8192
  ChunkSize
ChunkSize16K -> Int
16384
  ChunkSize
ChunkSize1M -> Int
1048576
  ChunkSize
ChunkSize2M -> Int
2097152

-- | Helper multi-parameter typeclass for creating 'Jet' values out of a
--   variety of common sources.
--
--   Because there's no functional dependency, sometimes we need to use
--   @TypeApplications@ to give the compiler a hint about the type of elements
--   we want to produce. For example, here we want 'Line's and not, say,
--   'ByteString's:
--
-- >>> action = J.jet @Line (File "foo.txt") & J.sink J.stdout
class JetSource a source where
  jet :: source -> Jet a

bytes :: ChunkSize -> Handle -> Jet ByteString
bytes :: ChunkSize -> Handle -> Jet ByteString
bytes (ChunkSize -> Int
chunkSize -> Int
count) Handle
handle =
  (Handle -> IO Bool)
-> (Handle -> IO ByteString) -> Handle -> Jet ByteString
forall handle a.
(handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF Handle -> IO Bool
System.IO.hIsEOF ((Handle -> Int -> IO ByteString) -> Int -> Handle -> IO ByteString
forall a b c. (a -> b -> c) -> b -> a -> c
flip Handle -> Int -> IO ByteString
B.hGetSome Int
count) Handle
handle

instance JetSource ByteString Handle where
  jet :: Handle -> Jet ByteString
jet = ChunkSize -> Handle -> Jet ByteString
bytes ChunkSize
DefaultChunkSize

instance (JetSource a Handle) => JetSource a File where
  jet :: File -> Jet a
jet (File String
path) = do
    handle <- String -> IOMode -> Jet Handle
withFile String
path IOMode
ReadMode
    jet handle

accumByteLengths :: Jet ByteString -> Jet (Int, ByteString)
accumByteLengths :: Jet ByteString -> Jet (Int, ByteString)
accumByteLengths = (Int -> ByteString -> (Int, (Int, ByteString)))
-> Int -> Jet ByteString -> Jet (Int, ByteString)
forall a b c. (a -> b -> (a, c)) -> a -> Jet b -> Jet c
mapAccum (\Int
acc ByteString
bytes -> let acc' :: Int
acc' = Int
acc Int -> Int -> Int
forall a. Num a => a -> a -> a
+ ByteString -> Int
B.length ByteString
bytes in (Int
acc', (Int
acc', ByteString
bytes))) (Int
0 :: Int)

data AmIContinuing
  = Continuing
  | NotContinuing
  deriving (Int -> AmIContinuing -> ShowS
[AmIContinuing] -> ShowS
AmIContinuing -> String
(Int -> AmIContinuing -> ShowS)
-> (AmIContinuing -> String)
-> ([AmIContinuing] -> ShowS)
-> Show AmIContinuing
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> AmIContinuing -> ShowS
showsPrec :: Int -> AmIContinuing -> ShowS
$cshow :: AmIContinuing -> String
show :: AmIContinuing -> String
$cshowList :: [AmIContinuing] -> ShowS
showList :: [AmIContinuing] -> ShowS
Show)

-- | Splits a stream of bytes into groups bounded by maximum byte sizes. When
-- one group \"fills up\", the next one is started.
--
-- When the list of buckets sizes is exhausted, all incoming bytes are put into
-- the same unbounded group.
--
-- Useful in combination with 'recast'.
bytesOverBuckets :: [Int] -> Splitter ByteString ByteString
bytesOverBuckets :: [Int] -> Splitter ByteString ByteString
bytesOverBuckets [Int]
buckets0 = (Pair AmIContinuing [Int]
 -> ByteString
 -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int]))
-> (Pair AmIContinuing [Int] -> IO (SplitStepResult ByteString))
-> IO (Pair AmIContinuing [Int])
-> Splitter ByteString ByteString
forall s a b.
(s -> a -> IO (b, s)) -> (s -> IO b) -> IO s -> MealyIO a b
MealyIO Pair AmIContinuing [Int]
-> ByteString
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step Pair AmIContinuing [Int] -> IO (SplitStepResult ByteString)
forall a. Monoid a => a
mempty (Pair AmIContinuing [Int] -> IO (Pair AmIContinuing [Int])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets0))
  where
    -- logStep s@(Pair c zzz) a = do
    --     putStrLn "foooo!"
    --     System.IO.hFlush System.IO.stdout
    --     traceIO ("state: " ++ show c)
    --     traceIO ("bucket: " ++ show (Prelude.take 2 zzz))
    --     traceIO ("input: " ++ show a)
    --     r@(nexts, _) <- step s a
    --     traceIO ("output: " ++ show nexts)
    --     pure r
    step :: Pair AmIContinuing [Int] -> ByteString -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
    step :: Pair AmIContinuing [Int]
-> ByteString
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step Pair AmIContinuing [Int]
splitterState ByteString
b = do
      (continueResult, Pair continuing' buckets', b') <- Pair AmIContinuing [Int]
-> ByteString
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
continue Pair AmIContinuing [Int]
splitterState ByteString
b
      if
        | B.null b' ->
            pure (continueResult, Pair continuing' buckets')
        | otherwise -> do
            (entiresResult, splitterState') <- makeEntires mempty b' buckets'
            pure (continueResult <> entiresResult, splitterState')
    continue :: Pair AmIContinuing [Int] -> ByteString -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
    continue :: Pair AmIContinuing [Int]
-> ByteString
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
continue (Pair AmIContinuing
NotContinuing []) ByteString
b = (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
nextWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [], ByteString
B.empty)
    continue (Pair AmIContinuing
Continuing []) ByteString
b = (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
continueWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing [], ByteString
B.empty)
    continue (Pair AmIContinuing
NotContinuing (Int
bucket : [Int]
buckets)) ByteString
b = do
      let blen :: Int
blen = ByteString -> Int
B.length ByteString
b
      -- traceIO ("b = " ++ show b ++ " bucket size= " ++ show bucket)
      (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure case Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
blen Int
bucket of
        Ordering
LT -> (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
nextWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing (Int
bucket Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
blen Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: [Int]
buckets), ByteString
B.empty)
        Ordering
EQ -> (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith (ByteString -> DList ByteString
forall a. a -> DList a
singleton ByteString
b), AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets, ByteString
B.empty)
        Ordering
GT ->
          let (ByteString
left, ByteString
right) = Int -> ByteString -> (ByteString, ByteString)
B.splitAt Int
bucket ByteString
b
           in (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith (ByteString -> DList ByteString
forall a. a -> DList a
singleton ByteString
left), AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets, ByteString
right)
    continue (Pair AmIContinuing
Continuing (Int
bucket : [Int]
buckets)) ByteString
b = do
      let blen :: Int
blen = ByteString -> Int
B.length ByteString
b
      (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
-> IO
     (SplitStepResult ByteString, Pair AmIContinuing [Int], ByteString)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure case Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
blen Int
bucket of
        Ordering
LT -> (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
continueWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing (Int
bucket Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
blen Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: [Int]
buckets), ByteString
B.empty)
        Ordering
EQ -> (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
continueWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets, ByteString
B.empty)
        Ordering
GT ->
          let (ByteString
left, ByteString
right) = Int -> ByteString -> (ByteString, ByteString)
B.splitAt Int
bucket ByteString
b
           in (ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
continueWith ByteString
left, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets, ByteString
right)
    makeEntires :: DList ByteString -> ByteString -> [Int] -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
    makeEntires :: DList ByteString
-> ByteString
-> [Int]
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
makeEntires DList ByteString
acc ByteString
b [] = (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith DList ByteString
acc SplitStepResult ByteString
-> SplitStepResult ByteString -> SplitStepResult ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
nextWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing [])
    makeEntires DList ByteString
acc ByteString
b (Int
bucket : [Int]
buckets) = do
      let blen :: Int
blen = ByteString -> Int
B.length ByteString
b
      case Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
blen Int
bucket of
        Ordering
LT -> (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith DList ByteString
acc SplitStepResult ByteString
-> SplitStepResult ByteString -> SplitStepResult ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString -> SplitStepResult ByteString
forall {b}. b -> SplitStepResult b
nextWith ByteString
b, AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing (Int
bucket Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
blen Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: [Int]
buckets))
        Ordering
EQ -> (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList ByteString -> SplitStepResult ByteString
forall {b}. DList b -> SplitStepResult b
entireWith (DList ByteString
acc DList ByteString -> DList ByteString -> DList ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString -> DList ByteString
forall a. a -> DList a
singleton ByteString
b), AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets)
        Ordering
GT -> do
          let (ByteString
left, ByteString
right) = Int -> ByteString -> (ByteString, ByteString)
B.splitAt Int
bucket ByteString
b
          DList ByteString
-> ByteString
-> [Int]
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
makeEntires (DList ByteString
acc DList ByteString -> DList ByteString -> DList ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString -> DList ByteString
forall a. a -> DList a
singleton ByteString
left) ByteString
right [Int]
buckets -- non-terminal
    continueWith :: b -> SplitStepResult b
continueWith b
b = SplitStepResult b
forall a. Monoid a => a
mempty {continuationOfPreviouslyStartedGroup = [b]}
    entireWith :: DList b -> SplitStepResult b
entireWith DList b
bdf = SplitStepResult b
forall a. Monoid a => a
mempty {entireGroups = fmap pure (closeDList bdf)}
    nextWith :: b -> SplitStepResult b
nextWith b
b = SplitStepResult b
forall a. Monoid a => a
mempty {startOfNewGroup = [b]}

-- | A sequence of bytes that we might want to keep together.
newtype ByteBundle = ByteBundle BL.ByteString deriving newtype (Int -> ByteBundle -> ShowS
[ByteBundle] -> ShowS
ByteBundle -> String
(Int -> ByteBundle -> ShowS)
-> (ByteBundle -> String)
-> ([ByteBundle] -> ShowS)
-> Show ByteBundle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ByteBundle -> ShowS
showsPrec :: Int -> ByteBundle -> ShowS
$cshow :: ByteBundle -> String
show :: ByteBundle -> String
$cshowList :: [ByteBundle] -> ShowS
showList :: [ByteBundle] -> ShowS
Show, NonEmpty ByteBundle -> ByteBundle
ByteBundle -> ByteBundle -> ByteBundle
(ByteBundle -> ByteBundle -> ByteBundle)
-> (NonEmpty ByteBundle -> ByteBundle)
-> (forall b. Integral b => b -> ByteBundle -> ByteBundle)
-> Semigroup ByteBundle
forall b. Integral b => b -> ByteBundle -> ByteBundle
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
$c<> :: ByteBundle -> ByteBundle -> ByteBundle
<> :: ByteBundle -> ByteBundle -> ByteBundle
$csconcat :: NonEmpty ByteBundle -> ByteBundle
sconcat :: NonEmpty ByteBundle -> ByteBundle
$cstimes :: forall b. Integral b => b -> ByteBundle -> ByteBundle
stimes :: forall b. Integral b => b -> ByteBundle -> ByteBundle
Semigroup, Semigroup ByteBundle
ByteBundle
Semigroup ByteBundle =>
ByteBundle
-> (ByteBundle -> ByteBundle -> ByteBundle)
-> ([ByteBundle] -> ByteBundle)
-> Monoid ByteBundle
[ByteBundle] -> ByteBundle
ByteBundle -> ByteBundle -> ByteBundle
forall a.
Semigroup a =>
a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
$cmempty :: ByteBundle
mempty :: ByteBundle
$cmappend :: ByteBundle -> ByteBundle -> ByteBundle
mappend :: ByteBundle -> ByteBundle -> ByteBundle
$cmconcat :: [ByteBundle] -> ByteBundle
mconcat :: [ByteBundle] -> ByteBundle
Monoid)

-- | Constructs a 'ByteBundle' out of the bytes of some 'Foldable' container.
bundle :: (Foldable f) => f ByteString -> ByteBundle
bundle :: forall (f :: * -> *). Foldable f => f ByteString -> ByteBundle
bundle = ByteString -> ByteBundle
ByteBundle (ByteString -> ByteBundle)
-> (f ByteString -> ByteString) -> f ByteString -> ByteBundle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ByteString] -> ByteString
BL.fromChunks ([ByteString] -> ByteString)
-> (f ByteString -> [ByteString]) -> f ByteString -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. f ByteString -> [ByteString]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Data.Foldable.toList

-- | Length in bytes.
bundleLength :: ByteBundle -> Int
bundleLength :: ByteBundle -> Int
bundleLength (ByteBundle ByteString
value) = Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteString -> Int64
BL.length ByteString
value) -- Int64, but unlikely we'll reach the limit

bundleBytes :: ByteBundle -> Jet ByteString
bundleBytes :: ByteBundle -> Jet ByteString
bundleBytes (ByteBundle ByteString
value) = [ByteString] -> Jet ByteString
forall a (f :: * -> *). Foldable f => f a -> Jet a
each (ByteString -> [ByteString]
BL.toChunks ByteString
value)

-- | Exception thrown when we try to write too much data in a size-bounded destination.
data BucketOverflow = BucketOverflow
  deriving (Int -> BucketOverflow -> ShowS
[BucketOverflow] -> ShowS
BucketOverflow -> String
(Int -> BucketOverflow -> ShowS)
-> (BucketOverflow -> String)
-> ([BucketOverflow] -> ShowS)
-> Show BucketOverflow
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> BucketOverflow -> ShowS
showsPrec :: Int -> BucketOverflow -> ShowS
$cshow :: BucketOverflow -> String
show :: BucketOverflow -> String
$cshowList :: [BucketOverflow] -> ShowS
showList :: [BucketOverflow] -> ShowS
Show, Typeable)

instance Exception BucketOverflow

-- | Splits a stream of 'ByteBundles' into groups bounded by maximum byte
-- sizes.  Bytes belonging to the same 'ByteBundle' are always put in the same
-- group. When one group \"fills up\", the next one is started.
--
-- When the list of buckets sizes is exhausted, all incoming bytes are put into
-- the same unbounded group.
--
-- Useful in combination with 'recast'.
--
-- __THROWS__:
--
-- * 'BucketOverflow' exception if the size bound of a group turns out to be
-- too small for holding even a single 'ByteBundle' value.
byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString
byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString
byteBundlesOverBuckets [Int]
buckets0 = (Pair AmIContinuing [Int]
 -> ByteBundle
 -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int]))
-> (Pair AmIContinuing [Int] -> IO (SplitStepResult ByteString))
-> IO (Pair AmIContinuing [Int])
-> Splitter ByteBundle ByteString
forall s a b.
(s -> a -> IO (b, s)) -> (s -> IO b) -> IO s -> MealyIO a b
MealyIO Pair AmIContinuing [Int]
-> ByteBundle
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step Pair AmIContinuing [Int] -> IO (SplitStepResult ByteString)
forall a. Monoid a => a
mempty (Pair AmIContinuing [Int] -> IO (Pair AmIContinuing [Int])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets0))
  where
    step :: Pair AmIContinuing [Int] -> ByteBundle -> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
    step :: Pair AmIContinuing [Int]
-> ByteBundle
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step (Pair AmIContinuing
splitterState []) (ByteBundle ByteString
pieces) =
      -- We assume [] means "infinite bucket" so once we enter it we'll only be able to continue.
      (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
        ( case AmIContinuing
splitterState of
            AmIContinuing
Continuing -> ByteString -> SplitStepResult ByteString
continueWith ByteString
pieces
            AmIContinuing
NotContinuing -> ByteString -> SplitStepResult ByteString
nextWith ByteString
pieces,
          AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing []
        )
    step (Pair AmIContinuing
splitterState (Int
bucket : [Int]
buckets)) e :: ByteBundle
e@(ByteBundle ByteString
pieces) = do
      let elen :: Int
elen = ByteBundle -> Int
bundleLength ByteBundle
e
      case Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare Int
elen Int
bucket of
        Ordering
LT ->
          (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
            ( case AmIContinuing
splitterState of
                AmIContinuing
Continuing -> ByteString -> SplitStepResult ByteString
continueWith ByteString
pieces
                AmIContinuing
NotContinuing -> ByteString -> SplitStepResult ByteString
nextWith ByteString
pieces,
              AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
Continuing (Int
bucket Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
elen Int -> [Int] -> [Int]
forall a. a -> [a] -> [a]
: [Int]
buckets)
            )
        Ordering
EQ ->
          (SplitStepResult ByteString, Pair AmIContinuing [Int])
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
            ( case AmIContinuing
splitterState of
                AmIContinuing
Continuing -> ByteString -> SplitStepResult ByteString
continueWith ByteString
pieces
                AmIContinuing
NotContinuing -> ByteString -> SplitStepResult ByteString
entireWith ByteString
pieces,
              AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets
            )
        -- NB: It's possible to close a bucket and open the next one in the same iteration.
        Ordering
GT -> case AmIContinuing
splitterState of
          AmIContinuing
Continuing -> Pair AmIContinuing [Int]
-> ByteBundle
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
step (AmIContinuing -> [Int] -> Pair AmIContinuing [Int]
forall a b. a -> b -> Pair a b
Pair AmIContinuing
NotContinuing [Int]
buckets) ByteBundle
e
          -- If we are not continuing, that means that the brand-new bucket hasn't
          -- enough space to hold a single entity.
          AmIContinuing
NotContinuing -> BucketOverflow
-> IO (SplitStepResult ByteString, Pair AmIContinuing [Int])
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO BucketOverflow
BucketOverflow
    continueWith :: ByteString -> SplitStepResult ByteString
continueWith ByteString
bs = SplitStepResult ByteString
forall a. Monoid a => a
mempty {continuationOfPreviouslyStartedGroup = BL.toChunks bs}
    entireWith :: ByteString -> SplitStepResult ByteString
entireWith ByteString
pieces = SplitStepResult ByteString
forall a. Monoid a => a
mempty {entireGroups = [BL.toChunks pieces]}
    nextWith :: ByteString -> SplitStepResult ByteString
nextWith ByteString
bs = SplitStepResult ByteString
forall a. Monoid a => a
mempty {startOfNewGroup = BL.toChunks bs}

-- | Uses the default system locale.
instance JetSource Line Handle where
  jet :: Handle -> Jet Line
jet Handle
handle =
    Text -> Line
textToLine (Text -> Line) -> Jet Text -> Jet Line
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Handle -> IO Bool) -> (Handle -> IO Text) -> Handle -> Jet Text
forall handle a.
(handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF Handle -> IO Bool
System.IO.hIsEOF Handle -> IO Text
T.hGetLine Handle
handle

--
--
-- Text Jets

-- |
-- __THROWS__:
--
-- * 'T.UnicodeException'
decodeUtf8 :: Jet ByteString -> Jet Text
decodeUtf8 :: Jet ByteString -> Jet Text
decodeUtf8 (Jet forall s. (s -> Bool) -> (s -> ByteString -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> Text -> IO s) -> s -> IO s)
-> Jet Text
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> Text -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      step' :: Pair a s -> ByteString -> IO (Pair (ByteString -> Decoding) s)
step' (Pair a
leftovers s
s) ByteString
bytes = do
        T.Some !text !_ !leftovers' <- Decoding -> IO Decoding
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Decoding -> IO Decoding) -> Decoding -> IO Decoding
forall a b. (a -> b) -> a -> b
$ ByteString -> Decoding
T.streamDecodeUtf8 ByteString
bytes
        !s' <- step s text
        pure (Pair leftovers' s')
      initial' :: Pair (ByteString -> Decoding) s
initial' = (ByteString -> Decoding) -> s -> Pair (ByteString -> Decoding) s
forall a b. a -> b -> Pair a b
Pair ByteString -> Decoding
leftovers0 s
initial
  Pair leftovers final <- (Pair (ByteString -> Decoding) s -> Bool)
-> (Pair (ByteString -> Decoding) s
    -> ByteString -> IO (Pair (ByteString -> Decoding) s))
-> Pair (ByteString -> Decoding) s
-> IO (Pair (ByteString -> Decoding) s)
forall s. (s -> Bool) -> (s -> ByteString -> IO s) -> s -> IO s
f Pair (ByteString -> Decoding) s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair (ByteString -> Decoding) s
-> ByteString -> IO (Pair (ByteString -> Decoding) s)
forall {a}.
Pair a s -> ByteString -> IO (Pair (ByteString -> Decoding) s)
step' Pair (ByteString -> Decoding) s
initial'
  T.Some !_ !bytes !_ <- pure $ T.streamDecodeUtf8 B.empty
  if
    | not (B.null bytes) ->
        throwIO (T.DecodeError "Unconsumed leftovers at end." Nothing)
    | otherwise ->
        pure final
  where
    leftovers0 :: ByteString -> Decoding
leftovers0 =
      let T.Some Text
_ ByteString
_ ByteString -> Decoding
g = ByteString -> Decoding
T.streamDecodeUtf8 ByteString
B.empty
       in ByteString -> Decoding
g

encodeUtf8 :: Jet Text -> Jet ByteString
encodeUtf8 :: Jet Text -> Jet ByteString
encodeUtf8 = (Text -> ByteString) -> Jet Text -> Jet ByteString
forall a b. (a -> b) -> Jet a -> Jet b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> ByteString
T.encodeUtf8

-- | A line of text.
--
-- While it is guaranteed that the 'Line's coming out of the 'lines' function
-- do not contain newlines, that invariant is not otherwise enforced.
newtype Line = Line_ TL.Text
  deriving newtype (Line -> Line -> Bool
(Line -> Line -> Bool) -> (Line -> Line -> Bool) -> Eq Line
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Line -> Line -> Bool
== :: Line -> Line -> Bool
$c/= :: Line -> Line -> Bool
/= :: Line -> Line -> Bool
Eq, Eq Line
Eq Line =>
(Line -> Line -> Ordering)
-> (Line -> Line -> Bool)
-> (Line -> Line -> Bool)
-> (Line -> Line -> Bool)
-> (Line -> Line -> Bool)
-> (Line -> Line -> Line)
-> (Line -> Line -> Line)
-> Ord Line
Line -> Line -> Bool
Line -> Line -> Ordering
Line -> Line -> Line
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Line -> Line -> Ordering
compare :: Line -> Line -> Ordering
$c< :: Line -> Line -> Bool
< :: Line -> Line -> Bool
$c<= :: Line -> Line -> Bool
<= :: Line -> Line -> Bool
$c> :: Line -> Line -> Bool
> :: Line -> Line -> Bool
$c>= :: Line -> Line -> Bool
>= :: Line -> Line -> Bool
$cmax :: Line -> Line -> Line
max :: Line -> Line -> Line
$cmin :: Line -> Line -> Line
min :: Line -> Line -> Line
Ord, NonEmpty Line -> Line
Line -> Line -> Line
(Line -> Line -> Line)
-> (NonEmpty Line -> Line)
-> (forall b. Integral b => b -> Line -> Line)
-> Semigroup Line
forall b. Integral b => b -> Line -> Line
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
$c<> :: Line -> Line -> Line
<> :: Line -> Line -> Line
$csconcat :: NonEmpty Line -> Line
sconcat :: NonEmpty Line -> Line
$cstimes :: forall b. Integral b => b -> Line -> Line
stimes :: forall b. Integral b => b -> Line -> Line
Semigroup, Semigroup Line
Line
Semigroup Line =>
Line -> (Line -> Line -> Line) -> ([Line] -> Line) -> Monoid Line
[Line] -> Line
Line -> Line -> Line
forall a.
Semigroup a =>
a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
$cmempty :: Line
mempty :: Line
$cmappend :: Line -> Line -> Line
mappend :: Line -> Line -> Line
$cmconcat :: [Line] -> Line
mconcat :: [Line] -> Line
Monoid, Int -> Line -> ShowS
[Line] -> ShowS
Line -> String
(Int -> Line -> ShowS)
-> (Line -> String) -> ([Line] -> ShowS) -> Show Line
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Line -> ShowS
showsPrec :: Int -> Line -> ShowS
$cshow :: Line -> String
show :: Line -> String
$cshowList :: [Line] -> ShowS
showList :: [Line] -> ShowS
Show, String -> Line
(String -> Line) -> IsString Line
forall a. (String -> a) -> IsString a
$cfromString :: String -> Line
fromString :: String -> Line
IsString)

-- https://ghc.gitlab.haskell.org/ghc/doc/users_guide/exts/pattern_synonyms.html

-- | Unidirectional pattern that allows converting a 'Line' into a 'Text'
-- during pattern-matching.
pattern $mLine :: forall {r}. Line -> (Text -> r) -> ((# #) -> r) -> r
Line text <- Line_ (TL.toStrict -> text)

-- | Converts a 'Line' back to text, without adding the newline.
lineToText :: Line -> Text
lineToText :: Line -> Text
lineToText (Line_ Text
text) = Text -> Text
TL.toStrict Text
text

-- | Converts a 'Line' to an utf8-encdoed 'ByteBundle', without adding the newline.
lineToUtf8 :: Line -> ByteBundle
lineToUtf8 :: Line -> ByteBundle
lineToUtf8 (Line_ Text
l) = Text -> [Text]
TL.toChunks Text
l [Text] -> (Text -> ByteString) -> [ByteString]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> Text -> ByteString
T.encodeUtf8 [ByteString] -> ([ByteString] -> ByteBundle) -> ByteBundle
forall a b. a -> (a -> b) -> b
& [ByteString] -> ByteBundle
forall (f :: * -> *). Foldable f => f ByteString -> ByteBundle
bundle

textToLine :: Text -> Line
textToLine :: Text -> Line
textToLine = Text -> Line
Line_ (Text -> Line) -> (Text -> Text) -> Text -> Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
TL.fromStrict

-- | @Data.Text.singleton '\\n'@
newline :: Text
newline :: Text
newline = Char -> Text
T.singleton Char
'\n'

textToUtf8 :: Text -> ByteBundle
textToUtf8 :: Text -> ByteBundle
textToUtf8 Text
t = ByteString -> ByteBundle
ByteBundle (Text
t Text -> (Text -> ByteString) -> ByteString
forall a b. a -> (a -> b) -> b
& Text -> ByteString
T.encodeUtf8 ByteString -> (ByteString -> ByteString) -> ByteString
forall a b. a -> (a -> b) -> b
& ByteString -> ByteString
BL.fromStrict)

lineContains :: Text -> Line -> Bool
lineContains :: Text -> Line -> Bool
lineContains Text
t (Line_ Text
l) = Text -> Text -> Bool
TL.isInfixOf (Text -> Text
TL.fromStrict Text
t) Text
l

lineBeginsWith :: Text -> Line -> Bool
lineBeginsWith :: Text -> Line -> Bool
lineBeginsWith Text
t (Line_ Text
l) = Text -> Text -> Bool
TL.isPrefixOf (Text -> Text
TL.fromStrict Text
t) Text
l

-- | Adds the 'Text' to the beginning of the 'Line'.
prefixLine :: Text -> Line -> Line
prefixLine :: Text -> Line -> Line
prefixLine Text
t (Line_ Text
l) = Text -> Line
Line_ ([Text] -> Text
TL.fromChunks (Text
t Text -> [Text] -> [Text]
forall a. a -> [a] -> [a]
: Text -> [Text]
TL.toChunks Text
l))

-- textToLine :: Text -> Line
-- textToLine text
--     | Just _ <- T.find (=='\n') text = throw NewlineForbidden
--     | otherwise = Line_ (removeTrailingCarriageReturn text)

stringToLine :: String -> Line
stringToLine :: String -> Line
stringToLine = Text -> Line
Line_ (Text -> Line) -> (String -> Text) -> String -> Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
TL.pack

-- withLineText :: (Text -> r) -> Line -> r
-- withLineText f (Line text) = f text

isEmptyLine :: Line -> Bool
isEmptyLine :: Line -> Bool
isEmptyLine (Line_ Text
text) = Text -> Bool
TL.null Text
text

emptyLine :: Line
emptyLine :: Line
emptyLine = Text -> Line
Line_ Text
TL.empty

-- | Exception thrown when we find newlines in functions which don't accept them.
--
-- A direct copy of the @NewlineForbidden@ exception from the [turtle](https://hackage.haskell.org/package/turtle) package.
data NewlineForbidden = NewlineForbidden
  deriving (Int -> NewlineForbidden -> ShowS
[NewlineForbidden] -> ShowS
NewlineForbidden -> String
(Int -> NewlineForbidden -> ShowS)
-> (NewlineForbidden -> String)
-> ([NewlineForbidden] -> ShowS)
-> Show NewlineForbidden
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NewlineForbidden -> ShowS
showsPrec :: Int -> NewlineForbidden -> ShowS
$cshow :: NewlineForbidden -> String
show :: NewlineForbidden -> String
$cshowList :: [NewlineForbidden] -> ShowS
showList :: [NewlineForbidden] -> ShowS
Show, Typeable)

instance Exception NewlineForbidden

removeTrailingCarriageReturn :: Text -> Text
removeTrailingCarriageReturn :: Text -> Text
removeTrailingCarriageReturn Text
text
  | Text -> Bool
T.null Text
text = Text
text
  | HasCallStack => Text -> Char
Text -> Char
T.last Text
text Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
== Char
'\r' = HasCallStack => Text -> Text
Text -> Text
T.init Text
text
  | Bool
otherwise = Text
text

lines :: Jet Text -> Jet Line
lines :: Jet Text -> Jet Line
lines (Jet forall s. (s -> Bool) -> (s -> Text -> IO s) -> s -> IO s
f) = (forall s. (s -> Bool) -> (s -> Line -> IO s) -> s -> IO s)
-> Jet Line
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> Line -> IO s
step s
initial -> do
  let stop' :: Pair a s -> Bool
stop' = s -> Bool
stop (s -> Bool) -> (Pair a s -> s) -> Pair a s -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Pair a s -> s
forall {a} {b}. Pair a b -> b
pairExtract
      findLinesInCurrentBlock :: Text -> [Line]
findLinesInCurrentBlock Text
text
        | Text -> Bool
T.null Text
text =
            []
        | Bool
otherwise =
            (Text -> Line) -> [Text] -> [Line]
forall a b. (a -> b) -> [a] -> [b]
map (Text -> Line
textToLine (Text -> Line) -> (Text -> Text) -> Text -> Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text
removeTrailingCarriageReturn) (Text -> [Text]
T.lines Text
text)
              [Line] -> [Line] -> [Line]
forall a. [a] -> [a] -> [a]
++ if
                | HasCallStack => Text -> Char
Text -> Char
T.last Text
text Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
== Char
'\n' ->
                    [Line
forall a. Monoid a => a
mempty]
                | Bool
otherwise ->
                    []
      step' :: Pair (DList Line) s -> Text -> IO (Pair (DList Line) s)
step' (Pair DList Line
lineUnderConstruction s
s) (Text -> [Line]
findLinesInCurrentBlock -> [Line]
linesInCurrentBlock) = do
        case [Line]
linesInCurrentBlock of
          [] -> do
            Pair (DList Line) s -> IO (Pair (DList Line) s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList Line -> s -> Pair (DList Line) s
forall a b. a -> b -> Pair a b
Pair DList Line
lineUnderConstruction s
s)
          [Line
l] -> do
            Pair (DList Line) s -> IO (Pair (DList Line) s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList Line -> s -> Pair (DList Line) s
forall a b. a -> b -> Pair a b
Pair (DList Line
lineUnderConstruction DList Line -> DList Line -> DList Line
forall a. Semigroup a => a -> a -> a
<> Line -> DList Line
forall a. a -> DList a
singleton Line
l) s
s)
          Line
l : rest :: [Line]
rest@(Line
x : [Line]
xs) -> do
            -- Ineficcient mconcat, better strictify a lazy text here?
            let completedLine :: Line
completedLine = [Line] -> Line
forall a. Monoid a => [a] -> a
mconcat ([Line] -> Line) -> [Line] -> Line
forall a b. (a -> b) -> a -> b
$ DList Line -> [Line] -> [Line]
forall a. DList a -> [a] -> [a]
runDList DList Line
lineUnderConstruction [Line
l]
            s' <- (s -> Bool) -> (s -> Line -> IO s) -> [Line] -> s -> IO s
forall s x. (s -> Bool) -> (s -> x -> IO s) -> [x] -> s -> IO s
downstream s -> Bool
stop s -> Line -> IO s
step (Line
completedLine Line -> [Line] -> [Line]
forall a. a -> [a] -> [a]
: [Line] -> [Line]
forall a. HasCallStack => [a] -> [a]
init [Line]
rest) s
s
            pure (Pair (singleton (last linesInCurrentBlock)) s')
      initial' :: Pair (DList Line) s
initial' = DList Line -> s -> Pair (DList Line) s
forall a b. a -> b -> Pair a b
Pair DList Line
forall a. Monoid a => a
mempty s
initial
  Pair (mconcat . closeDList -> lineUnderConstruction) final <- (Pair (DList Line) s -> Bool)
-> (Pair (DList Line) s -> Text -> IO (Pair (DList Line) s))
-> Pair (DList Line) s
-> IO (Pair (DList Line) s)
forall s. (s -> Bool) -> (s -> Text -> IO s) -> s -> IO s
f Pair (DList Line) s -> Bool
forall {a}. Pair a s -> Bool
stop' Pair (DList Line) s -> Text -> IO (Pair (DList Line) s)
step' Pair (DList Line) s
initial'
  if
    | stop final ->
        pure final
    | isEmptyLine lineUnderConstruction ->
        pure final
    | otherwise ->
        step final lineUnderConstruction

unlines :: Jet Line -> Jet Text
unlines :: Jet Line -> Jet Text
unlines Jet Line
j = do
  Line text <- Jet Line
j
  pure text <> pure (T.singleton '\n')

downstream :: (s -> Bool) -> (s -> x -> IO s) -> [x] -> s -> IO s
downstream :: forall s x. (s -> Bool) -> (s -> x -> IO s) -> [x] -> s -> IO s
downstream s -> Bool
stop s -> x -> IO s
step = [x] -> s -> IO s
go
  where
    go :: [x] -> s -> IO s
go [] s
s =
      s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
    go (x
x : [x]
xs) s
s
      | s -> Bool
stop s
s =
          s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
      | Bool
otherwise = do
          !s' <- s -> x -> IO s
step s
s x
x
          go xs s'

-- General sinks

-- | A function that consumes a 'Jet' totally or partially, without returning a result.
type Sink a = Jet a -> IO ()

-- | Helper multi-parameter typeclass for creating 'Jet'-consuming functions
-- out of a variety of common destinations.
--
-- >>> J.each ["aaa","bbb","ccc"] <&> J.stringToLine & J.sink J.stdout
-- aaa
-- bbb
-- ccc
class JetSink a target where
  sink :: target -> Sink a

instance JetSink ByteString Handle where
  sink :: Handle -> Sink ByteString
sink Handle
handle Jet ByteString
j = Jet ByteString -> (ByteString -> IO ()) -> IO ()
forall a b. Jet a -> (a -> IO b) -> IO ()
for_ Jet ByteString
j (Handle -> ByteString -> IO ()
B.hPut Handle
handle)

instance (JetSink a Handle) => JetSink a File where
  sink :: File -> Sink a
sink (File String
path) Jet a
j = String -> IOMode -> (Handle -> IO ()) -> IO ()
forall r. String -> IOMode -> (Handle -> IO r) -> IO r
System.IO.withFile String
path IOMode
System.IO.WriteMode \Handle
handle ->
    Handle -> Sink a
forall a target. JetSink a target => target -> Sink a
sink Handle
handle Jet a
j

-- | Uses the default system locale. Adds newlines.
instance JetSink Line Handle where
  sink :: Handle -> Sink Line
sink Handle
handle = (Line -> IO ()) -> Sink Line
forall a b. (a -> IO b) -> Sink a
traverse_ (Handle -> Text -> IO ()
T.hPutStrLn Handle
handle (Text -> IO ()) -> (Line -> Text) -> Line -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Line -> Text
lineToText)

-- | Uses the default system locale.
instance JetSink Text Handle where
  sink :: Handle -> Sink Text
sink Handle
handle = (Text -> IO ()) -> Sink Text
forall a b. (a -> IO b) -> Sink a
traverse_ (Handle -> Text -> IO ()
T.hPutStr Handle
handle)

-- | 'FilePaths' are plain strings. This newtype provides a small measure of
-- safety over them.
newtype File = File {File -> String
getFilePath :: FilePath} deriving (Int -> File -> ShowS
[File] -> ShowS
File -> String
(Int -> File -> ShowS)
-> (File -> String) -> ([File] -> ShowS) -> Show File
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> File -> ShowS
showsPrec :: Int -> File -> ShowS
$cshow :: File -> String
show :: File -> String
$cshowList :: [File] -> ShowS
showList :: [File] -> ShowS
Show)

-- | The maximum size in bytes of some destination into which we write the
-- bytes produced by a 'Jet'.
data BoundedSize x = BoundedSize Int x deriving stock (Int -> BoundedSize x -> ShowS
[BoundedSize x] -> ShowS
BoundedSize x -> String
(Int -> BoundedSize x -> ShowS)
-> (BoundedSize x -> String)
-> ([BoundedSize x] -> ShowS)
-> Show (BoundedSize x)
forall x. Show x => Int -> BoundedSize x -> ShowS
forall x. Show x => [BoundedSize x] -> ShowS
forall x. Show x => BoundedSize x -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall x. Show x => Int -> BoundedSize x -> ShowS
showsPrec :: Int -> BoundedSize x -> ShowS
$cshow :: forall x. Show x => BoundedSize x -> String
show :: BoundedSize x -> String
$cshowList :: forall x. Show x => [BoundedSize x] -> ShowS
showList :: [BoundedSize x] -> ShowS
Show, ReadPrec [BoundedSize x]
ReadPrec (BoundedSize x)
Int -> ReadS (BoundedSize x)
ReadS [BoundedSize x]
(Int -> ReadS (BoundedSize x))
-> ReadS [BoundedSize x]
-> ReadPrec (BoundedSize x)
-> ReadPrec [BoundedSize x]
-> Read (BoundedSize x)
forall x. Read x => ReadPrec [BoundedSize x]
forall x. Read x => ReadPrec (BoundedSize x)
forall x. Read x => Int -> ReadS (BoundedSize x)
forall x. Read x => ReadS [BoundedSize x]
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
$creadsPrec :: forall x. Read x => Int -> ReadS (BoundedSize x)
readsPrec :: Int -> ReadS (BoundedSize x)
$creadList :: forall x. Read x => ReadS [BoundedSize x]
readList :: ReadS [BoundedSize x]
$creadPrec :: forall x. Read x => ReadPrec (BoundedSize x)
readPrec :: ReadPrec (BoundedSize x)
$creadListPrec :: forall x. Read x => ReadPrec [BoundedSize x]
readListPrec :: ReadPrec [BoundedSize x]
Read)

instance JetSink ByteBundle Handle where
  sink :: Handle -> Sink ByteBundle
sink Handle
handle Jet ByteBundle
j = (ByteString -> IO ()) -> Sink ByteString
forall a b. (a -> IO b) -> Sink a
traverse_ (Handle -> ByteString -> IO ()
B.hPut Handle
handle) do
    s <- Jet ByteBundle
j
    bundleBytes s

-- | Distributes incoming bytes through a sequence of files. Once a file is
-- full, we start writing the next one.
instance JetSink ByteString [BoundedSize File] where
  sink :: [BoundedSize File] -> Sink ByteString
sink [BoundedSize File]
bucketFiles Jet ByteString
j =
    (Handle -> ByteString -> IO ())
-> (Handle -> IO ())
-> [IO Handle]
-> (Combiners ByteString () -> IO ())
-> IO ()
forall h a r.
(h -> a -> IO ())
-> (h -> IO ()) -> [IO h] -> (Combiners a () -> IO r) -> IO r
withCombiners_
      (\Handle
handle ByteString
b -> Handle -> ByteString -> IO ()
B.hPut Handle
handle ByteString
b)
      Handle -> IO ()
hClose
      (BoundedSize File -> IO Handle
makeAllocator (BoundedSize File -> IO Handle)
-> [BoundedSize File] -> [IO Handle]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [BoundedSize File]
bucketFiles)
      (\Combiners ByteString ()
combiners -> Sink ()
forall a. Sink a
drain Sink () -> Sink ()
forall a b. (a -> b) -> a -> b
$ Splitter ByteString ByteString
-> Combiners ByteString () -> Jet ByteString -> Jet ()
forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c
recast ([Int] -> Splitter ByteString ByteString
bytesOverBuckets [Int]
bucketSizes) Combiners ByteString ()
combiners Jet ByteString
j)
    where
      bucketSizes :: [Int]
bucketSizes = (BoundedSize File -> Int) -> [BoundedSize File] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (\(BoundedSize Int
size File
_) -> Int
size) [BoundedSize File]
bucketFiles

-- | Distributes incoming bytes through a sequence of files. Once a file is
-- full, we start writing the next one.
--
-- Each 'ByteBundle' value is garanteed to be written to a single file. If a
-- file turns out to be too small for even a single 'ByteBundle' value, a
-- 'BucketOverflow' exception is thrown.
instance JetSink ByteBundle [BoundedSize File] where
  sink :: [BoundedSize File] -> Sink ByteBundle
sink [BoundedSize File]
bucketFiles Jet ByteBundle
j =
    (Handle -> ByteString -> IO ())
-> (Handle -> IO ())
-> [IO Handle]
-> (Combiners ByteString () -> IO ())
-> IO ()
forall h a r.
(h -> a -> IO ())
-> (h -> IO ()) -> [IO h] -> (Combiners a () -> IO r) -> IO r
withCombiners_
      (\Handle
handle ByteString
b -> Handle -> ByteString -> IO ()
B.hPut Handle
handle ByteString
b)
      Handle -> IO ()
hClose
      (BoundedSize File -> IO Handle
makeAllocator (BoundedSize File -> IO Handle)
-> [BoundedSize File] -> [IO Handle]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [BoundedSize File]
bucketFiles)
      (\Combiners ByteString ()
combiners -> Sink ()
forall a. Sink a
drain Sink () -> Sink ()
forall a b. (a -> b) -> a -> b
$ Splitter ByteBundle ByteString
-> Combiners ByteString () -> Jet ByteBundle -> Jet ()
forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c
recast ([Int] -> Splitter ByteBundle ByteString
byteBundlesOverBuckets [Int]
bucketSizes) Combiners ByteString ()
combiners Jet ByteBundle
j)
    where
      bucketSizes :: [Int]
bucketSizes = (BoundedSize File -> Int) -> [BoundedSize File] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map (\(BoundedSize Int
size File
_) -> Int
size) [BoundedSize File]
bucketFiles

makeAllocator :: BoundedSize File -> IO Handle
makeAllocator :: BoundedSize File -> IO Handle
makeAllocator (BoundedSize Int
_ (File String
path)) = String -> IOMode -> IO Handle
openBinaryFile String
path IOMode
WriteMode

-- DList helper
newtype DList a = DList {forall a. DList a -> [a] -> [a]
runDList :: [a] -> [a]}

instance Semigroup (DList a) where
  DList [a] -> [a]
a1 <> :: DList a -> DList a -> DList a
<> DList [a] -> [a]
a2 = ([a] -> [a]) -> DList a
forall a. ([a] -> [a]) -> DList a
DList ([a] -> [a]
a1 ([a] -> [a]) -> ([a] -> [a]) -> [a] -> [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a] -> [a]
a2)

instance Monoid (DList a) where
  mempty :: DList a
mempty = ([a] -> [a]) -> DList a
forall a. ([a] -> [a]) -> DList a
DList [a] -> [a]
forall a. a -> a
id

makeDList :: [a] -> DList a
makeDList :: forall a. [a] -> DList a
makeDList [a]
as = ([a] -> [a]) -> DList a
forall a. ([a] -> [a]) -> DList a
DList \[a]
xs -> [a]
as [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
++ [a]
xs

closeDList :: DList a -> [a]
closeDList :: forall a. DList a -> [a]
closeDList (DList [a] -> [a]
f) = [a] -> [a]
f []

singleton :: a -> DList a
singleton :: forall a. a -> DList a
singleton a
a = ([a] -> [a]) -> DList a
forall a. ([a] -> [a]) -> DList a
DList (([a] -> [a]) -> DList a) -> ([a] -> [a]) -> DList a
forall a b. (a -> b) -> a -> b
$ (a
a a -> [a] -> [a]
forall a. a -> [a] -> [a]
:)

--
-- concurrency

-- | Process the values yielded by the upstream 'Jet' in a concurrent way,
-- and return the results in the form of another 'Jet' as they are produced.
--
-- __NB__: this function might scramble the order of the returned values. Right
-- now there isn't a function for unscrambling them.
--
-- >>> :{
--  J.each [(3,'a'), (2,'b'), (1,'c')]
--  & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c)
--  & J.toList
-- :}
-- "cba"
--
-- What happens if we 'limit' the resulting 'Jet' and we reach that limit, or
-- if we otherwise stop consuming the 'Jet' before it gets exhausted? In those
-- cases, all pending @IO b@ tasks are cancelled.
--
-- >>> :{
--  J.each [(9999,'a'), (2,'b'), (1,'c')]
--  & J.traverseConcurrently (numberOfWorkers 10) (\(d,c) -> threadDelay (d*1e5) *> pure c)
--  & J.take 2
--  & J.toList
-- :}
-- "cb"
traverseConcurrently :: (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b
-- TODO:
-- It would be nice to have 0-lengh channels for which one side blocks until
-- the other side takes the job.
traverseConcurrently :: forall a b. (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b
traverseConcurrently PoolConf -> PoolConf
adaptConf a -> IO b
makeTask Jet a
upstream = (forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s) -> Jet b
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> b -> IO s
step s
initial -> do
  if
    -- If we know we aren't going to do any work, don't bother starting the
    -- whole boondoggle.
    | s -> Bool
stop s
initial ->
        s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
    | Bool
otherwise -> do
        -- At this point we know we should do at least one step.
        let PoolConf {Int
_inputQueueSize :: Int
_inputQueueSize :: PoolConf -> Int
_inputQueueSize, Int
_numberOfWorkers :: Int
_numberOfWorkers :: PoolConf -> Int
_numberOfWorkers, Int
_outputQueueSize :: Int
_outputQueueSize :: PoolConf -> Int
_outputQueueSize} = PoolConf -> PoolConf
adaptConf PoolConf
defaultPoolConf
        input <- Int -> IO (TBMQueue (IO b))
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
_inputQueueSize
        inputQueueWriterShouldStop <- newIORef False
        aliveWorkers <- newIORef _numberOfWorkers
        output <- newTBMQueueIO _outputQueueSize
        let -- The inputQueueWriter should *not* be interrupted aynchronously.
            -- After each iteration, it reads the IORef to see if it should stop.
            -- Once it stops, it closes the input queue.
            inputQueueWriter = do
              Jet a
-> (Bool -> Bool) -> (Bool -> a -> IO Bool) -> Bool -> IO Bool
forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run
                Jet a
upstream
                Bool -> Bool
forall a. a -> a
id
                ( \Bool
_ a
a -> do
                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue (IO b) -> IO b -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue (IO b)
input (a -> IO b
makeTask a
a)
                    IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
inputQueueWriterShouldStop
                )
                Bool
False
              STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue (IO b) -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue (IO b)
input
            -- Workers *can* be interrupted asynchronously.
            worker = do
              mtask <- STM (Maybe (IO b)) -> IO (Maybe (IO b))
forall a. STM a -> IO a
atomically (STM (Maybe (IO b)) -> IO (Maybe (IO b)))
-> STM (Maybe (IO b)) -> IO (Maybe (IO b))
forall a b. (a -> b) -> a -> b
$ TBMQueue (IO b) -> STM (Maybe (IO b))
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue (IO b)
input
              case mtask of
                Maybe (IO b)
Nothing -> do
                  remaining <- do
                    IORef Int -> (Int -> (Int, Int)) -> IO Int
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Int
aliveWorkers \Int
count ->
                      let count' :: Int
count' = Int -> Int
forall a. Enum a => a -> a
pred Int
count
                       in (Int
count', Int
count')
                  if
                    | remaining == 0 -> do
                        atomically $ closeTBMQueue output
                    | otherwise -> do
                        pure ()
                Just IO b
task -> do
                  result <- IO b
task
                  atomically $ writeTBMQueue output result
                  worker
            outputQueueReader s
s = do
              if
                | s -> Bool
stop s
s -> do
                    -- tell the inserter from upstream that it should stop. is this enough?
                    IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
inputQueueWriterShouldStop Bool
True
                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue (IO b) -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue (IO b)
input -- perhaps unnecessary?
                    s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
                | Bool
otherwise -> do
                    mresult <- STM (Maybe b) -> IO (Maybe b)
forall a. STM a -> IO a
atomically (STM (Maybe b) -> IO (Maybe b)) -> STM (Maybe b) -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ TBMQueue b -> STM (Maybe b)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue b
output
                    case mresult of
                      Maybe b
Nothing -> do
                        s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
s
                      Just b
result -> do
                        !s' <- s -> b -> IO s
step s
s b
result
                        outputQueueReader s'
        runConcurrently $
          Concurrently do
            inputQueueWriter
            *> Concurrently do
              finalLeft <- do
                runConcurrentlyE $
                  -- The worker pool is always killed when the output reader finishes,
                  -- but for the "happy path" the workers will already be dead.
                  ConcurrentlyE (Right <$> replicateConcurrently_ _numberOfWorkers worker)
                    *>
                    -- This Left is what kills the worker pool.
                    ConcurrentlyE (Left <$> outputQueueReader initial)
              case finalLeft of
                Right () -> do
                  String -> IO s
forall a. HasCallStack => String -> a
error String
"never happens, the Left always wins"
                Left s
final -> do
                  s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final

-- | Configuration record for the worker pool.
data PoolConf = PoolConf
  { PoolConf -> Int
_inputQueueSize :: Int,
    PoolConf -> Int
_numberOfWorkers :: Int,
    PoolConf -> Int
_outputQueueSize :: Int
  }
  deriving (Int -> PoolConf -> ShowS
[PoolConf] -> ShowS
PoolConf -> String
(Int -> PoolConf -> ShowS)
-> (PoolConf -> String) -> ([PoolConf] -> ShowS) -> Show PoolConf
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> PoolConf -> ShowS
showsPrec :: Int -> PoolConf -> ShowS
$cshow :: PoolConf -> String
show :: PoolConf -> String
$cshowList :: [PoolConf] -> ShowS
showList :: [PoolConf] -> ShowS
Show)

defaultPoolConf :: PoolConf
defaultPoolConf =
  PoolConf
    { _inputQueueSize :: Int
_inputQueueSize = Int
1,
      _numberOfWorkers :: Int
_numberOfWorkers = Int
1,
      _outputQueueSize :: Int
_outputQueueSize = Int
1
    }

-- | Size of the waiting queue into the worker pool. The default is @1@.
inputQueueSize :: Int -> PoolConf -> PoolConf
inputQueueSize :: Int -> PoolConf -> PoolConf
inputQueueSize Int
size PoolConf
poolConf = PoolConf
poolConf {_inputQueueSize = size}

-- | The size of the worker pool. The default is @1@.
numberOfWorkers :: Int -> PoolConf -> PoolConf
numberOfWorkers :: Int -> PoolConf -> PoolConf
numberOfWorkers Int
number PoolConf
poolConf = PoolConf
poolConf {_numberOfWorkers = number}

-- | Size of the queue holding results out of the working pool before they
-- are yielded downstream. The default is @1@.
outputQueueSize :: Int -> PoolConf -> PoolConf
outputQueueSize :: Int -> PoolConf -> PoolConf
outputQueueSize Int
size PoolConf
poolConf = PoolConf
poolConf {_outputQueueSize = size}

-- | An alias for 'id'. Useful with functions like 'traverseConcurrently' and
-- 'throughProcess', for which it means \"use the default configuration\".
defaults :: a -> a
defaults :: forall a. a -> a
defaults = a -> a
forall a. a -> a
id

--
-- process invocation

-- | Feeds the upstream 'Jet' to an external process' @stdin@ and returns the
-- process' @stdout@ as another @Jet@. The feeding and reading of the standard
-- streams is done concurrently in order to avoid deadlocks.
--
-- What happens if we 'limit' the resulting 'Jet' and we reach that limit, or
-- if we otherwise stop consuming the 'Jet' before it gets exhausted? In those
-- cases, the external process is promptly terminated.
throughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet ByteString -> Jet ByteString
throughProcess :: (ProcConf -> ProcConf)
-> CreateProcess -> Jet ByteString -> Jet ByteString
throughProcess ProcConf -> ProcConf
adaptConf = ProcConf -> CreateProcess -> Jet ByteString -> Jet ByteString
forall a b. ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
throughProcess_ (ProcConf -> ProcConf
adaptConf ProcConf
defaultProcConf)

-- | Like 'throughProcess', but feeding and reading 'Line's using the default
-- system encoding.
--
-- >>> :{
-- J.each ["aaa","bbb","ccc"]
-- <&> J.stringToLine
-- & linesThroughProcess defaults (shell "cat")
-- & J.toList
-- :}
-- ["aaa","bbb","ccc"]
--
-- An example of not reading all the lines from a long-lived process that gets cancelled:
--
-- >>> :{
-- mempty
-- & linesThroughProcess defaults (shell "{ printf \"aaa\\nbbb\\nccc\\n\" ; sleep infinity ; }")
-- & J.limit 2
-- & J.toList
-- :}
-- ["aaa","bbb"]
linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
linesThroughProcess ProcConf -> ProcConf
adaptConf CreateProcess
procSpec = do
  let textLinesProcConf :: ProcConf_ Text Text
textLinesProcConf =
        (ProcConf -> ProcConf
adaptConf ProcConf
defaultProcConf)
          { _writeToStdIn = T.hPutStrLn,
            _readFromStdout = T.hGetLine
          }
  (Text -> Line) -> Jet Text -> Jet Line
forall a b. (a -> b) -> Jet a -> Jet b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> Line
textToLine (Jet Text -> Jet Line)
-> (Jet Line -> Jet Text) -> Jet Line -> Jet Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcConf_ Text Text -> CreateProcess -> Jet Text -> Jet Text
forall a b. ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
throughProcess_ ProcConf_ Text Text
textLinesProcConf CreateProcess
procSpec (Jet Text -> Jet Text)
-> (Jet Line -> Jet Text) -> Jet Line -> Jet Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Line -> Text) -> Jet Line -> Jet Text
forall a b. (a -> b) -> Jet a -> Jet b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Line -> Text
lineToText

-- | Like 'throughProcess', but feeding and reading 'Line's encoded in UTF8.
utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line
utf8LinesThroughProcess ProcConf -> ProcConf
adaptConf CreateProcess
procSpec = do
  Jet Text -> Jet Line
lines (Jet Text -> Jet Line)
-> (Jet Line -> Jet Text) -> Jet Line -> Jet Line
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Jet ByteString -> Jet Text
decodeUtf8 (Jet ByteString -> Jet Text)
-> (Jet Line -> Jet ByteString) -> Jet Line -> Jet Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ProcConf -> ProcConf)
-> CreateProcess -> Jet ByteString -> Jet ByteString
throughProcess ProcConf -> ProcConf
adaptConf CreateProcess
procSpec (Jet ByteString -> Jet ByteString)
-> (Jet Line -> Jet ByteString) -> Jet Line -> Jet ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Jet Text -> Jet ByteString
encodeUtf8 (Jet Text -> Jet ByteString)
-> (Jet Line -> Jet Text) -> Jet Line -> Jet ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Jet Line -> Jet Text
unlines

throughProcess_ :: forall a b. ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
throughProcess_ :: forall a b. ProcConf_ a b -> CreateProcess -> Jet a -> Jet b
throughProcess_ ProcConf_ a b
procConf CreateProcess
procSpec Jet a
upstream = (forall s. (s -> Bool) -> (s -> b -> IO s) -> s -> IO s) -> Jet b
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> b -> IO s
step s
initial -> do
  let ProcConf_ {Bool
_bufferStdin :: Bool
_bufferStdin :: forall a b. ProcConf_ a b -> Bool
_bufferStdin, Handle -> a -> IO ()
_writeToStdIn :: forall a b. ProcConf_ a b -> Handle -> a -> IO ()
_writeToStdIn :: Handle -> a -> IO ()
_writeToStdIn, Handle -> IO b
_readFromStdout :: forall a b. ProcConf_ a b -> Handle -> IO b
_readFromStdout :: Handle -> IO b
_readFromStdout, Handle -> IO ()
_readFromStderr :: Handle -> IO ()
_readFromStderr :: forall a b. ProcConf_ a b -> Handle -> IO ()
_readFromStderr, ExitCode -> IO ()
_handleExitCode :: ExitCode -> IO ()
_handleExitCode :: forall a b. ProcConf_ a b -> ExitCode -> IO ()
_handleExitCode} = ProcConf_ a b
procConf
  if
    -- If we know we aren't going to do any work, don't bother starting the
    -- whole boondoggle.
    | s -> Bool
stop s
initial ->
        s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
initial
    | Bool
otherwise -> do
        let procSpec' :: CreateProcess
procSpec' =
              CreateProcess
procSpec
                { std_in = CreatePipe,
                  std_out = CreatePipe,
                  std_err = CreatePipe
                }
        input <- forall a. Int -> IO (TBMQueue a)
newTBMQueueIO @a Int
1
        inputQueueWriterShouldStop <- newIORef False
        -- remember to drain stderr concurrently with stdout...
        let inputQueueWriter = do
              Jet a
-> (Bool -> Bool) -> (Bool -> a -> IO Bool) -> Bool -> IO Bool
forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
run
                Jet a
upstream
                Bool -> Bool
forall a. a -> a
id
                ( \Bool
_ a
a -> do
                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue a -> a -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue a
input a
a
                    IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
inputQueueWriterShouldStop
                )
                Bool
False
              STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBMQueue a -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue a
input
        finalEither <-
          runConcurrently $
            Concurrently do
              inputQueueWriter
              *> Concurrently do
                withCreateProcess procSpec' \(Just Handle
stdin') (Just Handle
stdout') (Just Handle
stderr') ProcessHandle
phandle -> do
                  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
_bufferStdin) (Handle -> BufferMode -> IO ()
System.IO.hSetBuffering Handle
stdin' BufferMode
System.IO.NoBuffering)
                  let stdinWriter :: IO ()
stdinWriter = do
                        ma <- STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (STM (Maybe a) -> IO (Maybe a)) -> STM (Maybe a) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ TBMQueue a -> STM (Maybe a)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue a
input
                        case ma of
                          Maybe a
Nothing -> do
                            Handle -> IO ()
hClose Handle
stdin'
                          Just a
a -> do
                            Handle -> a -> IO ()
_writeToStdIn Handle
stdin' a
a
                            IO ()
stdinWriter
                      stderrReader :: IO ()
stderrReader = do
                        (Handle -> IO Bool) -> (Handle -> IO b) -> Handle -> Jet b
forall handle a.
(handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a
untilEOF Handle -> IO Bool
System.IO.hIsEOF Handle -> IO b
_readFromStdout Handle
stderr' Jet b -> (Jet b -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
& Jet b -> IO ()
forall a. Sink a
drain
                      stdoutReader :: s -> IO (Either s s)
stdoutReader s
s = do
                        if
                          | s -> Bool
stop s
s -> do
                              IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
inputQueueWriterShouldStop Bool
True
                              Either s s -> IO (Either s s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (s -> Either s s
forall a b. a -> Either a b
Left s
s)
                          | Bool
otherwise -> do
                              eof <- Handle -> IO Bool
System.IO.hIsEOF Handle
stdout'
                              if
                                | eof -> do
                                    writeIORef inputQueueWriterShouldStop True
                                    exitCode <- waitForProcess phandle
                                    _handleExitCode exitCode
                                    pure (Right s)
                                | otherwise -> do
                                    b <- _readFromStdout stdout'
                                    !s' <- step s b
                                    stdoutReader s'
                  ConcurrentlyE s s -> IO (Either s s)
forall e a. ConcurrentlyE e a -> IO (Either e a)
runConcurrentlyE (ConcurrentlyE s s -> IO (Either s s))
-> ConcurrentlyE s s -> IO (Either s s)
forall a b. (a -> b) -> a -> b
$
                    IO (Either s ()) -> ConcurrentlyE s ()
forall e a. IO (Either e a) -> ConcurrentlyE e a
ConcurrentlyE do () -> Either s ()
forall a b. b -> Either a b
Right (() -> Either s ()) -> IO () -> IO (Either s ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO ()
stdinWriter
                      ConcurrentlyE s () -> ConcurrentlyE s () -> ConcurrentlyE s ()
forall a b.
ConcurrentlyE s a -> ConcurrentlyE s b -> ConcurrentlyE s b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> IO (Either s ()) -> ConcurrentlyE s ()
forall e a. IO (Either e a) -> ConcurrentlyE e a
ConcurrentlyE do () -> Either s ()
forall a b. b -> Either a b
Right (() -> Either s ()) -> IO () -> IO (Either s ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO ()
stderrReader
                      ConcurrentlyE s () -> ConcurrentlyE s s -> ConcurrentlyE s s
forall a b.
ConcurrentlyE s a -> ConcurrentlyE s b -> ConcurrentlyE s b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> IO (Either s s) -> ConcurrentlyE s s
forall e a. IO (Either e a) -> ConcurrentlyE e a
ConcurrentlyE do s -> IO (Either s s)
stdoutReader s
initial
        pure (either id id finalEither)

-- | Configuration record with some extra options in addition to those in "CreateProcess".
type ProcConf = ProcConf_ ByteString ByteString

data ProcConf_ a b = ProcConf_
  { forall a b. ProcConf_ a b -> Bool
_bufferStdin :: Bool,
    forall a b. ProcConf_ a b -> Handle -> a -> IO ()
_writeToStdIn :: Handle -> a -> IO (),
    forall a b. ProcConf_ a b -> Handle -> IO b
_readFromStdout :: Handle -> IO b,
    forall a b. ProcConf_ a b -> Handle -> IO ()
_readFromStderr :: Handle -> IO (),
    forall a b. ProcConf_ a b -> ExitCode -> IO ()
_handleExitCode :: ExitCode -> IO ()
  }

defaultProcConf :: ProcConf
defaultProcConf :: ProcConf
defaultProcConf =
  ProcConf_
    { _bufferStdin :: Bool
_bufferStdin = Bool
False,
      _writeToStdIn :: Handle -> ByteString -> IO ()
_writeToStdIn = Handle -> ByteString -> IO ()
B.hPut,
      _readFromStdout :: Handle -> IO ByteString
_readFromStdout = (Handle -> Int -> IO ByteString) -> Int -> Handle -> IO ByteString
forall a b c. (a -> b -> c) -> b -> a -> c
flip Handle -> Int -> IO ByteString
B.hGetSome Int
8192,
      _readFromStderr :: Handle -> IO ()
_readFromStderr = IO Text -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Text -> IO ()) -> (Handle -> IO Text) -> Handle -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Handle -> IO Text
T.hGetLine,
      _handleExitCode :: ExitCode -> IO ()
_handleExitCode = \ExitCode
exitCode -> case ExitCode
exitCode of
        ExitFailure Int
_ -> ExitCode -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO ExitCode
exitCode
        ExitCode
ExitSuccess -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    }

-- | Should we buffer the process' @stdin@? Usually should be 'True' for
-- interactive scenarios.
--
-- By default, 'False'.
bufferStdin :: Bool -> ProcConf -> ProcConf
bufferStdin :: Bool -> ProcConf -> ProcConf
bufferStdin Bool
doBuffering ProcConf
procConf = ProcConf
procConf {_bufferStdin = doBuffering}

-- | Sets the function that reads a single line of output from the process
-- @stderr@.  It's called repeatedly until @stderr@ is exhausted. The reads are
-- done concurrently with the reads from @stdout@.
--
-- By default, lines of text are read using the system's default encoding.
--
-- This is a good place to throw an exception if we don't like what comes out
-- of @stderr@.
readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf
readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf
readFromStderr Handle -> IO ()
readFunc ProcConf
procConf = ProcConf
procConf {_readFromStderr = readFunc}

-- | Sets the function that handles the final `ExitCode` of the process.
--
-- The default behavior is to throw the `ExitCode` as an exception if it's not
-- a success.
handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf
handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf
handleExitCode ExitCode -> IO ()
handler ProcConf
procConf = ProcConf
procConf {_handleExitCode = handler}

--
--
-- complicated stufff

data AreWeInsideGroup foldState
  = OutsideGroup
  | InsideGroup !foldState

data RecastState foldState = RecastState !(AreWeInsideGroup foldState) [IO foldState]

-- | This is a complex, unwieldly, yet versatile function. It can be used to
-- define grouping operations, but also for decoding and other purposes.
--
-- Groups are delimited in the input 'Jet' using the 'Splitter', and the
-- contents of those groups are then combined using 'Combiners'. The result of
-- each combiner is yielded by the return 'Jet'.
--
-- If the list of combiners is finite and becomes exhausted, we stop splitting
-- and the return 'Jet' stops.
recast :: forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c
recast :: forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c
recast
  (MealyIO s -> a -> IO (SplitStepResult b, s)
splitterStep s -> IO (SplitStepResult b)
splitterCoda IO s
splitterAlloc)
  (Combiners s -> b -> IO s
foldStep s -> IO c
foldCoda [IO s]
foldAllocs0)
  (Jet forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s
upstream) = (forall s. (s -> Bool) -> (s -> c -> IO s) -> s -> IO s) -> Jet c
forall a.
(forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s) -> Jet a
Jet \s -> Bool
stop s -> c -> IO s
step s
initial -> do
    initialSplitterState <- IO s
splitterAlloc
    let -- When to stop? Either downstream says we need to stop,
        -- or we are outside a group and there isn't another group consumer we
        -- can use to process the next one.
        stop' (Triple a
_ (RecastState AreWeInsideGroup foldState
OutsideGroup []) s
_) = Bool
True
        stop' (Triple a
_ RecastState foldState
_ s
s) = s -> Bool
stop s
s

        step' (Triple s
splitterState RecastState s
recastState s
s) a
a = do
          (splitResult, splitterState') <- s -> a -> IO (SplitStepResult b, s)
splitterStep s
splitterState a
a
          Pair recastState' s' <- advanceRecast splitResult recastState s
          pure (Triple splitterState' recastState' s')

        advanceRecast ssr :: SplitStepResult b
ssr@(SplitStepResult {[b]
continuationOfPreviouslyStartedGroup :: forall b. SplitStepResult b -> [b]
continuationOfPreviouslyStartedGroup :: [b]
continuationOfPreviouslyStartedGroup, [[b]]
entireGroups :: forall b. SplitStepResult b -> [[b]]
entireGroups :: [[b]]
entireGroups, [b]
startOfNewGroup :: forall b. SplitStepResult b -> [b]
startOfNewGroup :: [b]
startOfNewGroup}) (RecastState AreWeInsideGroup s
areWeInside [IO s]
foldAllocs) s
s = do
          case (AreWeInsideGroup s
areWeInside, [[b]]
entireGroups, [b]
startOfNewGroup) of
            -- If there aren't any new groups and we don't start an incomplete one, just advance the current fold
            (InsideGroup s
foldState, [], []) -> do
              -- traceIO $ "recast inside group just continuing"
              foldState' <- s -> [b] -> IO s
advanceGroupWithougClosing s
foldState [b]
continuationOfPreviouslyStartedGroup
              pure (Pair (RecastState (InsideGroup foldState') foldAllocs) s) -- main state didn't change
            (InsideGroup s
foldState, [[b]]
_, [b]
_) -> do
              -- traceIO $ "recast inside group closing"
              !c <- s -> [b] -> IO c
processSingleGroup s
foldState [b]
continuationOfPreviouslyStartedGroup
              !s' <- step s c
              if
                | stop s' -> do
                    -- traceIO $ "recast inside group pure"
                    pure (Pair (RecastState OutsideGroup foldAllocs) s')
                | otherwise -> do
                    -- traceIO $ "recast inside group advancing"
                    advanceRecast ssr (RecastState OutsideGroup foldAllocs) s'
            -- if we are outside of a group, the "continuationOfPreviouslyStartedGroup" is ignored.
            (AreWeInsideGroup s
OutsideGroup, [[b]]
_, [b]
_) -> do
              -- traceIO $ "recast outside group"
              -- doens't return foldState becasue we close the groups
              Pair foldAllocs' s' <- [IO s] -> s -> [[b]] -> IO (Pair [IO s] s)
processEntireGroups [IO s]
foldAllocs s
s [[b]]
entireGroups
              bail <- pure (Pair (RecastState OutsideGroup foldAllocs') s')
              if
                | stop s' -> do
                    pure bail
                | otherwise -> do
                    case startOfNewGroup of
                      [] -> do
                        Pair (RecastState s) s -> IO (Pair (RecastState s) s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Pair (RecastState s) s
bail
                      (b
_ : [b]
_) -> do
                        case [IO s]
foldAllocs of
                          [] -> do
                            Pair (RecastState s) s -> IO (Pair (RecastState s) s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Pair (RecastState s) s
bail
                          IO s
alloc : [IO s]
allocs -> do
                            -- traceIO $ "recast we should be allocating here"
                            -- there is a next group, so let's begin it
                            !foldState0 <- IO s
alloc
                            foldState <- processBeginNextGroup foldState0 startOfNewGroup
                            pure (Pair (RecastState (InsideGroup foldState) allocs) s')
        -- foldM ?
        advanceGroupWithougClosing :: _ -> [b] -> IO _
        advanceGroupWithougClosing s
foldState [] =
          s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
foldState
        advanceGroupWithougClosing s
foldState (b
b : [b]
bs) = do
          !foldState' <- s -> b -> IO s
foldStep s
foldState b
b
          advanceGroupWithougClosing foldState' bs
        processEntireGroups :: [IO _] -> _ -> [[b]] -> IO (Pair [IO _] _)
        -- We can't go on if there aren't any more groups
        processEntireGroups [IO s]
allocs s
s [] = do
          Pair [IO s] s -> IO (Pair [IO s] s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([IO s] -> s -> Pair [IO s] s
forall a b. a -> b -> Pair a b
Pair [IO s]
allocs s
s)
        -- We can't go on if there aren't any more fold initial state allocs
        processEntireGroups [] s
s [[b]]
_ = do
          Pair [IO s] s -> IO (Pair [IO s] s)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([IO s] -> s -> Pair [IO s] s
forall a b. a -> b -> Pair a b
Pair [] s
s)
        processEntireGroups (IO s
alloc : [IO s]
allocs) s
s ([b]
bs : [[b]]
bss) = do
          !foldState0 <- IO s
alloc
          !c <- processSingleGroup foldState0 bs -- a single step downstream
          !s' <- step s c
          if
            | stop s' -> do
                pure (Pair allocs s')
            | otherwise -> do
                processEntireGroups allocs s' bss
        -- a whole fold is processed here
        processSingleGroup :: _ -> [b] -> IO c
        processSingleGroup s
foldState [] = do
          s -> IO c
foldCoda s
foldState
        processSingleGroup s
foldState (b
b : [b]
bs) = do
          !foldState' <- s -> b -> IO s
foldStep s
foldState b
b
          processSingleGroup foldState' bs
        processBeginNextGroup :: _ -> [b] -> IO _
        processBeginNextGroup s
foldState [] = do
          s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
foldState
        processBeginNextGroup s
foldState (b
b : [b]
bs) = do
          !foldState' <- s -> b -> IO s
foldStep s
foldState b
b
          processBeginNextGroup foldState' bs
        initial' = s -> RecastState s -> s -> Triple s (RecastState s) s
forall a b c. a -> b -> c -> Triple a b c
Triple s
initialSplitterState (AreWeInsideGroup s -> [IO s] -> RecastState s
forall foldState.
AreWeInsideGroup foldState
-> [IO foldState] -> RecastState foldState
RecastState AreWeInsideGroup s
forall foldState. AreWeInsideGroup foldState
OutsideGroup [IO s]
foldAllocs0) s
initial
    Triple splitterState recastState final <- upstream stop' step' initial'
    -- What happens if there's a fold ongoing when we stop? Right now we always close it, which seems to be a reasonable
    -- action (because the fold coda might hide a finalizer).
    --
    -- Also, when can it happen that we reach this point with an ongoing fold?
    -- If I understand correctly:
    --    - it can only happen when the upstream closes and leaves the fold open.
    --    - it can't (?) happen when the consumer stops early.
    let closePendingFold = \case
          RecastState AreWeInsideGroup s
OutsideGroup [IO s]
_ -> do
            () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
          RecastState (InsideGroup s
foldState) [IO s]
_ -> do
            _ <- s -> IO c
foldCoda s
foldState
            pure ()
    if
      | stop final -> do
          closePendingFold recastState
          pure final
      | otherwise -> do
          splitResult <- splitterCoda splitterState
          -- We discard the "begins next group"; it doesn't make sense in this final step.
          Pair recastState' final' <- advanceRecast (splitResult {startOfNewGroup = []}) recastState final
          if
            | stop final' -> do
                -- TODO:
                -- should we dealloc here? Maybe there's a fold reaminging... we should close it. See below.
                closePendingFold recastState'
                pure final'
            | otherwise -> do
                case recastState' of
                  RecastState AreWeInsideGroup s
OutsideGroup [IO s]
_ -> do
                    -- traceIO $ "final! outside group"
                    s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
final'
                  RecastState (InsideGroup s
foldState) [IO s]
_ -> do
                    -- traceIO $ "final! inside group"
                    c <- s -> IO c
foldCoda s
foldState
                    final'' <- step final' c
                    pure final''

-- | A 'Combiners' value knows how to process a sequence of groups, while
-- keeping a (existentially hidden) state for each group.
--
-- Very much like a @FoldM IO@  from the
-- [foldl](https://hackage.haskell.org/package/foldl-1.4.12/docs/Control-Foldl.html#t:FoldM)
-- library, but \"restartable\" with a list of starting states.
--
-- For converting one into the other, this function should do the trick:
--
-- > \(L.FoldM step allocator coda) -> combiners step coda (Prelude.repeat allocator)
data Combiners a b where
  Combiners :: (s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b

deriving stock instance Functor (Combiners a)

-- | Constructor for 'Combiners' values.
combiners ::
  forall s a b r.
  -- \^ foo

  -- | Step function that threads the state @s@.
  (s -> a -> IO s) ->
  -- | Coda invoked when a group closes.
  (s -> IO b) ->
  -- | Actions that produce the initial states @s@ for processing each group.
  [IO s] ->
  Combiners a b
combiners :: forall {k} s a b (r :: k).
(s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
combiners = (s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
forall s a b.
(s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
Combiners

-- | A simpler version of 'withCombiners' that doen't thread a state; it merely
-- allocates and deallocates the resource @h@.
withCombiners_ ::
  forall h a r.
  -- | Step function that accesses the resource @h@.
  (h -> a -> IO ()) ->
  -- | Finalizer to run after closing each group, and also in the case of an exception.
  (h -> IO ()) ->
  -- | Actions that allocate a sequence of resources @h@.
  [IO h] ->
  -- | The 'Combiners' value should be consumed linearly.
  (Combiners a () -> IO r) ->
  IO r
withCombiners_ :: forall h a r.
(h -> a -> IO ())
-> (h -> IO ()) -> [IO h] -> (Combiners a () -> IO r) -> IO r
withCombiners_ h -> a -> IO ()
step h -> IO ()
finalize [IO h]
allocators = do
  (h -> () -> a -> IO ())
-> (h -> () -> IO ())
-> (h -> IO ())
-> [(IO h, h -> IO ())]
-> (Combiners a () -> IO r)
-> IO r
forall h s a b r.
(h -> s -> a -> IO s)
-> (h -> s -> IO b)
-> (h -> IO ())
-> [(IO h, h -> IO s)]
-> (Combiners a b -> IO r)
-> IO r
withCombiners
    (\h
h () a
a -> h -> a -> IO ()
step h
h a
a)
    (\h
_ () -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
    h -> IO ()
finalize
    ( do
        allocator <- [IO h]
allocators
        pure (allocator, \h
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
    )

-- | 'Combiners' thread a state @s@ while processing each group. Sometimes, in
-- addition to that, we want to allocate a resource @h@ when we start
-- processing a group, and deallocate it after we finish processing the group
-- or an exception is thrown. The typical example is allocating a 'Handle' for
-- writing the elements of the group as they arrive.
withCombiners ::
  forall h s a b r.
  -- | Step function that accesses the resource @h@ and threads the state @s@.
  (h -> s -> a -> IO s) ->
  -- | Coda invoked when a group closes.
  (h -> s -> IO b) ->
  -- | Finalizer to run after each coda, and also in the case of an exception.
  (h -> IO ()) ->
  -- | Actions that allocate a sequence of resources @h@ and produce initial states @s@ for processing each group.
  [(IO h, h -> IO s)] ->
  -- | The 'Combiners' value should be consumed linearly.
  (Combiners a b -> IO r) ->
  IO r
withCombiners :: forall h s a b r.
(h -> s -> a -> IO s)
-> (h -> s -> IO b)
-> (h -> IO ())
-> [(IO h, h -> IO s)]
-> (Combiners a b -> IO r)
-> IO r
withCombiners h -> s -> a -> IO s
step h -> s -> IO b
coda h -> IO ()
finalize [(IO h, h -> IO s)]
allocators Combiners a b -> IO r
continuation = do
  resourceRef <- forall a. IO (MVar a)
newEmptyMVar @h
  let step' (Pair h
h s
s) a
a = do
        s' <- h -> s -> a -> IO s
step h
h s
s a
a
        pure (Pair h s')
      tryFinalize = do
        MVar h -> IO (Maybe h)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar h
resourceRef IO (Maybe h) -> (Maybe h -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Maybe h
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
          Just h
resource -> h -> IO ()
finalize h
resource
      adaptAllocator :: (IO h, h -> IO s) -> IO (Pair h s)
      adaptAllocator (IO h
allocate, h -> IO s
makeInitialState) = do
        h <- IO h -> IO h
forall x. IO x -> IO x
mask_ do
          h <- IO h
allocate
          putMVar resourceRef h
          pure h
        s <- makeInitialState h
        pure (Pair h s)
      coda' :: Pair h s -> IO b
      coda' (Pair h
h s
s) = do
        b <- h -> s -> IO b
coda h
h s
s
        -- this always succeeds, we store the resource at the beginning!
        mask_ tryFinalize
        pure b
  r <-
    (continuation (combiners step' coda' (adaptAllocator <$> allocators)))
      `Control.Exception.finally` tryFinalize
  pure r

-- | Puts the elements of each group into a list that is kept in memory. This breaks streaming within the group.
--
-- Useful with 'recast'.
combineIntoLists :: Combiners a [a]
combineIntoLists :: forall a. Combiners a [a]
combineIntoLists =
  (DList a -> a -> IO (DList a))
-> (DList a -> IO [a]) -> [IO (DList a)] -> Combiners a [a]
forall {k} s a b (r :: k).
(s -> a -> IO s) -> (s -> IO b) -> [IO s] -> Combiners a b
combiners
    (\DList a
s a
a -> DList a -> IO (DList a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DList a
s DList a -> DList a -> DList a
forall a. Semigroup a => a -> a -> a
<> a -> DList a
forall a. a -> DList a
singleton a
a))
    ([a] -> IO [a]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([a] -> IO [a]) -> (DList a -> [a]) -> DList a -> IO [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DList a -> [a]
forall a. DList a -> [a]
closeDList)
    (IO (DList a) -> [IO (DList a)]
forall a. a -> [a]
Prelude.repeat (DList a -> IO (DList a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DList a
forall a. Monoid a => a
mempty))

-- | Delimits groups in the values yielded by a 'Jet', and can also transform
-- those values.
type Splitter a b = MealyIO a (SplitStepResult b)

-- | A [Mealy machine](https://en.wikipedia.org/wiki/Mealy_machine) with an
-- existentially hidden state.
--
-- Very much like a @FoldM IO@  from the
-- [foldl](https://hackage.haskell.org/package/foldl-1.4.12/docs/Control-Foldl.html#t:FoldM)
-- library, but it emits an output at each step, not only at the end.
data MealyIO a b where
  MealyIO ::
    -- | The step function which threads the state.
    (s -> a -> IO (b, s)) ->
    -- | The final output, produced from the final state.
    (s -> IO b) ->
    -- | An action that produces the initial state.
    IO s ->
    MealyIO a b

deriving stock instance Functor (MealyIO a)

-- | For each value coming from upstream, what has the 'Splitter' learned?
--
-- * Perhaps we should continue some group we have already started in a previous step.
--
-- * Perhaps we have found entire groups that we should emit in one go, groups we know are already complete.
--
-- * Perhaps we should start a new group that will continue in the next steps.
data SplitStepResult b = SplitStepResult
  { -- | The continued group will be \"closed"\ if in the current step we emit
    -- an entire group or we begin a new group.
    --
    -- __INVARIANT__: we should only continue a group if we have already
    -- opened a \"new one\" with one or more elements in an earlier step.
    forall b. SplitStepResult b -> [b]
continuationOfPreviouslyStartedGroup :: [b],
    -- | It's ok if the groups we find are empty.
    forall b. SplitStepResult b -> [[b]]
entireGroups :: [[b]],
    -- | __INVARIANT__: when we are in the final step, we should not yield elements
    -- for the beginning of a new one.
    forall b. SplitStepResult b -> [b]
startOfNewGroup :: [b]
  }
  deriving ((forall a b. (a -> b) -> SplitStepResult a -> SplitStepResult b)
-> (forall a b. a -> SplitStepResult b -> SplitStepResult a)
-> Functor SplitStepResult
forall a b. a -> SplitStepResult b -> SplitStepResult a
forall a b. (a -> b) -> SplitStepResult a -> SplitStepResult b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall a b. (a -> b) -> SplitStepResult a -> SplitStepResult b
fmap :: forall a b. (a -> b) -> SplitStepResult a -> SplitStepResult b
$c<$ :: forall a b. a -> SplitStepResult b -> SplitStepResult a
<$ :: forall a b. a -> SplitStepResult b -> SplitStepResult a
Functor, Int -> SplitStepResult b -> ShowS
[SplitStepResult b] -> ShowS
SplitStepResult b -> String
(Int -> SplitStepResult b -> ShowS)
-> (SplitStepResult b -> String)
-> ([SplitStepResult b] -> ShowS)
-> Show (SplitStepResult b)
forall b. Show b => Int -> SplitStepResult b -> ShowS
forall b. Show b => [SplitStepResult b] -> ShowS
forall b. Show b => SplitStepResult b -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall b. Show b => Int -> SplitStepResult b -> ShowS
showsPrec :: Int -> SplitStepResult b -> ShowS
$cshow :: forall b. Show b => SplitStepResult b -> String
show :: SplitStepResult b -> String
$cshowList :: forall b. Show b => [SplitStepResult b] -> ShowS
showList :: [SplitStepResult b] -> ShowS
Show)

instance Semigroup (SplitStepResult b) where
  SplitStepResult [b]
c1 [[b]]
e1 [b]
b1 <> :: SplitStepResult b -> SplitStepResult b -> SplitStepResult b
<> SplitStepResult [b]
c2 [[b]]
e2 [b]
b2 =
    [b] -> [[b]] -> [b] -> SplitStepResult b
forall b. [b] -> [[b]] -> [b] -> SplitStepResult b
SplitStepResult ([b]
c1 [b] -> [b] -> [b]
forall a. Semigroup a => a -> a -> a
<> [b]
c2) ([[b]]
e1 [[b]] -> [[b]] -> [[b]]
forall a. Semigroup a => a -> a -> a
<> [[b]]
e2) ([b]
b1 [b] -> [b] -> [b]
forall a. Semigroup a => a -> a -> a
<> [b]
b2)

instance Monoid (SplitStepResult b) where
  mempty :: SplitStepResult b
mempty = [b] -> [[b]] -> [b] -> SplitStepResult b
forall b. [b] -> [[b]] -> [b] -> SplitStepResult b
SplitStepResult [] [] []

-- TODO: bring back some linear stuff? Perhaps adding a linearFmap ?
--