{-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE PackageImports #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} -- | * Introduction -- -- Contain combinators for concurrently joining conduits, such that -- the producing side may continue to produce (up to the queue size) -- as the consumer is concurrently consuming. module Data.Conduit.Async ( module Data.Conduit.Async.Composition , gatherFrom , drainTo ) where import Control.Monad.IO.Class import Control.Monad.Loops import Control.Monad.Trans.Class import Data.Conduit import Data.Conduit.Async.Composition import Data.Foldable import UnliftIO -- | Gather output values asynchronously from an action in the base monad and -- then yield them downstream. This provides a means of working around the -- restriction that 'ConduitM' cannot be an instance of 'MonadBaseControl' -- in order to, for example, yield values from within a Haskell callback -- function called from a C library. gatherFrom :: (MonadIO m, MonadUnliftIO m) => Int -- ^ Size of the queue to create -> (TBQueue o -> m ()) -- ^ Action that generates output values -> ConduitT () o m () gatherFrom size scatter = do chan <- liftIO $ newTBQueueIO (fromIntegral size) worker <- lift $ async (scatter chan) gather worker chan where gather worker chan = do (xs, mres) <- liftIO $ atomically $ do xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan) (xs,) <$> pollSTM worker traverse_ yield xs case mres of Just (Left e) -> liftIO $ throwIO (e :: SomeException) Just (Right r) -> return r Nothing -> gather worker chan -- | Drain input values into an asynchronous action in the base monad via a -- bounded 'TBQueue'. This is effectively the dual of 'gatherFrom'. drainTo :: (MonadIO m, MonadUnliftIO m) => Int -- ^ Size of the queue to create -> (TBQueue (Maybe i) -> m r) -- ^ Action to consume input values -> ConduitT i Void m r drainTo size gather = do chan <- liftIO $ newTBQueueIO (fromIntegral size) worker <- lift $ async (gather chan) scatter worker chan where scatter worker chan = do mval <- await (mx, action) <- liftIO $ atomically $ do mres <- pollSTM worker case mres of Just (Left e) -> return (Nothing, liftIO $ throwIO (e :: SomeException)) Just (Right r) -> return (Just r, return ()) Nothing -> do writeTBQueue chan mval return (Nothing, return ()) action case mx of Just x -> return x Nothing -> scatter worker chan