-- | Conduit combinators: batching of stream elements ('batchByOrFlush',
-- 'batchByOrFlushEither'), a stateful fold that emits outputs ('foldYield'),
-- and helpers that throw 'Left' values ('throwLeft', 'throwLeftSatisfy').
module Kafka.Conduit.Combinators
( BatchSize(..)
, batchByOrFlush
, batchByOrFlushEither
, foldYield
, throwLeft
, throwLeftSatisfy
) where
import Control.Exception
import Control.Monad
import Control.Monad.Catch
import Data.Conduit
-- | Unwrap a stream of 'Either' values.
--
-- Every @Right@ payload is yielded downstream; a @Left@ is thrown with
-- 'throwM'.
throwLeft :: (MonadThrow m, Exception e) => Conduit (Either e i) m i
throwLeft = awaitForever unwrap
  where
    unwrap (Left err)  = throwM err
    unwrap (Right val) = yield val
-- | Pass 'Either' values through unchanged, except that a @Left e@ for which
-- the predicate returns 'True' is thrown with 'throwM' instead of being
-- yielded.
--
-- @Right@ values and @Left@ values rejected by the predicate are yielded
-- as they are.
throwLeftSatisfy :: (MonadThrow m, Exception e) => (e -> Bool) -> Conduit (Either e i) m (Either e i)
throwLeftSatisfy p = awaitForever $ \item ->
  case item of
    Left err | p err -> throwM err
    _                -> yield item
-- | A stateful conduit driven by a step function.
--
-- For every input, @f@ is applied to the input and the current state; it
-- returns the next state together with a list of outputs, which are yielded
-- in order. Once upstream is exhausted, the outputs produced by applying @g@
-- to the final state are yielded and the conduit finishes.
foldYield :: Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> Conduit i m o
foldYield f g = loop
  where
    -- Wait for the next input; on end of input emit the closing outputs.
    loop st = await >>= maybe (mapM_ yield (g st)) (advance st)

    -- Run one step, emit its outputs, then continue with the new state.
    advance st x =
      let (st', outs) = f x st
      in mapM_ yield outs >> loop st'
-- | Number of elements after which 'batchByOrFlush' and
-- 'batchByOrFlushEither' emit a batch.
newtype BatchSize
  = BatchSize Int
  deriving (Eq, Ord, Show)
-- | Group a stream of @Maybe a@ into lists of @a@.
--
-- * @Just a@ is appended to the current batch; once the batch holds @n@
--   elements it is emitted and a new, empty batch is started. With @n <= 1@
--   every element is emitted as a singleton batch.
-- * @Nothing@ acts as a flush request: the current batch is emitted
--   immediately, even if it is empty.
-- * When upstream ends, whatever is left in the current batch is emitted
--   (this final batch may be empty).
--
-- Elements within a batch keep their arrival order.
batchByOrFlush :: Monad m => BatchSize -> Conduit (Maybe a) m [a]
batchByOrFlush (BatchSize n) = foldYield step flush (0 :: Int, [])
  where
    -- The batch is accumulated in reverse, so it is reversed on emission.
    emit acc = ((0, []), [reverse acc])

    step Nothing (_, acc) = emit acc
    step (Just x) (count, acc)
      | count + 1 >= n = emit (x : acc)
      | otherwise      = ((count + 1, x : acc), [])

    flush (_, acc) = [reverse acc]
-- | Group the @Right@ values of an 'Either' stream into lists.
--
-- * @Right a@ is appended to the current batch; once the batch holds @n@
--   elements it is emitted and a new, empty batch is started. With @n <= 1@
--   every element is emitted as a singleton batch.
-- * Any @Left@ acts as a flush request: its payload is discarded and the
--   current batch is emitted immediately, even if it is empty.
-- * When upstream ends, whatever is left in the current batch is emitted
--   (this final batch may be empty).
--
-- Elements within a batch keep their arrival order.
batchByOrFlushEither :: Monad m => BatchSize -> Conduit (Either e a) m [a]
batchByOrFlushEither (BatchSize n) = foldYield step flush (0 :: Int, [])
  where
    -- The batch is accumulated in reverse, so it is reversed on emission.
    step input (count, acc) =
      case input of
        Left _ -> ((0, []), [reverse acc])
        Right x
          | count + 1 >= n -> ((0, []), [reverse (x : acc)])
          | otherwise      -> ((count + 1, x : acc), [])

    flush (_, acc) = [reverse acc]