module System.Serf (
SerfM,
Serf(..),
MonadSerf(..),
serf,
serfAt,
serfWithOpts,
sendEvent,
sendEvent',
SendOptions(..),
forceLeave,
joinNodes,
joinNodes',
JoinOptions(..),
members,
MemberStatus(..),
LastKnownStatus(..),
LogLevel(..),
MonitorOptions(..)
) where
import Control.Applicative
import Control.Monad.Operational
import Control.Monad.Reader
import Data.Attoparsec.Text hiding (Done)
import Data.Char
import Data.Conduit.Attoparsec
import Data.Conduit
import Data.Conduit.Binary
import Data.Conduit.Internal (ConduitM(..), Pipe(Done))
import qualified Data.Conduit.List as L
import qualified Data.Conduit.Text as T
import Data.List (lookup)
import Data.Text (Text)
import System.Environment
import System.Exit
import System.IO (Handle, hClose)
import System.Process
-- | Settings describing how a serf agent would be started.
--
-- NOTE(review): nothing in the visible part of this module consumes this
-- record, so how each field maps to a @serf agent@ command-line flag is an
-- assumption based on the field names -- confirm when wiring it up.
data StartAgentOptions = StartAgentOptions
{ agentNodeName :: Maybe String
-- ^ Optional node name.
, agentRole :: Maybe String
-- ^ Optional role.
, agentBindAddr :: Maybe String
-- ^ Optional bind address.
, agentEncryptKey :: Maybe String
-- ^ Optional encryption key.
, agentLogLevel :: Maybe LogLevel
-- ^ Optional log level (see 'LogLevel').
, agentProtocol :: Maybe Int
-- ^ Optional protocol number.
, agentRpcAddr :: Maybe String
-- ^ Optional RPC address.
, agentEventHandlers :: [String]
-- ^ Event handlers; may be empty.
, agentStartJoin :: [String]
-- ^ Presumably addresses to join at startup -- TODO confirm; may be empty.
}
-- | Options for the @serf monitor@ command as built by @monitorCore@.
data MonitorOptions = MonitorOptions
{ monitorLogLevel :: Maybe LogLevel
-- ^ When present, its 'logStr' rendering is appended to the arguments of
-- @serf monitor@; when 'Nothing', no extra argument is passed.
}
-- | Log verbosity levels. 'logStr' converts each constructor to the string
-- form passed on the serf command line.
data LogLevel
= Trace
| Debug
| Info
| Warn
| Error
-- | Render a 'LogLevel' as the lower-case string used on the command line.
-- Note that 'Error' is rendered as @"err"@, not @"error"@.
logStr :: LogLevel -> String
logStr Trace = "trace"
logStr Debug = "debug"
logStr Info  = "info"
logStr Warn  = "warn"
logStr Error = "err"
-- | Build a 'SendEvent' instruction with default 'SendOptions' (no coalesce
-- override). When interpreted, the result is 'True' if the @serf event@
-- process exits successfully.
sendEvent
  :: String        -- ^ event name
  -> Maybe String  -- ^ optional payload
  -> SerfM Bool
sendEvent = sendEvent' defaultOptions
  where
    defaultOptions = SendOptions Nothing
-- | Like 'sendEvent', but with explicit 'SendOptions'.
sendEvent' :: SendOptions -> String -> Maybe String -> SerfM Bool
sendEvent' options eventName payload =
  singleton (SendEvent options eventName payload)
-- | Build a 'ForceLeave' instruction for the given node. When interpreted,
-- the result is 'True' if the @serf force-leave@ process exits successfully.
forceLeave :: String -> SerfM Bool
forceLeave node = singleton (ForceLeave node)
-- | Build a 'JoinNodes' instruction with default 'JoinOptions'. At least one
-- node is required, which is why the first one is a separate argument.
joinNodes
  :: String    -- ^ first node to join
  -> [String]  -- ^ any additional nodes
  -> SerfM Bool
joinNodes = joinNodes' defaultOptions
  where
    defaultOptions = JoinOptions Nothing
