{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
-- | Provides service and node discovery for Cloud Haskell applications using
-- a Zookeeper cluster for name registration, lookups and leader election.
-- Uses the hzk bindings to the Zookeeper multi-threaded C library.
--
-- Objectives and features:
--
-- * Compatible with @distributed-process-p2p@ API - can work as a drop-in
-- replacement.
--
-- * No dependencies beyond those already included by distributed-process,
-- hzk and network-transport-tcp.
--
-- * Automatic registration of local names to Zookeeper.
--
-- * Global singleton processes with leader election and re-elections on
-- leader exit.
module Control.Distributed.Process.Zookeeper
    ( -- * Initialization
      bootstrap
    , bootstrapWith
    , zkController
    , zkControllerWith
      -- * API
    , registerZK
    , getCapable
    , nsendCapable
    , registerCandidate
    , whereisGlobal
      -- * Compatibility
    , getPeers
    , nsendPeers
      -- * Config
    , Config(..)
    , defaultConfig
      -- * Utility
    , nolog
    , sayTrace
    , waitController
    ) where

import Database.Zookeeper (AclList (..), CreateFlag (..), Event (..), Watcher, ZKError (..), Zookeeper)
import qualified Database.Zookeeper as ZK
import Control.Concurrent (myThreadId, newEmptyMVar, putMVar, putMVar, takeMVar, takeMVar, threadDelay)
import Control.DeepSeq (deepseq, NFData(..))
import Control.Exception (bracket, throwIO, throwTo)
import Control.Monad (forM, join, void)
import Control.Monad.Except (ExceptT (..), lift)
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.Except (runExceptT, throwE)
import Data.Binary (Binary, decode, encode)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Foldable (forM_)
import Data.List (isPrefixOf, sort)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
#if !MIN_VERSION_base(4,8,0)
import Data.Monoid (mempty)
import Control.Applicative ((<$>))
#endif
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Control.Distributed.Process hiding (bracket, proxy)
import Control.Distributed.Process.Management
import Control.Distributed.Process.Node (newLocalNode, runProcess)
import Control.Distributed.Process.Serializable
import Network (HostName)
import Network.Socket (ServiceName)
import Network.Transport (closeTransport)
import Network.Transport.TCP (createTransport, defaultTCPParameters)

-- | Requests understood by the controller process. Every request that
-- expects an answer carries the 'SendPort' the controller replies on.
data Command = Register !String !ProcessId !(SendPort (Either String ()))
             | GlobalCandidate !String !ProcessId !(SendPort (Either String ProcessId))
             | CheckCandidate !String
             | ClearCache !String
             | GetRegistered !String !(SendPort [ProcessId])
             | GetGlobal !String !(SendPort (Maybe ProcessId))
             | Exit
    deriving (Show, Typeable, Generic)

instance Binary Command
instance NFData Command

-- | Sent to a staged candidate process to tell it that it won the election
-- and should start running its computation.
data Elect = Elect deriving (Typeable, Generic)

instance Binary Elect
instance NFData Elect

-- | Internal state threaded through the controller loop.
data State = State
    { nodeCache :: Map String [ProcessId]
      -- ^ Pids cached per Zookeeper node path; an entry is dropped when a
      -- watcher sends 'ClearCache' for that path.
    , monPids :: Map ProcessId ([String],[String])
      -- ^ For each monitored pid: the service names and the global node
      -- names to remove from Zookeeper when it exits.
    , candidates :: Map String (String, ProcessId)
      -- ^ Global name -> (our sequence node id, staged candidate pid) for
      -- elections this node has entered but not (yet) won.
    , spid :: ProcessId
      -- ^ The controller's own pid; watchers send their notifications here.
    , proxy :: Process () -> IO ()
      -- ^ Runs a 'Process' action from plain IO (used by Zookeeper watchers).
    , conn :: Zookeeper
      -- ^ The Zookeeper connection handle.
    }

instance Show State where
    show State{..} = show ("nodes", nodeCache, "monPids", monPids)

data Config = Config {
      -- | Only register locally registered process names with zookeeper
      -- if the name begins with the given prefix. Default is "" which will
      -- register every locally registered process in the Zookeeper services
      -- node.
      registerPrefix :: String
      -- | An operation that will be called for trace level logging.
      -- 'defaultConfig' uses 'nolog'.
    , logTrace :: String -> Process ()
      -- | An operation that will be called for error logging.
      -- 'defaultConfig' uses 'say'.
    , logError :: String -> Process ()
      -- | The log level for the C Zookeeper library. 'defaultConfig' uses
      -- 'ZK.ZLogWarn'.
    , zLogLevel :: ZK.ZLogLevel
      -- | The ACL to use for every node - see hzk documentation for
      -- 'ZK.AclList'. Note that if your nodes do not
      -- connect with the same identity, every node will need at least Read
      -- permission to all nodes created by this package.
    , acl :: ZK.AclList
      -- | Credentials for Zookeeper, see hzk 'ZK.addAuth' for details.
    , credentials :: Maybe (ZK.Scheme, ByteString)
    }

-- | A no-op that can be used for either of the loggers in 'Config'.
-- Because no actual I/O is performed, it fully evaluates the message so
-- thunks do not build up.
nolog :: String -> Process ()
nolog m = m `deepseq` return ()

-- | Simple formatter for trace output through 'say'.
sayTrace :: String -> Process ()
sayTrace = say . ("[C.D.P.Zookeeper: TRACE] - " ++)

-- | By default all local names are registered with zookeeper, and only
-- error messages are logged through 'say'.
--
-- > defaultConfig = Config {
-- >       registerPrefix = ""
-- >     , logTrace = nolog
-- >     , logError = say . ("[C.D.P.Zookeeper: ERROR] - " ++)
-- >     , zLogLevel = ZK.ZLogWarn
-- >     , acl = OpenAclUnsafe
-- >     , credentials = Nothing
-- >     }
defaultConfig :: Config
defaultConfig = Config {
      registerPrefix = ""
    , logTrace = nolog
    , logError = say . ("[C.D.P.Zookeeper: ERROR] - " ++)
    , zLogLevel = ZK.ZLogWarn
    , acl = OpenAclUnsafe
    , credentials = Nothing
    }

-- | Register a name and pid as a service in Zookeeper. The controller
-- will monitor the pid and remove its child node from Zookeeper when it
-- exits.
--
-- Names will be registered at "\/distributed-process\/services\/\<name\>\/\<pid\>"
--
-- Note: By default all locally registered names (using 'register') will be
-- registered in Zookeeper under the same name by an MxAgent process. Use
-- this function if you want to register an anonymous pid or use
-- a different name than is registered with the local Process, or when
-- you are using a 'registerPrefix' to exclude the automatic
-- registration (see 'Config').
registerZK :: String -> ProcessId -> Process (Either String ())
registerZK name rpid = callZK $ Register name rpid

-- | Get a list of nodes advertised in Zookeeper. These are registered
-- when 'zkController' starts in path
-- "\/distributed-process\/controllers\/\<pid\>".
--
-- Note: this is included for API compatibility with
-- @distributed-process-p2p@ but its usage
-- would suggest discovery patterns that could be made more efficient
-- when using Zookeeper - i.e. just use 'getCapable'.
getPeers :: Process [NodeId]
getPeers = fmap processNodeId <$> callZK (GetRegistered controllersNode)

-- | Returns list of pids registered with the service name.
--
-- Results are cached by the controller until they are invalidated by
-- subsequent changes to the service node in Zookeeper, which is
-- communicated through a 'Watcher'. Data will be fetched from Zookeeper
-- only when it is changed and then requested again.
getCapable :: String -> Process [ProcessId]
-- Service pids live under \/distributed-process\/services\/<name>; the path
-- is joined explicitly here.
getCapable name = callZK (GetRegistered (servicesNode ++ "/" ++ name))

-- | Broadcast a message to a specific service on all registered nodes.
--
-- Note: this is included for API compatibility with @distributed-process-p2p@ but its usage
-- would suggest discovery patterns that could be made more efficient
-- when using Zookeeper - i.e. just use 'nsendCapable' to
-- send a broadcast directly to the registered process on each node.
nsendPeers :: Serializable a => String -> a -> Process ()
nsendPeers service msg = getPeers >>= mapM_ (\peer -> nsendRemote peer service msg)

-- | Broadcast a message to all pids registered with a particular service
-- name.
nsendCapable :: Serializable a => String -> a -> Process ()
nsendCapable service msg = getCapable service >>= mapM_ (`send` msg)

-- | Register a candidate process for election of a single process
-- associated to the given global name, and returns the 'ProcessId' of the
-- elected global (which may or may not be on the local node). The @Process ()@ argument is
-- only evaluated if this node ends up being the elected host for the
-- global. Calling this function subsequently on the same node for the same
-- name will replace the current candidate computation with the new one.
--
-- If the controller rejects the candidacy ('Left'), the staged process is
-- terminated, since it would otherwise wait for an 'Elect' message forever.
registerCandidate :: String -> Process () -> Process (Either String ProcessId)
registerCandidate name proc = do
    -- The staged process idles until the controller elects it.
    staged <- spawnLocal $ (expect :: Process Elect) >> proc
    result <- callZK (GlobalCandidate name staged)
    case result of
        Left _ -> exit staged "Candidate registration failed."
        Right _ -> return ()
    return result

-- | Find a registered global by name - see 'registerCandidate'.
whereisGlobal :: String -> Process (Maybe ProcessId)
whereisGlobal = callZK . GetGlobal

-- | Runs a Zookeeper service process, and installs an MXAgent to
-- automatically register all local names in Zookeeper using
-- 'defaultConfig'.
--
-- > zkController = zkControllerWith defaultConfig
zkController :: String -- ^ The Zookeeper endpoint(s) -- comma separated list of host:port
             -> Process ()
zkController = zkControllerWith defaultConfig

-- | As 'zkController' but accept 'Config' options rather than assuming
-- defaults.
zkControllerWith :: Config
                 -> String -- ^ The Zookeeper endpoint(s) -- comma separated list of host:port
                 -> Process ()
zkControllerWith config@Config{..} keepers = do
    run <- spawnLinkedProxy
    waitInit <- liftIO newEmptyMVar
    liftIO $ ZK.setDebugLevel zLogLevel
    mthread <- liftIO myThreadId
    liftIO $ ZK.withZookeeper keepers 1000 (Just $ inits mthread waitInit) Nothing $ \rzh -> do
        -- Wait for the session watcher to report the outcome of
        -- authentication and system-node creation.
        init' <- takeMVar waitInit
        case init' of
            Left reason -> run $ do
                logError $ "Init failed: " ++ show reason
                die reason
            Right () -> run (server rzh config)
  where
    -- Session watcher: on connection, authenticate (if configured) and
    -- create the system nodes, then report the result through waitInit.
    --
    -- NOTE(review): this equation runs for every ConnectedState event, but
    -- waitInit is only taken once above. If the session presumably
    -- reconnects more than once, this putMVar would block the watcher
    -- thread; tryPutMVar may be what is intended - confirm.
    inits _ waitInit rzh SessionEvent ZK.ConnectedState _ = do
        esetup <- runExceptT $ eauth >> dosetup
        case esetup of
            Right _ -> putMVar waitInit (Right ())
            Left reason -> putMVar waitInit (Left reason)
      where
        eauth = lift auth >>= hoistEither
        auth = case credentials of
            Nothing -> return (Right ())
            Just (scheme, creds) -> do
                waitAuth <- newEmptyMVar
                ZK.addAuth rzh scheme creds $ \res ->
                    case res of
                        Left reason ->
                            let msg = "Authentication to Zookeeper failed: " ++ show reason
                            in putMVar waitAuth (Left msg)
                        Right () -> putMVar waitAuth (Right ())
                takeMVar waitAuth
        dosetup = do
            esetup <- runExceptT $ do
                createAssert rzh "/" Nothing acl []
                createAssert rzh rootNode Nothing acl []
                createAssert rzh servicesNode Nothing acl []
                createAssert rzh controllersNode Nothing acl []
                createAssert rzh globalsNode Nothing acl []
                -- NOTE(review): looks like a leftover placeholder node; confirm
                -- whether anything depends on it before removing.
                createAssert rzh (globalsNode </> "blarg") Nothing acl []
            case esetup of
                Right _ -> return ()
                Left reason ->
                    throwE $ "FATAL: could not create system nodes in Zookeeper: " ++ show reason
    inits zkthread _ _ SessionEvent ZK.ExpiredSessionState _ =
        -- for some reason having a linked proxy die here was not getting it done
        throwTo zkthread $ userError "Zookeeper session expired."
    inits _ _ _ _ _ _ = return ()

-- | The controller loop: registers itself locally and in Zookeeper, then
-- serves 'Command's and reaps monitored pids until it receives 'Exit'.
server :: Zookeeper -> Config -> Process ()
server rzh config@Config{..} = do
    pid <- getSelfPid
    register controller pid
    watchRegistration config
    proxy' <- spawnLinkedProxy
    regself <- create rzh (controllersNode </> pretty pid) (pidBytes pid) acl [Ephemeral]
    case regself of
        Left reason ->
            let msg = "Could not register self with Zookeeper: " ++ show reason
            in logError msg >> die msg
        Right _ -> return ()
    let loop st =
            let recvCmd = match $ \command -> case command of
                    Exit -> return ()
                    _ -> do
                        eresult <- runExceptT $ handle st command
                        case eresult of
                            Right st' -> do
                                logTrace $ "State of: " ++ show st' ++ " - after - " ++ show command
                                loop st'
                            Left reason -> do
                                logError $ "Error handling: " ++ show command ++ " : " ++ show reason
                                -- Answer the caller, otherwise callZK would
                                -- block forever on its reply channel.
                                replyFailure command reason
                                loop st
                recvMon = match $ \(ProcessMonitorNotification _ dead _) ->
                    reap st dead >>= loop
            in logTrace (show st) >> receiveWait [recvCmd, recvMon]
    void $ loop (State mempty mempty mempty pid proxy' rzh)
  where
    -- Remove every service / global node recorded for a dead pid.
    reap st@State{..} pid = do
        let (services, globals) = fromMaybe ([],[]) (Map.lookup pid monPids)
        forM_ services $ \name -> deleteNode (servicesNode </> name </> pretty pid)
        forM_ globals $ \name -> deleteNode (globalsNode </> name)
        return st{monPids = Map.delete pid monPids}
      where
        deleteNode node = do
            result <- liftIO $ ZK.delete rzh node Nothing
            case result of
                Left reason -> logError $ show reason ++ " - failed to delete " ++ node
                _ -> logTrace $ "Reaped " ++ node

    handle st@State{..} (Register name rpid reply) = do
        let node = servicesNode </> name </> pretty rpid
        createAssert rzh (servicesNode </> name) Nothing acl []
        result <- create rzh node (pidBytes rpid) acl [Ephemeral]
        case result of
            Right _ -> lift $ do
                logTrace $ "Registered " ++ node
                nfSendChan reply (Right ())
                void $ monitor rpid
                let apService (a',b') (a, b) = (a' ++ a, b' ++ b)
                return st{monPids = Map.insertWith apService rpid ([name],[]) monPids}
            Left reason -> lift $ do
                logError $ "Failed to register name: " ++ node ++ " - " ++ show reason
                nfSendChan reply (Left $ show reason)
                return st
    handle st@State{..} (ClearCache node) =
        return st{nodeCache = Map.delete node nodeCache}
    handle st@State{..} (GetRegistered node reply) = do
        epids <- case Map.lookup node nodeCache of
            Just pids -> return (Right pids)
            Nothing -> getChildPids rzh node (Just $ watchCache st node)
        lift $ case epids of
            Right pids -> do
                nfSendChan reply pids
                return st{nodeCache = Map.insert node pids nodeCache}
            Left reason -> do
                logError $ "Retrieval failed for node: " ++ node ++ " - " ++ show reason
                nfSendChan reply []
                return st{nodeCache = Map.delete node nodeCache}
    handle st (GlobalCandidate n c r) = handleGlobalCandidate config st n c r
    handle st@State{..} (GetGlobal name reply) =
        case Map.lookup gname nodeCache of
            Just (pid : _) -> lift $ do
                nfSendChan reply (Just pid)
                return st
            _ -> do
                elected <- getElected
                lift $ nfSendChan reply elected
                return $ maybe st (\pid -> st {nodeCache = Map.insert gname [pid] nodeCache}) elected
      where
        -- Cache key for this global; the watcher below must clear this same
        -- key, otherwise the cached leader is never invalidated.
        gname = globalsNode </> name
        getElected = do
            children <- getGlobalIds conn name
            case children of
                [] -> return Nothing
                (first : _) -> Just <$> getPid conn (gname </> first) (Just $ watchCache st gname)
    handle st@State{..} (CheckCandidate name) =
        case Map.lookup name candidates of
            Just (myid, staged) -> snd <$> mayElect st name myid staged
            Nothing -> return st
    handle st Exit = return st -- satisfy exhaustiveness checker

    -- Node name for a pid: drops the first 6 characters of 'show' (presumably
    -- the "pid://" prefix - confirm against distributed-process).
    pretty pid = drop 6 (show pid)

    -- Sends a failure answer for a command whose handler threw. The handlers
    -- above only throw before they have replied, so this never double-replies.
    replyFailure :: Command -> ZKError -> Process ()
    replyFailure (Register _ _ reply) reason = nfSendChan reply (Left $ show reason)
    replyFailure (GlobalCandidate _ _ reply) reason = nfSendChan reply (Left $ show reason)
    replyFailure (GetRegistered _ reply) _ = nfSendChan reply []
    replyFailure (GetGlobal _ reply) _ = nfSendChan reply Nothing
    replyFailure _ _ = return ()

-- | Handle a new candidate for a global name: either replace the staged
-- computation of an existing candidacy, or register a new sequence node and
-- run an election.
handleGlobalCandidate :: Config -> State -> String -> ProcessId
                      -> SendPort (Either String ProcessId)
                      -> ExceptT ZKError Process State
handleGlobalCandidate Config{..} st@State{..} name proc reply
    | Just (myid, staged) <- Map.lookup name candidates =
        case Map.lookup gname nodeCache of
            -- Leader already known: just swap in the new staged process.
            Just (pid : _) -> lift $ do
                exit staged "New candidate staged."
                nfSendChan reply (Right pid)
                return st {candidates = Map.insert name (myid, proc) candidates}
            _ -> respondElect myid proc (Just staged)
    | otherwise = do
        myid <- registerGlobalId proc
        respondElect myid proc Nothing
  where
    gname = globalsNode </> name
    respondElect myid staged mprev = lift $ do
        eresult <- runExceptT $ mayElect st name myid staged
        case eresult of
            Right (pid, st') -> do
                nfSendChan reply (Right pid)
                forM_ mprev (`exit` "New candidate staged.")
                return st'
            Left reason -> do
                nfSendChan reply (Left $ show reason)
                return st
    -- Create an ephemeral sequence node holding the staged pid and return
    -- its sequence id (the last path segment).
    registerGlobalId staged = do
        createAssert conn gname Nothing acl []
        node <- create conn (gname </> "") (pidBytes staged) acl [Ephemeral, Sequence]
                    >>= hoistEither
        return $ extractId node
    extractId = reverse . takeWhile (/= '/') . reverse

-- | Run the election for a global name. The lowest sequence id wins; if
-- that is ours the staged process is sent 'Elect', otherwise we watch the
-- node just ahead of ours and report (and cache) the current leader.
-- Throws 'NoNodeError' when our own sequence node is missing.
mayElect :: State -> String -> String -> ProcessId
         -> ExceptT ZKError Process (ProcessId, State)
mayElect st@State{..} name myid staged = do
    others <- getGlobalIds conn name
    case others of
        [] -> throwE NoNodeError
        leader : _
            | myid == leader -> do
                lift $ nfSend staged Elect
                st' <- lift $ cacheNMonitor staged
                return (staged, st')
            | otherwise -> do
                prev <- maybe (throwE NoNodeError) return (findPrev others)
                leaderWatch <-
                    if prev == leader
                        then return (Just watchPid)
                        else do
                            void $ liftIO (ZK.exists conn (gname </> prev) (Just watchPid))
                                >>= hoistEither
                            -- Invalidate the cached leader when it goes away.
                            return (Just $ watchCache st gname)
                pid <- getPid conn (gname </> leader) leaderWatch
                -- Cache the elected leader, not our still-waiting candidate.
                return (pid, stCandidate {nodeCache = Map.insert gname [pid] nodeCache})
  where
    gname = globalsNode </> name
    findPrev (prev : next : rest)
        | next == myid = Just prev
        | otherwise = findPrev (next : rest)
    findPrev _ = Nothing
    watchPid _ ZK.DeletedEvent ZK.ConnectedState _ = proxy $ nfSend spid (CheckCandidate name)
    watchPid _ _ _ _ = return ()
    stCandidate = st{candidates = Map.insert name (myid, staged) candidates}
    cacheNMonitor pid = do
        void $ monitor pid
        liftIO $ void $ ZK.exists conn (gname </> myid) (Just $ watchCache st gname)
        return stCacheMon
      where
        stCacheMon =
            let apGlobal (a', b') (a, b) = (a' ++ a, b' ++ b)
            in st { candidates = Map.delete name candidates
                  , monPids = Map.insertWith apGlobal pid ([], [name </> myid]) monPids
                  , nodeCache = Map.insert gname [pid] nodeCache
                  }

-- | Sorted sequence ids of the candidates for a global name; errors other
-- than a missing node are printed and treated as no candidates.
getGlobalIds :: MonadIO m => Zookeeper -> String -> ExceptT ZKError m [String]
getGlobalIds conn name = liftIO $ do
    echildren <- ZK.getChildren conn (globalsNode </> name) Nothing
    case echildren of
        Left NoNodeError -> return []
        Left reason -> do
            putStrLn $ "Could not fetch globals for " ++ name ++ " - " ++ show reason
            return []
        Right children -> return (sort children)

-- | Decode the pid stored in a node's data.
getPid :: MonadIO m => Zookeeper -> String -> Maybe Watcher
       -> ExceptT ZKError m ProcessId
getPid conn name watcher = do
    res <- liftIO $ ZK.get conn name watcher
    case res of
        Right (Just bs, _) -> return (decode $ BL.fromStrict bs)
        Right _ -> throwE NothingError
        Left reason -> throwE reason

-- | Decode the pids stored in every child of a node; a missing node yields
-- an empty list.
getChildPids :: MonadIO m => Zookeeper -> String -> Maybe Watcher
             -> m (Either ZKError [ProcessId])
getChildPids rzh node watcher = liftIO $ do
    enodes' <- ZK.getChildren rzh node watcher
    case enodes' of
        Left NoNodeError -> return $ Right []
        _ -> runExceptT $ do
            children <- hoistEither enodes'
            forM children $ \child -> do
                eresult <- liftIO $ ZK.get rzh (node </> child) Nothing
                case eresult of
                    Left reason -> throwE reason
                    Right (Nothing, _) -> throwE NothingError
                    Right (Just bs, _) -> return (decode $ BL.fromStrict bs)

-- | Watcher that tells the controller to drop its cache entry for @node@.
watchCache :: State -> String -> a -> b -> ZK.State -> c -> IO ()
watchCache State{..} node _ _ ZK.ConnectedState _ = proxy $ nfSend spid (ClearCache node)
watchCache _ _ _ _ _ _ = return ()

-- | Install an MxAgent that forwards local name registrations matching
-- 'registerPrefix' to 'registerZK'.
watchRegistration :: Config -> Process ()
watchRegistration Config{..} = do
    let initState = [] :: [MxEvent]
    void $ mxAgent (MxAgentId "zookeeper:name:listener") initState [
        mxSink $ \ev -> do
            let act = case ev of
                    (MxRegistered _ "zookeeper:name:listener") -> return ()
                    (MxRegistered pid name')
                        | prefixed name' -> liftMX $ do
                            ereg <- registerZK name' pid
                            case ereg of
                                Left reason ->
                                    logError $ "Automatic registration failed for name: "
                                            ++ name' ++ " - " ++ reason
                                _ -> return ()
                        | otherwise -> return ()
                    _ -> return ()
            act >> mxReady
        ]
    liftIO $ threadDelay 10000
  where
    prefixed = isPrefixOf registerPrefix

-- | Wait for zkController to startup and register itself.
-- This is only useful if you are *not* using a 'bootstrap'
-- function to start your node, but rather starting the node yourself
-- and using one of the 'zkController' functions.
waitController :: Int -> Process (Maybe ())
waitController timeout = do
    res <- whereis controller
    case res of
        Nothing -> do
            let timeleft = timeout - 10000
            if timeleft <= 0
                then return Nothing
                else do
                    liftIO (threadDelay 10000)
                    waitController timeleft
        Just _ -> return (Just ())

-- | Ask the locally registered controller to exit.
stopController :: Process ()
stopController = do
    res <- whereis controller
    case res of
        Nothing -> say "Could not find controller to stop it."
        Just pid -> nfSend pid Exit

hoistEither :: Monad m => Either e a -> ExceptT e m a
hoistEither = ExceptT . return

-- | Join two Zookeeper path segments with a single @/@.
(</>) :: String -> String -> String
l </> r = l ++ "/" ++ r

servicesNode :: String
servicesNode = rootNode </> "services"

controllersNode :: String
controllersNode = rootNode </> "controllers"

globalsNode :: String
globalsNode = rootNode </> "globals"

rootNode :: String
rootNode = "/distributed-process"

-- | Create a node, treating "already exists" as success.
createAssert :: MonadIO m => Zookeeper -> String -> Maybe BS.ByteString
             -> AclList -> [CreateFlag] -> ExceptT ZKError m ()
createAssert z n d a f = create z n d a f >>= eitherExists
  where
    eitherExists (Right _) = return ()
    eitherExists (Left NodeExistsError) = return ()
    eitherExists (Left reason) = throwE reason

pidBytes :: ProcessId -> Maybe BS.ByteString
pidBytes = Just . BL.toStrict . encode

controller :: String
controller = "zookeeper:controller"

-- We should not have any big surprises due to serialization semantics
-- because all messaging will always be local in this module - it will never be necessary
-- to actually serialize. Still, we do want to
-- preserve expectations about which process is doing the evaluation.
nfSendChan :: (Binary a, Typeable a, NFData a) => SendPort a -> a -> Process ()
nfSendChan !port !msg = unsafeSendChan port (msg `deepseq` msg)

nfSend :: (Binary a, Typeable a, NFData a) => ProcessId -> a -> Process ()
nfSend !pid !msg = unsafeSend pid (msg `deepseq` msg)

-- | Send a command to the local controller and wait for its reply. The
-- caller is linked to the controller for the duration of the call, and dies
-- with an explicit reason if no controller is registered.
callZK :: Serializable a => (SendPort a -> Command) -> Process a
callZK command = do
    mpid <- whereis controller
    case mpid of
        Nothing -> die "callZK: the Zookeeper controller process is not running."
        Just pid -> do
            (nfSendCh, replyCh) <- newChan
            link pid
            nfSend pid (command nfSendCh)
            result <- receiveChan replyCh
            unlink pid
            return result

-- | Create a process runner that communicates
-- through an MVar - so you can call process actions from IO
--
-- This is linked bidirectionally - if either the proxy or parent exits the
-- other will as well.
spawnLinkedProxy :: Process (Process a -> IO a)
spawnLinkedProxy = do
    action <- liftIO newEmptyMVar
    result <- liftIO newEmptyMVar
    self <- getSelfPid
    pid <- spawnLocal $
        let loop = join (liftIO $ takeMVar action) >>= liftIO . putMVar result >> loop
        in link self >> loop
    link pid
    return (\f -> putMVar action f >> takeMVar result)

create :: MonadIO m => Zookeeper -> String -> Maybe BS.ByteString
       -> AclList -> [CreateFlag] -> m (Either ZKError String)
create z n d a = liftIO . ZK.create z n d a

-- | Create a new Cloud Haskell node on the provided IP/Port and start
-- a Zookeeper-backed controller process ('zkController') with a default configuration
-- connected to the provided Zookeeper server list.
-- Finally execute the supplied Process computation.
--
-- > bootstrap = bootstrapWith defaultConfig
bootstrap :: HostName -- ^ Hostname or IP this Cloud Haskell node will listen on.
          -> ServiceName -- ^ Port or port name this node will listen on.
          -> String -- ^ The Zookeeper endpoint(s) -- comma separated list of host:port
          -> RemoteTable -- ^ Cloud Haskell 'RemoteTable' to use in the new node.
          -> Process () -- ^ Process computation to run in the new node.
          -> IO ()
bootstrap = bootstrapWith defaultConfig

-- | Create a new Cloud Haskell node on the provided IP/Port and start
-- a Zookeeper-backed controller process ('zkController') connected to the provided
-- Zookeeper server list and finally execute the supplied Process computation.
bootstrapWith :: Config -- ^ controller configuration
              -> HostName -- ^ Hostname or IP this Cloud Haskell node will listen on.
              -> ServiceName -- ^ Port or port name this node will listen on.
              -> String -- ^ The Zookeeper endpoint(s) -- comma separated list of host:port
              -> RemoteTable -- ^ Cloud Haskell 'RemoteTable' to use in the new node.
              -> Process () -- ^ Process computation to run in the new node.
              -> IO ()
bootstrapWith config host port zservs rtable proc =
    bracket openTransport closeTransport exec
  where
    openTransport = do
        mtcp <- createTransport host port defaultTCPParameters
        case mtcp of
            Right tcp -> return tcp
            Left reason -> throwIO reason
    exec tcp = do
        node <- newLocalNode tcp rtable
        runProcess node $ do
            zkpid <- spawnLocal $ zkControllerWith config zservs
            link zkpid
            found <- waitController 100000
            case found of
                Nothing -> die "Timeout waiting for Zookeeper controller to start."
                Just () -> proc
            stopController