-- | This is a simple utility module to implement a publish-subscribe pattern.
-- Note that this only allows communication in a single direction: pusing data
-- from the server to connected clients (browsers).
--
-- Usage:
--
-- * Create a new 'PubSub' handle using 'newPubSub'
--
-- * Subscribe your clients using the 'subscribe' call
--
-- * Push new updates from the server using the 'publish' call
--
{-# LANGUAGE Rank2Types #-}
module Network.WebSockets.Util.PubSub
    ( PubSub
    , newPubSub
    , publish
    , subscribe
    ) where

import Control.Applicative ((<$>))
import Control.Monad (forM_, forever)
import Control.Monad.Trans (liftIO)
import Data.IntMap (IntMap)
import qualified Control.Concurrent.MVar as MV

import qualified Data.IntMap as IM

import Network.WebSockets

data PubSub_ p = PubSub_
    { pubSubNextId :: Int
    , pubSubSinks  :: IntMap (Sink p)
    }

addClient :: Sink p -> PubSub_ p -> (PubSub_ p, Int)
addClient sink (PubSub_ nid sinks) =
    (PubSub_ (nid + 1) (IM.insert nid sink sinks), nid)

removeClient :: Int -> PubSub_ p -> PubSub_ p
removeClient ref ps = ps {pubSubSinks = IM.delete ref (pubSubSinks ps)}

-- | A handle which keeps track of subscribed clients
newtype PubSub p = PubSub (MV.MVar (PubSub_ p))

-- | Create a new 'PubSub' handle, with no clients initally connected
newPubSub :: IO (PubSub p)
newPubSub = PubSub <$> MV.newMVar PubSub_
    { pubSubNextId  = 0
    , pubSubSinks  = IM.empty
    }

-- | Broadcast a message to all connected clients
publish :: PubSub p -> Message p -> IO ()
publish (PubSub mvar) msg = do
    sinks <- pubSubSinks <$> MV.readMVar mvar 
    forM_ (IM.toList sinks) $ \(_, s) -> sendSink s msg

-- | Blocks forever
subscribe :: Protocol p => PubSub p -> WebSockets p ()
subscribe (PubSub mvar) = do
    sink <- getSink
    ref  <- liftIO $ MV.modifyMVar mvar $ return . addClient sink
    catchWsError loop $ const $ liftIO $
        MV.modifyMVar_ mvar $ return . removeClient ref
  where
    loop = forever $ do
        _ <- receiveDataMessage
        return ()