module Data.Conduit.Extras where

import Control.Concurrent.Chan
import Control.Exception
import Control.Monad.IO.Class
import Control.Monad.IO.Unlift
import Control.Monad.Trans
import Data.Conduit
import Prelude

-- | Write to a channel terminating with @Nothing@
sinkChanTerminate :: (MonadUnliftIO m, MonadIO m) => Chan (Maybe a) -> ConduitT a o m ()
sinkChanTerminate :: forall (m :: * -> *) a o.
(MonadUnliftIO m, MonadIO m) =>
Chan (Maybe a) -> ConduitT a o m ()
sinkChanTerminate Chan (Maybe a)
ch =
  forall (m :: * -> *) e i o r.
(MonadUnliftIO m, Exception e) =>
(e -> ConduitT i o m r) -> ConduitT i o m r -> ConduitT i o m r
handleC
    ( \SomeException
e -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
        forall a. Chan a -> a -> IO ()
writeChan Chan (Maybe a)
ch forall a. Maybe a
Nothing
        forall e a. Exception e => e -> IO a
throwIO (SomeException
e :: SomeException)
    )
    forall a b. (a -> b) -> a -> b
$ do
      forall (m :: * -> *) i o r.
Monad m =>
(i -> ConduitT i o m r) -> ConduitT i o m ()
awaitForever forall a b. (a -> b) -> a -> b
$ \a
msg -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Chan a -> a -> IO ()
writeChan Chan (Maybe a)
ch (forall a. a -> Maybe a
Just a
msg)
      forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Chan a -> a -> IO ()
writeChan Chan (Maybe a)
ch forall a. Maybe a
Nothing

-- | Write to a channel terminating with @Nothing@
sinkChan :: (MonadUnliftIO m, MonadIO m) => Chan (Maybe a) -> ConduitT a o m ()
sinkChan :: forall (m :: * -> *) a o.
(MonadUnliftIO m, MonadIO m) =>
Chan (Maybe a) -> ConduitT a o m ()
sinkChan Chan (Maybe a)
ch =
  forall (m :: * -> *) e i o r.
(MonadUnliftIO m, Exception e) =>
(e -> ConduitT i o m r) -> ConduitT i o m r -> ConduitT i o m r
handleC
    ( \SomeException
e -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
        forall e a. Exception e => e -> IO a
throwIO (SomeException
e :: SomeException)
    )
    forall a b. (a -> b) -> a -> b
$ do
      forall (m :: * -> *) i o r.
Monad m =>
(i -> ConduitT i o m r) -> ConduitT i o m ()
awaitForever forall a b. (a -> b) -> a -> b
$ \a
msg -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Chan a -> a -> IO ()
writeChan Chan (Maybe a)
ch (forall a. a -> Maybe a
Just a
msg)

-- | Read a channel until @Nothing@ is encountered
sourceChan :: MonadIO m => Chan (Maybe a) -> ConduitT i a m ()
sourceChan :: forall (m :: * -> *) a i.
MonadIO m =>
Chan (Maybe a) -> ConduitT i a m ()
sourceChan Chan (Maybe a)
ch = do
  Maybe a
mmsg <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Chan a -> IO a
readChan Chan (Maybe a)
ch
  case Maybe a
mmsg of
    Maybe a
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Chan a -> a -> IO ()
writeChan Chan (Maybe a)
ch forall a. Maybe a
Nothing
    Just a
msg -> forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield a
msg forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a i.
MonadIO m =>
Chan (Maybe a) -> ConduitT i a m ()
sourceChan Chan (Maybe a)
ch

conduitToCallbacks :: (MonadUnliftIO m, MonadIO m) => ConduitT () o m a -> (o -> m ()) -> m a
conduitToCallbacks :: forall (m :: * -> *) o a.
(MonadUnliftIO m, MonadIO m) =>
ConduitT () o m a -> (o -> m ()) -> m a
conduitToCallbacks ConduitT () o m a
c o -> m ()
w = do
  forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () o m a
c forall (m :: * -> *) a b r c.
Monad m =>
ConduitT a b m r -> ConduitT b c m () -> ConduitT a c m r
`fuseUpstream` forall (m :: * -> *) i o r.
Monad m =>
(i -> ConduitT i o m r) -> ConduitT i o m ()
awaitForever (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall b c a. (b -> c) -> (a -> b) -> a -> c
. o -> m ()
w))