{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
module Database.Redis.Connection where

import Control.Exception
import qualified Control.Monad.Catch as Catch
import Control.Monad.IO.Class(liftIO, MonadIO)
import Control.Monad(when)
import Control.Concurrent.MVar(MVar, newMVar)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import Data.Functor(void)
import qualified Data.IntMap.Strict as IntMap
import Data.Pool(Pool, withResource, createPool, destroyAllResources)
import Data.Typeable
import qualified Data.Time as Time
import Network.TLS (ClientParams)
import qualified Network.Socket as NS
import qualified Data.HashMap.Strict as HM

import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Core(Redis, runRedisInternal, runRedisClusteredInternal)
import Database.Redis.Protocol(Reply(..))
import Database.Redis.Cluster(ShardMap(..), Node, Shard(..))
import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.ConnectionContext as CC
--import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline
import Database.Redis.Commands
    ( ping
    , select
    , auth
    , clusterSlots
    , command
    , ClusterSlotsResponse(..)
    , ClusterSlotsResponseEntry(..)
    , ClusterSlotsNode(..))

--------------------------------------------------------------------------------
-- Connection
--

-- |A threadsafe pool of network connections to a Redis server. Use the
--  'connect' function to create one.
data Connection
    = NonClusteredConnection (Pool PP.Connection)
    | ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection)