-- | Like 'joinNodes', but with explicit 'JoinOptions'.
joinNodes' :: JoinOptions -> String -> [String] -> SerfM Bool
joinNodes' options firstNode otherNodes =
  singleton (JoinNodes options firstNode otherNodes)
-- | Build a 'Members' instruction. When interpreted by 'serfWithOpts' it
-- yields the parsed output of @serf members@.
members :: SerfM [MemberStatus]
members = singleton Members
-- | The instruction set of the serf DSL. The type index is the result each
-- instruction produces when interpreted by 'serfWithOpts'.
data Serf a where
-- Send a user event: options, event name, optional payload.
SendEvent :: SendOptions -> String -> Maybe String -> Serf Bool
-- Force the named node to leave.
ForceLeave :: String -> Serf Bool
-- Join one or more nodes: options, first node, remaining nodes.
JoinNodes :: JoinOptions -> String -> [String] -> Serf Bool
-- List the known members.
Members :: Serf [MemberStatus]
-- | Programs built from 'Serf' instructions ('Program' comes from
-- "Control.Monad.Operational"). Run them with 'serf', 'serfAt' or
-- 'serfWithOpts'.
type SerfM = Program Serf
-- | Monads that can run 'SerfM' programs.
--
-- NOTE(review): no instances are defined in the visible source.
class (Monad m) => MonadSerf m where
-- Run a 'SerfM' program inside @m@.
evalSerf :: SerfM a -> m a
-- | Run a 'SerfM' program in 'IO' without any extra global command-line
-- options.
serf :: SerfM a -> IO a
serf = serfWithOpts noGlobalOptions
  where
    noGlobalOptions = []
-- | Run a 'SerfM' program with @-rpc-addr=\<address\>@ as the only global
-- option.
serfAt :: String -> SerfM a -> IO a
serfAt str program = serfWithOpts [rpcAddrFlag] program
  where
    rpcAddrFlag = "-rpc-addr=" ++ str
-- | Interpret a 'SerfM' program in 'IO', running one @serf@ process per
-- instruction.
--
-- The first argument is a list of global command-line options (for example
-- @-rpc-addr=...@) that is passed to /every/ command the program issues.
-- The continuation of each instruction is interpreted with the same options;
-- previously it was handed to 'serf', which silently dropped them after the
-- first instruction.
serfWithOpts :: [String] -> SerfM a -> IO a
serfWithOpts globals m = case view m of
  Return a ->
    return a
  SendEvent opts name mPayload :>>= k ->
    sendEventCore globals opts name mPayload >>= serfWithOpts globals . k
  ForceLeave node :>>= k ->
    forceLeaveCore globals node >>= serfWithOpts globals . k
  JoinNodes opts aNode rest :>>= k ->
    joinNodesCore globals opts aNode rest >>= serfWithOpts globals . k
  Members :>>= k ->
    membersCore globals >>= serfWithOpts globals . k
-- | Name, address and role of a cluster member.
--
-- NOTE(review): not referenced anywhere in the visible source and not
-- exported; 'MemberStatus' is what @serf members@ output is parsed into.
data MemberInfo = MemberInfo
{ memberName :: String
, memberAddress :: String
, memberRole :: String
}
-- | Last known state of a member. @memberParser@ produces 'Alive' only for
-- the exact status text @alive@; every other status text becomes 'Failed'.
data LastKnownStatus = Alive | Failed
deriving (Read, Show)
-- | One entry of the @serf members@ output as parsed by @memberParser@,
-- which reads three whitespace-separated tokens per entry.
data MemberStatus = MemberStatus
{ memberStatusName :: Text
-- ^ First token of the entry.
, memberStatusAddress :: Text
-- ^ Second token of the entry.
, memberStatus :: LastKnownStatus
-- ^ Derived from the third token.
} deriving (Read, Show)
-- | Options for sending a user event.
data SendOptions = SendOptions
{ coalesceEvents :: Maybe Bool
-- ^ When @Just b@, @sendEventCore@ passes @-coalesce=true@ or
-- @-coalesce=false@ to @serf event@; when 'Nothing' the flag is omitted.
}
-- | Render a 'Bool' as the lower-case literal used in command-line flags.
bool :: Bool -> String
bool flag
  | flag      = "true"
  | otherwise = "false"
