module Database.PostgreSQL.Stream.Parallel ( parallelStream, ) where import Control.Monad.Trans import Control.Concurrent import Control.Concurrent.STM import Data.Conduit import Data.Conduit.TMChan import Data.Conduit.TQueue import Control.Monad.Trans.Resource (runResourceT, ResourceT) import Database.PostgreSQL.Stream.Connection import qualified Database.PostgreSQL.LibPQ as PQ parallelStream :: PQ.Connection -> (PQ.Connection -> Source (ResourceT IO) a) -- Source -> Sink a (ResourceT IO) () -- Sink -> IO () parallelStream conn producer consumer = do chan <- atomically $ newTBMChan 32 withPgConnection conn $ \c -> do tid <- forkIO . runResourceT $ producer c $$ sinkTBMChan chan True res <- runResourceT $ sourceTBMChan chan $$ consumer print res