-- |Information for connnecting to a Redis server.
--
-- It is recommended to not use the 'ConnInfo' data constructor directly.
-- Instead use 'defaultConnectInfo' and update it with record syntax. For
-- example to connect to a password protected Redis server running on localhost
-- and listening to the default port:
--
-- @
-- myConnectInfo :: ConnectInfo
-- myConnectInfo = defaultConnectInfo {connectAuth = Just \"secret\"}
-- @
--
data ConnectInfo = ConnInfo
    { ConnectInfo -> HostName
connectHost           :: NS.HostName
    , ConnectInfo -> PortID
connectPort           :: CC.PortID
    , ConnectInfo -> Maybe ByteString
connectAuth           :: Maybe B.ByteString
    -- ^ When the server is protected by a password, set 'connectAuth' to 'Just'
    --   the password. Each connection will then authenticate by the 'auth'
    --   command.
    , ConnectInfo -> Integer
connectDatabase       :: Integer
    -- ^ Each connection will 'select' the database with the given index.
    , ConnectInfo -> Int
connectMaxConnections :: Int
    -- ^ Maximum number of connections to keep open. The smallest acceptable
    --   value is 1.
    , ConnectInfo -> NominalDiffTime
connectMaxIdleTime    :: Time.NominalDiffTime
    -- ^ Amount of time for which an unused connection is kept open. The
    --   smallest acceptable value is 0.5 seconds. If the @timeout@ value in
    --   your redis.conf file is non-zero, it should be larger than
    --   'connectMaxIdleTime'.
    , ConnectInfo -> Maybe NominalDiffTime
connectTimeout        :: Maybe Time.NominalDiffTime
    -- ^ Optional timeout until connection to Redis gets
    --   established. 'ConnectTimeoutException' gets thrown if no socket
    --   get connected in this interval of time.
    , ConnectInfo -> Maybe ClientParams
connectTLSParams      :: Maybe ClientParams
    -- ^ Optional TLS parameters. TLS will be enabled if this is provided.
    } deriving Int -> ConnectInfo -> ShowS
[ConnectInfo] -> ShowS
ConnectInfo -> HostName
(Int -> ConnectInfo -> ShowS)
-> (ConnectInfo -> HostName)
-> ([ConnectInfo] -> ShowS)
-> Show ConnectInfo
forall a.
(Int -> a -> ShowS) -> (a -> HostName) -> ([a] -> ShowS) -> Show a
showList :: [ConnectInfo] -> ShowS
$cshowList :: [ConnectInfo] -> ShowS
show :: ConnectInfo -> HostName
$cshow :: ConnectInfo -> HostName
showsPrec :: Int -> ConnectInfo -> ShowS
$cshowsPrec :: Int -> ConnectInfo -> ShowS
Show

data ConnectError = ConnectAuthError Reply
                  | ConnectSelectError Reply
    deriving (ConnectError -> ConnectError -> Bool
(ConnectError -> ConnectError -> Bool)
-> (ConnectError -> ConnectError -> Bool) -> Eq ConnectError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnectError -> ConnectError -> Bool
$c/= :: ConnectError -> ConnectError -> Bool
== :: ConnectError -> ConnectError -> Bool
$c== :: ConnectError -> ConnectError -> Bool
Eq, Int -> ConnectError -> ShowS
[ConnectError] -> ShowS
ConnectError -> HostName
(Int -> ConnectError -> ShowS)
-> (ConnectError -> HostName)
-> ([ConnectError] -> ShowS)
-> Show ConnectError
forall a.
(Int -> a -> ShowS) -> (a -> HostName) -> ([a] -> ShowS) -> Show a
showList :: [ConnectError] -> ShowS
$cshowList :: [ConnectError] -> ShowS
show :: ConnectError -> HostName
$cshow :: ConnectError -> HostName
showsPrec :: Int -> ConnectError -> ShowS
$cshowsPrec :: Int -> ConnectError -> ShowS
Show, Typeable)

instance Exception ConnectError

-- |Default information for connecting:
--
-- @
--  connectHost           = \"localhost\"
--  connectPort           = PortNumber 6379 -- Redis default port
--  connectAuth           = Nothing         -- No password
--  connectDatabase       = 0               -- SELECT database 0
--  connectMaxConnections = 50              -- Up to 50 connections
--  connectMaxIdleTime    = 30              -- Keep open for 30 seconds
--  connectTimeout        = Nothing         -- Don't add timeout logic
--  connectTLSParams      = Nothing         -- Do not use TLS
-- @
--
defaultConnectInfo :: ConnectInfo
defaultConnectInfo :: ConnectInfo
defaultConnectInfo = ConnInfo :: HostName
-> PortID
-> Maybe ByteString
-> Integer
-> Int
-> NominalDiffTime
-> Maybe NominalDiffTime
-> Maybe ClientParams
-> ConnectInfo
ConnInfo
    { connectHost :: HostName
connectHost           = HostName
"localhost"
    , connectPort :: PortID
connectPort           = PortNumber -> PortID
CC.PortNumber PortNumber
6379
    , connectAuth :: Maybe ByteString
connectAuth           = Maybe ByteString
forall a. Maybe a
Nothing
    , connectDatabase :: Integer
connectDatabase       = Integer
0
    , connectMaxConnections :: Int
connectMaxConnections = Int
50
    , connectMaxIdleTime :: NominalDiffTime
connectMaxIdleTime    = NominalDiffTime
30
    , connectTimeout :: Maybe NominalDiffTime
connectTimeout        = Maybe NominalDiffTime
forall a. Maybe a
Nothing
    , connectTLSParams :: Maybe ClientParams
connectTLSParams      = Maybe ClientParams
forall a. Maybe a
Nothing
    }

createConnection :: ConnectInfo -> IO PP.Connection
createConnection :: ConnectInfo -> IO Connection
createConnection ConnInfo{Int
Integer
HostName
Maybe ByteString
Maybe NominalDiffTime
Maybe ClientParams
NominalDiffTime
PortID
connectTLSParams :: Maybe ClientParams
connectTimeout :: Maybe NominalDiffTime
connectMaxIdleTime :: NominalDiffTime
connectMaxConnections :: Int
connectDatabase :: Integer
connectAuth :: Maybe ByteString
connectPort :: PortID
connectHost :: HostName
connectTLSParams :: ConnectInfo -> Maybe ClientParams
connectTimeout :: ConnectInfo -> Maybe NominalDiffTime
connectMaxIdleTime :: ConnectInfo -> NominalDiffTime
connectMaxConnections :: ConnectInfo -> Int
connectDatabase :: ConnectInfo -> Integer
connectAuth :: ConnectInfo -> Maybe ByteString
connectPort :: ConnectInfo -> PortID
connectHost :: ConnectInfo -> HostName
..} = do
    let timeoutOptUs :: Maybe Int
timeoutOptUs =
          NominalDiffTime -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (NominalDiffTime -> Int)
-> (NominalDiffTime -> NominalDiffTime) -> NominalDiffTime -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (NominalDiffTime
1000000 NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
*) (NominalDiffTime -> Int) -> Maybe NominalDiffTime -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe NominalDiffTime
connectTimeout
    Connection
conn <- HostName -> PortID -> Maybe Int -> IO Connection
PP.connect HostName
connectHost PortID
connectPort Maybe Int
timeoutOptUs
    Connection
conn' <- case Maybe ClientParams
connectTLSParams of
               Maybe ClientParams
Nothing -> Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
               Just ClientParams
tlsParams -> ClientParams -> Connection -> IO Connection
PP.enableTLS ClientParams
tlsParams Connection
conn
    Connection -> IO ()
PP.beginReceiving Connection
conn'

    Connection -> Redis () -> IO ()
forall a. Connection -> Redis a -> IO a
runRedisInternal Connection
conn' (Redis () -> IO ()) -> Redis () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        -- AUTH
        case Maybe ByteString
connectAuth of
            Maybe ByteString
Nothing   -> () -> Redis ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Just ByteString
pass -> do
              Either Reply Status
resp <- ByteString -> Redis (Either Reply Status)
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> m (f Status)
auth ByteString
pass
              case Either Reply Status
resp of
                Left Reply
r -> IO () -> Redis ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Redis ()) -> IO () -> Redis ()
forall a b. (a -> b) -> a -> b
$ ConnectError -> IO ()
forall e a. Exception e => e -> IO a
throwIO (ConnectError -> IO ()) -> ConnectError -> IO ()
forall a b. (a -> b) -> a -> b
$ Reply -> ConnectError
ConnectAuthError Reply
r
                Either Reply Status
