{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

-- |A module for automatic, optimal protocol pipelining.
--
--  Protocol pipelining is a technique in which multiple requests are written
--  out to a single socket without waiting for the corresponding responses.
--  The pipelining of requests results in a dramatic improvement in protocol
--  performance.
--
--  [Optimal Pipelining] uses the least number of network packets possible
--
--  [Automatic Pipelining] means that requests are implicitly pipelined as much
--      as possible, i.e. as long as a request's response is not used before any
--      subsequent requests.
--
module Database.Redis.ProtocolPipelining (
  Connection,
  connect, enableTLS, beginReceiving, disconnect, request, send, recv, flush, fromCtx
) where

import           Prelude
import           Control.Monad
import qualified Scanner
import qualified Data.ByteString as S
import           Data.IORef
import qualified Network.Socket as NS
import qualified Network.TLS as TLS
import           System.IO.Unsafe

import           Database.Redis.Protocol
import qualified Database.Redis.ConnectionContext as CC

data Connection = Conn
  { Connection -> ConnectionContext
connCtx        :: CC.ConnectionContext -- ^ Connection socket-handle.
  , Connection -> IORef [Reply]
connReplies    :: IORef [Reply] -- ^ Reply thunks for unsent requests.
  , Connection -> IORef [Reply]
connPending    :: IORef [Reply]
    -- ^ Reply thunks for requests "in the pipeline". Refers to the same list as
    --   'connReplies', but can have an offset.
  , Connection -> IORef Int
connPendingCnt :: IORef Int
    -- ^ Number of pending replies and thus the difference length between
    --   'connReplies' and 'connPending'.
    --   length connPending  - pendingCount = length connReplies
  }


fromCtx :: CC.ConnectionContext -> IO Connection
fromCtx :: ConnectionContext -> IO Connection
fromCtx ConnectionContext
ctx = ConnectionContext
-> IORef [Reply] -> IORef [Reply] -> IORef Int -> Connection
Conn ConnectionContext
ctx (IORef [Reply] -> IORef [Reply] -> IORef Int -> Connection)
-> IO (IORef [Reply])
-> IO (IORef [Reply] -> IORef Int -> Connection)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Reply] -> IO (IORef [Reply])
forall a. a -> IO (IORef a)
newIORef [] IO (IORef [Reply] -> IORef Int -> Connection)
-> IO (IORef [Reply]) -> IO (IORef Int -> Connection)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> [Reply] -> IO (IORef [Reply])
forall a. a -> IO (IORef a)
newIORef [] IO (IORef Int -> Connection) -> IO (IORef Int) -> IO Connection
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0

connect :: NS.HostName -> CC.PortID -> Maybe Int -> IO Connection
connect :: HostName -> PortID -> Maybe Int -> IO Connection
connect HostName
hostName PortID
portId Maybe Int
timeoutOpt = do
    ConnectionContext
connCtx <- HostName -> PortID -> Maybe Int -> IO ConnectionContext
CC.connect HostName
hostName PortID
portId Maybe Int
timeoutOpt
    IORef [Reply]
connReplies <- [Reply] -> IO (IORef [Reply])
forall a. a -> IO (IORef a)
newIORef []
    IORef [Reply]
connPending <- [Reply] -> IO (IORef [Reply])
forall a. a -> IO (IORef a)
newIORef []
    IORef Int
connPendingCnt <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
    Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Conn :: ConnectionContext
-> IORef [Reply] -> IORef [Reply] -> IORef Int -> Connection
Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
..}

enableTLS :: TLS.ClientParams -> Connection -> IO Connection
enableTLS :: ClientParams -> Connection -> IO Connection
enableTLS ClientParams
tlsParams conn :: Connection
conn@Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = do
    ConnectionContext
newCtx <- ClientParams -> ConnectionContext -> IO ConnectionContext
CC.enableTLS ClientParams
tlsParams ConnectionContext
connCtx
    Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn{connCtx :: ConnectionContext
connCtx = ConnectionContext
newCtx}

beginReceiving :: Connection -> IO ()
beginReceiving :: Connection -> IO ()
beginReceiving Connection
conn = do
  [Reply]
rs <- Connection -> IO [Reply]
connGetReplies Connection
conn
  IORef [Reply] -> [Reply] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (Connection -> IORef [Reply]
connReplies Connection
conn) [Reply]
rs
  IORef [Reply] -> [Reply] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (Connection -> IORef [Reply]
connPending Connection
conn) [Reply]
rs

disconnect :: Connection -> IO ()
disconnect :: Connection -> IO ()
disconnect Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = ConnectionContext -> IO ()
CC.disconnect ConnectionContext
connCtx

-- |Write the request to the socket output buffer, without actually sending.
--  The 'Handle' is 'hFlush'ed when reading replies from the 'connCtx'.
send :: Connection -> S.ByteString -> IO ()
send :: Connection -> ByteString -> IO ()
send Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} ByteString
s = do
  ConnectionContext -> ByteString -> IO ()
CC.send ConnectionContext
connCtx ByteString
s

  -- Signal that we expect one more reply from Redis.
  Int
n <- IORef Int -> (Int -> (Int, Int)) -> IO Int
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Int
connPendingCnt ((Int -> (Int, Int)) -> IO Int) -> (Int -> (Int, Int)) -> IO Int
forall a b. (a -> b) -> a -> b
$ \Int
n -> let n' :: Int
n' = Int
nInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1 in (Int
n', Int
n')
  -- Limit the "pipeline length". This is necessary in long pipelines, to avoid
  -- thunk build-up, and thus space-leaks.
  -- TODO find smallest max pending with good-enough performance.
  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1000) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    -- Force oldest pending reply.
    Reply
