{-| Pipes utility functions -} module SDR.PipeUtils ( fork, combine, printStream, devnull, rate, pMapAccum ) where import Data.Time.Clock import Pipes import Control.Monad -- | Fork a pipe fork :: Monad m => Producer a m r -> Producer a (Producer a m) r fork prod = runEffect $ hoist (lift . lift) prod >-> fork' where fork' = forever $ do res <- await lift $ yield res lift $ lift $ yield res -- | Combine two consumers into a single consumer combine :: Monad m => Consumer a m r -> Consumer a m r -> Consumer a m r combine x y = runEffect $ runEffect (fork func >-> hoist (lift . lift) x) >-> hoist lift y where func :: Monad m => Producer a (Consumer a m) r func = forever $ lift await >>= yield -- | A consumer that prints everything to stdout printStream :: (Show a) => Int -> Consumer a IO () printStream samples = for cat $ lift . print -- | A consumer that discards everything devnull :: Monad m => Consumer a m () devnull = forever await -- | Passthrough pipe that prints the sample rate rate :: Int -> Pipe a a IO b rate samples = do start <- lift getCurrentTime let rate' buffers = do res <- await time <- lift getCurrentTime let diff = diffUTCTime time start diffSecs :: Double diffSecs = fromRational $ toRational diff lift $ print $ buffers * fromIntegral samples / diffSecs yield res rate' (buffers + 1) rate' 1 -- | mapAccum for Pipes pMapAccum :: (Monad m) => (acc -> x -> (acc, y)) -- ^ Accumulating function -> acc -- ^ Initial value of the accumulator -> Pipe x y m () pMapAccum func acc = go acc where go acc = do dat <- await let (acc', res) = func acc dat yield res go acc'