_      -> () -> Redis ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        -- SELECT
        Bool -> Redis () -> Redis ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Integer
connectDatabase Integer -> Integer -> Bool
forall a. Eq a => a -> a -> Bool
/= Integer
0) (Redis () -> Redis ()) -> Redis () -> Redis ()
forall a b. (a -> b) -> a -> b
$ do
          Either Reply Status
resp <- Integer -> Redis (Either Reply Status)
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
Integer -> m (f Status)
select Integer
connectDatabase
          case Either Reply Status
resp of
              Left Reply
r -> IO () -> Redis ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Redis ()) -> IO () -> Redis ()
forall a b. (a -> b) -> a -> b
$ ConnectError -> IO ()
forall e a. Exception e => e -> IO a
throwIO (ConnectError -> IO ()) -> ConnectError -> IO ()
forall a b. (a -> b) -> a -> b
$ Reply -> ConnectError
ConnectSelectError Reply
r
              Either Reply Status
_      -> () -> Redis ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn'

-- |Constructs a 'Connection' pool to a Redis server designated by the
--  given 'ConnectInfo'. The first connection is not actually established
--  until the first call to the server.
connect :: ConnectInfo -> IO Connection
connect :: ConnectInfo -> IO Connection
connect cInfo :: ConnectInfo
cInfo@ConnInfo{Int
Integer
HostName
Maybe ByteString
Maybe NominalDiffTime
Maybe ClientParams
NominalDiffTime
PortID
connectTLSParams :: Maybe ClientParams
connectTimeout :: Maybe NominalDiffTime
connectMaxIdleTime :: NominalDiffTime
connectMaxConnections :: Int
connectDatabase :: Integer
connectAuth :: Maybe ByteString
connectPort :: PortID
connectHost :: HostName
connectTLSParams :: ConnectInfo -> Maybe ClientParams
connectTimeout :: ConnectInfo -> Maybe NominalDiffTime
connectMaxIdleTime :: ConnectInfo -> NominalDiffTime
connectMaxConnections :: ConnectInfo -> Int
connectDatabase :: ConnectInfo -> Integer
connectAuth :: ConnectInfo -> Maybe ByteString
connectPort :: ConnectInfo -> PortID
connectHost :: ConnectInfo -> HostName
..} = Pool Connection -> Connection
NonClusteredConnection (Pool Connection -> Connection)
-> IO (Pool Connection) -> IO Connection
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
    IO Connection
