{-# LANGUAGE DuplicateRecordFields, CPP #-}
-- |
-- Module: Network.Greskell.WebSocket.Connection.Impl
-- Description: internal implementation of Connection
-- Maintainer: Toshio Ito <debug.ito@gmail.com>
--
-- This is an internal module. It deliberately exports everything. The
-- upper module is responsible for making a proper export list.
module Network.Greskell.WebSocket.Connection.Impl where

import Control.Applicative ((<$>), (<|>), empty)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync, Async, async, waitCatchSTM, waitAnySTM)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM
  ( TBQueue, readTBQueue, newTBQueueIO, writeTBQueue,
    TQueue, writeTQueue, newTQueueIO, readTQueue,
    TVar, newTVarIO, readTVar, writeTVar,
    TMVar, tryPutTMVar, tryReadTMVar, putTMVar, newEmptyTMVarIO, readTMVar,
    STM, atomically, retry
  )
import qualified Control.Concurrent.STM as STM
import Control.Exception.Safe
  ( Exception(toException), SomeException, withException, throw, try, finally
  )
import Control.Monad (when, void, forM_)
import Data.Aeson (Value)
import qualified Data.ByteString.Lazy as BSL
import Data.Foldable (toList)
import qualified Data.HashTable.IO as HT
import Data.Monoid (mempty)
import Data.Typeable (Typeable)
import Data.UUID (UUID)
import Data.Vector (Vector)
import qualified Network.WebSockets as WS

import Network.Greskell.WebSocket.Codec (Codec(decodeWith, encodeWith), encodeBinaryWith)
import Network.Greskell.WebSocket.Connection.Settings (Settings)
import qualified Network.Greskell.WebSocket.Connection.Settings as Settings
import Network.Greskell.WebSocket.Connection.Type
  ( Connection(..), ConnectionState(..),
    ResPack, ReqID, ReqPack(..), RawRes,
    GeneralException(..)
  )
import Network.Greskell.WebSocket.Request
  ( RequestMessage(RequestMessage, requestId),
    Operation, makeRequestMessage
  )
import Network.Greskell.WebSocket.Response
  ( ResponseMessage(ResponseMessage, requestId, status),
    ResponseStatus(ResponseStatus, code),
    isTerminating
  )
import Network.Greskell.WebSocket.Util (slurp, drain)


-- | Read out all items currently stored in the queue.
--
-- The @stm@ package provides 'STM.flushTBQueue' since version
-- 2.4.5. For older versions this falls back to 'slurp' over
-- 'STM.tryReadTBQueue'.
flushTBQueue :: TBQueue a -> STM [a]
#if MIN_VERSION_stm(2,4,5)
flushTBQueue = STM.flushTBQueue
#else
flushTBQueue q = fmap toList $ slurp $ STM.tryReadTBQueue q
#endif


-- | Host name or an IP address of the server to connect to. 'connect'
-- hands it over to 'WS.runClient' through 'runWSConn'.
type Host = String

-- | TCP port number of the server to connect to. 'connect' hands it
-- over to 'WS.runClient' through 'runWSConn'.
type Port = Int

-- | Make a 'Connection' to a Gremlin Server.
--
-- This function spawns a thread running 'runWSConn' and blocks until
-- that thread reports the result of the connection attempt.
--
-- If it fails to connect to the specified server, it throws an
-- exception (the one reported by the 'runWSConn' thread).
connect :: Settings s -> Host -> Port -> IO (Connection s)
connect settings host port = do
  req_pool <- HT.new  -- Do not manipulate req_pool in this thread. It belongs to runWSConn thread.
  qreq <- newTBQueueIO qreq_size
  var_connect_result <- newEmptyTMVarIO
  var_conn_state <- newTVarIO ConnOpen
  ws_thread <- async $ runWSConn settings host port ws_path req_pool qreq var_connect_result var_conn_state
  -- Block until the WS thread reports either success or a fatal exception.
  eret <- atomically $ readTMVar var_connect_result
  case eret of
   Left e -> throw e
   Right () -> return $ Connection { connQReq = qreq,
                                     connState = var_conn_state,
                                     connWSThread = ws_thread,
                                     connCodec = codec
                                   }
  where
    codec = Settings.codec settings
    -- The argument type of 'newTBQueueIO' depends on the stm version,
    -- so the size is converted with 'fromIntegral'.
    qreq_size = fromIntegral $ Settings.requestQueueSize settings
    ws_path = Settings.endpointPath settings

