module Data.Conduit.Network.Firehose (firehoseApp, firehoseConduit) where
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Concurrent.STM.Firehose
import Data.Conduit.TQueue
import qualified Data.Conduit.List as CL
import Network.HTTP.Types
import Network.Wai
import Data.Conduit
import Blaze.ByteString.Builder
import Network.Wai.Handler.Warp
import Control.Monad (void)
import Control.Monad.IO.Class
firehoseApp :: Int
-> (Request -> a -> Bool)
-> (a -> Builder)
-> Firehose a
-> Application
firehoseApp buffsize filtering serialize fh req = responseSourceBracket (atomically (subscribe buffsize fh)) (atomically . unsubscribe) runFirehose
where
filtering' = filtering req
runFirehose sub = return (status200, [], sourceTBMQueue (getQueue sub) $= CL.filter filtering' $= CL.concatMap (\e -> [Chunk (serialize e), Flush]) )
firehoseConduit :: (Monad m, MonadIO m)
=> Int
-> Int
-> (Request -> a -> Bool)
-> (a -> Builder)
-> IO (Conduit a m a)
firehoseConduit port buffersize getFilter serialize = do
fh <- atomically newFirehose
let settings = defaultSettings { settingsPort = port, settingsTimeout = 3600 }
void $ forkIO (runSettings settings (firehoseApp buffersize getFilter serialize fh))
return (CL.mapM (\m -> liftIO (atomically (writeEvent fh m)) >> return m))