module HaskellWorks.Data.Conduit.Combinator where
import Control.Concurrent (MVar, putMVar, tryTakeMVar)
import Control.Monad (void)
import Control.Monad.IO.Class
import Data.Conduit
import Data.Maybe
import Data.Time.Clock.POSIX as T
import qualified Data.Conduit.List as L
maybeC :: Monad m => Conduit () m () -> Conduit a m c -> Conduit (Maybe a) m (Maybe c)
maybeC n j = getZipConduit
$ ZipConduit (L.filter isNothing .| L.map (const ()) .| n .| L.map (const Nothing))
<* ZipConduit (L.concat .| j .| L.map Just )
justC :: Monad m => Conduit a m c -> Conduit (Maybe a) m (Maybe c)
justC = maybeC (L.map id)
nothingC :: Monad m => Conduit () m () -> Conduit (Maybe a) m (Maybe a)
nothingC n = maybeC n (L.map id)
eitherC :: Monad m => Conduit l m a -> Conduit r m a -> Conduit (Either l r) m a
eitherC l r = getZipConduit
$ ZipConduit (projectLefts .| l)
<* ZipConduit (projectRights .| r)
rightC :: Monad m => Conduit r m a -> Conduit (Either l r) m (Either l a)
rightC r = eitherC (L.map Left) (r .| L.map Right)
leftC :: Monad m => Conduit l m a -> Conduit (Either l r) m (Either a r)
leftC l = eitherC (l .| L.map Left) (L.map Right)
effectC :: Monad m => (a -> m b) -> Conduit a m a
effectC f = L.mapM (\a -> f a >> return a)
effectC' :: Monad m => m b -> Conduit a m a
effectC' m = L.mapM (\a -> m >> return a)
sinkWithPred :: Monad m => (a -> Bool) -> Sink a m () -> Sink a m () -> Sink a m ()
sinkWithPred p tr fl =
void $ sequenceSinks [L.filter p .| tr, L.filter (not . p) .| fl]
projectNothings :: Monad m => Conduit (Maybe a) m ()
projectNothings = awaitForever $ maybe (yield ()) (const $ return ())
projectLefts :: Monad m => Conduit (Either l r) m l
projectLefts = awaitForever $ either yield (const $ return ())
projectRights :: Monad m => Conduit (Either l r) m r
projectRights = awaitForever $ either (const $ return ()) yield
everyN :: Monad m => Int -> Conduit a m a
everyN n = go 1
where
go n' = await >>= maybe (return ()) (\x ->
if n' < n
then go (n'+1)
else yield x >> go 1)
everyNSeconds :: MonadIO m => Int -> Conduit a m a
everyNSeconds interval = go 0
where
go t = do
mmsg <- await
case mmsg of
Nothing -> pure ()
Just msg -> do
ct <- liftIO $ (round . T.utcTimeToPOSIXSeconds) <$> T.getCurrentTime
if ct > t
then yield msg >> go (ct + interval)
else go t
effect :: Monad m => (a -> m b) -> Conduit a m a
effect = effectC
effect' :: Monad m => m b -> Conduit a m a
effect' = effectC'
inJust :: Monad m => Conduit a m c -> Conduit (Maybe a) m (Maybe c)
inJust = justC
mvarSink :: MonadIO m => MVar a -> Sink a m ()
mvarSink mvar = awaitForever $ \v ->
liftIO $ tryTakeMVar mvar >> putMVar mvar v
tapWith :: Monad m => Conduit a m b -> Sink b m () -> Conduit a m a
tapWith f s = passthroughSink (f .| s) (const $ return ())
tap :: Monad m => Sink a m () -> Conduit a m a
tap s = passthroughSink s (const $ return ())
tapPred :: Monad m => (a -> Bool) -> Sink a m () -> Sink a m () -> Conduit a m a
tapPred p tr fl =
tap (L.filter p .| tr) .| tap (L.filter (not . p) .| fl)
tapNothing :: Monad m => Sink () m () -> Conduit (Maybe a) m (Maybe a)
tapNothing = tapWith projectNothings
divertNothing :: Monad m => Sink () m () -> Conduit (Maybe a) m a
divertNothing sink = tapNothing sink .| L.catMaybes
tapLeft :: Monad m => Sink l m () -> Conduit (Either l r) m (Either l r)
tapLeft = tapWith projectLefts
divertLeft :: Monad m => Sink l m () -> Conduit (Either l r) m r
divertLeft sink = tapLeft sink .| projectRights
tapRight :: Monad m => Sink r m () -> Conduit (Either l r) m (Either l r)
tapRight = tapWith projectRights
divertRight :: Monad m => Sink r m () -> Conduit (Either l r) m l
divertRight sink = tapRight sink .| projectLefts