module Hbro.Socket where
import Hbro.Core hiding(getURI)
import Hbro.Util
import Hbro.Types
import Control.Concurrent
import Control.Monad hiding(mapM_)
import Data.ByteString.Char8 (pack, unpack)
import qualified Data.Map as M
import Prelude hiding(log, mapM_)
import System.FilePath
import System.Posix.Process
import System.Posix.Types
import System.ZMQ
-- | Open the response socket for the current process and start serving
-- commands on it.
--
-- The socket URI is derived from the configured socket directory and the
-- current process ID (see 'socketFile'). The serving action is passed through
-- @mapK (void . forkIO)@, so the command loop is handed to 'forkIO'.
open :: K ()
open = do
    pid <- io getProcessID
    -- Resolve the configured socket directory, then build the URI from it.
    socketURI <- with (mSocketDir . mConfig) $ \socketDir -> do
        resolvedDir <- resolve socketDir
        return (socketFile pid resolvedDir)
    mapK (void . forkIO) $ withK mContext $ \context -> do
        logNormal ("Opening socket at " ++ socketURI)
        -- Bind a Rep socket to the URI, then enter the command loop.
        mapK2 (withSocket context Rep) $ \sock ->
            io (bind sock socketURI) >> readCommands sock
-- | Ask this process's own command socket to shut down by sending it the
-- @QUIT@ command. The reply is discarded.
close :: K ()
close = do
    uri <- getURI
    logVerbose ("Closing socket " ++ uri ++ " ...")
    void (sendCommand uri "QUIT")
-- | Receive one message from the given response socket, dispatch it, and
-- reply on the same socket.
--
-- The message is split into whitespace-separated words: the first word is
-- the command name, the remaining words are its arguments.
readCommands :: Socket Rep -> K ()
readCommands sock = do
message <- io $ unpack `fmap` receive sock []
case words message of
-- Empty (or whitespace-only) message: reply with an error.
[] -> io $ send sock (pack "ERROR Unknown command") []
-- A message consisting of exactly the single word QUIT: log it and reply OK.
-- ("QUIT" followed by further words falls through to the generic branch.)
["QUIT"] -> io $ do
logVerbose "Receiving QUIT command"
send sock (pack "OK") []
-- Any other message: look the command name up in a map built from the
-- configured command list.
command:arguments -> withK (M.fromList . mCommands . mConfig) $ \commands -> do
logVerbose . ("Receiving command: " ++) $ message
case M.lookup command commands of
-- Known command: run its callback on the arguments and send back its result.
Just callback -> callback arguments >>= io . (send'' sock) . pack
-- Unknown command: reply with an error.
_ -> io $ send sock (pack "ERROR Unknown command") []
-- Recurse to wait for the next message.
-- NOTE(review): indentation is lost in this view, so it is unclear whether this
-- call belongs to the outer do-block (loop after every message, QUIT included)
-- or to the do-block of the generic command branch only -- confirm.
readCommands sock
-- | Compute the socket URI of the current process: the configured socket
-- directory is resolved and combined with the current process ID via
-- 'socketFile'.
getURI :: K String
getURI = with (mSocketDir . mConfig) $ \socketDir -> do
    resolvedDir <- resolve socketDir
    pid <- getProcessID
    return (socketFile pid resolvedDir)
-- | Build the @ipc://@ URI of the socket belonging to the given process.
--
-- The socket file is named @hbro.\<pid\>@ and is placed inside the given
-- directory using '</>'.
socketFile :: ProcessID -> String -> String
socketFile pid socketDir = "ipc://" ++ (socketDir </> fileName)
  where
    -- File name of the socket inside the socket directory.
    fileName = "hbro." ++ show pid
-- | Send a command to the socket at the given URI and return the reply.
--
-- A Req socket is created for the exchange via 'withSocket', connected to
-- the URI, used to send the packed command, and then read for one reply.
sendCommand :: String -> String -> K String
sendCommand socketURI command = with mContext $ \context ->
    withSocket context Req $ \sock -> do
        connect sock socketURI
        send sock (pack command) []
        reply <- receive sock []
        return (unpack reply)
-- | Send the same command to the socket of every process reported by
-- 'getAllProcessIDs', collecting the replies in the same order.
--
-- Each target URI is built with 'socketFile' from the resolved socket
-- directory and the process ID.
sendCommandToAll :: String -> K [String]
sendCommandToAll command = withK (mSocketDir . mConfig) $ \socketDir -> do
    resolvedDir <- io (resolve socketDir)
    pids <- io getAllProcessIDs
    let sendTo pid = sendCommand (socketFile pid resolvedDir) command
    mapM sendTo pids