{-# LANGUAGE CPP #-}

module Ribosome.Data.Conduit.Composition where

import Conduit
import qualified Control.Concurrent.Async.Lifted as A
import Control.Concurrent.Async.Lifted hiding (link2)
import Control.Concurrent.STM hiding (atomically, newTVarIO)
import Control.Exception.Lifted (finally)
import Control.Monad hiding (forM_)
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Loops
import Control.Monad.Trans.Resource
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Cereal as C
import qualified Data.Conduit.List as CL
import Data.Serialize
import Prelude hiding (get, put)
import System.Directory (removeFile)
import System.IO

-- | Concurrently join the producer and consumer, using a bounded queue of the
-- given size. The producer will block when the queue is full, if it is
-- producing faster than the consumer is taking from it. Likewise, if the
-- consumer races ahead, it will block until more input is available.
--
-- Exceptions are properly managed and propagated between the two sides, so
-- the net effect should be equivalent to not using buffer at all, save for
-- the concurrent interleaving of effects.
--
-- The underlying monad must always be an instance of
-- 'MonadBaseControl IO'.  If at least one of the two conduits is a
-- 'CFConduit', it must additionally be an instance of
-- 'MonadResource'.
--
-- This function is similar to '$$'; for one more like '=$=', see
-- 'buffer''.
--
-- >>> buffer 1 (CL.sourceList [1,2,3]) CL.consume
-- [1,2,3]
buffer :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m)
          => Natural -- ^ Size of the bounded queue in memory.
          -> c1 () x m ()
          -> c2 x Void m r
          -> m r
-- Compose the two sides with 'buffer'' and immediately run the result.
buffer i c1 c2 = runCConduit (buffer' i c1 c2)

-- | An operator form of 'buffer'.  In general you should be able to replace
-- any use of '$$' with '$$&' and suddenly reap the benefit of
-- concurrency, if your conduits were spending time waiting on each other.
--
-- The underlying monad must always be an instance of
-- 'MonadBaseControl IO'.  If at least one of the two conduits is a
-- 'CFConduit', it must additionally be an instance of
-- 'MonadResource'.
--
-- >>> CL.sourceList [1,2,3] $$& CL.consume
-- [1,2,3]
--
-- It can be combined with '$=&' and '$='.  This creates two threads;
-- the first thread produces the list and the second thread does the
-- map and the consume:
--
-- >>> CL.sourceList [1,2,3] $$& mapC (*2) $= CL.consume
-- [2,4,6]
--
-- This creates three threads.  The three conduits all run in their
-- own threads:
--
-- >>> CL.sourceList [1,2,3] $$& mapC (*2) $=& CL.consume
-- [2,4,6]
--
-- >>> CL.sourceList [1,2,3] $$& (mapC (*2) $= mapC (+1)) $=& CL.consume
-- [3,5,7]
($$&) :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m) => c1 () x m () -> c2 x Void m r -> m r
-- Compose with '=$=&' (which fixes the queue bound) and run the result.
a $$& b = runCConduit (a =$=& b)
infixr 0 $$&

