-- | Bridges between conduits and actions run asynchronously in the base
--   monad, communicating through a bounded 'TBQueue'.
--
--   'gatherFrom' turns an action that writes to a queue into a producer, and
--   'drainTo' feeds the input of a consumer to an action that reads from a
--   queue.  Everything exported by "Data.Conduit.Async.Composition" is
--   re-exported from this module as well.
module Data.Conduit.Async ( module Data.Conduit.Async.Composition
, gatherFrom
, drainTo
) where
import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Concurrent.STM
import Control.Exception.Lifted
import Control.Monad.IO.Class
import Control.Monad.Loops
import Control.Monad.Trans.Class
import Control.Monad.Trans.Control
import Data.Conduit
import Data.Conduit.Async.Composition
-- | Run an action asynchronously in the base monad and yield downstream every
--   value it writes to a bounded 'TBQueue'.
--
--   The action is started with 'async'.  The producer then repeatedly drains
--   the queue, yields what it found, and finishes once the action has
--   completed and the queue has been emptied.  If the action terminates with
--   an exception, that exception is rethrown in the producer after the values
--   queued so far have been yielded.
--
--   While the queue is empty and the action is still running, the producer
--   blocks inside STM rather than polling in a tight loop.
gatherFrom :: (MonadIO m, MonadBaseControl IO m)
           => Int                  -- ^ Capacity of the bounded queue
           -> (TBQueue o -> m ())  -- ^ Action that writes output values to the queue
           -> Producer m o
gatherFrom size scatter = do
    -- 'fromIntegral' is the identity when 'newTBQueueIO' takes an 'Int' and
    -- converts the capacity on stm versions where it takes a 'Natural'.
    chan   <- liftIO $ newTBQueueIO (fromIntegral size)
    worker <- lift $ async (scatter chan)
    lift . restoreM =<< gather worker chan
  where
    gather worker chan = do
        (queued, status) <- liftIO $ atomically $ do
            -- Take everything currently queued in one transaction.
            items  <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
            polled <- pollSTM worker
            case (items, polled) of
                -- Nothing to yield and the worker is still running: block
                -- until the queue or the worker's result changes instead of
                -- spinning.  No reads are lost, since the queue was empty.
                ([], Nothing) -> retry
                _             -> return (items, polled)
        Prelude.mapM_ yield queued
        case status of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan
-- | Feed the input of a consumer to an action run asynchronously in the base
--   monad, via a bounded 'TBQueue'.
--
--   The action is started with 'async'.  Each result of 'await' is written to
--   the queue as-is, so the action sees @Just x@ for every input value and
--   'Nothing' once upstream is exhausted.  The loop keeps running until the
--   action completes, so the action may observe more than one 'Nothing'.
--   Writing blocks while the queue is full.
--
--   The consumer returns the action's result.  If the action terminates with
--   an exception, that exception is rethrown in the consumer.  When the action
--   has already finished, an input value that was awaited but could not be
--   delivered is handed back with 'leftover' instead of being dropped.
drainTo :: (MonadIO m, MonadBaseControl IO m)
        => Int                         -- ^ Capacity of the bounded queue
        -> (TBQueue (Maybe i) -> m r)  -- ^ Action that reads input values from the queue
        -> Consumer i m r
drainTo size gather = do
    -- 'fromIntegral' is the identity when 'newTBQueueIO' takes an 'Int' and
    -- converts the capacity on stm versions where it takes a 'Natural'.
    chan   <- liftIO $ newTBQueueIO (fromIntegral size)
    worker <- lift $ async (gather chan)
    lift . restoreM =<< scatter worker chan
  where
    scatter worker chan = do
        mval   <- await
        status <- liftIO $ atomically $ do
            polled <- pollSTM worker
            case polled of
                -- Worker still running: hand over the awaited value.  This
                -- blocks while the queue is full and is re-evaluated if the
                -- worker finishes in the meantime.
                Nothing -> writeTBQueue chan mval
                Just _  -> return ()
            return polled
        case status of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> do
                -- The worker never received this value; give it back to the
                -- stream rather than silently discarding it.
                maybe (return ()) leftover mval
                return r
            Nothing        -> scatter worker chan