-- | Build the 'CreateProcess' for a @serf@ sub-command.
--
-- Arguments are, in order: the sub-command name, the command-specific
-- arguments, and the global options. On the resulting command line the
-- global options come /before/ the command-specific arguments:
-- @serf \<cmd\> \<opts...\> \<rest...\>@.
serfCmd :: String -> [String] -> [String] -> CreateProcess
serfCmd cmd rest opts = proc "serf" arguments
  where
    arguments = concat [[cmd], opts, rest]
-- | Turn an optional value into a list of zero or one elements.
toList :: Maybe a -> [a]
toList Nothing  = []
toList (Just x) = [x]
-- | An action that takes a complete 'SendOptions' value.
-- NOTE(review): not used anywhere in the visible source.
type WithSendOptions = SendOptions -> IO Bool
-- | An action that takes a function adjusting some default 'SendOptions'.
-- NOTE(review): not used anywhere in the visible source.
type ModifyDefaultSendOptions = (SendOptions -> SendOptions) -> IO Bool
-- | Run @serf event@ and report whether it exited successfully.
--
-- The command line is: global options, then an optional @-coalesce=...@
-- flag (only when 'coalesceEvents' is set), then the event name, then the
-- payload if one was given. The child process inherits this process's
-- standard streams.
sendEventCore :: [String] -> SendOptions -> String -> Maybe String -> IO Bool
sendEventCore globalOpts options name mPayload = do
  (_, _, _, ph) <- createProcess eventProcess
  code <- waitForProcess ph
  return $! code == ExitSuccess
  where
    eventProcess = serfCmd "event" eventArgs globalOpts
    eventArgs    = coalesceFlag ++ [name] ++ toList mPayload
    coalesceFlag = case coalesceEvents options of
      Nothing -> []
      Just c  -> ["-coalesce=" ++ bool c]
-- | Run @serf force-leave \<node\>@ with the given global options and report
-- whether it exited successfully.
forceLeaveCore :: [String] -> String -> IO Bool
forceLeaveCore globalOpts node = do
  (_, _, _, ph) <- createProcess (serfCmd "force-leave" [node] globalOpts)
  code <- waitForProcess ph
  return $! code == ExitSuccess
-- | Options for joining nodes.
data JoinOptions = JoinOptions
{ _jsReplay :: Maybe Bool
-- ^ Optional replay setting; 'Nothing' leaves the choice to serf.
-- NOTE(review): presumably corresponds to the @-replay@ flag of
-- @serf join@ -- confirm against the serf CLI documentation.
}
-- | Run @serf join@ and report whether it exited successfully.
--
-- The command line is: global options, then an optional @-replay=...@ flag
-- (only when '_jsReplay' is set), then the first node, then any further
-- nodes. Previously the 'JoinOptions' argument was accepted but ignored, so
-- a caller's replay setting never reached serf; it is now rendered the same
-- way @sendEventCore@ renders @-coalesce=...@.
joinNodesCore :: [String] -> JoinOptions -> String -> [String] -> IO Bool
joinNodesCore globalOpts opts aNode rest = do
  (_, _, _, process) <- createProcess processSettings
  exitCode <- waitForProcess process
  return $! exitCode == ExitSuccess
  where
    processSettings = serfCmd "join" (replayArgs ++ aNode : rest) globalOpts
    -- Flags must come before the positional node addresses.
    replayArgs = toList $ fmap (\r -> "-replay=" ++ bool r) (_jsReplay opts)
-- | Parse one member entry: three whitespace-separated tokens (name,
-- address, status), each followed by optional whitespace. A status token of
-- exactly @alive@ becomes 'Alive'; anything else becomes 'Failed'.
memberParser :: Parser MemberStatus
memberParser = MemberStatus <$> token <*> token <*> fmap toStatus token
  where
    -- One run of non-space characters, then skip any trailing whitespace.
    token = takeTill isSpace <* skipSpace
    toStatus word
      | word == "alive" = Alive
      | otherwise       = Failed
