-- | A cluster of memcached servers.
--
-- Provides construction of a 'Cluster' from a list of 'ServerSpec's, plus
-- operations that are routed to a single server chosen by key ('keyedOp'),
-- to the first available server ('anyOp'), or to every available server
-- ('allOp', 'allOp''). Operations are retried on failure and servers that
-- keep failing are skipped for a configurable period.
module Database.Memcache.Cluster (
Cluster, ServerSpec(..), Options(..), newCluster,
Retries, keyedOp, anyOp, allOp, allOp'
) where
import Database.Memcache.Errors
import Database.Memcache.Server
import Database.Memcache.Types
import Control.Concurrent (threadDelay)
import Control.Exception (handle, throwIO, SomeException)
import Data.Default.Class
import Data.Fixed (Milli)
import Data.Hashable (hash)
import Data.IORef
import Data.Maybe (fromMaybe)
import Data.List (sort)
import Data.Time.Clock (NominalDiffTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Data.Vector as V
import Network.Socket (HostName, PortNumber)
import System.Timeout
-- | Retry budget for an operation against a single server (see
-- 'optsServerRetries').
type Retries = Int
-- | Everything needed to create one server of the cluster; 'newCluster'
-- passes the three fields to @newServer@.
data ServerSpec = ServerSpec
    { ssHost :: HostName
      -- ^ Host name of the server.
    , ssPort :: PortNumber
      -- ^ Port the server listens on.
    , ssAuth :: Authentication
      -- ^ Authentication to use for the server.
    } deriving (Eq, Show)
-- | The default specification: @localhost@ on port 11211 with no
-- authentication.
instance Default ServerSpec where
    def = ServerSpec
        { ssHost = "localhost"
        , ssPort = 11211
        , ssAuth = NoAuth
        }
-- | User-facing tuning knobs for a 'Cluster'. The three 'Milli' values are
-- expressed in milliseconds; 'newCluster' converts them to the units needed
-- internally.
data Options = Options
    { optsServerRetries  :: Retries
      -- ^ Attempt budget for an operation against a single server.
    , optsFailRetryDelay :: Milli
      -- ^ Pause between two attempts of a failing operation.
    , optsDeadRetryDelay :: Milli
      -- ^ How long a server that has been marked as failed is skipped
      -- before it is tried again.
    , optsServerTimeout  :: Milli
      -- ^ Time limit for a single attempt of an operation.
    } deriving (Eq, Show)
-- | Default options: a retry budget of 2, 200 between retries, 1500 before
-- a failed server is tried again, and a 750 limit per attempt.
instance Default Options where
    def = Options
        { optsServerRetries  = 2
        , optsFailRetryDelay = 200
        , optsDeadRetryDelay = 1500
        , optsServerTimeout  = 750
        }
-- | A set of servers together with the retry/timeout settings derived from
-- the 'Options' given to 'newCluster'.
data Cluster = Cluster
    { cServers   :: V.Vector Server
      -- ^ All servers of the cluster, in the sorted order established by
      -- 'newCluster'.
    , cRetries   :: !Int
      -- ^ Starting value of the retry counter used for each operation.
    , cFailDelay :: !Int
      -- ^ Pause between attempts, as handed to 'threadDelay'.
    , cDeadDelay :: !NominalDiffTime
      -- ^ How long a failed server is skipped before being tried again.
    , cTimeout   :: !Int
      -- ^ Per-attempt time limit, as handed to 'timeout'.
    } deriving (Eq, Show)
-- | Build a 'Cluster' from server specifications and options.
--
-- Throws @ClientError NoServersReady@ when the list of specifications is
-- empty. Otherwise a server is created for every specification via
-- @newServer@ and the servers are stored in sorted order.
newCluster :: [ServerSpec] -> Options -> IO Cluster
newCluster [] _ = throwIO $ ClientError NoServersReady
newCluster hosts (Options retries failDelay deadDelay srvTimeout) = do
    servers <- mapM mkServer hosts
    return Cluster
        { cServers   = V.fromList (sort servers)
        , cRetries   = retries
          -- 'fromEnum' on a 'Milli' yields the underlying count of
          -- thousandths, so a setting of N (milliseconds) becomes N * 1000,
          -- i.e. the same duration in microseconds.
        , cFailDelay = fromEnum failDelay
          -- Milliseconds to seconds for 'NominalDiffTime'.
        , cDeadDelay = fromRational (toRational deadDelay / 1000)
        , cTimeout   = fromEnum srvTimeout
        }
  where
    mkServer (ServerSpec host port auth) = newServer host port auth
-- | Decide whether a server may currently be used.
--
-- The server's @failed@ reference holds 0 while the server is considered
-- healthy, and otherwise the POSIX time at which it was marked as failed.
-- A failed server is reported as unavailable until @deadDelay@ has elapsed
-- since that time; once it has, the marker is reset to 0 and the server is
-- reported as available again.
serverAlive :: NominalDiffTime -> Server -> IO Bool
serverAlive deadDelay s = do
    t <- readIORef (failed s)
    if t == 0
        then return True
        else do
            t' <- getPOSIXTime
            -- Time elapsed since the failure was recorded.
            if (t' - t) < deadDelay
                then return False
                else do
                    -- Dead period is over: clear the marker so the server
                    -- is used again.
                    writeIORef (failed s) 0
                    return True
-- | Pick the server responsible for a key among the servers that are
-- currently alive.
--
-- The key is hashed and the first alive server (in cluster order) whose
-- @sid@ is smaller than the hash is chosen; if no server qualifies, the last
-- alive server is used. Returns 'Nothing' when no server is alive.
getServerForKey :: Cluster -> Key -> IO (Maybe Server)
getServerForKey c k = do
    alive <- V.filterM (serverAlive (cDeadDelay c)) (cServers c)
    return (choose alive)
  where
    keyHash = hash k

    belowHash srv = sid srv < keyHash

    choose alive
        | V.null alive = Nothing
        | otherwise    = Just (fromMaybe (V.last alive) (V.find belowHash alive))
-- | Send a request to one specific server and wait for its response, with
-- the cluster's retry handling ('retryOp') applied.
serverOp :: Cluster -> Server -> Request -> IO Response
serverOp c s = retryOp c s . sendRecv s
-- | Run a request against the server selected for the given key.
--
-- Throws @ClientError NoServersReady@ when no server is currently alive.
keyedOp :: Cluster -> Key -> Request -> IO Response
keyedOp c k req =
    getServerForKey c k >>= maybe noServer (\srv -> serverOp c srv req)
  where
    noServer = throwIO $ ClientError NoServersReady
-- | Run a request against the first server (in cluster order) that is
-- currently alive.
--
-- Throws @ClientError NoServersReady@ when no server is alive.
anyOp :: Cluster -> Request -> IO Response
anyOp c req = do
    alive <- V.filterM (serverAlive (cDeadDelay c)) (cServers c)
    case alive V.!? 0 of
        Just srv -> serverOp c srv req
        Nothing  -> throwIO (ClientError NoServersReady)
-- | Run the same request against every server that is currently alive,
-- one server after another, and pair each server with its response.
--
-- Throws @ClientError NoServersReady@ when no server is alive. An exception
-- from any single server (after retries) aborts the whole operation.
allOp :: Cluster -> Request -> IO [(Server, Response)]
allOp c req = do
    alive <- V.filterM (serverAlive (cDeadDelay c)) (cServers c)
    if V.null alive
        then throwIO (ClientError NoServersReady)
        else do
            responses <- V.mapM (\srv -> serverOp c srv req) alive
            return (V.toList (V.zip alive responses))
-- | Like 'allOp', but runs an arbitrary action per server instead of sending
-- a fixed request. The action is run for every alive server, one after
-- another, with the cluster's retry handling ('retryOp') applied, and each
-- server is paired with the action's result.
--
-- Throws @ClientError NoServersReady@ when no server is alive.
allOp' :: Cluster -> (Server -> IO a) -> IO [(Server, a)]
allOp' c op = do
    alive <- V.filterM (serverAlive (cDeadDelay c)) (cServers c)
    if V.null alive
        then throwIO (ClientError NoServersReady)
        else do
            results <- V.mapM (\srv -> retryOp c srv (op srv)) alive
            return (V.toList (V.zip alive results))
-- | Run an action against a server with a time limit and retries.
--
-- Each attempt is limited to @cTimeout@; an attempt that times out closes
-- the server and is treated as a @ClientError Timeout@ failure. After a
-- failed attempt the retry counter is decremented: while it is still
-- positive the action is retried after a pause of @cFailDelay@, otherwise
-- the server is marked as failed (the current POSIX time is stored in its
-- @failed@ reference) and the exception of the last attempt is rethrown.
--
-- A configured retry count of zero or less results in a single attempt.
retryOp :: forall a. Cluster -> Server -> IO a -> IO a
retryOp Cluster{..} s op = go cRetries
  where
    go :: Int -> IO a
    go !n = handle (handleErrs $ n - 1) $ do
        mr <- timeout cTimeout op
        case mr of
            Just r  -> return r
            -- Timed out: drop the connection and report it as a failure so
            -- that the handler above decides whether to retry.
            Nothing -> close s >> throwIO (ClientError Timeout)

    -- NOTE(review): this handles every exception type ('SomeException'),
    -- which appears to include asynchronous ones - confirm this is intended.
    handleErrs :: Int -> SomeException -> IO a
    handleErrs n err
        -- Budget exhausted. Using '<=' rather than matching exactly 0 keeps
        -- a non-positive retry setting from retrying forever.
        | n <= 0 = do
            t <- getPOSIXTime
            writeIORef (failed s) t
            throwIO err
        | otherwise = do
            threadDelay cFailDelay
            go n