-- | Receive logstash messages from the network, and process them with -- a conduit. module Data.Conduit.Logstash (logstashListener) where import Data.Conduit import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.List as CL import Data.Conduit.Network import Data.Aeson import qualified Data.ByteString.Lazy as BSL import qualified Data.ByteString as BS import Codec.Text.IConv import Data.Text.Encoding import Logstash.Message -- | Decodes JSON data from ByteStrings that can be encoded in UTF-8 or -- latin1. tryDecode :: (FromJSON a) => BS.ByteString -> Either BS.ByteString a tryDecode i = let latin1 = convert "LATIN1" "UTF-8" li li = BSL.fromStrict i o = case decodeUtf8' i of Left _ -> decode latin1 Right _ -> decode li in case o of Just x -> Right x Nothing -> Left i -- | This creates a logstash network listener, given a TCP port. -- It will try to decode the Bytestring as UTF-8, and, if it fails, as -- Latin1. logstashListener :: Int -- ^ Port number -> Sink (Either BS.ByteString LogstashMessage) (ResourceT IO) () -> IO () logstashListener port sink = runResourceT $ runTCPServer (serverSettings port HostAny) (\app -> appSource app $= CB.lines $= CL.map tryDecode $$ sink)