{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
module Torch.Data.StreamedPipeline
(
Datastream (..),
DatastreamOptions (..),
datastreamOpts,
streamFrom,
streamFrom',
MonadBase (..),
MonadBaseControl (..),
)
where
import Control.Arrow (second)
import Control.Concurrent.Async.Lifted
import Control.Concurrent.STM hiding (atomically)
import Control.Foldl (FoldM)
import qualified Control.Foldl as L
import Control.Monad
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Cont (ContT (..))
import Control.Monad.Trans.Control
import qualified Data.Vector as V
import Pipes
import Pipes.Concurrent hiding (atomically)
import qualified Pipes.Prelude as P
import Torch.Data.Internal
-- | A dataset that can be streamed: given a @seed@, it produces a 'ListT'
-- stream of samples in the monad @m@.
--
-- The functional dependency @dataset -> sample@ means the dataset type alone
-- determines the sample type.
class Monad m => Datastream m seed dataset sample | dataset -> sample where
  -- | Stream the samples that @dataset@ yields for the given @seed@.
  streamSamples :: dataset -> seed -> ListT m sample
-- | Options used by 'streamFrom' and 'streamFrom''.
newtype DatastreamOptions = DatastreamOptions
  { -- | Size handed to @runWithBuffer@ by the @streamFrom@ functions.
    -- Presumably the capacity of the output buffer; confirm against
    -- @runWithBuffer@ in "Torch.Data.Internal".
    bufferSize :: Int
  }
-- | Default options: a 'bufferSize' of 4. Override the field as needed,
-- e.g. @datastreamOpts {bufferSize = 16}@.
datastreamOpts :: DatastreamOptions
datastreamOpts = DatastreamOptions {bufferSize = 4}
-- | Stream samples from @dataset@ for every seed produced by the seed stream.
--
-- One 'streamSamples' stream is started per seed and all of them run
-- concurrently (see 'readSamples'), writing into a single shared output, so
-- the interleaving of samples from different seeds is not fixed.
--
-- The result is delivered through 'ContT': the sample stream is only meant to
-- be used inside the continuation set up by @runWithBuffer@.
streamFrom ::
  forall sample m dataset seed b.
  (Datastream m seed dataset sample, MonadBaseControl IO m, MonadBase IO m) =>
  -- | Options; only 'bufferSize' is consulted.
  DatastreamOptions ->
  -- | Dataset to read from.
  dataset ->
  -- | Stream of seeds; one sample stream is spawned per seed.
  ListT m seed ->
  ContT b m (ListT m sample)
streamFrom opts dataset seeds =
  runWithBuffer (bufferSize opts) (readSamples dataset seeds)
-- | Like 'streamFrom', but the seeds are given as a 'Foldable' and every seed
-- gets its own mailbox (see 'pairSeedWithBuffer').
--
-- A single consumer then visits the mailboxes in container order, taking at
-- most one sample from each per round, and forwards those samples to the
-- shared output. This fixes the order in which samples from different seeds
-- are emitted.
--
-- The @Show sample@ constraint is not used by this body; it is kept so the
-- signature stays unchanged for existing callers.
streamFrom' ::
  forall sample m f dataset seed b.
  (Show sample, Datastream m seed dataset sample, MonadBaseControl IO m, MonadBase IO m, MonadIO m, Foldable f) =>
  -- | Options; only 'bufferSize' is consulted.
  DatastreamOptions ->
  -- | Dataset to read from.
  dataset ->
  -- | Seeds; one producer and one mailbox is created per seed.
  f seed ->
  ContT b m (ListT m sample)
streamFrom' opts dataset seeds = do
  -- Counter bumped by 'fromInputOnce' each time a mailbox yields nothing.
  workerTracker <- atomically $ newTVar 0
  let -- One round takes at most one sample from every mailbox, in order.
      -- Rounds repeat while the counter is below the number of mailboxes.
      --
      -- NOTE(review): a mailbox that already returned 'Nothing' is polled
      -- again on every later round, so the counter can grow by more than one
      -- per exhausted mailbox. With streams of unequal length this may stop
      -- the loop before the longer streams are drained - confirm intended.
      consumeSeeds mailboxes o = do
        for (each mailboxes) $ \(_, input, _) ->
          fromInputOnce workerTracker input >-> toOutput' o
        keepReading <-
          lift . atomically $
            (< V.length mailboxes) <$> readTVar workerTracker
        when keepReading $ consumeSeeds mailboxes o
  runWithBuffer (bufferSize opts) $ \o ->
    liftedBracket
      -- Acquire: create one mailbox per seed.
      (L.foldM pairSeedWithBuffer seeds)
      -- Release: run every mailbox's seal action.
      (mapM_ (atomically . third . snd))
      ( \seeded ->
          let mailboxes = snd <$> seeded
              seedAndOutput = second fst3 <$> seeded
              -- Seal all mailboxes as soon as either side finishes or
              -- fails, so the other side is not left waiting on them.
              sealAll = mapM_ (atomically . third) mailboxes
           in concurrently_
                (readSamplesDeterministic dataset seedAndOutput `liftedFinally` sealAll)
                (runEffect (consumeSeeds mailboxes o) `liftedFinally` sealAll)
      )
  where
    -- The type variables are deliberately not named @b@: with
    -- ScopedTypeVariables that would refer to the @b@ bound by the signature.
    fst3 :: (x, y, z) -> x
    fst3 (x, _, _) = x

    third :: (x, y, z) -> z
    third (_, _, z) = z
-- | Run one 'streamSamples' stream per seed, all concurrently, each writing
-- its samples into the same output box.
--
-- The seed stream is folded completely into a single 'Concurrently' action
-- first; that combined action is then run, so no worker starts before the
-- seed stream has been fully enumerated.
readSamples ::
  forall m seed dataset sample.
  (Datastream m seed dataset sample, MonadBaseControl IO m) =>
  -- | Dataset to read from.
  dataset ->
  -- | Seeds; one worker is created per seed.
  ListT m seed ->
  -- | Shared output that every worker writes to.
  Output sample ->
  m ()
readSamples dataset seeds outputBox =
  join . P.fold addWorker mempty runConcurrently $ enumerate seeds
  where
    -- Combine the worker for a new seed with the workers gathered so far.
    addWorker :: Concurrently m () -> seed -> Concurrently m ()
    addWorker acc seed = worker seed `mappend` acc

    -- Drain the sample stream of one seed into the shared output box.
    worker :: seed -> Concurrently m ()
    worker seed =
      Concurrently . runEffect $
        enumerate (streamSamples @m @seed @dataset @sample dataset seed)
          >-> toOutput' outputBox
-- | Run one 'streamSamples' stream per @(seed, output)@ pair, all
-- concurrently, each writing into the output paired with its own seed.
--
-- Unlike 'readSamples', the workers do not share an output box, so samples
-- from different seeds stay separated.
readSamplesDeterministic ::
  forall m seed f dataset sample.
  (Datastream m seed dataset sample, MonadBaseControl IO m, MonadIO m, Foldable f) =>
  -- | Dataset to read from.
  dataset ->
  -- | Each seed together with the output its samples are written to.
  f (seed, Output sample) ->
  m ()
readSamplesDeterministic dataset seeds =
  L.fold (L.Fold addWorker mempty runConcurrently) seeds
  where
    -- Combine the workers gathered so far with the worker for a new pair.
    addWorker :: Concurrently m () -> (seed, Output sample) -> Concurrently m ()
    addWorker acc (seed, outputBox) = acc `mappend` worker seed outputBox

    -- Drain the sample stream of one seed into that seed's own output box.
    worker :: seed -> Output sample -> Concurrently m ()
    worker seed outputBox =
      Concurrently . runEffect $
        enumerate (streamSamples @m @seed @dataset @sample dataset seed)
          >-> toOutput' outputBox
-- | A monadic fold that pairs every seed with a freshly spawned mailbox and
-- collects the pairs, in input order, into a 'V.Vector'.
--
-- Each mailbox is the @(output, input, seal)@ triple returned by 'spawn''
-- with a @'bounded' 1@ buffer.
pairSeedWithBuffer :: MonadIO m => FoldM m seed (V.Vector (seed, (Output a, Input a, STM ())))
pairSeedWithBuffer =
  L.premapM attachMailbox (L.generalize L.vector)
  where
    -- Create a new mailbox and tag it with the seed it belongs to.
    attachMailbox seed = (seed,) <$> makeMailbox
    makeMailbox = liftIO $ spawn' (bounded 1)
-- | Receive at most one value from the given input.
--
-- * If 'recv' returns a value, it is yielded downstream.
-- * If 'recv' returns 'Nothing', nothing is yielded and the counter in the
--   given 'TVar' is incremented by one.
--
-- The counter is bumped on /every/ 'Nothing', so calling this repeatedly on
-- the same exhausted input increments it each time.
fromInputOnce :: MonadIO m => TVar Int -> Input a -> Producer a m ()
fromInputOnce workerTracker input = do
  received <- atomically $ recv input
  case received of
    -- Strict modify: avoids stacking up unevaluated @(+ 1)@ thunks in the TVar.
    Nothing -> atomically $ modifyTVar' workerTracker (+ 1)
    Just sample -> yield sample