connection-pool-0.2.2: Connection pool built on top of resource-pool and streaming-commons.

Copyright: (c) 2014-2016 Peter Trško
License: BSD3
Maintainer: peter.trsko@gmail.com
Stability: unstable
Portability: GHC specific language extensions.
Safe Haskell: None
Language: Haskell2010

Data.ConnectionPool

Description

Connection pools for TCP clients and UNIX Socket clients; the latter are not supported on Windows.

This package is built on top of the resource-pool and streaming-commons packages. The latter allows us to use the conduit-extra package for implementing TCP and UNIX Sockets clients. Package conduit-extra defines appSource and appSink based on abstractions from the streaming-commons package, and they can therefore be reused. The difference from using conduit-extra or streaming-commons directly is that instead of runTCPClient (or its lifted variant runGeneralTCPClient from conduit-extra) one would use withTcpClientConnection, and instead of runUnixClient one would use withUnixClientConnection. There is also a more generic function named withConnection, which works with either ConnectionPool instance.
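As a rough illustration of that difference, the following sketch runs the same handler once with runTCPClient and then through a pool. It assumes that a test server is listening on 127.0.0.1:8001 (an arbitrary choice for this sketch); greet and settings are hypothetical names introduced only for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main)
  where

import Data.ConnectionPool (createTcpClientPool, withTcpClientConnection)
import Data.Default.Class (Default(def))
import Data.Streaming.Network
    ( AppData
    , ClientSettings
    , appWrite
    , clientSettingsTCP
    , runTCPClient
    )


-- Hypothetical handler; it has the same type in both approaches.
greet :: AppData -> IO ()
greet appData = appWrite appData "hello\n"

-- Assumed address of a test server, not something this package mandates.
settings :: ClientSettings
settings = clientSettingsTCP 8001 "127.0.0.1"

main :: IO ()
main = do
    -- Without a pool: connect, run the handler, close the connection.
    runTCPClient settings greet

    -- With a pool: the connection is handed back to the pool when the
    -- handler finishes, so the second call can reuse it while it is idle.
    pool <- createTcpClientPool def settings
    withTcpClientConnection pool greet
    withTcpClientConnection pool greet
```

The handler itself does not change; only the function that supplies the connection does.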

TCP Client Example

Here is a simple example that demonstrates how a TCP client can be created and how the connection pool behaves.

{-# LANGUAGE OverloadedStrings #-}
module Main (main)
  where

import Control.Concurrent
    ( forkIO
    , newEmptyMVar
    , putMVar
    , readMVar
    , threadDelay
    )
import Control.Monad (void, mapM_)
import System.Environment (getArgs)

import Control.Lens ((.~), (&))
import Data.ConnectionPool
    ( createTcpClientPool
    , numberOfResourcesPerStripe
    , numberOfStripes
    , withTcpClientConnection
    )
import Data.Default.Class (Default(def))
import Data.Streaming.Network
    ( appWrite
    , clientSettingsTCP
    )


main :: IO ()
main = do
    [port, numStripes, numPerStripe] <- getArgs
    pool <- createTcpClientPool
        (poolParams numStripes numPerStripe)
        (clientSettingsTCP (read port) "127.0.0.1")
    thread1 <- newEmptyMVar
    thread2 <- newEmptyMVar
    void . forkIO . withTcpClientConnection pool $ \appData -> do
       threadDelay 1000
       appWrite appData "1: I'm alive!\n"
       putMVar thread1 ()
    void . forkIO . withTcpClientConnection pool $ \appData -> do
       appWrite appData "2: I'm alive!\n"
       putMVar thread2 ()
    mapM_ readMVar [thread1, thread2]
  where
    poolParams m n =
        def & numberOfStripes .~ read m
            & numberOfResourcesPerStripe .~ read n

To test it we can use socat or some netcat-like application. Our test requires two terminals: in one we run socat as a server listening on a TCP port, and in the other we run the above example.

Simple TCP server listening on port 8001 that prints what it receives to stdout:

$ socat TCP4-LISTEN:8001,bind=127.0.0.1,fork -

The fork parameter in the above example is important; without it socat would terminate as soon as the client closes its connection.

If we run the above example as:

$ runghc tcp-example.hs 8001 1 1

we can see that socat received the following text:

1: I'm alive!
2: I'm alive!

But if we increase the number of stripes or the number of connections (resources) per stripe, then we will get:

2: I'm alive!
1: I'm alive!

The reason for this is that we use threadDelay 1000 in the first executed thread. When we have only one stripe and one connection per stripe, there is only one connection in the pool. Therefore, when the first thread executes and acquires that connection, all the other threads (just the second one in the above example) will block until it is returned. If we have more than one connection available in our pool, then the first thread acquires a connection and blocks on the threadDelay call, but the other thread also acquires a connection and prints its output while the first thread is still blocked on threadDelay. This example demonstrates how the connection pool behaves when it has reached its capacity and when it has enough free resources.
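The blocking behaviour described above can be reproduced without any network access. The following sketch models the pool as a counting semaphore from base. This is only a model under that assumption, not how connection-pool or resource-pool is implemented, and simulate is a hypothetical name. To keep the scenario predictable, the second worker is started only after the first one holds a slot.

```haskell
module Main (main)
  where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import Control.Monad (void)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)


-- | Run two workers against a model pool with the given number of slots
-- and return their messages in the order in which they were written.
simulate :: Int -> IO [String]
simulate capacity = do
    slots <- newQSem capacity
    output <- newIORef []
    acquired1 <- newEmptyMVar
    done1 <- newEmptyMVar
    done2 <- newEmptyMVar
    let -- Take a slot, run the action, and always give the slot back.
        withSlot = bracket_ (waitQSem slots) (signalQSem slots)
        write s = atomicModifyIORef' output (\xs -> (s : xs, ()))
    void . forkIO $ do
        withSlot $ do
            putMVar acquired1 ()
            threadDelay 100000  -- Hold the slot for 0.1 s.
            write "1: I'm alive!"
        putMVar done1 ()
    -- Start the second worker only once the first one holds a slot.
    takeMVar acquired1
    void . forkIO $ do
        withSlot (write "2: I'm alive!")
        putMVar done2 ()
    takeMVar done1
    takeMVar done2
    reverse <$> readIORef output

main :: IO ()
main = do
    simulate 1 >>= mapM_ putStrLn
    putStrLn "--"
    simulate 2 >>= mapM_ putStrLn
```

With a single slot the second worker has to wait for the first one, so the first message comes first. With two slots the second worker normally finishes while the first is still sleeping, although that ordering depends on timing rather than being guaranteed.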

Unix Client Example

Here is a simple example that demonstrates how a UNIX Sockets client can be created and how the connection pool behaves.

{-# LANGUAGE OverloadedStrings #-}
module Main (main)
  where

import Control.Concurrent
    ( forkIO
    , newEmptyMVar
    , putMVar
    , readMVar
    , threadDelay
    )
import Control.Monad (void, mapM_)
import System.Environment (getArgs)

import Control.Lens ((.~), (&))
import Data.ConnectionPool
    ( createUnixClientPool
    , numberOfResourcesPerStripe
    , numberOfStripes
    , withUnixClientConnection
    )
import Data.Default.Class (Default(def))
import Data.Streaming.Network
    ( appWrite
    , clientSettingsUnix
    )


main :: IO ()
main = do
    [socket, numStripes, numPerStripe] <- getArgs
    pool <- createUnixClientPool
        (poolParams numStripes numPerStripe)
        (clientSettingsUnix socket)
    thread1 <- newEmptyMVar
    thread2 <- newEmptyMVar
    void . forkIO . withUnixClientConnection pool $ \appData -> do
       threadDelay 1000
       appWrite appData "1: I'm alive!\n"
       putMVar thread1 ()
    void . forkIO . withUnixClientConnection pool $ \appData -> do
       appWrite appData "2: I'm alive!\n"
       putMVar thread2 ()
    mapM_ readMVar [thread1, thread2]
  where
    poolParams m n =
        def & numberOfStripes .~ read m
            & numberOfResourcesPerStripe .~ read n

The above example is very similar to our TCP Client Example; most notably, the implementation of the two client threads is the same. Testing it is also very similar, but we use a different socat command and a different command for executing the example.

Simple UNIX socket server that prints what it receives to stdout:

$ socat UNIX-LISTEN:test.sock,fork -

Parameter fork has the same importance as when we used it in the command for running TCP server.

We can execute UNIX Sockets Example using:

$ runghc unix-sockets-example.hs test.sock 1 1

The result of the test matches what we saw with the TCP Client Example, both when using one stripe with one connection per stripe and when we increase the total number of connections.

Connection Pool

For each supported protocol we have a ConnectionPool data family instance that is tagged with the supported protocol. Currently it can be either TcpClient or UnixClient. This way we are able to use the same core implementation for both and only need to deviate from the common code where necessary.

Under the hood we use Socket to represent connections, which limits possible implementations of ConnectionPool instances to protocols supported by the network package.

Those interested in details should look into the Data.ConnectionPool.Internal.ConnectionPool and Data.ConnectionPool.Internal.ConnectionPoolFamily modules.

data family ConnectionPool :: k -> * Source #

Family of connection pools parametrised by transport protocol.

Definition changed in version 0.2 to be kind polymorphic (only on GHC >= 7.10), and became part of the stable API by being moved into the Data.ConnectionPool.Family module.

Instances

HasConnectionPool HandlerParams Socket () (ConnectionPool * UnixClient) Source #

Since version 0.2.

HasConnectionPool HandlerParams Socket SockAddr (ConnectionPool * TcpClient) Source #

Since version 0.2.

Show (ConnectionPool * TcpClient) # 
Show (ConnectionPool * UnixClient) # 
Generic (ConnectionPool * TcpClient) # 
Generic (ConnectionPool * UnixClient) # 
data ConnectionPool * TcpClient Source #

Connection pool for TCP clients.

Definition changed in version 0.1.3 and 0.2. Instances for Generic and Show introduced in version 0.2.

data ConnectionPool * UnixClient Source #

Connection pool for UNIX Socket clients.

Definition changed in version 0.1.3 and 0.2. Instances for Generic and Show introduced in version 0.2.

type Rep (ConnectionPool * TcpClient) # 
type Rep (ConnectionPool * TcpClient) = D1 * (MetaData "ConnectionPool" "Data.ConnectionPool.Internal.TCP" "connection-pool-0.2.2-6QO2HFAP66s57rNsrMe34b" True) (C1 * (MetaCons "TcpConnectionPool" PrefixI False) (S1 * (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * (ConnectionPool HandlerParams Socket SockAddr))))
type Rep (ConnectionPool * UnixClient) # 
type Rep (ConnectionPool * UnixClient) = D1 * (MetaData "ConnectionPool" "Data.ConnectionPool.Internal.Unix" "connection-pool-0.2.2-6QO2HFAP66s57rNsrMe34b" True) (C1 * (MetaCons "UnixConnectionPool" PrefixI False) (S1 * (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * (ConnectionPool HandlerParams Socket ()))))

Constructing Connection Pool

This section describes basic principles that are shared among the provided connection pools. It can also serve as a starting point when creating a new connection pool using the API and types from this package.

For each protocol we provide a separate function that creates a ConnectionPool instance. For TCP clients it's createTcpClientPool and for UNIX Socket clients it's createUnixClientPool (not available on Windows).

In each case two kinds of values need to be provided as parameters to such functions:

  1. Parameters of underlying resource pool like how to organize stripes and parameters for algorithm that handles resource releasing, etc.
  2. Transport protocol parameters like IP address, port, UNIX Socket file, and similar.

To simplify things we provide the ResourcePoolParams data type, which is accepted by concrete constructors of ConnectionPool instances and wraps all common connection pool parameters. For protocol specific settings this package reuses data types from the streaming-commons library.

As a result of the above, the type signature of a function that creates a connection pool for some protocol named MyProtocol could look like:

createMyProtocolPool
    :: ResourcePoolParams
    -> MyProtocolParams
    -> IO (ConnectionPool MyProtocol)

To further simplify things this package defines a default value for ResourcePoolParams using the Default type class, which has only one method named def. The instance of this class is declared using the minimal possible value of each parameter required by the underlying resource pool. For example, to specify a connection pool with 2 stripes and 8 connections in each stripe, while keeping the connection idle timeout at its default value, we can simply use:

def & numberOfStripes .~ 2
    & numberOfResourcesPerStripe .~ 8

Where functions & and .~ are defined by lens package. Function & is also available in base >= 4.8.
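To make the effect of these settings concrete, the sketch below builds the same parameters and reads them back with the ^. operator from the lens package. The helper maxConnections is hypothetical; it relies on the assumption that the pool never holds more than stripes times resources-per-stripe connections at once.

```haskell
module Main (main)
  where

import Control.Lens ((&), (.~), (^.))
import Data.ConnectionPool
    ( ResourcePoolParams
    , numberOfResourcesPerStripe
    , numberOfStripes
    )
import Data.Default.Class (Default(def))


-- Two stripes with eight connections each; idle timeout left at its default.
params :: ResourcePoolParams
params = def
    & numberOfStripes .~ 2
    & numberOfResourcesPerStripe .~ 8

-- | Hypothetical helper: upper bound on simultaneously open connections.
maxConnections :: ResourcePoolParams -> Int
maxConnections p = (p ^. numberOfStripes) * (p ^. numberOfResourcesPerStripe)

main :: IO ()
main = print (maxConnections params)  -- 2 * 8 = 16
```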

data ResourcePoolParams Source #

Parameters of resource pool that describe things like its internal structure. See createPool for details.

Instance for Generic introduced in version 0.2.

Instances

Data ResourcePoolParams Source # 

Methods

gfoldl :: (forall d b. Data d => c (d -> b) -> d -> c b) -> (forall g. g -> c g) -> ResourcePoolParams -> c ResourcePoolParams #

gunfold :: (forall b r. Data b => c (b -> r) -> c r) -> (forall r. r -> c r) -> Constr -> c ResourcePoolParams #

toConstr :: ResourcePoolParams -> Constr #

dataTypeOf :: ResourcePoolParams -> DataType #

dataCast1 :: Typeable (* -> *) t => (forall d. Data d => c (t d)) -> Maybe (c ResourcePoolParams) #

dataCast2 :: Typeable (* -> * -> *) t => (forall d e. (Data d, Data e) => c (t d e)) -> Maybe (c ResourcePoolParams) #

gmapT :: (forall b. Data b => b -> b) -> ResourcePoolParams -> ResourcePoolParams #

gmapQl :: (r -> r' -> r) -> r -> (forall d. Data d => d -> r') -> ResourcePoolParams -> r #

gmapQr :: (r' -> r -> r) -> r -> (forall d. Data d => d -> r') -> ResourcePoolParams -> r #

gmapQ :: (forall d. Data d => d -> u) -> ResourcePoolParams -> [u] #

gmapQi :: Int -> (forall d. Data d => d -> u) -> ResourcePoolParams -> u #

gmapM :: Monad m => (forall d. Data d => d -> m d) -> ResourcePoolParams -> m ResourcePoolParams #

gmapMp :: MonadPlus m => (forall d. Data d => d -> m d) -> ResourcePoolParams -> m ResourcePoolParams #

gmapMo :: MonadPlus m => (forall d. Data d => d -> m d) -> ResourcePoolParams -> m ResourcePoolParams #

Show ResourcePoolParams Source # 
Generic ResourcePoolParams Source # 
Default ResourcePoolParams Source #
numberOfStripes = 1
resourceIdleTimeout = 0.5
numberOfResourcesPerStripe = 1
type Rep ResourcePoolParams Source # 
type Rep ResourcePoolParams = D1 * (MetaData "ResourcePoolParams" "Data.ConnectionPool.Internal.ResourcePoolParams" "connection-pool-0.2.2-6QO2HFAP66s57rNsrMe34b" False) (C1 * (MetaCons "ResourcePoolParams" PrefixI True) ((:*:) * (S1 * (MetaSel (Just Symbol "_numberOfStripes") NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 * Int)) ((:*:) * (S1 * (MetaSel (Just Symbol "_resourceIdleTimeout") NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 * NominalDiffTime)) (S1 * (MetaSel (Just Symbol "_numberOfResourcesPerStripe") NoSourceUnpackedness SourceStrict DecidedStrict) (Rec0 * Int)))))

Lenses

For details on how to use lenses such as these, see the lens package, where you might find a good starting point.

numberOfResourcesPerStripe :: Functor f => (Int -> f Int) -> ResourcePoolParams -> f ResourcePoolParams Source #

Lens for accessing maximum number of resources to keep open per stripe. The smallest acceptable value is 1 (default).

numberOfStripes :: Functor f => (Int -> f Int) -> ResourcePoolParams -> f ResourcePoolParams Source #

Lens for accessing stripe count. The number of distinct sub-pools to maintain. The smallest acceptable value is 1 (default).

resourceIdleTimeout :: Functor f => (NominalDiffTime -> f NominalDiffTime) -> ResourcePoolParams -> f ResourcePoolParams Source #

Lens for accessing amount of time for which an unused resource is kept open. The smallest acceptable value is 0.5 seconds (default).

Validation

Sometimes one needs to validate parameters as early as possible, e.g. while parsing command line options.

Usage example:

validateResourcePoolParams $ someParams
    & resourceIdleTimeout .~ 1
    & numberOfResourcesPerStripe .~ 16

Usually one would use def instead of someParams. Functions & and .~ are defined in the lens package. Function & is also available in base >= 4.8.

validateResourcePoolParams Source #

Arguments

:: ResourcePoolParams

Parameters to validate.

-> Either String ResourcePoolParams

Either error message or the same value of ResourcePoolParams passed as a first argument.

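Check if all parameters for the underlying resource pool are valid, i.e. that none of them is below the smallest acceptable value documented for its lens: 1 for numberOfStripes, 1 for numberOfResourcesPerStripe, and 0.5 seconds for resourceIdleTimeout.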

For more details see createPool.

Since version 0.1.1.0.
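A small sketch of early validation, for instance right after parsing a stripe count from the command line. The helper mkParams is hypothetical, and the sketch assumes that a stripe count of 0 is rejected because it is below the documented minimum of 1. The exact wording of the error message is not shown here since it is not part of this documentation.

```haskell
module Main (main)
  where

import Control.Lens ((&), (.~))
import Data.ConnectionPool
    ( ResourcePoolParams
    , numberOfStripes
    , validateResourcePoolParams
    )
import Data.Default.Class (Default(def))


-- | Hypothetical helper: turn an already parsed stripe count into
-- validated pool parameters, or an error message.
mkParams :: Int -> Either String ResourcePoolParams
mkParams n = validateResourcePoolParams (def & numberOfStripes .~ n)

-- | Report whether a stripe count was accepted.
report :: Int -> IO ()
report n = case mkParams n of
    Left msg -> putStrLn (show n ++ " stripes rejected: " ++ msg)
    Right _ -> putStrLn (show n ++ " stripes accepted")

main :: IO ()
main = mapM_ report [0, 4]
```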

TCP Client Connection Pool

data TcpClient Source #

Type tag used to specialize connection pool for TCP clients.

Instance for Generic introduced in version 0.2.

Instances

Generic TcpClient Source # 

Associated Types

type Rep TcpClient :: * -> * #

HasConnectionPool HandlerParams Socket SockAddr (ConnectionPool * TcpClient) Source #

Since version 0.2.

ConnectionPoolFor * TcpClient Source #

Defined using:

withConnection = withTcpClientConnection
destroyAllConnections = destroyAllTcpClientConnections

Since version 0.2.

Associated Types

type HandlerData TcpClient (protocol :: TcpClient) :: * Source #

Show (ConnectionPool * TcpClient) Source # 
Generic (ConnectionPool * TcpClient) Source # 
type Rep TcpClient Source # 
type Rep TcpClient = D1 * (MetaData "TcpClient" "Data.ConnectionPool.Internal.TCP" "connection-pool-0.2.2-6QO2HFAP66s57rNsrMe34b" False) (V1 *)
data ConnectionPool * TcpClient Source #

Connection pool for TCP clients.

Definition changed in version 0.1.3 and 0.2. Instances for Generic and Show introduced in version 0.2.

type HandlerData * TcpClient Source # 
type Rep (ConnectionPool * TcpClient) Source # 
type Rep (ConnectionPool * TcpClient) = D1 * (MetaData "ConnectionPool" "Data.ConnectionPool.Internal.TCP" "connection-pool-0.2.2-6QO2HFAP66s57rNsrMe34b" True) (C1 * (MetaCons "TcpConnectionPool" PrefixI False) (S1 * (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * (ConnectionPool HandlerParams Socket SockAddr))))

data ClientSettings :: * #

Settings for a TCP client, specifying how to connect to the server.

data AppData :: * #

The data passed to an Application.

Instances

HasReadWrite AppData 

Methods

readLens :: Functor f => (IO ByteString -> f (IO ByteString)) -> AppData -> f AppData #

writeLens :: Functor f => ((ByteString -> IO ()) -> f (ByteString -> IO ())) -> AppData -> f AppData #

createTcpClientPool :: ResourcePoolParams -> ClientSettings -> IO (ConnectionPool TcpClient) Source #

Create connection pool for TCP clients.

withTcpClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool TcpClient -> (AppData -> m r) -> m r Source #

Temporarily take a TCP connection from a pool, run the client with it, and return it to the pool afterwards. For details on how connections are allocated see withResource.

tryWithTcpClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool TcpClient -> (AppData -> m r) -> m (Maybe r) Source #

Similar to withTcpClientConnection, but only performs the action if a TCP connection could be taken from the pool without blocking. Otherwise, it returns immediately with Nothing (i.e. the action function is not called). Conversely, if a connection can be acquired from the pool without blocking, the action is performed and its result is returned, wrapped in a Just.

Since version 0.2.

destroyAllTcpClientConnections :: ConnectionPool TcpClient -> IO () Source #

Destroy all TCP connections that might be still open in a connection pool. This is useful when one needs to release all resources at once and not to wait for idle timeout to be reached.

For more details see destroyAllResources.

Since version 0.1.1.0.
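The functions in this section can be combined as in the following sketch. It assumes a test server listening on 127.0.0.1:8001, such as the socat command from the TCP Client Example; the name acquire is hypothetical. The pool is wrapped in bracket so that any open connections are destroyed when the program leaves the block instead of waiting for the idle timeout.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main)
  where

import Control.Exception (bracket)
import Data.ConnectionPool
    ( ConnectionPool
    , TcpClient
    , createTcpClientPool
    , destroyAllTcpClientConnections
    , tryWithTcpClientConnection
    )
import Data.Default.Class (Default(def))
import Data.Streaming.Network (appWrite, clientSettingsTCP)


-- Hypothetical name; the address is an assumption of this sketch.
acquire :: IO (ConnectionPool TcpClient)
acquire = createTcpClientPool def (clientSettingsTCP 8001 "127.0.0.1")

main :: IO ()
main = bracket acquire destroyAllTcpClientConnections $ \pool -> do
    -- Does not block: Nothing means every connection was in use.
    result <- tryWithTcpClientConnection pool $ \appData ->
        appWrite appData "ping\n"
    case result of
        Just () -> putStrLn "message sent"
        Nothing -> putStrLn "pool is busy, message skipped"
```

In this single-threaded sketch the pool is never busy, so the Nothing branch only matters once several threads share the pool.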

UNIX Client Connection Pool

data UnixClient Source #

Type tag used to specialize connection pool for UNIX Socket clients.

Instance for Generic introduced in version 0.2.

Instances

Generic UnixClient Source # 

Associated Types

type Rep UnixClient :: * -> * #

HasConnectionPool HandlerParams Socket () (ConnectionPool * UnixClient) Source #

Since version 0.2.

ConnectionPoolFor * UnixClient Source #

Defined using:

withConnection = withUnixClientConnection
destroyAllConnections = destroyAllUnixClientConnections

Since version 0.2.

Associated Types

type HandlerData UnixClient (protocol :: UnixClient) :: * Source #

Show (ConnectionPool * UnixClient) Source # 
Generic (ConnectionPool * UnixClient) Source # 
type Rep UnixClient Source # 
type Rep UnixClient = D1 * (MetaData "UnixClient" "Data.ConnectionPool.Internal.Unix" "connection-pool-0.2.2-6QO2HFAP66s57rNsrMe34b" False) (V1 *)
data ConnectionPool * UnixClient Source #

Connection pool for UNIX Socket clients.

Definition changed in version 0.1.3 and 0.2. Instances for Generic and Show introduced in version 0.2.

type HandlerData * UnixClient Source # 
type Rep (ConnectionPool * UnixClient) Source # 
type Rep (ConnectionPool * UnixClient) = D1 * (MetaData "ConnectionPool" "Data.ConnectionPool.Internal.Unix" "connection-pool-0.2.2-6QO2HFAP66s57rNsrMe34b" True) (C1 * (MetaCons "UnixConnectionPool" PrefixI False) (S1 * (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * (ConnectionPool HandlerParams Socket ()))))

data ClientSettingsUnix :: * #

Settings for a Unix domain sockets client.

data AppDataUnix :: * #

The data passed to a Unix domain sockets Application.

createUnixClientPool :: ResourcePoolParams -> ClientSettingsUnix -> IO (ConnectionPool UnixClient) Source #

Create connection pool for UNIX Sockets clients.

withUnixClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool UnixClient -> (AppDataUnix -> m r) -> m r Source #

Temporarily take a UNIX Sockets connection from a pool, run the client with it, and return it to the pool afterwards. For details on how connections are allocated see withResource.

tryWithUnixClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool UnixClient -> (AppDataUnix -> m r) -> m (Maybe r) Source #

Similar to withUnixClientConnection, but only performs the action if a UNIX Sockets connection could be taken from the pool without blocking. Otherwise, it returns immediately with Nothing (i.e. the action function is not called). Conversely, if a connection can be acquired from the pool without blocking, the action is performed and its result is returned, wrapped in a Just.

Since version 0.2.

destroyAllUnixClientConnections :: ConnectionPool UnixClient -> IO () Source #

Destroy all UNIX Sockets connections that might be still open in a connection pool. This is useful when one needs to release all resources at once and not to wait for idle timeout to be reached.

For more details see destroyAllResources.

Since version 0.1.1.0.

Polymorphic Interface

Since version 0.2.

class ConnectionPoolFor (protocol :: k) where Source #

Type class for common connection pool operations. It intentionally doesn't handle connection pool creation, which is best left to dedicated smart constructors.

Since version 0.2.

Associated Types

type HandlerData protocol Source #

Data passed to individual connection handler.

Methods

withConnection :: MonadBaseControl IO m => ConnectionPool protocol -> (HandlerData protocol -> m r) -> m r Source #

Temporarily take a connection from a pool, run handler with it, and return it to the pool afterwards.

Since version 0.2.

tryWithConnection :: MonadBaseControl IO m => ConnectionPool protocol -> (HandlerData protocol -> m r) -> m (Maybe r) Source #

Similar to withConnection, but only performs the action if a connection could be taken from the pool without blocking. Otherwise, tryWithConnection returns immediately with Nothing (i.e. the action function is not called). Conversely, if a connection can be acquired from the pool without blocking, the action is performed and its result is returned, wrapped in a Just.

Since version 0.2.

destroyAllConnections :: ConnectionPool protocol -> IO () Source #

Destroy all connections that might be still open in a connection pool. This is useful when one needs to release all resources at once and not to wait for idle timeout to be reached.

Since version 0.2.
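As an illustration of the polymorphic interface, the sketch below defines one function that works with both pool types. The name sendLine is hypothetical. The sketch assumes that HandlerData resolves to AppData for TcpClient and to AppDataUnix for UnixClient, that both have HasReadWrite instances, and that servers like the socat commands from the examples above are running. It will not build on Windows, where createUnixClientPool is not available.

```haskell
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}
module Main (main)
  where

import Data.ByteString (ByteString)
import Data.ConnectionPool
    ( ConnectionPool
    , ConnectionPoolFor(..)
    , createTcpClientPool
    , createUnixClientPool
    )
import Data.Default.Class (Default(def))
import Data.Streaming.Network
    ( HasReadWrite
    , appWrite
    , clientSettingsTCP
    , clientSettingsUnix
    )


-- | Hypothetical helper: send one message over a connection taken from
-- any pool whose handler data supports reading and writing.
sendLine
    :: (ConnectionPoolFor protocol, HasReadWrite (HandlerData protocol))
    => ConnectionPool protocol
    -> ByteString
    -> IO ()
sendLine pool msg = withConnection pool $ \handlerData ->
    appWrite handlerData msg

main :: IO ()
main = do
    -- Addresses are assumptions matching the socat examples above.
    tcpPool <- createTcpClientPool def (clientSettingsTCP 8001 "127.0.0.1")
    unixPool <- createUnixClientPool def (clientSettingsUnix "test.sock")
    sendLine tcpPool "hello over TCP\n"
    sendLine unixPool "hello over a UNIX socket\n"
```

Pool creation stays protocol specific, as the class documentation notes; only the code that uses an existing pool is shared.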