{-# LANGUAGE OverloadedStrings #-}
module Network.Wai.EventSource.Streaming
(
withEvents
, FromEvent(..)
, dataOnly
, event
) where
import Control.Applicative (many, optional)
import Control.Monad (unless)
import Data.Attoparsec.ByteString.Char8
import qualified Data.Attoparsec.ByteString.Streaming as SA
import Data.Binary.Builder (Builder, fromByteString, toLazyByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Streaming.Char8 as SB
import Network.HTTP.Client
import Network.Wai.EventSource (ServerEvent(..))
import Prelude hiding (takeWhile)
import Streaming
import qualified Streaming.Prelude as SP
withEvents :: Request -> Manager -> (Stream (Of ServerEvent) IO () -> IO a) -> IO a
withEvents r m f = withResponse r m $ f . g . responseBody
where
g :: BodyReader -> Stream (Of ServerEvent) IO ()
g = void . SA.parsed event . SB.fromChunks . fromBodyReader
fromBodyReader :: BodyReader -> Stream (Of B.ByteString) IO ()
fromBodyReader br = do
bs <- liftIO $ brRead br
unless (B.null bs) $ SP.yield bs >> fromBodyReader br
class FromEvent a where
fromEvent :: B.ByteString -> Maybe a
dataOnly :: (FromEvent a, Monad m) => Stream (Of ServerEvent) m r -> Stream (Of a) m r
dataOnly = SP.mapMaybe (fromEvent . BL.toStrict . toLazyByteString) . SP.concat . SP.map f
where
f (ServerEvent _ _ d) = d
f _ = []
event :: Parser ServerEvent
event = (sevent <|> comment <|> retry) <* eol
sevent :: Parser ServerEvent
sevent = ServerEvent
<$> optional (string "event" *> char ':' *> chars <* eol)
<*> optional (string "id" *> char ':' *> chars <* eol)
<*> many (string "data" *> char ':' *> chars <* eol)
comment :: Parser ServerEvent
comment = CommentEvent <$> (char ':' *> chars <* eol)
retry :: Parser ServerEvent
retry = RetryEvent <$> (string "retry:" *> decimal <* eol)
chars :: Parser Builder
chars = fromByteString <$> takeTill (== '\n')
eol :: Parser Char
eol = char '\n'