{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeApplications #-}
module Network.Greskell.WebSocket.Connection.Impl where
import Control.Applicative (empty, (<$>), (<|>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, waitAnySTM,
waitCatchSTM, withAsync)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM (STM, TBQueue, TMVar, TQueue, TVar,
atomically, newEmptyTMVarIO,
newTBQueueIO, newTQueueIO,
newTVarIO, putTMVar, readTBQueue,
readTMVar, readTQueue, readTVar,
retry, tryPutTMVar, tryReadTMVar,
writeTBQueue, writeTQueue,
writeTVar)
import qualified Control.Concurrent.STM as STM
import Control.Exception.Safe (Exception (toException),
SomeException, finally, throw, try,
withException)
import Control.Monad (forM_, void, when)
import Data.Aeson (Value)
import qualified Data.ByteString.Lazy as BSL
import Data.Foldable (toList)
import qualified Data.HashTable.IO as HT
import Data.Monoid (mempty)
import Data.Typeable (Typeable)
import Data.UUID (UUID)
import Data.Vector (Vector)
import GHC.Records (HasField (..))
import qualified Network.WebSockets as WS
import Network.Greskell.WebSocket.Codec (Codec (decodeWith, encodeWith),
encodeBinaryWith)
import Network.Greskell.WebSocket.Connection.Settings (Settings)
import qualified Network.Greskell.WebSocket.Connection.Settings as Settings
import Network.Greskell.WebSocket.Connection.Type (Connection (..),
ConnectionState (..),
GeneralException (..), RawRes,
ReqID, ReqPack (..), ResPack)
import Network.Greskell.WebSocket.Request (Operation,
RequestMessage (RequestMessage, requestId),
makeRequestMessage)
import Network.Greskell.WebSocket.Response (ResponseMessage (ResponseMessage, requestId, status),
ResponseStatus (ResponseStatus, code),
isTerminating)
import Network.Greskell.WebSocket.Util (drain, slurp)
-- | Take every element out of the queue in one STM transaction and return
-- them as a list.
--
-- With @stm >= 2.4.5@ this is simply 'STM.flushTBQueue'. Older versions do
-- not provide it, so the queue is drained by repeatedly calling
-- 'STM.tryReadTBQueue' through 'slurp'.
-- NOTE(review): 'slurp' is defined in "Network.Greskell.WebSocket.Util";
-- presumably it collects results until the action yields 'Nothing' - confirm.
flushTBQueue :: TBQueue a -> STM [a]
#if MIN_VERSION_stm(2,4,5)
flushTBQueue = STM.flushTBQueue
#else
flushTBQueue q = fmap toList $ slurp $ STM.tryReadTBQueue q
#endif
-- | Host of the server to connect to. 'connect' forwards it unchanged to
-- 'WS.runClient' (via 'runWSConn').
type Host = String
-- | Port of the server to connect to. 'connect' forwards it unchanged to
-- 'WS.runClient' (via 'runWSConn').
type Port = Int
-- | Open a connection to the server at the given host and port.
--
-- A background thread running 'runWSConn' is started, and this function
-- blocks until that thread publishes the outcome of the connection attempt
-- in a 'TMVar'. If the outcome is an exception, it is re-thrown to the
-- caller. Otherwise a 'Connection' handle is returned that shares the
-- request queue and the connection-state variable with the background
-- thread.
connect :: Settings s -> Host -> Port -> IO (Connection s)
connect settings host port = do
  -- The request pool is only handed over to the 'runWSConn' thread; this
  -- function never reads or writes it afterwards.
  req_pool <- HT.new
  qreq <- newTBQueueIO qreq_size
  var_connect_result <- newEmptyTMVarIO
  var_conn_state <- newTVarIO ConnOpen
  ws_thread <-
    async $
      runWSConn settings host port ws_path req_pool qreq var_connect_result var_conn_state
  -- 'readTMVar' leaves the result in place, so the background thread can
  -- still inspect it.
  eret <- atomically $ readTMVar var_connect_result
  case eret of
    Left e -> throw e
    Right () ->
      return $
        Connection
          { connQReq = qreq
          , connState = var_conn_state
          , connWSThread = ws_thread
          , connCodec = codec
          }
  where
    codec = Settings.codec settings
    -- 'fromIntegral' adapts the configured 'Int' to whatever size type
    -- 'newTBQueueIO' expects.
    qreq_size = fromIntegral $ Settings.requestQueueSize settings
    ws_path = Settings.endpointPath settings
-- | Request the connection to close and wait until it is closed.
--
-- * If the state is already 'ConnClosed', this returns immediately.
-- * If the state is 'ConnClosing', this only waits.
-- * If the state is 'ConnOpen', it is switched to 'ConnClosing' first and
--   then this waits.
--
-- Waiting means blocking in STM until the state variable reads
-- 'ConnClosed' (which 'runWSConn' writes in its finalizer).
close :: Connection s -> IO ()
close conn = do
  need_wait <- atomically $ do
    cur_state <- readTVar state_var
    case cur_state of
      ConnClosed -> return False
      ConnClosing -> return True
      ConnOpen -> do
        writeTVar state_var ConnClosing
        return True
  when need_wait waitForClose
  where
    state_var = connState conn
    -- Block (via 'retry') until the state becomes 'ConnClosed'.
    waitForClose = atomically $ do
      cur_state <- readTVar state_var
      if cur_state == ConnClosed
        then return ()
        else retry
-- | Path argument given to 'WS.runClient' by 'runWSConn'. 'connect' takes it
-- from 'Settings.endpointPath'.
type Path = String
-- | Body of the background connection thread started by 'connect'.
--
-- It runs a WebSocket client against the given host, port and path. Once
-- the client is connected, success is published through the connect-result
-- 'TMVar' and the receiver loop ('runRxLoop') and the multiplexer loop
-- ('runMuxLoop') are started.
--
-- If anything throws, the exception is reported to (1) the caller of
-- 'connect' (only if no result has been published yet), (2) every entry in
-- the request pool and (3) every request still waiting in the request queue.
--
-- In all cases the finalizer cancels the timers of the remaining pool
-- entries and sets the connection state to 'ConnClosed'.
runWSConn :: Settings s
          -> Host -> Port -> Path
          -> ReqPool s -> TBQueue (ReqPack s)
          -> TMVar (Either SomeException ()) -> TVar ConnectionState
          -> IO ()
runWSConn settings host port path req_pool qreq var_connect_result var_conn_state =
  (doConnect `withException` reportFatalEx) `finally` finalize
  where
    doConnect = WS.runClient host port path $ \wsconn -> do
      is_success <- checkAndReportConnectSuccess
      -- When a failure was already published, leave the client at once.
      when is_success $ setupMux wsconn

    -- Run the receiver as a child thread (cancelled by 'withAsync' when the
    -- multiplexer loop returns or throws) and the multiplexer in this thread.
    setupMux wsconn = do
      qres <- newTQueueIO
      withAsync (runRxLoop wsconn qres) $ \rx_thread ->
        runMuxLoop wsconn req_pool settings qreq qres (readTVar var_conn_state) rx_thread

    -- Publish success unless a result is already there. Returns 'False' only
    -- when the already-published result is a failure.
    checkAndReportConnectSuccess = atomically $ do
      mret <- tryReadTMVar var_connect_result
      case mret of
        Nothing -> do
          putTMVar var_connect_result $ Right ()
          return True
        Just (Right _) -> return True
        Just (Left _) -> return False

    -- The signature pins the exception type that 'withException' catches.
    reportFatalEx :: SomeException -> IO ()
    reportFatalEx cause = do
      reportToConnectCaller cause
      reportToReqPool req_pool cause
      reportToQReq qreq cause

    -- 'tryPutTMVar' makes this a no-op when a result was already published.
    reportToConnectCaller cause =
      void $ atomically $ tryPutTMVar var_connect_result $ Left cause

    finalize = do
      cleanupReqPool req_pool
      atomically $ writeTVar var_conn_state ConnClosed
-- | Deliver the given exception to every request in the pool by writing
-- @Left cause@ to the output queue of each entry. The pool itself is not
-- modified.
reportToReqPool :: ReqPool s -> SomeException -> IO ()
reportToReqPool req_pool cause = HT.mapM_ forEntry req_pool
  where
    -- One STM transaction per entry; the key is not needed.
    forEntry (_, entry) = atomically $ writeTQueue (rpeOutput entry) $ Left cause
-- | Empty the request queue and deliver the given exception to every request
-- that was waiting in it, by writing @Left cause@ to each request's output
-- queue. Flushing and reporting happen in a single STM transaction.
reportToQReq :: TBQueue (ReqPack s) -> SomeException -> IO ()
reportToQReq qreq cause = atomically $ do
  reqpacks <- flushTBQueue qreq
  forM_ reqpacks reportToReqPack
  where
    reportToReqPack reqp = writeTQueue (reqOutput reqp) $ Left cause
-- | Exceptions delivered to the issuer of a request through the request's
-- output queue.
data RequestException
  = AlreadyClosed
  -- ^ NOTE(review): not raised in the visible part of this module;
  -- presumably used when a request is made on a closed connection - confirm.
  | ServerClosed
  -- ^ The receiver thread finished without an error while the request was
  -- still pending or queued (see @handleRxFinish@ in 'runMuxLoop').
  | DuplicateRequestId UUID
  -- ^ A request with the same ID is already in the request pool, so the
  -- new request was not sent. Carries the offending ID.
  | ResponseTimeout
  -- ^ The response timer of the request fired before the request was
  -- removed from the pool.
  deriving (Eq, Show, Typeable)

instance Exception RequestException
-- | Bookkeeping record for one request held in the 'ReqPool'.
data ReqPoolEntry s
  = ReqPoolEntry
      { rpeReqId :: !ReqID
        -- ^ ID of the request; also the key under which the entry is stored.
      , rpeOutput :: !(TQueue (ResPack s))
        -- ^ Queue to which responses and errors for this request are written.
      , rpeTimer :: !(Async ReqID)
        -- ^ Timer thread for this request. The multiplexer waits on it and
        -- treats its result as a response time-out for the returned ID; it
        -- is cancelled when the entry is cleaned up.
      }
-- | Mutable hash table of the requests currently held by the multiplexer,
-- keyed by request ID.
type ReqPool s = HT.BasicHashTable ReqID (ReqPoolEntry s)
-- | Events that the multiplexer loop ('runMuxLoop') waits for.
data MuxEvent s
  = EvReq (ReqPack s)
  -- ^ A new request was taken from the request queue.
  | EvRes RawRes
  -- ^ A raw response was taken from the receiver's queue.
  | EvActiveClose
  -- ^ The connection state is no longer 'ConnOpen' and no request is
  -- pending.
  | EvRxFinish
  -- ^ The receiver thread finished without an exception.
  | EvRxError SomeException
  -- ^ The receiver thread finished with the given exception.
  | EvResponseTimeout ReqID
  -- ^ The response timer for the given request ID completed.
-- | Insert a new entry under the given request ID unless the ID is already
-- in the pool.
--
-- The entry-making action is run only when the ID is free. Returns 'True'
-- if the entry was inserted, and 'False' (leaving the pool untouched and
-- without running the action) if the ID was already taken.
tryInsertToReqPool :: ReqPool s
                   -> ReqID
                   -> IO (ReqPoolEntry s) -- ^ action to create the new entry
                   -> IO Bool
tryInsertToReqPool req_pool rid makeEntry = do
  mexist_entry <- HT.lookup req_pool rid
  case mexist_entry of
    Just _ -> return False
    Nothing -> do
      new_entry <- makeEntry
      HT.insert req_pool rid new_entry
      return True
-- | Release the resources of a pool entry: cancel its timer thread.
cleanupReqPoolEntry :: ReqPoolEntry s -> IO ()
cleanupReqPoolEntry entry = Async.cancel $ rpeTimer entry
-- | Clean up the given entry (cancelling its timer) and then delete it from
-- the pool, using the request ID stored in the entry as the key.
removeReqPoolEntry :: ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry req_pool entry = do
  cleanupReqPoolEntry entry
  HT.delete req_pool $ rpeReqId entry
-- | Clean up every entry in the pool (cancelling each timer). The entries
-- are not removed from the table.
cleanupReqPool :: ReqPool s -> IO ()
cleanupReqPool req_pool = HT.mapM_ forEntry req_pool
  where
    forEntry (_, entry) = cleanupReqPoolEntry entry
-- | Collect the timer thread of every entry currently in the pool. The
-- result has exactly one element per pool entry.
getAllResponseTimers :: ReqPool s -> IO [Async ReqID]
getAllResponseTimers req_pool = map (rpeTimer . snd) <$> HT.toList req_pool
runMuxLoop :: WS.Connection -> ReqPool s -> Settings s
-> TBQueue (ReqPack s) -> TQueue RawRes -> STM ConnectionState
-> Async ()
-> IO ()
runMuxLoop :: forall s.
Connection
-> ReqPool s
-> Settings s
-> TBQueue (ReqPack s)
-> TQueue RawRes
-> STM ConnectionState
-> Async ()
-> IO ()
runMuxLoop Connection
wsconn ReqPool s
req_pool Settings s
settings TBQueue (ReqPack s)
qreq TQueue RawRes
qres STM ConnectionState
readConnState Async ()
rx_thread = IO ()
loop
where
codec :: Codec s
codec = Settings s -> Codec s
forall s. Settings s -> Codec s
Settings.codec Settings s
settings
loop :: IO ()
loop = do
[Async ReqID]
res_timers <- ReqPool s -> IO [Async ReqID]
forall s. ReqPool s -> IO [Async ReqID]
getAllResponseTimers ReqPool s
req_pool
MuxEvent s
event <- STM (MuxEvent s) -> IO (MuxEvent s)
forall a. STM a -> IO a
atomically (STM (MuxEvent s) -> IO (MuxEvent s))
-> STM (MuxEvent s) -> IO (MuxEvent s)
forall a b. (a -> b) -> a -> b
$ [Async ReqID] -> STM (MuxEvent s)
getEventSTM [Async ReqID]
res_timers
case MuxEvent s
event of
EvReq ReqPack s
req -> ReqPack s -> IO ()
handleReq ReqPack s
req IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
EvRes RawRes
res -> RawRes -> IO ()
handleRes RawRes
res IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
MuxEvent s
EvActiveClose -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
MuxEvent s
EvRxFinish -> IO ()
handleRxFinish
EvRxError SomeException
e -> SomeException -> IO ()
forall (m :: * -> *) e a.
(HasCallStack, MonadThrow m, Exception e) =>
e -> m a
throw SomeException
e
EvResponseTimeout ReqID
rid -> ReqID -> IO ()
handleResponseTimeout ReqID
rid IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
getEventSTM :: [Async ReqID] -> STM (MuxEvent s)
getEventSTM [Async ReqID]
res_timers = STM (MuxEvent s)
getRequest
STM (MuxEvent s) -> STM (MuxEvent s) -> STM (MuxEvent s)
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (RawRes -> MuxEvent s
forall s. RawRes -> MuxEvent s
EvRes (RawRes -> MuxEvent s) -> STM RawRes -> STM (MuxEvent s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TQueue RawRes -> STM RawRes
forall a. TQueue a -> STM a
readTQueue TQueue RawRes
qres)
STM (MuxEvent s) -> STM (MuxEvent s) -> STM (MuxEvent s)
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM (MuxEvent s)
forall {s}. STM (MuxEvent s)
makeEvActiveClose
STM (MuxEvent s) -> STM (MuxEvent s) -> STM (MuxEvent s)
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (Either SomeException () -> MuxEvent s
forall {s}. Either SomeException () -> MuxEvent s
rxResultToEvent (Either SomeException () -> MuxEvent s)
-> STM (Either SomeException ()) -> STM (MuxEvent s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Async () -> STM (Either SomeException ())
forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async ()
rx_thread)
STM (MuxEvent s) -> STM (MuxEvent s) -> STM (MuxEvent s)
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> ((Async ReqID, ReqID) -> MuxEvent s
forall {a} {s}. (a, ReqID) -> MuxEvent s
timeoutToEvent ((Async ReqID, ReqID) -> MuxEvent s)
-> STM (Async ReqID, ReqID) -> STM (MuxEvent s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Async ReqID] -> STM (Async ReqID, ReqID)
forall a. [Async a] -> STM (Async a, a)
waitAnySTM [Async ReqID]
res_timers)
where
max_concurrency :: Int
max_concurrency = Settings s -> Int
forall s. Settings s -> Int
Settings.concurrency Settings s
settings
cur_concurrency :: Int
cur_concurrency = [Async ReqID] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Async ReqID]
res_timers
getRequest :: STM (MuxEvent s)
getRequest = if Int
cur_concurrency Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
max_concurrency
then ReqPack s -> MuxEvent s
forall s. ReqPack s -> MuxEvent s
EvReq (ReqPack s -> MuxEvent s) -> STM (ReqPack s) -> STM (MuxEvent s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue (ReqPack s) -> STM (ReqPack s)
forall a. TBQueue a -> STM a
readTBQueue TBQueue (ReqPack s)
qreq
else STM (MuxEvent s)
forall a. STM a
forall (f :: * -> *) a. Alternative f => f a
empty
rxResultToEvent :: Either SomeException () -> MuxEvent s
rxResultToEvent (Right ()) = MuxEvent s
forall s. MuxEvent s
EvRxFinish
rxResultToEvent (Left SomeException
e) = SomeException -> MuxEvent s
forall s. SomeException -> MuxEvent s
EvRxError SomeException
e
timeoutToEvent :: (a, ReqID) -> MuxEvent s
timeoutToEvent (a
_, ReqID
rid) = ReqID -> MuxEvent s
forall s. ReqID -> MuxEvent s
EvResponseTimeout ReqID
rid
makeEvActiveClose :: STM (MuxEvent s)
makeEvActiveClose = do
if Int
cur_concurrency Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
then STM (MuxEvent s)
forall a. STM a
forall (f :: * -> *) a. Alternative f => f a
empty
else do
ConnectionState
conn_state <- STM ConnectionState
readConnState
if ConnectionState
conn_state ConnectionState -> ConnectionState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionState
ConnOpen then STM (MuxEvent s)
forall a. STM a
forall (f :: * -> *) a. Alternative f => f a
empty else MuxEvent s -> STM (MuxEvent s)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return MuxEvent s
forall s. MuxEvent s
EvActiveClose
handleReq :: ReqPack s -> IO ()
handleReq ReqPack s
req = do
Bool
insert_ok <- ReqPool s -> ReqID -> IO (ReqPoolEntry s) -> IO Bool
forall s. ReqPool s -> ReqID -> IO (ReqPoolEntry s) -> IO Bool
tryInsertToReqPool ReqPool s
req_pool ReqID
rid IO (ReqPoolEntry s)
makeNewEntry
if Bool
insert_ok
then Connection -> RawRes -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
WS.sendBinaryData Connection
wsconn (RawRes -> IO ()) -> RawRes -> IO ()
forall a b. (a -> b) -> a -> b
$ ReqPack s -> RawRes
forall s. ReqPack s -> RawRes
reqData ReqPack s
req
else IO ()
reportError
where
rid :: ReqID
rid = ReqPack s -> ReqID
forall s. ReqPack s -> ReqID
reqId ReqPack s
req
qout :: TQueue (ResPack s)
qout = ReqPack s -> TQueue (ResPack s)
forall s. ReqPack s -> TQueue (ResPack s)
reqOutput ReqPack s
req
makeNewEntry :: IO (ReqPoolEntry s)
makeNewEntry = do
Async ReqID
timer_thread <- Int -> ReqID -> IO (Async ReqID)
runTimer (Settings s -> Int
forall s. Settings s -> Int
Settings.responseTimeout Settings s
settings) ReqID
rid
ReqPoolEntry s -> IO (ReqPoolEntry s)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ReqPoolEntry s -> IO (ReqPoolEntry s))
-> ReqPoolEntry s -> IO (ReqPoolEntry s)
forall a b. (a -> b) -> a -> b
$ ReqPoolEntry { $sel:rpeReqId:ReqPoolEntry :: ReqID
rpeReqId = ReqID
rid,
$sel:rpeOutput:ReqPoolEntry :: TQueue (ResPack s)
rpeOutput = TQueue (ResPack s)
qout,
$sel:rpeTimer:ReqPoolEntry :: Async ReqID
rpeTimer = Async ReqID
timer_thread
}
reportError :: IO ()
reportError =
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue (ResPack s) -> ResPack s -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (ResPack s)
qout (ResPack s -> STM ()) -> ResPack s -> STM ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ResPack s
forall a b. a -> Either a b
Left (SomeException -> ResPack s) -> SomeException -> ResPack s
forall a b. (a -> b) -> a -> b
$ RequestException -> SomeException
forall e. Exception e => e -> SomeException
toException (RequestException -> SomeException)
-> RequestException -> SomeException
forall a b. (a -> b) -> a -> b
$ ReqID -> RequestException
DuplicateRequestId ReqID
rid
handleRes :: RawRes -> IO ()
handleRes RawRes
res = case Codec s -> RawRes -> Either Host (ResponseMessage s)
forall s. Codec s -> RawRes -> Either Host (ResponseMessage s)
decodeWith Codec s
codec RawRes
res of
Left Host
err -> Settings s -> GeneralException -> IO ()
forall s. Settings s -> GeneralException -> IO ()
Settings.onGeneralException Settings s
settings (GeneralException -> IO ()) -> GeneralException -> IO ()
forall a b. (a -> b) -> a -> b
$ Host -> GeneralException
ResponseParseFailure Host
err
Right ResponseMessage s
res_msg -> ResponseMessage s -> IO ()
handleResMsg ResponseMessage s
res_msg
handleResMsg :: ResponseMessage s -> IO ()
handleResMsg res_msg :: ResponseMessage s
res_msg@(ResponseMessage { requestId :: forall s. ResponseMessage s -> ReqID
requestId = ReqID
rid }) = do
Maybe (ReqPoolEntry s)
m_entry <- ReqPool s -> ReqID -> IO (Maybe (ReqPoolEntry s))
forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> IO (Maybe v)
HT.lookup ReqPool s
req_pool ReqID
rid
case Maybe (ReqPoolEntry s)
m_entry of
Maybe (ReqPoolEntry s)
Nothing -> Settings s -> GeneralException -> IO ()
forall s. Settings s -> GeneralException -> IO ()
Settings.onGeneralException Settings s
settings (GeneralException -> IO ()) -> GeneralException -> IO ()
forall a b. (a -> b) -> a -> b
$ ReqID -> GeneralException
UnexpectedRequestId ReqID
rid
Just ReqPoolEntry s
entry -> do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ResponseMessage s -> Bool
forall s. ResponseMessage s -> Bool
isTerminatingResponse ResponseMessage s
res_msg) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
ReqPool s -> ReqPoolEntry s -> IO ()
forall s. ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry ReqPool s
req_pool ReqPoolEntry s
entry
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue (ResPack s) -> ResPack s -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue (ReqPoolEntry s -> TQueue (ResPack s)
forall s. ReqPoolEntry s -> TQueue (ResPack s)
rpeOutput ReqPoolEntry s
entry) (ResPack s -> STM ()) -> ResPack s -> STM ()
forall a b. (a -> b) -> a -> b
$ ResponseMessage s -> ResPack s
forall a b. b -> Either a b
Right ResponseMessage s
res_msg
-- Local helper (type: IO ()); it closes over @req_pool@ and @qreq@ bound
-- by the enclosing definition.
--
-- Called when the receiving side has finished. It wraps 'ServerClosed'
-- into a 'SomeException' and hands it to both 'reportToReqPool' and
-- 'reportToQReq' (defined elsewhere; presumably they deliver the
-- exception to requests that are still pending or still queued).
handleRxFinish = do
  let ex = toException ServerClosed
  reportToReqPool req_pool ex
  reportToQReq qreq ex
-- Local helper (type: ReqID -> IO ()); it closes over @req_pool@ bound by
-- the enclosing definition.
--
-- Handles expiry of the response timer for the request @rid@. If the
-- request is no longer in the pool, nothing happens. Otherwise a
-- 'ResponseTimeout' exception is written (as 'Left') to the entry's output
-- queue, and then the entry is removed from the pool.
handleResponseTimeout rid = do
  mentry <- HT.lookup req_pool rid
  case mentry of
    Nothing -> return ()
    Just entry -> do
      atomically $ writeTQueue (rpeOutput entry) $ Left $ toException ResponseTimeout
      removeReqPoolEntry req_pool entry
-- | Receiving loop of the WebSocket connection.
--
-- Repeatedly receives one data message from @wsconn@ and writes it to
-- @qres@. The loop returns normally when the receive reports
-- 'WS.ConnectionClosed', or a 'WS.CloseRequest' whose status code is 1000.
-- Any other 'WS.ConnectionException' (including a close request with a
-- different status code) is re-thrown to the caller.
runRxLoop :: WS.Connection -> TQueue RawRes -> IO ()
runRxLoop wsconn qres = loop
  where
    -- Status code 1000 is "normal closure" in the WebSocket protocol
    -- (RFC 6455, section 7.4.1).
    normalClosure = 1000

    loop :: IO ()
    loop = do
      mgot <- tryReceive
      case mgot of
        Nothing -> return ()
        Just got -> do
          atomically $ writeTQueue qres got
          loop

    -- Receive one message; 'Nothing' means the connection ended cleanly.
    tryReceive :: IO (Maybe RawRes)
    tryReceive = toMaybe =<< (try $ WS.receiveData wsconn)

    -- Classify the outcome of a receive attempt.
    toMaybe :: Either WS.ConnectionException a -> IO (Maybe a)
    toMaybe (Right d) = return $ Just d
    toMaybe (Left e@(WS.CloseRequest close_status _)) =
      if close_status == normalClosure
        then return Nothing
        else throw e
    toMaybe (Left WS.ConnectionClosed) = return Nothing
    toMaybe (Left e) = throw e
-- | Start an asynchronous timer.
--
-- The returned 'Async' sleeps for @wait_sec@ seconds and then completes
-- with the given request ID @rid@ as its result.
runTimer :: Int -> ReqID -> IO (Async ReqID)
runTimer wait_sec rid = async $ do
  threadDelay $ wait_sec * microsPerSec
  return rid
  where
    -- 'threadDelay' takes its argument in microseconds.
    microsPerSec = 1000000
-- | A handle for reading the responses to a single request. The type
-- parameter @s@ is the type of the response payload.
data ResponseHandle s
  = ResponseHandle
      { rhGetResponse :: STM (ResPack s)
        -- ^ STM action that obtains the next response pack.
      , rhTerminated  :: TVar Bool
        -- ^ Set to 'True' once a terminating response has been read
        -- through 'nextResponseSTM'.
      }
-- | 'fmap' transforms the payload of every response obtained through the
-- handle. The three nested 'fmap's go through the 'STM' action, the
-- 'Either' of the response pack, and the 'ResponseMessage' respectively.
-- The termination flag is shared with the original handle.
instance Functor ResponseHandle where
  fmap f rh = rh { rhGetResponse = (fmap . fmap . fmap) f $ rhGetResponse rh }
-- | Build a 'RequestMessage' from the operation @o@ with
-- 'makeRequestMessage', then send it over @conn@ with 'sendRequest''.
-- Returns the handle from which the responses can be read.
sendRequest :: Operation o => Connection s -> o -> IO (ResponseHandle s)
sendRequest conn o = sendRequest' conn =<< makeRequestMessage o
-- | Send an already-built 'RequestMessage' over @conn@ and return the
-- handle from which its responses can be read.
--
-- The request is encoded with the connection's codec and enqueued to the
-- connection's request queue only while the connection state is
-- 'ConnOpen'. Otherwise an 'AlreadyClosed' exception is written to the
-- handle's response queue, so it surfaces when the caller reads from the
-- returned handle rather than from this function.
sendRequest' :: Connection s -> RequestMessage -> IO (ResponseHandle s)
sendRequest' conn req_msg = do
  qout <- newTQueueIO
  -- The state check and the enqueue run in ONE transaction. Doing them in
  -- two separate transactions would let the state leave 'ConnOpen' in
  -- between, enqueuing a request on a connection that is no longer open.
  -- It also means that a writer blocked on a full queue is re-evaluated
  -- when the connection state changes.
  is_queued <- atomically $ enqueueIfOpen qout
  when (not is_queued) $ reportAlreadyClosed qout
  makeResHandle qout
  where
    codec = connCodec conn
    qreq = connQReq conn
    var_conn_state = connState conn
    rid = getField @"requestId" req_msg

    -- Enqueue the request if (and only if) the connection is open.
    -- Returns whether the request was enqueued.
    enqueueIfOpen qout = do
      conn_state <- readTVar var_conn_state
      if conn_state == ConnOpen
        then do
          writeTBQueue qreq $ makeReqPack qout
          return True
        else return False

    makeReqPack qout =
      ReqPack
        { reqData = encodeBinaryWith codec req_msg
        , reqId = rid
        , reqOutput = qout
        }

    -- The handle reads from @qout@ and starts in the non-terminated state.
    makeResHandle qout = do
      var_term <- newTVarIO False
      return $
        ResponseHandle
          { rhGetResponse = readTQueue qout
          , rhTerminated = var_term
          }

    reportAlreadyClosed qout =
      atomically $ writeTQueue qout $ Left $ toException AlreadyClosed
-- | Get the next response from the handle. This is 'nextResponseSTM' run
-- as a single atomic transaction; see that function for the meaning of
-- the result and for the exceptions it may throw.
nextResponse :: ResponseHandle s -> IO (Maybe (ResponseMessage s))
nextResponse = atomically . nextResponseSTM
-- | STM version of 'nextResponse'.
--
-- * Returns 'Nothing' without reading anything once the handle has been
--   marked as terminated.
-- * Otherwise obtains the next response pack. A 'Right' response is
--   returned as 'Just'; if it is a terminating response, the handle is
--   marked as terminated so that later calls return 'Nothing'.
-- * A 'Left' exception in the response pack is thrown. Throwing inside
--   STM aborts the transaction, so the read that produced the exception
--   is rolled back together with it.
nextResponseSTM :: ResponseHandle s -> STM (Maybe (ResponseMessage s))
nextResponseSTM rh = do
  termed <- readTVar $ rhTerminated rh
  if termed
    then return Nothing
    else readResponse
  where
    readResponse = do
      eres <- rhGetResponse rh
      case eres of
        Left ex -> throw ex
        Right res -> do
          updateTermed res
          return $ Just res

    -- Remember that the final response has been seen.
    updateTermed res =
      when (isTerminatingResponse res) $
        writeTVar (rhTerminated rh) True
-- | Whether the response is a terminating one, decided by applying
-- 'isTerminating' to the response code in the message's status.
isTerminatingResponse :: ResponseMessage s -> Bool
isTerminatingResponse (ResponseMessage { status = (ResponseStatus { code = c }) }) =
  isTerminating c
-- | Collect the responses of the handle into a 'Vector' by feeding
-- 'nextResponse' to 'slurp' (defined in the Util module; presumably it
-- repeats the action until it yields 'Nothing'). Exceptions thrown by
-- 'nextResponse' propagate to the caller.
slurpResponses :: ResponseHandle s -> IO (Vector (ResponseMessage s))
slurpResponses h = slurp $ nextResponse h
-- | Read the responses of the handle and discard them by feeding
-- 'nextResponse' to 'drain' (defined in the Util module; presumably it
-- repeats the action until it yields 'Nothing'). Exceptions thrown by
-- 'nextResponse' propagate to the caller.
drainResponses :: ResponseHandle s -> IO ()
drainResponses h = drain $ nextResponse h