{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE InstanceSigs          #-}
{-# LANGUAGE BangPatterns          #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Control.MapReduce.Simple
Description : Simplified interfaces and helper functions for map-reduce-folds
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

Helper functions and default Engines and grouping functions for assembling map/reduce folds.
-}
module Control.MapReduce.Simple
  (
  -- * Unpackers
    noUnpack
  , simpleUnpack
  , filterUnpack

  -- * Assigners
  , assign

  -- * Reducers
  -- $reducers
  , processAndLabel
  , processAndLabelM
  , foldAndLabel
  , foldAndLabelM

  -- * Reduce Transformers 
  , reduceMapWithKey
  , reduceMMapWithKey

  -- * Default Map-Reduce Folds to @[]@
  , mapReduceFold
  , mapReduceFoldM
  , hashableMapReduceFold
  , hashableMapReduceFoldM
  , unpackOnlyFold
  , unpackOnlyFoldM

  -- * Simplify Results
  , concatFold
  , concatFoldM

  -- * Re-Exports
  , module Control.MapReduce.Core
  , Hashable
  )
where

import qualified Control.MapReduce.Core        as MR
import           Control.MapReduce.Core -- for re-export.  I don't like the unqualified-ness of this.
import qualified Control.MapReduce.Engines.Streaming
                                               as MRST
import qualified Control.MapReduce.Engines.Streamly
                                               as MRSL
import qualified Control.MapReduce.Engines.List
                                               as MRL

--import qualified Control.MapReduce.Parallel    as MRP

import qualified Control.Foldl                 as FL
import qualified Data.Foldable                 as F
import           Data.Functor.Identity          ( Identity(Identity)
                                                , runIdentity
                                                )

import           Data.Hashable                  ( Hashable )

-- | Don't do anything in the unpacking stage.
--
-- Implemented as an @MR.Filter@ whose predicate accepts every input
-- (@const True@), so the input and output types coincide.
noUnpack :: MR.Unpack x x
noUnpack = MR.Filter $ const True
{-# INLINABLE noUnpack #-}

-- | Unpack using the given function.
--
-- Each @x@ is mapped to exactly one @y@; the result is wrapped in 'Identity'
-- to supply the 'Traversable' container that @MR.Unpack@ requires.
simpleUnpack :: (x -> y) -> MR.Unpack x y
simpleUnpack f = MR.Unpack $ Identity . f
{-# INLINABLE simpleUnpack #-}

-- | Filter while unpacking, using the given predicate.
--
-- This is a synonym for the @MR.Filter@ constructor.
filterUnpack :: (x -> Bool) -> MR.Unpack x x
filterUnpack = MR.Filter
{-# INLINABLE filterUnpack #-}

-- | Assign via two functions of @y@, one that provides the key and one that
-- provides the data to be grouped by that key.
--
-- The helper takes its argument with a bang pattern, so each @y@ is forced
-- to weak head normal form before the @(key, data)@ pair is built.
assign :: forall k y c . (y -> k) -> (y -> c) -> MR.Assign k y c
assign getKey getCols = let f !y = (getKey y, getCols y) in MR.Assign f
{-# INLINABLE assign #-}

-- | Map a reduce using the given function of key and reduction result.
--
-- Both constructors of @MR.Reduce@ are handled:
--
-- * for @MR.Reduce@, the function is post-composed onto the per-key
--   reduction function (the 'fmap' here is the one for functions);
-- * for @MR.ReduceFold@, the function is mapped over the per-key fold.
reduceMapWithKey :: (k -> y -> z) -> MR.Reduce k x y -> MR.Reduce k x z
reduceMapWithKey f r = case r of
  MR.Reduce     g  -> MR.Reduce $ \k -> fmap (f k) (g k)
  MR.ReduceFold gf -> MR.ReduceFold $ \k -> fmap (f k) (gf k)
{-# INLINABLE reduceMapWithKey #-}

-- | Map a monadic reduction with a (non-monadic) function of the key and
-- reduction result.
--
-- Both constructors of @MR.ReduceM@ are handled:
--
-- * for @MR.ReduceM@, the outer 'fmap' works on the per-key reduction
--   function and the inner 'fmap' reaches the result inside @m@;
-- * for @MR.ReduceFoldM@, the function is mapped over the per-key monadic fold.
reduceMMapWithKey :: (k -> y -> z) -> MR.ReduceM m k x y -> MR.ReduceM m k x z
reduceMMapWithKey f r = case r of
  MR.ReduceM     g  -> MR.ReduceM $ \k -> fmap (fmap (f k)) (g k)
  MR.ReduceFoldM gf -> MR.ReduceFoldM $ \k -> fmap (f k) (gf k)
{-# INLINABLE reduceMMapWithKey #-}


{- $reducers
The most common case is that the reduction doesn't depend on the key.
These functions combine a key-independent processing step and a labeling step for the four variations of @Reduce@.
-}

-- | Create a @Reduce@ from a function of the grouped data to @y@ and a
-- function from the key and @y@ to the result type.
--
-- The processing function does not see the key; the key is only used by the
-- labeling function applied afterwards.
processAndLabel
  :: (forall h . (Foldable h, Functor h) => h x -> y)
  -> (k -> y -> z)
  -> MR.Reduce k x z
processAndLabel process relabel = MR.Reduce $ \k -> relabel k . process
{-# INLINABLE processAndLabel #-}

-- | Create a monadic @ReduceM@ from a function of the grouped data to @m y@
-- and a function from the key and @y@ to the result type.
--
-- The (pure) labeling function is mapped over the monadic result of the
-- processing function.
processAndLabelM
  :: Monad m
  => (forall h . (Foldable h, Functor h) => h x -> m y)
  -> (k -> y -> z)
  -> MR.ReduceM m k x z
processAndLabelM processM relabel =
  MR.ReduceM $ \k -> fmap (relabel k) . processM
{-# INLINABLE processAndLabelM #-}

-- | Create a @Reduce@ from a fold of the grouped data to @y@ and a function
-- from the key and @y@ to the result type.
--
-- The per-key fold is the given fold with the labeling function mapped over
-- its result. The key is taken with a bang pattern, so it is forced to weak
-- head normal form when the per-key fold is built.
foldAndLabel :: FL.Fold x y -> (k -> y -> z) -> MR.Reduce k x z
foldAndLabel fld relabel = let q !k = fmap (relabel k) fld in MR.ReduceFold q
{-# INLINABLE foldAndLabel #-}

-- | Create a monadic @ReduceM@ from a monadic fold of the grouped data to
-- @y@ and a function from the key and @y@ to the result type.
--
-- The per-key fold is the given monadic fold with the (pure) labeling
-- function mapped over its result. The key is taken with a bang pattern, so
-- it is forced to weak head normal form when the per-key fold is built.
foldAndLabelM
  :: Monad m => FL.FoldM m x y -> (k -> y -> z) -> MR.ReduceM m k x z
foldAndLabelM fld relabel =
  let q !k = fmap (relabel k) fld in MR.ReduceFoldM q
{-# INLINABLE foldAndLabelM #-}

-- | The simple fold types return lists of results.  Often we want to merge
-- these into some other structure via ('<>').
--
-- This maps 'F.fold' over the result of the given fold, combining the
-- elements of the @g d@ container with the 'Monoid' instance of @d@.
concatFold :: (Monoid d, Foldable g) => FL.Fold a (g d) -> FL.Fold a d
concatFold = fmap F.fold

-- | The simple monadic fold types return lists of results.  Often we want to
-- merge these into some other structure via ('<>').
--
-- This maps 'F.fold' over the result of the given monadic fold, combining
-- the elements of the @g d@ container with the 'Monoid' instance of @d@.
concatFoldM
  :: (Monad m, Monoid d, Foldable g) => FL.FoldM m a (g d) -> FL.FoldM m a d
concatFoldM = fmap F.fold

-- | Map-reduce fold returning its results as a list, for keys with an 'Ord'
-- instance.
--
-- Built on the Streamly engine (@MRSL.streamlyEngine@) with
-- @MRSL.groupByOrderedKey@ as the grouping step.  The engine runs in
-- 'Identity', so the resulting stream is converted to a list and unwrapped
-- with 'runIdentity'.
mapReduceFold
  :: Ord k
  => MR.Unpack x y -- ^ unpack x to none or one or many y's
  -> MR.Assign k y c -- ^ assign each y to a key value pair (k,c)
  -> MR.Reduce k c d -- ^ reduce a grouped [c] to d
  -> FL.Fold x [d]
mapReduceFold u a r =
  fmap (runIdentity . MRSL.resultToList)
    $ MRSL.streamlyEngine MRSL.groupByOrderedKey u a r
{-# INLINABLE mapReduceFold #-}

-- | Monadic map-reduce fold returning its results as a list, for keys with
-- an 'Ord' instance.
--
-- Built on the Streaming engine (@MRST.streamingEngineM@) with
-- @MRST.groupByOrderedKey@ as the grouping step.
mapReduceFoldM
  :: (Monad m, Ord k)
  => MR.UnpackM m x y -- ^ unpack x to none or one or many y's
  -> MR.AssignM m k y c -- ^ assign each y to a key value pair (k,c)
  -> MR.ReduceM m k c d -- ^ reduce a grouped [c] to d
  -> FL.FoldM m x [d]
mapReduceFoldM u a r =
  -- Mapping 'MRST.resultToList' over the engine's fold leaves an action
  -- @m [d]@ as the fold result; @MR.postMapM id@ runs that action in @m@.
  MR.postMapM id $ fmap MRST.resultToList $ MRST.streamingEngineM
    MRST.groupByOrderedKey
    u
    a
    r
{-# INLINABLE mapReduceFoldM #-}

-- | Map-reduce fold returning its results as a list, for keys with
-- 'Hashable' and 'Eq' instances.
--
-- Built on the Streamly engine (@MRSL.streamlyEngine@) with
-- @MRSL.groupByHashableKey@ as the grouping step.  The engine runs in
-- 'Identity', so the resulting stream is converted to a list and unwrapped
-- with 'runIdentity'.
hashableMapReduceFold
  :: (Hashable k, Eq k)
  => MR.Unpack x y -- ^ unpack x to none or one or many y's
  -> MR.Assign k y c -- ^ assign each y to a key value pair (k,c)
  -> MR.Reduce k c d -- ^ reduce a grouped [c] to d
  -> FL.Fold x [d]
hashableMapReduceFold u a r =
  fmap (runIdentity . MRSL.resultToList)
    $ MRSL.streamlyEngine MRSL.groupByHashableKey u a r
{-# INLINABLE hashableMapReduceFold #-}

-- | Monadic map-reduce fold returning its results as a list, for keys with
-- 'Hashable' and 'Eq' instances.
--
-- Built on the Streaming engine (@MRST.streamingEngineM@) with
-- @MRST.groupByHashableKey@ as the grouping step.
hashableMapReduceFoldM
  :: (Monad m, Hashable k, Eq k)
  => MR.UnpackM m x y -- ^ unpack x to none or one or many y's
  -> MR.AssignM m k y c -- ^ assign each y to a key value pair (k,c)
  -> MR.ReduceM m k c d -- ^ reduce a grouped [c] to d
  -> FL.FoldM m x [d]
hashableMapReduceFoldM u a r =
  -- Mapping 'MRST.resultToList' over the engine's fold leaves an action
  -- @m [d]@ as the fold result; @MR.postMapM id@ runs that action in @m@.
  MR.postMapM id $ fmap MRST.resultToList $ MRST.streamingEngineM
    MRST.groupByHashableKey
    u
    a
    r
{-# INLINABLE hashableMapReduceFoldM #-}

-- | Do only the unpack step.
--
-- All inputs are first collected into a list with 'FL.list'; the unpack is
-- then applied to that list via @MRL.unpackList@.
unpackOnlyFold :: MR.Unpack x y -> FL.Fold x [y]
unpackOnlyFold u = fmap (MRL.unpackList u) FL.list
{-# INLINABLE unpackOnlyFold #-}

-- | Do only the (monadic) unpack step.  The unpacked values are returned as
-- a list.
--
-- All inputs are first collected into a list with 'FL.list' (lifted into
-- @m@ with 'FL.generalize'); the monadic unpack is then applied to that list
-- via @MRL.unpackListM@ as a post-processing step.
unpackOnlyFoldM :: Monad m => MR.UnpackM m x y -> FL.FoldM m x [y]
unpackOnlyFoldM u = MR.postMapM (MRL.unpackListM u) (FL.generalize FL.list)
{-# INLINABLE unpackOnlyFoldM #-}