{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-|
Module : Streaming.Producer
Description : Simple data type for streaming and associated combinators
Copyright : (c) Eric Torreborre, 2017
Someone Else, 2014
License : MIT
Maintainer : etorreborre@yahoo.com
Stability : experimental
-}
module Streaming.Producer (
Producer(..)
, Stream(..)
, done
, one
, more
, emit
, append
, filter
, take
, drop
, chunk
, runList
, runChunks
) where
import Control.Applicative (liftA2)
import Data.Functor.Identity
import qualified Data.List as DL (drop, filter, take)
import Prelude hiding (drop, filter, take)
-- data types
-- | A 'Producer' is an effectful computation in @m@ which, once run,
-- yields the next 'Stream' step carrying values of type @a@.
newtype Producer m a = Producer
  { runStream :: m (Stream m a)
  }
-- | One step of a 'Producer'. A step carries either:
--
--   * 'Done': no element at all
--
--   * 'One': a single element
--
--   * 'More': several elements (a "chunk") and the 'Producer' to run next
--
data Stream m a
  = Done
  | One a
  | More [a] (Producer m a)
-- |
-- == Constructors
-- | The empty 'Producer': its only step is 'Done'.
done :: Applicative m => Producer m a
done = Producer $ pure Done
-- | A 'Producer' whose only step is a single element ('One').
one :: Applicative m => a -> Producer m a
one = Producer . pure . One
-- | A 'Producer' emitting all the given elements as one chunk,
-- followed by the empty 'Producer'.
emit :: Applicative m => [a] -> Producer m a
emit as = as `more` done
-- | A 'Producer' emitting a chunk of elements and then continuing with
-- another 'Producer'. The chunk itself is wrapped with 'pure', so no
-- effect is needed to obtain it.
more :: Applicative m => [a] -> Producer m a -> Producer m a
more as next = Producer . pure $ More as next
-- |
-- == Combinators
-- | Concatenate two 'Producer's: the elements of the first one are
-- followed by the elements of the second one.
--
-- The first step of both producers is obtained with 'liftA2', i.e. the
-- effects of the first step of each producer are combined up front.
append :: Applicative m => Producer m a -> Producer m a -> Producer m a
append left right =
  Producer $ liftA2 appendStream (runStream left) (runStream right)
-- Concatenate two stream steps.
--
--  * if either side is 'Done' the other side is returned as it is
--  * a single element is pushed at the front of a following chunk
--  * two single elements are grouped in one chunk
--  * for a chunk on the left, the right step is appended after the
--    steps of the left continuation
appendStream :: Applicative m => Stream m a -> Stream m a -> Stream m a
appendStream s1 s2 = case (s1, s2) of
  (_, Done) -> s1
  (Done, _) -> s2
  (One a, More as next) -> More (a : as) next
  (One a1, One a2) -> More [a1, a2] done
  (More as1 rest, _) ->
    More as1 (Producer ((`appendStream` s2) <$> runStream rest))
-- Run the first step of a producer, then continue with the producer
-- obtained by applying the function to that step.
mapStream :: Monad m => (Stream m a -> Producer m a) -> Producer m a -> Producer m a
mapStream f p = Producer (runStream p >>= runStream . f)
-- | Keep only the values of a 'Producer' satisfying a predicate.
filter :: Monad m => (a -> Bool) -> Producer m a -> Producer m a
filter predicate = mapStream (filterStream predicate)
-- Filter one stream step: a single element is kept or replaced by the
-- empty producer, a chunk is filtered as a list and the filtering
-- carries on with the next producer.
filterStream :: Monad m => (a -> Bool) -> Stream m a -> Producer m a
filterStream _ Done = done
filterStream keep (One a)
  | keep a = one a
  | otherwise = done
filterStream keep (More as next) =
  DL.filter keep as `more` filter keep next
-- | Keep at most the first @n@ elements of a 'Producer'.
-- When @n <= 0@ the resulting 'Producer' has no elements.
take :: Monad m => Int -> Producer m a -> Producer m a
take n = mapStream (takeStream n)
-- Take the first n elements starting from one stream step.
--
--  * 'Done' stays empty
--  * a single element is kept only when n > 0
--  * for a chunk, at most n elements of the chunk are kept; if the chunk
--    holds fewer than n elements, the missing ones are taken from the
--    next producer
--
-- Only the first n elements of a chunk are ever inspected: the length is
-- measured on the already truncated prefix, so a very large (or even
-- infinite) chunk is not traversed entirely.
takeStream :: Monad m => Int -> Stream m a -> Producer m a
takeStream _ Done = done
takeStream n (One a)
  | n <= 0 = done
  | otherwise = one a
takeStream n (More as next)
  -- the chunk was shorter than n: 'taken' is the whole chunk
  | missing > 0 = more taken (take missing next)
  -- enough elements (or n <= 0, in which case 'taken' is empty)
  | otherwise = more taken done
  where
    taken = DL.take n as
    missing = n - length taken
