module System.Rados.Base
(
Connection,
Completion,
IOContext,
ListContext,
F.TimeVal(..),
F.LockFlag,
RadosError(..),
withConnection,
newConnection,
cleanupConnection,
confReadFile,
confParseArgv,
confParseEnv,
connect,
withIOContext,
newIOContext,
cleanupIOContext,
syncWrite,
syncWriteFull,
syncRead,
syncAppend,
syncRemove,
syncStat,
newCompletion,
waitForComplete,
waitForSafe,
asyncWrite,
asyncWriteFull,
asyncRead,
asyncAppend,
asyncStat,
asyncRemove,
getAsyncError,
withList,
nextObject,
objects,
unsafeObjects,
openList,
closeList,
newCookie,
exclusiveLock,
sharedLock,
unlock,
F.idempotent,
#if defined(ATOMIC_WRITES)
WriteOperation,
F.ComparisonFlag,
newWriteOperation,
writeOperationAssertExists,
writeOperationCompareXAttribute,
F.nop, F.eq, F.ne, F.gt, F.gte, F.lt, F.lte,
writeOperationSetXAttribute,
writeOperationRemoveXAttribute,
writeOperationCreate,
writeOperationRemove,
writeOperationWrite,
writeOperationWriteFull,
writeOperationAppend,
writeOperate,
asyncWriteOperate,
#endif
missingOK
) where
import Control.Applicative
import Control.Exception (bracket, onException, throwIO)
import Control.Monad (void)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Unsafe as B
import Data.UUID
import Data.UUID.V4
import Foreign hiding (void)
import Foreign.C.String
import Foreign.C.Types
import System.IO.Unsafe (unsafeInterleaveIO)
import System.Posix.Types (EpochTime)
import System.Rados.Error
import qualified System.Rados.FFI as F
-- | Opaque cluster handle; wraps a raw pointer to the FFI type 'F.RadosT'.
newtype Connection = Connection (Ptr F.RadosT)

-- | Opaque I/O context handle; wraps a raw pointer to 'F.RadosIOCtxT'.
newtype IOContext = IOContext (Ptr F.RadosIOCtxT)

-- | Opaque object-listing handle; wraps a raw pointer to 'F.RadosListCtxT'.
newtype ListContext = ListContext (Ptr F.RadosListCtxT)

-- | Handle for an asynchronous operation. Held in a 'ForeignPtr' so that a
-- finalizer can be attached to the underlying 'F.RadosCompletionT' (see
-- 'newCompletion').
newtype Completion = Completion (ForeignPtr F.RadosCompletionT)
    deriving (Eq, Ord)
#if defined(ATOMIC_WRITES)
-- | Handle for a compound write operation; wraps a 'ForeignPtr' to the FFI
-- type 'F.RadosWriteOpT'. Only compiled when @ATOMIC_WRITES@ is defined.
newtype WriteOperation = WriteOperation (ForeignPtr F.RadosWriteOpT)
# endif
-- | Run an action with a freshly created 'Connection' for the optional user,
-- calling 'cleanupConnection' afterwards whether or not the action throws.
withConnection :: Maybe ByteString -> (Connection -> IO a) -> IO a
withConnection user action =
    bracket acquire cleanupConnection action
  where
    acquire = newConnection user
-- | Create a new cluster handle via @rados_create@.
--
-- With 'Nothing' a null pointer is passed in place of the id string;
-- with @'Just' bs@ the bytes are passed as a NUL-terminated C string.
-- The C status code is handed to 'checkError_' (presumably throwing on
-- failure -- confirm against "System.Rados.Error").
newConnection :: Maybe ByteString -> IO Connection
newConnection maybe_bs =
    alloca $ \handle_pp -> do
        checkError_ "rados_create" (create handle_pp)
        fmap Connection (peek handle_pp)
  where
    -- Build the C call, marshalling the optional id string.
    create handle_pp =
        maybe (F.c_rados_create handle_pp nullPtr)
              (\bs -> B.useAsCString bs (F.c_rados_create handle_pp))
              maybe_bs
