{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Stream
  ( toStream
  , fromStream
  , toCommit
  , toCommitFold
  , toCommitSink
  , toEmit
  , queueStream
  , toStreamM
  , fromStreamM
  ) where
import Control.Category
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import Streaming (Of(..), Stream)
import qualified Control.Foldl as L
import qualified Streaming.Prelude as S
import Control.Monad.Conc.Class as C
import Protolude hiding ((<>), (.), STM, check, wait, cancel, atomically, withAsync, concurrently)
-- | Turn a stream consumer into a continuation that supplies a 'Committer'.
--
-- The continuation handed to the resulting 'Cont' is passed to 'queueC'
-- together with an emitter-side action that converts the emitter to a
-- stream (via 'toStream') and feeds it to the given consumer.
--
-- NOTE(review): presumably 'queueC' connects the two sides through a queue
-- and runs them concurrently — confirm against "Box.Queue".
toCommit :: (MonadConc m) => (Stream (Of a) m () -> m r) -> Cont m (Committer (STM m) a)
toCommit consume = Cont $ \useCommitter -> queueC useCommitter drain
  where
    -- Re-wrap the emitter and hand it to the consumer as a stream.
    drain (Emitter pull) = consume (toStream (Emitter pull))
-- | Turn a monadic fold into a continuation that supplies a 'Committer'.
--
-- The fold is run over the committed values with 'S.foldM'; the fold's
-- @()@ result is dropped and only the stream's return value is kept.
toCommitFold :: (MonadConc m) => L.FoldM m a () -> Cont m (Committer (STM m) a)
toCommitFold f = toCommit runFold
  where
    -- Run the fold over the stream, keeping only the stream's return value.
    runFold stream = S.snd' <$> L.impurely S.foldM f stream
-- | Turn an effectful sink into a continuation that supplies a 'Committer'.
--
-- Built on 'toCommitFold' with a fold whose accumulator is @()@: each step
-- runs the sink on the incoming value and leaves the accumulator untouched.
toCommitSink :: (MonadConc m) => (a -> m ()) -> Cont m (Committer (STM m) a)
toCommitSink sink = toCommitFold (L.FoldM consume (pure ()) pure)
  where
    -- Run the sink for its effect; pass the accumulator through unchanged.
    consume acc a = sink a >> pure acc
-- | Turn a stream into a continuation that supplies an 'Emitter'.
--
-- The stream is committed with 'fromStream' on the committer side of
-- 'queueE'; the continuation receives the corresponding emitter.
--
-- NOTE(review): queueing and concurrency semantics come from 'queueE' —
-- confirm against "Box.Queue".
toEmit :: (MonadConc m) => Stream (Of a) m () -> Cont m (Emitter (STM m) a)
toEmit source = Cont (queueE producer)
  where
    -- Committer-side action: push every stream element into the committer.
    producer = fromStream source
-- | Pass a stream through 'queueE', yielding a stream again.
--
-- The input stream is committed with 'fromStream'; the emitter produced by
-- 'queueE' is converted back with 'toStream' before being handed to the
-- continuation.
queueStream ::
     (MonadConc m) => Stream (Of a) m () -> Cont m (Stream (Of a) m ())
queueStream i =
  Cont (\useStream -> queueE (fromStream i) (\e -> useStream (toStream e)))
-- | Convert an 'Emitter' living in @STM m@ into a stream in @m@.
--
-- The emitter is first lifted with 'liftE' and then converted by
-- 'toStreamM'.
toStream :: (MonadConc m) => Emitter (STM m) a -> Stream (Of a) m ()
toStream = toStreamM . liftE
-- | Convert an 'Emitter' into a stream.
--
-- The emitter is polled repeatedly: each @Just a@ is yielded as a stream
-- element, and the first @Nothing@ ends the stream.
toStreamM :: (MonadConc m) => Emitter m a -> Stream (Of a) m ()
toStreamM e = S.untilRight (toStep <$> emit e)
  where
    -- 'S.untilRight' yields on 'Left' and stops on 'Right'.
    toStep Nothing = Right ()
    toStep (Just a) = Left a
-- | Commit every element of a stream to a 'Committer' living in @STM m@.
--
-- The committer is lifted with 'liftC' and the work is delegated to
-- 'fromStreamM'.
fromStream :: (MonadConc m) => Stream (Of b) m () -> Committer (STM m) b -> m ()
fromStream stream = fromStreamM stream . liftC
-- | Commit the elements of a stream to a 'Committer', one at a time.
--
-- Stops when the stream is exhausted or as soon as 'commit' returns
-- 'False', whichever comes first.
fromStreamM :: (MonadConc m) => Stream (Of b) m () -> Committer m b -> m ()
fromStreamM s c = loop s
  where
    loop stream = do
      step <- S.next stream
      case step of
        -- The stream has finished: nothing left to commit.
        Left _ -> pure ()
        Right (a, rest) -> do
          accepted <- commit c a
          -- Only keep pulling while the committer accepts values.
          when accepted (loop rest)