module Holumbus.Network.Core
(
SocketId(..)
, startSocket
, sendRequest
, performUnsafeSendRequest
, performSafeSendRequest
, performMaybeSendRequest
, putMessage
, getMessage
, ThreadIdException(..)
)
where
import Prelude hiding ( catch )
import Control.Concurrent
import Control.Exception ( Exception
, IOException
, bracket
, catch
)
import Data.Binary
import qualified Data.ByteString.Lazy as B
import Data.Typeable
import Network
import qualified Network.Socket as Socket
import System.Log.Logger
import System.CPUTime
import System.IO
import System.Posix
import Text.Printf
import Text.XML.HXT.Arrow
import Holumbus.Common.Utils ( handleAll )
-- | Name of the logger under which this module emits its log messages.
localLogger :: String
localLogger = "Holumbus.Network.Core"
-- | Callback invoked for every accepted client connection.
--
-- As called from 'processRequest', the arguments are: the 'SocketId' of the
-- listening server socket, the 'Handle' of the accepted connection, and a
-- 'SocketId' built from the host name and port reported for the client.
type ServerDispatcher = SocketId -> Handle -> SocketId -> IO ()
-- | Exception carrying a 'ThreadId'.
--
-- 'startSocket' catches this exception around its accept loop, closes the
-- listening socket and reports the carried thread id as the one that
-- closed the socket.
data ThreadIdException = ThreadIdException ThreadId
  deriving (Show, Typeable)

instance Exception ThreadIdException
-- | Identifies a socket endpoint by host name and port number.
data SocketId = SocketId HostName PortNumber
  deriving (Eq, Show)
-- | Binary serialisation of a 'SocketId': the host name followed by the
-- port number encoded as an 'Integer'.
instance Binary SocketId where
  put (SocketId host port) = do
    put host
    put (toInteger port)

  get = do
    host <- get
    -- The port travels as an Integer and is converted back on decoding.
    fmap (SocketId host . fromInteger) get
-- | XML pickling of a 'SocketId' is delegated to 'xpSocketId'.
instance XmlPickler SocketId where
  xpickle = xpSocketId
-- | Pickler for 'SocketId': a @socket@ element with the attributes
-- @hostname@ (text) and @port@ (pickled as an 'Integer').
xpSocketId :: PU SocketId
xpSocketId =
  xpElem "socket" (xpWrap (fromAttrs, toAttrs) attrs)
  where
    attrs = xpPair (xpAttr "hostname" xpText) (xpAttr "port" xpickle)

    -- Conversions between the attribute pair and the 'SocketId' value.
    fromAttrs (hn, po) = SocketId hn (fromInteger po)
    toAttrs (SocketId hn po) = (hn, toInteger po)
-- | Open a listening socket on the first usable port between @actPo@ and
-- @maxPo@ (see 'getFirstSocket') and fork a thread that accepts requests
-- on it, handing each connection to the dispatcher @f@.
--
-- Returns 'Nothing' if no socket could be opened; otherwise the id of the
-- forked accept thread together with the local host name and the port that
-- was actually bound.
startSocket
  :: ServerDispatcher
  -> PortNumber
  -> PortNumber
  -> IO (Maybe (ThreadId, HostName, PortNumber))
startSocket f actPo maxPo =
  getFirstSocket actPo maxPo >>= maybe (return Nothing) launch
  where
    -- Fork the accept loop for an opened socket and report its coordinates.
    launch (so, po) = do
      hn <- getHostName
      tid <- forkIO (serve so (SocketId hn po))
      return (Just (tid, hn, po))

    -- Run the accept loop; a 'ThreadIdException' is handled as a regular
    -- shutdown, every other exception as a failure.
    serve so soid =
      handleAll (onFailure so) $
        waitForRequests f so soid `catch` onShutdown so

    onFailure so e = do
      putStrLn $ "ERROR - socket closed with exception: " ++ show e
      sClose so

    onShutdown :: Socket -> ThreadIdException -> IO ()
    onShutdown so (ThreadIdException i) = do
      sClose so
      putStrLn $ "socket normally closed by thread " ++ show i
-- | Determine the host name to advertise for this machine.
--
-- Uses the host part of the 'Socket.getNameInfo' result and falls back to
-- @"localhost"@ when no name is returned.
getHostName :: IO HostName
getHostName =
  fmap (maybe "localhost" id . fst)
       (Socket.getNameInfo [] True False (Socket.SockAddrUnix "localhost"))
-- | Find the first port in the inclusive range @actPo@ .. @maxPo@ on which
-- a listening socket can be opened.
--
-- Ports are probed in ascending order; any exception raised while opening
-- a port makes the search continue with the next one.  Returns 'Nothing'
-- when the range is empty or every port in it failed.
getFirstSocket :: PortNumber -> PortNumber -> IO (Maybe (Socket, PortNumber))
getFirstSocket actPo maxPo
  | actI > maxI = return Nothing
  | otherwise =
      handleAll (\_ -> tryNext) $ do
        debugM localLogger $ "getFirstSocket: getting Socket for: " ++ show actPo
        socket <- getSocket (PortNumber actPo)
        return (Just (socket, actPo))
  where
    -- NOTE(review): bounds are compared as Integers rather than as
    -- PortNumbers, presumably to be independent of PortNumber's own Ord
    -- instance -- confirm against the network version in use.
    actI = toInteger actPo
    maxI = toInteger maxPo

    -- Stop once the upper bound itself has been tried.  Incrementing first
    -- and checking afterwards could wrap around when maxPo is the largest
    -- representable port, restarting the search from the low ports.
    tryNext
      | actI >= maxI = return Nothing
      | otherwise    = getFirstSocket (actPo + 1) maxPo