-- | Shut down a 'Connection' by calling @rados_shutdown@ on its handle.
cleanupConnection :: Connection -> IO ()
cleanupConnection conn =
    case conn of
        Connection handle_p -> F.c_rados_shutdown handle_p
-- | Ask librados to read the configuration file at the given path.
-- The C status is converted by 'maybeError'.
confReadFile :: Connection -> FilePath -> IO (Maybe RadosError)
confReadFile (Connection handle_p) file_path =
    maybeError "rados_conf_read_file" readConfig
  where
    readConfig = withCString file_path (F.c_rados_conf_read_file handle_p)
-- | Configure the connection from the program's command-line arguments.
--
-- The argument count and vector are fetched through 'F.c_getProgArgv' and
-- forwarded unchanged to @rados_conf_parse_argv@.
confParseArgv :: Connection -> IO (Maybe RadosError)
confParseArgv (Connection handle_p) =
    alloca $ \argc_p -> alloca $ \argv_p -> do
        F.c_getProgArgv argc_p argv_p
        -- Read both out-parameters, then build (but do not yet run) the C call.
        parse <- F.c_rados_conf_parse_argv handle_p
                     <$> peek argc_p
                     <*> peek argv_p
        maybeError "rados_conf_parse_argv" parse
-- | Configure the connection via @rados_conf_parse_env@, passing a null
-- pointer as the variable-name argument.
confParseEnv :: Connection -> IO (Maybe RadosError)
confParseEnv (Connection handle_p) =
    maybeError "rados_conf_parse_env" parseEnv
  where
    parseEnv = F.c_rados_conf_parse_env handle_p nullPtr
-- | Connect the handle to the cluster via @rados_connect@; the status code
-- is checked by 'checkError_'.
connect :: Connection -> IO ()
connect conn =
    case conn of
        Connection handle_p ->
            checkError_ "rados_connect" (F.c_rados_connect handle_p)
-- | Run an action with an 'IOContext' opened on the named pool, calling
-- 'cleanupIOContext' afterwards whether or not the action throws.
withIOContext :: Connection -> ByteString -> (IOContext -> IO a) -> IO a
withIOContext c pool action =
    bracket open cleanupIOContext action
  where
    open = newIOContext c pool
-- | Open an I/O context with @rados_ioctx_create@, passing the given bytes
-- as a NUL-terminated C string. The status code is checked by 'checkError_'
-- before the resulting handle is read back.
newIOContext :: Connection -> ByteString -> IO IOContext
newIOContext (Connection handle_p) bs =
    alloca $ \ioctx_pp ->
        B.useAsCString bs $ \c_pool -> do
            checkError_ "rados_ioctx_create"
                (F.c_rados_ioctx_create handle_p c_pool ioctx_pp)
            fmap IOContext (peek ioctx_pp)
-- | Destroy an 'IOContext' by calling @rados_ioctx_destroy@ on its handle.
cleanupIOContext :: IOContext -> IO ()
cleanupIOContext ctx =
    case ctx of
        IOContext ioctx_p -> F.c_rados_ioctx_destroy ioctx_p
-- | Allocate a new 'Completion' with no callback argument and no callbacks
-- (null pointers are passed for all three). 'F.c_rados_aio_release' is
-- attached as the finalizer of the resulting 'ForeignPtr'.
newCompletion :: IO Completion
newCompletion =
    alloca $ \completion_pp -> do
        checkError_ "rados_aio_create_completion" $
            F.c_rados_aio_create_completion
                nullPtr nullFunPtr nullFunPtr completion_pp
        raw <- peek completion_pp
        Completion <$> newForeignPtr F.c_rados_aio_release raw
-- | Block in @rados_aio_wait_for_complete@ on the given completion.
-- The C return value is discarded.
waitForComplete :: Completion -> IO ()
waitForComplete (Completion completion_fp) =
    withForeignPtr completion_fp $ \completion_p ->
        void (F.c_rados_aio_wait_for_complete completion_p)
