{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Engines.Vector
(
vectorEngine
, vectorEngineM
, groupByHashableKey
, groupByOrderedKey
, toList
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import qualified Control.Foldl as FL
import Control.Monad ( (<=<) )
import qualified Data.Foldable as F
import Data.Hashable ( Hashable )
import qualified Data.HashMap.Strict as HMS
import qualified Data.Map.Strict as MS
import qualified Data.Sequence as Seq
import qualified Data.Vector as V
import Data.Vector ( Vector
, toList
)
import Control.Arrow ( second )
unpackVector :: MRC.Unpack x y -> Vector x -> Vector y
unpackVector :: forall x y. Unpack x y -> Vector x -> Vector y
unpackVector (MRC.Filter x -> Bool
t) = forall a. (a -> Bool) -> Vector a -> Vector a
V.filter x -> Bool
t
unpackVector (MRC.Unpack x -> g y
f) = forall a b. (a -> Vector b) -> Vector a -> Vector b
V.concatMap (forall a. [a] -> Vector a
V.fromList forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) a. Foldable t => t a -> [a]
F.toList forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> g y
f)
{-# INLINABLE unpackVector #-}
unpackVectorM :: Monad m => MRC.UnpackM m x y -> Vector x -> m (Vector y)
unpackVectorM :: forall (m :: * -> *) x y.
Monad m =>
UnpackM m x y -> Vector x -> m (Vector y)
unpackVectorM (MRC.FilterM x -> m Bool
t) = forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Vector a -> m (Vector a)
V.filterM x -> m Bool
t
unpackVectorM (MRC.UnpackM x -> m (g y)
f) =
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a b. (a -> Vector b) -> Vector a -> Vector b
V.concatMap forall a. a -> a
id) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a. [a] -> Vector a
V.fromList forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) a. Foldable t => t a -> [a]
F.toList) forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> m (g y)
f)
{-# INLINABLE unpackVectorM #-}
groupByHashableKey
:: forall k c . (Hashable k, Eq k) => Vector (k, c) -> Vector (k, Seq.Seq c)
groupByHashableKey :: forall k c.
(Hashable k, Eq k) =>
Vector (k, c) -> Vector (k, Seq c)
groupByHashableKey Vector (k, c)
v =
let hm :: HashMap k (Seq c)
hm = forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> [(k, v)] -> HashMap k v
HMS.fromListWith forall a. Semigroup a => a -> a -> a
(<>) forall a b. (a -> b) -> a -> b
$ forall a. Vector a -> [a]
V.toList forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a. a -> Seq a
Seq.singleton) Vector (k, c)
v
in forall a. [a] -> Vector a
V.fromList forall a b. (a -> b) -> a -> b
$ forall k v. HashMap k v -> [(k, v)]
HMS.toList HashMap k (Seq c)
hm
{-# INLINABLE groupByHashableKey #-}
groupByOrderedKey
:: forall k c . Ord k => Vector (k, c) -> Vector (k, Seq.Seq c)
groupByOrderedKey :: forall k c. Ord k => Vector (k, c) -> Vector (k, Seq c)
groupByOrderedKey Vector (k, c)
v =
let hm :: Map k (Seq c)
hm = forall k a. Ord k => (a -> a -> a) -> [(k, a)] -> Map k a
MS.fromListWith forall a. Semigroup a => a -> a -> a
(<>) forall a b. (a -> b) -> a -> b
$ forall a. Vector a -> [a]
V.toList forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a. a -> Seq a
Seq.singleton) Vector (k, c)
v
in forall a. [a] -> Vector a
V.fromList forall a b. (a -> b) -> a -> b
$ forall k a. Map k a -> [(k, a)]
MS.toList Map k (Seq c)
hm
{-# INLINABLE groupByOrderedKey #-}
vectorEngine
:: (Foldable g, Functor g)
=> (Vector (k, c) -> Vector (k, g c))
-> MRE.MapReduceFold y k c Vector x d
vectorEngine :: forall (g :: * -> *) k c y x d.
(Foldable g, Functor g) =>
(Vector (k, c) -> Vector (k, g c))
-> MapReduceFold y k c Vector x d
vectorEngine Vector (k, c) -> Vector (k, g c)
groupByKey Unpack x y
u (MRC.Assign y -> (k, c)
a) Reduce k c d
r = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
( forall a b. (a -> b) -> Vector a -> Vector b
V.map (forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry (forall (h :: * -> *) k x d.
(Foldable h, Functor h) =>
Reduce k x d -> k -> h x -> d
MRE.reduceFunction Reduce k c d
r))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Vector (k, c) -> Vector (k, g c)
groupByKey
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a -> b) -> Vector a -> Vector b
V.map y -> (k, c)
a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall x y. Unpack x y -> Vector x -> Vector y
unpackVector Unpack x y
u
)
forall (v :: * -> *) a. Vector v a => Fold a (v a)
FL.vector
{-# INLINABLE vectorEngine #-}
vectorEngineM
:: (Monad m, Traversable g)
=> (Vector (k, c) -> Vector (k, g c))
-> MRE.MapReduceFoldM m y k c Vector x d
vectorEngineM :: forall (m :: * -> *) (g :: * -> *) k c y x d.
(Monad m, Traversable g) =>
(Vector (k, c) -> Vector (k, g c))
-> MapReduceFoldM m y k c Vector x d
vectorEngineM Vector (k, c) -> Vector (k, g c)
groupByKey UnpackM m x y
u (MRC.AssignM y -> m (k, c)
a) ReduceM m k c d
r = forall (m :: * -> *) a b x.
Monad m =>
(a -> m b) -> FoldM m x a -> FoldM m x b
MRC.postMapM
( (forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry (forall (h :: * -> *) (m :: * -> *) k x d.
(Traversable h, Monad m) =>
ReduceM m k x d -> k -> h x -> m d
MRE.reduceFunctionM ReduceM m k c d
r)) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Vector (k, c) -> Vector (k, g c)
groupByKey
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Vector a -> m (Vector b)
V.mapM y -> m (k, c)
a forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< forall (m :: * -> *) x y.
Monad m =>
UnpackM m x y -> Vector x -> m (Vector y)
unpackVectorM UnpackM m x y
u)
)
(forall (m :: * -> *) a b. Monad m => Fold a b -> FoldM m a b
FL.generalize forall (v :: * -> *) a. Vector v a => Fold a (v a)
FL.vector)
{-# INLINABLE vectorEngineM #-}