{-# LANGUAGE BangPatterns #-} {-# LANGUAGE NoMonomorphismRestriction #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE KindSignatures #-} -- | * Introduction -- -- Contains a simple source and sink for linking together conduits in -- in different threads. Usage is so easy, it's best explained with an -- example: -- -- We first create a channel for communication... -- -- > do chan <- atomically $ newTBMChan 16 -- -- Then we fork a new thread loading a wackton of pictures into memory. The -- data (pictures, in this case) will be streamed down the channel to whatever -- is on the other side. -- -- > _ <- forkIO . runResourceT $ do -- > _ <- register $ atomically $ closeTBMChan chan -- > loadTextures lotsOfPictures $$ sinkTBMChan chan -- -- We register closing function explicitly, because starting with version -- @1.3.0@ @conduits@ library no longer maintain resources, so this is the -- only way to safely close channel in case of exceptions. -- -- Finally, we connect something to the other end of the channel. In this -- case, we connect a sink which uploads the textures one by one to the -- graphics card. -- -- > runResourceT $ sourceTBMChan chan $$ Conduit.mapM_ (liftIO . uploadToGraphicsCard) -- -- By running the two tasks in parallel, we no longer have to wait for one -- texture to upload to the graphics card before reading the next one from -- disk. This avoids the common switching of bottlenecks (such as between the -- disk and graphics memory) that most loading processes seem to love. -- -- Control.Concurrent.STM.TMChan and Control.Concurrent.STM.TBMChan are -- re-exported for convenience. -- -- * Caveats -- -- It is recommended to use TBMChan as much as possible, and generally avoid -- TMChan usage. TMChans are unbounded, and if used, the conduit pipeline -- will no longer use a bounded amount of space. They will essentially leak -- memory if the writer is faster than the reader. -- -- Therefore, use bounded channels as much as possible, preferably with a -- high bound so it will be hit infrequently. module Data.Conduit.TMChan ( -- * Bounded Channel Connectors module Control.Concurrent.STM.TBMChan , sourceTBMChan , sinkTBMChan -- * Unbounded Channel Connectors , module Control.Concurrent.STM.TMChan , sourceTMChan , sinkTMChan -- * Parallel Combinators , (>=<) , mergeSources , (<=>) , mergeConduits ) where import Control.Monad import Control.Monad.IO.Class ( liftIO, MonadIO ) import Control.Monad.Trans.Class import Control.Monad.Trans.Resource import Control.Concurrent (killThread, forkIOWithUnmask) import Control.Concurrent.STM hiding (atomically) import Control.Concurrent.STM.TBMChan import Control.Concurrent.STM.TMChan import Data.Conduit import Data.Foldable import qualified Data.Conduit.List as CL import UnliftIO as Lifted -- | Convert channel into the source. -- -- *N.B* Since version 4.0 this function does not close the -- channel if downstream is closed. chanSource :: MonadIO m => chan -- ^ The channel. -> (chan -> STM (Maybe a)) -- ^ The 'read' function. -> ConduitT z a m () chanSource ch reader = loop where loop = do a <- liftSTM $ reader ch case a of Just x -> yield x >> loop Nothing -> return () {-# INLINE chanSource #-} -- | Convert channel into the consumer. -- -- *N.B* chanSink :: MonadIO m => chan -- ^ The channel. -> (chan -> a -> STM ()) -- ^ The 'write' function. -> ConduitT a z m () chanSink ch writer = CL.mapM_ $ liftIO . atomically . writer ch {-# INLINE chanSink #-} -- | A simple wrapper around a TBMChan. As data is pushed into the channel, the -- source will read it and pass it down the conduit pipeline. When the -- channel is closed, the source will close also. -- -- If the channel fills up, the pipeline will stall until values are read. sourceTBMChan :: MonadIO m => TBMChan a -> ConduitT () a m () sourceTBMChan ch = chanSource ch readTBMChan {-# INLINE sourceTBMChan #-} -- | A simple wrapper around a TMChan. As data is pushed into the channel, the -- source will read it and pass it down the conduit pipeline. When the -- channel is closed, the source will close also. sourceTMChan :: MonadIO m => TMChan a -> ConduitT () a m () sourceTMChan ch = chanSource ch readTMChan {-# INLINE sourceTMChan #-} -- | A simple wrapper around a TBMChan. As data is pushed into the sink, it -- will magically begin to appear in the channel. If the channel is full, -- the sink will block until space frees up. sinkTBMChan :: MonadIO m => TBMChan a -> ConduitT a z m () sinkTBMChan ch = chanSink ch writeTBMChan {-# INLINE sinkTBMChan #-} -- | A simple wrapper around a TMChan. As data is pushed into this sink, it -- will magically begin to appear in the channel. sinkTMChan :: MonadIO m => TMChan a -> ConduitT a z m () sinkTMChan ch = chanSink ch writeTMChan {-# INLINE sinkTMChan #-} infixl 5 >=< -- | Modifies a TVar, returning its new value. modifyTVar'' :: TVar a -> (a -> a) -> STM a modifyTVar'' tv f = do !x <- f <$> readTVar tv writeTVar tv x return x liftSTM :: forall (m :: * -> *) a. MonadIO m => STM a -> m a liftSTM = liftIO . atomically -- | Combines two sources with an unbounded channel, creating a new source -- which pulls data from a mix of the two sources: whichever produces first. -- -- The order of the new source's data is undefined, but it will be some -- combination of the two given sources. (>=<) :: (MonadResource mi, MonadIO mo, MonadUnliftIO mi) => ConduitT () a mi () -> ConduitT () a mi () -> mo (ConduitT () a mi ()) sa >=< sb = mergeSources [ sa, sb ] 16 {-# INLINE (>=<) #-} decRefcount :: TVar Int -> TBMChan a -> STM () decRefcount tv chan = do n <- modifyTVar'' tv (subtract 1) when (n == 0) $ closeTBMChan chan -- | Merges a list of sources, putting them all into a bounded channel, and -- returns a source which can be pulled from to pull from all the given -- sources in a first-come-first-serve basis. -- -- The order of the new source's data is undefined, but it will be some -- combination of the given sources. The monad of the resultant source -- (@mo@) is independent of the monads of the input sources (@mi@). -- -- @since 3.0 -- All spawned threads will be removed when source is closed or upon an -- exit from 'ResourceT' region. This means that result can only be used -- within a 'runResourceT' scope. -- -- @before 3.0 -- Spawned threads are not guaranteed to be closed. This may happen if -- Source was closed before all it's input were closed. mergeSources :: (MonadResource mi, MonadIO mo, MonadUnliftIO mi) => [ConduitT () a mi ()] -- ^ The sources to merge. -> Int -- ^ The bound of the intermediate channel. -> mo (ConduitT () a mi ()) mergeSources sx bound = do return $ do (chkey, c) <- allocate (liftSTM $ newTBMChan bound) (liftSTM . closeTBMChan) refcount <- liftSTM . newTVar $ length sx st <- lift $ askUnliftIO regs <- forM sx $ \s -> register . killThread =<< (liftIO $ forkIOWithUnmask $ \unmask -> (unmask $ unliftIO st $ runConduit $ s .| chanSink c writeTBMChan) `Lifted.finally` (liftSTM $ decRefcount refcount c)) chanSource c readTBMChan release chkey traverse_ release regs -- | Combines two conduits with unbounded channels, creating a new conduit -- which pulls data from a mix of the two: whichever produces first. -- -- The order of the new conduit's output is undefined, but it will be some -- combination of the two given conduits. (<=>) :: (MonadThrow mi, MonadIO mo, MonadUnliftIO mi) => ConduitT i i (ResourceT mi) () -> ConduitT i i (ResourceT mi) () -> ResourceT mi (ConduitT i i mo ()) sa <=> sb = mergeConduits [ sa, sb ] 16 {-# INLINE (<=>) #-} -- | Provide an input across several conduits, putting them all into a bounded -- channel. Returns a conduit which can be pulled from to pull from all the -- given conduits in a first-come-first-serve basis. -- -- The order of the new conduits's outputs is undefined, but it will be some -- combination of the given conduits. The monad of the resultant conduit -- (@mo@) is independent of the monads of the input conduits (@mi@). -- -- @since 3.0 -- Closes all worker processes when resulting conduit is closed or when execution -- leaves ResourceT context. This means that conduit is only valid inside -- 'runResouceT' scope. -- -- @before 3.0 -- Spawned threads are not guaranteed to be closed, This may happen if threads -- Conduit was closed before all threads have finished execution. {-# DEPRECATED mergeConduits "This method will dissapear in the next version." #-} mergeConduits :: (MonadIO mo, MonadUnliftIO mi) => [ConduitT i o (ResourceT mi) ()] -- ^ The conduits to merge. -> Int -- ^ The bound for the channels. -> ResourceT mi (ConduitT i o mo ()) mergeConduits conduits bound = do let len = length conduits refcount <- liftSTM $ newTVar len (iregs, iChannels) <- fmap unzip $ replicateM len $ allocate (liftSTM $ newTBMChan bound) (liftSTM . closeTBMChan) (oreg, oChannel) <- allocate (liftSTM $ newTBMChan bound) (liftSTM . closeTBMChan) regs <- forM (zip iChannels conduits) $ \(iChannel, conduit) -> Lifted.mask_ $ register . killThread <=< resourceForkIO $ (runConduit $ sourceTBMChan iChannel .| conduit .| chanSink oChannel writeTBMChan) `finally` (liftIO $ atomically $ decRefcount refcount oChannel >> closeTBMChan iChannel) treg <- register $ do traverse_ release regs traverse_ release iregs release oreg return $ do chanSink iChannels writeTBMChans -- release internal channels effiniently closing input channels traverse_ release iregs chanSource oChannel readTBMChan -- release internals everything release treg where writeTBMChans channels a = forM_ channels $ \c -> writeTBMChan c a