{-# LANGUAGE NoImplicitPrelude #-}
{-# OPTIONS_GHC -Wall #-}

module Box.Broadcast
  ( Broadcaster(..)
  , broadcast
  , subscribe
  , Funneler(..)
  , funnel
  , widen
  ) where

import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import Protolude hiding (STM, atomically, (.), (<>))
import Control.Concurrent.Classy.STM as C
import Control.Monad.Conc.Class as C

-- | a broadcaster 
newtype Broadcaster m a = Broadcaster
  { unBroadcast :: TVar m (Committer m a)
  }

-- | create a (broadcaster, committer)
broadcast :: (MonadSTM stm) => stm (Broadcaster stm a, Committer stm a)
broadcast = do
  ref <- newTVar mempty
  let com = Committer $ \a -> do
        c <- readTVar ref
        commit c a
  return (Broadcaster ref, com)

-- | subscribe to a broadcaster
subscribe :: (MonadConc m) => Broadcaster (STM m) a -> Cont m (Emitter (STM m) a)
subscribe (Broadcaster tvar) = Cont $ \e -> queueE cio e
  where
    cio c = atomically $ modifyTVar' tvar (mappend c)

-- | a funneler
newtype Funneler m a = Funneler
  { unFunnel :: TVar m (Emitter m a)
  }

-- | create a (funneler, emitter)
funnel :: (MonadSTM stm) => stm (Funneler stm a, Emitter stm a)
funnel = do
  ref <- newTVar mempty
  let em =
        Emitter $ do
          e <- readTVar ref
          emit e
  pure (Funneler ref, em)

-- | widen to a funneler
widen :: (MonadConc m) => Funneler (STM m) a -> Cont m (Committer (STM m) a)
widen (Funneler tvar) =
  Cont $ \c -> queueC c $ \e -> atomically $ modifyTVar' tvar (mappend e)