-- | Skip the first @n@ elements of a 'Producer'.
-- When @n <= 0@ no element is skipped.
drop :: Monad m => Int -> Producer m a -> Producer m a
drop n = mapStream (dropStream n)
-- Drop the first n elements starting from one stream step.
--
--  * 'Done' stays empty
--  * a single element is dropped only when n > 0
--  * for a chunk, if the whole chunk is consumed the remaining number of
--    elements is dropped from the next producer, otherwise the rest of
--    the chunk is emitted followed by the untouched next producer
--
-- The chunk is split with 'splitAt' so that at most n of its elements
-- are traversed, instead of computing the length of the whole chunk.
dropStream :: Monad m => Int -> Stream m a -> Producer m a
dropStream _ Done = done
dropStream n (One a)
  | n > 0 = done
  | otherwise = one a
dropStream n (More as next)
  -- the chunk is exhausted and n was not negative: keep dropping
  | null kept && remaining >= 0 = drop remaining next
  -- some elements are left (or n < 0 on an empty chunk)
  | otherwise = more kept next
  where
    (skipped, kept) = splitAt n as
    remaining = n - length skipped
-- | Regroup the elements of a 'Producer' so that, as much as possible,
-- the underlying chunks have a size of @n@.
chunk :: Monad m => Int -> Producer m a -> Producer m a
chunk n = mapStream (chunkStream n)
-- Regroup the elements reachable from one stream step in chunks of
-- size n.
--
--  * 'Done' and a single element are returned unchanged
--  * when n <= 0 a chunk and its continuation are returned unchanged
--  * otherwise elements are accumulated until n of them are available;
--    they are then emitted as one chunk and the rest of the stream is
--    chunked again
chunkStream :: Monad m => Int -> Stream m a -> Producer m a
chunkStream _ Done = done
chunkStream _ (One a) = one a
chunkStream n stream@(More as next)
  | n <= 0 = more as next
  | otherwise = fill [] stream
  where
    -- fill :: [a] -> Stream m a -> Producer m a
    -- 'pending' holds the elements collected so far for the current chunk
    fill pending Done = emit pending
    fill pending (One a) = emit (pending ++ [a])
    fill pending (More batch rest)
      -- enough elements: emit a full chunk and re-chunk what is left
      | length batch >= missing =
          emit (pending ++ DL.take missing batch)
            `append` chunk n (more (DL.drop missing batch) rest)
      -- not enough elements yet: keep accumulating with the next step
      | otherwise = mapStream (fill (pending ++ batch)) rest
      where
        missing = n - length pending
-- |
-- == Observations
-- The following functions can "run" a Producer to get values back
-- | Run a 'Producer' and collect all its values in a list.
runList :: Monad m => Producer m a -> m [a]
runList producer = runStream producer >>= runListStream
-- Collect the values reachable from one stream step: the elements of a
-- chunk come first, followed by the values of the next producer.
runListStream :: Monad m => Stream m a -> m [a]
runListStream stream = case stream of
  Done -> pure []
  One a -> pure [a]
  More as next -> (as ++) <$> runList next
-- | Run a 'Producer' and collect its values grouped by chunk.
runChunks :: Monad m => Producer m a -> m [[a]]
runChunks producer = runStream producer >>= runChunksStream
-- Collect the chunks reachable from one stream step. A single element
-- counts as a chunk of size 1.
runChunksStream :: Monad m => Stream m a -> m [[a]]
runChunksStream Done = pure []
runChunksStream (One a) = pure [[a]]
runChunksStream (More as next) = prepend <$> runChunks next
  where
    -- prepend :: [[a]] -> [[a]]
    -- when the following chunks are absent, or are just one empty chunk,
    -- the current chunk is the only one returned
    prepend following = case following of
      [] -> [as]
      [[]] -> [as]
      _ -> as : following
-- Instances
-- | A pure ('Identity') 'Producer' is shown by showing the result of
-- 'runList'.
--
-- NOTE(review): the value being shown is still wrapped in 'Identity', so
-- the text presumably includes that wrapper - confirm whether showing
-- the bare list was intended.
instance (Show a) => Show (Producer Identity a) where
  show = show . runList
-- | Map a function over every value of a 'Producer' by mapping it over
-- the 'Stream' step inside the effect.
instance (Functor m) => Functor (Producer m) where
  fmap f p = Producer (fmap f <$> runStream p)
-- | 'pure' is a 'Producer' with a single element. '<*>' is defined with
-- the monadic bind: each function produced on the left is mapped over
-- the whole producer on the right.
instance (Monad m) => Applicative (Producer m) where
  pure = Producer . return . One
  pf <*> pa = pf >>= \f -> f <$> pa
-- | Monadic bind for 'Producer': every value of the producer is replaced
-- by the producer computed from it, and the results are appended in
-- order.
--
-- 'return' is the canonical @return = pure@ instead of a second copy of
-- the 'pure' implementation, so both can never diverge.
instance (Monad m) => Monad (Producer m) where
  return = pure
  Producer s >>= f = Producer (s >>= step)
    where
      -- nothing to bind
      step Done = pure Done
      -- a single value: run the producer computed from it
      step (One a) = runStream (f a)
      -- an exhausted chunk: carry on with the next producer
      step (More [] next) = runStream (next >>= f)
      -- a chunk: bind its head, then the rest of the chunk and what follows
      step (More (a : as) next) =
        runStream (f a `append` (more as next >>= f))
-- | Map a function over the element(s) of a 'Stream' step and, for a
-- chunk, over the values of the next 'Producer' as well.
instance (Functor m) => Functor (Stream m) where
  fmap f stream = case stream of
    Done -> Done
    One a -> One (f a)
    More as next -> More (map f as) (f <$> next)