-- | A firehose sink, letting client get through a port and read the sink
-- output.
module Data.Conduit.FireHose (fireHose) where

import Control.Monad.IO.Class

import Logstash.Message

import Data.Conduit
import Data.Conduit.Network.Firehose
import Network.Wai (Request,pathInfo)

import qualified Data.HashSet as HS
import qualified Data.Text as T

import Data.Aeson
import Blaze.ByteString.Builder.ByteString
import Data.Monoid

-- | A web server will be launched on the specified port. The request URL
-- must be of the form /type1,type2,type3. The client will be fed all
-- messages matching those types.
fireHose :: MonadIO m => Int -- ^ Port
                      -> Int -- ^ Buffer size for the fire hose threads
                      -> IO (Conduit LogstashMessage m LogstashMessage)
fireHose port buffersize = firehoseConduit port buffersize getFilter serialize
    where
        serialize = (<> fromByteString "\n") . fromLazyByteString . encode
        getFilter r = case pathInfo r of
                          [p] -> let set = HS.fromList $ T.splitOn "," p
                                 in  flip HS.member set . logstashType
                          _ -> error "invalid url"