{-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE NoImplicitPrelude #-} {-# LANGUAGE TupleSections #-} -- | -- Module: $HEADER$ -- Description: Connection pools for various transport protocols. -- Copyright: (c) 2014 Peter Trsko -- License: BSD3 -- -- Maintainer: peter.trsko@gmail.com -- Stability: unstable -- Portability: non-portable (CPP, FlexibleContexts, NoImplicitPrelude, -- TupleSections) -- -- Connection pools for TCP clients and UNIX Socket clients (not supported on -- Windows). -- -- This package is built on top of -- and -- -- packages. The later allows us to use -- package -- for implementing TCP and UNIX Sockets clients. Package /conduit-extra/ -- defines @appSource@ and @appSink@ based on abstractions from -- /streaming-commons/ package and they can be therefore reused. Difference -- between using /conduit-extra/ or /streaming-commons/ is that instead of -- using @runTCPClient@ (or its lifted variant @runGeneralTCPClient@ from -- /conduit-extra/) one would use 'withTcpClientConnection', and instead of -- @runUnixClient@ it would be 'withUnixClientConnection'. module Data.ConnectionPool ( -- * TCP Client Example -- -- $tcpClientExample #ifndef WINDOWS -- Windows doesn't support UNIX Sockets. -- * Unix Client Example -- -- $unixClientExample #endif -- !WINDOWS -- * Connection Pool -- -- $connectionPool ConnectionPool -- * Constructing Connection Pool -- -- $constructingConnectionPool , ResourcePoolParams , numberOfResourcesPerStripe , numberOfStripes , resourceIdleTimeout -- * TCP Client Connection Pool , TcpClient , ClientSettings , AppData , createTcpClientPool , withTcpClientConnection #ifndef WINDOWS -- Windows doesn't support UNIX Sockets. -- * UNIX Client Connection Pool , UnixClient , ClientSettingsUnix , AppDataUnix , createUnixClientPool , withUnixClientConnection #endif -- !WINDOWS ) where import Control.Applicative ((<$>)) import Data.Function ((.)) import Data.Maybe (Maybe(Nothing)) import System.IO (IO) import qualified Network.Socket as Socket (sClose) import Control.Monad.Trans.Control (MonadBaseControl) import Data.Streaming.Network ( AppData , ClientSettings #ifndef WINDOWS -- Windows doesn't support UNIX Sockets. , AppDataUnix , ClientSettingsUnix , getPath , getSocketUnix #endif -- !WINDOWS ) import Data.ConnectionPool.Internal.ConnectionPoolFamily ( ConnectionPool , TcpClient #ifndef WINDOWS -- Windows doesn't support UNIX Sockets. , UnixClient #endif -- !WINDOWS ) import qualified Data.ConnectionPool.Internal.ConnectionPool as Internal ( createConnectionPool , withConnection ) import qualified Data.ConnectionPool.Internal.ConnectionPoolFamily as Internal ( ConnectionPool(..) ) import Data.ConnectionPool.Internal.ResourcePoolParams ( ResourcePoolParams , numberOfResourcesPerStripe , numberOfStripes , resourceIdleTimeout ) import qualified Data.ConnectionPool.Internal.Streaming as Internal ( acquireTcpClientConnection , runTcpApp #ifndef WINDOWS -- Windows doesn't support UNIX Sockets. , runUnixApp #endif -- !WINDOWS ) -- | Create connection pool for TCP clients. createTcpClientPool :: ResourcePoolParams -> ClientSettings -> IO (ConnectionPool TcpClient) createTcpClientPool poolParams tcpParams = Internal.TcpConnectionPool <$> Internal.createConnectionPool acquire release poolParams where acquire = Internal.acquireTcpClientConnection tcpParams release = Socket.sClose -- | Temporarily take a TCP connection from a pool, run client with it, and -- return it to the pool afterwards. For details how connections are allocated -- see 'Data.Pool.withResource'. withTcpClientConnection :: MonadBaseControl IO m => ConnectionPool TcpClient -> (AppData -> m r) -> m r withTcpClientConnection (Internal.TcpConnectionPool pool) = Internal.withConnection pool . Internal.runTcpApp Nothing #ifndef WINDOWS -- Windows doesn't support UNIX Sockets. -- | Create connection pool for UNIX Sockets clients. createUnixClientPool :: ResourcePoolParams -> ClientSettingsUnix -> IO (ConnectionPool UnixClient) createUnixClientPool poolParams unixParams = Internal.UnixConnectionPool <$> Internal.createConnectionPool acquire release poolParams where acquire = (, ()) <$> getSocketUnix (getPath unixParams) release = Socket.sClose -- | Temporarily take a UNIX Sockets connection from a pool, run client with -- it, and return it to the pool afterwards. For details how connections are -- allocated see 'Data.Pool.withResource'. withUnixClientConnection :: MonadBaseControl IO m => ConnectionPool UnixClient -> (AppDataUnix -> m r) -> m r withUnixClientConnection (Internal.UnixConnectionPool pool) = Internal.withConnection pool . Internal.runUnixApp #endif -- !WINDOWS -- $tcpClientExample -- -- Here is a simple example that demonstrates how TCP client can be created and -- how connection pool behaves. -- -- @ -- {-\# LANGUAGE OverloadedStrings \#-} -- module Main (main) -- where -- -- import Control.Monad (void) -- import Control.Concurrent (forkIO, threadDelay) -- import System.Environment (getArgs) -- -- import Control.Lens ((.~), (&)) -- import Data.ConnectionPool -- ( 'createTcpClientPool' -- , 'numberOfResourcesPerStripe' -- , 'numberOfStripes' -- , 'withTcpClientConnection' -- ) -- import Data.Default.Class (Default(def)) -- import Data.Streaming.Network (appWrite, clientSettingsTCP) -- -- -- main :: IO () -- main = do -- [port, numStripes, numPerStripe] <- getArgs -- pool <- 'createTcpClientPool' -- (poolParams numStripes numPerStripe) -- (clientSettingsTCP (read port) "127.0.0.1") -- void . forkIO . 'withTcpClientConnection' pool $ \\appData -> do -- threadDelay 100 -- appWrite appData "1: I'm alive!\\n" -- void . forkIO . 'withTcpClientConnection' pool $ \\appData -> -- appWrite appData "2: I'm alive!\\n" -- where -- poolParams m n = -- def & 'numberOfStripes' .~ read m -- & 'numberOfResourcesPerStripe' .~ read n -- @ -- -- To test it we can use @socat@ or some @netcat@ like application. Our test -- will require two terminals, in one we will execute @socat@ as a server -- listenting on UNIX socket and in the other one we execute above example. -- -- Simple TCP server listening on port @8001@ that prints what it receives to -- stdout: -- -- > $ socat TCP4-LISTEN:8001,bind=127.0.0.1,fork - -- -- The @fork@ parameter in the above example is important, otherwise @socat@ -- would terminate when client closes its connection. -- -- If we run above example as: -- -- > $ runghc tcp-example.hs 8001 1 1 -- -- We can see that @socat@ received following text: -- -- > 1: I'm alive! -- > 2: I'm alive! -- -- But if we increment number of stripes or number of connections (resources) -- per stripe, then we will get: -- -- > 2: I'm alive! -- > 1: I'm alive! -- -- The reason for this is that we use @threadDelay 100@ in the first executed -- thread. So when we have only one stripe and one connection per stripe, then -- we have only one connection in the pool. Therefore when the first thread -- executes and acquires a connection, then all the other threads (the other -- one in above example) will block. If we have more then one connection -- available in our pool, then the first thread acquires connection, blocks on -- @threadDelay@ call, but the other thread also acquires connection and prints -- its output while the first thread is still blocked on @threadDelay@. This -- example demonstrates how connection pool behaves if it reached its capacity -- and when it has onough free resources. -- $unixClientExample -- -- Here is a simple example that demonstrates how UNIX Sockets client can be -- created and how connection pool behaves. -- -- @ -- {-\# LANGUAGE OverloadedStrings \#-} -- module Main (main) -- where -- -- import Control.Concurrent (forkIO, threadDelay) -- import Control.Monad (void) -- import System.Environment (getArgs) -- -- import Control.Lens ((.~), (&)) -- import Data.ConnectionPool -- ( 'createUnixClientPool' -- , 'numberOfResourcesPerStripe' -- , 'numberOfStripes' -- , 'withUnixClientConnection' -- ) -- import Data.Default.Class (Default(def)) -- import Data.Streaming.Network (appWrite, clientSettingsUnix) -- -- -- main :: IO () -- main = do -- [socket, numStripes, numPerStripe] <- getArgs -- pool <- 'createUnixClientPool' -- (poolParams numStripes numPerStripe) -- (clientSettingsUnix socket) -- void . forkIO . 'withUnixClientConnection' pool $ \\appData -> do -- threadDelay 100 -- appWrite appData "1: I'm alive!\\n" -- void . forkIO . 'withUnixClientConnection' pool $ \\appData -> -- appWrite appData "2: I'm alive!\\n" -- where -- poolParams m n = -- def & 'numberOfStripes' .~ read m -- & 'numberOfResourcesPerStripe' .~ read n -- @ -- -- Above example is very similar to our TCP Client Example and most notably the -- implementation of two client threads is the same. Testing it is very similar -- to testing TCP Client Example, but we would use different command for -- @socat@ and for executing the example. -- -- Simple UNIX socket server that prints what it receives to stdout: -- -- > $ socat UNIX-LISTEN:test.sock,fork - -- -- Parameter @fork@ has the same importance as when we used it in the command -- for running TCP server. -- -- We can execute UNIX Sockets Example using: -- -- > $ runghc unix-sockets-example.hs test.sock 1 1 -- -- Result of the test will be the same in case of using one stripe and one -- connection per stripe, and when we increase total number connections, to -- what we had with the TCP Client Example. -- $connectionPool -- -- For each supported protocol we have a 'ConnectionPool' data family instance -- that is tagged with supported protocol. Currently it can be either -- 'TcpClient' or 'UnixClient'. This way we are able to use same core -- implementation for both and only need to deviate from common code where -- necessary. -- -- Under the hood we use 'Network.Socket.Socket' to represent connections and -- that limits possible implementations of 'ConnectionPool' instances to -- protocols supported by -- package. -- -- Those interested in details should look in to -- "Data.ConnectionPool.Internal.ConnectionPool" and -- "Data.ConnectionPool.Internal.ConnectionPoolFamily" modules. -- $constructingConnectionPool -- -- For each protocol we provide separate function that creates 'ConnectionPool' -- instance. For TCP clients it's 'createTcpClientPool' and for UNIX Socket -- clients it's 'createUnixClientPool' (not available on Windows). -- -- To simplify things we provide 'ResourcePoolParams' data type that is -- accepted by concrete constructors of 'ConnectionPool' instances and it wraps -- all common connection pool parameters. -- -- In example, to specify connection pool with 2 stripes with 8 connections in -- each stripe we can use: -- -- @ -- def & numberOfStripes .~ 2 -- & numberOfResourcesPerStripe .~ 8 -- @ -- -- Functions @&@ and @.~@ are defined by -- package, and 'def' is a -- method of 'Default' type class defined in -- -- package.