-- | Block in @rados_aio_wait_for_safe@ on the given completion.
-- The C return value is discarded.
--
-- This previously called the /complete/ variant, making it identical to
-- 'waitForComplete'; it now waits on the \"safe\" event as the name promises.
-- NOTE(review): relies on the FFI module exporting
-- 'F.c_rados_aio_wait_for_safe' -- confirm the binding exists.
waitForSafe :: Completion -> IO ()
waitForSafe (Completion rados_completion_t_fp) = void $
    withForeignPtr rados_completion_t_fp F.c_rados_aio_wait_for_safe
-- | Fetch the return value recorded on a completion through
-- @rados_aio_get_return_value@ and classify it with 'checkError''.
getAsyncError :: Completion -> IO (Either RadosError Int)
getAsyncError (Completion completion_fp) =
    checkError' "rados_aio_get_return_value" fetchReturnValue
  where
    fetchReturnValue =
        withForeignPtr completion_fp F.c_rados_aio_get_return_value
-- | Submit an asynchronous read of @len@ bytes at @offset@ from the object.
--
-- A buffer of @len@ bytes is allocated with 'mallocBytes' and handed to
-- @rados_aio_read@. On successful submission the buffer is wrapped, at its
-- full length, in a 'ByteString' that owns it. This function does not wait
-- on the completion, so the caller must wait before inspecting the bytes.
--
-- If submission fails the buffer is released immediately, since nothing
-- else would ever free it (it was previously leaked on this path).
asyncRead :: IOContext
          -> Completion
          -> ByteString
          -> Word64
          -> Word64
          -> IO (Either RadosError ByteString)
asyncRead (IOContext ioctx_p) (Completion rados_completion_t_fp) oid len offset = do
    c_buf <- mallocBytes (fromIntegral len)
    B.useAsCString oid $ \c_oid -> do
        let c_len    = fromIntegral len
        let c_offset = fromIntegral offset
        result <- checkError' "rados_aio_read" $
            withForeignPtr rados_completion_t_fp $ \cmp_p ->
                F.c_rados_aio_read ioctx_p c_oid cmp_p c_buf c_len c_offset
        case result of
            Right _ ->
                -- Ownership of c_buf moves to the ByteString's finalizer.
                Right <$> B.unsafePackMallocCStringLen (c_buf, fromIntegral len)
            Left e -> do
                -- The read was rejected, so the buffer is still ours to free.
                free c_buf
                return (Left e)
-- | Submit an asynchronous write of the given bytes at @offset@ via
-- @rados_aio_write@. The submission status is classified by 'checkError''.
asyncWrite
    :: IOContext
    -> Completion
    -> ByteString
    -> Word64
    -> ByteString
    -> IO (Either RadosError Int)
asyncWrite (IOContext ioctxt_p) (Completion completion_fp) oid offset bs =
    B.useAsCString oid $ \c_oid ->
    withForeignPtr completion_fp $ \cmp_p ->
    useAsCStringCSize bs $ \(c_buf, c_size) ->
        checkError' "rados_aio_write" $
            F.c_rados_aio_write
                ioctxt_p c_oid cmp_p c_buf c_size (fromIntegral offset)
-- | Submit an asynchronous @rados_aio_write_full@ with the given bytes.
-- The submission status is classified by 'checkError''.
asyncWriteFull
    :: IOContext
    -> Completion
    -> ByteString
    -> ByteString
    -> IO (Either RadosError Int)
asyncWriteFull (IOContext ioctxt_p) (Completion completion_fp) oid bs =
    B.useAsCString oid $ \c_oid ->
    withForeignPtr completion_fp $ \cmp_p ->
    useAsCStringCSize bs $ \(c_buf, c_size) ->
        checkError' "rados_aio_write_full"
            (F.c_rados_aio_write_full ioctxt_p c_oid cmp_p c_buf c_size)
-- | Submit an asynchronous @rados_aio_append@ with the given bytes.
-- The submission status is classified by 'checkError''.
asyncAppend
    :: IOContext
    -> Completion
    -> ByteString
    -> ByteString
    -> IO (Either RadosError Int)
