module Data.Conduit.Async ( buffer
, ($$&)
, gatherFrom
, drainTo
) where
import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Concurrent.STM
import Control.Exception.Lifted
import Control.Monad.IO.Class
import Control.Monad.Loops
import Control.Monad.Trans.Class
import Control.Monad.Trans.Control
import Data.Conduit
import Data.Conduit.List as CL
buffer :: (MonadBaseControl IO m, MonadIO m)
=> Int -> Producer m a -> Consumer a m b -> m b
buffer size input output = do
chan <- liftIO $ newTBQueueIO size
control $ \runInIO ->
withAsync (runInIO $ sender chan) $ \input' ->
withAsync (runInIO $ recv chan $$ output) $ \output' -> do
link2 input' output'
wait output'
where
send chan = liftIO . atomically . writeTBQueue chan
sender chan = do
input $$ CL.mapM_ (send chan . Just)
send chan Nothing
recv chan = do
mx <- liftIO $ atomically $ readTBQueue chan
case mx of
Nothing -> return ()
Just x -> yield x >> recv chan
($$&) :: (MonadIO m, MonadBaseControl IO m)
=> Producer m a -> Consumer a m b -> m b
($$&) = buffer 64
gatherFrom :: (MonadIO m, MonadBaseControl IO m)
=> Int
-> (TBQueue o -> m ())
-> Producer m o
gatherFrom size scatter = do
chan <- liftIO $ newTBQueueIO size
worker <- lift $ async (scatter chan)
lift . restoreM =<< gather worker chan
where
gather worker chan = do
(xs, mres) <- liftIO $ atomically $ do
xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
(xs,) <$> pollSTM worker
Prelude.mapM_ yield xs
case mres of
Just (Left e) -> liftIO $ throwIO (e :: SomeException)
Just (Right r) -> return r
Nothing -> gather worker chan
drainTo :: (MonadIO m, MonadBaseControl IO m)
=> Int
-> (TBQueue (Maybe i) -> m r)
-> Consumer i m r
drainTo size gather = do
chan <- liftIO $ newTBQueueIO size
worker <- lift $ async (gather chan)
lift . restoreM =<< scatter worker chan
where
scatter worker chan = do
mval <- await
(mx, action) <- liftIO $ atomically $ do
mres <- pollSTM worker
case mres of
Just (Left e) ->
return (Nothing, liftIO $ throwIO (e :: SomeException))
Just (Right r) ->
return (Just r, return ())
Nothing -> do
writeTBQueue chan mval
return (Nothing, return ())
action
case mx of
Just x -> return x
Nothing -> scatter worker chan