{-# LANGUAGE DeriveDataTypeable    #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}

module Network.Riak.Cluster
    ( Cluster(..)
    , InClusterError(..)
    , connectToCluster
    , inCluster
    , Riak.create
    , Riak.defaultClient
    ) where

import           Control.Concurrent.STM        (atomically)
import           Control.Concurrent.STM.TMVar
import           Control.Exception
import           Control.Exception.Enclosed
import           Control.Monad.Base            (liftBase)
import           Control.Monad.Catch           (MonadThrow (..))
import           Control.Monad.Trans.Control   (MonadBaseControl)
import           Data.Typeable
import           Network.Riak                  (Connection)
import qualified Network.Riak                  as Riak
import qualified Network.Riak.Connection.Pool  as Riak
import           System.Random.Mersenne.Pure64

-- | Datatype holding connection-pool with all known cluster nodes
data Cluster = Cluster
    { Cluster -> [Pool]
clusterPools :: [Riak.Pool]
      -- ^ Vector of connection pools to riak cluster nodes
    , Cluster -> TMVar PureMT
clusterGen   :: TMVar PureMT
    }

-- | Error that gets thrown whenever operation couldn't succeed with
-- any node.
data InClusterError = InClusterError [SomeException]
    deriving (Int -> InClusterError -> ShowS
[InClusterError] -> ShowS
InClusterError -> String
(Int -> InClusterError -> ShowS)
-> (InClusterError -> String)
-> ([InClusterError] -> ShowS)
-> Show InClusterError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [InClusterError] -> ShowS
$cshowList :: [InClusterError] -> ShowS
show :: InClusterError -> String
$cshow :: InClusterError -> String
showsPrec :: Int -> InClusterError -> ShowS
$cshowsPrec :: Int -> InClusterError -> ShowS
Show, Typeable)
instance Exception InClusterError

-- | Function to connect to riak cluster with sane pool defaults
connectToCluster :: [Riak.Client] -> IO Cluster
connectToCluster :: [Client] -> IO Cluster
connectToCluster [Client]
clients = do
    [Pool]
pools <- (Client -> IO Pool) -> [Client] -> IO [Pool]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (\Client
c -> Client -> Int -> NominalDiffTime -> Int -> IO Pool
Riak.create Client
c Int
1 NominalDiffTime
10 Int
20) [Client]
clients
    [Pool] -> IO Cluster
connectToClusterWithPools [Pool]
pools

-- | Function to connect to riak cluster with pre-created list of
-- 'Riak.Pool' objects
connectToClusterWithPools :: [Riak.Pool] -> IO Cluster
connectToClusterWithPools :: [Pool] -> IO Cluster
connectToClusterWithPools [Pool]
pools = do
    PureMT
gen <- IO PureMT
newPureMT
    TMVar PureMT
mt <- STM (TMVar PureMT) -> IO (TMVar PureMT)
forall a. STM a -> IO a
atomically (PureMT -> STM (TMVar PureMT)
forall a. a -> STM (TMVar a)
newTMVar PureMT
gen)
    Cluster -> IO Cluster
forall (m :: * -> *) a. Monad m => a -> m a
return ([Pool] -> TMVar PureMT -> Cluster
Cluster [Pool]
pools TMVar PureMT
mt)

-- | Tries to run some operation for a random riak node. If it fails,
-- tries all other nodes. If all other nodes fail - throws
-- 'InClusterError' exception.
inCluster :: (MonadThrow m, MonadBaseControl IO m)
          => Cluster -> (Connection -> m a) -> m a
inCluster :: Cluster -> (Connection -> m a) -> m a
inCluster Cluster{clusterPools :: Cluster -> [Pool]
clusterPools=[Pool]
pools, clusterGen :: Cluster -> TMVar PureMT
clusterGen=TMVar PureMT
tMT} Connection -> m a
f = do
    Int
rnd <- IO Int -> m Int
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ STM Int -> IO Int
forall a. STM a -> IO a
atomically (STM Int -> IO Int) -> STM Int -> IO Int
forall a b. (a -> b) -> a -> b
$ do
      PureMT
mt <- TMVar PureMT -> STM PureMT
forall a. TMVar a -> STM a
takeTMVar TMVar PureMT
tMT
      let (Int
i, PureMT
mt') = PureMT -> (Int, PureMT)
randomInt PureMT
mt
      TMVar PureMT -> PureMT -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar PureMT
tMT PureMT
mt'
      Int -> STM Int
forall (m :: * -> *) a. Monad m => a -> m a
return Int
i
    let n :: Int
n = if [Pool] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Pool]
pools then Int
0 else Int
rnd Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` [Pool] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Pool]
pools
        -- we rotate pool vector by n
        pools' :: [Pool]
pools' = Int -> [Pool] -> [Pool]
forall a. Int -> [a] -> [a]
rotateL Int
n [Pool]
pools
    [Pool] -> [SomeException] -> m a
go [Pool]
pools' []
  where
    go :: [Pool] -> [SomeException] -> m a
go [] [SomeException]
errors = InClusterError -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM ([SomeException] -> InClusterError
InClusterError [SomeException]
errors)
    go (Pool
p:[Pool]
ps) [SomeException]
es =
        m a -> (SomeException -> m a) -> m a
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> (SomeException -> m a) -> m a
catchAny (Pool -> (Connection -> m a) -> m a
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Pool -> (Connection -> m a) -> m a
Riak.withConnectionM Pool
p Connection -> m a
f) (\SomeException
e -> [Pool] -> [SomeException] -> m a
go [Pool]
ps (SomeException
eSomeException -> [SomeException] -> [SomeException]
forall a. a -> [a] -> [a]
:[SomeException]
es))

rotateL :: Int -> [a] -> [a]
rotateL :: Int -> [a] -> [a]
rotateL Int
i [a]
xs = [a]
right [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
++ [a]
left
  where
    ([a]
left, [a]
right) = Int -> [a] -> ([a], [a])
forall a. Int -> [a] -> ([a], [a])
splitAt Int
i [a]
xs