asyncAppend (IOContext ioctxt_p) (Completion completion_fp) oid bs =
    B.useAsCString oid $ \c_oid ->
    withForeignPtr completion_fp $ \cmp_p ->
    useAsCStringCSize bs $ \(c_buf, c_size) ->
        checkError' "rados_aio_append"
            (F.c_rados_aio_append ioctxt_p c_oid cmp_p c_buf c_size)
-- | Submit an asynchronous @rados_aio_remove@ for the object.
-- The submission status is classified by 'checkError''.
asyncRemove
    :: IOContext
    -> Completion
    -> ByteString
    -> IO (Either RadosError Int)
asyncRemove (IOContext ioctxt_p) (Completion completion_fp) oid =
    B.useAsCString oid $ \c_oid ->
    withForeignPtr completion_fp $ \cmp_p ->
        checkError' "rados_aio_remove"
            (F.c_rados_aio_remove ioctxt_p c_oid cmp_p)
-- | Submit an asynchronous @rados_aio_stat@ for the object.
--
-- Two 'ForeignPtr' cells (size and time) are allocated and their addresses
-- passed to the C call. On successful submission the cells are returned.
-- This function does not wait on the completion, so the caller must wait
-- before reading them.
asyncStat
    :: IOContext
    -> Completion
    -> ByteString
    -> IO (Either RadosError (ForeignPtr Word64, ForeignPtr CTime))
asyncStat (IOContext ioctxt_p) (Completion completion_fp) oid =
    B.useAsCString oid $ \c_oid -> do
        size_fp <- mallocForeignPtr
        time_fp <- mallocForeignPtr
        submitted <-
            withForeignPtr completion_fp $ \cmp_p ->
            withForeignPtr size_fp $ \p_size ->
            withForeignPtr time_fp $ \p_time ->
                checkError' "rados_aio_stat" $
                    F.c_rados_aio_stat ioctxt_p c_oid cmp_p p_size p_time
        -- Keep a Left untouched; replace the Right payload with the cells.
        return (fmap (const (size_fp, time_fp)) submitted)
-- | Synchronously write the given bytes at @offset@ via @rados_write@.
-- The C status is converted by 'maybeError'.
syncWrite
    :: IOContext
    -> ByteString
    -> Word64
    -> ByteString
    -> IO (Maybe RadosError)
syncWrite (IOContext ioctxt_p) oid offset bs =
    B.useAsCString oid $ \c_oid ->
    useAsCStringCSize bs $ \(c_buf, c_size) ->
        maybeError "rados_write" $
            F.c_rados_write ioctxt_p c_oid c_buf c_size (fromIntegral offset)
-- | Synchronously call @rados_write_full@ with the given bytes.
-- The C status is converted by 'maybeError'.
syncWriteFull
    :: IOContext
    -> ByteString
    -> ByteString
    -> IO (Maybe RadosError)
syncWriteFull (IOContext ioctxt_p) oid bs =
    B.useAsCString oid $ \c_oid ->
    useAsCStringCSize bs $ \(c_buf, c_size) ->
        maybeError "rados_write_full"
            (F.c_rados_write_full ioctxt_p c_oid c_buf c_size)
-- | Synchronously call @rados_append@ with the given bytes.
-- The C status is converted by 'maybeError'.
syncAppend
    :: IOContext
    -> ByteString
    -> ByteString
    -> IO (Maybe RadosError)
syncAppend (IOContext ioctxt_p) oid bs =
    B.useAsCString oid $ \c_oid ->
    useAsCStringCSize bs $ \(c_buf, c_size) ->
        maybeError "rados_append"
            (F.c_rados_append ioctxt_p c_oid c_buf c_size)