r:[Reply]
_ <- IORef [Reply] -> IO [Reply]
forall a. IORef a -> IO a
readIORef IORef [Reply]
connPending
    Reply
r Reply -> IO () -> IO ()
`seq` () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- |Take a reply-thunk from the list of future replies.
recv :: Connection -> IO Reply
recv :: Connection -> IO Reply
recv Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = do
  (Reply
r:[Reply]
rs) <- IORef [Reply] -> IO [Reply]
forall a. IORef a -> IO a
readIORef IORef [Reply]
connReplies
  IORef [Reply] -> [Reply] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [Reply]
connReplies [Reply]
rs
  Reply -> IO Reply
forall (m :: * -> *) a. Monad m => a -> m a
return Reply
r

-- | Flush the socket.  Normally, the socket is flushed in 'recv' (actually 'conGetReplies'), but
-- for the multithreaded pub/sub code, the sending thread needs to explicitly flush the subscription
-- change requests.
flush :: Connection -> IO ()
flush :: Connection -> IO ()
flush Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = ConnectionContext -> IO ()
CC.flush ConnectionContext
connCtx

-- |Send a request and receive the corresponding reply
request :: Connection -> S.ByteString -> IO Reply
request :: Connection -> ByteString -> IO Reply
request Connection
conn ByteString
req = Connection -> ByteString -> IO ()
send Connection
conn ByteString
req IO () -> IO Reply -> IO Reply
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Connection -> IO Reply
recv Connection
conn

-- |A list of all future 'Reply's of the 'Connection'.
--
--  The spine of the list can be evaluated without forcing the replies.
--
--  Evaluating/forcing a 'Reply' from the list will 'unsafeInterleaveIO' the
--  reading and parsing from the 'connCtx'. To ensure correct ordering, each
--  Reply first evaluates (and thus reads from the network) the previous one.
--
--  'unsafeInterleaveIO' only evaluates it's result once, making this function
--  thread-safe. 'Handle' as implemented by GHC is also threadsafe, it is safe
--  to call 'hFlush' here. The list constructor '(:)' must be called from
--  /within/ unsafeInterleaveIO, to keep the replies in correct order.
connGetReplies :: Connection -> IO [Reply]
connGetReplies :: Connection -> IO [Reply]
connGetReplies conn :: Connection
conn@Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = ByteString -> Reply -> IO [Reply]
go ByteString
S.empty (ByteString -> Reply
SingleLine ByteString
"previous of first")
  where
    go :: ByteString -> Reply -> IO [Reply]
go ByteString
rest Reply
previous = do
      -- lazy pattern match to actually delay the receiving
      ~(Reply
r, ByteString
rest') <- IO (Reply, ByteString) -> IO (Reply, ByteString)
forall a. IO a -> IO a
unsafeInterleaveIO (IO (Reply, ByteString) -> IO (Reply, ByteString))
-> IO (Reply, ByteString) -> IO (Reply, ByteString)
forall a b. (a -> b) -> a -> b
$ do
        -- Force previous reply for correct order.
        Reply
previous Reply -> IO () -> IO ()
`seq` () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Result Reply
scanResult <- IO ByteString -> Scanner Reply -> ByteString -> IO (Result Reply)
forall (m :: * -> *) a.
Monad m =>
m ByteString -> Scanner a -> ByteString -> m (Result a)
Scanner.scanWith IO ByteString
readMore Scanner Reply
reply ByteString
rest
        case Result Reply
scanResult of
          Scanner.Fail{}       -> IO (Reply, ByteString)
forall a. IO a
CC.errConnClosed
          Scanner.More{}    -> HostName -> IO (Reply, ByteString)
forall a. HasCallStack => HostName -> a
error HostName
"Hedis: parseWith returned Partial"
          Scanner.Done ByteString
rest' Reply
r -> do
            -- r is the same as 'head' of 'connPending'. Since we just
            -- received r, we remove it from the pending list.
            IORef [Reply] -> ([Reply] -> ([Reply], ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef [Reply]
connPending (([Reply] -> ([Reply], ())) -> IO ())
-> ([Reply] -> ([Reply], ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Reply
_:[Reply]
rs) -> ([Reply]
rs, ())
            -- We now expect one less reply from Redis. We don't count to
            -- negative, which would otherwise occur during pubsub.
            IORef Int -> (Int -> (Int, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Int
connPendingCnt ((Int -> (Int, ())) -> IO ()) -> (Int -> (Int, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
n -> (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
0 (Int
nInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1), ())
            (Reply, ByteString) -> IO (Reply, ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Reply
r, ByteString
rest')
      [Reply]
rs <- IO [Reply] -> IO [Reply]
forall a. IO a -> IO a
unsafeInterleaveIO (ByteString -> Reply -> IO [Reply]
go ByteString
rest' Reply
r)
      [Reply] -> IO [Reply]
forall (m :: * -> *) a. Monad m => a -> m a
return (Reply
rReply -> [Reply] -> [Reply]
forall a. a -> [a] -> [a]
:[Reply]
rs)

    readMore :: IO ByteString
readMore = IO ByteString -> IO ByteString
forall a. IO a -> IO a
CC.ioErrorToConnLost (IO ByteString -> IO ByteString) -> IO ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ do
      Connection -> IO ()
flush Connection
conn
      ConnectionContext -> IO ByteString
CC.recv ConnectionContext
connCtx