-- | An operator form of 'buffer''.  In general you should be able to replace
-- any use of '=$=' with '=$=&' and '$$' either with '$$&' or '=$='
-- and 'runCConduit' and suddenly reap the benefit of concurrency, if
-- your conduits were spending time waiting on each other.
--
-- The queue between the two sides is bounded at 64 elements; use
-- 'buffer'' directly to pick a different bound.
--
-- >>> runCConduit $ CL.sourceList [1,2,3] =$=& CL.consume
-- [1,2,3]
(=$=&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
a =$=& b = buffer' 64 a b
infixr 2 =$=&

-- | An alias for '=$=&' by analogy with '=$=' and '$='.
--
-- Unlike '=$=&' (which is @infixr 2@) this operator is declared
-- @infixl 1@.
($=&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
($=&) = (=$=&)
infixl 1 $=&

-- | An alias for '=$=&' by analogy with '=$=' and '=$'.
--
-- It has the same fixity as '=$=&' (@infixr 2@).
(=$&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
(=$&) = (=$=&)
infixr 2 =$&

-- Earlier, fully kind-annotated form of the class head, kept for reference:
-- class CCatable (c1 :: * -> * -> (* -> *) -> * -> *) (c2 :: * -> * -> (* -> *) -> * -> *) (c3 :: * -> * -> (* -> *) -> * -> *) | c1 c2 -> c3 where

-- | Conduits are concatenable; this class describes how.
--
-- The functional dependency @c1 c2 -> c3@ means the types of the two
-- conduits being joined determine the type of the combined conduit.
class CCatable c1 c2 (c3 :: * -> * -> (* -> *) -> * -> *) | c1 c2 -> c3 where
  -- | Concurrently join the producer and consumer, using a bounded queue of the
  -- given size. The producer will block when the queue is full, if it is
  -- producing faster than the consumer is taking from it. Likewise, if the
  -- consumer races ahead, it will block until more input is available.
  --
  -- Exceptions are properly managed and propagated between the two sides, so
  -- the net effect should be equivalent to not using buffer at all, save for
  -- the concurrent interleaving of effects.
  --
  -- This function is similar to '=$='; for one more like '$$', see
  -- 'buffer'.
  --
  -- >>> runCConduit $ buffer' 1 (CL.sourceList [1,2,3]) CL.consume
  -- [1,2,3]
  buffer' :: Natural -- ^ Size of the bounded queue in memory
             -> c1 i x m ()
             -> c2 x o m r
             -> c3 i o m r

-- | Like 'buffer', except that when the bounded queue is overflowed, the
-- excess is cached in a local file so that consumption from upstream may
-- continue. When the queue becomes exhausted by yielding, it is filled
-- from the cache until all elements have been yielded.
--
-- Note that the maximum amount of memory consumed is equal to (2 *
-- memorySize + 1), so take this into account when picking a chunking size.
--
-- This function is similar to '$$'; for one more like '=$=', see
-- 'bufferToFile''.
--
-- >>> runResourceT $ bufferToFile 1 Nothing "/tmp" (CL.sourceList [1,2,3]) CL.consume
-- [1,2,3]
bufferToFile :: (CFConduitLike c1, CFConduitLike c2, Serialize x, MonadBaseControl IO m, MonadResource m, MonadThrow m)
                => Natural -- ^ Size of the bounded queue in memory
                -> Maybe Int -- ^ Max elements to keep on disk at one time
                -> FilePath -- ^ Directory to write temp files to
                -> c1 () x m ()
                -> c2 x Void m r
                -> m r
-- Compose the two sides with 'bufferToFile'' and immediately run the result.
bufferToFile bufsz dsksz tmpDir c1 c2 =
  runCConduit (bufferToFile' bufsz dsksz tmpDir c1 c2)

-- | Like 'buffer'', except that when the bounded queue is overflowed, the
-- excess is cached in a local file so that consumption from upstream may
-- continue. When the queue becomes exhausted by yielding, it is filled
-- from the cache until all elements have been yielded.
--
-- Note that the maximum amount of memory consumed is equal to (2 *
-- memorySize + 1), so take this into account when picking a chunking size.
--
-- This function is similar to '=$='; for one more like '$$', see
-- 'bufferToFile'.
--
-- >>> runResourceT $ runCConduit $ bufferToFile' 1 Nothing "/tmp" (CL.sourceList [1,2,3]) CL.consume
-- [1,2,3]
--
-- It is frequently convenient to define a local function to use this in operator form:
--
-- >>> :{
-- runResourceT $ do
--   let buf c = bufferToFile' 10 Nothing "/tmp" c -- eta-conversion to avoid monomorphism restriction
--   runCConduit $ CL.sourceList [0x30, 0x31, 0x32] `buf` mapC (toEnum :: Int -> Char) `buf` CL.consume
-- :}
-- "012"
bufferToFile' :: (CFConduitLike c1, CFConduitLike c2, Serialize x)
                 => Natural -- ^ Size of the bounded queue in memory
                 -> Maybe Int -- ^ Max elements to keep on disk at one time
                 -> FilePath -- ^ Directory to write temp files to
                 -> c1 i x m ()
                 -> c2 x o m r
                 -> CFConduit i o m r
bufferToFile' bufsz dsksz tmpDir c1 c2 = combine (asCFConduit c1) (asCFConduit c2)
  where
    -- Walk to the last stage of the left-hand chain and attach the
    -- right-hand chain there with a file-backed link.  Links that already
    -- exist inside the left-hand chain keep their own settings.
    combine (FSingle a) b =
      FMultipleF bufsz dsksz tmpDir a b
    combine (FMultiple i a rest) b =
      FMultiple i a (bufferToFile' bufsz dsksz tmpDir rest b)
    combine (FMultipleF bufsz' dsksz' tmpDir' a rest) b =
      FMultipleF bufsz' dsksz' tmpDir' a (bufferToFile' bufsz dsksz tmpDir rest b)

-- | Conduits are, once there's a producer on one end and a consumer
-- on the other, runnable.
class CRunnable c where
  -- | The constraints the underlying monad @m@ must satisfy in order to
  -- run a conduit of type @c@.
  type RunConstraints c (m :: * -> *) :: Constraint
  -- | Execute a conduit concurrently.  This is the concurrent
  -- equivalent of 'runConduit'.
  --
  -- The underlying monad must always be an instance of
  -- 'MonadBaseControl IO'.  If the conduit is a 'CFConduit', it must
  -- additionally be an instance of 'MonadResource'.
  runCConduit :: (RunConstraints c m) => c () Void m r -> m r

-- Two plain conduits: wrap both as single-stage 'CConduit's and defer to
-- the 'CConduit'/'CConduit' instance.
instance CCatable ConduitT ConduitT CConduit where
  buffer' i a b = buffer' i (Single a) (Single b)

-- Plain conduit on the left: wrap it as a single-stage 'CConduit' and
-- defer to the 'CConduit'/'CConduit' instance.
instance CCatable ConduitT CConduit CConduit where
  buffer' i a b = buffer' i (Single a) b

-- Plain conduit on the left of a 'CFConduit': lift it with 'asCFConduit'
-- and defer to the 'CFConduit'/'CFConduit' instance.
instance CCatable ConduitT CFConduit CFConduit where
  buffer' i a b = buffer' i (asCFConduit a) b

-- Plain conduit on the right: wrap it as a single-stage 'CConduit' and
-- defer to the 'CConduit'/'CConduit' instance.
instance CCatable CConduit ConduitT CConduit where
  buffer' i a b = buffer' i a (Single b)

-- Joining two 'CConduit' chains: walk to the last stage of the left-hand
-- chain and attach the right-hand chain there with a queue of bound @i@.
-- Links already inside the left-hand chain keep their own bound.
instance CCatable CConduit CConduit CConduit where
  buffer' i (Single a) b = Multiple i a b
  buffer' i (Multiple i' a rest) b = Multiple i' a (buffer' i rest b)

-- 'CConduit' on the left of a 'CFConduit': convert it with 'asCFConduit'
-- and defer to the 'CFConduit'/'CFConduit' instance.
instance CCatable CConduit CFConduit CFConduit where
  buffer' i a b = buffer' i (asCFConduit a) b

-- Plain conduit on the right of a 'CFConduit': lift it with 'asCFConduit'
-- and defer to the 'CFConduit'/'CFConduit' instance.
instance CCatable CFConduit ConduitT CFConduit where
  buffer' i a b = buffer' i a (asCFConduit b)

-- 'CConduit' on the right of a 'CFConduit': convert it with 'asCFConduit'
-- and defer to the 'CFConduit'/'CFConduit' instance.
instance CCatable CFConduit CConduit CFConduit where
  buffer' i a b = buffer' i a (asCFConduit b)

-- Joining two 'CFConduit' chains with an in-memory queue: walk to the last
-- stage of the left-hand chain and attach the right-hand chain there with
-- an 'FMultiple' link of bound @i@.  Links already inside the left-hand
-- chain (memory-only or file-backed) keep their own settings.
instance CCatable CFConduit CFConduit CFConduit where
  buffer' i (FSingle a) b = FMultiple i a b
  buffer' i (FMultiple i' a rest) b = FMultiple i' a (buffer' i rest b)
  buffer' i (FMultipleF bufsz dsksz tmpDir a rest) b =
    FMultipleF bufsz dsksz tmpDir a (buffer' i rest b)

-- A plain conduit has no concurrent stages, so running it is just
-- 'runConduit'.
instance CRunnable ConduitT where
  type RunConstraints ConduitT m = (Monad m)
  runCConduit = runConduit

-- Running a 'CConduit': a single stage is run directly; for a chain, the
-- first stage is run in its own 'Async' feeding a bounded queue, and
-- 'stage' runs the remaining stages against that queue.
instance CRunnable CConduit where
  type RunConstraints CConduit m = (MonadBaseControl IO m, MonadIO m)
  runCConduit (Single c) = runConduit c
  runCConduit (Multiple bufsz c cs) = do
    -- Queue between the first stage and the rest of the chain.
    chan <- liftIO $ newTBQueueIO bufsz
    withAsync (sender chan c) $ \c' ->
      stage chan c' cs

-- Running a 'CFConduit': a single stage is run directly; for a chain, the
-- first stage is run in its own 'Async' and 'fstage' runs the remaining
-- stages, reading either from an in-memory queue ('FMultiple') or from a
-- file-backed 'BufferContext' ('FMultipleF').
instance CRunnable CFConduit where
  type RunConstraints CFConduit m = (MonadBaseControl IO m, MonadIO m, MonadResource m, MonadThrow m)
  runCConduit (FSingle c) = runConduit c
  runCConduit (FMultiple bufsz c cs) = do
    -- Memory-only link: same queue protocol as 'CConduit'.
    chan <- liftIO $ newTBQueueIO bufsz
    withAsync (sender chan c) $ \c' ->
      fstage (receiver chan) c' cs
  runCConduit (FMultipleF bufsz filemax tempDir c cs) = do
    -- File-backed link: set up the shared state used by 'fsender' and
    -- 'freceiver'.
    context <- liftIO $ BufferContext <$> newTBQueueIO bufsz
                                      <*> newTQueueIO
                                      <*> newTVarIO filemax
                                      <*> newTVarIO False
                                      <*> pure tempDir
    withAsync (fsender context c) $ \c' ->
      fstage (freceiver context) c' cs

-- | A "concurrent conduit", in which the stages run in parallel with
-- a buffering queue between them.
data CConduit i o m r where
  -- | A single, ordinary conduit stage.
  Single :: ConduitT i o m r -> CConduit i o m r
  -- | A conduit stage followed by the remaining stages.  The 'Natural'
  -- is the bound of the queue created between this stage and the rest
  -- when the conduit is run.
  Multiple :: Natural -> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r

-- C.C.A.L's link2 has the wrong type:  https://github.com/maoe/lifted-async/issues/16
--
-- This wrapper lifts 'A.link2' into any monad with 'MonadBase' 'IO' so the
-- two 'Async' handles may carry unrelated result types.
link2 :: MonadBase IO m => Async a -> Async b -> m ()
link2 a b = liftBase (A.link2 a b)

-- Combines a producer with a queue, sending it everything the
-- producer produces.  Each value is wrapped in 'Just'; once the producer
-- has finished, a single 'Nothing' is sent so that "receiver" knows the
-- stream has ended.
sender :: (MonadIO m) => TBQueue (Maybe o) -> ConduitT () o m () -> m ()
sender chan input = do
  runConduit $ input .| mapM_C (send chan . Just)
  send chan Nothing

-- One "layer" of withAsync in a CConduit run.
--
-- "chan" is the queue fed by the previous layer and "prevAsync" is that
-- layer's Async; each new layer's Async is tied to it with "link2".
stage :: (MonadBaseControl IO m, MonadIO m) => TBQueue (Maybe i) -> Async x -> CConduit i Void m r -> m r
stage chan prevAsync (Single c) =
  -- The last layer; feed the output of "chan" into the conduit and
  -- wait for the result.
  withAsync (runConduit $ receiver chan .| c) $ \c' -> do
    link2 prevAsync c'
    wait c'
stage chan prevAsync (Multiple bufsz c cs) = do
  -- not the last layer, so take the input from "chan", have this
  -- layer's conduit process it, and send the conduit's output to the
  -- next layer.
  chan' <- liftIO $ newTBQueueIO bufsz
  withAsync (sender chan' $ receiver chan .| c) $ \c' -> do
    link2 prevAsync c'
    stage chan' c' cs

-- A Producer which produces the values of the given channel until
-- Nothing is received.  This is the other half of "sender".
receiver :: (MonadIO m) => TBQueue (Maybe o) -> ConduitT () o m ()
receiver chan = do
  mx <- recv chan
  case mx of
    -- End-of-stream marker: stop producing.
    Nothing -> return ()
    -- A value: pass it downstream and keep reading.
    Just x -> yield x >> receiver chan

-- | A "concurrent conduit", in which the stages run in parallel with
-- a buffering queue and possibly a disk file between them.
data CFConduit i o m r where
  -- | A single, ordinary conduit stage.
  FSingle :: ConduitT i o m r -> CFConduit i o m r
  -- | A conduit stage followed by the remaining stages, joined by an
  -- in-memory queue whose bound is the given 'Natural'.
  FMultiple :: Natural -> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
  -- | A conduit stage followed by the remaining stages, joined by a
  -- file-backed buffer.  The arguments are the in-memory queue bound, the
  -- optional maximum number of elements to keep on disk, and the directory
  -- for temporary files (see 'bufferToFile'').  The intermediate element
  -- type must be 'Serialize'.
  FMultipleF :: (Serialize x) => Natural -> Maybe Int -> FilePath -> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r

-- | Conduit-like types that can be converted into a 'CFConduit'.
class CFConduitLike a where
  -- | Convert the value into the equivalent 'CFConduit' chain.
  asCFConduit :: a i o m r -> CFConduit i o m r

-- A plain conduit becomes a chain consisting of a single stage.
instance CFConduitLike ConduitT where
  asCFConduit = FSingle

-- A memory-only concurrent conduit is translated constructor by
-- constructor; the queue sizes of its 'Multiple' links are carried over.
instance CFConduitLike CConduit where
  asCFConduit (Single c) = FSingle c
  asCFConduit (Multiple i c cs) = FMultiple i c (asCFConduit cs)

-- Already a 'CFConduit'; nothing to convert.
instance CFConduitLike CFConduit where
  asCFConduit = id

-- Shared state of one file-backed link between two stages; it is written
-- by "fsender" / "persistChan" and read by "freceiver".
data BufferContext m a = BufferContext
  { -- In-memory bounded queue between the two stages.
    chan :: TBQueue a
    -- Sources that replay batches which were spilled to disk.
  , restore :: TQueue (ConduitT () a m ())
    -- Remaining budget of items that may be spilled to disk; 'Nothing'
    -- disables the limit check.
  , slotsFree :: TVar (Maybe Int)
    -- Set to 'True' by the sending side once its input is exhausted.
  , done :: TVar Bool
    -- Directory in which the temporary spill files are created.
  , tempDir :: FilePath
  }

-- The file-backed equivalent of "sender".  This sends the values
-- generated by "input" to the "chan" in the BufferContext until it
-- gets full, then flushes it to disk via "persistChan".
--
-- Once "input" is exhausted, "done" is set to True so that the receiving
-- side knows that no further values will arrive.
fsender :: (MonadIO m, MonadResource m, Serialize x, MonadThrow m) => BufferContext m x -> ConduitT () x m () -> m ()
fsender bc@BufferContext{..} input = do
  runConduit $ input .| mapM_C enqueue
  liftIO $ atomically $ writeTVar done True
  where
    -- The transaction yields a follow-up action in @m@ that is run right
    -- after it commits: a no-op if the value fitted into the queue, or the
    -- disk write produced by "persistChan" if the queue had to be emptied
    -- first to make room.
    enqueue x = join $ liftIO $ atomically $
      (writeTBQueue chan x >> return (return ())) `orElse` do
        spill <- persistChan bc
        writeTBQueue chan x
        return spill

-- Connect a stage to another stage via either an in-memory queue or a
-- disk buffer.  This is the file-backed equivalent of "stage".
--
-- "prevStage" is the source feeding this stage and "prevAsync" is the
-- thread of the preceding stage.  Every stage runs in its own thread
-- ('withAsync'), which is tied to its predecessor with "link2" (presumably
-- so that a failure on either side reaches the other; "link2" is defined
-- elsewhere in this module).  The result is that of the final stage.
fstage :: (MonadBaseControl IO m, MonadResource m, MonadThrow m) => ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
fstage prevStage prevAsync (FSingle c) =
  -- The final conduit in the chain; just accept everything from
  -- the previous stage and wait for the result.
  withAsync (runConduit $ prevStage .| c) $ \worker -> do
    link2 prevAsync worker
    wait worker
fstage prevStage prevAsync (FMultiple bufsz c cs) = do
  -- This stage is connected to the next via a non-file-backed
  -- channel, so it just uses "sender" and "receiver" in the same way
  -- "stage" does.
  queue <- liftIO $ newTBQueueIO bufsz
  withAsync (sender queue $ prevStage .| c) $ \worker -> do
    link2 prevAsync worker
    fstage (receiver queue) worker cs
fstage prevStage prevAsync (FMultipleF bufsz dsksz tempDir c cs) = do
  -- This potentially needs to write its output to a file, so it uses
  -- "fsender" to send and tells the next stage to use "freceiver" to read.
  bc <- liftIO $ BufferContext <$> newTBQueueIO bufsz
                               <*> newTQueueIO
                               <*> newTVarIO dsksz
                               <*> newTVarIO False
                               <*> pure tempDir
  withAsync (fsender bc $ prevStage .| c) $ \worker -> do
    link2 prevAsync worker
    fstage (freceiver bc) worker cs

-- Receives from disk files or the in-memory queue if no spill-to-disk
-- has occurred.
--
-- Each round first looks for a spilled batch in "restore"; only if there is
-- none does it drain the in-memory queue.  Spilled batches are older than
-- anything still in the queue, so this keeps the values in order.  The loop
-- ends after the round in which "done" was observed to be set.
freceiver :: (MonadIO m) => BufferContext m o -> ConduitT () o m ()
freceiver BufferContext{..} = loop
  where
    loop = do
      (src, exit) <- liftIO $ atomically $
        ((\spilled -> (spilled, False)) <$> readTQueue restore) `orElse` do
          xs <- exhaust chan
          isDone <- readTVar done
          -- Block until there is something to report.  Without this, an
          -- empty queue combined with an unfinished producer made the loop
          -- spin at full speed; retrying lets the STM runtime wake us when
          -- "restore", "chan" or "done" changes.
          check (isDone || not (null xs))
          return (CL.sourceList xs, isDone)
      src
      unless exit loop

-- The channel is full, so (return an action which will) spill it to disk, unless too
-- many items are there already.
--
-- Inside the transaction the queue is drained, the disk budget in
-- "slotsFree" is checked (retrying while the batch does not fit) and
-- charged, and a source that will replay the batch is appended to
-- "restore".  The returned action performs the actual file write and must
-- be run after the transaction has committed; it hands the file path and
-- its release key to the replay source through a TMVar, so the source
-- blocks until the file is complete.
--
-- If the queue turns out to be empty nothing is enqueued and the returned
-- action is a no-op; enqueueing a replay source in that case would leave
-- the receiver waiting on a TMVar that is never filled.
persistChan :: (MonadIO m, MonadResource m, Serialize o, MonadThrow m) => BufferContext m o -> STM (m ())
persistChan BufferContext{..} = do
  xs <- exhaust chan
  case xs of
    [] -> return (return ())
    _ -> do
      let len = length xs
      mslots <- readTVar slotsFree
      -- With a disk limit in place, wait until the whole batch fits.
      forM_ mslots $ \slots -> check (len < slots)
      filePath <- newEmptyTMVar
      writeTQueue restore $ do
        (path, key) <- liftIO $ atomically $ takeTMVar filePath
        CB.sourceFile path .| do
          C.conduitGet2 get
          -- The batch has been read back: return its slots and release
          -- the temporary file.
          liftIO $ atomically $ modifyTVar slotsFree (fmap (+ len))
          release key
      modifyTVar slotsFree (fmap (subtract len))
      return $ do
        (key, (path, h)) <- allocate (openBinaryTempFile tempDir "conduit.bin") cleanup
        liftIO $ do
          runConduit $ CL.sourceList xs .| C.conduitPut put .| CB.sinkHandle h
          hClose h
          atomically $ putTMVar filePath (path, key)
  where
    -- Finaliser of the temporary file: close the handle and delete the
    -- file even if closing fails.
    cleanup (path, h) = hClose h `finally` removeFile path

-- Remove and return everything currently in the queue, oldest element
-- first.  Never blocks: an empty queue yields an empty list.
exhaust :: TBQueue a -> STM [a]
exhaust = flushTBQueue

-- Take the next element from the queue, blocking while it is empty.
recv :: (MonadIO m) => TBQueue a -> m a
recv = liftIO . atomically . readTBQueue

-- Append an element to the queue, blocking while it is full.
send :: (MonadIO m) => TBQueue a -> a -> m ()
send c x = liftIO (atomically (writeTBQueue c x))

-- | Apply a transformation to the final stage of a 'CConduit'.
--
-- Only the last conduit of the chain determines the output type and the
-- result, so the earlier stages and the queue sizes between them are kept
-- as they are.  The transformation has to be polymorphic in the input type
-- because the input type of the last stage is hidden inside 'Multiple'.
ccMap :: (forall i1 . ConduitT i1 o1 m a -> ConduitT i1 o2 m a) -> CConduit i o1 m a -> CConduit i o2 m a
ccMap f (Single c) =
  Single (f c)
ccMap f (Multiple bufsz l r) =
  Multiple bufsz l (ccMap f r)