{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE TypeApplications      #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Control.MapReduce.Engines.Streaming
Description : map-reduce-folds builders
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

map-reduce engine (fold builder) using @Streaming.Stream@ as its intermediate type.  Because @Streaming.Stream@ does not end with the
type of the data in the @Stream@, we wrap this type in @StreamResult@ for the purposes of the output type of the fold.
-}
module Control.MapReduce.Engines.Streaming
  (
    -- * Helper Types
    StreamResult(..)

    -- * Engines
  , streamingEngine
  , streamingEngineM

  -- * Result Extractors
  , resultToList
  , concatStream
  , concatStreamFold
  , concatStreamFoldM

  -- * @groupBy@ functions
  , groupByHashableKey
  , groupByOrderedKey
  )
where

import qualified Control.MapReduce.Core        as MRC
import qualified Control.MapReduce.Engines     as MRE

import qualified Control.Foldl                 as FL
import           Data.Functor.Identity          ( Identity )
import           Data.Hashable                  ( Hashable )
import qualified Data.HashMap.Strict           as HMS
import qualified Data.Map.Strict               as MS
import qualified Data.Sequence                 as Seq
import qualified Streaming.Prelude             as S
import qualified Streaming                     as S
import           Streaming                      ( Stream
                                                , Of
                                                )
import           Control.Arrow                  ( second )


-- | Apply a pure 'MRC.Unpack' step to a stream.
--
-- A @Filter@ keeps only the elements satisfying its predicate; an @Unpack@
-- maps each element to a container and splices that container's contents
-- into the output stream.  The stream's return value is passed through.
unpackStream
  :: MRC.Unpack x y -> S.Stream (Of x) Identity r -> Stream (Of y) Identity r
