module Network.Pusher.WebSockets.Internal.Client where
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Exception (fromException, throwIO)
import Control.Monad (forever)
import Data.Maybe (isJust)
import Control.Concurrent.STM (atomically, check, retry)
import Control.Concurrent.STM.TQueue (tryReadTQueue)
import Control.Concurrent.STM.TVar (TVar, newTVarIO, readTVar, writeTVar)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (Value(..), encode)
import qualified Data.HashMap.Strict as H
import qualified Data.Set as S
import Data.Text (Text, pack)
import Data.Time.Clock (getCurrentTime, diffUTCTime)
import Data.Word (Word16)
import Network.WebSockets (Connection, sendClose)
import qualified Network.WebSockets as WS
import Network.Pusher.WebSockets.Channel
import Network.Pusher.WebSockets.Event
import Network.Pusher.WebSockets.Internal
import Network.Pusher.WebSockets.Internal.Event
import Network.Pusher.WebSockets.Util
-- | Top-level driver for a Pusher client.
--
-- Binds every entry of 'defaultHandlers' (with no channel restriction),
-- then runs connection attempts under 'reconnecting'.  Whatever
-- exception finally escapes that loop is turned into a 'Disconnected'
-- state change, after which all threads recorded in the client's
-- 'threadStore' are killed.
--
-- The second argument is a bracket-style action which establishes a
-- connection and hands it to the supplied callback.
pusherClient :: Pusher -> ((Connection -> IO ()) -> IO ()) -> IO ()
pusherClient pusher withConnection = do
  -- Install the built-in handlers first.
  runPusherClient pusher (mapM_ bindDefault defaultHandlers)

  -- Run the connection loop; any escaping exception ends in 'Disconnected'.
  reconnecting attempt backOff `catchAll` markDisconnected

  -- Clean up the threads tracked in the thread store.
  mapM_ killThread =<< readTVarIO (threadStore pusher)
  where
    bindDefault (eventName, handler) = bind eventName Nothing handler

    -- One connection attempt: announce it, then run the client on the
    -- connection that 'withConnection' provides.
    attempt =
      changeConnectionState pusher Connecting >>
      withConnection (client pusher)

    -- Second action handed to 'reconnecting': flag the connection as
    -- unavailable and pause for one second (the delay is in microseconds).
    backOff =
      changeConnectionState pusher Unavailable >>
      threadDelay 1000000

    -- A 'TerminatePusher' exception carries the close code to report;
    -- any other exception is reported without one.
    markDisconnected exc =
      changeConnectionState pusher . Disconnected $
        case fromException exc of
          Just (TerminatePusher closeCode) -> closeCode
          Nothing -> Nothing
-- | Run the client over a single established connection.
--
-- Forks a reader thread ('handleThread'), blocks until the idle timer
-- has been set, marks the connection as 'Connected', forks the pinger
-- ('pingThread'), re-subscribes to every channel in 'allChannels' and
-- then processes commands forever.
--
-- The command loop never returns normally, so this function only ends by
-- exception.  Before the exception is re-thrown the idle timer and
-- socket ID are both reset to 'Nothing'.
client :: Pusher -> Connection -> IO ()
client pusher conn = session `catchAll` resetAndRethrow
  where
    session = do
      -- Shared slot through which the helper threads report a close code.
      closevar <- newTVarIO Nothing
      _ <- forkIO (handleThread pusher conn closevar)

      -- Block until the idle timer holds a value.
      liftIO . atomically $
        readTVar (idleTimer pusher) >>= check . isJust
      changeConnectionState pusher Connected

      _ <- forkIO (pingThread pusher conn closevar)

      -- Clear the presence channel map and fetch the channels to
      -- re-subscribe to, in one transaction.
      channels <- liftIO . atomically $ do
        writeTVar (presenceChannels pusher) H.empty
        readTVar (allChannels pusher)
      runPusherClient pusher (mapM_ (subscribe . unChannel) channels)

      forever $
        awaitCommandOrClose pusher closevar >>=
          handleCommandOrClose pusher conn

    -- Forget per-connection state, then propagate the exception.
    resetAndRethrow exc = do
      strictModifyTVarIO (idleTimer pusher) (const Nothing)
      strictModifyTVarIO (socketId pusher) (const Nothing)
      throwIO exc
-- | Block until there is either a queued command or a close code.
--
-- A pending command always wins: the close variable is only consulted
-- when the command queue is empty.  If neither is available the
-- transaction retries.
awaitCommandOrClose :: Pusher
                    -> TVar (Maybe Word16)
                    -> IO (Either Word16 PusherCommand)
awaitCommandOrClose pusher closevar = atomically $
  tryReadTQueue (commandQueue pusher) >>= maybe awaitClose (pure . Right)
  where
    -- No command pending: yield the close code if one has been recorded.
    awaitClose = readTVar closevar >>= maybe retry (pure . Left)
-- | Act on the result of 'awaitCommandOrClose': a close code ('Left') is
-- turned into an exception by 'throwCloseException', a command
-- ('Right') is executed by 'handleCommand'.
handleCommandOrClose :: Pusher
                     -> Connection
                     -> Either Word16 PusherCommand
                     -> IO ()
handleCommandOrClose pusher conn =
  either throwCloseException (handleCommand pusher conn)
