{-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, TupleSections #-}
#if (__GLASGOW_HASKELL__ >= 706)
{-# LANGUAGE RecursiveDo #-}
#else
{-# LANGUAGE DoRec #-}
#endif
module Database.MongoDB.Connection (
Secs,
Pipe, close, isClosed,
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
readHostPortM, globalConnectTimeout, connect, connect',
ReplicaSetName, openReplicaSet, openReplicaSet', openReplicaSetTLS, openReplicaSetTLS',
openReplicaSetSRV, openReplicaSetSRV', openReplicaSetSRV'', openReplicaSetSRV''',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where
import Prelude hiding (lookup)
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersect, partition, (\\), delete)
import Data.Maybe (fromJust)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad (forM_, guard)
import System.IO.Unsafe (unsafePerformIO)
import System.Timeout (timeout)
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar, eof,
spaces, try, (<|>))
import qualified Data.List as List
import Control.Monad.Except (throwError)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
readMVar)
import Data.Bson (Document, at, (=:))
import Data.Text (Text)
import qualified Data.Bson as B
import qualified Data.Text as T
import Database.MongoDB.Internal.Network (Host(..), HostName, PortID(..), connectTo, lookupSeedList, lookupReplicaSetName)
import Database.MongoDB.Internal.Protocol (Pipe, newPipe, close, isClosed)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
updateAssocs, shuffle, mergesortM)
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
slaveOk, runCommand, retrieveServerData)
import qualified Database.MongoDB.Transport.Tls as TLS (connect)
adminCommand :: Command -> Pipe -> IO Document
adminCommand :: Document -> Pipe -> IO Document
adminCommand Document
cmd Pipe
pipe =
forall (m :: * -> *) e e' a.
(MonadIO m, Exception e, Exception e') =>
(e -> e') -> IO a -> m a
liftIOE Failure -> IOError
failureToIOError forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadIO m =>
Pipe -> AccessMode -> Database -> Action m a -> m a
access Pipe
pipe AccessMode
slaveOk Database
"admin" forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *). MonadIO m => Document -> Action m Document
runCommand Document
cmd
where
failureToIOError :: Failure -> IOError
failureToIOError (ConnectionFailure IOError
e) = IOError
e
failureToIOError Failure
e = [Char] -> IOError
userError forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> [Char]
show Failure
e
defaultPort :: PortID
defaultPort :: PortID
defaultPort = PortNumber -> PortID
PortNumber PortNumber
27017
host :: HostName -> Host
host :: [Char] -> Host
host [Char]
hostname = [Char] -> PortID -> Host
Host [Char]
hostname PortID
defaultPort
showHostPort :: Host -> String
showHostPort :: Host -> [Char]
showHostPort (Host [Char]
hostname (PortNumber PortNumber
port)) = [Char]
hostname forall a. [a] -> [a] -> [a]
++ [Char]
":" forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show PortNumber
port
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
showHostPort (Host [Char]
_ (UnixSocket [Char]
path)) = [Char]
"unix:" forall a. [a] -> [a] -> [a]
++ [Char]
path
#endif
readHostPortM :: (MonadFail m) => String -> m Host
readHostPortM :: forall (m :: * -> *). MonadFail m => [Char] -> m Host
readHostPortM = forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Show a => a -> [Char]
show) forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall s t a.
Stream s Identity t =>
Parsec s () a -> [Char] -> s -> Either ParseError a
parse forall {u}. ParsecT [Char] u Identity Host
parser [Char]
"readHostPort" where
hostname :: ParsecT [Char] u Identity [Char]
hostname = forall s (m :: * -> *) t u a.
Stream s m t =>
ParsecT s u m a -> ParsecT s u m [a]
many1 (forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m Char
letter forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m Char
digit forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> forall s (m :: * -> *) u.
Stream s m Char =>
Char -> ParsecT s u m Char
char Char
'-' forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> forall s (m :: * -> *) u.
Stream s m Char =>
Char -> ParsecT s u m Char
char Char
'.' forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> forall s (m :: * -> *) u.
Stream s m Char =>
Char -> ParsecT s u m Char
char Char
'_')
parser :: ParsecT [Char] u Identity Host
parser = do
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m ()
spaces
[Char]
h <- forall {u}. ParsecT [Char] u Identity [Char]
hostname
forall tok st a. GenParser tok st a -> GenParser tok st a
try (forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m ()
spaces forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall s (m :: * -> *) t u.
(Stream s m t, Show t) =>
ParsecT s u m ()
eof forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return ([Char] -> Host
host [Char]
h)) forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> do
Char
_ <- forall s (m :: * -> *) u.
Stream s m Char =>
Char -> ParsecT s u m Char
char Char
':'
forall tok st a. GenParser tok st a -> GenParser tok st a
try ( do Int
port :: Int <- forall a. Read a => [Char] -> a
read forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall s (m :: * -> *) t u a.
Stream s m t =>
ParsecT s u m a -> ParsecT s u m [a]
many1 forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m Char
digit
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m ()
spaces forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall s (m :: * -> *) t u.
(Stream s m t, Show t) =>
ParsecT s u m ()
eof
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ [Char] -> PortID -> Host
Host [Char]
h (PortNumber -> PortID
PortNumber forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
port))
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> do forall (f :: * -> *). Alternative f => Bool -> f ()
guard ([Char]
h forall a. Eq a => a -> a -> Bool
== [Char]
"unix")
[Char]
p <- forall s (m :: * -> *) t u a.
Stream s m t =>
ParsecT s u m a -> ParsecT s u m [a]
many1 forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m Char
anyChar
forall s (m :: * -> *) t u.
(Stream s m t, Show t) =>
ParsecT s u m ()
eof
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ [Char] -> PortID -> Host
Host [Char]
"" ([Char] -> PortID
UnixSocket [Char]
p)
#endif
readHostPort :: String -> Host
readHostPort :: [Char] -> Host
readHostPort = forall a. HasCallStack => Maybe a -> a
fromJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadFail m => [Char] -> m Host
readHostPortM
type Secs = Double
globalConnectTimeout :: IORef Secs
globalConnectTimeout :: IORef Secs
globalConnectTimeout = forall a. IO a -> a
unsafePerformIO (forall a. a -> IO (IORef a)
newIORef Secs
6)
{-# NOINLINE globalConnectTimeout #-}
connect :: Host -> IO Pipe
connect :: Host -> IO Pipe
connect Host
h = forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a b c. (a -> b -> c) -> b -> a -> c
flip Secs -> Host -> IO Pipe
connect' Host
h
connect' :: Secs -> Host -> IO Pipe
connect' :: Secs -> Host -> IO Pipe
connect' Secs
timeoutSecs (Host [Char]
hostname PortID
port) = do
Maybe Handle
mh <- forall a. Int -> IO a -> IO (Maybe a)
timeout (forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Secs
timeoutSecs forall a. Num a => a -> a -> a
* Secs
1000000) ([Char] -> PortID -> IO Handle
connectTo [Char]
hostname PortID
port)
Handle
handle <- forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall a. IOError -> IO a
ioError forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"connect timed out") forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Handle
mh
rec
Pipe
p <- ServerData -> Handle -> IO Pipe
newPipe ServerData
sd Handle
handle
ServerData
sd <- forall (m :: * -> *) a.
MonadIO m =>
Pipe -> AccessMode -> Database -> Action m a -> m a
access Pipe
p AccessMode
slaveOk Database
"admin" forall (m :: * -> *). MonadIO m => Action m ServerData
retrieveServerData
forall (m :: * -> *) a. Monad m => a -> m a
return Pipe
p
type ReplicaSetName = Text
data TransportSecurity = Secure | Unsecure
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs TransportSecurity
replSetName :: ReplicaSet -> Text
replSetName :: ReplicaSet -> Database
replSetName (ReplicaSet Database
rsName MVar [(Host, Maybe Pipe)]
_ Secs
_ TransportSecurity
_) = Database
rsName
openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSet :: (Database, [Host]) -> IO ReplicaSet
openReplicaSet (Database, [Host])
rsSeed = forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a b c. (a -> b -> c) -> b -> a -> c
flip Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSet' (Database, [Host])
rsSeed
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSet' :: Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSet' Secs
timeoutSecs (Database
rs, [Host]
hosts) = Secs -> (Database, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet Secs
timeoutSecs (Database
rs, [Host]
hosts, TransportSecurity
Unsecure)
openReplicaSetTLS :: (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSetTLS :: (Database, [Host]) -> IO ReplicaSet
openReplicaSetTLS (Database, [Host])
rsSeed = forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a b c. (a -> b -> c) -> b -> a -> c
flip Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSetTLS' (Database, [Host])
rsSeed
openReplicaSetTLS' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSetTLS' :: Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSetTLS' Secs
timeoutSecs (Database
rs, [Host]
hosts) = Secs -> (Database, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet Secs
timeoutSecs (Database
rs, [Host]
hosts, TransportSecurity
Secure)
_openReplicaSet :: Secs -> (ReplicaSetName, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet :: Secs -> (Database, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet Secs
timeoutSecs (Database
rsName, [Host]
seedList, TransportSecurity
transportSecurity) = do
MVar [(Host, Maybe Pipe)]
vMembers <- forall (m :: * -> *) a. MonadBase IO m => a -> m (MVar a)
newMVar (forall a b. (a -> b) -> [a] -> [b]
map (, forall a. Maybe a
Nothing) [Host]
seedList)
let rs :: ReplicaSet
rs = Database
-> MVar [(Host, Maybe Pipe)]
-> Secs
-> TransportSecurity
-> ReplicaSet
ReplicaSet Database
rsName MVar [(Host, Maybe Pipe)]
vMembers Secs
timeoutSecs TransportSecurity
transportSecurity
ReplicaInfo
_ <- ReplicaSet -> IO ReplicaInfo
updateMembers ReplicaSet
rs
forall (m :: * -> *) a. Monad m => a -> m a
return ReplicaSet
rs
openReplicaSetSRV :: HostName -> IO ReplicaSet
openReplicaSetSRV :: [Char] -> IO ReplicaSet
openReplicaSetSRV [Char]
hostname = do
Secs
timeoutSecs <- forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout
Secs -> TransportSecurity -> [Char] -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
Unsecure [Char]
hostname
openReplicaSetSRV' :: HostName -> IO ReplicaSet
openReplicaSetSRV' :: [Char] -> IO ReplicaSet
openReplicaSetSRV' [Char]
hostname = do
Secs
timeoutSecs <- forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout
Secs -> TransportSecurity -> [Char] -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
Secure [Char]
hostname
openReplicaSetSRV'' :: Secs -> HostName -> IO ReplicaSet
openReplicaSetSRV'' :: Secs -> [Char] -> IO ReplicaSet
openReplicaSetSRV'' Secs
timeoutSecs = Secs -> TransportSecurity -> [Char] -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
Unsecure
openReplicaSetSRV''' :: Secs -> HostName -> IO ReplicaSet
openReplicaSetSRV''' :: Secs -> [Char] -> IO ReplicaSet
openReplicaSetSRV''' Secs
timeoutSecs = Secs -> TransportSecurity -> [Char] -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
Secure
_openReplicaSetSRV :: Secs -> TransportSecurity -> HostName -> IO ReplicaSet
_openReplicaSetSRV :: Secs -> TransportSecurity -> [Char] -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
transportSecurity [Char]
hostname = do
Maybe Database
replicaSetName <- [Char] -> IO (Maybe Database)
lookupReplicaSetName [Char]
hostname
[Host]
hosts <- [Char] -> IO [Host]
lookupSeedList [Char]
hostname
case (Maybe Database
replicaSetName, [Host]
hosts) of
(Maybe Database
Nothing, [Host]
_) -> forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"Failed to lookup replica set name"
(Maybe Database
_, []) -> forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"Failed to lookup replica set seedlist"
(Just Database
rsName, [Host]
_) ->
case TransportSecurity
transportSecurity of
TransportSecurity
Secure -> Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSetTLS' Secs
timeoutSecs (Database
rsName, [Host]
hosts)
TransportSecurity
Unsecure -> Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSet' Secs
timeoutSecs (Database
rsName, [Host]
hosts)
closeReplicaSet :: ReplicaSet -> IO ()
closeReplicaSet :: ReplicaSet -> IO ()
closeReplicaSet (ReplicaSet Database
_ MVar [(Host, Maybe Pipe)]
vMembers Secs
_ TransportSecurity
_) = forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar [(Host, Maybe Pipe)]
vMembers forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (m :: * -> *) a. Monad m => a -> m a
return ()) Pipe -> IO ()
close forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> b
snd)
primary :: ReplicaSet -> IO Pipe
primary :: ReplicaSet -> IO Pipe
primary rs :: ReplicaSet
rs@(ReplicaSet Database
rsName MVar [(Host, Maybe Pipe)]
_ Secs
_ TransportSecurity
_) = do
Maybe Host
mHost <- ReplicaInfo -> Maybe Host
statedPrimary forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ReplicaSet -> IO ReplicaInfo
updateMembers ReplicaSet
rs
case Maybe Host
mHost of
Just Host
host' -> ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection ReplicaSet
rs forall a. Maybe a
Nothing Host
host'
Maybe Host
Nothing -> forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError forall a b. (a -> b) -> a -> b
$ [Char]
"replica set " forall a. [a] -> [a] -> [a]
++ Database -> [Char]
T.unpack Database
rsName forall a. [a] -> [a] -> [a]
++ [Char]
" has no primary"
secondaryOk :: ReplicaSet -> IO Pipe
secondaryOk :: ReplicaSet -> IO Pipe
secondaryOk ReplicaSet
rs = do
ReplicaInfo
info <- ReplicaSet -> IO ReplicaInfo
updateMembers ReplicaSet
rs
[Host]
hosts <- forall a. [a] -> IO [a]
shuffle (ReplicaInfo -> [Host]
possibleHosts ReplicaInfo
info)
let hosts' :: [Host]
hosts' = forall b a. b -> (a -> b) -> Maybe a -> b
maybe [Host]
hosts (\Host
p -> forall a. Eq a => a -> [a] -> [a]
delete Host
p [Host]
hosts forall a. [a] -> [a] -> [a]
++ [Host
p]) (ReplicaInfo -> Maybe Host
statedPrimary ReplicaInfo
info)
forall e (m :: * -> *) a b.
MonadError e m =>
(a -> m b) -> [a] -> m b
untilSuccess (ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection ReplicaSet
rs forall a. Maybe a
Nothing) [Host]
hosts'
routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO Pipe
routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering)
-> ReplicaSet -> IO Pipe
routedHost (Host, Bool) -> (Host, Bool) -> IO Ordering
f ReplicaSet
rs = do
ReplicaInfo
info <- ReplicaSet -> IO ReplicaInfo
updateMembers ReplicaSet
rs
[Host]
hosts <- forall a. [a] -> IO [a]
shuffle (ReplicaInfo -> [Host]
possibleHosts ReplicaInfo
info)
let addIsPrimary :: Host -> (Host, Bool)
addIsPrimary Host
h = (Host
h, forall a. a -> Maybe a
Just Host
h forall a. Eq a => a -> a -> Bool
== ReplicaInfo -> Maybe Host
statedPrimary ReplicaInfo
info)
[Host]
hosts' <- forall (m :: * -> *) a.
Monad m =>
(a -> a -> m Ordering) -> [a] -> m [a]
mergesortM (\Host
a Host
b -> (Host, Bool) -> (Host, Bool) -> IO Ordering
f (Host -> (Host, Bool)
addIsPrimary Host
a) (Host -> (Host, Bool)
addIsPrimary Host
b)) [Host]
hosts
forall e (m :: * -> *) a b.
MonadError e m =>
(a -> m b) -> [a] -> m b
untilSuccess (ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection ReplicaSet
rs forall a. Maybe a
Nothing) [Host]
hosts'
type ReplicaInfo = (Host, Document)
statedPrimary :: ReplicaInfo -> Maybe Host
statedPrimary :: ReplicaInfo -> Maybe Host
statedPrimary (Host
host', Document
info) = if (forall v. Val v => Database -> Document -> v
at Database
"ismaster" Document
info) then forall a. a -> Maybe a
Just Host
host' else [Char] -> Host
readHostPort forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall v (m :: * -> *).
(Val v, MonadFail m) =>
Database -> Document -> m v
B.lookup Database
"primary" Document
info
possibleHosts :: ReplicaInfo -> [Host]
possibleHosts :: ReplicaInfo -> [Host]
possibleHosts (Host
_, Document
info) = forall a b. (a -> b) -> [a] -> [b]
map [Char] -> Host
readHostPort forall a b. (a -> b) -> a -> b
$ forall v. Val v => Database -> Document -> v
at Database
"hosts" Document
info
updateMembers :: ReplicaSet -> IO ReplicaInfo
updateMembers :: ReplicaSet -> IO ReplicaInfo
updateMembers rs :: ReplicaSet
rs@(ReplicaSet Database
_ MVar [(Host, Maybe Pipe)]
vMembers Secs
_ TransportSecurity
_) = do
(Host
host', Document
info) <- forall e (m :: * -> *) a b.
MonadError e m =>
(a -> m b) -> [a] -> m b
untilSuccess (ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
fetchReplicaInfo ReplicaSet
rs) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar [(Host, Maybe Pipe)]
vMembers
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m (a, b)) -> m b
modifyMVar MVar [(Host, Maybe Pipe)]
vMembers forall a b. (a -> b) -> a -> b
$ \[(Host, Maybe Pipe)]
members -> do
let (([(Host, Maybe Pipe)]
members', [(Host, Maybe Pipe)]
old), [Host]
new) = forall k v. Eq k => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection (forall a b. (a -> b) -> [a] -> [b]
map [Char] -> Host
readHostPort forall a b. (a -> b) -> a -> b
$ forall v. Val v => Database -> Document -> v
at Database
"hosts" Document
info) [(Host, Maybe Pipe)]
members
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Host, Maybe Pipe)]
old forall a b. (a -> b) -> a -> b
$ \(Host
_, Maybe Pipe
mPipe) -> forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (m :: * -> *) a. Monad m => a -> m a
return ()) Pipe -> IO ()
close Maybe Pipe
mPipe
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Host, Maybe Pipe)]
members' forall a. [a] -> [a] -> [a]
++ forall a b. (a -> b) -> [a] -> [b]
map (, forall a. Maybe a
Nothing) [Host]
new, (Host
host', Document
info))
where
intersection :: (Eq k) => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection :: forall k v. Eq k => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection [k]
keys [(k, v)]
assocs = (forall a. (a -> Bool) -> [a] -> ([a], [a])
partition (forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem [k]
inKeys forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> a
fst) [(k, v)]
assocs, [k]
keys forall a. Eq a => [a] -> [a] -> [a]
\\ [k]
inKeys) where
assocKeys :: [k]
assocKeys = forall a b. (a -> b) -> [a] -> [b]
map forall a b. (a, b) -> a
fst [(k, v)]
assocs
inKeys :: [k]
inKeys = forall a. Eq a => [a] -> [a] -> [a]
intersect [k]
keys [k]
assocKeys
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
fetchReplicaInfo rs :: ReplicaSet
rs@(ReplicaSet Database
rsName MVar [(Host, Maybe Pipe)]
_ Secs
_ TransportSecurity
_) (Host
host', Maybe Pipe
mPipe) = do
Pipe
pipe <- ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection ReplicaSet
rs Maybe Pipe
mPipe Host
host'
Document
info <- Document -> Pipe -> IO Document
adminCommand [Database
"isMaster" forall v. Val v => Database -> v -> Field
=: (Int
1 :: Int)] Pipe
pipe
case forall v (m :: * -> *).
(Val v, MonadFail m) =>
Database -> Document -> m v
B.lookup Database
"setName" Document
info of
Maybe Database
Nothing -> forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> [Char]
show Host
host' forall a. [a] -> [a] -> [a]
++ [Char]
" not a member of any replica set, including " forall a. [a] -> [a] -> [a]
++ Database -> [Char]
T.unpack Database
rsName forall a. [a] -> [a] -> [a]
++ [Char]
": " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Document
info
Just Database
setName | Database
setName forall a. Eq a => a -> a -> Bool
/= Database
rsName -> forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> [Char]
show Host
host' forall a. [a] -> [a] -> [a]
++ [Char]
" not a member of replica set " forall a. [a] -> [a] -> [a]
++ Database -> [Char]
T.unpack Database
rsName forall a. [a] -> [a] -> [a]
++ [Char]
": " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Document
info
Just Database
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return (Host
host', Document
info)
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection (ReplicaSet Database
_ MVar [(Host, Maybe Pipe)]
vMembers Secs
timeoutSecs TransportSecurity
transportSecurity) Maybe Pipe
mPipe Host
host' =
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO Pipe
conn (\Pipe
p -> Pipe -> IO Bool
isClosed Pipe
p forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
bad -> if Bool
bad then IO Pipe
conn else forall (m :: * -> *) a. Monad m => a -> m a
return Pipe
p) Maybe Pipe
mPipe
where
conn :: IO Pipe
conn = forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m (a, b)) -> m b
modifyMVar MVar [(Host, Maybe Pipe)]
vMembers forall a b. (a -> b) -> a -> b
$ \[(Host, Maybe Pipe)]
members -> do
let (Host [Char]
h PortID
p) = Host
host'
let conn' :: IO Pipe
conn' = case TransportSecurity
transportSecurity of
TransportSecurity
Secure -> [Char] -> PortID -> IO Pipe
TLS.connect [Char]
h PortID
p
TransportSecurity
Unsecure -> Secs -> Host -> IO Pipe
connect' Secs
timeoutSecs Host
host'
let new :: IO ([(Host, Maybe Pipe)], Pipe)
new = IO Pipe
conn' forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Pipe
pipe -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall k v. Eq k => k -> v -> [(k, v)] -> [(k, v)]
updateAssocs Host
host' (forall a. a -> Maybe a
Just Pipe
pipe) [(Host, Maybe Pipe)]
members, Pipe
pipe)
case forall a b. Eq a => a -> [(a, b)] -> Maybe b
List.lookup Host
host' [(Host, Maybe Pipe)]
members of
Just (Just Pipe
pipe) -> Pipe -> IO Bool
isClosed Pipe
pipe forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
bad -> if Bool
bad then IO ([(Host, Maybe Pipe)], Pipe)
new else forall (m :: * -> *) a. Monad m => a -> m a
return ([(Host, Maybe Pipe)]
members, Pipe
pipe)
Maybe (Maybe Pipe)
_ -> IO ([(Host, Maybe Pipe)], Pipe)
new