{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE InstanceSigs          #-}
{-# LANGUAGE BangPatterns          #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Simple
  (
  
    noUnpack
  , simpleUnpack
  , filterUnpack
  
  , assign
  
  
  , processAndLabel
  , processAndLabelM
  , foldAndLabel
  , foldAndLabelM
  
  , reduceMapWithKey
  , reduceMMapWithKey
  
  , mapReduceFold
  , mapReduceFoldM
  , hashableMapReduceFold
  , hashableMapReduceFoldM
  , unpackOnlyFold
  , unpackOnlyFoldM
  
  , concatFold
  , concatFoldM
  
  , module Control.MapReduce.Core
  , Hashable
  )
where
import qualified Control.MapReduce.Core        as MR
import           Control.MapReduce.Core 
import qualified Control.MapReduce.Engines.Streaming
                                               as MRST
import qualified Control.MapReduce.Engines.Streamly
                                               as MRSL
import qualified Control.MapReduce.Engines.List
                                               as MRL
import qualified Control.Foldl                 as FL
import qualified Data.Foldable                 as F
import           Data.Functor.Identity          ( Identity(Identity)
                                                , runIdentity
                                                )
import           Data.Hashable                  ( Hashable )
-- | An 'MR.Unpack' that leaves its input untouched: it is an 'MR.Filter'
-- whose predicate is constantly 'True'.
noUnpack :: MR.Unpack x x
noUnpack = MR.Filter (const True)
{-# INLINABLE noUnpack #-}
-- | Lift a pure function into an 'MR.Unpack'.
--
-- The function's result is wrapped in 'Identity', which supplies the
-- 'Traversable' container the 'MR.Unpack' constructor asks for.
simpleUnpack :: (x -> y) -> MR.Unpack x y
simpleUnpack f = MR.Unpack (Identity . f)
{-# INLINABLE simpleUnpack #-}
-- | Build an 'MR.Unpack' from a predicate; this is the 'MR.Filter'
-- constructor under a descriptive name.
filterUnpack :: (x -> Bool) -> MR.Unpack x x
filterUnpack = MR.Filter
{-# INLINABLE filterUnpack #-}
-- | Build an 'MR.Assign' from two projections of the unpacked value:
-- one producing the grouping key and one producing the data to be reduced.
--
-- * @getKey@  extracts the key @k@ from a @y@.
-- * @getCols@ extracts the payload @c@ from the same @y@.
assign :: forall k y c . (y -> k) -> (y -> c) -> MR.Assign k y c
assign getKey getCols = MR.Assign keyed
 where
  -- The bang pattern forces the incoming value to WHNF before the
  -- (key, payload) pair is constructed.
  keyed :: y -> (k, c)
  keyed !y = (getKey y, getCols y)
{-# INLINABLE assign #-}
-- | Post-process the output of an 'MR.Reduce' with a function that also
-- receives the key.
--
-- Both constructors are handled:
--
-- * 'MR.Reduce': the key-applied function is composed after the
--   container-consuming reducer.
-- * 'MR.ReduceFold': the key-applied function is mapped over the fold's
--   result.
reduceMapWithKey :: (k -> y -> z) -> MR.Reduce k x y -> MR.Reduce k x z
reduceMapWithKey f r = case r of
  -- The lambda is parenthesised rather than applied with '$' so that the
  -- rank-N argument type is checked directly.
  MR.Reduce     g  -> MR.Reduce (\k -> f k . g k)
  MR.ReduceFold gf -> MR.ReduceFold (\k -> fmap (f k) (gf k))
{-# INLINABLE reduceMapWithKey #-}
-- | Monadic counterpart of 'reduceMapWithKey': post-process the output of
-- an 'MR.ReduceM' with a pure function that also receives the key.
--
-- * 'MR.ReduceM': the function is mapped inside the monadic result of the
--   container-consuming reducer.
-- * 'MR.ReduceFoldM': the function is mapped over the monadic fold's result.
--
-- The signature carries no @Monad m@ constraint; the instance needed for
-- 'fmap' is provided by matching on the 'MR.ReduceM' constructors.
reduceMMapWithKey :: (k -> y -> z) -> MR.ReduceM m k x y -> MR.ReduceM m k x z
reduceMMapWithKey f r = case r of
  MR.ReduceM     g  -> MR.ReduceM (\k -> fmap (f k) . g k)
  MR.ReduceFoldM gf -> MR.ReduceFoldM (\k -> fmap (f k) (gf k))
{-# INLINABLE reduceMMapWithKey #-}
-- | Build an 'MR.Reduce' from a key-independent processing step and a
-- relabelling step that combines the key with the processed result.
--
-- * @process@ consumes any 'Foldable' 'Functor' of grouped values.
-- * @relabel@ receives the key and the output of @process@.
processAndLabel
  :: (forall h . (Foldable h, Functor h) => h x -> y)
  -> (k -> y -> z)
  -> MR.Reduce k x z
processAndLabel process relabel = MR.Reduce (\k -> relabel k . process)
{-# INLINABLE processAndLabel #-}
-- | Monadic counterpart of 'processAndLabel': the processing step runs in
-- @m@, and the pure relabelling function is mapped over its result.
--
-- * @processM@ consumes any 'Foldable' 'Functor' of grouped values in @m@.
-- * @relabel@ receives the key and the value produced by @processM@.
processAndLabelM
  :: Monad m
  => (forall h . (Foldable h, Functor h) => h x -> m y)
  -> (k -> y -> z)
  -> MR.ReduceM m k x z
processAndLabelM processM relabel =
  MR.ReduceM (\k -> fmap (relabel k) . processM)
{-# INLINABLE processAndLabelM #-}
-- | Build an 'MR.Reduce' from a key-independent 'FL.Fold' and a
-- relabelling function that combines the key with the fold's result.
foldAndLabel :: FL.Fold x y -> (k -> y -> z) -> MR.Reduce k x z
foldAndLabel fld relabel = MR.ReduceFold labelled
 where
  -- The bang pattern forces the key to WHNF before the relabelled fold
  -- is built.
  labelled !k = fmap (relabel k) fld
{-# INLINABLE foldAndLabel #-}
-- | Monadic counterpart of 'foldAndLabel': build an 'MR.ReduceM' from a
-- key-independent 'FL.FoldM' and a pure relabelling function that combines
-- the key with the fold's result.
foldAndLabelM
  :: Monad m => FL.FoldM m x y -> (k -> y -> z) -> MR.ReduceM m k x z
foldAndLabelM fld relabel = MR.ReduceFoldM labelled
 where
  -- The bang pattern forces the key to WHNF before the relabelled fold
  -- is built.
  labelled !k = fmap (relabel k) fld
{-# INLINABLE foldAndLabelM #-}
-- | Collapse the result of a fold that yields a 'Foldable' container of
-- monoidal values by combining the contents with 'F.fold'.
concatFold :: (Monoid d, Foldable g) => FL.Fold a (g d) -> FL.Fold a d
concatFold = fmap F.fold
-- | Monadic counterpart of 'concatFold': collapse the result of a monadic
-- fold that yields a 'Foldable' container of monoidal values with 'F.fold'.
concatFoldM
  :: (Monad m, Monoid d, Foldable g) => FL.FoldM m a (g d) -> FL.FoldM m a d
concatFoldM = fmap F.fold
-- | Assemble a pure map-reduce fold from an unpack, an assign and a reduce
-- step, returning the reduced values as a list.
--
-- The work is delegated to the Streamly engine ('MRSL.streamlyEngine') with
-- grouping done by 'MRSL.groupByOrderedKey', hence the @Ord k@ constraint.
-- The engine's stream result is turned into a list with
-- 'MRSL.resultToList' and unwrapped from 'Identity'.
mapReduceFold
  :: Ord k
  => MR.Unpack x y
  -> MR.Assign k y c
  -> MR.Reduce k c d
  -> FL.Fold x [d]
mapReduceFold u a r = fmap streamToList engineFold
 where
  engineFold   = MRSL.streamlyEngine MRSL.groupByOrderedKey u a r
  streamToList = runIdentity . MRSL.resultToList
{-# INLINABLE mapReduceFold #-}
-- | Monadic counterpart of 'mapReduceFold': assemble a map-reduce fold in
-- @m@ from monadic unpack, assign and reduce steps, returning the reduced
-- values as a list.
--
-- The work is delegated to the Streaming engine ('MRST.streamingEngineM')
-- with grouping done by 'MRST.groupByOrderedKey', hence the @Ord k@
-- constraint. 'MRST.resultToList' yields an @m [d]@ inside the fold;
-- 'MR.postMapM' with 'id' then runs that action as the fold's final step.
mapReduceFoldM
  :: (Monad m, Ord k)
  => MR.UnpackM m x y
  -> MR.AssignM m k y c
  -> MR.ReduceM m k c d
  -> FL.FoldM m x [d]
mapReduceFoldM u a r = MR.postMapM id (fmap MRST.resultToList engineFold)
 where
  engineFold = MRST.streamingEngineM MRST.groupByOrderedKey u a r
{-# INLINABLE mapReduceFoldM #-}
-- | Variant of 'mapReduceFold' for keys that are 'Hashable' rather than
-- ordered.
--
-- The work is delegated to the Streamly engine ('MRSL.streamlyEngine') with
-- grouping done by 'MRSL.groupByHashableKey', hence the
-- @(Hashable k, Eq k)@ constraint. The engine's stream result is turned
-- into a list with 'MRSL.resultToList' and unwrapped from 'Identity'.
hashableMapReduceFold
  :: (Hashable k, Eq k)
  => MR.Unpack x y
  -> MR.Assign k y c
  -> MR.Reduce k c d
  -> FL.Fold x [d]
hashableMapReduceFold u a r = fmap streamToList engineFold
 where
  engineFold   = MRSL.streamlyEngine MRSL.groupByHashableKey u a r
  streamToList = runIdentity . MRSL.resultToList
{-# INLINABLE hashableMapReduceFold #-}
-- | Variant of 'mapReduceFoldM' for keys that are 'Hashable' rather than
-- ordered.
--
-- The work is delegated to the Streaming engine ('MRST.streamingEngineM')
-- with grouping done by 'MRST.groupByHashableKey', hence the
-- @(Hashable k, Eq k)@ constraint. 'MRST.resultToList' yields an @m [d]@
-- inside the fold; 'MR.postMapM' with 'id' then runs that action as the
-- fold's final step.
hashableMapReduceFoldM
  :: (Monad m, Hashable k, Eq k)
  => MR.UnpackM m x y
  -> MR.AssignM m k y c
  -> MR.ReduceM m k c d
  -> FL.FoldM m x [d]
hashableMapReduceFoldM u a r =
  MR.postMapM id (fmap MRST.resultToList engineFold)
 where
  engineFold = MRST.streamingEngineM MRST.groupByHashableKey u a r
{-# INLINABLE hashableMapReduceFoldM #-}
-- | A fold that performs only the unpack step: all inputs are collected
-- with 'FL.list' and the resulting list is passed through
-- 'MRL.unpackList'.
unpackOnlyFold :: MR.Unpack x y -> FL.Fold x [y]
unpackOnlyFold u = fmap (MRL.unpackList u) FL.list
{-# INLINABLE unpackOnlyFold #-}
-- | Monadic counterpart of 'unpackOnlyFold': all inputs are collected with
-- 'FL.list' (lifted into @m@ by 'FL.generalize'), and the resulting list is
-- passed through 'MRL.unpackListM' as the fold's final monadic step.
unpackOnlyFoldM :: Monad m => MR.UnpackM m x y -> FL.FoldM m x [y]
unpackOnlyFoldM u = MR.postMapM (MRL.unpackListM u) collected
  where collected = FL.generalize FL.list
{-# INLINABLE unpackOnlyFoldM #-}