-- | A firehose conduit, spawning a web server that will allow for the -- observation of the messages. module Data.Conduit.FireHose (fireHose) where import Control.Monad.IO.Class import Logstash.Message import Data.Conduit import Data.Conduit.Network.Firehose import Network.Wai (pathInfo) import qualified Data.HashSet as HS import qualified Data.Text as T import Data.Aeson import Blaze.ByteString.Builder.ByteString import Data.Monoid import Data.Maybe (fromMaybe) {-| A web server will be launched on the specified port. Clients can request URLs of the form /type1,type2,type3. They will be fed all 'LogstashMessage' matching one of the given types. Here is a sample usage : > -- run the fire hose on port 13400 > fh <- fireHose 13400 10 > logstashListener lport (printErrors =$ CL.mapM (liftIO . addLogstashTime) -- add the time > =$ fh > =$ CL.map (BSL.toStrict . encode) -- turn into a bytestring > =$ redisSink host port queue (Just logfunc)) -- store to redis -} fireHose :: MonadIO m => Int -- ^ Port -> Int -- ^ Buffer size for the fire hose threads -> IO (Conduit LogstashMessage m LogstashMessage) fireHose port buffersize = firehoseConduit port buffersize getFilter serialize where serialize = (<> fromByteString "\n") . fromLazyByteString . encode getFilter r = case pathInfo r of [p] -> let set = HS.fromList $ T.splitOn "," p in flip HS.member set . fromMaybe "empty" . logstashType _ -> error "invalid url"