{-# LANGUAGE AllowAmbiguousTypes  #-}
{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module      : DynamicPipeline.Channel
-- Copyright   : (c) 2021 Juan Pablo Royo Sales
--
-- License     : BSD3
-- Maintainer  : juanpablo.royo@gmail.com
-- Stability   : experimental
-- Portability : GHC
--
module DynamicPipeline.Channel
  ( ReadChannel
  , WriteChannel
  , (|=>)
  , (|=>|)
  , (|>=>)
  , (|>=>|)
  , mapF_
  , map_
  , mapM_
  , mapMF_
  , foldM_
  , foldWithM_
  , push
  , pull
  , unfoldM
  , unfoldFile
  , unfoldT
  , newChannel
  , end
  , finish
  ) where

import qualified Control.Concurrent                                as CC
import           Control.Concurrent.Chan.Unagi.NoBlocking
import           Control.Lens                                                                                  hiding ( (<|)
                                                                                                                      )
import           Data.ByteString                                   as B
import           Data.Foldable                                     as F
                                                                                                               hiding ( mapM_
                                                                                                                      )
import           Data.HList                                                                                    hiding ( foldM_
                                                                                                                      , mapM_
                                                                                                                      )
import           GHC.IO.Handle                                     as H
import           Relude                                            as R
                                                                                                               hiding ( mapM_
                                                                                                                      )


-- | 'WriteChannel' can only write values into some Channel Queue
-- 
-- [@a@]: Type that this Channel can write
newtype WriteChannel a = WriteChannel { WriteChannel a -> InChan (Maybe a)
unWrite :: InChan (Maybe a) }

-- | 'ReadChannel' can only read values of a previously written Channel. It is connected to a 'WriteChannel' but hidden for the user 
-- 
-- [@a@]: Type that this Channel can read
newtype ReadChannel a = ReadChannel { ReadChannel a -> OutChan (Maybe a)
unRead :: OutChan (Maybe a) }

-- | 'map_' is a /Natural Transformation/ from consumer 'ReadChannel' to some producer 'WriteChannel' applying a transformation with function @f@
{-# INLINE map_ #-}
map_ :: MonadIO m
     => ReadChannel a -- ^'ReadChannel'
     -> WriteChannel b -- ^'ReadChannel'
     -> (a -> b) -- ^Monadic Transformation to do with read element
     -> m ()
map_ :: ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
map_ rc :: ReadChannel a
rc wc :: WriteChannel b
wc f :: a -> b
f = ReadChannel a -> (a -> m ()) -> m ()
forall (m :: * -> *) a.
MonadIO m =>
ReadChannel a -> (a -> m ()) -> m ()
foldM_ ReadChannel a
rc ((a -> m ()) -> m ()) -> (a -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ (b -> WriteChannel b -> m ()) -> WriteChannel b -> b -> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip b -> WriteChannel b -> m ()
forall (m :: * -> *) a. MonadIO m => a -> WriteChannel a -> m ()
push WriteChannel b
wc (b -> m ()) -> (a -> b) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
f

-- | Same as 'map_' but with 'id' combinator
(|=>) :: MonadIO m => ReadChannel a -> WriteChannel a -> m ()
|=> :: ReadChannel a -> WriteChannel a -> m ()
(|=>) rc :: ReadChannel a
rc wc :: WriteChannel a
wc = ReadChannel a -> WriteChannel a -> (a -> a) -> m ()
forall (m :: * -> *) a b.
MonadIO m =>
ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
map_ ReadChannel a
rc WriteChannel a
wc a -> a
forall a. a -> a
id

infixl 5 |=>

-- | Same as 'map_' but mark Eof Channel after all processing
{-# INLINE mapF_ #-}
mapF_ :: MonadIO m
      => ReadChannel a -- ^'ReadChannel'
      -> WriteChannel b -- ^'ReadChannel'
      -> (a -> b) -- ^Monadic Transformation to do with read element
      -> m ()
mapF_ :: ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
mapF_ rc :: ReadChannel a
rc wc :: WriteChannel b
wc f :: a -> b
f = ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
forall (m :: * -> *) a b.
MonadIO m =>
ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
map_ ReadChannel a
rc WriteChannel b
wc a -> b
f m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WriteChannel b -> m ()
forall (m :: * -> *) a. MonadIO m => WriteChannel a -> m ()
finish WriteChannel b
wc

-- | Alias 'mapF_'
(|=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
|=>| :: ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
(|=>|) = ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
forall (m :: * -> *) a b.
MonadIO m =>
ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
mapF_

infixl 5 |=>|


-- | Same as 'map_' But applying a Monadic mapping
{-# INLINE mapM_ #-}
mapM_ :: MonadIO m
      => ReadChannel a -- ^'ReadChannel'
      -> WriteChannel b -- ^'ReadChannel'
      -> (a -> m (Maybe b)) -- ^Monadic Transformation to do with read element
      -> m ()
mapM_ :: ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
mapM_ rc :: ReadChannel a
rc wc :: WriteChannel b
wc f :: a -> m (Maybe b)
f = ReadChannel a -> (a -> m ()) -> m ()
forall (m :: * -> *) a.
MonadIO m =>
ReadChannel a -> (a -> m ()) -> m ()
foldM_ ReadChannel a
rc ((a -> m ()) -> m ()) -> (a -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ m () -> (b -> m ()) -> Maybe b -> m ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (b -> WriteChannel b -> m ()
forall (m :: * -> *) a. MonadIO m => a -> WriteChannel a -> m ()
`push` WriteChannel b
wc) (Maybe b -> m ()) -> (a -> m (Maybe b)) -> a -> m ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< a -> m (Maybe b)
f

-- | Alias 'mapM_'
(|>=>) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
|>=> :: ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
(|>=>) = ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
forall (m :: * -> *) a b.
MonadIO m =>
ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
mapM_

infixr 5 |>=>

-- | Same as 'mapM_' but mark Eof Channel after all processing
{-# INLINE mapMF_ #-}
mapMF_ :: MonadIO m
       => ReadChannel a -- ^'ReadChannel'
       -> WriteChannel b -- ^'ReadChannel'
       -> (a -> m (Maybe b)) -- ^Monadic Transformation to do with read element
       -> m ()
mapMF_ :: ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
mapMF_ rc :: ReadChannel a
rc wc :: WriteChannel b
wc f :: a -> m (Maybe b)
f = ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
forall (m :: * -> *) a b.
MonadIO m =>
ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
mapM_ ReadChannel a
rc WriteChannel b
wc a -> m (Maybe b)
f m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WriteChannel b -> m ()
forall (m :: * -> *) a. MonadIO m => WriteChannel a -> m ()
finish WriteChannel b
wc

-- | Alias 'mapMF_'
(|>=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
|>=>| :: ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
(|>=>|) = ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
forall (m :: * -> *) a b.
MonadIO m =>
ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
mapMF_

infixr 5 |>=>|


-- | 'foldM_' is a /Catamorphism/ for consuming a 'ReadChannel' and do some Monadic @m@ computation with each element
{-# INLINE foldM_ #-}
foldM_ :: MonadIO m
       => ReadChannel a -- ^'ReadChannel'
       -> (a -> m ()) -- ^Computation to do with read element
       -> m ()
foldM_ :: ReadChannel a -> (a -> m ()) -> m ()
foldM_ = (ReadChannel a -> m () -> (a -> m ()) -> m ())
-> m () -> ReadChannel a -> (a -> m ()) -> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReadChannel a -> m () -> (a -> m ()) -> m ()
forall (m :: * -> *) a.
MonadIO m =>
ReadChannel a -> m () -> (a -> m ()) -> m ()
foldWithM_ (() -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())

-- | Idem 'foldM_' but allows pass a monadic computation to perform at the end of the Channel
{-# INLINE foldWithM_ #-}
foldWithM_ :: MonadIO m
           => ReadChannel a -- ^'ReadChannel'
           -> m () -- ^Computation to do at the end of the channel
           -> (a -> m ()) -- ^Computation to do with read element
           -> m ()
foldWithM_ :: ReadChannel a -> m () -> (a -> m ()) -> m ()
foldWithM_ = ReadChannel a -> m () -> (a -> m ()) -> m ()
forall (m :: * -> *) t b a.
MonadIO m =>
ReadChannel t -> m b -> (t -> m a) -> m b
loop'
  where loop' :: ReadChannel t -> m b -> (t -> m a) -> m b
loop' c :: ReadChannel t
c onNothing :: m b
onNothing io :: t -> m a
io = m b -> (t -> m b) -> Maybe t -> m b
forall b a. b -> (a -> b) -> Maybe a -> b
maybe m b
onNothing (\e :: t
e -> t -> m a
io t
e m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ReadChannel t -> m b -> (t -> m a) -> m b
loop' ReadChannel t
c m b
onNothing t -> m a
io) (Maybe t -> m b) -> m (Maybe t) -> m b
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (Maybe t) -> m (Maybe t)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ReadChannel t -> IO (Maybe t)
forall (m :: * -> *) a. MonadIO m => ReadChannel a -> m (Maybe a)
pull ReadChannel t
c)

-- | Push element @a@ into 'WriteChannel'
{-# INLINE push #-}
push :: MonadIO m => a -> WriteChannel a -> m ()
push :: a -> WriteChannel a -> m ()
push a :: a
a c :: WriteChannel a
c = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ InChan (Maybe a) -> Maybe a -> IO ()
forall a. InChan a -> a -> IO ()
writeChan (WriteChannel a -> InChan (Maybe a)
forall a. WriteChannel a -> InChan (Maybe a)
unWrite WriteChannel a
c) (a -> Maybe a
forall a. a -> Maybe a
Just a
a)

-- | Pull element @Maybe a@ from 'ReadChannel'
{-# INLINE pull #-}
pull :: MonadIO m => ReadChannel a -> m (Maybe a)
pull :: ReadChannel a -> m (Maybe a)
pull = IO (Maybe a) -> m (Maybe a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> m (Maybe a))
-> (ReadChannel a -> IO (Maybe a)) -> ReadChannel a -> m (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> OutChan (Maybe a) -> IO (Maybe a)
forall a. IO () -> OutChan a -> IO a
readChan (Int -> IO ()
CC.threadDelay 100) (OutChan (Maybe a) -> IO (Maybe a))
-> (ReadChannel a -> OutChan (Maybe a))
-> ReadChannel a
-> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ReadChannel a -> OutChan (Maybe a)
forall a. ReadChannel a -> OutChan (Maybe a)
unRead

-- | Finalize Channel to indicate EOF mark and allow progress on following consumers
finish :: MonadIO m => WriteChannel a -> m ()
finish :: WriteChannel a -> m ()
finish = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (WriteChannel a -> IO ()) -> WriteChannel a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WriteChannel a -> IO ()
forall a. WriteChannel a -> IO ()
end


-- | Coalgebra with Monadic computation to Feed some 'WriteChannel'
--
-- [@m@]: Monadic computation wrapping Coalgebra
--
-- [@a@]: Element get from some Source and to be write in some Channel
--
-- | unfold from a Monadic seed @m a@ to a 'WriteChannel'
{-# INLINE unfoldM #-}
unfoldM :: forall m a b
         . MonadIO m
        => m a -- ^Monadic Seed 
        -> (a -> b) -- ^Map input from seed to something to be written in Channel
        -> m Bool -- ^When stop unfolding
        -> WriteChannel b -- ^'WriteChannel' to write input seed elements
        -> m ()
unfoldM :: m a -> (a -> b) -> m Bool -> WriteChannel b -> m ()
unfoldM = m a -> (a -> b) -> m Bool -> WriteChannel b -> m ()
forall (m :: * -> *) a a.
MonadIO m =>
m a -> (a -> a) -> m Bool -> WriteChannel a -> m ()
loop'
 where
  loop' :: m a -> (a -> a) -> m Bool -> WriteChannel a -> m ()
loop' seed :: m a
seed fn :: a -> a
fn stopIfM :: m Bool
stopIfM writeChannel :: WriteChannel a
writeChannel =
    m Bool -> m () -> m () -> m ()
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM m Bool
stopIfM (WriteChannel a -> m ()
forall (m :: * -> *) a. MonadIO m => WriteChannel a -> m ()
finish WriteChannel a
writeChannel) (m a
seed m a -> (a -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (a -> WriteChannel a -> m ()) -> WriteChannel a -> a -> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip a -> WriteChannel a -> m ()
forall (m :: * -> *) a. MonadIO m => a -> WriteChannel a -> m ()
push WriteChannel a
writeChannel (a -> m ()) -> (a -> a) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> a
fn m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a -> (a -> a) -> m Bool -> WriteChannel a -> m ()
loop' m a
seed a -> a
fn m Bool
stopIfM WriteChannel a
writeChannel)
-- | Using 'unfoldM', unfold from file
{-# INLINE unfoldFile #-}
unfoldFile :: MonadIO m
           => FilePath -- ^Seed 'FilePath' to read from
           -> WriteChannel b -- ^'WriteChannel' to write File contents
           -> (ByteString -> b) -- ^Transform 'ByteString' read from File to something meaningful for your App
           -> m ()
unfoldFile :: FilePath -> WriteChannel b -> (ByteString -> b) -> m ()
unfoldFile file :: FilePath
file writeChannel :: WriteChannel b
writeChannel fn :: ByteString -> b
fn =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ FilePath -> IOMode -> (Handle -> IO ()) -> IO ()
forall r. FilePath -> IOMode -> (Handle -> IO r) -> IO r
R.withFile FilePath
file IOMode
ReadMode ((Handle -> IO ()) -> IO ()) -> (Handle -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \h :: Handle
h -> IO ByteString
-> (ByteString -> b) -> IO Bool -> WriteChannel b -> IO ()
forall (m :: * -> *) a a.
MonadIO m =>
m a -> (a -> a) -> m Bool -> WriteChannel a -> m ()
unfoldM (Handle -> IO ByteString
B.hGetLine Handle
h) ByteString -> b
fn (Handle -> IO Bool
H.hIsEOF Handle
h) WriteChannel b
writeChannel

-- | Idem 'unfoldM' but for 'Foldable', for example a List @[a]@. Useful for testing purpose
{-# INLINE unfoldT #-}
unfoldT :: (MonadIO m, Foldable t) => t a -> WriteChannel b -> (a -> b) -> m ()
unfoldT :: t a -> WriteChannel b -> (a -> b) -> m ()
unfoldT ts :: t a
ts writeChannel :: WriteChannel b
writeChannel fn :: a -> b
fn = t a -> (a -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ t a
ts ((b -> WriteChannel b -> m ()) -> WriteChannel b -> b -> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip b -> WriteChannel b -> m ()
forall (m :: * -> *) a. MonadIO m => a -> WriteChannel a -> m ()
push WriteChannel b
writeChannel (b -> m ()) -> (a -> b) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
fn) m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WriteChannel b -> m ()
forall (m :: * -> *) a. MonadIO m => WriteChannel a -> m ()
finish WriteChannel b
writeChannel

{-# WARNING newChannel "INTERNAL USE" #-}
{-# NOINLINE newChannel #-}
newChannel :: forall a . IO (WriteChannel a, ReadChannel a)
newChannel :: IO (WriteChannel a, ReadChannel a)
newChannel = (InChan (Maybe a) -> WriteChannel a)
-> (OutChan (Maybe a) -> ReadChannel a)
-> (InChan (Maybe a), OutChan (Maybe a))
-> (WriteChannel a, ReadChannel a)
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap InChan (Maybe a) -> WriteChannel a
forall a. InChan (Maybe a) -> WriteChannel a
WriteChannel OutChan (Maybe a) -> ReadChannel a
forall a. OutChan (Maybe a) -> ReadChannel a
ReadChannel ((InChan (Maybe a), OutChan (Maybe a))
 -> (WriteChannel a, ReadChannel a))
-> IO (InChan (Maybe a), OutChan (Maybe a))
-> IO (WriteChannel a, ReadChannel a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (InChan (Maybe a), OutChan (Maybe a))
forall a. IO (InChan a, OutChan a)
newChan

{-# WARNING end "INTERNAL USE" #-}
{-# INLINE end #-}
end :: WriteChannel a -> IO ()
end :: WriteChannel a -> IO ()
end = (InChan (Maybe a) -> Maybe a -> IO ())
-> Maybe a -> InChan (Maybe a) -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip InChan (Maybe a) -> Maybe a -> IO ()
forall a. InChan a -> a -> IO ()
writeChan Maybe a
forall a. Maybe a
Nothing (InChan (Maybe a) -> IO ())
-> (WriteChannel a -> InChan (Maybe a)) -> WriteChannel a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WriteChannel a -> InChan (Maybe a)
forall a. WriteChannel a -> InChan (Maybe a)
unWrite