-- | Open a listening socket on the given port.
--
-- Before listening, the SIGPIPE handler is set to 'Ignore'.
getSocket :: PortID -> IO Socket
getSocket po =
  withSocketsDo $
    installHandler sigPIPE Ignore Nothing >> listenOn po
-- | Accept loop of the server socket.
--
-- Accepts connections forever; each accepted client is handled by
-- 'processRequest' in its own forked thread.  This action only ends when
-- an exception is raised in it.
waitForRequests :: ServerDispatcher -> Socket -> SocketId -> IO ()
waitForRequests f socket soid = loop
  where
    loop = accept socket >>= forkIO . processRequest f soid >> loop
-- | Serve one accepted client connection.
--
-- The client handle is switched to unbuffered mode and passed to the
-- dispatcher @f@ together with the server's own 'SocketId' and a
-- 'SocketId' built from the client's host name and port.  Exceptions
-- raised by the dispatcher are logged, not propagated.  The CPU time spent
-- on the request is logged afterwards, and the handle is always closed.
processRequest :: ServerDispatcher -> SocketId -> (Handle, HostName, PortNumber) -> IO ()
processRequest f soid client =
  bracket (return client) (\(hdl, _, _) -> hClose hdl) processRequest'
  where
    processRequest' (hdl, hst, prt) = do
      hSetBuffering hdl NoBuffering
      t1 <- getCPUTime
      debugM localLogger "starting to dispatch request"
      handleAll (\e -> errorM localLogger $ "UnknownError: " ++ show e) $
        f soid hdl (SocketId hst prt)
      t2 <- getCPUTime
      -- getCPUTime reports picoseconds; divide the elapsed difference by
      -- 10^12 to obtain seconds.
      let d  = (fromIntegral (t2 - t1) / 1000000000000) :: Float
          ds = printf "%.4f" d :: String
      infoM localLogger ("request processed in " ++ ds ++ " sec")
-- | Connect to host @n@ on port @p@, run @f@ on the connection handle and
-- return its result.
--
-- The SIGPIPE handler is set to 'Ignore' before connecting, the handle is
-- used unbuffered, and it is closed afterwards even if @f@ throws.
-- Exceptions are not caught here.
sendRequest :: (Handle -> IO a) -> HostName -> PortNumber -> IO a
sendRequest f n p =
  withSocketsDo $
    installHandler sigPIPE Ignore Nothing >>
    bracket (connectTo n (PortNumber p)) hClose
      (\hdl -> hSetBuffering hdl NoBuffering >> f hdl)
-- | Send a request without any exception handling; this is exactly
-- 'sendRequest', so every exception reaches the caller.
performUnsafeSendRequest :: (Handle -> IO a) -> HostName -> PortNumber -> IO a
performUnsafeSendRequest f n p = sendRequest f n p
-- | Like 'sendRequest', but an 'IOException' is logged at debug level and
-- answered with the default value @d@.  Other exceptions still propagate.
performSafeSendRequest :: (Handle -> IO a) -> a -> HostName -> PortNumber -> IO a
performSafeSendRequest f d n p =
  sendRequest f n p `catch` fallback
  where
    fallback e = do
      debugM localLogger $ show (e :: IOException)
      return d
-- | Like 'sendRequest', but wraps a successful result in 'Just'; an
-- 'IOException' is logged at debug level and yields 'Nothing'.  Other
-- exceptions still propagate.
performMaybeSendRequest :: (Handle -> IO a) -> HostName -> PortNumber -> IO (Maybe a)
performMaybeSendRequest f n p =
  fmap Just (sendRequest f n p) `catch` onIOError
  where
    onIOError e = do
      debugM localLogger $ show (e :: IOException)
      return Nothing
-- | Write a message to a handle.
--
-- A header line holding the payload length in bytes (followed by a space)
-- is written first, then the raw payload.  Any exception raised while
-- writing is logged together with the message and is not propagated.
putMessage :: B.ByteString -> Handle -> IO ()
putMessage msg hdl = handleAll logFailure transmit
  where
    logFailure e = do
      errorM localLogger $ "putMessage: " ++ show e
      errorM localLogger $ "message: " ++ show msg

    transmit = do
      debugM "measure.putMessage" "1"
      hPutStrLn hdl (show (B.length msg) ++ " ")
      B.hPut hdl msg
-- | Read one message from a handle.
--
-- Expects the framing produced by 'putMessage': a header line whose first
-- word is the payload length in bytes, followed by the payload itself.
--
-- Raises an 'IOError' (via 'userError') when the header line is empty or
-- its first word is not a valid number, instead of failing inside the
-- partial functions 'head' and 'read'.  Exceptions from the underlying
-- handle operations propagate unchanged.
getMessage :: Handle -> IO B.ByteString
getMessage hdl = do
  debugM "measure.getMessage" "1"
  line <- hGetLine hdl
  case words line of
    (w : _) ->
      case reads w of
        -- Require the whole word to be consumed, as 'read' did.
        [(len, "")] -> B.hGet hdl len
        _           -> malformed line
    [] -> malformed line
  where
    malformed l =
      ioError (userError ("getMessage: malformed length header: " ++ show l))