-- | This is a simple utility module to implement a publish-subscribe pattern.
-- Note that this only allows communication in a single direction: pushing data
-- from the server to connected clients (browsers).
--
-- Usage:
--
-- * Create a new 'PubSub' handle using 'newPubSub'
--
-- * Subscribe your clients using the 'subscribe' call
--
-- * Push new updates from the server using the 'publish' call
--
{-# LANGUAGE Rank2Types, ScopedTypeVariables #-}
module Network.WebSockets.Util.PubSub
    ( PubSub
    , newPubSub
    , publish
    , subscribe
    ) where

import Control.Applicative ((<$>))
import Control.Exception (IOException, handle)
import Control.Monad (foldM, forever)
import Control.Monad.Trans (liftIO)
import Data.IntMap (IntMap)
import Data.List (foldl')
import qualified Control.Concurrent.MVar as MV
import qualified Data.IntMap as IM

import Network.WebSockets

-- | Internal registry of subscribed clients.
--
-- Both fields are strict: this value lives inside an 'MV.MVar' for the
-- lifetime of the server, and lazy fields would let chains of unevaluated
-- 'IM.insert' / 'IM.delete' applications (and the sinks they reference) pile
-- up whenever clients come and go without an intervening 'publish'.
data PubSub_ p = PubSub_
    { pubSubNextId :: !Int                -- ^ Id handed to the next subscriber
    , pubSubSinks  :: !(IntMap (Sink p))  -- ^ Sinks of the subscribed clients
    }

-- | Register a sink under the next free id. Returns the updated registry
-- together with the id that was assigned to the sink.
addClient :: Sink p -> PubSub_ p -> (PubSub_ p, Int)
addClient sink (PubSub_ nid sinks) = registry `seq` (registry, nid)
  where
    -- Forced together with the result pair, so that the caller never stores
    -- an unevaluated registry in the 'MV.MVar'.
    registry = PubSub_ (nid + 1) (IM.insert nid sink sinks)

-- | Drop the sink registered under the given id. Ids that are not present are
-- ignored.
removeClient :: Int -> PubSub_ p -> PubSub_ p
removeClient ref ps = ps {pubSubSinks = IM.delete ref (pubSubSinks ps)}

-- | A handle which keeps track of subscribed clients
newtype PubSub p = PubSub (MV.MVar (PubSub_ p))

-- | Create a new 'PubSub' handle, with no clients initially connected
newPubSub :: IO (PubSub p)
newPubSub = PubSub <$> MV.newMVar PubSub_
    { pubSubNextId = 0
    , pubSubSinks  = IM.empty
    }

-- | Broadcast a message to all connected clients.
--
-- The message is sent to every registered sink in turn while the registry is
-- held, so concurrent calls to 'publish' and 'subscribe' on the same handle
-- wait for each other. A sink whose send raises an 'IOException' is
-- considered broken and is removed from the registry; any other exception
-- propagates to the caller and leaves the registry unchanged.
publish :: PubSub p -> Message p -> IO ()
publish (PubSub mvar) msg = MV.modifyMVar_ mvar $ \pubSub -> do
    -- Take care to detect and remove broken clients
    broken <- foldM publish' [] (IM.toList $ pubSubSinks pubSub)
    -- '$!' so the cleaned-up registry, not a thunk, is written back
    return $! foldl' (\p b -> removeClient b p) pubSub broken
  where
    -- Publish the message to a single client, add it to the broken list if an
    -- IOException occurs
    publish' broken (i, s) =
        handle (\(_ :: IOException) -> return (i : broken)) $ do
            sendSink s msg
            return broken

-- | Subscribe the current client to the given 'PubSub' handle.
--
-- The client's sink is registered, after which incoming data messages are
-- received and discarded in an endless loop. The call therefore does not
-- return while the loop keeps running; once the loop is interrupted by an
-- error that 'catchWsError' handles, the client is removed from the registry
-- again and the error is discarded.
subscribe :: Protocol p => PubSub p -> WebSockets p ()
subscribe (PubSub mvar) = do
    sink <- getSink
    -- 'addClient' forces the new registry along with the result pair
    ref <- liftIO $ MV.modifyMVar mvar $ return . addClient sink
    catchWsError loop $ const $ liftIO $
        -- '$!' releases the removed sink right away instead of keeping it
        -- reachable through a suspended 'removeClient' application
        MV.modifyMVar_ mvar $ \ps -> return $! removeClient ref ps
  where
    loop = forever $ do
        _ <- receiveDataMessage
        return ()