{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}

-- | Streaming functionality
--

module Box.Stream
  ( toStream
  , fromStream
  , toCommit
  , toCommitFold
  , toCommitSink
  , toEmit
  , queueStream
  , toStreamM
  , fromStreamM
  ) where

import Control.Category

import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import Streaming (Of(..), Stream)
import qualified Control.Foldl as L
import qualified Streaming.Prelude as S
import Control.Monad.Conc.Class as C
import Protolude hiding ((<>), (.), STM, check, wait, cancel, atomically, withAsync, concurrently)

-- * streaming
-- | create a committer from a stream consumer
toCommit :: (MonadConc m) => (Stream (Of a) m () -> m r) -> Cont m (Committer (STM m) a)
toCommit f =
  Cont (\c -> queueC c (\(Emitter o) -> f . toStream . Emitter $ o))

-- | create a committer from a fold
toCommitFold :: (MonadConc m) => L.FoldM m a () -> Cont m (Committer (STM m) a)
toCommitFold f = toCommit (fmap S.snd' . L.impurely S.foldM f)

-- | create a committer from a sink
toCommitSink :: (MonadConc m) => (a -> m ()) -> Cont m (Committer (STM m) a)
toCommitSink sink = toCommitFold (L.FoldM step begin done)
  where
    step x a = do
      sink a
      pure x
    begin = pure ()
    done = pure

-- | create an emitter from a stream
toEmit :: (MonadConc m) => Stream (Of a) m () -> Cont m (Emitter (STM m) a)
toEmit s = Cont (queueE (fromStream s))

-- | insert a queue into a stream (left biased collapse)
-- todo: look at biases
queueStream ::
     (MonadConc m) => Stream (Of a) m () -> Cont m (Stream (Of a) m ())
queueStream i = Cont $ \o -> queueE (fromStream i) (toStream >>> o)

-- | turn an emitter into a stream
toStream :: (MonadConc m) => Emitter (STM m) a -> Stream (Of a) m ()
toStream e = toStreamM (liftE e)

-- | turn an emitter into a stream
toStreamM :: (MonadConc m) => Emitter m a -> Stream (Of a) m ()
toStreamM e = S.untilRight getNext
  where
    getNext = maybe (Right ()) Left <$> emit e

-- | turn a stream into a committer
fromStream :: (MonadConc m) => Stream (Of b) m () -> Committer (STM m) b -> m ()
fromStream s c = fromStreamM s (liftC c)

-- | turn a stream into a committer
fromStreamM :: (MonadConc m) => Stream (Of b) m () -> Committer m b -> m ()
fromStreamM s c = go s
  where
    go str = do
      eNxt <- S.next str -- uncons requires r ~ ()
      forM_ eNxt $ \(a, str') -> do
        continue <- commit c a
        when continue (go str')