-- | Close the 'Connection'.
--
-- If there are pending requests in the 'Connection', 'close' function
-- blocks for them to complete or time out.
--
-- Calling 'close' on a 'Connection' that is already closed does
-- nothing. Calling it on a 'Connection' that is waiting to close
-- requests nothing new, but still blocks until it is closed.
close :: Connection s -> IO ()
close conn = do
  need_wait <- atomically $ do
    cur_state <- readTVar $ connState conn
    case cur_state of
     ConnClosed -> return False
     ConnClosing -> return True
     ConnOpen -> do
       -- Request the close; the state becomes ConnClosed elsewhere.
       writeTVar (connState conn) ConnClosing
       return True
  when need_wait waitForClose
  where
    -- Block (by STM retry) until the state becomes ConnClosed.
    waitForClose = atomically $ do
      cur_state <- readTVar $ connState conn
      when (cur_state /= ConnClosed) retry
  
-- | Path part of the WebSocket endpoint. 'runWSConn' passes it to
-- 'WS.runClient' together with the host and the port.
type Path = String

-- | A thread taking care of a WS connection.
--
-- It connects to the server with 'WS.runClient', reports the result
-- of the connection attempt to the given 'TMVar', and then runs
-- 'runRxLoop' and 'runMuxLoop' on the WebSocket connection.
--
-- When an exception is thrown, it is reported to the 'TMVar' (if it
-- is still empty), to all entries in the 'ReqPool' and to all
-- requests remaining in the request queue, and then re-thrown. In
-- any case, on exit the 'ReqPool' is cleaned up and the connection
-- state is set to 'ConnClosed'.
runWSConn :: Settings s
          -> Host -> Port -> Path
          -> ReqPool s -> TBQueue (ReqPack s)
          -> TMVar (Either SomeException ()) -> TVar ConnectionState
          -> IO ()
runWSConn settings host port path req_pool qreq var_connect_result var_conn_state =
  (doConnect `withException` reportFatalEx) `finally` finalize
  where
    doConnect = WS.runClient host port path $ \wsconn -> do
      is_success <- checkAndReportConnectSuccess
      -- On failure, the result is already reported at
      -- var_connect_result, so there is nothing more to do.
      when is_success $ setupMux wsconn
    setupMux wsconn = do
      qres <- newTQueueIO
      withAsync (runRxLoop wsconn qres) $ \rx_thread ->
        runMuxLoop wsconn req_pool settings qreq qres (readTVar var_conn_state) rx_thread
    -- Return True if the connection attempt is (now) recorded as a
    -- success, False if a failure has already been recorded.
    checkAndReportConnectSuccess = atomically $ do
      mret <- tryReadTMVar var_connect_result
      case mret of
       -- usually, mret should be Nothing.
       Nothing -> do
         putTMVar var_connect_result $ Right ()
         return True
       Just (Right _) -> return True
       Just (Left _) -> return False
    reportFatalEx :: SomeException -> IO ()
    reportFatalEx cause = do
      reportToConnectCaller cause
      reportToReqPool req_pool cause
      reportToQReq qreq cause
    -- tryPutTMVar: if a result is already stored, it is kept as it is.
    reportToConnectCaller cause = void $ atomically $ tryPutTMVar var_connect_result $ Left cause
    finalize = do
      cleanupReqPool req_pool
      atomically $ writeTVar var_conn_state ConnClosed

-- | Report the given exception to every entry in the 'ReqPool', by
-- writing it (as 'Left') to the output queue of the entry. The
-- entries are not removed from the pool.
reportToReqPool :: ReqPool s -> SomeException -> IO ()
reportToReqPool req_pool cause = HT.mapM_ forEntry req_pool
  where
    forEntry (_, entry) = atomically $ writeTQueue (rpeOutput entry) $ Left cause

