module Holumbus.Distribution.DNode.Network
(
SocketServer
, getSocketServerName
, getSocketServerPort
, HandlerFunction
, startSocketServer
, stopSocketServer
, performUnsafeSendRequest
, performSafeSendRequest
, performMaybeSendRequest
, putByteStringMessage
, getByteStringMessage
)
where
import Prelude hiding ( catch )
import Control.Concurrent
import Control.Exception (
Exception
, IOException
, bracket
, catch
)
import qualified Data.ByteString.Lazy as B
import Data.Typeable
import Network
import qualified Network.Socket as Socket
import System.Log.Logger
import System.CPUTime
import System.IO
import System.Posix
import Text.Printf
import Holumbus.Common.Utils (handleAll)
localLogger :: String
localLogger = "Holumbus.Distribution.DNode.Network"
data SocketServerException = SocketServerException ThreadId
deriving (Typeable, Show)
instance Exception SocketServerException where
data SocketServer = SocketServer {
ss_ThreadId :: ! ThreadId
, ss_HostName :: ! HostName
, ss_PortNumber :: ! PortNumber
} deriving (Show)
getSocketServerName :: SocketServer -> HostName
getSocketServerName = ss_HostName
getSocketServerPort :: SocketServer -> PortNumber
getSocketServerPort = ss_PortNumber
type HandlerFunction a = Handle -> IO a
startSocketServer
:: HandlerFunction ()
-> Int
-> Int
-> IO (Maybe SocketServer)
startSocketServer f actPo maxPo
= do
s <- (getFirstSocket actPo maxPo)
case s of
Nothing ->
return Nothing
(Just (so, po)) ->
do
hn <- getHostName
tid <- forkIO $
do
handleAll
(\e ->
do
putStrLn $ "ERROR - socket closed with exception: " ++ show e
sClose so
) $
do
catch
(waitForRequests f so)
(handler so)
return (Just $ SocketServer tid hn po)
where
handler :: Socket -> SocketServerException -> IO ()
handler so (SocketServerException i)
= do
sClose so
putStrLn $ "socket normally closed by thread " ++ show i
stopSocketServer :: SocketServer -> IO ()
stopSocketServer ss
= do
let i = ss_ThreadId ss
me <- myThreadId
debugM localLogger $ "stopping socket server... with threadId: " ++ show i ++ " - form threadId: " ++ show me
throwTo i (SocketServerException me)
yield
getHostName :: IO (HostName)
getHostName
= do
(hn, _) <- Socket.getNameInfo [] True False (Socket.SockAddrUnix "localhost")
return (maybe "localhost" id hn)
getFirstSocket :: Int -> Int -> IO (Maybe (Socket, PortNumber))
getFirstSocket actPo maxPo
= do
if (actPo > maxPo)
then do
return Nothing
else do
handleAll (return (getFirstSocket (actPo+1) maxPo)) $
do
debugM localLogger $ "getFirstSocket: getting Socket for: " ++ show actPo
socket <- getSocket $ fromIntegral actPo
return (Just (socket, fromIntegral actPo))
getSocket :: PortNumber -> IO (Socket)
getSocket po =
withSocketsDo $ do
_ <- installHandler sigPIPE Ignore Nothing
socket <- listenOn (PortNumber po)
return socket
waitForRequests :: HandlerFunction () -> Socket -> IO ()
waitForRequests f socket =
do
(hdl,_,_) <- accept socket
_ <- forkIO $ processRequest f hdl
waitForRequests f socket
processRequest :: HandlerFunction () -> Handle -> IO ()
processRequest f conn =
bracket (return conn) (hClose) (processRequest')
where
processRequest' hdl =
do
hSetBuffering hdl $ BlockBuffering Nothing
t1 <- getCPUTime
debugM localLogger "starting to dispatch request"
handleAll (\e -> errorM localLogger $ "UnknownError: " ++ show e) $ f hdl
t2 <- getCPUTime
d <- return ((fromIntegral (t2 t1) / 1000000000000) :: Float)
ds <- return (printf "%.4f" d)
infoM localLogger ("request processed in " ++ ds ++ " sec")
sendRequest :: HandlerFunction a -> HostName -> PortNumber -> IO a
sendRequest f n p =
withSocketsDo $ do
_ <- installHandler sigPIPE Ignore Nothing
bracket (connectTo n (PortNumber p)) (hClose) (send)
where
send hdl
= do
hSetBuffering hdl $ BlockBuffering Nothing
f hdl
performUnsafeSendRequest :: HandlerFunction a -> HostName -> PortNumber -> IO a
performUnsafeSendRequest = sendRequest
performSafeSendRequest :: HandlerFunction a -> a -> HostName -> PortNumber -> IO a
performSafeSendRequest f d n p
= catch (sendRequest f n p)
(\(e ::IOException) ->
do
debugM localLogger $ show e
return d)
performMaybeSendRequest :: HandlerFunction a -> HostName -> PortNumber -> IO (Maybe a)
performMaybeSendRequest f n p
= catch (do
res <- sendRequest f n p
return (Just res))
(\(e ::IOException) ->
do
debugM localLogger $ show e
return Nothing)
putByteStringMessage :: B.ByteString -> Handle -> IO ()
putByteStringMessage msg hdl
= do
handleAll (\e -> do
errorM localLogger $ "putMessage: " ++ show e
errorM localLogger $ "message: " ++ show msg
) $ do
hPutStrLn hdl ((show $ B.length msg) ++ " ")
B.hPut hdl msg
hFlush hdl
getByteStringMessage :: Handle -> IO (B.ByteString)
getByteStringMessage hdl
= do
line <- hGetLine hdl
let pkg = words line
raw <- B.hGet hdl (read $ head pkg)
return raw