module Database.MongoDB.Connection (
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
ReplicaSet(..), Name,
MasterOrSlaveOk(..),
Server(..), connHost, replicaSet
) where
import Database.MongoDB.Internal.Protocol
import Data.Bson ((=:), at, UString)
import Control.Pipeline (Resource(..))
import Control.Applicative ((<$>))
import Control.Exception (assert)
import System.IO.Error as E (try)
import Control.Monad.Error
import Control.Monad.MVar
import Network (HostName, PortID(..), connectTo)
import Data.Bson (Document, look)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Control.Monad.Identity
import Control.Monad.Util (MonadIO', untilSuccess)
import Database.MongoDB.Internal.Util ()
import Var.Pool
import System.Random (newStdGen, randomRs)
import Data.List (delete, find, nub)
import System.IO.Unsafe (unsafePerformIO)
-- | Name of a replica set (see the 'setName' field of 'ReplicaSet').
type Name = UString
-- | Wrap a command document in a query against the @admin.$cmd@ collection.
-- The query carries the 'SlaveOK' option, skips nothing, uses a batch size
-- of 0 and an empty projector; the command document itself is the selector.
adminCommand :: Document -> Request
adminCommand cmd =
  Query
    { qOptions = [SlaveOK]
    , qFullCollection = "admin.$cmd"
    , qSkip = 0
    , qBatchSize = 0
    , qSelector = cmd
    , qProjector = []
    }
-- | Extract the first document of a command reply.
--
-- The @title@ argument prefixes any error message raised here. If the reply's
-- response flags contain 'QueryError', the @$err@ field of the first document
-- is raised via 'error'. A reply that carries no documents at all is also
-- reported via 'error' with a descriptive message, rather than crashing on
-- 'head' of an empty list.
commandReply :: String -> Reply -> Document
commandReply title Reply{..} = case rDocuments of
  [] -> error $ title ++ ": server reply contained no documents"
  doc : _
    | QueryError `elem` rResponseFlags -> error $ title ++ ": " ++ at "$err" doc
    | otherwise -> doc
-- | Address of a server: a host name paired with a port.
data Host = Host HostName PortID deriving (Show, Eq, Ord)
-- | Port used by 'host' when none is given explicitly: port number 27017.
defaultPort :: PortID
defaultPort = PortNumber 27017
-- | Build a 'Host' for the given host name on 'defaultPort'.
host :: HostName -> Host
host = (`Host` defaultPort)
-- | Render a 'Host' as @hostname:port@. Service names and Unix socket paths
-- are emitted verbatim; numeric ports are rendered with 'show'.
showHostPort :: Host -> String
showHostPort (Host hostname port) = hostname ++ ":" ++ portText port
  where
    portText (Service s) = s
    portText (PortNumber p) = show p
    portText (UnixSocket s) = s
-- | Parse @hostname@ or @hostname:port@ into a 'Host'.
--
-- The host name is one or more letters, digits, @-@ or @.@. Leading and
-- trailing whitespace is accepted. When no port is given the result is built
-- with 'host' (i.e. on the default port). A parse error is reported through
-- 'fail' in the caller's monad, using the shown Parsec error as the message.
readHostPortM :: (Monad m) => String -> m Host
readHostPortM = either (fail . show) return . parse hostPort "readHostPort"
  where
    hostChars = many1 (letter <|> digit <|> char '-' <|> char '.')

    -- Input ends right after the host name (optionally followed by spaces).
    bareHost name = spaces >> eof >> return (host name)

    -- Host name is followed by ':' and a decimal port number.
    withPort name = do
      _ <- char ':'
      portNum :: Int <- read <$> many1 digit
      spaces >> eof
      return $ Host name (PortNumber $ fromIntegral portNum)

    hostPort = do
      spaces
      name <- hostChars
      -- 'T.try' lets the port-less alternative backtrack if it fails
      -- after consuming trailing spaces.
      T.try (bareHost name) <|> withPort name
-- | Pure variant of 'readHostPortM', run in the 'Identity' monad.
-- NOTE(review): a parse failure goes through Identity's 'fail', which
-- presumably raises a runtime error — confirm before using on untrusted input.
readHostPort :: String -> Host
readHostPort str = runIdentity (readHostPortM str)
-- | A replica set, identified by its name, together with the hosts used to
-- make first contact with it.
data ReplicaSet = ReplicaSet
  { setName :: Name
  , seedHosts :: [Host]
  }
  deriving (Show)

-- | Two replica sets are equal when their names are equal; the seed host
-- lists are not compared.
instance Eq ReplicaSet where
  left == right = setName left == setName right
-- | Send the @ismaster@ admin command over the pipe and return the reply
-- document. The @hosts@ and @primary@ fields are looked up (in that order)
-- before returning, so a reply lacking either is reported as a failure by
-- 'look' rather than handed to callers.
getReplicaInfo :: Pipe -> ErrorT IOError IO ReplicaInfo
getReplicaInfo pipe = do
  promise <- call pipe [] ismasterCmd
  reply <- promise
  let info = commandReply "ismaster" reply
  mapM_ (`look` info) ["hosts", "primary"]
  return info
  where
    ismasterCmd = adminCommand ["ismaster" =: (1 :: Int)]
-- | Reply document of the @ismaster@ command, as returned by 'getReplicaInfo'.
type ReplicaInfo = Document
-- | Every entry of the @hosts@ field of the replica info, parsed with
-- 'readHostPort'.
replicas :: ReplicaInfo -> [Host]
replicas info = [readHostPort member | member <- at "hosts" info]
-- | The @primary@ field of the replica info, parsed with 'readHostPort'.
primary :: ReplicaInfo -> Host
primary info = readHostPort (at "primary" info)
-- | All hosts named in the replica info, with the primary always first.
-- The first occurrence of the primary is removed from the 'replicas' list so
-- that it is not listed twice.
hosts :: ReplicaInfo -> [Host]
hosts info =
  let master = primary info
      others = delete master (replicas info)
   in master : others
-- | Which member of a replica set a requested pipe may be connected to.
data MasterOrSlaveOk
  = Master
    -- ^ Connect to the first listed member, which is the primary.
  | SlaveOk
    -- ^ Connect to a randomly chosen non-primary member when there is one,
    -- otherwise to the only member.
  deriving (Show, Eq)
-- | Resource pool specialised to 'IOError' as its error type.
type Pool' = Pool IOError
-- | A server, or group of servers, for which pooled connections can be kept.
class Server s where
  -- | Connection pool associated with this kind of server.
  data ConnPool s

  -- | Create a connection pool for the given server. The 'Int' is forwarded
  -- by the instances as the size of the underlying pool(s).
  newConnPool :: (MonadIO' m) => Int -> s -> m (ConnPool s)

  -- | Obtain a pipe from the pool, honouring the master/slave preference
  -- where the instance distinguishes between them.
  getPipe :: MasterOrSlaveOk -> ConnPool s -> ErrorT IOError IO Pipe

  -- | Kill the pipes held by the pool.
  killPipes :: ConnPool s -> IO ()
-- | A single host is served by one pool of pipes to that host. The
-- master/slave preference is ignored because there is only one server.
instance Server Host where
  data ConnPool Host = HostConnPool
    { connHost :: Host
      -- ^ Host every pipe in this pool connects to.
    , connPool :: Pool' Pipe
      -- ^ Underlying pool of pipes.
    }
  newConnPool size = liftIO . newHostConnPool size
  getPipe _ pool = getHostPipe pool
  killPipes = killAll . connPool
-- | Shown as @ConnPool @ followed by the shown host; the pool contents are
-- not displayed.
instance Show (ConnPool Host) where
  show pool = "ConnPool " ++ show (connHost pool)
-- | Create a pool of the given size for one host. New pipes are opened with
-- 'tcpConnect', released with 'close', and considered expired once
-- 'isClosed' reports true.
newHostConnPool :: Int -> Host -> IO (ConnPool Host)
newHostConnPool poolSize' host' = do
  pool <- newPool pipeFactory poolSize'
  return (HostConnPool host' pool)
  where
    pipeFactory =
      Factory
        { newResource = tcpConnect host'
        , killResource = close
        , isExpired = isClosed
        }
-- | Take a pipe from the host's pool via 'aResource'.
getHostPipe :: ConnPool Host -> ErrorT IOError IO Pipe
getHostPipe = aResource . connPool
-- | Connect to the host and wrap the resulting handle in a 'Pipe' with
-- 'mkPipe'. An 'IOError' raised while doing so is caught and returned as the
-- 'ErrorT' failure instead of propagating as an exception.
tcpConnect :: Host -> ErrorT IOError IO Pipe
tcpConnect (Host hostname port) =
  ErrorT $ E.try (connectTo hostname port >>= mkPipe)
-- | A replica set is served by one host pool per currently known member.
instance Server ReplicaSet where
  data ConnPool ReplicaSet = ReplicaSetConnPool
    { repsetName :: Name
      -- ^ Name of the replica set.
    , currentMembers :: MVar [ConnPool Host]
      -- ^ Host pools of the members known at the moment.
    }
  newConnPool size = liftIO . newSetConnPool size
  getPipe mos pool = getSetPipe mos pool
  -- Kill the pipes of every member pool while holding the members variable.
  killPipes pool = withMVar (currentMembers pool) (mapM_ killPipes)
-- | Shown as @ConnPool @ followed by the shown 'ReplicaSet' snapshot of the
-- pool.
instance Show (ConnPool ReplicaSet) where
  show pool = "ConnPool " ++ show snapshot
    where
      -- Reading the current members requires IO; 'unsafePerformIO' is used
      -- so that 'show' can stay pure. The result reflects whatever members
      -- are stored at the time this thunk is forced.
      snapshot = unsafePerformIO (replicaSet pool)
-- | Snapshot of the replica set behind a pool: its name together with the
-- hosts of the member pools currently stored in the pool.
replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet
replicaSet pool = do
  members <- readMVar (currentMembers pool)
  return $ ReplicaSet (repsetName pool) (map connHost members)
-- | Create a replica-set pool holding one host pool (of the given size) per
-- seed host. The seed list is asserted to be non-empty.
newSetConnPool :: Int -> ReplicaSet -> IO (ConnPool ReplicaSet)
newSetConnPool poolSize' repset = assert (not (null seeds)) $ do
  seedPools <- mapM (newConnPool poolSize') seeds
  members <- newMVar seedPools
  return ReplicaSetConnPool{repsetName = setName repset, currentMembers = members}
  where
    seeds = seedHosts repset
-- | Ask the given member pools for the replica set's host list (primary
-- first, see 'hosts'). The pools are tried through 'untilSuccess'; for each
-- one a pipe is taken and 'getReplicaInfo' is run over it. The replica set
-- name argument is not used.
getMembers :: Name -> [ConnPool Host] -> ErrorT IOError IO [Host]
getMembers _repsetName connections = do
  info <- untilSuccess fetchInfo connections
  return (hosts info)
  where
    fetchInfo conn = getHostPipe conn >>= getReplicaInfo
-- | Re-query the replica set for its members and return one host pool per
-- member, in the order reported by 'getMembers' (primary first).
--
-- Pools for hosts that are already known are reused; pools for newly
-- reported hosts are created with the same size as the first existing pool.
-- An empty list of known pools cannot be refreshed (there is nobody to ask
-- and no pool size to copy), so that case fails with an 'IOError' in
-- 'ErrorT' instead of crashing on 'head' of an empty list.
refreshMembers :: Name -> [ConnPool Host] -> ErrorT IOError IO [ConnPool Host]
refreshMembers _ [] =
  throwError $ userError "refreshMembers: replica set has no known members to query"
refreshMembers repsetName connections@(firstConn : _) = do
  n <- liftIO . poolSize $ connPool firstConn
  mapM (connection n) =<< getMembers repsetName connections
  where
    -- Reuse the existing pool for this host if there is one, else make a new one.
    connection n host' = maybe (newConnPool n host') return $ find ((host' ==) . connHost) connections
-- | Get a pipe to a member of the replica set, refreshing the stored member
-- list first (the refreshed list replaces the old one in the 'MVar').
--
-- * 'Master': a pipe to the first member, which 'refreshMembers' lists as
--   the primary.
-- * 'SlaveOk': the non-primary members (indices 1..n) are tried in a random
--   order until one yields a pipe; when the primary is the only member
--   (n == 0), index 0 is used instead.
getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> ErrorT IOError IO Pipe
getSetPipe mos ReplicaSetConnPool{..} = modifyMVar currentMembers $ \conns -> do
  connections <- refreshMembers repsetName conns
  pipe <- case mos of
    Master -> getHostPipe (head connections)
    SlaveOk -> do
      -- n is the index of the last member, i.e. the number of non-primary members.
      let n = length connections - 1
      -- Distinct random indices in [min 1 n, n]: a permutation of 1..n when
      -- there are slaves, or just [0] when the primary is alone.
      is <- take (max 1 n) . nub . randomRs (min 1 n, n) <$> liftIO newStdGen
      untilSuccess (getHostPipe . (connections !!)) is
  return (connections, pipe)