-- | Take all requests remaining in the request queue, and report the
-- given exception to each of them by writing it (as 'Left') to the
-- output queue of the request. All of this is done in a single STM
-- transaction.
reportToQReq :: TBQueue (ReqPack s) -> SomeException -> IO ()
reportToQReq qreq cause = atomically $ do
  reqpacks <- flushTBQueue qreq
  forM_ reqpacks reportToReqPack
  where
    reportToReqPack reqp = writeTQueue (reqOutput reqp) $ Left cause

-- | An exception related to a specific request.
data RequestException =
    AlreadyClosed
    -- ^ The connection is already closed before it sends the request.
  | ServerClosed
    -- ^ The server closed the connection before it sent a response
    -- for this request.
  | DuplicateRequestId UUID
    -- ^ The requestId (kept in this object) is already pending in the
    -- connection.
  | ResponseTimeout
    -- ^ The server fails to send ResponseMessages within
    -- 'Settings.responseTimeout'.
  deriving (Show,Eq,Typeable)

instance Exception RequestException

-- | Objects related to a pending request. This is the value type of
-- 'ReqPool'.
data ReqPoolEntry s =
  ReqPoolEntry
  { -- | ID of the request. 'removeReqPoolEntry' uses it as the key
    -- to delete the entry from the 'ReqPool'.
    rpeReqId :: !ReqID,
    -- | The queue to which the result ('ResPack') for the request
    -- is written.
    rpeOutput :: !(TQueue (ResPack s)),
    -- | timer thread to time out response. It is canceled by
    -- 'cleanupReqPoolEntry'.
    rpeTimer :: !(Async ReqID)
  }

-- | (requestId of pending request) --> (objects related to that
-- pending request). This is a mutable hash table operated in IO.
type ReqPool s = HT.BasicHashTable ReqID (ReqPoolEntry s)

-- | Multiplexed event object. The loop in 'runMuxLoop' waits for one
-- of these events and dispatches on it.
data MuxEvent s = EvReq (ReqPack s)
                  -- ^ A request is read from the request queue.
                | EvRes RawRes
                  -- ^ A raw response is read from the response queue.
                | EvActiveClose
                  -- ^ The connection state is no longer 'ConnOpen'
                  -- while there is no response timer to wait for.
                | EvRxFinish
                  -- ^ The Rx thread finished without an exception.
                | EvRxError SomeException
                  -- ^ The Rx thread finished with the exception.
                | EvResponseTimeout ReqID
                  -- ^ One of the response timers finished, returning
                  -- this request ID.

-- | Insert a new entry to the 'ReqPool' only if the given key is not
-- in the pool yet. The action to create the entry is run only in
-- that case.
--
-- This is implemented with 'HT.lookup' and 'HT.insert', because
-- HashTable's mutateIO is available since 1.2.3.0.
tryInsertToReqPool :: ReqPool s
                   -> ReqID
                   -> IO (ReqPoolEntry s) -- ^ action to create the new entry.
                   -> IO Bool -- ^ 'True' if insertion is successful.
tryInsertToReqPool req_pool rid makeEntry = do
  mexist_entry <- HT.lookup req_pool rid
  case mexist_entry of
   Just _ -> return False
   Nothing -> do
     new_entry <- makeEntry
     HT.insert req_pool rid new_entry
     return True

-- | Release the resource held by the entry, that is, cancel its
-- timer thread. It does not touch the 'ReqPool'.
cleanupReqPoolEntry :: ReqPoolEntry s -> IO ()
cleanupReqPoolEntry entry = Async.cancel $ rpeTimer entry

-- | Clean up the entry with 'cleanupReqPoolEntry', and delete the
-- key 'rpeReqId' of the entry from the 'ReqPool'.
removeReqPoolEntry :: ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry req_pool entry = do
  cleanupReqPoolEntry entry
  HT.delete req_pool $ rpeReqId entry

-- | Run 'cleanupReqPoolEntry' on every entry in the 'ReqPool'. The
-- entries are not deleted from the pool.
cleanupReqPool :: ReqPool s -> IO ()
cleanupReqPool req_pool = HT.mapM_ (cleanupReqPoolEntry . snd) req_pool

-- | Get the timer threads ('rpeTimer') of all entries currently in
-- the 'ReqPool'.
getAllResponseTimers :: ReqPool s -> IO [Async ReqID]
getAllResponseTimers req_pool = map (rpeTimer . snd) <$> HT.toList req_pool