-- | Synchronously read up to @len@ bytes at @offset@ from the object.
--
-- A buffer of @len@ bytes is allocated with 'mallocBytes' and filled by
-- @rados_read@. On success the buffer is wrapped in a 'ByteString' whose
-- length is the value 'checkError'' returns in 'Right', and which takes
-- ownership of the buffer.
--
-- On failure the buffer is freed before the error is returned (it was
-- previously leaked on this path).
syncRead
    :: IOContext
    -> ByteString
    -> Word64
    -> Word64
    -> IO (Either RadosError ByteString)
syncRead (IOContext ioctxt_p) oid len offset = do
    c_buf <- mallocBytes (fromIntegral len)
    B.useAsCString oid $ \c_oid -> do
        let c_offset = fromIntegral offset
        let c_len    = fromIntegral len
        result <- checkError' "rados_read" $
            F.c_rados_read ioctxt_p c_oid c_buf c_len c_offset
        case result of
            Right read_bytes ->
                -- Ownership of c_buf moves to the ByteString's finalizer.
                Right <$> B.unsafePackMallocCStringLen (c_buf, read_bytes)
            Left e -> do
                -- Nothing else references the buffer; release it here.
                free c_buf
                return (Left e)
-- | Like 'B.useAsCStringLen', but hands the continuation the length as a
-- 'CSize' rather than an 'Int'.
useAsCStringCSize :: ByteString -> ((CString, CSize) -> IO a) -> IO a
useAsCStringCSize bs f =
    B.useAsCStringLen bs withSize
  where
    withSize (cstr, len) = f (cstr, fromIntegral len)
-- | Synchronously call @rados_remove@ on the object.
-- The C status is converted by 'maybeError'.
syncRemove :: IOContext -> ByteString -> IO (Maybe RadosError)
syncRemove (IOContext ioctxt_p) oid =
    maybeError "rados_remove" removeObject
  where
    removeObject = B.useAsCString oid (F.c_rados_remove ioctxt_p)
-- | Synchronously call @rados_stat@ on the object and, on success, return
-- the values written into the size and time out-parameters.
syncStat :: IOContext -> ByteString -> IO (Either RadosError (Word64, EpochTime))
syncStat (IOContext ioctxt_p) oid =
    B.useAsCString oid $ \c_oid ->
    alloca $ \size_p ->
    alloca $ \time_p -> do
        status <- checkError' "rados_stat" $
            F.c_rados_stat ioctxt_p c_oid size_p time_p
        either (return . Left)
               (const $ Right <$> ((,) <$> peek size_p <*> peek time_p))
               status
-- | Bitwise-OR a list of lock flags into a single flag.
-- An empty list yields a flag wrapping @0@.
combineLockFlags :: [F.LockFlag] -> F.LockFlag
combineLockFlags flags = F.LockFlag (foldr orFlag 0 flags)
  where
    orFlag flag acc = F.unLockFlag flag .|. acc
-- | Split a duration in seconds into an 'F.TimeVal' of whole seconds plus
-- microseconds.
--
-- The microsecond field is the fractional part /multiplied/ by 1,000,000
-- and rounded down, so @1.5@ becomes 1 second and 500000 microseconds.
-- The previous code divided 1,000,000 by the fractional part. That gave
-- wrong values, and for whole-second durations it divided by zero.
timeValFromRealFrac :: RealFrac n => n -> F.TimeVal
timeValFromRealFrac n =
    let (seconds, fractional) = properFraction n in
        F.TimeVal seconds (floor $ fractional * 1000000)
-- | Generate a fresh cookie: a random (version 4) UUID rendered as text and
-- packed into a 'ByteString'.
newCookie :: IO ByteString
newCookie = do
    uuid <- nextRandom
    return (B.pack (toString uuid))
-- | Take an exclusive lock on an object via @rados_lock_exclusive@.
--
-- Arguments after the 'IOContext', in order: object id, lock name, cookie,
-- description, optional duration in seconds, and flags. The flags are
-- OR-ed together with 'combineLockFlags'.
--
-- With 'Nothing' a null timeval pointer is passed to the C call; with
-- @'Just' d@ the duration is converted by 'timeValFromRealFrac' and passed
-- by pointer. The status is handled by 'checkErrorRetryBusy_'.
exclusiveLock
    :: RealFrac duration
    => IOContext
    -> ByteString
    -> ByteString
    -> ByteString
    -> ByteString
    -> Maybe duration
    -> [F.LockFlag]
    -> IO ()
