module Database.Cassandra.Pool
( CPool
, Server
, defServer
, defServers
, KeySpace
, Cassandra (..)
, createCassandraPool
, withResource
, openThrift
) where
import Control.Applicative ((<$>))
import Control.Arrow
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (SomeException, handle, onException)
import Control.Monad (forM_, forever, join, liftM2, unless, when)
import Data.ByteString (ByteString)
import Data.List (find, nub, partition)
import Data.Maybe
import Data.Pool
import Data.Set (Set)
import qualified Data.Set as S
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import qualified Database.Cassandra.Thrift.Cassandra_Client as C
import qualified Database.Cassandra.Thrift.Cassandra_Types as C
import Network
import Prelude hiding (catch)
import System.IO (Handle(..), hClose)
import System.Mem.Weak (addFinalizer)
import Thrift.Protocol.Binary
import Thrift.Transport
import Thrift.Transport.Framed
import Thrift.Transport.Handle
type CPool = Pool Cassandra
type Server = (HostName, Int)
defServer :: Server
defServer = ("127.0.0.1", 9160)
defServers :: [Server]
defServers = [defServer]
type KeySpace = String
data Cassandra = Cassandra {
cHandle :: Handle
, cFramed :: FramedTransport Handle
, cProto :: BinaryProtocol (FramedTransport Handle)
}
createCassandraPool
:: [Server]
-> Int
-> Int
-> NominalDiffTime
-> KeySpace
-> IO CPool
createCassandraPool servers numStripes perStripe maxIdle ks = do
sring <- newTVarIO $ mkRing servers
pool <- createPool (cr 4 sring) dest numStripes maxIdle perStripe
return pool
where
cr :: Int -> ServerRing -> IO Cassandra
cr n sring = do
s@(host, p) <- atomically $ do
ring@Ring{..} <- readTVar sring
writeTVar sring (next ring)
return current
handle (handler n sring s) $ do
(h,ft,proto) <- openThrift host p
C.set_keyspace (proto, proto) ks
return $ Cassandra h ft proto
handler :: Int -> ServerRing -> Server -> SomeException -> IO Cassandra
handler 0 _ _ e = error $ "Can't connect to cassandra after several tries: " ++ show e
handler n sring server e = do
threadDelay 100000
cr (n1) sring
dest h = hClose $ cHandle h
openThrift host port = do
h <- hOpen (host, PortNumber (fromIntegral port))
ft <- openFramedTransport h
let p = BinaryProtocol ft
return (h, ft, p)
modifyServers :: TVar (Ring a) -> (Ring a -> Ring a) -> IO ()
modifyServers sring f = atomically $ do
ring@Ring{..} <- readTVar sring
writeTVar sring $ f ring
return ()
serverDiscoveryThread :: TVar (Ring Server)
-> String
-> Pool Cassandra
-> IO b
serverDiscoveryThread sring ks pool = forever $ do
withResource pool (updateServers sring ks)
threadDelay 60000000
updateServers :: TVar (Ring Server) -> String -> Cassandra -> IO ()
updateServers sring ks (Cassandra _ _ p) = do
ranges <- C.describe_ring (p,p) ks
let hosts = concat $ catMaybes $ map C.f_TokenRange_endpoints ranges
servers = nub $ map (\e -> first (const e) defServer) hosts
modifyServers sring (addNewServers servers)
type ServerRing = TVar (Ring Server)
data Ring a = Ring {
allItems :: Set a
, current :: !a
, used :: [a]
, upcoming :: [a]
}
mkRing [] = error "Can't make a ring from empty list"
mkRing all@(a:as) = Ring (S.fromList all) a [] as
next :: Ring a -> Ring a
next Ring{..}
| (n:rest) <- upcoming
= Ring allItems n (current : used) rest
next Ring{..}
| (n:rest) <- reverse (current : used)
= Ring allItems n [] rest
removeServer :: Ord a => a -> Ring a -> Ring a
removeServer s r@Ring{..}
| s `S.member` allItems = Ring all' cur' [] up'
| otherwise = r
where
all' = S.delete s allItems
cur' : up' = S.toList all'
addNewServers :: [Server] -> Ring Server -> Ring Server
addNewServers servers Ring{..} = Ring all' current' used' (new ++ upcoming')
where
all' = S.fromList servers
new = S.toList $ all' S.\\ allItems
used' = filter (`S.member` all') used
(current':upcoming') = filter (`S.member` all') (current:upcoming)