-- | Execute a single client command.
--
-- * 'SendMessage' sends the JSON value over the connection.
-- * 'SendLocalMessage' passes the JSON value to 'handleEvent' directly.
-- * 'Subscribe' sends a @pusher:subscribe@ event carrying the given
--   channel data and records the channel in 'allChannels'.
-- * 'Terminate' sends a close message on the connection.
handleCommand :: Pusher -> Connection -> PusherCommand -> IO ()
handleCommand pusher conn = run
  where
    run (SendMessage json) = sendJSON json
    run (SendLocalMessage json) = handleEvent pusher (Right json)
    run (Subscribe chan channelData) = do
      sendJSON (subscribeRequest channelData)
      strictModifyTVarIO (allChannels pusher) (S.insert chan)
    run Terminate = sendClose conn ("goodbye" :: Text)

    -- The subscription event sent to the server.
    subscribeRequest payload = Object $ H.fromList
      [ ("event", String "pusher:subscribe")
      , ("data", payload)
      ]

    -- Encode a value as JSON and send it as a text data message.  From
    -- websockets 0.12.0 onwards 'WS.Text' takes an extra argument.
#if MIN_VERSION_websockets(0,12,0)
    sendJSON val = WS.sendDataMessage conn (WS.Text (encode val) Nothing)
#else
    sendJSON val = WS.sendDataMessage conn (WS.Text (encode val))
#endif
-- | Convert a close code into the exception that ends the current
-- connection.
--
-- * below 4000: @TerminatePusher Nothing@
-- * 4000 to 4099: @TerminatePusher (Just code)@
-- * 4100 and above: 'WS.ConnectionClosed'
--
-- 'pusherClient' reports a 'TerminatePusher' as a 'Disconnected' state
-- carrying the same code.
throwCloseException :: Word16 -> IO a
throwCloseException closeCode
  | closeCode >= 4100 = throwIO WS.ConnectionClosed
  | closeCode >= 4000 = throwIO (TerminatePusher (Just closeCode))
  | otherwise = throwIO (TerminatePusher Nothing)
-- | Periodically ping the server and watch for a stale connection.
--
-- Waits until the idle timer holds a value, then loops: sleep for that
-- many seconds, send a ping whose payload is a running counter, and
-- compare the current time against 'lastReceived'.  If more than the
-- timeout has elapsed since the last received message,
-- 'reconnectImmediately' is written to the close variable and the
-- thread finishes; otherwise the loop continues.
--
-- Note that the staleness check happens straight after the ping is
-- sent; it does not wait for a reply to that particular ping.
pingThread :: Pusher -> Connection -> TVar (Maybe Word16) -> IO ()
pingThread pusher conn closevar =
  liftIO (atomically waitForTimeout) >>= loop 0
  where
    -- Retry until the idle timer has been set.
    waitForTimeout = readTVar (idleTimer pusher) >>= maybe retry pure

    loop :: Integer -> Int -> IO ()
    loop seqNo secs = do
      threadDelay (secs * 1000 * 1000)
      WS.sendPing conn (pack (show seqNo))
      sentAt <- getCurrentTime
      lastSeen <- readTVarIO (lastReceived pusher)
      let silence = diffUTCTime sentAt lastSeen
      if silence > fromIntegral secs
        then atomically (writeTVar closevar reconnectImmediately)
        else loop (seqNo + 1) secs
-- | Receive events from the connection and dispatch them.
--
-- Each received event updates 'lastReceived' with the current time and
-- is then passed to 'handleEvent'.  When the loop dies with an
-- exception, a close code is written to the close variable: the code
-- from a 'WS.CloseRequest' if that is what was thrown, and
-- 'reconnectImmediately' for anything else.
handleThread :: Pusher -> Connection -> TVar (Maybe Word16) -> IO ()
handleThread pusher conn closevar = catchAll receiveLoop recordClose
  where
    receiveLoop = forever (awaitEvent conn >>= dispatch)

    dispatch event = do
      stamp <- getCurrentTime
      atomically (writeTVar (lastReceived pusher) stamp)
      handleEvent pusher event

    recordClose exc = atomically (writeTVar closevar (closeCodeFor exc))

    closeCodeFor exc = case fromException exc of
      Just (WS.CloseRequest code _) -> Just code
      _ -> reconnectImmediately
-- | Close code recorded when the connection is lost without a close
-- request from the peer ('handleThread') or is judged stale
-- ('pingThread').  4200 lies in the range that 'throwCloseException'
-- maps to 'WS.ConnectionClosed' rather than 'TerminatePusher'.
reconnectImmediately :: Maybe Word16
reconnectImmediately = pure 4200
-- | Set the connection state and, if it actually changed, emit the
-- matching connection event.
--
-- The old state is read and the new one written in a single
-- transaction.  When the two differ, an object of the form
-- @{"event": <connectionEvent of the new state>}@ is passed to
-- 'handleEvent'; when they are equal nothing is emitted.
changeConnectionState :: Pusher -> ConnectionState -> IO ()
changeConnectionState pusher connst = do
  previous <- atomically $ do
    old <- readTVar stateVar
    writeTVar stateVar connst
    pure old
  if previous == connst
    then pure ()
    else handleEvent pusher (Right notification)
  where
    stateVar = connState pusher
    notification =
      Object (H.singleton "event" (String (connectionEvent connst)))