{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeApplications #-}
module Network.Greskell.WebSocket.Connection.Impl where
import Control.Applicative (empty, (<$>), (<|>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, waitAnySTM,
waitCatchSTM, withAsync)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM (STM, TBQueue, TMVar, TQueue, TVar,
atomically, newEmptyTMVarIO,
newTBQueueIO, newTQueueIO,
newTVarIO, putTMVar, readTBQueue,
readTMVar, readTQueue, readTVar,
retry, tryPutTMVar, tryReadTMVar,
writeTBQueue, writeTQueue,
writeTVar)
import qualified Control.Concurrent.STM as STM
import Control.Exception.Safe (Exception (toException),
SomeException, finally, throw, try,
withException)
import Control.Monad (forM_, void, when)
import Data.Aeson (Value)
import qualified Data.ByteString.Lazy as BSL
import Data.Foldable (toList)
import qualified Data.HashTable.IO as HT
import Data.Monoid (mempty)
import Data.Typeable (Typeable)
import Data.UUID (UUID)
import Data.Vector (Vector)
import GHC.Records (HasField (..))
import qualified Network.WebSockets as WS
import Network.Greskell.WebSocket.Codec (Codec (decodeWith, encodeWith),
encodeBinaryWith)
import Network.Greskell.WebSocket.Connection.Settings (Settings)
import qualified Network.Greskell.WebSocket.Connection.Settings as Settings
import Network.Greskell.WebSocket.Connection.Type (Connection (..),
ConnectionState (..),
GeneralException (..), RawRes,
ReqID, ReqPack (..), ResPack)
import Network.Greskell.WebSocket.Request (Operation,
RequestMessage (RequestMessage, requestId),
makeRequestMessage)
import Network.Greskell.WebSocket.Response (ResponseMessage (ResponseMessage, requestId, status),
ResponseStatus (ResponseStatus, code),
isTerminating)
import Network.Greskell.WebSocket.Util (drain, slurp)
flushTBQueue :: TBQueue a -> STM [a]
#if MIN_VERSION_stm(2,4,5)
flushTBQueue :: forall a. TBQueue a -> STM [a]
flushTBQueue = forall a. TBQueue a -> STM [a]
STM.flushTBQueue
#else
flushTBQueue q = fmap toList $ slurp $ STM.tryReadTBQueue q
#endif
type Host = String
type Port = Int
connect :: Settings s -> Host -> Port -> IO (Connection s)
connect :: forall s. Settings s -> Host -> Int -> IO (Connection s)
connect Settings s
settings Host
host Int
port = do
HashTable RealWorld ReqID (ReqPoolEntry s)
req_pool <- forall (h :: * -> * -> * -> *) k v.
HashTable h =>
IO (IOHashTable h k v)
HT.new
TBQueue (ReqPack s)
qreq <- forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
qreq_size
TMVar (Either SomeException ())
var_connect_result <- forall a. IO (TMVar a)
newEmptyTMVarIO
TVar ConnectionState
var_conn_state <- forall a. a -> IO (TVar a)
newTVarIO ConnectionState
ConnOpen
Async ()
ws_thread <- forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ forall s.
Settings s
-> Host
-> Int
-> Host
-> ReqPool s
-> TBQueue (ReqPack s)
-> TMVar (Either SomeException ())
-> TVar ConnectionState
-> IO ()
runWSConn Settings s
settings Host
host Int
port Host
ws_path HashTable RealWorld ReqID (ReqPoolEntry s)
req_pool TBQueue (ReqPack s)
qreq TMVar (Either SomeException ())
var_connect_result TVar ConnectionState
var_conn_state
Either SomeException ()
eret <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> STM a
readTMVar TMVar (Either SomeException ())
var_connect_result
case Either SomeException ()
eret of
Left SomeException
e -> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
Right () -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Connection { connQReq :: TBQueue (ReqPack s)
connQReq = TBQueue (ReqPack s)
qreq,
connState :: TVar ConnectionState
connState = TVar ConnectionState
var_conn_state,
connWSThread :: Async ()
connWSThread = Async ()
ws_thread,
connCodec :: Codec s
connCodec = Codec s
codec
}
where
codec :: Codec s
codec = forall s. Settings s -> Codec s
Settings.codec Settings s
settings
qreq_size :: Natural
qreq_size = forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ forall s. Settings s -> Int
Settings.requestQueueSize Settings s
settings
ws_path :: Host
ws_path = forall s. Settings s -> Host
Settings.endpointPath Settings s
settings
close :: Connection s -> IO ()
close :: forall s. Connection s -> IO ()
close Connection s
conn = do
Bool
need_wait <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
ConnectionState
cur_state <- forall a. TVar a -> STM a
readTVar forall a b. (a -> b) -> a -> b
$ forall s. Connection s -> TVar ConnectionState
connState Connection s
conn
case ConnectionState
cur_state of
ConnectionState
ConnClosed -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
ConnectionState
ConnClosing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
ConnectionState
ConnOpen -> do
forall a. TVar a -> a -> STM ()
writeTVar (forall s. Connection s -> TVar ConnectionState
connState Connection s
conn) ConnectionState
ConnClosing
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
if Bool
need_wait then IO ()
waitForClose else forall (m :: * -> *) a. Monad m => a -> m a
return ()
where
waitForClose :: IO ()
waitForClose = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
ConnectionState
cur_state <- forall a. TVar a -> STM a
readTVar forall a b. (a -> b) -> a -> b
$ forall s. Connection s -> TVar ConnectionState
connState Connection s
conn
if ConnectionState
cur_state forall a. Eq a => a -> a -> Bool
== ConnectionState
ConnClosed
then forall (m :: * -> *) a. Monad m => a -> m a
return ()
else forall a. STM a
retry
type Path = String
runWSConn :: Settings s
-> Host -> Port -> Path
-> ReqPool s -> TBQueue (ReqPack s)
-> TMVar (Either SomeException ()) -> TVar ConnectionState
-> IO ()
runWSConn :: forall s.
Settings s
-> Host
-> Int
-> Host
-> ReqPool s
-> TBQueue (ReqPack s)
-> TMVar (Either SomeException ())
-> TVar ConnectionState
-> IO ()
runWSConn Settings s
settings Host
host Int
port Host
path ReqPool s
req_pool TBQueue (ReqPack s)
qreq TMVar (Either SomeException ())
var_connect_result TVar ConnectionState
var_conn_state =
(IO ()
doConnect forall (m :: * -> *) e a b.
(MonadMask m, Exception e) =>
m a -> (e -> m b) -> m a
`withException` SomeException -> IO ()
reportFatalEx) forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` IO ()
finalize
where
doConnect :: IO ()
doConnect = forall a. Host -> Int -> Host -> ClientApp a -> IO a
WS.runClient Host
host Int
port Host
path forall a b. (a -> b) -> a -> b
$ \Connection
wsconn -> do
Bool
is_success <- IO Bool
checkAndReportConnectSuccess
if Bool -> Bool
not Bool
is_success
then forall (m :: * -> *) a. Monad m => a -> m a
return ()
else Connection -> IO ()
setupMux Connection
wsconn
setupMux :: Connection -> IO ()
setupMux Connection
wsconn = do
TQueue RawRes
qres <- forall a. IO (TQueue a)
newTQueueIO
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (Connection -> TQueue RawRes -> IO ()
runRxLoop Connection
wsconn TQueue RawRes
qres) forall a b. (a -> b) -> a -> b
$ \Async ()
rx_thread ->
forall s.
Connection
-> ReqPool s
-> Settings s
-> TBQueue (ReqPack s)
-> TQueue RawRes
-> STM ConnectionState
-> Async ()
-> IO ()
runMuxLoop Connection
wsconn ReqPool s
req_pool Settings s
settings TBQueue (ReqPack s)
qreq TQueue RawRes
qres (forall a. TVar a -> STM a
readTVar TVar ConnectionState
var_conn_state) Async ()
rx_thread
checkAndReportConnectSuccess :: IO Bool
checkAndReportConnectSuccess = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
Maybe (Either SomeException ())
mret <- forall a. TMVar a -> STM (Maybe a)
tryReadTMVar TMVar (Either SomeException ())
var_connect_result
case Maybe (Either SomeException ())
mret of
Maybe (Either SomeException ())
Nothing -> do
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException ())
var_connect_result forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right ()
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just (Right ()
_) -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just (Left SomeException
_) -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
reportFatalEx :: SomeException -> IO ()
reportFatalEx :: SomeException -> IO ()
reportFatalEx SomeException
cause = do
SomeException -> IO ()
reportToConnectCaller SomeException
cause
forall s. ReqPool s -> SomeException -> IO ()
reportToReqPool ReqPool s
req_pool SomeException
cause
forall s. TBQueue (ReqPack s) -> SomeException -> IO ()
reportToQReq TBQueue (ReqPack s)
qreq SomeException
cause
reportToConnectCaller :: SomeException -> IO ()
reportToConnectCaller SomeException
cause = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (Either SomeException ())
var_connect_result forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left SomeException
cause
finalize :: IO ()
finalize = do
forall s. ReqPool s -> IO ()
cleanupReqPool ReqPool s
req_pool
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar ConnectionState
var_conn_state ConnectionState
ConnClosed
reportToReqPool :: ReqPool s -> SomeException -> IO ()
reportToReqPool :: forall s. ReqPool s -> SomeException -> IO ()
reportToReqPool ReqPool s
req_pool SomeException
cause = forall (h :: * -> * -> * -> *) k v a.
HashTable h =>
((k, v) -> IO a) -> IOHashTable h k v -> IO ()
HT.mapM_ forall {a} {s}. (a, ReqPoolEntry s) -> IO ()
forEntry ReqPool s
req_pool
where
forEntry :: (a, ReqPoolEntry s) -> IO ()
forEntry (a
_, ReqPoolEntry s
entry) = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue (forall s. ReqPoolEntry s -> TQueue (ResPack s)
rpeOutput ReqPoolEntry s
entry) forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left SomeException
cause
reportToQReq :: TBQueue (ReqPack s) -> SomeException -> IO ()
reportToQReq :: forall s. TBQueue (ReqPack s) -> SomeException -> IO ()
reportToQReq TBQueue (ReqPack s)
qreq SomeException
cause = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
[ReqPack s]
reqpacks <- forall a. TBQueue a -> STM [a]
flushTBQueue TBQueue (ReqPack s)
qreq
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [ReqPack s]
reqpacks forall {s}. ReqPack s -> STM ()
reportToReqPack
where
reportToReqPack :: ReqPack s -> STM ()
reportToReqPack ReqPack s
reqp = forall a. TQueue a -> a -> STM ()
writeTQueue (forall s. ReqPack s -> TQueue (ResPack s)
reqOutput ReqPack s
reqp) forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left SomeException
cause
data RequestException
= AlreadyClosed
| ServerClosed
| DuplicateRequestId UUID
| ResponseTimeout
deriving (RequestException -> RequestException -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: RequestException -> RequestException -> Bool
$c/= :: RequestException -> RequestException -> Bool
== :: RequestException -> RequestException -> Bool
$c== :: RequestException -> RequestException -> Bool
Eq, Int -> RequestException -> ShowS
[RequestException] -> ShowS
RequestException -> Host
forall a.
(Int -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
showList :: [RequestException] -> ShowS
$cshowList :: [RequestException] -> ShowS
show :: RequestException -> Host
$cshow :: RequestException -> Host
showsPrec :: Int -> RequestException -> ShowS
$cshowsPrec :: Int -> RequestException -> ShowS
Show, Typeable)
instance Exception RequestException
data ReqPoolEntry s
= ReqPoolEntry
{ forall s. ReqPoolEntry s -> ReqID
rpeReqId :: !ReqID
, forall s. ReqPoolEntry s -> TQueue (ResPack s)
rpeOutput :: !(TQueue (ResPack s))
, forall s. ReqPoolEntry s -> Async ReqID
rpeTimer :: !(Async ReqID)
}
type ReqPool s = HT.BasicHashTable ReqID (ReqPoolEntry s)
data MuxEvent s
= EvReq (ReqPack s)
| EvRes RawRes
| EvActiveClose
| EvRxFinish
| EvRxError SomeException
| EvResponseTimeout ReqID
tryInsertToReqPool :: ReqPool s
-> ReqID
-> IO (ReqPoolEntry s)
-> IO Bool
tryInsertToReqPool :: forall s. ReqPool s -> ReqID -> IO (ReqPoolEntry s) -> IO Bool
tryInsertToReqPool ReqPool s
req_pool ReqID
rid IO (ReqPoolEntry s)
makeEntry = do
Maybe (ReqPoolEntry s)
mexist_entry <- forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> IO (Maybe v)
HT.lookup ReqPool s
req_pool ReqID
rid
case Maybe (ReqPoolEntry s)
mexist_entry of
Just ReqPoolEntry s
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Maybe (ReqPoolEntry s)
Nothing -> do
ReqPoolEntry s
new_entry <- IO (ReqPoolEntry s)
makeEntry
forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> v -> IO ()
HT.insert ReqPool s
req_pool ReqID
rid ReqPoolEntry s
new_entry
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
cleanupReqPoolEntry :: ReqPoolEntry s -> IO ()
cleanupReqPoolEntry :: forall s. ReqPoolEntry s -> IO ()
cleanupReqPoolEntry ReqPoolEntry s
entry = forall a. Async a -> IO ()
Async.cancel forall a b. (a -> b) -> a -> b
$ forall s. ReqPoolEntry s -> Async ReqID
rpeTimer ReqPoolEntry s
entry
removeReqPoolEntry :: ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry :: forall s. ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry ReqPool s
req_pool ReqPoolEntry s
entry = do
forall s. ReqPoolEntry s -> IO ()
cleanupReqPoolEntry ReqPoolEntry s
entry
forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> IO ()
HT.delete ReqPool s
req_pool forall a b. (a -> b) -> a -> b
$ forall s. ReqPoolEntry s -> ReqID
rpeReqId ReqPoolEntry s
entry
cleanupReqPool :: ReqPool s -> IO ()
cleanupReqPool :: forall s. ReqPool s -> IO ()
cleanupReqPool ReqPool s
req_pool = forall (h :: * -> * -> * -> *) k v a.
HashTable h =>
((k, v) -> IO a) -> IOHashTable h k v -> IO ()
HT.mapM_ forall {a} {s}. (a, ReqPoolEntry s) -> IO ()
forEntry ReqPool s
req_pool
where
forEntry :: (a, ReqPoolEntry s) -> IO ()
forEntry (a
_, ReqPoolEntry s
entry) = forall s. ReqPoolEntry s -> IO ()
cleanupReqPoolEntry ReqPoolEntry s
entry
getAllResponseTimers :: ReqPool s -> IO [Async ReqID]
getAllResponseTimers :: forall s. ReqPool s -> IO [Async ReqID]
getAllResponseTimers ReqPool s
req_pool = (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap) forall {a} {s}. (a, ReqPoolEntry s) -> Async ReqID
toTimer forall a b. (a -> b) -> a -> b
$ forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> IO [(k, v)]
HT.toList ReqPool s
req_pool
where
toTimer :: (a, ReqPoolEntry s) -> Async ReqID
toTimer (a
_, ReqPoolEntry s
entry) = forall s. ReqPoolEntry s -> Async ReqID
rpeTimer ReqPoolEntry s
entry
runMuxLoop :: WS.Connection -> ReqPool s -> Settings s
-> TBQueue (ReqPack s) -> TQueue RawRes -> STM ConnectionState
-> Async ()
-> IO ()
runMuxLoop :: forall s.
Connection
-> ReqPool s
-> Settings s
-> TBQueue (ReqPack s)
-> TQueue RawRes
-> STM ConnectionState
-> Async ()
-> IO ()
runMuxLoop Connection
wsconn ReqPool s
req_pool Settings s
settings TBQueue (ReqPack s)
qreq TQueue RawRes
qres STM ConnectionState
readConnState Async ()
rx_thread = IO ()
loop
where
codec :: Codec s
codec = forall s. Settings s -> Codec s
Settings.codec Settings s
settings
loop :: IO ()
loop = do
[Async ReqID]
res_timers <- forall s. ReqPool s -> IO [Async ReqID]
getAllResponseTimers ReqPool s
req_pool
MuxEvent s
event <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ [Async ReqID] -> STM (MuxEvent s)
getEventSTM [Async ReqID]
res_timers
case MuxEvent s
event of
EvReq ReqPack s
req -> ReqPack s -> IO ()
handleReq ReqPack s
req forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
EvRes RawRes
res -> RawRes -> IO ()
handleRes RawRes
res forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
MuxEvent s
EvActiveClose -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
MuxEvent s
EvRxFinish -> IO ()
handleRxFinish
EvRxError SomeException
e -> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
EvResponseTimeout ReqID
rid -> ReqID -> IO ()
handleResponseTimeout ReqID
rid forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
getEventSTM :: [Async ReqID] -> STM (MuxEvent s)
getEventSTM [Async ReqID]
res_timers = STM (MuxEvent s)
getRequest
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (forall s. RawRes -> MuxEvent s
EvRes forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TQueue a -> STM a
readTQueue TQueue RawRes
qres)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall {s}. STM (MuxEvent s)
makeEvActiveClose
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (forall {s}. Either SomeException () -> MuxEvent s
rxResultToEvent forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async ()
rx_thread)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (forall {a} {s}. (a, ReqID) -> MuxEvent s
timeoutToEvent forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. [Async a] -> STM (Async a, a)
waitAnySTM [Async ReqID]
res_timers)
where
max_concurrency :: Int
max_concurrency = forall s. Settings s -> Int
Settings.concurrency Settings s
settings
cur_concurrency :: Int
cur_concurrency = forall (t :: * -> *) a. Foldable t => t a -> Int
length [Async ReqID]
res_timers
getRequest :: STM (MuxEvent s)
getRequest = if Int
cur_concurrency forall a. Ord a => a -> a -> Bool
< Int
max_concurrency
then forall s. ReqPack s -> MuxEvent s
EvReq forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TBQueue a -> STM a
readTBQueue TBQueue (ReqPack s)
qreq
else forall (f :: * -> *) a. Alternative f => f a
empty
rxResultToEvent :: Either SomeException () -> MuxEvent s
rxResultToEvent (Right ()) = forall s. MuxEvent s
EvRxFinish
rxResultToEvent (Left SomeException
e) = forall s. SomeException -> MuxEvent s
EvRxError SomeException
e
timeoutToEvent :: (a, ReqID) -> MuxEvent s
timeoutToEvent (a
_, ReqID
rid) = forall s. ReqID -> MuxEvent s
EvResponseTimeout ReqID
rid
makeEvActiveClose :: STM (MuxEvent s)
makeEvActiveClose = do
if Int
cur_concurrency forall a. Ord a => a -> a -> Bool
> Int
0
then forall (f :: * -> *) a. Alternative f => f a
empty
else do
ConnectionState
conn_state <- STM ConnectionState
readConnState
if ConnectionState
conn_state forall a. Eq a => a -> a -> Bool
== ConnectionState
ConnOpen then forall (f :: * -> *) a. Alternative f => f a
empty else forall (m :: * -> *) a. Monad m => a -> m a
return forall s. MuxEvent s
EvActiveClose
handleReq :: ReqPack s -> IO ()
handleReq ReqPack s
req = do
Bool
insert_ok <- forall s. ReqPool s -> ReqID -> IO (ReqPoolEntry s) -> IO Bool
tryInsertToReqPool ReqPool s
req_pool ReqID
rid IO (ReqPoolEntry s)
makeNewEntry
if Bool
insert_ok
then forall a. WebSocketsData a => Connection -> a -> IO ()
WS.sendBinaryData Connection
wsconn forall a b. (a -> b) -> a -> b
$ forall s. ReqPack s -> RawRes
reqData ReqPack s
req
else IO ()
reportError
where
rid :: ReqID
rid = forall s. ReqPack s -> ReqID
reqId ReqPack s
req
qout :: TQueue (ResPack s)
qout = forall s. ReqPack s -> TQueue (ResPack s)
reqOutput ReqPack s
req
makeNewEntry :: IO (ReqPoolEntry s)
makeNewEntry = do
Async ReqID
timer_thread <- Int -> ReqID -> IO (Async ReqID)
runTimer (forall s. Settings s -> Int
Settings.responseTimeout Settings s
settings) ReqID
rid
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ ReqPoolEntry { $sel:rpeReqId:ReqPoolEntry :: ReqID
rpeReqId = ReqID
rid,
$sel:rpeOutput:ReqPoolEntry :: TQueue (ResPack s)
rpeOutput = TQueue (ResPack s)
qout,
$sel:rpeTimer:ReqPoolEntry :: Async ReqID
rpeTimer = Async ReqID
timer_thread
}
reportError :: IO ()
reportError =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (ResPack s)
qout forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall e. Exception e => e -> SomeException
toException forall a b. (a -> b) -> a -> b
$ ReqID -> RequestException
DuplicateRequestId ReqID
rid
handleRes :: RawRes -> IO ()
handleRes RawRes
res = case forall s. Codec s -> RawRes -> Either Host (ResponseMessage s)
decodeWith Codec s
codec RawRes
res of
Left Host
err -> forall s. Settings s -> GeneralException -> IO ()
Settings.onGeneralException Settings s
settings forall a b. (a -> b) -> a -> b
$ Host -> GeneralException
ResponseParseFailure Host
err
Right ResponseMessage s
res_msg -> ResponseMessage s -> IO ()
handleResMsg ResponseMessage s
res_msg
handleResMsg :: ResponseMessage s -> IO ()
handleResMsg res_msg :: ResponseMessage s
res_msg@(ResponseMessage { requestId :: forall s. ResponseMessage s -> ReqID
requestId = ReqID
rid }) = do
Maybe (ReqPoolEntry s)
m_entry <- forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> IO (Maybe v)
HT.lookup ReqPool s
req_pool ReqID
rid
case Maybe (ReqPoolEntry s)
m_entry of
Maybe (ReqPoolEntry s)
Nothing -> forall s. Settings s -> GeneralException -> IO ()
Settings.onGeneralException Settings s
settings forall a b. (a -> b) -> a -> b
$ ReqID -> GeneralException
UnexpectedRequestId ReqID
rid
Just ReqPoolEntry s
entry -> do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall s. ResponseMessage s -> Bool
isTerminatingResponse ResponseMessage s
res_msg) forall a b. (a -> b) -> a -> b
$ do
forall s. ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry ReqPool s
req_pool ReqPoolEntry s
entry
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue (forall s. ReqPoolEntry s -> TQueue (ResPack s)
rpeOutput ReqPoolEntry s
entry) forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right ResponseMessage s
res_msg
handleRxFinish :: IO ()
handleRxFinish = do
let ex :: SomeException
ex = forall e. Exception e => e -> SomeException
toException RequestException
ServerClosed
forall s. ReqPool s -> SomeException -> IO ()
reportToReqPool ReqPool s
req_pool SomeException
ex
forall s. TBQueue (ReqPack s) -> SomeException -> IO ()
reportToQReq TBQueue (ReqPack s)
qreq SomeException
ex
handleResponseTimeout :: ReqID -> IO ()
handleResponseTimeout ReqID
rid = do
Maybe (ReqPoolEntry s)
mentry <- forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> IO (Maybe v)
HT.lookup ReqPool s
req_pool ReqID
rid
case Maybe (ReqPoolEntry s)
mentry of
Maybe (ReqPoolEntry s)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just ReqPoolEntry s
entry -> do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue (forall s. ReqPoolEntry s -> TQueue (ResPack s)
rpeOutput ReqPoolEntry s
entry) forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall e. Exception e => e -> SomeException
toException forall a b. (a -> b) -> a -> b
$ RequestException
ResponseTimeout
forall s. ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry ReqPool s
req_pool ReqPoolEntry s
entry
runRxLoop :: WS.Connection -> TQueue RawRes -> IO ()
runRxLoop :: Connection -> TQueue RawRes -> IO ()
runRxLoop Connection
wsconn TQueue RawRes
qres = IO ()
loop
where
loop :: IO ()
loop = do
Maybe RawRes
mgot <- IO (Maybe RawRes)
tryReceive
case Maybe RawRes
mgot of
Maybe RawRes
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just RawRes
got -> do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue TQueue RawRes
qres RawRes
got
IO ()
loop
tryReceive :: IO (Maybe RawRes)
tryReceive = forall {m :: * -> *} {a}.
MonadThrow m =>
Either ConnectionException a -> m (Maybe a)
toMaybe forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try forall a b. (a -> b) -> a -> b
$ forall a. WebSocketsData a => Connection -> IO a
WS.receiveData Connection
wsconn)
where
toMaybe :: Either ConnectionException a -> m (Maybe a)
toMaybe (Right a
d) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
d
toMaybe (Left e :: ConnectionException
e@(WS.CloseRequest Word16
close_status RawRes
_)) = do
if Word16
close_status forall a. Eq a => a -> a -> Bool
== Word16
1000
then forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
else forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw ConnectionException
e
toMaybe (Left ConnectionException
WS.ConnectionClosed) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
toMaybe (Left ConnectionException
e) = forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw ConnectionException
e
runTimer :: Int -> ReqID -> IO (Async ReqID)
runTimer :: Int -> ReqID -> IO (Async ReqID)
runTimer Int
wait_sec ReqID
rid = forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ do
Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ Int
wait_sec forall a. Num a => a -> a -> a
* Int
1000000
forall (m :: * -> *) a. Monad m => a -> m a
return ReqID
rid
data ResponseHandle s
= ResponseHandle
{ forall s. ResponseHandle s -> STM (ResPack s)
rhGetResponse :: STM (ResPack s)
, forall s. ResponseHandle s -> TVar Bool
rhTerminated :: TVar Bool
}
instance Functor ResponseHandle where
fmap :: forall a b. (a -> b) -> ResponseHandle a -> ResponseHandle b
fmap a -> b
f ResponseHandle a
rh = ResponseHandle a
rh { $sel:rhGetResponse:ResponseHandle :: STM (ResPack b)
rhGetResponse = (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap) a -> b
f forall a b. (a -> b) -> a -> b
$ forall s. ResponseHandle s -> STM (ResPack s)
rhGetResponse ResponseHandle a
rh }
sendRequest :: Operation o => Connection s -> o -> IO (ResponseHandle s)
sendRequest :: forall o s.
Operation o =>
Connection s -> o -> IO (ResponseHandle s)
sendRequest Connection s
conn o
o = forall s. Connection s -> RequestMessage -> IO (ResponseHandle s)
sendRequest' Connection s
conn forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall o. Operation o => o -> IO RequestMessage
makeRequestMessage o
o
sendRequest' :: Connection s -> RequestMessage -> IO (ResponseHandle s)
sendRequest' :: forall s. Connection s -> RequestMessage -> IO (ResponseHandle s)
sendRequest' Connection s
conn RequestMessage
req_msg = do
TQueue (ResPack s)
qout <- forall a. IO (TQueue a)
newTQueueIO
Bool
is_open <- IO Bool
getConnectionOpen
if Bool
is_open
then TQueue (ResPack s) -> IO ()
sendReqPack TQueue (ResPack s)
qout
else forall {b}. TQueue (Either SomeException b) -> IO ()
reportAlreadyClosed TQueue (ResPack s)
qout
forall {s}. TQueue (ResPack s) -> IO (ResponseHandle s)
makeResHandle TQueue (ResPack s)
qout
where
codec :: Codec s
codec = forall s. Connection s -> Codec s
connCodec Connection s
conn
qreq :: TBQueue (ReqPack s)
qreq = forall s. Connection s -> TBQueue (ReqPack s)
connQReq Connection s
conn
var_conn_state :: TVar ConnectionState
var_conn_state = forall s. Connection s -> TVar ConnectionState
connState Connection s
conn
rid :: ReqID
rid = forall {k} (x :: k) r a. HasField x r a => r -> a
getField @"requestId" RequestMessage
req_msg
getConnectionOpen :: IO Bool
getConnectionOpen = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a. Eq a => a -> a -> Bool
== ConnectionState
ConnOpen) forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> STM a
readTVar TVar ConnectionState
var_conn_state
sendReqPack :: TQueue (ResPack s) -> IO ()
sendReqPack TQueue (ResPack s)
qout = do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue (ReqPack s)
qreq ReqPack s
reqpack
where
reqpack :: ReqPack s
reqpack = ReqPack
{ reqData :: RawRes
reqData = forall s. Codec s -> RequestMessage -> RawRes
encodeBinaryWith Codec s
codec RequestMessage
req_msg,
reqId :: ReqID
reqId = ReqID
rid,
reqOutput :: TQueue (ResPack s)
reqOutput = TQueue (ResPack s)
qout
}
makeResHandle :: TQueue (ResPack s) -> IO (ResponseHandle s)
makeResHandle TQueue (ResPack s)
qout = do
TVar Bool
var_term <- forall a. a -> IO (TVar a)
newTVarIO Bool
False
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ ResponseHandle
{ $sel:rhGetResponse:ResponseHandle :: STM (ResPack s)
rhGetResponse = forall a. TQueue a -> STM a
readTQueue TQueue (ResPack s)
qout,
$sel:rhTerminated:ResponseHandle :: TVar Bool
rhTerminated = TVar Bool
var_term
}
reportAlreadyClosed :: TQueue (Either SomeException b) -> IO ()
reportAlreadyClosed TQueue (Either SomeException b)
qout = do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Either SomeException b)
qout forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall e. Exception e => e -> SomeException
toException forall a b. (a -> b) -> a -> b
$ RequestException
AlreadyClosed
nextResponse :: ResponseHandle s -> IO (Maybe (ResponseMessage s))
nextResponse :: forall s. ResponseHandle s -> IO (Maybe (ResponseMessage s))
nextResponse = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall s. ResponseHandle s -> STM (Maybe (ResponseMessage s))
nextResponseSTM
nextResponseSTM :: ResponseHandle s -> STM (Maybe (ResponseMessage s))
nextResponseSTM :: forall s. ResponseHandle s -> STM (Maybe (ResponseMessage s))
nextResponseSTM ResponseHandle s
rh = do
Bool
termed <- forall a. TVar a -> STM a
readTVar forall a b. (a -> b) -> a -> b
$ forall s. ResponseHandle s -> TVar Bool
rhTerminated ResponseHandle s
rh
if Bool
termed
then forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
else STM (Maybe (ResponseMessage s))
readResponse
where
readResponse :: STM (Maybe (ResponseMessage s))
readResponse = do
ResPack s
eres <- forall s. ResponseHandle s -> STM (ResPack s)
rhGetResponse ResponseHandle s
rh
case ResPack s
eres of
Left SomeException
ex -> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
ex
Right ResponseMessage s
res -> do
forall {s}. ResponseMessage s -> STM ()
updateTermed ResponseMessage s
res
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just ResponseMessage s
res
updateTermed :: ResponseMessage s -> STM ()
updateTermed ResponseMessage s
res =
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall s. ResponseMessage s -> Bool
isTerminatingResponse ResponseMessage s
res) forall a b. (a -> b) -> a -> b
$ do
forall a. TVar a -> a -> STM ()
writeTVar (forall s. ResponseHandle s -> TVar Bool
rhTerminated ResponseHandle s
rh) Bool
True
isTerminatingResponse :: ResponseMessage s -> Bool
isTerminatingResponse :: forall s. ResponseMessage s -> Bool
isTerminatingResponse (ResponseMessage { status :: forall s. ResponseMessage s -> ResponseStatus
status = (ResponseStatus { code :: ResponseStatus -> ResponseCode
code = ResponseCode
c }) }) =
ResponseCode -> Bool
isTerminating ResponseCode
c
slurpResponses :: ResponseHandle s -> IO (Vector (ResponseMessage s))
slurpResponses :: forall s. ResponseHandle s -> IO (Vector (ResponseMessage s))
slurpResponses ResponseHandle s
h = forall (m :: * -> *) a. Monad m => m (Maybe a) -> m (Vector a)
slurp forall a b. (a -> b) -> a -> b
$ forall s. ResponseHandle s -> IO (Maybe (ResponseMessage s))
nextResponse ResponseHandle s
h
drainResponses :: ResponseHandle s -> IO ()
drainResponses :: forall s. ResponseHandle s -> IO ()
drainResponses ResponseHandle s
h = forall (m :: * -> *) a. Monad m => m (Maybe a) -> m ()
drain forall a b. (a -> b) -> a -> b
$ forall s. ResponseHandle s -> IO (Maybe (ResponseMessage s))
nextResponse ResponseHandle s
h