{-# LANGUAGE FlexibleInstances   #-}
{-# LANGUAGE ScopedTypeVariables #-}

{-|

Module      : Streaming.Producer
Description : Simple data type for streaming and associated combinators
Copyright   : (c) Eric Torreborre, 2017
                  Someone Else, 2014
License     : MIT
Maintainer  : etorreborre@yahoo.com
Stability   : experimental

-}
module Streaming.Producer (
    Producer(..)
  , Stream(..)
  , done
  , one
  , more
  , emit
  , append
  , filter
  , take
  , drop
  , chunk
  , runList
  , runChunks
) where

import           Control.Applicative   (liftA2)
import           Data.Functor.Identity
import qualified Data.List             as DL (drop, filter, take)
import           Prelude               hiding (drop, filter, take)

-- data types

-- | A Producer generates values of type a with effects of type m
newtype Producer m a =
  Producer { runStream :: m (Stream m a) }


-- | ADT for the produced elements. There are either:
--
--      * no element
--
--      * one element
--
--      * several elements (a "chunk") followed by the next producer
--
data Stream m a =
    Done
  | One a
  | More [a] (Producer m a)

-- |
-- == Constructors

-- | A Producer with no elements
done :: Applicative m => Producer m a
done = Producer (pure Done)

-- | A Producer with one element
one :: Applicative m => a -> Producer m a
one a = Producer (pure (One a))

-- | A Producer with n elements
emit :: Applicative m => [a] -> Producer m a
emit as = more as done

-- | A Producer with n elements and another Producer
more :: Applicative m => [a] -> Producer m a -> Producer m a
more as next = Producer (pure (More as next))

-- |
-- == Combinators

-- | Append 2 Producers together
append :: Applicative m => Producer m a -> Producer m a -> Producer m a
append (Producer s1) (Producer s2) =  Producer (liftA2 appendStream s1 s2)

appendStream :: Applicative m => Stream m a -> Stream m a -> Stream m a
appendStream s Done = s
appendStream Done s = s
appendStream (One a) (More as next) = More (a:as) next
appendStream (One a1) (One a2) = More [a1, a2] done
appendStream (More as1 (Producer next)) s2 = More as1 (Producer (fmap (`appendStream` s2) next))

mapStream :: Monad m => (Stream m a -> Producer m a) -> Producer m a -> Producer m a
mapStream f p = Producer $ runStream p >>= \s -> runStream (f s)

-- | Filter the values of a Producer
filter :: Monad m => (a -> Bool) -> Producer m a -> Producer m a
filter = mapStream . filterStream

filterStream :: Monad m => (a -> Bool) -> Stream m a -> Producer m a
filterStream _ Done = done
filterStream f (One a) = if f a then one a else done
filterStream f (More as next) =
  more (DL.filter f as) (filter f next)

-- | Take the first n elements
-- If n <= 0 an empty Producer is returned
take :: Monad m => Int -> Producer m a -> Producer m a
take = mapStream . takeStream

takeStream :: Monad m => Int -> Stream m a -> Producer m a
takeStream _ Done = done
takeStream n (One a) = if n <= 0 then done else one a
takeStream n (More as next) =
  let diff = n - length as in
    if diff > 0 then
      more as (take diff next)
    else
      more (DL.take n as) done

-- | Drop the first n elements
-- If n <= 0 nothing is dropped
drop :: Monad m => Int -> Producer m a -> Producer m a
drop = mapStream . dropStream

dropStream :: Monad m => Int -> Stream m a -> Producer m a
dropStream _ Done = done
dropStream n (One a) = if n > 0 then done else one a
dropStream n (More as next) =
  let diff = n - length as in
    if diff >= 0 then
      drop diff next
    else
      more (DL.drop n as) next

-- | Make sure that the underlying chunks have a size n
-- as much as possible
chunk :: Monad m => Int -> Producer m a -> Producer m a
chunk = mapStream . chunkStream

chunkStream :: Monad m => Int -> Stream m a -> Producer m a
chunkStream _ Done    = done
chunkStream _ (One a) = one a
chunkStream n (More as next) =
  if n <= 0 then more as next
  else
    go [] (More as next)
    where
      --go :: [a] -> Stream m a -> Producer m a
      go acc Done = emit acc

      go acc (One a) =
        emit (acc ++ [a])

      go acc (More as1 next1) =
        let needed = n - length acc in
          if length as1 >= needed then
            emit (acc ++ DL.take needed as1) `append`
            chunk n (more (DL.drop needed as1) next1)
          else
            mapStream (go (acc ++ as1)) next1

-- |
-- == Observations
-- The following functions can "run" a Producer to get values back

-- | return a list of values
runList :: Monad m => Producer m a -> m [a]
runList (Producer ma) = ma >>= runListStream

runListStream :: Monad m => Stream m a -> m [a]
runListStream Done           = pure []
runListStream (One a)        = pure [a]
runListStream (More as next) =  (\x -> as ++ x) <$> runList next

-- | return a list of chunks
runChunks :: Monad m => Producer m a -> m [[a]]
runChunks (Producer ma) = ma >>= runChunksStream

runChunksStream :: Monad m => Stream m a -> m [[a]]
runChunksStream Done           = pure []
runChunksStream (One a)        = pure [[a]]
runChunksStream (More as next) =
  appendChunk <$> runChunks next
  where
    -- appendChunk :: [[a]] -> [[a]]
    appendChunk []     = [as]
    appendChunk [[]]   = [as]
    appendChunk (c:cs) = as:c:cs

-- Instances
instance (Show a) => Show (Producer Identity a) where
  show p = show (runList p)

instance (Functor m) => Functor (Producer m) where
  fmap f (Producer s) = Producer (fmap (fmap f) s)

instance (Monad m) => Applicative (Producer m) where
    pure a = Producer (return (One a))

    (<*>) p1 p2 = p1 >>= (`fmap` p2)

instance (Monad m) => Monad (Producer m) where
  return a = Producer (return (One a))

  Producer s >>= f = Producer (s >>= f') where
    f' Done               = pure Done
    f' (One a)            = runStream (f a)
    f' (More [] next)     = runStream (next >>= f)
    f' (More (a:as) next) = runStream (f a `append` (more as next >>= f))

instance (Functor m) => Functor (Stream m) where
  fmap _ Done           = Done
  fmap f (One a)        = One (f a)
  fmap f (More as next) = More (fmap f as) (fmap f next)