{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE KindSignatures #-}
{-|
This module provides utility functions that allow safer, higher level usage
of STM based conduits.

Conduit pairs create an internal data structure that acts as a bridge and
expose it only as an input conduit and an output conduit. The structure
itself is hidden and cannot be used directly, which provides additional
safety.

In order to create a bridge from your own data structure you need to do the
following:

  * Make it an instance of 'UnboundedStream' or 'BoundedStream', depending on
    its properties:

> instance BoundedStream (Proxy2 TBMQueue) TBMQueue where
>   mkBStream _ i = atomically $ newTBMQueue i

  * Add an 'IsConduit' instance:

> instance MonadIO m => IsConduit m TBMQueue where
>   mkSource = sourceTBMQueue
>   mkSink   = flip sinkTBMQueue True

  * Use 'pair' or 'pairBounded' to create a bridge. Because the bridge data
    structure is hidden and does not appear in the result type, a proxy value
    is needed to tell the compiler which structure to use; 'Proxy2' serves
    that purpose:

> pairTBMQueue = pairBounded (proxy2 :: Proxy2 TBMQueue a)

  * Now a pair of conduits can be created. Values written to the sink come
    out of the source, so the sender gets the sink and the receiver gets the
    source:

> (src, snk) <- pairTBMQueue 32
> Control.Concurrent.Async.concurrently (sender snk) (receiver src)

As the channel is not visible, we can't close it or send additional messages
bypassing the conduit code.

This package provides predefined pairs for all STM types that are used in the
package.
-}
module Data.Conduit.Utils
  ( -- * Conduit pairs
    -- ** Low level functions
    pairBounded   -- (..) => i a -> Int -> m (Source m a, Sink a m ())
  , pair          -- (..) => i a -> m (Source m a, Sink a m ())
    -- ** Classes
  , UnboundedStream(..)
  , BoundedStream(..)
  , IsConduit(..)
    -- ** Types
  , Proxy2
  , proxy2        -- Proxy2 a b
    -- * Specialized functions
    -- $flist
  , pairTQueue    -- MonadIO m => m (Source m a, Sink a m ())
  , pairTMQueue   -- MonadIO m => m (Source m a, Sink a m ())
  , pairTMChan    -- MonadIO m => m (Source m a, Sink a m ())
  , pairTBQueue   -- MonadIO m => Int -> m (Source m a, Sink a m ())
  , pairTBMQueue  -- MonadIO m => Int -> m (Source m a, Sink a m ())
  , pairTBMChan   -- MonadIO m => Int -> m (Source m a, Sink a m ())
  ) where

import Data.Conduit
import Data.Conduit.TMChan
import Data.Conduit.TQueue

import Control.Concurrent.STM
import Control.Concurrent.STM.TMQueue
import Control.Concurrent.STM.TBMQueue
import Control.Monad.IO.Class

-- | Proxy type that can be used to create opaque values.
--
-- This proxy type is required because a pair hides its internal data
-- structure, so the proxy is what lets the compiler infer the internal type.
data Proxy2 (a :: * -> *) b = Proxy2

-- | Construct a 'Proxy2' value.
--
-- > (proxy2 :: Proxy2 TChan a)
proxy2 :: Proxy2 a b
proxy2 = Proxy2

-- | Class for structures that can handle an unbounded stream of values.
-- Such streams break the conduit assumption that constant memory will be
-- used: if the receiver is slower than the sender, values accumulate.
class UnboundedStream i o | i -> o where
  -- | Allocate a fresh carrier; the argument is only a type-level hint.
  mkUStream :: i a -> IO (o a)

-- | Class for structures that can handle a bounded stream of values, i.e.
-- there exists an 'Int' value that sets an upper limit on the number of
-- values that can be held by the structure. The exact meaning of this limit
-- may depend on the carrier type.
class BoundedStream i o | i -> o where
  -- | Allocate a fresh carrier with the given bound; the first argument is
  -- only a type-level hint.
  mkBStream :: i a -> Int -> IO (o a)

-- | Class that describes how to make conduits out of a carrier value.
class MonadIO m => IsConduit m (x :: * -> *) where
  -- | Sink that feeds values into the carrier.
  mkSink   :: x a -> Sink a m ()
  -- | Source that yields values taken from the carrier.
  mkSource :: x a -> Source m a

-- | Create a bounded conduit pair, see the 'BoundedStream' class description.
--
-- Both returned conduits share one freshly allocated carrier.
pairBounded :: (MonadIO m, IsConduit m o, BoundedStream i o)
            => i a                          -- ^ Type description.
            -> Int                          -- ^ Conduit size.
            -> m (Source m a, Sink a m ())
pairBounded p s = do
  q <- liftIO (mkBStream p s)
  return (mkSource q, mkSink q)

-- | Create an unbounded conduit pair, see the 'UnboundedStream' class
-- description.
--
-- Both returned conduits share one freshly allocated carrier.
pair :: (MonadIO m, IsConduit m o, UnboundedStream i o)
     => i a                          -- ^ Type description.
     -> m (Source m a, Sink a m ())
pair p = do
  q <- liftIO (mkUStream p)
  return (mkSource q, mkSink q)

-------------------------------------------------------------------------------
-- Instances
-------------------------------------------------------------------------------

instance BoundedStream (Proxy2 TBQueue) TBQueue where
  -- 'fromIntegral' keeps this compiling both with older stm releases, where
  -- 'newTBQueue' takes an 'Int', and with stm >= 2.5, where it takes a
  -- 'Natural'. With the 'Int' signature it is the identity conversion.
  mkBStream _ i = atomically $ newTBQueue (fromIntegral i)

instance BoundedStream (Proxy2 TBMQueue) TBMQueue where
  mkBStream _ i = atomically $ newTBMQueue i

instance BoundedStream (Proxy2 TBMChan) TBMChan where
  mkBStream _ i = atomically $ newTBMChan i

instance UnboundedStream (Proxy2 TMQueue) TMQueue where
  mkUStream _ = atomically newTMQueue

instance UnboundedStream (Proxy2 TQueue) TQueue where
  mkUStream _ = atomically newTQueue

instance UnboundedStream (Proxy2 TMChan) TMChan where
  mkUStream _ = atomically newTMChan

-- NOTE(review): for the closable carriers below the sink is built with its
-- 'Bool' flag set to 'True'; presumably this makes the sink close the carrier
-- when it finishes -- confirm against the sink's documentation.

instance MonadIO m => IsConduit m TBQueue where
  mkSource = sourceTBQueue
  mkSink   = sinkTBQueue

instance MonadIO m => IsConduit m TBMQueue where
  mkSource = sourceTBMQueue
  mkSink   = flip sinkTBMQueue True

instance MonadIO m => IsConduit m TMQueue where
  mkSource = sourceTMQueue
  mkSink   = flip sinkTMQueue True

instance MonadIO m => IsConduit m TQueue where
  mkSource = sourceTQueue
  mkSink   = sinkTQueue

instance MonadIO m => IsConduit m TBMChan where
  mkSource = sourceTBMChan
  mkSink   = flip sinkTBMChan True

instance MonadIO m => IsConduit m TMChan where
  mkSource = sourceTMChan
  mkSink   = flip sinkTMChan True

-------------------------------------------------------------------------------
-- Specialized functions
-------------------------------------------------------------------------------

-- $flist
-- List of specialized functions that create bridges over STM types, where
-- @B@ stands for bounded and @M@ stands for closable. If the data structure
-- is not closable then there is no way to notify the receiving side that the
-- bridge is closed, so it can only be used for infinite streams or when some
-- other notification mechanism is used.

pairTQueue, pairTMQueue, pairTMChan
  :: MonadIO m => m (Source m a, Sink a m ())
pairTQueue  = pair (proxy2 :: Proxy2 TQueue a)
pairTMQueue = pair (proxy2 :: Proxy2 TMQueue a)
pairTMChan  = pair (proxy2 :: Proxy2 TMChan a)

pairTBQueue, pairTBMQueue, pairTBMChan
  :: MonadIO m => Int -> m (Source m a, Sink a m ())
pairTBQueue  = pairBounded (proxy2 :: Proxy2 TBQueue a)
pairTBMQueue = pairBounded (proxy2 :: Proxy2 TBMQueue a)
pairTBMChan  = pairBounded (proxy2 :: Proxy2 TBMChan a)