module Network.Riak.Resolvable.Internal
(
Resolvable(..)
, ResolvableMonoid(..)
, ResolutionFailure(..)
, get
, getMany
, modify
, modify_
, put
, put_
, putMany
, putMany_
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative ((<$>))
#endif
import Control.Arrow (first)
import Control.Exception (Exception, throwIO)
import Control.Monad (unless)
import Control.Monad.IO.Class
import Data.Aeson.Types (FromJSON, ToJSON)
import Data.Data (Data)
import Data.Either (partitionEithers)
import Data.Function (on)
import Data.List (foldl', sortBy)
import Data.Maybe (isJust)
#if __GLASGOW_HASKELL__ < 710
import Data.Monoid (Monoid(mappend))
#endif
import Data.Typeable (Typeable)
import Network.Riak.Debug (debugValues)
import Network.Riak.Types.Internal hiding (MessageTag(..))
-- | Failure raised by the conflict-resolving store operations in this
-- module.
--
-- 'RetriesExceeded' is thrown by 'put' and 'putMany' when their attempt
-- counter reaches 'maxRetries'.
data ResolutionFailure = RetriesExceeded
    deriving (Eq, Show, Typeable)
-- | Relies entirely on the default 'Exception' methods; this instance is
-- what lets 'RetriesExceeded' be raised with 'throwIO'.
instance Exception ResolutionFailure
-- | Types whose values can be merged pairwise into a single value.
--
-- Within this module, lists of values returned by a fetch or store
-- action are collapsed by folding 'resolve' over them from the left.
--
-- NOTE(review): a left fold only gives an order-independent result if
-- 'resolve' is associative and commutative; confirm whether instances
-- are expected to satisfy those laws.
class (Show a) => Resolvable a where
    -- | Combine two values into one.
    resolve :: a -> a -> a
-- | Wrapper that gives any 'Monoid' a 'Resolvable' instance: two wrapped
-- values are resolved with 'mappend'.
--
-- 'RM' wraps a value and 'unRM' unwraps it.  The 'Monoid', 'FromJSON'
-- and 'ToJSON' instances are derived from the wrapped type.
newtype ResolvableMonoid a = RM { unRM :: a }
    deriving (Eq, Ord, Read, Show, Typeable, Data, Monoid, FromJSON, ToJSON)
-- | Two wrapped values are resolved by combining them with the
-- wrapper's 'Monoid' operation, left operand first.
instance (Eq a, Show a, Monoid a) => Resolvable (ResolvableMonoid a) where
    resolve lhs rhs = lhs `mappend` rhs
-- | Resolution lifted over 'Maybe':
--
-- * two 'Just' values are resolved with the underlying 'resolve';
--
-- * a 'Just' paired with a 'Nothing' yields the 'Just' unchanged;
--
-- * a 'Nothing' on the left yields the right-hand argument as is.
instance (Resolvable a) => Resolvable (Maybe a) where
    resolve Nothing other = other
    resolve kept@(Just x) other =
        case other of
          Just y  -> Just (resolve x y)
          Nothing -> kept
-- | Shape of a single-key fetch action accepted by 'get', 'modify' and
-- 'modify_': given a connection, bucket, key and 'R' value it produces,
-- in 'IO', either 'Nothing' or a list of values paired with a 'VClock'.
--
-- NOTE(review): 'Nothing' presumably means the key was not found, and the
-- list presumably holds sibling values -- confirm against the concrete
-- fetch implementations passed in by callers.
type Get a = Connection -> Bucket -> Key -> R -> IO (Maybe ([a], VClock))
-- | Run the supplied fetch action and collapse the list of values it
-- returns into a single value using 'resolve'.
--
-- The 'VClock' returned by the fetch action is passed through untouched,
-- and a 'Nothing' result stays 'Nothing'.  The values are combined
-- lazily, so an empty value list only raises an error once the resolved
-- value is demanded.
get :: (Resolvable a) => Get a
    -> Connection -> Bucket -> Key -> R -> IO (Maybe (a, VClock))
get doGet conn bucket' key' r = do
    fetched <- doGet conn bucket' key' r
    return (first resolveMany <$> fetched)
-- | Multi-key counterpart of 'get': run the supplied fetch action for a
-- list of keys and, for every entry that is not 'Nothing', collapse its
-- list of values into one using 'resolve'.
--
-- The result list has the same length and order as the list produced by
-- the fetch action; each 'VClock' is passed through untouched.
getMany :: (Resolvable a) =>
           (Connection -> Bucket -> [Key] -> R -> IO [Maybe ([a], VClock)])
        -> Connection -> Bucket -> [Key] -> R -> IO [Maybe (a, VClock)]
getMany doGet conn b ks r = do
    fetched <- doGet conn b ks r
    return [ first resolveMany <$> entry | entry <- fetched ]
-- | Shape of a single-key store action accepted by 'put', 'modify' and
-- 'modify_': given a connection, bucket, key, optional 'VClock', the
-- value to store and the 'W' and 'DW' values, it produces, in 'IO', a
-- list of values paired with a 'VClock'.
--
-- 'put' treats a returned list with more than one element as a conflict
-- to be resolved, and an empty list as an unexpected response.
type Put a = Connection -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
           -> IO ([a], VClock)
-- | Store a value with the supplied store action, resolving conflicts
-- and retrying until a single value comes back.
--
-- Each attempt calls the store action and inspects the returned values:
--
-- * exactly one value is accepted and returned with its 'VClock', but
--   only if this is a retry or the attempt was made with a 'VClock';
--
-- * any other non-empty response is logged with 'debugValues', collapsed
--   with 'resolve', and stored again using the 'VClock' just returned
--   (this includes a single value coming back from a first attempt that
--   was made without a 'VClock');
--
-- * an empty response is reported through 'unexError'.
--
-- Throws 'RetriesExceeded' once the attempt counter reaches
-- 'maxRetries'.
put :: (Resolvable a) => Put a
    -> Connection -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
    -> IO (a, VClock)
put doPut conn bucket' key' mvclock0 val0 w dw = attempt (0::Int) val0 mvclock0
  where
    attempt !n val mvclock
      | n == maxRetries = throwIO RetriesExceeded
      | otherwise = do
          (siblings, vclock) <- doPut conn bucket' key' mvclock val w dw
          -- A lone value is only trusted on a retry or when the caller
          -- supplied a vclock for this attempt.
          let settled = n > 0 || isJust mvclock
          case siblings of
            [] -> unexError "Network.Riak.Resolvable" "put"
                  "received empty response from server"
            [x] | settled -> return (x, vclock)
            _ -> do
              debugValues "put" "conflict" siblings
              attempt (n + 1) (resolveMany siblings) (Just vclock)
-- | Limit on the attempt counter used by 'put' and 'putMany': when the
-- counter equals this value, 'RetriesExceeded' is thrown instead of
-- issuing another store request.
maxRetries :: Int
maxRetries = 64
-- | Same as 'put', but the resolved value and its 'VClock' are
-- discarded.  Exceptions raised by 'put' propagate unchanged.
put_ :: (Resolvable a) => Put a
     -> Connection -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
     -> IO ()
put_ doPut conn bucket' key' mvclock0 val0 w dw = do
    _ <- put doPut conn bucket' key' mvclock0 val0 w dw
    return ()
-- | Fetch, transform and store a value.
--
-- The current value is fetched with 'get' and handed to the callback
-- ('Nothing' when the fetch returned 'Nothing').  The callback yields
-- the value to store together with an extra result.  The new value is
-- stored with 'put', using the 'VClock' from the fetch when there was
-- one.
--
-- Returns the value reported by 'put' (which may differ from the one the
-- callback produced if conflicts were resolved) paired with the
-- callback's extra result.
modify :: (MonadIO m, Resolvable a) => Get a -> Put a
       -> Connection -> Bucket -> Key -> R -> W -> DW -> (Maybe a -> m (a,b))
       -> m (a,b)
modify doGet doPut conn bucket' key' r w dw act = do
    fetched <- liftIO (get doGet conn bucket' key' r)
    let current = fst <$> fetched
        vclock  = snd <$> fetched
    (new, extra) <- act current
    (stored, _) <- liftIO (put doPut conn bucket' key' vclock new w dw)
    return (stored, extra)
-- | Variant of 'modify' whose callback produces only the value to store.
--
-- The current value is fetched with 'get', passed to the callback, and
-- the callback's result is stored with 'put' using the fetched 'VClock'
-- when there was one.  Returns the value reported by 'put'.
modify_ :: (MonadIO m, Resolvable a) => Get a -> Put a
        -> Connection -> Bucket -> Key -> R -> W -> DW -> (Maybe a -> m a)
        -> m a
modify_ doGet doPut conn bucket' key' r w dw act = do
    fetched <- liftIO (get doGet conn bucket' key' r)
    new <- act (fst <$> fetched)
    let vclock = snd <$> fetched
    liftIO . fmap fst $ put doPut conn bucket' key' vclock new w dw
-- | Store many values with the supplied batch store action, resolving
-- conflicts and retrying the conflicting entries until every entry has
-- settled on a single value.
--
-- Each entry is a key, an optional 'VClock' and the value to store.  The
-- entries are tagged with their input position so the results can be
-- returned in input order no matter which round each one settled in.
--
-- Per round, every response is classified the same way 'put' does it:
--
-- * exactly one value is accepted, but only if this is a retry round or
--   the entry was sent with a 'VClock';
--
-- * any other non-empty response is collapsed with 'resolve' and queued
--   for the next round with the 'VClock' just returned;
--
-- * an empty response is reported through 'unexError'.
--
-- Throws 'RetriesExceeded' when entries are still pending once the round
-- counter reaches 'maxRetries'.
putMany :: (Resolvable a) =>
           (Connection -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
                       -> IO [([a], VClock)])
        -> Connection -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
        -> IO [(a, VClock)]
putMany doPut conn bucket' puts0 w dw = go (0::Int) [] . zip [(0::Int)..] $ puts0
  where
    -- Nothing left to store: restore input order and drop the position tags.
    go _ acc [] = return . map snd . sortBy (compare `on` fst) $ acc
    go !attempt acc puts
      | attempt == maxRetries = throwIO RetriesExceeded
      | otherwise = do
          rs <- doPut conn bucket' (map snd puts) w dw
          -- zipWith stops at the shorter list, so this relies on the store
          -- action returning one response per request, in request order.
          let (conflicts, ok) = partitionEithers $ zipWith (mush attempt) puts rs
          unless (null conflicts) $
            debugValues "putMany" "conflicts" conflicts
          go (attempt+1) (ok++acc) conflicts
    -- Classify one response.  The round counter is passed in explicitly and
    -- kept apart from the entry's position tag: the guard must ask "is this
    -- a retry?", not "is this entry past the head of the list?".
    mush attempt (ix,(k,mv,_)) (cs,v) =
      case cs of
        [x] | attempt > 0 || isJust mv -> Right (ix,(x,v))
        (_:_) -> Left (ix,(k,Just v, resolveMany cs))
        []    -> unexError "Network.Riak.Resolvable" "putMany"
                 "received empty response from server"
-- | Same as 'putMany', but the resolved values and their 'VClock's are
-- discarded.  Exceptions raised by 'putMany' propagate unchanged.
putMany_ :: (Resolvable a) =>
            (Connection -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
                        -> IO [([a], VClock)])
         -> Connection -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW -> IO ()
putMany_ doPut conn bucket' puts0 w dw = do
    _ <- putMany doPut conn bucket' puts0 w dw
    return ()
-- | Fold a list of values into a starting value with 'resolve', working
-- from the left with a strict accumulator.  An empty list returns the
-- starting value.
resolveMany' :: (Resolvable a) => a -> [a] -> a
resolveMany' seed others = foldl' resolve seed others
-- | Collapse a non-empty list of values into one using 'resolve'; the
-- head of the list seeds the fold over the remaining elements.
--
-- Calls 'error' when given an empty list.
resolveMany :: (Resolvable a) => [a] -> a
resolveMany siblings =
    case siblings of
      []       -> error "resolveMany: empty list"
      x : rest -> resolveMany' x rest