unpackStream unpacker = case unpacker of
  MRC.Filter keep   -> S.filter keep
  MRC.Unpack expand -> S.concat . S.map expand
{-# INLINABLE unpackStream #-}

-- | Apply an effectful 'MRC.UnpackM' step to a stream.
--
-- A @FilterM@ keeps only the elements whose monadic predicate holds; an
-- @UnpackM@ runs a monadic action on each element and splices the contents
-- of the resulting container into the output stream.  The stream's return
-- value is passed through.
unpackStreamM
  :: Monad m => MRC.UnpackM m x y -> Stream (Of x) m r -> Stream (Of y) m r
unpackStreamM unpacker = case unpacker of
  MRC.FilterM keepM   -> S.filterM keepM
  MRC.UnpackM expandM -> S.concat . S.mapM expandM
{-# INLINABLE unpackStreamM #-}

-- | Group a stream of key/value pairs by key, using a @Data.HashMap.Strict@
-- as the intermediate container.
--
-- The whole input stream is first drained into a list, so nothing is emitted
-- until the input is exhausted.  One @(key, values)@ pair is then emitted per
-- distinct key, in the hash map's own iteration order, and the input stream's
-- return value is preserved.
--
-- Values sharing a key are combined with @fromListWith (<>)@, which prepends
-- each newly seen value, so a group's 'Seq.Seq' holds its values in the
-- reverse of their order in the input stream.
groupByHashableKey
  :: forall m k c r
   . (Monad m, Hashable k, Eq k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByHashableKey strm = S.effect (S.toList strm >>= regroup)
 where
  -- build the grouping map, then replay it as a stream ending in @end@
  regroup (pairs S.:> end) =
    let grouped = HMS.fromListWith (<>) (second Seq.singleton <$> pairs)
    in  return (HMS.foldrWithKey emit (return end) grouped)
  emit key vals rest = S.cons (key, vals) rest
{-# INLINABLE groupByHashableKey #-}

-- | Group a stream of key/value pairs by key, using a @Data.Map.Strict@
-- as the intermediate container.
--
-- The whole input stream is first drained into a list, so nothing is emitted
-- until the input is exhausted.  One @(key, values)@ pair is then emitted per
-- distinct key, in ascending key order, and the input stream's return value
-- is preserved.
--
-- Values sharing a key are combined with @fromListWith (<>)@, which prepends
-- each newly seen value, so a group's 'Seq.Seq' holds its values in the
-- reverse of their order in the input stream.
groupByOrderedKey
  :: forall m k c r
   . (Monad m, Ord k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByOrderedKey strm = S.effect (S.toList strm >>= regroup)
 where
  -- build the grouping map, then replay it as a stream ending in @end@
  regroup (pairs S.:> end) =
    let grouped = MS.fromListWith (<>) (second Seq.singleton <$> pairs)
    in  return (MS.foldrWithKey emit (return end) grouped)
  emit key vals rest = S.cons (key, vals) rest
{-# INLINABLE groupByOrderedKey #-}

-- | Newtype around @Stream (Of d) m ()@ whose last type parameter is the
-- element type @d@ (rather than the stream's return type), so it can be used
-- where a type constructor ending in the data type is required.
newtype StreamResult m d = StreamResult
  { unRes :: Stream (Of d) m () -- ^ the wrapped stream
  }

-- | Run the wrapped stream and collect all of its elements into a list.
resultToList :: Monad m => StreamResult m d -> m [d]
resultToList (StreamResult strm) = S.toList_ strm

-- | Combine every element of a stream of monoidal values.
--
-- The stream's @()@ result is replaced by 'mempty' and each element is
-- prepended to the combined remainder, i.e. the result is nested to the
-- right: @a1 <> (a2 <> (... <> mempty))@.
concatStreaming :: (Monad m, Monoid a) => Stream (Of a) m () -> m a
concatStreaming strm = S.iterT prepend (mempty <$ strm)
  where prepend (x S.:> rest) = (x <>) <$> rest

-- | Combine (with '<>') all elements of a 'StreamResult' of monoidal values,
-- yielding 'mempty' for an empty stream.
concatStream :: (Monad m, Monoid a) => StreamResult m a -> m a
concatStream (StreamResult strm) = concatStreaming strm

-- | Turn a pure fold that returns a 'StreamResult' of monoidal values into a
-- fold that returns their monoidal concatenation.
concatStreamFold
  :: Monoid b => FL.Fold a (StreamResult Identity b) -> FL.Fold a b
concatStreamFold fld = S.runIdentity . concatStream <$> fld

-- | Turn an effectful fold that returns a 'StreamResult' of monoidal values
-- into an effectful fold that returns their monoidal concatenation; the
-- concatenation is run in @m@ as a post-processing step on the fold's result.
concatStreamFoldM
  :: (Monad m, Monoid b) => FL.FoldM m a (StreamResult m b) -> FL.FoldM m a b
concatStreamFoldM fld = MRC.postMapM concatStream fld

-- | Build a pure map-reduce fold whose result is a 'StreamResult'.
--
-- The first argument is the grouping function (for example
-- 'groupByHashableKey' or 'groupByOrderedKey').  The fold itself only
-- accumulates its inputs into a stream; all map-reduce work happens in the
-- extraction step, which unpacks, assigns keys, groups by key and finally
-- reduces each group.
--
-- Each input is consed onto the front of the accumulated stream, so the
-- stream handed to the pipeline runs from the last input to the first.
streamingEngine
  :: (Foldable g, Functor g)
  => (  forall z r
      . Stream (Of (k, z)) Identity r
     -> Stream (Of (k, g z)) Identity r
     )
  -> MRE.MapReduceFold y k c (StreamResult Identity) x d
streamingEngine groupByKey u (MRC.Assign a) r =
  StreamResult <$> FL.Fold push (return ()) pipeline
 where
  -- fold step: prepend the newest input to the accumulated stream
  push acc x = S.cons x acc
  -- extraction: unpack -> assign -> group -> reduce
  pipeline = S.map reduceGroup . groupByKey . S.map a . unpackStream u
  reduceGroup (k, grp) = MRE.reduceFunction r k grp
{-# INLINABLE streamingEngine #-}

-- | Build an effectful map-reduce fold whose result is a 'StreamResult'.
--
-- The first argument is the grouping function (for example
-- 'groupByHashableKey' or 'groupByOrderedKey').  Accumulation is a pure fold
-- lifted into @m@ with 'FL.generalize'; the monadic unpack, assign and reduce
-- steps are embedded in the returned stream by the extraction step, which
-- unpacks, assigns keys, groups by key and finally reduces each group.
--
-- Each input is consed onto the front of the accumulated stream, so the
-- stream handed to the pipeline runs from the last input to the first.
streamingEngineM
  :: (Monad m, Traversable g)
  => (forall z r . Stream (Of (k, z)) m r -> Stream (Of (k, g z)) m r)
  -> MRE.MapReduceFoldM m y k c (StreamResult m) x d
streamingEngineM groupByKey u (MRC.AssignM a) r =
  StreamResult <$> FL.generalize (FL.Fold push (return ()) pipeline)
 where
  -- fold step: prepend the newest input to the accumulated stream
  push acc x = S.cons x acc
  -- extraction: unpack -> assign -> group -> reduce, each step effectful
  pipeline = S.mapM reduceGroupM . groupByKey . S.mapM a . unpackStreamM u
  reduceGroupM (k, grp) = MRE.reduceFunctionM r k grp
{-# INLINABLE streamingEngineM #-}