module Network.WebSockets.Util.PubSub
( PubSub
, newPubSub
, publish
, subscribe
) where
import Control.Applicative ((<$>))
import qualified Control.Concurrent.MVar as MV
import qualified Control.Exception as E
import Control.Monad (forM_, forever)
import Control.Monad.Trans (liftIO)
import Data.IntMap (IntMap)
import qualified Data.IntMap as IM

import Network.WebSockets
-- | Pure state behind a 'PubSub': an identifier counter plus the sinks
-- of the clients that are currently registered.
data PubSub_ p = PubSub_
    { pubSubNextId :: Int
      -- ^ Identifier that will be assigned to the next added client.
    , pubSubSinks  :: IntMap (Sink p)
      -- ^ Registered client sinks, keyed by their assigned identifier.
    }
-- | Store a sink under the next free identifier.
--
-- Returns the updated state together with the identifier the sink was
-- stored under; the counter in the new state is advanced by one.
addClient :: Sink p -> PubSub_ p -> (PubSub_ p, Int)
addClient sink (PubSub_ ref clients) = (grown, ref)
  where
    grown = PubSub_
        { pubSubNextId = ref + 1
        , pubSubSinks  = IM.insert ref sink clients
        }
-- | Drop the sink stored under the given identifier. The identifier
-- counter is left untouched, so identifiers are never reused.
removeClient :: Int -> PubSub_ p -> PubSub_ p
removeClient ref (PubSub_ nextId clients) =
    PubSub_ nextId (IM.delete ref clients)
-- | A handle to a publish/subscribe hub: the hub state ('PubSub_') kept
-- inside an 'MV.MVar' so it can be updated from 'IO'.
newtype PubSub p = PubSub (MV.MVar (PubSub_ p))
-- | Create a fresh hub with no registered clients; the first client
-- that is added will receive identifier 0.
newPubSub :: IO (PubSub p)
newPubSub = fmap PubSub (MV.newMVar emptyState)
  where
    emptyState = PubSub_ 0 IM.empty
-- | Send a message to every client currently registered with the hub.
--
-- Delivery to one client is isolated from the others: if 'sendSink'
-- raises an exception for a sink, that client is treated as broken, the
-- remaining clients still receive the message, and the broken clients
-- are removed from the hub afterwards. Previously the first failing
-- sink aborted the whole broadcast and stayed registered.
publish :: PubSub p -> Message p -> IO ()
publish (PubSub mvar) msg = do
    -- Work on a snapshot so the MVar is not held while sending.
    sinks  <- pubSubSinks <$> MV.readMVar mvar
    broken <- concat <$> mapM deliver (IM.toList sinks)
    case broken of
        [] -> return ()
        -- Identifiers are never reused, so removing by identifier is
        -- safe even if the hub changed since the snapshot was taken.
        _  -> MV.modifyMVar_ mvar $ \ps ->
                  return (foldr removeClient ps broken)
  where
    -- Try to send to a single client; yields its identifier when the
    -- send failed and nothing otherwise.
    -- NOTE(review): this catches 'E.SomeException', which also covers
    -- asynchronous exceptions delivered during the send — confirm that
    -- treating those as a broken client is acceptable for callers.
    deliver (ref, sink) = do
        result <- E.try (sendSink sink msg) :: IO (Either E.SomeException ())
        return (either (const [ref]) (const []) result)
-- | Register the current connection with the hub and keep it registered
-- for as long as the connection keeps delivering data messages.
--
-- The connection's sink is added to the hub, after which incoming data
-- messages are received and discarded in an endless loop. When that
-- loop is interrupted by an error caught via 'catchWsError', the sink
-- is removed from the hub again.
subscribe :: Protocol p => PubSub p -> WebSockets p ()
subscribe (PubSub mvar) = do
    sink <- getSink
    ref  <- liftIO (MV.modifyMVar mvar (return . addClient sink))
    catchWsError drain (\_ -> liftIO (unregister ref))
  where
    -- Read and throw away incoming data messages forever.
    drain = forever (receiveDataMessage >> return ())

    -- Remove the client with the given identifier from the hub.
    unregister ref = MV.modifyMVar_ mvar (return . removeClient ref)