module Database.Cassandra.Basic
(
CPool
, Server
, defServer
, defServers
, KeySpace
, createCassandraPool
, MonadCassandra (..)
, Cas (..)
, runCas
, getCol
, get
, getMulti
, insert
, delete
, Selector(..)
, Order(..)
, KeySelector(..)
, KeyRangeType(..)
, CassandraException(..)
, getTime
, throwing
, wrapException
, ColumnFamily
, Key
, ColumnName
, Value
, Column(..)
, col
, Row
, ConsistencyLevel(..)
, CKey (..)
, packLong
) where
import Control.Applicative
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Reader
import Data.ByteString.Lazy (ByteString)
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe (mapMaybe)
import qualified Database.Cassandra.Thrift.Cassandra_Client as C
import Database.Cassandra.Thrift.Cassandra_Types (ConsistencyLevel (..))
import qualified Database.Cassandra.Thrift.Cassandra_Types as T
import Prelude hiding (catch)
import Database.Cassandra.Pack
import Database.Cassandra.Pool
import Database.Cassandra.Types
class (MonadIO m) => MonadCassandra m where
getCassandraPool :: m CPool
withCassandraPool :: MonadCassandra m => (Cassandra -> IO b) -> m b
withCassandraPool f = do
p <- getCassandraPool
liftIO $ withResource p f
newtype Cas a = Cas { unCas :: ReaderT CPool IO a }
deriving (Functor,Applicative,Monad,MonadIO)
runCas :: Cas a -> CPool -> IO a
runCas f p = runReaderT (unCas f) p
instance MonadCassandra Cas where
getCassandraPool = Cas ask
getCol
:: (MonadCassandra m)
=> ColumnFamily
-> Key
-> ColumnName
-> ConsistencyLevel
-> m (Maybe Column)
getCol cf k cn cl = do
res <- get cf k (ColNames [cn]) cl
case res of
[] -> return Nothing
x:_ -> return $ Just x
get
:: (MonadCassandra m)
=> ColumnFamily
-> Key
-> Selector
-> ConsistencyLevel
-> m Row
get cf k s cl = withCassandraPool $ \ Cassandra{..} -> do
res <- wrapException $ C.get_slice (cProto, cProto) k cp (mkPredicate s) cl
throwing . return $ mapM castColumn res
where
cp = T.ColumnParent (Just cf) Nothing
getMulti
:: (MonadCassandra m)
=> ColumnFamily
-> KeySelector
-> Selector
-> ConsistencyLevel
-> m (Map ByteString Row)
getMulti cf ks s cl = withCassandraPool $ \ Cassandra{..} -> do
case ks of
Keys xs -> do
res <- wrapException $ C.multiget_slice (cProto, cProto) xs cp (mkPredicate s) cl
return $ M.mapMaybe f res
KeyRange {} -> do
res <- wrapException $
C.get_range_slices (cProto, cProto) cp (mkPredicate s) (mkKeyRange ks) cl
return $ collectKeySlices res
where
collectKeySlices :: [T.KeySlice] -> Map ByteString Row
collectKeySlices xs = M.fromList $ mapMaybe collectKeySlice xs
collectKeySlice (T.KeySlice (Just k) (Just xs)) =
case mapM castColumn xs of
Left _ -> Nothing
Right xs' -> Just (k, xs')
collectKeySlice _ = Nothing
cp = T.ColumnParent (Just cf) Nothing
f xs =
case mapM castColumn xs of
Left _ -> Nothing
Right xs' -> Just xs'
insert
:: (MonadCassandra m)
=> ColumnFamily
-> Key
-> ConsistencyLevel
-> Row
-> m ()
insert cf k cl row = withCassandraPool $ \ Cassandra{..} -> do
let insCol cp c = do
c' <- mkThriftCol c
C.insert (cProto, cProto) k cp c' cl
forM_ row $ \ c -> do
case c of
Column{} -> do
let cp = T.ColumnParent (Just cf) Nothing
insCol cp c
SuperColumn cn cols -> do
let cp = T.ColumnParent (Just cf) (Just cn)
mapM_ (insCol cp) cols
delete
:: (MonadCassandra m)
=> ColumnFamily
-> Key
-> Selector
-> ConsistencyLevel
-> m ()
delete cf k s cl = withCassandraPool $ \ Cassandra {..} -> do
now <- getTime
wrapException $ case s of
All -> C.remove (cProto, cProto) k cpAll now cl
ColNames cs -> forM_ cs $ \c -> do
C.remove (cProto, cProto) k (cpCol c) now cl
SupNames sn cs -> forM_ cs $ \c -> do
C.remove (cProto, cProto) k (cpSCol sn c) now cl
Range _ _ _ _ -> error "delete: Range delete not implemented"
where
cpAll = T.ColumnPath (Just cf) Nothing Nothing
cpCol name = T.ColumnPath (Just cf) Nothing (Just name)
cpSCol sc name = T.ColumnPath (Just cf) (Just sc) (Just name)
wrapException :: IO a -> IO a
wrapException a = f
where
f = a
`catch` (\ (T.NotFoundException) -> throw NotFoundException)
`catch` (\ (T.InvalidRequestException e) ->
throw . InvalidRequestException $ maybe "" id e)
`catch` (\ T.UnavailableException -> throw UnavailableException)
`catch` (\ T.TimedOutException -> throw TimedOutException)
`catch` (\ (T.AuthenticationException e) ->
throw . AuthenticationException $ maybe "" id e)
`catch` (\ (T.AuthorizationException e) ->
throw . AuthorizationException $ maybe "" id e)
`catch` (\ T.SchemaDisagreementException -> throw SchemaDisagreementException)
throwing :: IO (Either CassandraException a) -> IO a
throwing f = do
res <- f
case res of
Left e -> throw e
Right a -> return a