module HaskellWorks.Data.Conduit.Combinator where
import Control.Concurrent (MVar, putMVar, tryTakeMVar)
import Control.Monad (void)
import Control.Monad.IO.Class
import Data.Conduit
import qualified Data.Conduit.List as L
import Data.Maybe
import Data.Time.Clock.POSIX as T
effect :: Monad m => (a -> m b) -> Conduit a m a
effect f = L.mapM (\a -> f a >> return a)
effect' :: Monad m => m b -> Conduit a m a
effect' m = L.mapM (\a -> m >> return a)
inJust :: Monad m => Conduit a m c -> Conduit (Maybe a) m (Maybe c)
inJust c = getZipConduit
$ ZipConduit (L.filter isNothing .| L.map (const Nothing))
<* ZipConduit (L.concat .| c .| L.map Just )
mvarSink :: MonadIO m => MVar a -> Sink a m ()
mvarSink mvar = awaitForever $ \v ->
liftIO $ tryTakeMVar mvar >> putMVar mvar v
tapWith :: Monad m => Conduit a m b -> Sink b m () -> Conduit a m a
tapWith f s = passthroughSink (f .| s) (const $ return ())
tap :: Monad m => Sink a m () -> Conduit a m a
tap s = passthroughSink s (const $ return ())
tapPred :: Monad m => (a -> Bool) -> Sink a m () -> Sink a m () -> Conduit a m a
tapPred p tr fl =
tap (L.filter p .| tr) .| tap (L.filter (not . p) .| fl)
sinkWithPred :: Monad m => (a -> Bool) -> Sink a m () -> Sink a m () -> Sink a m ()
sinkWithPred p tr fl =
void $ sequenceSinks [L.filter p .| tr, L.filter (not . p) .| fl]
projectNothings :: Monad m => Conduit (Maybe a) m ()
projectNothings = awaitForever $ maybe (yield ()) (const $ return ())
tapNothing :: Monad m => Sink () m () -> Conduit (Maybe a) m (Maybe a)
tapNothing = tapWith projectNothings
divertNothing :: Monad m => Sink () m () -> Conduit (Maybe a) m a
divertNothing sink = tapNothing sink .| L.catMaybes
projectLefts :: Monad m => Conduit (Either l r) m l
projectLefts = awaitForever $ either yield (const $ return ())
projectRights :: Monad m => Conduit (Either l r) m r
projectRights = awaitForever $ either (const $ return ()) yield
tapLeft :: Monad m => Sink l m () -> Conduit (Either l r) m (Either l r)
tapLeft = tapWith projectLefts
divertLeft :: Monad m => Sink l m () -> Conduit (Either l r) m r
divertLeft sink = tapLeft sink .| projectRights
tapRight :: Monad m => Sink r m () -> Conduit (Either l r) m (Either l r)
tapRight = tapWith projectRights
divertRight :: Monad m => Sink r m () -> Conduit (Either l r) m l
divertRight sink = tapRight sink .| projectLefts
everyN :: Monad m => Int -> Conduit a m a
everyN n = go 1
where
go n' = await >>= maybe (return ()) (\x ->
if n' < n
then go (n'+1)
else yield x >> go 1)
everyNSeconds :: MonadIO m => Int -> Conduit a m a
everyNSeconds interval = go 0
where
go t = do
mmsg <- await
case mmsg of
Nothing -> pure ()
Just msg -> do
ct <- liftIO $ (round . T.utcTimeToPOSIXSeconds) <$> T.getCurrentTime
if ct > t
then yield msg >> go (ct + interval)
else go t