-> (Connection -> IO ())
-> Int
-> NominalDiffTime
-> Int
-> IO (Pool Connection)
forall a.
IO a
-> (a -> IO ()) -> Int -> NominalDiffTime -> Int -> IO (Pool a)
createPool (ConnectInfo -> IO Connection
createConnection ConnectInfo
cInfo) Connection -> IO ()
PP.disconnect Int
1 NominalDiffTime
connectMaxIdleTime Int
connectMaxConnections

-- |Constructs a 'Connection' pool to a Redis server designated by the
--  given 'ConnectInfo', then tests if the server is actually there.
--  Throws an exception if the connection to the Redis server can't be
--  established.
checkedConnect :: ConnectInfo -> IO Connection
checkedConnect :: ConnectInfo -> IO Connection
checkedConnect ConnectInfo
connInfo = do
    Connection
conn <- ConnectInfo -> IO Connection
connect ConnectInfo
connInfo
    Connection -> Redis () -> IO ()
forall a. Connection -> Redis a -> IO a
runRedis Connection
conn (Redis () -> IO ()) -> Redis () -> IO ()
forall a b. (a -> b) -> a -> b
$ Redis (Either Reply Status) -> Redis ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void Redis (Either Reply Status)
forall (m :: * -> *) (f :: * -> *). RedisCtx m f => m (f Status)
ping
    Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn

-- |Destroy all idle resources in the pool.
disconnect :: Connection -> IO ()
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection Pool Connection
pool) = Pool Connection -> IO ()
forall a. Pool a -> IO ()
destroyAllResources Pool Connection
pool
disconnect (ClusteredConnection MVar ShardMap
_ Pool Connection
pool) = Pool Connection -> IO ()
forall a. Pool a -> IO ()
destroyAllResources Pool Connection
pool

-- | Memory bracket around 'connect' and 'disconnect'.
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c
withConnect :: ConnectInfo -> (Connection -> m c) -> m c
withConnect ConnectInfo
connInfo = m Connection -> (Connection -> m ()) -> (Connection -> m c) -> m c
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
Catch.bracket (IO Connection -> m Connection
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Connection -> m Connection) -> IO Connection -> m Connection
forall a b. (a -> b) -> a -> b
$ ConnectInfo -> IO Connection
connect ConnectInfo
connInfo) (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (Connection -> IO ()) -> Connection -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> IO ()
disconnect)

-- | Memory bracket around 'checkedConnect' and 'disconnect'
withCheckedConnect :: ConnectInfo -> (Connection -> IO c) -> IO c
withCheckedConnect :: ConnectInfo -> (Connection -> IO c) -> IO c
withCheckedConnect ConnectInfo
connInfo = IO Connection
-> (Connection -> IO ()) -> (Connection -> IO c) -> IO c
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (ConnectInfo -> IO Connection
checkedConnect ConnectInfo
connInfo) Connection -> IO ()
disconnect

