module Potoki.Conduit.MonadIO
(
consumeConduit,
)
where
import Potoki.Conduit.Prelude
import Conduit
import qualified Potoki.Core.Produce as Produce
import qualified Potoki.Core.Transform as Transform
import qualified Potoki.Core.Consume as Consume
import qualified Potoki.Core.Fetch as Fetch
import qualified Potoki.Core.IO as IO
consumeConduit :: MonadIO m => ConduitT () o m () -> Consume.Consume o r -> m r
consumeConduit conduit consume = do
elementChan <- liftIO $ newTBMChanIO 100
resultVar <- liftIO $ newEmptyMVar
liftIO $ fork $ do
result <- IO.produceAndConsume (Produce.tbmChan elementChan) consume
putMVar resultVar result
feedConduitToTBMChan conduit elementChan
liftIO $ takeMVar resultVar
feedConduitToMVar :: MonadIO m => ConduitT () o m () -> MVar (Maybe o) -> m ()
feedConduitToMVar conduit mvar = do
runConduit $ conduit .| mapM_C (liftIO . putMVar mvar . Just)
liftIO (putMVar mvar Nothing)
feedConduitToTBMChan :: MonadIO m => ConduitT () o m () -> TBMChan o -> m ()
feedConduitToTBMChan conduit chan = do
runConduit $ conduit .| mapM_C (liftIO . atomically . writeTBMChan chan)
liftIO $ atomically $ closeTBMChan chan