{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE TypeApplications      #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Control.MapReduce.Engines.List
Description : map-reduce-folds builders
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

map-reduce engine (fold builder) using [] as its intermediate type.
-}
module Control.MapReduce.Engines.List
  (
  -- * Engines
    listEngine
  , listEngineM

  -- * @groupBy@ Functions
  , groupByHashableKey
  , groupByOrderedKey

  -- * Helpers
  , unpackList
  , unpackListM
  )
where

import qualified Control.MapReduce.Core        as MRC
import qualified Control.MapReduce.Engines     as MRE

import qualified Control.Foldl                 as FL
import qualified Data.List                     as L
import qualified Data.Foldable                 as F
import           Data.Hashable                  ( Hashable )
import qualified Data.HashMap.Strict           as HMS
import qualified Data.Map.Strict               as MS
import qualified Data.Sequence                 as Seq
import           Control.Monad                  ( filterM )
import           Control.Arrow                  ( second )



-- | Apply an 'MRC.Unpack' step to a list of inputs.
--
-- * @'MRC.Filter' t@ keeps only the elements satisfying the predicate @t@
--   (so the element type is unchanged).
-- * @'MRC.Unpack' f@ maps every element to a 'Foldable' container of results
--   and concatenates the contents of those containers, preserving input order.
unpackList :: MRC.Unpack x y -> [x] -> [y]
unpackList (MRC.Filter t) = L.filter t
unpackList (MRC.Unpack f) = L.concatMap (F.toList . f)
{-# INLINABLE unpackList #-}

-- | Effectful counterpart of 'unpackList': apply an 'MRC.UnpackM' step to a
-- list of inputs, running the effects via 'filterM' / 'traverse'.
--
-- * @'MRC.FilterM' t@ keeps the elements for which the monadic predicate @t@
--   yields 'True'.
-- * @'MRC.UnpackM' f@ runs @f@ on every element, converts each resulting
--   container to a list and concatenates those lists.
unpackListM :: MRC.UnpackM m x y -> [x] -> m [y]
unpackListM (MRC.FilterM t) = filterM t
unpackListM (MRC.UnpackM f) = fmap L.concat . traverse (fmap F.toList . f)
{-# INLINABLE unpackListM #-}

-- | Group the mapped and assigned values by key using a "Data.HashMap.Strict".
--
-- Every value is wrapped in a singleton 'Seq.Seq' and the sequences belonging
-- to the same key are merged with '<>'.
--
-- Because 'HMS.fromListWith' calls its combining function as @f new old@, the
-- values inside each group end up in /reverse/ input order.  The order of the
-- resulting @(key, group)@ pairs is whatever 'HMS.toList' produces.
groupByHashableKey :: (Hashable k, Eq k) => [(k, c)] -> [(k, Seq.Seq c)]
groupByHashableKey =
  HMS.toList . HMS.fromListWith (<>) . fmap (second Seq.singleton)
{-# INLINABLE groupByHashableKey #-}

-- | Group the mapped and assigned values by key using a "Data.Map.Strict".
--
-- Every value is wrapped in a singleton 'Seq.Seq' and the sequences belonging
-- to the same key are merged with '<>'.  The resulting pairs are returned in
-- ascending key order (the order of 'MS.toList').
--
-- Because 'MS.fromListWith' calls its combining function as @f new old@, the
-- values inside each group end up in /reverse/ input order.
groupByOrderedKey :: Ord k => [(k, c)] -> [(k, Seq.Seq c)]
groupByOrderedKey =
  MS.toList . MS.fromListWith (<>) . fmap (second Seq.singleton)
{-# INLINABLE groupByOrderedKey #-}

-- | Map-reduce-fold builder using @[]@ as the intermediate type and returning
-- a @[]@ result.
--
-- The first argument is the grouping function (for example
-- 'groupByHashableKey' or 'groupByOrderedKey'); it alone determines which
-- constraints the key type @k@ must satisfy.
--
-- The resulting fold first collects every input with 'FL.list' and then runs
-- the unpack, assign, group and reduce stages over that list.
listEngine
  :: (Foldable g, Functor g)
  => ([(k, c)] -> [(k, g c)])
  -> MRE.MapReduceFold y k c [] x d
listEngine groupByKey u (MRC.Assign a) r = fmap
  ( fmap (uncurry (MRE.reduceFunction r)) -- reduce each (key, group) pair
  . groupByKey                            -- gather the values of each key
  . fmap a                                -- assign a (key, value) to each y
  . unpackList u                          -- filter / expand the collected xs
  )
  FL.list
{-# INLINABLE listEngine #-}

-- | Effectful map-reduce-fold builder using @[]@ as the intermediate type and
-- returning a @[]@ result.
--
-- The first argument is the (pure) grouping function (for example
-- 'groupByHashableKey' or 'groupByOrderedKey'); it alone determines which
-- constraints the key type @k@ must satisfy.
--
-- The resulting monadic fold first collects every input with 'FL.list' and
-- then, as a post-processing step ('MRC.postMapM'), runs the effectful unpack,
-- assign and reduce stages, with the pure grouping in between.
listEngineM
  :: (Monad m, Traversable g)
  => ([(k, c)] -> [(k, g c)])
  -> MRE.MapReduceFoldM m y k c [] x d
listEngineM groupByKey u (MRC.AssignM a) rM = MRC.postMapM
  ( (traverse (uncurry (MRE.reduceFunctionM rM)) =<<) -- reduce each group
  . fmap groupByKey                                   -- pure grouping by key
  . (traverse a =<<)                                  -- effectful assignment
  . unpackListM u                                     -- effectful unpacking
  )
  (FL.generalize FL.list)
{-# INLINABLE listEngineM #-}