exclusiveLock (IOContext ioctx_p) oid name cookie desc maybe_duration flags =
    B.useAsCString oid $ \c_oid ->
    B.useAsCString name $ \c_name ->
    B.useAsCString cookie $ \c_cookie ->
    B.useAsCString desc $ \c_desc ->
    withTimeout $ \timeval_p ->
        checkErrorRetryBusy_ "c_rados_lock_exclusive" $
            F.c_rados_lock_exclusive
                ioctx_p c_oid c_name c_cookie c_desc timeval_p flag
  where
    flag = combineLockFlags flags

    -- Supply the timeval pointer: null for no duration, otherwise a
    -- temporary cell holding the converted duration.
    withTimeout use = case maybe_duration of
        Nothing       -> use nullPtr
        Just duration -> alloca $ \timeval_p -> do
            poke timeval_p (timeValFromRealFrac duration)
            use timeval_p
-- | Take a shared lock on an object via @rados_lock_shared@.
--
-- Arguments after the 'IOContext', in order: object id, lock name, cookie,
-- tag, description, optional duration in seconds, and flags. The flags are
-- OR-ed together with 'combineLockFlags'.
--
-- With 'Nothing' a null timeval pointer is passed to the C call; with
-- @'Just' d@ the duration is converted by 'timeValFromRealFrac' and passed
-- by pointer. The status is handled by 'checkErrorRetryBusy_'.
sharedLock
    :: RealFrac duration
    => IOContext
    -> ByteString
    -> ByteString
    -> ByteString
    -> ByteString
    -> ByteString
    -> Maybe duration
    -> [F.LockFlag]
    -> IO ()
sharedLock (IOContext ioctx_p) oid name cookie tag desc maybe_duration flags =
    B.useAsCString oid $ \c_oid ->
    B.useAsCString name $ \c_name ->
    B.useAsCString cookie $ \c_cookie ->
    B.useAsCString tag $ \c_tag ->
    B.useAsCString desc $ \c_desc ->
    withTimeout $ \timeval_p ->
        checkErrorRetryBusy_ "c_rados_lock_shared" $
            F.c_rados_lock_shared
                ioctx_p c_oid c_name c_cookie c_tag c_desc timeval_p flag
  where
    flag = combineLockFlags flags

    -- Supply the timeval pointer: null for no duration, otherwise a
    -- temporary cell holding the converted duration.
    withTimeout use = case maybe_duration of
        Nothing       -> use nullPtr
        Just duration -> alloca $ \timeval_p -> do
            poke timeval_p (timeValFromRealFrac duration)
            use timeval_p
-- | Release a lock via @rados_unlock@, identified by object id, lock name
-- and cookie. The C status is converted by 'maybeError'.
unlock
    :: IOContext
    -> ByteString
    -> ByteString
    -> ByteString
    -> IO (Maybe RadosError)
unlock (IOContext ioctx_p) oid name cookie =
    B.useAsCString oid $ \c_oid ->
    B.useAsCString name $ \c_name ->
    B.useAsCString cookie $ \c_cookie ->
        maybeError "c_rados_unlock"
            (F.c_rados_unlock ioctx_p c_oid c_name c_cookie)
-- | Open an object listing on the I/O context via
-- @rados_objects_list_open@. The status is checked by 'checkError_'.
-- The result should eventually be passed to 'closeList'.
openList :: IOContext -> IO ListContext
openList (IOContext ioctx_p) =
    alloca $ \list_pp -> do
        checkError_ "rados_objects_list_open"
            (F.c_rados_objects_list_open ioctx_p list_pp)
        fmap ListContext (peek list_pp)
-- | Close an object listing via @rados_objects_list_close@.
closeList :: ListContext -> IO ()
closeList ctx =
    case ctx of
        ListContext list_p -> F.c_rados_objects_list_close list_p
