module Kafka.Conduit.Combinators
  ( BatchSize(..)
  , batchByOrFlush
  , batchByOrFlushEither
  , foldYield
  , throwLeft
  , throwLeftSatisfy
  ) where

import           Control.Exception
import           Control.Monad
import           Control.Monad.Catch
import           Data.Conduit

-- | Throws the left part of a value in a 'MonadThrow' context
throwLeft :: (MonadThrow m, Exception e) => Conduit (Either e i) m i
throwLeft = awaitForever (either throwM yield)

-- | Throws the left part of a value in a 'MonadThrow' context if the value
-- satisfies the predicate
throwLeftSatisfy :: (MonadThrow m, Exception e) => (e -> Bool) -> Conduit (Either e i) m (Either e i)
throwLeftSatisfy p = awaitForever awaitHandle
  where awaitHandle (Left e) | p e  = throwM e
        awaitHandle v        = yield v

-- | Create a conduit that folds with the function f over its input i with its
-- internal state s and emits outputs [o], then finally emits outputs [o] from
-- the function g applied to the final state s.
foldYield :: Monad m => (i -> s -> (s, [o])) -> (s -> [o]) -> s -> Conduit i m o
foldYield f g s = do
  mi <- await
  case mi of
    Just i -> do
      let (s', os) = f i s
      forM_ os yield
      foldYield f g s'
    Nothing -> forM_ (g s) yield

newtype BatchSize = BatchSize Int deriving (Show, Eq, Ord)

batchByOrFlush :: Monad m => BatchSize -> Conduit (Maybe a) m [a]
batchByOrFlush (BatchSize n) = foldYield folder finish (0 :: Int, [])
  where
    folder Nothing  (_, xs)                 = ((0    ,   []), [reverse    xs ])
    folder (Just a) (i, xs) | (i + 1) >= n  = ((0    ,   []), [reverse (a:xs)])
    folder (Just a) (i, xs)                 = ((i + 1, a:xs),               [])
    finish (_, xs) = [reverse xs]

batchByOrFlushEither :: Monad m => BatchSize -> Conduit (Either e a) m [a]
batchByOrFlushEither (BatchSize n) = foldYield folder finish (0 :: Int, [])
  where
    folder (Left _)  (_, xs)              = ((0    ,    []), [reverse    xs ])
    folder (Right a) (i, xs) | i + 1 >= n = ((0    ,    []), [reverse (a:xs)])
    folder (Right a) (i, xs)              = ((i + 1, a:xs ),               [])
    finish (_, xs) = [reverse xs]