module System.Rados.Monadic
(
runConnect,
parseConfig,
parseArgv,
parseEnv,
runPool,
objects,
unsafeObjects,
RadosReader(readChunk, readFull, stat),
RadosWriter(..),
async,
runAsync,
waitSafe,
waitComplete,
look,
runObject,
#if defined(ATOMIC_WRITES)
runAtomicWrite,
assertExists,
compareXAttribute,
B.eq, B.ne, B.gt, B.gte, B.lt, B.lte, B.nop,
setXAttribute,
#endif
withExclusiveLock,
withSharedLock,
StatResult,
fileSize,
modifyTime,
AsyncRead,
AsyncWrite,
Connection,
Pool,
Object,
Async,
E.RadosError(..),
liftIO
)
where
import Control.Exception (bracket, bracket_, throwIO)
import Control.Monad.State
import Control.Monad.Reader
import Control.Applicative
import Data.Word (Word64)
import System.Posix.Types(EpochTime)
import Foreign.ForeignPtr
import Foreign.Storable
import System.IO.Unsafe
import Data.Typeable
import Data.Maybe
import qualified Control.Concurrent.Async as A
import qualified Data.ByteString.Char8 as B
import qualified System.Rados.Error as E
import qualified System.Rados.Base as B
import Data.UUID
import Data.UUID.V4
newtype Connection a = Connection (ReaderT B.Connection IO a)
deriving (Functor, Applicative, Monad, MonadIO, MonadReader B.Connection)
newtype Pool a = Pool (ReaderT B.IOContext IO a)
deriving (Functor, Applicative, Monad, MonadIO, MonadReader B.IOContext)
newtype Object parent a = Object (ReaderT B.ByteString parent a)
deriving (Functor, Applicative, Monad, MonadIO, MonadReader B.ByteString)
newtype Async a = Async (ReaderT B.IOContext IO a)
deriving (Functor, Applicative, Monad, MonadIO, MonadReader B.IOContext)
#if defined(ATOMIC_WRITES)
newtype AtomicWrite a = AtomicWrite (ReaderT B.WriteOperation IO a)
deriving (Functor, Applicative, Monad, MonadIO, MonadReader B.WriteOperation)
#endif
data AsyncWrite = ActionFailure E.RadosError | ActionInFlight B.Completion
data AsyncRead a = ReadFailure E.RadosError | ReadInFlight B.Completion a
data StatResult = StatResult Word64 EpochTime
| StatInFlight (ForeignPtr Word64) (ForeignPtr EpochTime)
deriving (Typeable)
fileSize :: StatResult -> Word64
fileSize (StatResult s _) = s
fileSize (StatInFlight s _) = unsafePerformIO $ withForeignPtr s peek
modifyTime :: StatResult -> EpochTime
modifyTime (StatResult _ m) = m
modifyTime (StatInFlight _ m) = unsafePerformIO $ withForeignPtr m peek
class (MonadReader B.IOContext m, MonadIO m) => PoolReader m where
unPoolReader :: m a -> ReaderT B.IOContext IO a
class Monad m => RadosWriter m e | m -> e where
writeChunk
:: Word64
-> B.ByteString
-> m e
writeFull :: B.ByteString -> m e
append :: B.ByteString -> m e
remove :: m e
#if defined(ATOMIC_WRITES)
class Monad m => AtomicWriter m e | m -> e where
runAtomicWrite :: AtomicWrite a -> m e
#endif
class Monad m => RadosReader m wrapper | m -> wrapper where
readChunk
:: Word64
-> Word64
-> m (wrapper B.ByteString)
readFull :: m (wrapper B.ByteString)
readFull =
stat >>= unWrap >>= either wrapFail (\r -> readChunk (fileSize r) 0)
stat :: m (wrapper StatResult)
unWrap :: Typeable a => wrapper a -> m (Either E.RadosError a)
wrapFail :: E.RadosError -> m (wrapper a)
instance PoolReader Async where
unPoolReader (Async a) = a
instance PoolReader Pool where
unPoolReader (Pool a) = a
instance RadosWriter (Object Pool) (Maybe E.RadosError) where
writeChunk offset buffer = do
(object, pool) <- askObjectPool
liftIO $ B.syncWrite pool object offset buffer
writeFull buffer = do
(object, pool) <- askObjectPool
liftIO $ B.syncWriteFull pool object buffer
append buffer = do
(object, pool) <- askObjectPool
liftIO $ B.syncAppend pool object buffer
remove = do
(object, pool) <- askObjectPool
liftIO $ B.syncRemove pool object
instance RadosWriter (Object Async) AsyncWrite where
writeChunk offset buffer = do
(object, pool) <- askObjectPool
withActionCompletion $ \completion ->
liftIO $ B.asyncWrite pool completion object offset buffer
writeFull buffer = do
(object, pool) <- askObjectPool
withActionCompletion $ \completion ->
liftIO $ B.asyncWriteFull pool completion object buffer
append buffer = do
(object, pool) <- askObjectPool
withActionCompletion $ \completion ->
liftIO $ B.asyncAppend pool completion object buffer
remove = do
(object, pool) <- askObjectPool
withActionCompletion $ \completion ->
liftIO $ B.asyncRemove pool completion object
#if defined(ATOMIC_WRITES)
instance RadosWriter AtomicWrite () where
writeChunk offset buffer = do
op <- ask
liftIO $ B.writeOperationWrite op buffer offset
writeFull buffer = do
op <- ask
liftIO $ B.writeOperationWriteFull op buffer
append buffer = do
op <- ask
liftIO $ B.writeOperationAppend op buffer
remove = do
op <- ask
liftIO $ B.writeOperationRemove op
instance AtomicWriter (Object Pool) (Maybe E.RadosError) where
runAtomicWrite (AtomicWrite action) = do
(object, pool) <- askObjectPool
liftIO $ do
op <- B.newWriteOperation
runReaderT action op
B.writeOperate op pool object
instance AtomicWriter (Object Async) AsyncWrite where
runAtomicWrite (AtomicWrite action) = do
(object, pool) <- askObjectPool
withActionCompletion $ \completion ->
liftIO $ do
op <- B.newWriteOperation
runReaderT action op
B.asyncWriteOperate op pool completion object
#endif
instance RadosReader (Object Pool) (Either E.RadosError) where
readChunk len offset = do
(object, pool) <- askObjectPool
liftIO $ B.syncRead pool object len offset
stat = do
(object, pool) <- askObjectPool
liftIO $ do
s <- B.syncStat pool object
return $ case s of
Left e -> Left e
Right (size, time) -> Right $ StatResult size time
unWrap = return . id
wrapFail = return . Left
instance RadosReader (Object Async) AsyncRead where
readChunk len offset = do
(object, pool) <- askObjectPool
withReadCompletion $ \completion ->
liftIO $ B.asyncRead pool completion object len offset
stat = do
(object, pool) <- askObjectPool
withReadCompletion $ \completion ->
liftIO $ do
s <- B.asyncStat pool completion object
return $ case s of
Left e ->
Left e
Right (size_fp, mtime_fp) ->
Right $ StatInFlight size_fp mtime_fp
unWrap = look
wrapFail = return . ReadFailure
askObjectPool :: MonadReader B.IOContext m => Object m (B.ByteString, B.IOContext)
askObjectPool =
liftM2 (,) ask (Object . lift $ ask)
async :: PoolReader m => m a -> m (A.Async a)
async action = do
pool <- ask
liftIO . A.async $ runReaderT (unPoolReader action) pool
waitSafe :: MonadIO m => AsyncWrite -> m (Maybe E.RadosError)
waitSafe = waitAsync B.waitForSafe
waitComplete :: MonadIO m => AsyncWrite -> m (Maybe E.RadosError)
waitComplete = waitAsync B.waitForComplete
waitAsync :: MonadIO m
=> (B.Completion -> IO a) -> AsyncWrite -> m (Maybe E.RadosError)
waitAsync f async_request =
case async_request of
ActionFailure e ->
return $ Just e
ActionInFlight completion -> do
e <- liftIO $ do
f completion
B.getAsyncError completion
return $ either Just (const Nothing) e
look :: (MonadIO m, Typeable a)
=> AsyncRead a -> m (Either E.RadosError a)
look async_request =
case async_request of
ReadFailure e ->
return $ Left e
ReadInFlight completion a -> do
ret <- liftIO $ do
B.waitForSafe completion
B.getAsyncError completion
return $ case ret of
Left e -> Left e
Right n -> Right $
case (cast a :: Maybe B.ByteString) of
Just bs -> fromJust . cast $ B.take n bs
Nothing -> a
withActionCompletion :: (B.Completion -> IO (Either E.RadosError a)) -> Object Async AsyncWrite
withActionCompletion f = do
completion <- liftIO B.newCompletion
result <- liftIO $ f completion
return $ case result of
Left e -> ActionFailure e
Right _ -> ActionInFlight completion
withReadCompletion :: (B.Completion -> IO (Either E.RadosError a)) -> Object Async (AsyncRead a)
withReadCompletion f = do
completion <- liftIO B.newCompletion
result <- liftIO $ f completion
return $ case result of
Left e -> ReadFailure e
Right a -> ReadInFlight completion a
runConnect
:: Maybe B.ByteString
-> (B.Connection -> IO (Maybe E.RadosError))
-> Connection a
-> IO a
runConnect user configure (Connection action) =
bracket
(do h <- B.newConnection user
conf <- configure h
case conf of
Just e -> do
B.cleanupConnection h
throwIO e
Nothing -> do
B.connect h
return h)
B.cleanupConnection
(runReaderT action)
runPool :: B.ByteString -> Pool a -> Connection a
runPool pool (Pool action) = do
connection <- ask
liftIO $ bracket
(B.newIOContext connection pool)
B.cleanupIOContext
(runReaderT action)
runObject :: PoolReader m =>
B.ByteString -> Object m a -> m a
runObject object_id (Object action) =
runReaderT action object_id
runAsync :: PoolReader m => Async a -> m a
runAsync (Async action) = do
pool <- ask
liftIO $ runReaderT action pool
parseConfig :: FilePath -> B.Connection -> IO (Maybe E.RadosError)
parseConfig = flip B.confReadFile
parseArgv :: B.Connection -> IO (Maybe E.RadosError)
parseArgv = B.confParseArgv
parseEnv :: B.Connection -> IO (Maybe E.RadosError)
parseEnv = B.confParseEnv
withExclusiveLock
:: B.ByteString
-> B.ByteString
-> B.ByteString
-> Maybe Double
-> Pool a
-> Pool a
withExclusiveLock oid name desc duration action =
withLock oid name action $ \pool cookie ->
B.exclusiveLock pool oid name cookie desc duration []
withSharedLock
:: B.ByteString
-> B.ByteString
-> B.ByteString
-> B.ByteString
-> Maybe Double
-> Pool a
-> Pool a
withSharedLock oid name desc tag duration action =
withLock oid name action $ \pool cookie ->
B.sharedLock pool oid name cookie tag desc duration []
withLock
:: B.ByteString
-> B.ByteString
-> Pool a
-> (B.IOContext -> B.ByteString -> IO b)
-> Pool a
withLock oid name (Pool user_action) lock_action = do
pool <- ask
cookie <- liftIO $ B.pack . toString <$> nextRandom
liftIO $ bracket_
(lock_action pool cookie)
(tryUnlock pool oid name cookie)
(runReaderT user_action pool)
where
tryUnlock pool oid' name' cookie = do
me <- B.unlock pool oid' name' cookie
case me of
Nothing -> return ()
Just (E.NoEntity {}) -> return ()
Just e -> throwIO e
objects :: Pool [B.ByteString]
objects = do
os <- unsafeObjects
length os `seq` return os
unsafeObjects :: Pool [B.ByteString]
unsafeObjects = do
pool <- ask
liftIO $ B.unsafeObjects pool
#if defined(ATOMIC_WRITES)
assertExists :: AtomicWrite ()
assertExists = do
op <- ask
liftIO $ B.writeOperationAssertExists op
compareXAttribute :: B.ByteString -> B.ComparisonFlag -> B.ByteString -> AtomicWrite ()
compareXAttribute key operator value = do
op <- ask
liftIO $ B.writeOperationCompareXAttribute op key operator value
setXAttribute :: B.ByteString -> B.ByteString -> AtomicWrite ()
setXAttribute key value = do
op <- ask
liftIO $ B.writeOperationSetXAttribute op key value
#endif