{-# LANGUAGE DeriveDataTypeable    #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Network.Riak.Cluster
    ( Cluster(..)
    , InClusterError(..)
    , connectToCluster
    , inCluster
    , Riak.create
    , Riak.defaultClient
    ) where
import           Control.Concurrent.STM        (atomically)
import           Control.Concurrent.STM.TMVar
import           Control.Exception
import           Control.Exception.Enclosed
import           Control.Monad.Base            (liftBase)
import           Control.Monad.Catch           (MonadThrow (..))
import           Control.Monad.Trans.Control   (MonadBaseControl)
import           Data.Typeable
import           Network.Riak                  (Connection)
import qualified Network.Riak                  as Riak
import qualified Network.Riak.Connection.Pool  as Riak
import           System.Random.Mersenne.Pure64
data Cluster = Cluster
    { Cluster -> [Pool]
clusterPools :: [Riak.Pool]
      
    , Cluster -> TMVar PureMT
clusterGen   :: TMVar PureMT
    }
data InClusterError = InClusterError [SomeException]
    deriving (Int -> InClusterError -> ShowS
[InClusterError] -> ShowS
InClusterError -> String
(Int -> InClusterError -> ShowS)
-> (InClusterError -> String)
-> ([InClusterError] -> ShowS)
-> Show InClusterError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [InClusterError] -> ShowS
$cshowList :: [InClusterError] -> ShowS
show :: InClusterError -> String
$cshow :: InClusterError -> String
showsPrec :: Int -> InClusterError -> ShowS
$cshowsPrec :: Int -> InClusterError -> ShowS
Show, Typeable)
instance Exception InClusterError
connectToCluster :: [Riak.Client] -> IO Cluster
connectToCluster :: [Client] -> IO Cluster
connectToCluster [Client]
clients = do
    [Pool]
pools <- (Client -> IO Pool) -> [Client] -> IO [Pool]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (\Client
c -> Client -> Int -> NominalDiffTime -> Int -> IO Pool
Riak.create Client
c Int
1 NominalDiffTime
10 Int
20) [Client]
clients
    [Pool] -> IO Cluster
connectToClusterWithPools [Pool]
pools
connectToClusterWithPools :: [Riak.Pool] -> IO Cluster
connectToClusterWithPools :: [Pool] -> IO Cluster
connectToClusterWithPools [Pool]
pools = do
    PureMT
gen <- IO PureMT
newPureMT
    TMVar PureMT
mt <- STM (TMVar PureMT) -> IO (TMVar PureMT)
forall a. STM a -> IO a
atomically (PureMT -> STM (TMVar PureMT)
forall a. a -> STM (TMVar a)
newTMVar PureMT
gen)
    Cluster -> IO Cluster
forall (m :: * -> *) a. Monad m => a -> m a
return ([Pool] -> TMVar PureMT -> Cluster
Cluster [Pool]
pools TMVar PureMT
mt)
inCluster :: (MonadThrow m, MonadBaseControl IO m)
          => Cluster -> (Connection -> m a) -> m a
inCluster :: Cluster -> (Connection -> m a) -> m a
inCluster Cluster{clusterPools :: Cluster -> [Pool]
clusterPools=[Pool]
pools, clusterGen :: Cluster -> TMVar PureMT
clusterGen=TMVar PureMT
tMT} Connection -> m a
f = do
    Int
rnd <- IO Int -> m Int
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ STM Int -> IO Int
forall a. STM a -> IO a
atomically (STM Int -> IO Int) -> STM Int -> IO Int
forall a b. (a -> b) -> a -> b
$ do
      PureMT
mt <- TMVar PureMT -> STM PureMT
forall a. TMVar a -> STM a
takeTMVar TMVar PureMT
tMT
      let (Int
i, PureMT
mt') = PureMT -> (Int, PureMT)
randomInt PureMT
mt
      TMVar PureMT -> PureMT -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar PureMT
tMT PureMT
mt'
      Int -> STM Int
forall (m :: * -> *) a. Monad m => a -> m a
return Int
i
    let n :: Int
n = if [Pool] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Pool]
pools then Int
0 else Int
rnd Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` [Pool] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Pool]
pools
        
        pools' :: [Pool]
pools' = Int -> [Pool] -> [Pool]
forall a. Int -> [a] -> [a]
rotateL Int
n [Pool]
pools
    [Pool] -> [SomeException] -> m a
go [Pool]
pools' []
  where
    go :: [Pool] -> [SomeException] -> m a
go [] [SomeException]
errors = InClusterError -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM ([SomeException] -> InClusterError
InClusterError [SomeException]
errors)
    go (Pool
p:[Pool]
ps) [SomeException]
es =
        m a -> (SomeException -> m a) -> m a
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> (SomeException -> m a) -> m a
catchAny (Pool -> (Connection -> m a) -> m a
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Pool -> (Connection -> m a) -> m a
Riak.withConnectionM Pool
p Connection -> m a
f) (\SomeException
e -> [Pool] -> [SomeException] -> m a
go [Pool]
ps (SomeException
eSomeException -> [SomeException] -> [SomeException]
forall a. a -> [a] -> [a]
:[SomeException]
es))
rotateL :: Int -> [a] -> [a]
rotateL :: Int -> [a] -> [a]
rotateL Int
i [a]
xs = [a]
right [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
++ [a]
left
  where
    ([a]
left, [a]
right) = Int -> [a] -> ([a], [a])
forall a. Int -> [a] -> ([a], [a])
splitAt Int
i [a]
xs