module Network.Consul (
createManagedSession
, deleteKey
, destroyManagedSession
, getKey
, getKeys
, getSelf
, getSessionInfo
, getSequencerForLock
, initializeConsulClient
, isValidSequencer
, listKeys
, putKey
, putKeyAcquireLock
, putKeyReleaseLock
, registerService
, withManagedSession
, withSequencer
, withSession
, Consistency(..)
, ConsulClient(..)
, Datacenter(..)
, KeyValue(..)
, KeyValuePut(..)
, ManagedSession(..)
, Session(..)
) where
import Control.Concurrent hiding (killThread)
import Control.Concurrent.Async.Lifted hiding (cancel)
import Control.Concurrent.Lifted (fork, killThread)
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Control.Monad.Trans.Control
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Read as TR
import Data.Traversable
import Data.Word
import qualified Network.Consul.Internal as I
import Network.Consul.Types
import Network.HTTP.Client (defaultManagerSettings, newManager, Manager)
import Network.Socket (PortNumber)
import Prelude hiding (mapM)
initializeConsulClient :: MonadIO m => Text -> PortNumber -> Maybe Manager -> m ConsulClient
initializeConsulClient hostname port man = do
manager <- liftIO $ case man of
Just x -> return x
Nothing -> newManager defaultManagerSettings
return $ ConsulClient manager hostname port
getKey :: MonadIO m => ConsulClient -> Text -> Maybe Word64 -> Maybe Consistency -> Maybe Datacenter -> m (Maybe KeyValue)
getKey _client@ConsulClient{..} = I.getKey ccManager ccHostname ccPort
getKeys :: MonadIO m => ConsulClient -> Text -> Maybe Word64 -> Maybe Consistency -> Maybe Datacenter -> m [KeyValue]
getKeys _client@ConsulClient{..} = I.getKeys ccManager ccHostname ccPort
listKeys :: MonadIO m => ConsulClient -> Text -> Maybe Word64 -> Maybe Consistency -> Maybe Datacenter -> m [Text]
listKeys _client@ConsulClient{..} = I.listKeys ccManager ccHostname ccPort
putKey :: MonadIO m => ConsulClient -> KeyValuePut -> Maybe Datacenter -> m Bool
putKey _client@ConsulClient{..} = I.putKey ccManager ccHostname ccPort
putKeyAcquireLock :: MonadIO m => ConsulClient -> KeyValuePut -> Session -> Maybe Datacenter -> m Bool
putKeyAcquireLock _client@ConsulClient{..} = I.putKeyAcquireLock ccManager ccHostname ccPort
putKeyReleaseLock :: MonadIO m => ConsulClient -> KeyValuePut -> Session -> Maybe Datacenter -> m Bool
putKeyReleaseLock _client@ConsulClient{..} = I.putKeyReleaseLock ccManager ccHostname ccPort
deleteKey :: MonadIO m => ConsulClient -> Text -> Bool -> Maybe Datacenter -> m ()
deleteKey _client@ConsulClient{..} key = I.deleteKey ccManager ccHostname ccPort key
getSelf :: MonadIO m => ConsulClient -> m (Maybe Self)
getSelf _client@ConsulClient{..} = I.getSelf ccManager ccHostname ccPort
registerService :: MonadIO m => ConsulClient -> RegisterService -> Maybe Datacenter -> m Bool
registerService _client@ConsulClient{..} = I.registerService ccManager ccHostname ccPort
getSessionInfo :: MonadIO m => ConsulClient -> Text -> Maybe Datacenter -> m (Maybe [SessionInfo])
getSessionInfo _client@ConsulClient{..} = I.getSessionInfo ccManager ccHostname ccPort
withSession :: forall a m. (MonadIO m,MonadBaseControl IO m) => ConsulClient -> Session -> (Session -> m a) -> m a -> m a
withSession client session action lostAction = do
var <- liftIO $ newEmptyTMVarIO
tidVar <- liftIO $ newEmptyTMVarIO
stid <- fork $ runThread var tidVar
tid <- fork $ action session >>= \ x -> liftIO $ atomically $ putTMVar var x
liftIO $ atomically $ putTMVar tidVar tid
ret <- liftIO $ atomically $ takeTMVar var
killThread stid
return ret
where
runThread :: TMVar a -> TMVar ThreadId -> m ()
runThread var threadVar = do
liftIO $ threadDelay (10 * 1000000)
x <- getSessionInfo client (sId session) Nothing
case x of
Just [] -> cancel var threadVar
Nothing -> cancel var threadVar
Just _ -> runThread var threadVar
cancel :: TMVar a -> TMVar ThreadId -> m ()
cancel resultVar tidVar = do
tid <- liftIO $ atomically $ readTMVar tidVar
killThread tid
empty <- liftIO $ atomically $ isEmptyTMVar resultVar
if empty then do
result <- lostAction
liftIO $ atomically $ putTMVar resultVar result
return ()
else return ()
getSequencerForLock :: MonadIO m => ConsulClient -> Text -> Session -> Maybe Datacenter -> m (Maybe Sequencer)
getSequencerForLock client key session datacenter = do
kv <- getKey client key Nothing (Just Consistent) datacenter
case kv of
Just k -> do
let isValid = maybe False ((sId session) ==) $ kvSession k
if isValid then return $ Just $ Sequencer key (kvLockIndex k) session else return Nothing
Nothing -> return Nothing
isValidSequencer :: MonadIO m => ConsulClient -> Sequencer -> Maybe Datacenter -> m Bool
isValidSequencer client sequencer datacenter = do
mkv <- getKey client (sKey sequencer) Nothing (Just Consistent) datacenter
case mkv of
Just kv -> return $ (maybe False ((sId $ sSession sequencer) ==) $ kvSession kv) && (kvLockIndex kv) == (sLockIndex sequencer)
Nothing -> return False
withSequencer :: (MonadBaseControl IO m, MonadIO m) => ConsulClient -> Sequencer -> m a -> m a -> Int -> Maybe Datacenter -> m a
withSequencer client sequencer action lostAction delay dc = do
mainFunc <- async action
pulseFunc <- async pulseLock
waitAny [mainFunc, pulseFunc] >>= return . snd
where
pulseLock = do
liftIO $ threadDelay delay
valid <- isValidSequencer client sequencer dc
case valid of
True -> pulseLock
False -> lostAction
data ManagedSession = ManagedSession{
msSession :: Session,
msThreadId :: ThreadId
}
withManagedSession :: (MonadBaseControl IO m, MonadIO m) => ConsulClient -> Text -> (Session -> m ()) -> m () -> m ()
withManagedSession client ttl action lostAction = do
x <- createManagedSession client Nothing ttl
case x of
Just s -> withSession client (msSession s) action lostAction >> destroyManagedSession client s
Nothing -> lostAction
createManagedSession :: MonadIO m => ConsulClient -> Maybe Text -> Text -> m (Maybe ManagedSession)
createManagedSession _client@ConsulClient{..} name ttl = do
let r = SessionRequest Nothing name Nothing [] (Just Release) (Just ttl)
s <- I.createSession ccManager ccHostname ccPort r Nothing
mapM f s
where
f x = do
tid <- liftIO $ forkIO $ runThread x
return $ ManagedSession x tid
saneTtl = let Right (x,_) = TR.decimal $ T.filter (/= 's') ttl in x
runThread :: Session -> IO ()
runThread s = do
threadDelay $ (saneTtl (saneTtl 10)) * 1000000
x <- I.renewSession ccManager ccHostname ccPort s Nothing
case x of
True -> runThread s
False -> return ()
destroyManagedSession :: MonadIO m => ConsulClient -> ManagedSession -> m ()
destroyManagedSession _client@ConsulClient{..} (ManagedSession session tid) = do
liftIO $ killThread tid
I.destroySession ccManager ccHostname ccPort session Nothing