{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module      : DynamicPipeline.Channel
-- Copyright   : (c) 2021 Juan Pablo Royo Sales
--
-- License     : BSD3
-- Maintainer  : juanpablo.royo@gmail.com
-- Stability   : experimental
-- Portability : GHC
--
-- Thin wrappers around a non-blocking Unagi channel that carries @Maybe a@
-- values: @Just x@ is a payload element and @Nothing@ is the end-of-channel
-- mark written by 'finish' / 'end'.
module DynamicPipeline.Channel
  ( ReadChannel
  , WriteChannel
  , (|=>)
  , (|=>|)
  , (|>=>)
  , (|>=>|)
  , mapF_
  , map_
  , mapM_
  , mapMF_
  , foldM_
  , foldWithM_
  , push
  , pull
  , unfoldM
  , unfoldFile
  , unfoldT
  , newChannel
  , end
  , finish
  ) where

import qualified Control.Concurrent                       as CC
import           Control.Concurrent.Chan.Unagi.NoBlocking
import           Control.Lens                             hiding ( (<|) )
import           Data.ByteString                          as B
import           Data.Foldable                            as F
                                                          hiding ( mapM_ )
import           Data.HList                               hiding ( foldM_
                                                                 , mapM_
                                                                 )
import           GHC.IO.Handle                            as H
import           Relude                                   as R
                                                          hiding ( mapM_ )

-- | 'WriteChannel' can only write values into some Channel Queue
--
-- [@a@]: Type that this Channel can write
newtype WriteChannel a = WriteChannel { unWrite :: InChan (Maybe a) }

-- | 'ReadChannel' can only read values of a previously written Channel.
-- It is connected to a 'WriteChannel' but hidden for the user
--
-- [@a@]: Type that this Channel can read
newtype ReadChannel a = ReadChannel { unRead :: OutChan (Maybe a) }

-- | 'map_' is a /Natural Transformation/ from consumer 'ReadChannel' to some
-- producer 'WriteChannel', applying the pure function @f@ to every element
-- read and pushing the result. It does NOT write the end-of-channel mark;
-- use 'mapF_' for that.
{-# INLINE map_ #-}
map_ :: MonadIO m
     => ReadChannel a  -- ^'ReadChannel' to consume from
     -> WriteChannel b -- ^'WriteChannel' to push transformed elements into
     -> (a -> b)       -- ^Pure transformation applied to each read element
     -> m ()
map_ rc wc f = foldM_ rc $ flip push wc . f

-- | Same as 'map_' but with 'id' combinator
(|=>) :: MonadIO m => ReadChannel a -> WriteChannel a -> m ()
(|=>) rc wc = map_ rc wc id

infixl 5 |=>

-- | Same as 'map_' but marks the 'WriteChannel' as finished (see 'finish')
-- after all elements have been processed
{-# INLINE mapF_ #-}
mapF_ :: MonadIO m
      => ReadChannel a  -- ^'ReadChannel' to consume from
      -> WriteChannel b -- ^'WriteChannel' to push transformed elements into
      -> (a -> b)       -- ^Pure transformation applied to each read element
      -> m ()
mapF_ rc wc f = map_ rc wc f >> finish wc

-- | Alias 'mapF_'
(|=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
(|=>|) = mapF_

infixl 5 |=>|

-- | Same as 'map_' but applying a monadic mapping. When the mapping yields
-- 'Nothing' for an element, nothing is pushed for that element.
{-# INLINE mapM_ #-}
mapM_ :: MonadIO m
      => ReadChannel a       -- ^'ReadChannel' to consume from
      -> WriteChannel b      -- ^'WriteChannel' to push transformed elements into
      -> (a -> m (Maybe b))  -- ^Monadic transformation applied to each read element
      -> m ()
mapM_ rc wc f = foldM_ rc $ maybe (pure ()) (`push` wc) <=< f

-- | Alias 'mapM_'
(|>=>) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
(|>=>) = mapM_

infixr 5 |>=>

-- | Same as 'mapM_' but marks the 'WriteChannel' as finished (see 'finish')
-- after all elements have been processed
{-# INLINE mapMF_ #-}
mapMF_ :: MonadIO m
       => ReadChannel a       -- ^'ReadChannel' to consume from
       -> WriteChannel b      -- ^'WriteChannel' to push transformed elements into
       -> (a -> m (Maybe b))  -- ^Monadic transformation applied to each read element
       -> m ()
mapMF_ rc wc f = mapM_ rc wc f >> finish wc

-- | Alias 'mapMF_'
(|>=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
(|>=>|) = mapMF_

infixr 5 |>=>|

-- | 'foldM_' is a /Catamorphism/ for consuming a 'ReadChannel' and do some
-- Monadic @m@ computation with each element. Equivalent to 'foldWithM_' with
-- a no-op end-of-channel action.
{-# INLINE foldM_ #-}
foldM_ :: MonadIO m
       => ReadChannel a -- ^'ReadChannel' to consume from
       -> (a -> m ())   -- ^Computation to do with each read element
       -> m ()
foldM_ = flip foldWithM_ (pure ())

-- | Idem 'foldM_' but allows passing a monadic computation to perform at the
-- end of the Channel.
--
-- Elements are taken with 'pull' one at a time: on @Just e@ the element
-- computation runs and the loop continues; on @Nothing@ (the end-of-channel
-- mark) the final computation runs and the loop stops.
{-# INLINE foldWithM_ #-}
foldWithM_ :: MonadIO m
           => ReadChannel a -- ^'ReadChannel' to consume from
           -> m ()          -- ^Computation to do at the end of the channel
           -> (a -> m ())   -- ^Computation to do with each read element
           -> m ()
foldWithM_ = loop'
 where
  loop' c onNothing io =
    maybe onNothing (\e -> io e >> loop' c onNothing io) =<< pull c

-- | Push element @a@ into 'WriteChannel' (written to the queue as @Just a@)
{-# INLINE push #-}
push :: MonadIO m => a -> WriteChannel a -> m ()
push a c = liftIO $ writeChan (unWrite c) (Just a)

-- | Pull element @Maybe a@ from 'ReadChannel'.
--
-- @CC.threadDelay 100@ is handed to the NoBlocking 'readChan' as its @IO ()@
-- argument; per the unagi-chan documentation that action is run between
-- polling attempts while no element is available yet.
{-# INLINE pull #-}
pull :: MonadIO m => ReadChannel a -> m (Maybe a)
pull = liftIO . readChan (CC.threadDelay 100) . unRead

-- | Finalize Channel to indicate EOF mark and allow progress on following
-- consumers. This is 'end' lifted into any 'MonadIO'.
finish :: MonadIO m => WriteChannel a -> m ()
finish = liftIO . end

-- | Coalgebra with Monadic computation to feed some 'WriteChannel': unfold
-- from a Monadic seed @m a@ to a 'WriteChannel'.
--
-- [@m@]: Monadic computation wrapping Coalgebra
--
-- [@a@]: Element get from some Source and to be written in some Channel
--
-- The stop condition is evaluated /before/ every run of the seed. When it
-- yields 'True' the channel is finished (see 'finish') and the unfold ends;
-- otherwise the seed is run, its result mapped and pushed, and the loop
-- repeats.
{-# INLINE unfoldM #-}
unfoldM :: forall m a b . MonadIO m
        => m a            -- ^Monadic Seed
        -> (a -> b)       -- ^Map input from seed to something to be written in Channel
        -> m Bool         -- ^When to stop unfolding
        -> WriteChannel b -- ^'WriteChannel' to write input seed elements
        -> m ()
unfoldM = loop'
 where
  loop' seed fn stopIfM writeChannel = ifM
    stopIfM
    (finish writeChannel)
    (seed >>= flip push writeChannel . fn >> loop' seed fn stopIfM writeChannel)

-- | Using 'unfoldM', unfold from file.
--
-- The file is opened in 'ReadMode'; each element is one 'B.hGetLine' read
-- from the handle, and unfolding stops once 'H.hIsEOF' reports end of file,
-- at which point the 'WriteChannel' is finished by 'unfoldM'.
{-# INLINE unfoldFile #-}
unfoldFile :: MonadIO m
           => FilePath          -- ^Seed 'FilePath' to read from
           -> WriteChannel b    -- ^'WriteChannel' to write File contents
           -> (ByteString -> b) -- ^Transform 'ByteString' read from File to something meaningful for your App
           -> m ()
unfoldFile file writeChannel fn = liftIO $ R.withFile file ReadMode $ \h ->
  unfoldM (B.hGetLine h) fn (H.hIsEOF h) writeChannel

-- | Idem 'unfoldM' but for 'Foldable', for example a List @[a]@. Useful for
-- testing purpose. Every element is mapped and pushed, then the
-- 'WriteChannel' is finished.
{-# INLINE unfoldT #-}
unfoldT :: (MonadIO m, Foldable t) => t a -> WriteChannel b -> (a -> b) -> m ()
unfoldT ts writeChannel fn =
  forM_ ts (flip push writeChannel . fn) >> finish writeChannel

-- | Create a connected pair of 'WriteChannel' and 'ReadChannel' backed by a
-- fresh queue from 'newChan'.
{-# WARNING newChannel "INTERNAL USE" #-}
{-# NOINLINE newChannel #-}
newChannel :: forall a . IO (WriteChannel a, ReadChannel a)
newChannel = bimap WriteChannel ReadChannel <$> newChan

-- | Write the end-of-channel mark (@Nothing@) into the 'WriteChannel'.
{-# WARNING end "INTERNAL USE" #-}
{-# INLINE end #-}
end :: WriteChannel a -> IO ()
end = flip writeChan Nothing . unWrite