module Network.WebSockets.Connection
( PendingConnection (..)
, AcceptRequest(..)
, acceptRequest
, acceptRequestWith
, rejectRequest
, Available (..)
, Connection (..)
, ConnectionOptions (..)
, defaultConnectionOptions
, receive
, receiveDataMessage
, receiveData
, send
, sendDataMessage
, sendTextData
, sendBinaryData
, sendClose
, sendCloseCode
, sendPing
, forkPingThread
) where
import qualified Blaze.ByteString.Builder as Builder
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar)
import Control.Exception (AsyncException, fromException,
handle, mask, onException, throwIO)
import Control.Monad (unless)
import qualified Data.ByteString as B
import Data.IORef (IORef, newIORef, readIORef,
writeIORef)
import Data.List (find)
import qualified Data.Text as T
import Data.Word (Word16)
import Network.WebSockets.Http
import Network.WebSockets.Protocol
import Network.WebSockets.Stream (Stream)
import qualified Network.WebSockets.Stream as Stream
import Network.WebSockets.Types
data PendingConnection = PendingConnection
{ pendingOptions :: !ConnectionOptions
, pendingRequest :: !RequestHead
, pendingOnAccept :: !(Connection -> IO ())
, pendingStream :: !Stream
}
data AcceptRequest = AcceptRequest
{ acceptSubprotocol :: !(Maybe B.ByteString)
}
sendResponse :: PendingConnection -> Response -> IO ()
sendResponse pc rsp = Stream.write (pendingStream pc)
(Builder.toLazyByteString (encodeResponse rsp))
acceptRequest :: PendingConnection -> IO Connection
acceptRequest pc = acceptRequestWith pc $ AcceptRequest Nothing
acceptRequestWith :: PendingConnection -> AcceptRequest -> IO Connection
acceptRequestWith pc ar = case find (flip compatible request) protocols of
Nothing -> do
sendResponse pc $ response400 versionHeader ""
throwIO NotSupported
Just protocol -> do
let subproto = maybe [] (\p -> [("Sec-WebSocket-Protocol", p)]) $ acceptSubprotocol ar
response = finishRequest protocol request subproto
sendResponse pc response
parse <- decodeMessages protocol (pendingStream pc)
write <- encodeMessages protocol ServerConnection (pendingStream pc)
parseState <- newMVar (Available parse)
writeState <- newMVar (Available write)
sentRef <- newIORef False
let connection = Connection
{ connectionOptions = pendingOptions pc
, connectionType = ServerConnection
, connectionProtocol = protocol
, connectionParse = parseState
, connectionWrite = writeState
, connectionSentClose = sentRef
}
pendingOnAccept pc connection
return connection
where
request = pendingRequest pc
versionHeader = [("Sec-WebSocket-Version",
B.intercalate ", " $ concatMap headerVersions protocols)]
rejectRequest :: PendingConnection -> B.ByteString -> IO ()
rejectRequest pc message = sendResponse pc $ response400 [] message
data Available a = Available !a | Unavailable
data Connection = Connection
{ connectionOptions :: !ConnectionOptions
, connectionType :: !ConnectionType
, connectionProtocol :: !Protocol
, connectionParse :: !(MVar (Available (IO (Maybe Message))))
, connectionWrite :: !(MVar (Available (Message -> IO ())))
, connectionSentClose :: !(IORef Bool)
}
data ConnectionOptions = ConnectionOptions
{ connectionOnPong :: !(IO ())
}
defaultConnectionOptions :: ConnectionOptions
defaultConnectionOptions = ConnectionOptions
{ connectionOnPong = return ()
}
receive :: Connection -> IO Message
receive conn = withMVarEx m Unavailable $ \state ->
case state of
Unavailable -> throwIO ConnectionClosed
Available parse -> do
mbMsg <- parse
case mbMsg of
Nothing -> throwIO ConnectionClosed
Just msg -> return msg
where
m = connectionParse conn
receiveDataMessage :: Connection -> IO DataMessage
receiveDataMessage conn = do
msg <- receive conn
case msg of
DataMessage am -> return am
ControlMessage cm -> case cm of
Close i closeMsg -> do
hasSentClose <- readIORef $ connectionSentClose conn
unless hasSentClose $ send conn msg
throwIO $ CloseRequest i closeMsg
Pong _ -> do
connectionOnPong (connectionOptions conn)
receiveDataMessage conn
Ping pl -> do
send conn (ControlMessage (Pong pl))
receiveDataMessage conn
receiveData :: WebSocketsData a => Connection -> IO a
receiveData conn = do
dm <- receiveDataMessage conn
case dm of
Text x -> return (fromLazyByteString x)
Binary x -> return (fromLazyByteString x)
send :: Connection -> Message -> IO ()
send conn msg = withMVarEx m Unavailable $ \state -> do
case msg of
(ControlMessage (Close _ _)) -> writeIORef (connectionSentClose conn) True
_ -> return ()
case state of
Unavailable -> throwIO ConnectionClosed
Available write -> write msg
where
m = connectionWrite conn
sendDataMessage :: Connection -> DataMessage -> IO ()
sendDataMessage conn = send conn . DataMessage
sendTextData :: WebSocketsData a => Connection -> a -> IO ()
sendTextData conn = sendDataMessage conn . Text . toLazyByteString
sendBinaryData :: WebSocketsData a => Connection -> a -> IO ()
sendBinaryData conn = sendDataMessage conn . Binary . toLazyByteString
sendClose :: WebSocketsData a => Connection -> a -> IO ()
sendClose conn = sendCloseCode conn 1000
sendCloseCode :: WebSocketsData a => Connection -> Word16 -> a -> IO ()
sendCloseCode conn code =
send conn . ControlMessage . Close code . toLazyByteString
sendPing :: WebSocketsData a => Connection -> a -> IO ()
sendPing conn = send conn . ControlMessage . Ping . toLazyByteString
forkPingThread :: Connection -> Int -> IO ()
forkPingThread conn n
| n <= 0 = return ()
| otherwise = do
_ <- forkIO (ignore `handle` go 1)
return ()
where
go :: Int -> IO ()
go i = do
threadDelay (n * 1000 * 1000)
sendPing conn (T.pack $ show i)
go (i + 1)
ignore e = case fromException e of
Just async -> throwIO (async :: AsyncException)
Nothing -> return ()
withMVarEx :: MVar a -> a -> (a -> IO b) -> IO b
withMVarEx m x io =
mask $ \restore -> do
a <- takeMVar m
b <- restore (io a) `onException` putMVar m x
putMVar m a
return b