| Safe Haskell | Safe-Inferred |
|---|---|
| Language | Haskell2010 |
Torch.Data.StreamedPipeline
Synopsis
- class Monad m => Datastream m seed dataset sample | dataset -> sample where
- streamSamples :: dataset -> seed -> ListT m sample
- newtype DatastreamOptions = DatastreamOptions { bufferSize :: Int }
- datastreamOpts :: DatastreamOptions
- streamFrom :: forall sample m dataset seed b. (Datastream m seed dataset sample, MonadBaseControl IO m, MonadBase IO m) => DatastreamOptions -> dataset -> ListT m seed -> ContT b m (ListT m sample)
- streamFrom' :: forall sample m f dataset seed b. (Show sample, Datastream m seed dataset sample, MonadBaseControl IO m, MonadBase IO m, MonadIO m, Foldable f) => DatastreamOptions -> dataset -> f seed -> ContT b m (ListT m sample)
- class (Applicative b, Applicative m, Monad b, Monad m) => MonadBase (b :: Type -> Type) (m :: Type -> Type) | m -> b where
- liftBase :: b α -> m α
- class MonadBase b m => MonadBaseControl (b :: Type -> Type) (m :: Type -> Type) | m -> b where
Defining a Datastream
We will show how to retrieve the IMDB dataset as an example datastream. The dataset used here can be found at https://ai.stanford.edu/~amaas/data/sentiment/
{-# LANGUAGE RecordWildCards #-}
import Data.Bifunctor (second)
import Data.Text (Text)
import Pipes
import Pipes.Safe (MonadSafe)
import qualified Pipes.Safe.Prelude as Safe  -- withFile lives in Pipes.Safe.Prelude
import qualified Pipes.Prelude as P
-- PT is not imported in the original; assumed here to be Pipes.Prelude.Text (pipes-text)
import qualified Pipes.Prelude.Text as PT
import System.Directory
import System.FilePath ((</>), pathSeparator)
import System.IO (IOMode (ReadMode))

newtype Imdb = Imdb { dataDir :: String }

data Sentiment = Positive | Negative

instance (MonadBaseControl IO m, MonadSafe m) => Datastream m Sentiment Imdb (Text, Sentiment) where
  streamSamples Imdb{..} sent = Select $ do
    rawFilePaths <- zip (repeat sent) <$> (liftIO $ listDirectory (dataDir </> sentToPath sent))
    let filePaths = fmap (second $ mappend (dataDir </> sentToPath sent)) rawFilePaths
    for (each filePaths) $ \(rev, fp) -> Safe.withFile fp ReadMode $ \fh ->
      P.zip (PT.fromHandleLn fh) (yield rev)
    where sentToPath Positive = "pos" ++ pure pathSeparator
          sentToPath Negative = "neg" ++ pure pathSeparator

This streams in movie reviews from each file in either the positive review directory or the negative review directory, depending on the seed value used.
This highlights a use of seed values that is more interesting than just specifying the thread count, but it also has some problems.
When running this datastream with either streamFrom or streamFrom', you need to supply both Positive and Negative as seeds
to retrieve the entire IMDB dataset; positive and negative reviews are then streamed in concurrently.
The problem with designing a datastream this way is that concurrency is capped at the number of distinct seeds (2 threads in this case) unless you
duplicate data. Ultimately, though, seeds are flexible and let you design the concurrency as you see fit. Be careful
not to reuse a seed value unless you want duplicate data.
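One way around the two-thread cap is to make the seed a shard index, so the number of seeds (and therefore threads) no longer depends on the labels. The following is a minimal sketch, assuming the hasktorch and pipes packages are installed. `Sharded`, its fields, and the `(Int, String)` sample layout are hypothetical names invented for illustration; they are not part of Torch.Data.StreamedPipeline.

```haskell
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Main where

import Control.Monad.Trans.Cont (runContT)
import Pipes (ListT (..), each, runEffect, (>->))
import qualified Pipes.Prelude as P
import Torch.Data.StreamedPipeline

-- Hypothetical dataset: `items` split round-robin across `shards` seeds.
data Sharded = Sharded { shards :: Int, items :: [String] }

-- Seed k selects every item whose index is congruent to k modulo `shards`,
-- so distinct seeds never produce the same sample.
instance Monad m => Datastream m Int Sharded (Int, String) where
  streamSamples (Sharded n xs) k =
    Select $ each [ (k, x) | (i, x) <- zip [0 ..] xs, i `mod` n == k ]

main :: IO ()
main = do
  let dataset = Sharded 3 ["a", "b", "c", "d", "e", "f"]
      seeds   = [0 .. 2] :: [Int]  -- one seed (and one thread) per shard
  runContT (streamFrom' datastreamOpts dataset seeds) $ \samples ->
    runEffect (enumerate samples >-> P.print)
```

The seed list needs an explicit `Int` annotation here because the class's functional dependency determines only the sample type, not the seed type.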
Datastream
class Monad m => Datastream m seed dataset sample | dataset -> sample where
The base datastream class. A dataset returns a stream of samples based on a seed value.
Methods
streamSamples :: dataset -> seed -> ListT m sample
Instances
| Instance | Defined in | Methods |
|---|---|---|
| (MonadBaseControl IO m, MonadSafe m, FromRecord batch) => Datastream m () (CsvDatastream batch) (Vector batch) | Torch.Data.CsvDatastream | streamSamples :: CsvDatastream batch -> () -> ListT m (Vector batch) |
| (MonadBaseControl IO m, MonadSafe m, FromNamedRecord batch) => Datastream m () (CsvDatastreamNamed batch) (Vector batch) | Torch.Data.CsvDatastream | streamSamples :: CsvDatastreamNamed batch -> () -> ListT m (Vector batch) |
| Monad m => Datastream m Int (MNIST m) (Tensor, Tensor) | Torch.Vision | |
| Datastream m seed dataset batch => Datastream m seed (CollatedDataset m dataset batch collatedBatch) collatedBatch | Torch.Data.Dataset | streamSamples :: CollatedDataset m dataset batch collatedBatch -> seed -> ListT m collatedBatch |
newtype DatastreamOptions
Datastream options used when loading datastreams. Currently only the buffer size is configurable,
since the thread count is controlled by the number of seeds (see the streamFrom functions).
Constructors
| DatastreamOptions | |
Fields
| bufferSize :: Int | |
datastreamOpts :: DatastreamOptions
Default datastream options. Override the fields of this record as needed.
Dataloading
streamFrom :: forall sample m dataset seed b. (Datastream m seed dataset sample, MonadBaseControl IO m, MonadBase IO m) => DatastreamOptions -> dataset -> ListT m seed -> ContT b m (ListT m sample)
Return a stream of samples from the given dataset as a continuation. A stream of samples is generated for every seed in the given stream of seeds, and all of these streams are merged into the output stream in a non-deterministic order (if you need determinism, see streamFrom'). The stream for each seed value runs in its own thread.
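As a hedged illustration of the signature above, the sketch below passes the seeds as a ListT stream, overrides bufferSize on the default options, and collects the merged output inside the continuation. It assumes hasktorch and pipes are installed; `Counter` and its sample numbering are hypothetical and exist only for this example.

```haskell
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Main where

import Control.Monad.Trans.Cont (runContT)
import Data.List (sort)
import Pipes (ListT (..), each)
import qualified Pipes.Prelude as P
import Torch.Data.StreamedPipeline

-- Hypothetical dataset: seed s yields the samples s*10 + 1 .. s*10 + n.
newtype Counter = Counter Int

instance Monad m => Datastream m Int Counter Int where
  streamSamples (Counter n) s = Select $ each [ s * 10 + i | i <- [1 .. n] ]

main :: IO ()
main = do
  let opts  = datastreamOpts { bufferSize = 4 }  -- override the default buffer size
      seeds = Select (each [0, 1, 2 :: Int])     -- seeds supplied as a ListT stream
  collected <- runContT (streamFrom opts (Counter 2) seeds) $ \samples ->
    P.toListM (enumerate samples)
  -- Arrival order is not deterministic, so sort before inspecting.
  print (sort collected)
```

Because the samples can only be consumed inside the continuation, any result you need afterwards has to be returned from it, as `collected` is here.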
streamFrom' :: forall sample m f dataset seed b. (Show sample, Datastream m seed dataset sample, MonadBaseControl IO m, MonadBase IO m, MonadIO m, Foldable f) => DatastreamOptions -> dataset -> f seed -> ContT b m (ListT m sample)
This function is the same as streamFrom except the seeds are specified as
a Foldable, and the stream returned has a deterministic ordering. The results
from each given seed are interspersed in the order defined by the Foldable of seeds.
Reexports
class (Applicative b, Applicative m, Monad b, Monad m) => MonadBase (b :: Type -> Type) (m :: Type -> Type) | m -> b where
class MonadBase b m => MonadBaseControl (b :: Type -> Type) (m :: Type -> Type) | m -> b where
Writing instances
The usual way to write a MonadBaseControl instance for a transformer
stack over a base monad B is to write an instance MonadBaseControl B B
for the base monad, and MonadTransControl T instances for every transformer
T. Instances for MonadBaseControl are then simply implemented using
ComposeSt, defaultLiftBaseWith, defaultRestoreM.
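The recipe above can be sketched for a single custom transformer as follows. This is an illustrative example rather than code from this module: `CounterT`, `tick`, and `runCounterT` are hypothetical names, and the sketch assumes the monad-control, transformers-base, and transformers packages.

```haskell
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE TypeFamilies               #-}
{-# LANGUAGE UndecidableInstances       #-}
module Main where

import Control.Exception (finally)
import Control.Monad.Base (MonadBase (liftBase), liftBaseDefault)
import Control.Monad.Trans.Class (MonadTrans)
import Control.Monad.Trans.Control
import Control.Monad.Trans.State.Strict (StateT, modify, runStateT)

-- Hypothetical transformer used only for illustration.
newtype CounterT m a = CounterT { unCounterT :: StateT Int m a }
  deriving (Functor, Applicative, Monad, MonadTrans)

-- Step 1: MonadTransControl for the transformer, reusing StateT's state.
instance MonadTransControl CounterT where
  type StT CounterT a = StT (StateT Int) a
  liftWith = defaultLiftWith CounterT unCounterT
  restoreT = defaultRestoreT CounterT

instance MonadBase b m => MonadBase b (CounterT m) where
  liftBase = liftBaseDefault

-- Step 2: MonadBaseControl follows mechanically from the defaults.
instance MonadBaseControl b m => MonadBaseControl b (CounterT m) where
  type StM (CounterT m) a = ComposeSt CounterT m a
  liftBaseWith = defaultLiftBaseWith
  restoreM     = defaultRestoreM

tick :: Monad m => CounterT m ()
tick = CounterT (modify (+ 1))

runCounterT :: CounterT m a -> m (a, Int)
runCounterT m = runStateT (unCounterT m) 0

main :: IO ()
main = do
  -- `control` threads CounterT's state through the IO-only `finally`.
  (_, n) <- runCounterT $
    control $ \runInBase -> runInBase (tick >> tick) `finally` putStrLn "done"
  print n
```

The base-monad instance (MonadBaseControl IO IO) already ships with monad-control, so only the transformer-level instances need to be written by hand.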
Associated Types
type StM (m :: Type -> Type) a
Monadic state that m adds to the base monad b.
For all base (non-transformed) monads, StM m a ~ a:
StM IO a ~ a
StM Maybe a ~ a
StM (Either e) a ~ a
StM [] a ~ a
StM ((->) r) a ~ a
StM Identity a ~ a
StM STM a ~ a
StM (ST s) a ~ a
If m is a transformed monad, m ~ t b, StM is the monadic state of
the transformer t (given by its StT from MonadTransControl). For a
transformer stack, StM is defined recursively:
StM (IdentityT m) a ~ ComposeSt IdentityT m a ~ StM m a
StM (MaybeT m) a ~ ComposeSt MaybeT m a ~ StM m (Maybe a)
StM (ErrorT e m) a ~ ComposeSt ErrorT m a ~ Error e => StM m (Either e a)
StM (ExceptT e m) a ~ ComposeSt ExceptT m a ~ StM m (Either e a)
StM (ListT m) a ~ ComposeSt ListT m a ~ StM m [a]
StM (ReaderT r m) a ~ ComposeSt ReaderT m a ~ StM m a
StM (StateT s m) a ~ ComposeSt StateT m a ~ StM m (a, s)
StM (WriterT w m) a ~ ComposeSt WriterT m a ~ Monoid w => StM m (a, w)
StM (RWST r w s m) a ~ ComposeSt RWST m a ~ Monoid w => StM m (a, s, w)
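To see the recursive definition at work on concrete stacks, the sketch below asks the type checker to confirm a few of these equalities. It is only an illustration, assuming monad-control and transformers are installed; the binding names are made up for this example.

```haskell
{-# LANGUAGE TypeOperators #-}
module Main where

import Control.Monad.Trans.Control (StM)
import Control.Monad.Trans.Maybe (MaybeT)
import Control.Monad.Trans.State.Strict (StateT)
import Data.Type.Equality ((:~:) (..))

-- Each definition type-checks only if both sides reduce to the same type.

-- Base monad: no extra state.
baseIO :: StM IO Int :~: Int
baseIO = Refl

-- One layer: StateT pairs the result with the final state.
stateOverIO :: StM (StateT String IO) Int :~: (Int, String)
stateOverIO = Refl

-- StateT over MaybeT: a failure in MaybeT loses the state as well.
stateOverMaybe :: StM (StateT String (MaybeT IO)) Int :~: Maybe (Int, String)
stateOverMaybe = Refl

-- MaybeT over StateT: the state survives even when the result is Nothing.
maybeOverState :: StM (MaybeT (StateT String IO)) Int :~: (Maybe Int, String)
maybeOverState = Refl

main :: IO ()
main = putStrLn "all StM equalities type-check"
```

The last two cases show that the order of the layers in the stack changes what is captured and restored.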
Methods
liftBaseWith :: (RunInBase m b -> b a) -> m a
liftBaseWith is similar to liftIO and liftBase in that it
lifts a base computation to the constructed monad.
Instances should satisfy similar laws as the MonadIO and MonadBase laws:
liftBaseWith (\_ -> return a) = return a
liftBaseWith (\_ -> m >>= f) = liftBaseWith (\_ -> m) >>= (\a -> liftBaseWith (\_ -> f a))
As Li-yao Xia explains, parametricity guarantees that
f $ liftBaseWith q = liftBaseWith $ \runInBase -> f $ q runInBase
The difference with liftBase is that before lifting the base computation
liftBaseWith captures the state of m. It then provides the base
computation with a RunInBase function that allows running m
computations in the base monad on the captured state:
withFileLifted :: MonadBaseControl IO m => FilePath -> IOMode -> (Handle -> m a) -> m a
withFileLifted file mode action = liftBaseWith (\runInBase -> withFile file mode (runInBase . action)) >>= restoreM
-- = control $ \runInBase -> withFile file mode (runInBase . action)
-- = liftBaseOp (withFile file mode) action
liftBaseWith is usually not implemented directly, but using
defaultLiftBaseWith.
restoreM :: StM m a -> m a
Construct an m computation from the monadic state of m that is
returned from a RunInBase function.
Instances should satisfy:
liftBaseWith (\runInBase -> runInBase m) >>= restoreM = m
restoreM is usually not implemented directly, but using
defaultRestoreM.
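The interplay between the captured state and restoreM can be made concrete with a small StateT example. This is a minimal sketch, assuming monad-control and transformers are installed; `discarding` and `restoring` are hypothetical helper names.

```haskell
module Main where

import Control.Monad.Trans.Control (liftBaseWith, restoreM)
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify)

-- Runs the action in IO on the captured state, then throws away the
-- state it produced: the outer computation continues with the old state.
discarding :: StateT Int IO () -> StateT Int IO ()
discarding action = do
  _ <- liftBaseWith (\runInBase -> runInBase action)
  pure ()

-- Same, but reinstates the resulting state with restoreM, which by the
-- law above makes it equivalent to running `action` directly.
restoring :: StateT Int IO () -> StateT Int IO ()
restoring action = liftBaseWith (\runInBase -> runInBase action) >>= restoreM

main :: IO ()
main = do
  a <- execStateT (discarding (modify (+ 1))) 0
  b <- execStateT (restoring (modify (+ 1))) 0
  print (a, b)  -- prints (0,1)
```

Forgetting the restoreM step is a common source of silently lost state updates when wrapping IO functions such as withFile or catch.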
Instances
| MonadBaseControl Identity Identity | |
| MonadBaseControl STM STM | |
| MonadBaseControl IO IO | |
| MonadBaseControl Maybe Maybe | |
| MonadBaseControl List List | |
| MonadBaseControl b m => MonadBaseControl b (SafeT m) | |
| MonadBaseControl b m => MonadBaseControl b (MaybeT m) | |
| MonadBaseControl b m => MonadBaseControl b (ExceptT e m) | |
| MonadBaseControl b m => MonadBaseControl b (IdentityT m) | |
| MonadBaseControl b m => MonadBaseControl b (ReaderT r m) | |
| MonadBaseControl b m => MonadBaseControl b (StateT s m) | |
| MonadBaseControl b m => MonadBaseControl b (StateT s m) | |
| (Monoid w, MonadBaseControl b m) => MonadBaseControl b (WriterT w m) | |
| (Monoid w, MonadBaseControl b m) => MonadBaseControl b (WriterT w m) | |
| (Monoid w, MonadBaseControl b m) => MonadBaseControl b (RWST r w s m) | |
| (Monoid w, MonadBaseControl b m) => MonadBaseControl b (RWST r w s m) | |
| MonadBaseControl (ST s) (ST s) | |
| MonadBaseControl (Either e) (Either e) | |
| MonadBaseControl (ST s) (ST s) | |
| MonadBaseControl ((->) r) ((->) r) | |
Defined in Control.Monad.Trans.Control | |