-- | Multiplexer loop.
runMuxLoop :: WS.Connection -> ReqPool s -> Settings s
           -> TBQueue (ReqPack s) -> TQueue RawRes -> STM ConnectionState
           -> Async ()
           -> IO ()
runMuxLoop :: Connection
-> ReqPool s
-> Settings s
-> TBQueue (ReqPack s)
-> TQueue RawRes
-> STM ConnectionState
-> Async ()
-> IO ()
runMuxLoop Connection
wsconn ReqPool s
req_pool Settings s
settings TBQueue (ReqPack s)
qreq TQueue RawRes
qres STM ConnectionState
readConnState Async ()
rx_thread = IO ()
loop
  where
    codec :: Codec s
codec = Settings s -> Codec s
forall s. Settings s -> Codec s
Settings.codec Settings s
settings
    loop :: IO ()
loop = do
      [Async ReqID]
res_timers <- ReqPool s -> IO [Async ReqID]
forall s. ReqPool s -> IO [Async ReqID]
getAllResponseTimers ReqPool s
req_pool
      MuxEvent s
event <- STM (MuxEvent s) -> IO (MuxEvent s)
forall a. STM a -> IO a
atomically (STM (MuxEvent s) -> IO (MuxEvent s))
-> STM (MuxEvent s) -> IO (MuxEvent s)
forall a b. (a -> b) -> a -> b
$ [Async ReqID] -> STM (MuxEvent s)
getEventSTM [Async ReqID]
res_timers
      case MuxEvent s
event of
       EvReq ReqPack s
req -> ReqPack s -> IO ()
handleReq ReqPack s
req IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
       EvRes RawRes
res -> RawRes -> IO ()
handleRes RawRes
res IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
       MuxEvent s
EvActiveClose -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
       MuxEvent s
EvRxFinish -> IO ()
handleRxFinish
       EvRxError SomeException
e -> SomeException -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
       EvResponseTimeout ReqID
rid -> ReqID -> IO ()
handleResponseTimeout ReqID
rid IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
    getEventSTM :: [Async ReqID] -> STM (MuxEvent s)