-- | Run an action with an open 'ListContext', calling 'closeList'
-- afterwards whether or not the action throws.
withList :: IOContext -> (ListContext -> IO a) -> IO a
withList io_ctx action =
    bracket (openList io_ctx) closeList action
-- | Fetch the next entry from a listing.
--
-- Returns @'Just' name@ with a copy of the C string the call produced.
-- Returns 'Nothing' when the call reports 'NoEntity'. Any other error is
-- thrown.
nextObject :: ListContext -> IO (Maybe ByteString)
nextObject (ListContext list_p) =
    alloca $ \entry_pp -> do
        outcome <- maybeError "c_rados_objects_list_next" $
            F.c_rados_objects_list_next list_p entry_pp nullPtr
        case outcome of
            Nothing           -> fmap Just (B.packCString =<< peek entry_pp)
            Just (NoEntity{}) -> return Nothing
            Just err          -> throwIO err
-- | List every object reachable through the context. The whole list spine
-- is forced (via 'length') before returning, so the lazy listing started
-- by 'unsafeObjects' has run to its end by then.
objects :: IOContext -> IO ([ByteString])
objects ctx =
    unsafeObjects ctx >>= \os ->
        length os `seq` return os
-- | Lazily list the objects reachable through the context.
--
-- The first entry is fetched eagerly; every later entry is fetched on
-- demand through 'unsafeInterleaveIO'. The listing is closed once the end
-- is reached.
--
-- NOTE(review): the 'onException' handler wraps only the initial call to
-- the worker. It does not appear to cover fetches deferred by
-- 'unsafeInterleaveIO'. A list that is never fully consumed never reaches
-- the closing branch.
unsafeObjects :: IOContext -> IO ([ByteString])
unsafeObjects ctx = do
    list_ctx <- openList ctx
    walk list_ctx `onException` closeList list_ctx
  where
    walk list_ctx =
        nextObject list_ctx >>= \next -> case next of
            Just n  -> (n:) <$> unsafeInterleaveIO (walk list_ctx)
            Nothing -> closeList list_ctx >> return []
#if defined(ATOMIC_WRITES)
-- | Create an empty 'WriteOperation' via @rados_create_write_op@, with
-- 'F.c_rados_release_write_op' attached as the 'ForeignPtr' finalizer.
newWriteOperation
    :: IO WriteOperation
newWriteOperation = do
    raw_op <- F.c_rados_create_write_op
    fmap WriteOperation (newForeignPtr F.c_rados_release_write_op raw_op)
-- | Add an @assert_exists@ step to the write operation.
writeOperationAssertExists
    :: WriteOperation
    -> IO ()
writeOperationAssertExists (WriteOperation o) =
    withForeignPtr o $ \op_p ->
        F.c_rados_write_op_assert_exists op_p
-- | Add a @cmpxattr@ step to the write operation: compare the named
-- extended attribute against the given value using the comparison flag.
writeOperationCompareXAttribute
    :: WriteOperation
    -> ByteString
    -> F.ComparisonFlag
    -> ByteString
    -> IO ()
writeOperationCompareXAttribute (WriteOperation o) key comparison_flag value =
    withForeignPtr o $ \op_p ->
    B.useAsCString key $ \c_key ->
    B.useAsCStringLen value $ \(c_val, val_len) ->
        F.c_rados_write_op_cmpxattr
            op_p c_key comparison_flag c_val (fromIntegral val_len)
-- | Add a @setxattr@ step to the write operation: set the named extended
-- attribute to the given value.
writeOperationSetXAttribute
    :: WriteOperation
    -> ByteString
    -> ByteString
    -> IO ()
writeOperationSetXAttribute (WriteOperation o) key value =
    withForeignPtr o $ \op_p ->
    B.useAsCString key $ \c_key ->
    B.useAsCStringLen value $ \(c_val, val_len) ->
        F.c_rados_write_op_setxattr op_p c_key c_val (fromIntegral val_len)
