module Data.Conduit.Extras where

import Control.Concurrent.Chan
import Control.Exception
import Control.Monad.IO.Class
import Control.Monad.IO.Unlift
import Control.Monad.Trans
import Data.Conduit
import Prelude

-- | Sink that forwards every upstream value to the channel wrapped in 'Just'
-- and then writes a 'Nothing' terminator.
--
-- The terminator is written in two situations:
--
-- * upstream is exhausted, so the forwarding loop finishes normally;
-- * an exception is caught by 'handleC' while the sink runs, in which case
--   'Nothing' is written first and the same exception is then rethrown.
sinkChanTerminate :: (MonadUnliftIO m, MonadIO m) => Chan (Maybe a) -> ConduitT a o m ()
sinkChanTerminate ch =
  handleC onFailure $ do
    awaitForever $ \msg -> liftIO $ writeChan ch (Just msg)
    liftIO $ writeChan ch Nothing
  where
    -- Write the terminator before propagating the failure, then rethrow
    -- the original exception unchanged.
    onFailure e = liftIO $ do
      writeChan ch Nothing
      throwIO (e :: SomeException)

-- | Sink that forwards every upstream value to the channel wrapped in 'Just'.
--
-- Unlike 'sinkChanTerminate', this sink never writes a 'Nothing' terminator:
-- when upstream is exhausted it simply finishes, and any exception raised
-- while it runs propagates to the caller.
--
-- The 'MonadUnliftIO' constraint is kept so the signature stays compatible
-- with existing callers.
sinkChan :: (MonadUnliftIO m, MonadIO m) => Chan (Maybe a) -> ConduitT a o m ()
sinkChan ch =
  awaitForever $ \msg -> liftIO $ writeChan ch (Just msg)

-- | Source that yields the values read from a channel until 'Nothing' is
-- encountered.
--
-- Each @'Just' x@ read from the channel is yielded downstream as @x@. When
-- 'Nothing' is read the source stops, after writing 'Nothing' back to the
-- channel (presumably so that other readers of the same channel also see the
-- terminator -- confirm against callers).
sourceChan :: MonadIO m => Chan (Maybe a) -> ConduitT i a m ()
sourceChan ch = do
  mmsg <- liftIO $ readChan ch
  case mmsg of
    Just msg -> yield msg >> sourceChan ch
    -- Put the terminator back before stopping.
    Nothing -> liftIO $ writeChan ch Nothing

-- | Run a source to completion, handing every value it yields to a callback.
--
-- The callback @w@ is executed in the base monad once per output of @c@.
-- The result is the return value of @c@ itself ('fuseUpstream' keeps the
-- upstream result rather than the sink's).
conduitToCallbacks :: (MonadUnliftIO m, MonadIO m) => ConduitT () o m a -> (o -> m ()) -> m a
conduitToCallbacks c w =
  runConduit $ c `fuseUpstream` awaitForever (lift . w)