{-# LANGUAGE CPP                   #-}
{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeApplications      #-}
-- |
-- Module: Network.Greskell.WebSocket.Connection.Impl
-- Description: internal implementation of Connection
-- Maintainer: Toshio Ito <debug.ito@gmail.com>
--
-- This is an internal module. It deliberately exports everything. The
-- upper module is responsible to make a proper export list.
module Network.Greskell.WebSocket.Connection.Impl where

import           Control.Applicative                            (empty, (<$>), (<|>))
import           Control.Concurrent                             (threadDelay)
import           Control.Concurrent.Async                       (Async, async, waitAnySTM,
                                                                 waitCatchSTM, withAsync)
import qualified Control.Concurrent.Async                       as Async
import           Control.Concurrent.STM                         (STM, TBQueue, TMVar, TQueue, TVar,
                                                                 atomically, newEmptyTMVarIO,
                                                                 newTBQueueIO, newTQueueIO,
                                                                 newTVarIO, putTMVar, readTBQueue,
                                                                 readTMVar, readTQueue, readTVar,
                                                                 retry, tryPutTMVar, tryReadTMVar,
                                                                 writeTBQueue, writeTQueue,
                                                                 writeTVar)
import qualified Control.Concurrent.STM                         as STM
import           Control.Exception.Safe                         (Exception (toException),
                                                                 SomeException, finally, throw, try,
                                                                 withException)
import           Control.Monad                                  (forM_, void, when)
import           Data.Aeson                                     (Value)
import qualified Data.ByteString.Lazy                           as BSL
import           Data.Foldable                                  (toList)
import qualified Data.HashTable.IO                              as HT
import           Data.Monoid                                    (mempty)
import           Data.Typeable                                  (Typeable)
import           Data.UUID                                      (UUID)
import           Data.Vector                                    (Vector)
import           GHC.Records                                    (HasField (..))
import qualified Network.WebSockets                             as WS

import           Network.Greskell.WebSocket.Codec               (Codec (decodeWith, encodeWith),
                                                                 encodeBinaryWith)
import           Network.Greskell.WebSocket.Connection.Settings (Settings)
import qualified Network.Greskell.WebSocket.Connection.Settings as Settings
import           Network.Greskell.WebSocket.Connection.Type     (Connection (..),
                                                                 ConnectionState (..),
                                                                 GeneralException (..), RawRes,
                                                                 ReqID, ReqPack (..), ResPack)
import           Network.Greskell.WebSocket.Request             (Operation,
                                                                 RequestMessage (RequestMessage, requestId),
                                                                 makeRequestMessage)
import           Network.Greskell.WebSocket.Response            (ResponseMessage (ResponseMessage, requestId, status),
                                                                 ResponseStatus (ResponseStatus, code),
                                                                 isTerminating)
import           Network.Greskell.WebSocket.Util                (drain, slurp)


-- | Take all elements currently stored in the queue.
--
-- @stm@ exports 'STM.flushTBQueue' since version 2.4.5. For older
-- versions this falls back to reading the queue repeatedly with
-- 'STM.tryReadTBQueue' (collected by 'slurp').
flushTBQueue :: TBQueue a -> STM [a]
#if MIN_VERSION_stm(2,4,5)
flushTBQueue = STM.flushTBQueue
#else
flushTBQueue q = fmap toList $ slurp $ STM.tryReadTBQueue q
#endif


-- | Host name or an IP address of the server to connect to. It is
-- passed as-is to 'WS.runClient'.
type Host = String

-- | TCP port number of the server to connect to. It is passed as-is
-- to 'WS.runClient'.
type Port = Int

-- | Make a 'Connection' to a Gremlin Server.
--
-- This function spawns a background thread running 'runWSConn' and
-- blocks until that thread reports the result of the connection
-- attempt.
--
-- If it fails to connect to the specified server, it throws an
-- exception.
connect :: Settings s -> Host -> Port -> IO (Connection s)
connect settings host port = do
  req_pool <- HT.new  -- Do not manipulate req_pool in this thread. It belongs to runWSConn thread.
  qreq <- newTBQueueIO qreq_size
  var_connect_result <- newEmptyTMVarIO
  var_conn_state <- newTVarIO ConnOpen
  ws_thread <- async $ runWSConn settings host port ws_path req_pool qreq var_connect_result var_conn_state
  -- The WS thread fills var_connect_result exactly once, either with
  -- success or with the exception that made the connection fail.
  eret <- atomically $ readTMVar var_connect_result
  case eret of
   Left e -> throw e
   Right () -> return $ Connection { connQReq = qreq,
                                     connState = var_conn_state,
                                     connWSThread = ws_thread,
                                     connCodec = codec
                                   }
  where
    codec = Settings.codec settings
    -- 'fromIntegral' adapts the 'Int' setting to the size type
    -- expected by 'newTBQueueIO'.
    qreq_size = fromIntegral $ Settings.requestQueueSize settings
    ws_path = Settings.endpointPath settings

-- | Close the 'Connection'.
--
-- If there are pending requests in the 'Connection', 'close' function
-- blocks for them to complete or time out.
--
-- Calling 'close' on a 'Connection' already closed returns
-- immediately. Calling it on a 'Connection' that is waiting to close
-- does not request the close again, but still blocks until the
-- 'Connection' gets closed.
close :: Connection s -> IO ()
close conn = do
  need_wait <- atomically $ do
    cur_state <- readTVar $ connState conn
    case cur_state of
     ConnClosed -> return False
     ConnClosing -> return True
     ConnOpen -> do
       -- Request the close. The WS thread observes this state change.
       writeTVar (connState conn) ConnClosing
       return True
  when need_wait waitForClose
  where
    -- Block until the state becomes 'ConnClosed'.
    waitForClose = atomically $ do
      cur_state <- readTVar $ connState conn
      if cur_state == ConnClosed
        then return ()
        else retry

-- | Path of the WebSocket endpoint. It is passed as-is to
-- 'WS.runClient'.
type Path = String

-- | A thread taking care of a WS connection.
--
-- It connects to the server, reports the result of the connection
-- attempt to the given 'TMVar', and then runs the multiplexer loop
-- ('runMuxLoop') together with the receiver thread ('runRxLoop').
--
-- When an exception escapes, it is reported to the caller of
-- 'connect' (if it is still waiting), to all pending requests in the
-- request pool and to all requests still in the request queue. In any
-- case, on exit the request pool is cleaned up and the connection
-- state is set to 'ConnClosed'.
runWSConn :: Settings s
          -> Host -> Port -> Path
          -> ReqPool s -> TBQueue (ReqPack s)
          -> TMVar (Either SomeException ()) -> TVar ConnectionState
          -> IO ()
runWSConn settings host port path req_pool qreq var_connect_result var_conn_state =
  (doConnect `withException` reportFatalEx) `finally` finalize
  where
    doConnect = WS.runClient host port path $ \wsconn -> do
      is_success <- checkAndReportConnectSuccess
      if not is_success
        then return () -- result is already reported at var_connect_result
        else setupMux wsconn
    -- Run the receiver thread and the multiplexer loop. 'withAsync'
    -- cancels the receiver thread when the multiplexer loop exits.
    setupMux wsconn = do
      qres <- newTQueueIO
      withAsync (runRxLoop wsconn qres) $ \rx_thread ->
        runMuxLoop wsconn req_pool settings qreq qres (readTVar var_conn_state) rx_thread
    -- Report success to var_connect_result unless a result is already
    -- there. Returns 'False' only if a failure is already reported.
    checkAndReportConnectSuccess = atomically $ do
      mret <- tryReadTMVar var_connect_result
      case mret of
       -- usually, mret should be Nothing.
       Nothing -> do
         putTMVar var_connect_result $ Right ()
         return True
       Just (Right _) -> return True
       Just (Left _) -> return False
    reportFatalEx :: SomeException -> IO ()
    reportFatalEx cause = do
      reportToConnectCaller cause
      reportToReqPool req_pool cause
      reportToQReq qreq cause
    -- 'tryPutTMVar' does nothing if a result is already reported.
    reportToConnectCaller cause = void $ atomically $ tryPutTMVar var_connect_result $ Left cause
    finalize = do
      cleanupReqPool req_pool
      atomically $ writeTVar var_conn_state ConnClosed

-- | Send the given exception to the output queue of every entry in
-- the request pool. The entries are left in the pool.
reportToReqPool :: ReqPool s -> SomeException -> IO ()
reportToReqPool req_pool cause = HT.mapM_ forEntry req_pool
  where
    forEntry (_, entry) = atomically $ writeTQueue (rpeOutput entry) $ Left cause

-- | Take all requests out of the request queue and send the given
-- exception to the output queue of each of them. All of this is done
-- in a single STM transaction.
reportToQReq :: TBQueue (ReqPack s) -> SomeException -> IO ()
reportToQReq qreq cause = atomically $ do
  reqpacks <- flushTBQueue qreq
  forM_ reqpacks reportToReqPack
  where
    reportToReqPack reqp = writeTQueue (reqOutput reqp) $ Left cause

-- | An exception related to a specific request.
data RequestException
  = AlreadyClosed
  -- ^ The connection is already closed before it sends the request.
  | ServerClosed
  -- ^ The server closed the connection before it sends response for
  -- this request.
  | DuplicateRequestId UUID
  -- ^ The requestId (kept in this object) is already pending in the
  -- connection.
  | ResponseTimeout
  -- ^ The server fails to send ResponseMessages within
  -- 'Settings.responseTimeout'.
  deriving (Eq, Show, Typeable)

instance Exception RequestException

-- | Objects related to one pending request, kept in the 'ReqPool'.
data ReqPoolEntry s
  = ReqPoolEntry
      { rpeReqId  :: !ReqID
        -- ^ requestId of the pending request.
      , rpeOutput :: !(TQueue (ResPack s))
        -- ^ queue to which responses (or an exception) for the request
        -- are written.
      , rpeTimer  :: !(Async ReqID)
        -- ^ timer thread to time out response.
      }

-- | Pool of pending requests: (requestId of pending request) -->
-- (objects related to that pending request)
type ReqPool s = HT.BasicHashTable ReqID (ReqPoolEntry s)

-- | Multiplexed event object. The multiplexer loop ('runMuxLoop')
-- waits for one of these events at each iteration.
data MuxEvent s
  = EvReq (ReqPack s)
  -- ^ A request is taken from the request queue.
  | EvRes RawRes
  -- ^ A raw response is taken from the response queue.
  | EvActiveClose
  -- ^ The connection state is no longer 'ConnOpen' and there is no
  -- pending request.
  | EvRxFinish
  -- ^ The receiver thread finished without exception.
  | EvRxError SomeException
  -- ^ The receiver thread finished with the exception.
  | EvResponseTimeout ReqID
  -- ^ The response timer for the request ID has expired.

-- | Insert a new entry into the request pool, unless there is already
-- an entry for the request ID. The action to create the entry is
-- executed only when the insertion is actually done.
--
-- The lookup and the insertion are two separate operations.
-- HashTable's mutateIO is available since 1.2.3.0 (presumably that is
-- why it is not used here).
tryInsertToReqPool :: ReqPool s
                   -> ReqID
                   -> IO (ReqPoolEntry s) -- ^ action to create the new entry.
                   -> IO Bool -- ^ 'True' if insertion is successful.
tryInsertToReqPool req_pool rid makeEntry = do
  mexist_entry <- HT.lookup req_pool rid
  case mexist_entry of
   Just _ -> return False
   Nothing -> do
     new_entry <- makeEntry
     HT.insert req_pool rid new_entry
     return True

-- | Release the resource held by the entry, i.e., cancel its timer
-- thread.
cleanupReqPoolEntry :: ReqPoolEntry s -> IO ()
cleanupReqPoolEntry entry = Async.cancel $ rpeTimer entry

-- | Clean up the entry (see 'cleanupReqPoolEntry') and delete it from
-- the request pool.
removeReqPoolEntry :: ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry req_pool entry = do
  cleanupReqPoolEntry entry
  HT.delete req_pool $ rpeReqId entry

-- | Clean up all entries in the request pool (see
-- 'cleanupReqPoolEntry'). The entries are not deleted from the pool.
cleanupReqPool :: ReqPool s -> IO ()
cleanupReqPool req_pool = HT.mapM_ forEntry req_pool
  where
    forEntry (_, entry) = cleanupReqPoolEntry entry

-- | Get the timer threads of all entries in the request pool.
getAllResponseTimers :: ReqPool s -> IO [Async ReqID]
getAllResponseTimers req_pool = (fmap . fmap) toTimer $ HT.toList req_pool
  where
    toTimer (_, entry) = rpeTimer entry

-- | Multiplexer loop.
--
-- At each iteration it waits for one 'MuxEvent' and handles it. The
-- loop exits normally on 'EvActiveClose' and 'EvRxFinish', and throws
-- the exception on 'EvRxError'.
runMuxLoop :: WS.Connection -> ReqPool s -> Settings s
           -> TBQueue (ReqPack s) -> TQueue RawRes -> STM ConnectionState
           -> Async ()
           -> IO ()
runMuxLoop wsconn req_pool settings qreq qres readConnState rx_thread = loop
  where
    codec = Settings.codec settings
    loop = do
      res_timers <- getAllResponseTimers req_pool
      event <- atomically $ getEventSTM res_timers
      case event of
       EvReq req             -> handleReq req >> loop
       EvRes res             -> handleRes res >> loop
       EvActiveClose         -> return ()
       EvRxFinish            -> handleRxFinish
       EvRxError e           -> throw e
       EvResponseTimeout rid -> handleResponseTimeout rid >> loop
    -- Event sources are tried in this order; the first available one wins.
    getEventSTM res_timers = getRequest
                             <|> (EvRes <$> readTQueue qres)
                             <|> makeEvActiveClose
                             <|> (rxResultToEvent <$> waitCatchSTM rx_thread)
                             <|> (timeoutToEvent <$> waitAnySTM res_timers)
        where
          max_concurrency = Settings.concurrency settings
          -- Every pending request has exactly one timer, so the number
          -- of timers is the number of pending requests.
          cur_concurrency = length res_timers
          -- New requests are accepted only while the number of
          -- pending requests is below the limit.
          getRequest = if cur_concurrency < max_concurrency
                       then EvReq <$> readTBQueue qreq
                       else empty
          rxResultToEvent (Right ()) = EvRxFinish
          rxResultToEvent (Left e)   = EvRxError e
          timeoutToEvent (_, rid) = EvResponseTimeout rid
          -- Active close is delayed while there are pending requests.
          makeEvActiveClose = do
            if cur_concurrency > 0
              then empty
              else do
                conn_state <- readConnState
                if conn_state == ConnOpen then empty else return EvActiveClose
    -- Register the request to the pool and send it to the server. If
    -- its request ID is already in the pool, report
    -- 'DuplicateRequestId' to the requester instead.
    handleReq req = do
      insert_ok <- tryInsertToReqPool req_pool rid makeNewEntry
      if insert_ok
        then WS.sendBinaryData wsconn $ reqData req
        else reportError
        where
          rid = reqId req
          qout = reqOutput req
          makeNewEntry = do
            timer_thread <- runTimer (Settings.responseTimeout settings) rid
            return $ ReqPoolEntry { rpeReqId = rid,
                                    rpeOutput = qout,
                                    rpeTimer = timer_thread
                                  }
          reportError =
            atomically $ writeTQueue qout $ Left $ toException $ DuplicateRequestId rid
    handleRes res = case decodeWith codec res of
      Left err      -> Settings.onGeneralException settings $ ResponseParseFailure err
      Right res_msg -> handleResMsg res_msg
    -- Pass the response to the requester. The pool entry is removed
    -- when the response is the terminating one.
    handleResMsg res_msg@(ResponseMessage { requestId = rid }) = do
      m_entry <- HT.lookup req_pool rid
      case m_entry of
       Nothing -> Settings.onGeneralException settings $ UnexpectedRequestId rid
       Just entry -> do
         when (isTerminatingResponse res_msg) $ do
           removeReqPoolEntry req_pool entry
         atomically $ writeTQueue (rpeOutput entry) $ Right res_msg
    handleRxFinish = do
      -- RxFinish is an error for pending requests. If there is no
      -- pending requests, it's totally normal.
      let ex = toException ServerClosed
      reportToReqPool req_pool ex
      reportToQReq qreq ex
    handleResponseTimeout rid = do
      mentry <- HT.lookup req_pool rid
      case mentry of
       Nothing -> return () -- this case may happen if the response came just before the time-out, I think.
       Just entry -> do
         atomically $ writeTQueue (rpeOutput entry) $ Left $ toException $ ResponseTimeout
         removeReqPoolEntry req_pool entry


-- | Receiver thread. It keeps receiving data from WS until the
-- connection finishes cleanly. Basically every exception is raised to
-- the caller.
--
-- Each received data is written to the given queue. The loop
-- finishes without exception when the server sends a Close request
-- with status 1000, or when the connection is closed without a Close
-- request.
runRxLoop :: WS.Connection -> TQueue RawRes -> IO ()
runRxLoop wsconn qres = loop
  where
    loop = do
      mgot <- tryReceive
      case mgot of
        Nothing -> return ()
        Just got -> do
          atomically $ writeTQueue qres got
          loop
    -- 'Nothing' means that the connection is cleanly closed.
    tryReceive = toMaybe =<< (try $ WS.receiveData wsconn)
      where
        toMaybe (Right d) = return $ Just d
        toMaybe (Left e@(WS.CloseRequest close_status _)) = do
          if close_status == 1000 -- "normal closure". See sec. 7.4, RFC 6455.
            then return Nothing
            else throw e
        -- We allow the server to close the connection without sending Close request message.
        toMaybe (Left WS.ConnectionClosed) = return Nothing
        toMaybe (Left e) = throw e

-- | Start a timer thread. The thread waits for the given time in
-- seconds, and then finishes with the given request ID as its result.
runTimer :: Int -> ReqID -> IO (Async ReqID)
runTimer wait_sec rid = async $ do
  threadDelay $ wait_sec * 1000000 -- 'threadDelay' takes microseconds.
  return rid


-- | A handle associated with a 'Connection' for a pair of request and
-- response. You can retrieve 'ResponseMessage's from this object.
--
-- Type @s@ is the body of the response.
data ResponseHandle s
  = ResponseHandle
      { -- | 'STM' action that obtains the next 'ResPack' (either an
        -- exception or a 'ResponseMessage') for this request.
        rhGetResponse :: STM (ResPack s)
        -- | Set to 'True' by 'nextResponseSTM' once a terminating
        -- response has been read from this handle.
      , rhTerminated  :: TVar Bool
      }

-- | Maps the function through every layer of the value produced by
-- 'rhGetResponse' (the 'STM' action, the 'ResPack' and the message
-- inside it). The termination flag is left untouched.
instance Functor ResponseHandle where
  fmap f rh = rh { rhGetResponse = fmap (fmap f) <$> rhGetResponse rh }


-- | Build a 'RequestMessage' out of the given 'Operation' and send it
-- through the 'Connection'.
--
-- Usually this function does not throw any exception. Exceptions
-- about sending requests are reported when you operate on the
-- returned 'ResponseHandle'.
sendRequest :: Operation o => Connection s -> o -> IO (ResponseHandle s)
sendRequest conn op = makeRequestMessage op >>= sendRequest' conn

-- | Like 'sendRequest', but you can pass a 'RequestMessage' directly
-- to this function.
--
-- If the connection is not in the 'ConnOpen' state, the request is
-- not enqueued. Instead an 'AlreadyClosed' exception is put into the
-- response queue, so it is reported when you operate on the returned
-- 'ResponseHandle'.
sendRequest' :: Connection s -> RequestMessage -> IO (ResponseHandle s)
sendRequest' conn req_msg = do
  qout <- newTQueueIO
  -- Check the connection state and enqueue the request in ONE
  -- transaction. Doing these in two separate transactions leaves a
  -- window in which the connection can leave 'ConnOpen' after the
  -- check but before the request is enqueued. In addition, if
  -- 'writeTBQueue' has to wait for room in the request queue, the
  -- transaction is re-run when the connection state changes, so the
  -- caller gets 'AlreadyClosed' rather than waiting on a queue of a
  -- connection that is no longer open.
  atomically $ do
    conn_state <- readTVar (connState conn)
    if conn_state == ConnOpen
      then writeTBQueue (connQReq conn) (makeReqPack qout)
      else writeTQueue qout $ Left $ toException AlreadyClosed
  makeResHandle qout
  where
    -- The request bundled with its ID and the queue to which its
    -- responses (or an exception) should be delivered.
    makeReqPack qout =
      ReqPack
        { reqData = encodeBinaryWith (connCodec conn) req_msg
        , reqId = getField @"requestId" req_msg
        , reqOutput = qout
        }
    -- A fresh handle that reads from the response queue and has not
    -- yet seen a terminating response.
    makeResHandle qout = do
      var_term <- newTVarIO False
      return $ ResponseHandle
        { rhGetResponse = readTQueue qout
        , rhTerminated = var_term
        }


-- | Get the next 'ResponseMessage' from the 'ResponseHandle'. It
-- returns 'Nothing' if you have already got all responses. This
-- function may block until a new 'ResponseMessage' arrives.
--
-- On error, it may throw all sorts of exceptions including
-- 'RequestException'.
nextResponse :: ResponseHandle s -> IO (Maybe (ResponseMessage s))
nextResponse rh = atomically (nextResponseSTM rh)

-- | 'STM' version of 'nextResponse'.
nextResponseSTM :: ResponseHandle s -> STM (Maybe (ResponseMessage s))
nextResponseSTM rh = do
  already_done <- readTVar var_term
  if already_done
    then return Nothing
    else fetchNext
  where
    var_term = rhTerminated rh
    -- Throwing inside STM aborts the transaction, so the 'Left' item
    -- that was just read is put back to the queue.
    fetchNext = rhGetResponse rh >>= either throw accept
    -- Remember that the terminating response has been seen, so that
    -- later calls return 'Nothing' without reading any further.
    accept res = do
      when (isTerminatingResponse res) $ writeTVar var_term True
      return (Just res)

-- | 'True' if the status code of the 'ResponseMessage' satisfies
-- 'isTerminating'.
isTerminatingResponse :: ResponseMessage s -> Bool
isTerminatingResponse res =
  case res of
    ResponseMessage { status = ResponseStatus { code = c } } -> isTerminating c

-- | Collect all the remaining 'ResponseMessage's from the
-- 'ResponseHandle' by calling 'nextResponse' repeatedly.
slurpResponses :: ResponseHandle s -> IO (Vector (ResponseMessage s))
slurpResponses = slurp . nextResponse

-- | Similar to 'slurpResponses', but the responses obtained are
-- discarded.
--
-- @since 0.1.1.0
drainResponses :: ResponseHandle s -> IO ()
drainResponses = drain . nextResponse