getEventSTM [Async ReqID]
res_timers = STM (MuxEvent s)
getRequest
                             STM (MuxEvent s) -> STM (MuxEvent s) -> STM (MuxEvent s)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (RawRes -> MuxEvent s
forall s. RawRes -> MuxEvent s
EvRes (RawRes -> MuxEvent s) -> STM RawRes -> STM (MuxEvent s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TQueue RawRes -> STM RawRes
forall a. TQueue a -> STM a
readTQueue TQueue RawRes
qres)
                             STM (MuxEvent s) -> STM (MuxEvent s) -> STM (MuxEvent s)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM (MuxEvent s)
forall s. STM (MuxEvent s)
makeEvActiveClose
                             STM (MuxEvent s) -> STM (MuxEvent s) -> STM (MuxEvent s)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (Either SomeException () -> MuxEvent s
forall s. Either SomeException () -> MuxEvent s
rxResultToEvent (Either SomeException () -> MuxEvent s)
-> STM (Either SomeException ()) -> STM (MuxEvent s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Async () -> STM (Either SomeException ())
forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async ()
rx_thread)
                             STM (MuxEvent s) -> STM (MuxEvent s) -> STM (MuxEvent s)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> ((Async ReqID, ReqID) -> MuxEvent s
forall a s. (a, ReqID) -> MuxEvent s
timeoutToEvent ((Async ReqID, ReqID) -> MuxEvent s)
-> STM (Async ReqID, ReqID) -> STM (MuxEvent s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Async ReqID] -> STM (Async ReqID, ReqID)
forall a. [Async a] -> STM (Async a, a)
waitAnySTM [Async ReqID]
res_timers)
        where
          max_concurrency :: Port
max_concurrency = Settings s -> Port
forall s. Settings s -> Port
Settings.concurrency Settings s
settings
          cur_concurrency :: Port
cur_concurrency = [Async ReqID] -> Port
forall (t :: * -> *) a. Foldable t => t a -> Port
length [Async ReqID]
res_timers
          getRequest :: STM (MuxEvent s)
getRequest = if Port
cur_concurrency Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
< Port
max_concurrency
                       then ReqPack s -> MuxEvent s
forall s. ReqPack s -> MuxEvent s
EvReq (ReqPack s -> MuxEvent s) -> STM (ReqPack s) -> STM (MuxEvent s)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue (ReqPack s) -> STM (ReqPack s)
forall a. TBQueue a -> STM a
readTBQueue TBQueue (ReqPack s)
qreq
                       else STM (MuxEvent s)
forall (f :: * -> *) a. Alternative f => f a
empty
          rxResultToEvent :: Either SomeException () -> MuxEvent s
rxResultToEvent (Right ()) = MuxEvent s
forall s. MuxEvent s
EvRxFinish
          rxResultToEvent (Left SomeException
e) = SomeException -> MuxEvent s
forall s. SomeException -> MuxEvent s
EvRxError SomeException
e
          timeoutToEvent :: (a, ReqID) -> MuxEvent s
timeoutToEvent (a
_, ReqID
rid) = ReqID -> MuxEvent s
forall s. ReqID -> MuxEvent s
EvResponseTimeout ReqID
rid
          makeEvActiveClose :: STM (MuxEvent s)
makeEvActiveClose = do
            if Port
cur_concurrency Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
> Port
0
              then STM (MuxEvent s)
forall (f :: * -> *) a. Alternative f => f a
empty
              else do
                ConnectionState
conn_state <- STM ConnectionState
readConnState
                if ConnectionState
conn_state ConnectionState -> ConnectionState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionState
ConnOpen then STM (MuxEvent s)
forall (f :: * -> *) a. Alternative f => f a
empty else MuxEvent s -> STM (MuxEvent s)
forall (m :: * -> *) a. Monad m => a -> m a
return MuxEvent s
forall s. MuxEvent s
EvActiveClose
    handleReq :: ReqPack s -> IO ()
handleReq ReqPack s
req = do
      Bool
insert_ok <- ReqPool s -> ReqID -> IO (ReqPoolEntry s) -> IO Bool
forall s. ReqPool s -> ReqID -> IO (ReqPoolEntry s) -> IO Bool
tryInsertToReqPool ReqPool s
req_pool ReqID
rid IO (ReqPoolEntry s)
makeNewEntry
      if Bool
insert_ok
        then Connection -> RawRes -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
WS.sendBinaryData Connection
wsconn (RawRes -> IO ()) -> RawRes -> IO ()
forall a b. (a -> b) -> a -> b
$ ReqPack s -> RawRes
forall s. ReqPack s -> RawRes
reqData ReqPack s
req
        else IO ()
reportError
        where
          rid :: ReqID
rid = ReqPack s -> ReqID
forall s. ReqPack s -> ReqID
reqId ReqPack s
req
          qout :: TQueue (ResPack s)
qout = ReqPack s -> TQueue (ResPack s)
forall s. ReqPack s -> TQueue (ResPack s)
reqOutput ReqPack s
req
          makeNewEntry :: IO (ReqPoolEntry s)
makeNewEntry = do
            Async ReqID
timer_thread <- Port -> ReqID -> IO (Async ReqID)
runTimer (Settings s -> Port
forall s. Settings s -> Port
Settings.responseTimeout Settings s
settings) ReqID
rid
            ReqPoolEntry s -> IO (ReqPoolEntry s)
forall (m :: * -> *) a. Monad m => a -> m a
return (ReqPoolEntry s -> IO (ReqPoolEntry s))
-> ReqPoolEntry s -> IO (ReqPoolEntry s)
forall a b. (a -> b) -> a -> b
$ ReqPoolEntry :: forall s.
ReqID -> TQueue (ResPack s) -> Async ReqID -> ReqPoolEntry s
ReqPoolEntry { $sel:rpeReqId:ReqPoolEntry :: ReqID
rpeReqId = ReqID
rid,
                                    $sel:rpeOutput:ReqPoolEntry :: TQueue (ResPack s)
rpeOutput = TQueue (ResPack s)
qout,
                                    $sel:rpeTimer:ReqPoolEntry :: Async ReqID
rpeTimer = Async ReqID
timer_thread
                                  }
          reportError :: IO ()
reportError =
            STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue (ResPack s) -> ResPack s -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (ResPack s)
qout (ResPack s -> STM ()) -> ResPack s -> STM ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ResPack s
forall a b. a -> Either a b
Left (SomeException -> ResPack s) -> SomeException -> ResPack s
forall a b. (a -> b) -> a -> b
$ RequestException -> SomeException
forall e. Exception e => e -> SomeException
toException (RequestException -> SomeException)
-> RequestException -> SomeException
forall a b. (a -> b) -> a -> b
$ ReqID -> RequestException
DuplicateRequestId ReqID
rid
    handleRes :: RawRes -> IO ()
handleRes RawRes
res = case Codec s -> RawRes -> Either Host (ResponseMessage s)
forall s. Codec s -> RawRes -> Either Host (ResponseMessage s)
decodeWith Codec s
codec RawRes
res of
      Left Host
err -> Settings s -> GeneralException -> IO ()
forall s. Settings s -> GeneralException -> IO ()
Settings.onGeneralException Settings s
settings (GeneralException -> IO ()) -> GeneralException -> IO ()
forall a b. (a -> b) -> a -> b
$ Host -> GeneralException
ResponseParseFailure Host
err
      Right ResponseMessage s
res_msg -> ResponseMessage s -> IO ()
handleResMsg ResponseMessage s
res_msg
    handleResMsg :: ResponseMessage s -> IO ()
handleResMsg res_msg :: ResponseMessage s
res_msg@(ResponseMessage { requestId :: forall s. ResponseMessage s -> ReqID
requestId = ReqID
rid }) = do
      Maybe (ReqPoolEntry s)
m_entry <- ReqPool s -> ReqID -> IO (Maybe (ReqPoolEntry s))
forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> IO (Maybe v)
HT.lookup ReqPool s
req_pool ReqID
rid
      case Maybe (ReqPoolEntry s)
m_entry of
       Maybe (ReqPoolEntry s)
Nothing -> Settings s -> GeneralException -> IO ()
forall s. Settings s -> GeneralException -> IO ()
Settings.onGeneralException Settings s
settings (GeneralException -> IO ()) -> GeneralException -> IO ()
forall a b. (a -> b) -> a -> b
$ ReqID -> GeneralException
UnexpectedRequestId ReqID
rid
       Just ReqPoolEntry s
entry -> do
         Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ResponseMessage s -> Bool
forall s. ResponseMessage s -> Bool
isTerminatingResponse ResponseMessage s
res_msg) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
           ReqPool s -> ReqPoolEntry s -> IO ()
forall s. ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry ReqPool s
req_pool ReqPoolEntry s
entry
         STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue (ResPack s) -> ResPack s -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue (ReqPoolEntry s -> TQueue (ResPack s)
forall s. ReqPoolEntry s -> TQueue (ResPack s)
rpeOutput ReqPoolEntry s
entry) (ResPack s -> STM ()) -> ResPack s -> STM ()
forall a b. (a -> b) -> a -> b
$ ResponseMessage s -> ResPack s
forall a b. b -> Either a b
Right ResponseMessage s
res_msg
    handleRxFinish :: IO ()
handleRxFinish = do
      -- RxFinish is an error for pending requests. If there are no
      -- pending requests, it's totally normal.
      let ex :: SomeException
ex = RequestException -> SomeException
forall e. Exception e => e -> SomeException
toException RequestException
ServerClosed
      ReqPool s -> SomeException -> IO ()
forall s. ReqPool s -> SomeException -> IO ()
reportToReqPool ReqPool s
req_pool SomeException
ex
      TBQueue (ReqPack s) -> SomeException -> IO ()
forall s. TBQueue (ReqPack s) -> SomeException -> IO ()
reportToQReq TBQueue (ReqPack s)
qreq SomeException
ex
    handleResponseTimeout :: ReqID -> IO ()
handleResponseTimeout ReqID
rid = do
      Maybe (ReqPoolEntry s)
mentry <- ReqPool s -> ReqID -> IO (Maybe (ReqPoolEntry s))
forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> IO (Maybe v)
HT.lookup ReqPool s
req_pool ReqID
rid
      case Maybe (ReqPoolEntry s)
mentry of
       Maybe (ReqPoolEntry s)
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return () -- this case may happen if the response came just before the time-out, I think.
       Just ReqPoolEntry s
entry -> do
         STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue (ResPack s) -> ResPack s -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue (ReqPoolEntry s -> TQueue (ResPack s)
forall s. ReqPoolEntry s -> TQueue (ResPack s)
rpeOutput ReqPoolEntry s
entry) (ResPack s -> STM ()) -> ResPack s -> STM ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ResPack s
forall a b. a -> Either a b
Left (SomeException -> ResPack s) -> SomeException -> ResPack s
forall a b. (a -> b) -> a -> b
$ RequestException -> SomeException
forall e. Exception e => e -> SomeException
toException (RequestException -> SomeException)
-> RequestException -> SomeException
forall a b. (a -> b) -> a -> b
$ RequestException
ResponseTimeout
         ReqPool s -> ReqPoolEntry s -> IO ()
forall s. ReqPool s -> ReqPoolEntry s -> IO ()
removeReqPoolEntry ReqPool s
req_pool ReqPoolEntry s
entry


-- | Receiver thread. It keeps receiving data from the WebSocket
-- connection and forwards every received message to the given queue
-- until the connection finishes cleanly. Basically every exception is
-- raised to the caller.
--
-- The connection is considered to have finished cleanly when the
-- server sends a Close request with status code 1000, or when the
-- connection is closed without a Close request.
runRxLoop :: WS.Connection -> TQueue RawRes -> IO ()
runRxLoop wsconn qres = loop
  where
    -- Forward received messages to @qres@ until 'tryReceive' reports
    -- the end of the connection.
    loop :: IO ()
    loop = do
      mgot <- tryReceive
      case mgot of
        Nothing -> return ()
        Just got -> do
          atomically $ writeTQueue qres got
          loop
    -- Receive one message. 'Nothing' means the connection finished
    -- cleanly; any other 'WS.ConnectionException' is re-thrown.
    tryReceive :: IO (Maybe RawRes)
    tryReceive = toMaybe =<< (try $ WS.receiveData wsconn)
      where
        toMaybe (Right d) = return $ Just d
        toMaybe (Left e@(WS.CloseRequest close_status _))
          -- 1000 is "normal closure". See sec. 7.4, RFC 6455.
          | close_status == 1000 = return Nothing
          | otherwise = throw e
        -- We allow the server to close the connection without sending Close request message.
        toMaybe (Left WS.ConnectionClosed) = return Nothing
        toMaybe (Left e) = throw e

-- | Start an asynchronous timer. The returned 'Async' completes with
-- the given 'ReqID' after the given number of seconds has passed.
--
-- The first argument is the wait time in seconds; it is converted to
-- microseconds for 'threadDelay'.
runTimer :: Int -> ReqID -> IO (Async ReqID)
runTimer wait_sec rid = async $ do
  threadDelay $ wait_sec * 1000000
  return rid
  

-- | A handle associated with a 'Connection' for a pair of request and
-- response. You can retrieve 'ResponseMessage's from this object.
--
-- Type @s@ is the body of the response.
data ResponseHandle s =
  ResponseHandle
  { -- | 'STM' action that takes the next response (or an exception
    -- reported for the request) for this handle.
    rhGetResponse :: STM (ResPack s),
    -- | Set to 'True' once a terminating response has been read
    -- through this handle.
    rhTerminated :: TVar Bool
  }

-- | Maps over the response body of every 'ResponseMessage' obtained
-- from the handle.
instance Functor ResponseHandle where
  -- Three layers are mapped through: 'STM', 'Either' 'SomeException'
  -- and 'ResponseMessage'. 'rhTerminated' is shared with the original
  -- handle.
  fmap f rh = rh { rhGetResponse = (fmap . fmap . fmap) f $ rhGetResponse rh }


-- | Make a 'RequestMessage' from an 'Operation' and send it.
--
-- Usually this function does not throw any exception. Exceptions
-- about sending requests are reported when you operate on
-- 'ResponseHandle'.
sendRequest :: Operation o => Connection s -> o -> IO (ResponseHandle s)
sendRequest conn o = sendRequest' conn =<< makeRequestMessage o

-- | Like 'sendRequest', but you can pass a 'RequestMessage' directly
-- to this function.
--
-- If the connection is not open at the time of the call, the request
-- is not sent and an 'AlreadyClosed' exception is queued on the
-- returned 'ResponseHandle' instead.
sendRequest' :: Connection s -> RequestMessage -> IO (ResponseHandle s)
sendRequest' conn req_msg = do
  qout <- newTQueueIO
  is_open <- getConnectionOpen
  if is_open
    then sendReqPack qout
    else reportAlreadyClosed qout
  makeResHandle qout
  where
    codec = connCodec conn
    qreq = connQReq conn
    var_conn_state = connState conn
    -- The type annotation selects the 'RequestMessage' field among the
    -- duplicate @requestId@ record fields.
    rid :: ReqID
    rid = requestId (req_msg :: RequestMessage)
    -- 'True' if the connection state is currently 'ConnOpen'.
    getConnectionOpen :: IO Bool
    getConnectionOpen = fmap (== ConnOpen) $ atomically $ readTVar var_conn_state
    -- Encode the request and enqueue it, together with the output
    -- queue for its responses, to the request queue of the connection.
    sendReqPack qout = do
      atomically $ writeTBQueue qreq reqpack
      where
        reqpack = ReqPack
                  { reqData = encodeBinaryWith codec req_msg,
                    reqId = rid,
                    reqOutput = qout
                  }
    -- Build the handle that reads responses from the output queue.
    makeResHandle qout = do
      var_term <- newTVarIO False
      return $ ResponseHandle
               { rhGetResponse = readTQueue qout,
                 rhTerminated = var_term
               }
    -- Report 'AlreadyClosed' through the output queue instead of
    -- throwing it here.
    reportAlreadyClosed :: TQueue (Either SomeException b) -> IO ()
    reportAlreadyClosed qout = do
      atomically $ writeTQueue qout $ Left $ toException $ AlreadyClosed
    

-- | Get a 'ResponseMessage' from 'ResponseHandle'. If you have
-- already got all responses, it returns 'Nothing'. This function may
-- block for a new 'ResponseMessage' to come.
--
-- On error, it may throw all sorts of exceptions including
-- 'RequestException'.
nextResponse :: ResponseHandle s -> IO (Maybe (ResponseMessage s))
nextResponse = atomically . nextResponseSTM

-- | 'STM' version of 'nextResponse'.
--
-- Returns 'Nothing' once a terminating response has already been read
-- through this handle. Otherwise it takes the next item from the
-- handle: a 'Left' exception is thrown, and a 'Right' response is
-- returned in 'Just'.
nextResponseSTM :: ResponseHandle s -> STM (Maybe (ResponseMessage s))
nextResponseSTM rh = do
  termed <- readTVar $ rhTerminated rh
  if termed
    then return Nothing
    else readResponse
  where
    readResponse = do
      eres <- rhGetResponse rh
      case eres of
       -- Throw in STM. The transaction is aborted, so the eres is put
       -- back to the queue.
       Left ex -> throw ex
       Right res -> do
         updateTermed res
         return $ Just res
    -- Mark the handle as terminated when the terminating response is read.
    updateTermed res =
      when (isTerminatingResponse res) $ do
        writeTVar (rhTerminated rh) True

-- | 'True' if the status code of the 'ResponseMessage' satisfies
-- 'isTerminating'.
isTerminatingResponse :: ResponseMessage s -> Bool
isTerminatingResponse (ResponseMessage { status = (ResponseStatus { code = c }) }) =
  isTerminating c

-- | Get all remaining 'ResponseMessage's from 'ResponseHandle'.
--
-- It repeatedly calls 'nextResponse' on the handle, so it may block
-- and may throw the same exceptions as 'nextResponse'.
slurpResponses :: ResponseHandle s -> IO (Vector (ResponseMessage s))
slurpResponses h = slurp $ nextResponse h

-- | Similar to 'slurpResponses', but this function discards the
-- responses.
--
-- @since 0.1.1.0
drainResponses :: ResponseHandle s -> IO ()
drainResponses h = drain $ nextResponse h