-- | Add an @rmxattr@ step to the write operation: remove the named
-- extended attribute.
writeOperationRemoveXAttribute
    :: WriteOperation
    -> ByteString
    -> IO ()
writeOperationRemoveXAttribute (WriteOperation o) key =
    withForeignPtr o $ \op_p ->
        B.useAsCString key (F.c_rados_write_op_rmxattr op_p)
-- | Add a @create@ step to the write operation. The 'Bool' is passed to C
-- as @1@ ('True') or @0@ ('False'); the final argument is always null.
writeOperationCreate
    :: WriteOperation
    -> Bool
    -> IO ()
writeOperationCreate (WriteOperation o) exclusive =
    withForeignPtr o $ \op_p ->
        F.c_rados_write_op_create op_p int_exclusive nullPtr
  where
    int_exclusive
        | exclusive = 1
        | otherwise = 0
-- | Add a @remove@ step to the write operation.
writeOperationRemove
    :: WriteOperation
    -> IO ()
writeOperationRemove (WriteOperation o) =
    withForeignPtr o F.c_rados_write_op_remove
-- | Add a @write@ step to the write operation: write the buffer at the
-- given offset.
writeOperationWrite
    :: WriteOperation
    -> ByteString
    -> Word64
    -> IO ()
writeOperationWrite (WriteOperation o) buffer offset =
    withForeignPtr o $ \op_p ->
    B.useAsCStringLen buffer $ \(c_buf, buf_len) ->
        F.c_rados_write_op_write op_p c_buf (fromIntegral buf_len) offset
-- | Add a @write_full@ step to the write operation with the given buffer.
writeOperationWriteFull
    :: WriteOperation
    -> ByteString
    -> IO ()
writeOperationWriteFull (WriteOperation o) buffer =
    withForeignPtr o $ \op_p ->
    B.useAsCStringLen buffer $ \(c_buf, buf_len) ->
        F.c_rados_write_op_write_full op_p c_buf (fromIntegral buf_len)
-- | Add an @append@ step to the write operation with the given buffer.
writeOperationAppend
    :: WriteOperation
    -> ByteString
    -> IO ()
writeOperationAppend (WriteOperation o) buffer =
    withForeignPtr o $ \op_p ->
    B.useAsCStringLen buffer $ \(c_buf, buf_len) ->
        F.c_rados_write_op_append op_p c_buf (fromIntegral buf_len)
-- | Synchronously apply the write operation to the object via
-- @rados_write_op_operate@, passing null for the final argument.
-- The C status is converted by 'maybeError'.
writeOperate
    :: WriteOperation
    -> IOContext
    -> ByteString
    -> IO (Maybe RadosError)
writeOperate (WriteOperation o) (IOContext ioctx_p) oid =
    withForeignPtr o $ \op_p ->
    B.useAsCString oid $ \c_oid ->
        maybeError "rados_write_op_operate"
            (F.c_rados_write_op_operate op_p ioctx_p c_oid nullPtr)
-- | Asynchronously apply the write operation to the object via
-- @rados_aio_write_op_operate@, passing null for the final argument.
-- The submission status is classified by 'checkError''.
asyncWriteOperate
    :: WriteOperation
    -> IOContext
    -> Completion
    -> ByteString
    -> IO (Either RadosError Int)
asyncWriteOperate (WriteOperation o) (IOContext ioctx_p)
                  (Completion completion_fp) oid =
    withForeignPtr o $ \op_p ->
    withForeignPtr completion_fp $ \cmp_p ->
    B.useAsCString oid $ \c_oid ->
        checkError' "rados_aio_write_op_operate"
            (F.c_rados_aio_write_op_operate op_p ioctx_p cmp_p c_oid nullPtr)
#endif
-- | Treat a missing entity as success: 'Nothing' and 'NoEntity' both
-- return normally, and any other error is thrown.
missingOK :: Maybe RadosError -> IO ()
missingOK outcome = case outcome of
    Nothing           -> return ()
    Just (NoEntity{}) -> return ()
    Just e            -> throwIO e