module Data.Conduit.Network
(
sourceSocket
, sinkSocket
, Application
, AppData
, appSource
, appSink
, appSockAddr
, appLocalAddr
, ServerSettings
, serverSettings
, serverPort
, serverHost
, serverAfterBind
, serverNeedLocalAddr
, runTCPServer
, runTCPServerWithHandle
, ConnectionHandle (..)
, ClientSettings
, clientSettings
, clientPort
, clientHost
, runTCPClient
, HostPreference (..)
, bindPort
, getSocket
, acceptSafe
) where
import Prelude hiding (catch)
import Data.Conduit
import qualified Network.Socket as NS
import Network.Socket (Socket)
import Network.Socket.ByteString (sendAll, recv)
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Exception (throwIO, SomeException, try, finally, bracket, IOException, catch)
import Control.Monad (forever)
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Control.Monad.Trans.Class (lift)
import Control.Concurrent (forkIO, threadDelay, newEmptyMVar, putMVar, takeMVar)
import Data.Conduit.Network.Internal
import Data.Conduit.Network.Utils (HostPreference)
import qualified Data.Conduit.Network.Utils as Utils
#if defined(__GLASGOW_HASKELL__) && defined(mingw32_HOST_OS)
#define SOCKET_ACCEPT_RECV_WORKAROUND
#endif
safeRecv :: Socket -> Int -> IO S.ByteString
#ifndef SOCKET_ACCEPT_RECV_WORKAROUND
safeRecv = recv
#else
safeRecv s buf = do
var <- newEmptyMVar
forkIO $ recv s buf `catch` (\(_::IOException) -> return S.empty) >>= putMVar var
takeMVar var
#endif
sourceSocket :: MonadIO m => Socket -> Producer m ByteString
sourceSocket socket =
loop
where
loop = do
bs <- lift $ liftIO $ safeRecv socket 4096
if S.null bs
then return ()
else yield bs >> loop
sinkSocket :: MonadIO m => Socket -> Consumer ByteString m ()
sinkSocket socket =
loop
where
loop = await >>= maybe (return ()) (\bs -> lift (liftIO $ sendAll socket bs) >> loop)
type Application m = AppData m -> m ()
serverSettings :: Monad m
=> Int
-> HostPreference
-> ServerSettings m
serverSettings port host = ServerSettings
{ serverPort = port
, serverHost = host
, serverAfterBind = const $ return ()
, serverNeedLocalAddr = False
}
data ConnectionHandle m = ConnectionHandle { getHandle :: Socket -> NS.SockAddr -> Maybe NS.SockAddr -> m () }
runTCPServerWithHandle :: (MonadIO m, MonadBaseControl IO m) => ServerSettings m -> ConnectionHandle m -> m ()
runTCPServerWithHandle (ServerSettings port host afterBind needLocalAddr) handle = control $ \run -> bracket
(liftIO $ bindPort port host)
(liftIO . NS.sClose)
(\socket -> run $ do
afterBind socket
forever $ serve socket)
where
serve lsocket = do
(socket, addr) <- liftIO $ acceptSafe lsocket
mlocal <- if needLocalAddr
then fmap Just $ liftIO (NS.getSocketName socket)
else return Nothing
let
handler = getHandle handle
app' run = run (handler socket addr mlocal) >> return ()
appClose run = app' run `finally` NS.sClose socket
control $ \run -> forkIO (appClose run) >> run (return ())
runTCPServer :: (MonadIO m, MonadBaseControl IO m) => ServerSettings m -> Application m -> m ()
runTCPServer settings app = runTCPServerWithHandle settings (ConnectionHandle app')
where app' socket addr mlocal =
let ad = AppData
{ appSource = sourceSocket socket
, appSink = sinkSocket socket
, appSockAddr = addr
, appLocalAddr = mlocal
}
in
app ad
clientSettings :: Monad m
=> Int
-> ByteString
-> ClientSettings m
clientSettings port host = ClientSettings
{ clientPort = port
, clientHost = host
}
runTCPClient :: (MonadIO m, MonadBaseControl IO m) => ClientSettings m -> (AppData m -> m a) -> m a
runTCPClient (ClientSettings port host) app = control $ \run -> bracket
(getSocket host port)
(NS.sClose . fst)
(\(s, address) -> run $ app AppData
{ appSource = sourceSocket s
, appSink = sinkSocket s
, appSockAddr = address
, appLocalAddr = Nothing
})
getSocket :: ByteString -> Int -> IO (NS.Socket, NS.SockAddr)
getSocket host' port' = do
(sock, addr) <- Utils.getSocket (S8.unpack host') port' NS.Stream
ee <- try' $ NS.connect sock (NS.addrAddress addr)
case ee of
Left e -> NS.sClose sock >> throwIO e
Right () -> return (sock, NS.addrAddress addr)
where
try' :: IO a -> IO (Either SomeException a)
try' = try
bindPort :: Int -> HostPreference -> IO Socket
bindPort p s = do
sock <- Utils.bindPort p s NS.Stream
NS.listen sock (max 2048 NS.maxListenQueue)
return sock
acceptSafe :: Socket -> IO (Socket, NS.SockAddr)
acceptSafe socket =
#ifndef SOCKET_ACCEPT_RECV_WORKAROUND
loop
#else
do var <- newEmptyMVar
forkIO $ loop >>= putMVar var
takeMVar var
#endif
where
loop =
NS.accept socket `catch` \(_ :: IOException) -> do
threadDelay 1000000
loop