-- |Interact with a Redis datastore specified by the given 'Connection'.
--
--  Each call of 'runRedis' takes a network connection from the 'Connection'
--  pool and runs the given 'Redis' action. Calls to 'runRedis' may thus block
--  while all connections from the pool are in use.
runRedis :: Connection -> Redis a -> IO a
runRedis :: Connection -> Redis a -> IO a
runRedis (NonClusteredConnection Pool Connection
pool) Redis a
redis =
  Pool Connection -> (Connection -> IO a) -> IO a
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool Connection
pool ((Connection -> IO a) -> IO a) -> (Connection -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> Connection -> Redis a -> IO a
forall a. Connection -> Redis a -> IO a
runRedisInternal Connection
conn Redis a
redis
runRedis (ClusteredConnection MVar ShardMap
_ Pool Connection
pool) Redis a
redis =
    Pool Connection -> (Connection -> IO a) -> IO a
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool Connection
pool ((Connection -> IO a) -> IO a) -> (Connection -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> Connection -> IO ShardMap -> Redis a -> IO a
forall a. Connection -> IO ShardMap -> Redis a -> IO a
runRedisClusteredInternal Connection
conn (Connection -> IO ShardMap
refreshShardMap Connection
conn) Redis a
redis

newtype ClusterConnectError = ClusterConnectError Reply
    deriving (ClusterConnectError -> ClusterConnectError -> Bool
(ClusterConnectError -> ClusterConnectError -> Bool)
-> (ClusterConnectError -> ClusterConnectError -> Bool)
-> Eq ClusterConnectError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ClusterConnectError -> ClusterConnectError -> Bool
$c/= :: ClusterConnectError -> ClusterConnectError -> Bool
== :: ClusterConnectError -> ClusterConnectError -> Bool
$c== :: ClusterConnectError -> ClusterConnectError -> Bool
Eq, Int -> ClusterConnectError -> ShowS
[ClusterConnectError] -> ShowS
ClusterConnectError -> HostName
(Int -> ClusterConnectError -> ShowS)
-> (ClusterConnectError -> HostName)
-> ([ClusterConnectError] -> ShowS)
-> Show ClusterConnectError
forall a.
(Int -> a -> ShowS) -> (a -> HostName) -> ([a] -> ShowS) -> Show a
showList :: [ClusterConnectError] -> ShowS
$cshowList :: [ClusterConnectError] -> ShowS
show :: ClusterConnectError -> HostName
$cshow :: ClusterConnectError -> HostName
showsPrec :: Int -> ClusterConnectError -> ShowS
$cshowsPrec :: Int -> ClusterConnectError -> ShowS
Show, Typeable)

instance Exception ClusterConnectError

-- |Constructs a 'ShardMap' of connections to clustered nodes. The argument is
-- a 'ConnectInfo' for any node in the cluster
--
-- Some Redis commands are currently not supported in cluster mode
-- - CONFIG, AUTH
-- - SCAN
-- - MOVE, SELECT
-- - PUBLISH, SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, RESET
connectCluster :: ConnectInfo -> IO Connection
connectCluster :: ConnectInfo -> IO Connection
connectCluster ConnectInfo
bootstrapConnInfo = do
    Connection
conn <- ConnectInfo -> IO Connection
createConnection ConnectInfo
bootstrapConnInfo
    Either Reply ClusterSlotsResponse
slotsResponse <- Connection
-> Redis (Either Reply ClusterSlotsResponse)
-> IO (Either Reply ClusterSlotsResponse)
forall a. Connection -> Redis a -> IO a
runRedisInternal Connection
conn Redis (Either Reply ClusterSlotsResponse)
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
m (f ClusterSlotsResponse)
clusterSlots
    MVar ShardMap
shardMapVar <- case Either Reply ClusterSlotsResponse
slotsResponse of
        Left Reply
e -> ClusterConnectError -> IO (MVar ShardMap)
forall e a. Exception e => e -> IO a
throwIO (ClusterConnectError -> IO (MVar ShardMap))
-> ClusterConnectError -> IO (MVar ShardMap)
forall a b. (a -> b) -> a -> b
$ Reply -> ClusterConnectError
ClusterConnectError Reply
e
        Right ClusterSlotsResponse
slots -> do
            ShardMap
shardMap <- ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse
slots
            ShardMap -> IO (MVar ShardMap)
forall a. a -> IO (MVar a)
newMVar ShardMap
shardMap
    Either Reply [CommandInfo]
commandInfos <- Connection
-> Redis (Either Reply [CommandInfo])
-> IO (Either Reply [CommandInfo])
forall a. Connection -> Redis a -> IO a
runRedisInternal Connection
conn Redis (Either Reply [CommandInfo])
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
m (f [CommandInfo])
command
    case Either Reply [CommandInfo]
commandInfos of
        Left Reply
e -> ClusterConnectError -> IO Connection
forall e a. Exception e => e -> IO a
throwIO (ClusterConnectError -> IO Connection)
-> ClusterConnectError -> IO Connection
forall a b. (a -> b) -> a -> b
$ Reply -> ClusterConnectError
ClusterConnectError Reply
e
        Right [CommandInfo]
infos -> do
            Pool Connection
pool <- IO Connection
-> (Connection -> IO ())
-> Int
-> NominalDiffTime
-> Int
-> IO (Pool Connection)
forall a.
IO a
-> (a -> IO ()) -> Int -> NominalDiffTime -> Int -> IO (Pool a)
createPool ([CommandInfo] -> MVar ShardMap -> Maybe Int -> IO Connection
Cluster.connect [CommandInfo]
infos MVar ShardMap
shardMapVar Maybe Int
forall a. Maybe a
Nothing) Connection -> IO ()
Cluster.disconnect Int
1 (ConnectInfo -> NominalDiffTime
connectMaxIdleTime ConnectInfo
bootstrapConnInfo) (ConnectInfo -> Int
connectMaxConnections ConnectInfo
bootstrapConnInfo)
            Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection -> IO Connection) -> Connection -> IO Connection
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> Pool Connection -> Connection
ClusteredConnection MVar ShardMap
shardMapVar Pool Connection
pool

shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse{[ClusterSlotsResponseEntry]
clusterSlotsResponseEntries :: ClusterSlotsResponse -> [ClusterSlotsResponseEntry]
clusterSlotsResponseEntries :: [ClusterSlotsResponseEntry]
..} = IntMap Shard -> ShardMap
ShardMap (IntMap Shard -> ShardMap) -> IO (IntMap Shard) -> IO ShardMap
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (ClusterSlotsResponseEntry
 -> IO (IntMap Shard) -> IO (IntMap Shard))
-> IO (IntMap Shard)
-> [ClusterSlotsResponseEntry]
-> IO (IntMap Shard)
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr ClusterSlotsResponseEntry -> IO (IntMap Shard) -> IO (IntMap Shard)
mkShardMap (IntMap Shard -> IO (IntMap Shard)
forall (f :: * -> *) a. Applicative f => a -> f a
pure IntMap Shard
forall a. IntMap a
IntMap.empty)  [ClusterSlotsResponseEntry]
clusterSlotsResponseEntries where
    mkShardMap :: ClusterSlotsResponseEntry -> IO (IntMap.IntMap Shard) -> IO (IntMap.IntMap Shard)
    mkShardMap :: ClusterSlotsResponseEntry -> IO (IntMap Shard) -> IO (IntMap Shard)
mkShardMap ClusterSlotsResponseEntry{Int
[ClusterSlotsNode]
ClusterSlotsNode
clusterSlotsResponseEntryReplicas :: ClusterSlotsResponseEntry -> [ClusterSlotsNode]
clusterSlotsResponseEntryMaster :: ClusterSlotsResponseEntry -> ClusterSlotsNode
clusterSlotsResponseEntryEndSlot :: ClusterSlotsResponseEntry -> Int
clusterSlotsResponseEntryStartSlot :: ClusterSlotsResponseEntry -> Int
clusterSlotsResponseEntryReplicas :: [ClusterSlotsNode]
clusterSlotsResponseEntryMaster :: ClusterSlotsNode
clusterSlotsResponseEntryEndSlot :: Int
clusterSlotsResponseEntryStartSlot :: Int
..} IO (IntMap Shard)
accumulator = do
        IntMap Shard
accumulated <- IO (IntMap Shard)
accumulator
        let master :: Node
master = Bool -> ClusterSlotsNode -> Node
nodeFromClusterSlotNode Bool
True ClusterSlotsNode
clusterSlotsResponseEntryMaster
        let replicas :: [Node]
replicas = (ClusterSlotsNode -> Node) -> [ClusterSlotsNode] -> [Node]
forall a b. (a -> b) -> [a] -> [b]
map (Bool -> ClusterSlotsNode -> Node
nodeFromClusterSlotNode Bool
False) [ClusterSlotsNode]
clusterSlotsResponseEntryReplicas
        let shard :: Shard
shard = Node -> [Node] -> Shard
Shard Node
master [Node]
replicas
        let slotMap :: IntMap Shard
slotMap = [(Int, Shard)] -> IntMap Shard
forall a. [(Int, a)] -> IntMap a
IntMap.fromList ([(Int, Shard)] -> IntMap Shard) -> [(Int, Shard)] -> IntMap Shard
forall a b. (a -> b) -> a -> b
$ (Int -> (Int, Shard)) -> [Int] -> [(Int, Shard)]
forall a b. (a -> b) -> [a] -> [b]
map (, Shard
shard) [Int
clusterSlotsResponseEntryStartSlot..Int
clusterSlotsResponseEntryEndSlot]
        IntMap Shard -> IO (IntMap Shard)
forall (m :: * -> *) a. Monad m => a -> m a
return (IntMap Shard -> IO (IntMap Shard))
-> IntMap Shard -> IO (IntMap Shard)
forall a b. (a -> b) -> a -> b
$ IntMap Shard -> IntMap Shard -> IntMap Shard
forall a. IntMap a -> IntMap a -> IntMap a
IntMap.union IntMap Shard
slotMap IntMap Shard
accumulated
    nodeFromClusterSlotNode :: Bool -> ClusterSlotsNode -> Node
    nodeFromClusterSlotNode :: Bool -> ClusterSlotsNode -> Node
nodeFromClusterSlotNode Bool
isMaster ClusterSlotsNode{Int
ByteString
clusterSlotsNodeID :: ClusterSlotsNode -> ByteString
clusterSlotsNodePort :: ClusterSlotsNode -> Int
clusterSlotsNodeIP :: ClusterSlotsNode -> ByteString
clusterSlotsNodeID :: ByteString
clusterSlotsNodePort :: Int
clusterSlotsNodeIP :: ByteString
..} =
        let hostname :: HostName
hostname = ByteString -> HostName
Char8.unpack ByteString
clusterSlotsNodeIP
            role :: NodeRole
role = if Bool
isMaster then NodeRole
Cluster.Master else NodeRole
Cluster.Slave
        in
            ByteString -> NodeRole -> HostName -> Int -> Node
Cluster.Node ByteString
clusterSlotsNodeID NodeRole
role HostName
hostname (Int -> Int
forall a. Enum a => Int -> a
toEnum Int
clusterSlotsNodePort)

refreshShardMap :: Cluster.Connection -> IO ShardMap
refreshShardMap :: Connection -> IO ShardMap
refreshShardMap (Cluster.Connection HashMap ByteString NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) = do
    let (Cluster.NodeConnection ConnectionContext
ctx IORef (Maybe ByteString)
_ ByteString
_) = [NodeConnection] -> NodeConnection
forall a. [a] -> a
head ([NodeConnection] -> NodeConnection)
-> [NodeConnection] -> NodeConnection
forall a b. (a -> b) -> a -> b
$ HashMap ByteString NodeConnection -> [NodeConnection]
forall k v. HashMap k v -> [v]
HM.elems HashMap ByteString NodeConnection
nodeConns
    Connection
pipelineConn <- ConnectionContext -> IO Connection
PP.fromCtx ConnectionContext
ctx
    ()
_ <- Connection -> IO ()
PP.beginReceiving Connection
pipelineConn
    Either Reply ClusterSlotsResponse
slotsResponse <- Connection
-> Redis (Either Reply ClusterSlotsResponse)
-> IO (Either Reply ClusterSlotsResponse)
forall a. Connection -> Redis a -> IO a
runRedisInternal Connection
pipelineConn Redis (Either Reply ClusterSlotsResponse)
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
m (f ClusterSlotsResponse)
clusterSlots
    case Either Reply ClusterSlotsResponse
slotsResponse of
        Left Reply
e -> ClusterConnectError -> IO ShardMap
forall e a. Exception e => e -> IO a
throwIO (ClusterConnectError -> IO ShardMap)
-> ClusterConnectError -> IO ShardMap
forall a b. (a -> b) -> a -> b
$ Reply -> ClusterConnectError
ClusterConnectError Reply
e
        Right ClusterSlotsResponse
slots -> ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse
slots