-- | Read a handle to the end, decode it as UTF-8 and parse it as a sequence
-- of member entries with 'memberParser'. The handle is closed via 'bracketP'
-- when the stream finishes.
readMemberList :: Handle -> IO [MemberStatus]
readMemberList h = runResourceT $ guarded $$ L.consume
  where
    guarded = bracketP (return h) hClose parsed
    -- 'conduitParser' pairs each result with its position; keep the result.
    parsed hdl =
      mapOutput snd $
        sourceHandle hdl $= T.decode T.utf8 $= conduitParser memberParser
-- | Run @serf members@ and parse its output. Yields an empty list when
-- 'membersOutput' reports no handle (the command did not exit successfully).
membersCore :: [String] -> IO [MemberStatus]
membersCore globalOpts =
  membersOutput globalOpts >>= maybe (return []) readMemberList
-- | Run @serf members@ with its standard output piped, wait for it to exit,
-- and return the read end of the pipe if it exited successfully.
--
-- On a non-zero exit the pipe handle is now closed before returning
-- 'Nothing'; previously it was dropped without being closed. The tuple is
-- also matched totally rather than with a partial @Just h@ pattern.
--
-- NOTE(review): the process is waited on before any output is read. If serf
-- ever writes more than the OS pipe buffer holds, it would block on write
-- and this wait would not return -- confirm the output is always small or
-- restructure the callers to read before waiting.
membersOutput :: [String] -> IO (Maybe Handle)
membersOutput globalOpts = do
  (_, mOut, _, process) <- createProcess processSettings { std_out = CreatePipe }
  exitCode <- waitForProcess process
  case (exitCode, mOut) of
    (ExitSuccess, Just h) -> return (Just h)
    (_, Just h)           -> hClose h >> return Nothing
    (_, Nothing)          -> return Nothing
  where
    processSettings = serfCmd "members" [] globalOpts
-- | Start @serf monitor@ with its standard output piped.
--
-- NOTE(review): this function is an unfinished stub. It waits for the
-- process to exit and then returns either @Just undefined@ (on success) or
-- 'Nothing'; forcing the returned source would crash. The piped handle @h@
-- is never read or closed.
monitorCore :: [String] -> MonitorOptions -> IO (Maybe (Source (ResourceT IO) Text))
monitorCore globalOpts opts = do
(_, (Just h), _, process) <- createProcess $ processSettings { std_out = CreatePipe }
-- Blocks until the monitor process terminates.
exitCode <- waitForProcess process
if exitCode == ExitSuccess
-- Placeholder: no real source is constructed yet.
then return $ Just undefined
else return Nothing
where
-- NOTE(review): the log level is passed as a bare argument (e.g. "debug"),
-- not as a @-log-level=...@ style flag -- confirm this is what serf expects.
processSettings = serfCmd "monitor" (toList $ fmap logStr $ monitorLogLevel opts) globalOpts
-- NOTE(review): 'src' is not used by the body above; it starts a second
-- process and both of its branches are 'undefined'.
src = do
(_, (Just h), _, process) <- createProcess $ processSettings { std_out = CreatePipe }
mExit <- getProcessExitCode process
if mExit == Nothing
then undefined
else undefined
-- | Pass lines downstream until the upstream ends or an empty line is seen.
-- The empty line itself is consumed and not forwarded.
--
-- Two changes from the previous version: the leftover debugging
-- @print@ of every awaited value has been removed (it wrote to stdout on
-- every step), and termination uses @return ()@ rather than constructing the
-- internal @Done@ pipe by hand, so the code no longer depends on
-- "Data.Conduit.Internal" representation details.
linesUntilNewline :: Conduit Text (ResourceT IO) Text
linesUntilNewline = do
  mLine <- await
  case mLine of
    Just l | l /= "" -> yield l >> linesUntilNewline
    _                -> return ()