{-# LANGUAGE CPP                   #-}
{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE TypeApplications      #-}
{-# LANGUAGE BangPatterns          #-}
{-# OPTIONS_GHC -O2 -fwarn-incomplete-patterns #-}
{-|
Module      : Control.MapReduce.Engines.Streamly
Description : map-reduce-folds builders
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

map-reduce engine (fold builder) using @Streamly@ streams as its intermediate and return type.

Notes:
1. These are polymorphic in the return stream type, though the streams do have to be @serial@ when @groupBy@ is called.
So you have to specify the stream type in the call or it has to be inferable from the use of the result.

2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream.  I've not
been able to verify that this is faster on an appropriate task with appropriate runtime settings.
-}
module Control.MapReduce.Engines.Streamly
  (
    -- * Engines
    streamlyEngine
  , streamlyEngineM
  , concurrentStreamlyEngine

    -- * Streamly Combinators
  , toStreamlyFold
  , toStreamlyFoldM

    -- * Result Extraction
  , resultToList
  , concatStream
  , concatStreamFold
  , concatStreamFoldM
  , concatConcurrentStreamFold

    -- * @groupBy@ Functions
  , groupByHashableKey
  , groupByOrderedKey
  , groupByHashableKeyST
  , groupByDiscriminatedKey

    -- * Re-Exports
  , SerialT
  , WSerialT
  , AheadT
  , AsyncT
  , WAsyncT
  , ParallelT
  , MonadAsync
  , IsStream
  )
where

import qualified Control.MapReduce.Core        as MRC
import qualified Control.MapReduce.Engines     as MRE

import           Control.Arrow                  ( second )
import qualified Control.Foldl                 as FL
import           Control.Monad.ST              as ST
import qualified Data.Discrimination.Grouping  as DG
import qualified Data.Foldable                 as F
import           Data.Functor.Identity          ( Identity(runIdentity) )
import           Data.Hashable                  ( Hashable )
import qualified Data.HashMap.Strict           as HMS
import qualified Data.HashTable.Class          as HT
import qualified Data.HashTable.ST.Cuckoo      as HTC
import qualified Data.List.NonEmpty            as LNE
import qualified Data.Maybe                    as Maybe
import qualified Data.Map.Strict               as MS
import qualified Data.Sequence                 as Seq
#if MIN_VERSION_streamly(0,8,0)
import qualified Streamly.Prelude              as S
import qualified Streamly.Internal.Data.Fold   as SF
import           Streamly.Prelude              ( SerialT
                                                , WSerialT
                                                , AheadT
                                                , AsyncT
                                                , WAsyncT
                                                , ParallelT
                                                , MonadAsync
                                                , IsStream
                                                )
#else
import qualified Streamly.Prelude              as S
import qualified Streamly                      as S
import qualified Streamly.Internal.Data.Fold   as SF
import           Streamly                       ( SerialT
                                                , WSerialT
                                                , AheadT
                                                , AsyncT
                                                , WAsyncT
                                                , ParallelT
                                                , MonadAsync
                                                , IsStream
                                                )
--import qualified Streamly.Internal.Data.Parser.ParserK.Type as Streamly
#endif

#if MIN_VERSION_streamly(0,8,0)
-- | Version-compatibility alias: with Streamly >= 0.8 this is @S.fromEffect@.
-- The grouping functions in this module use it to bring a monadic result
-- into a stream.
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect = S.fromEffect
{-# INLINE fromEffect #-}

-- | Convert a "Control.Foldl" 'FL.FoldM' into a Streamly 'SF.Fold'.
--
-- The initial state and every step result are wrapped in @SF.Partial@;
-- the extraction function is passed through unchanged.
toStreamlyFoldM :: Functor m => FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM (FL.FoldM step start done) =
  SF.mkFoldM step' (SF.Partial <$> start) done
 where
  -- Adapt the foldl step to Streamly's 'Step'-returning step.
  step' s a = SF.Partial <$> step s a

-- | Convert a pure "Control.Foldl" 'FL.Fold' into a Streamly 'SF.Fold'.
--
-- The initial state and every step result are wrapped in @SF.Partial@;
-- the extraction function is passed through unchanged.
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold (FL.Fold step start done) =
  SF.mkFold step' (SF.Partial start) done
 where
  -- Adapt the foldl step to Streamly's 'Step'-returning step.
  step' s a = SF.Partial $ step s a
#else
-- | Version-compatibility alias: on this (pre-0.8 Streamly) branch of the
-- CPP conditional, 'fromEffect' is simply @S.yieldM@.
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect = S.yieldM
{-# INLINE fromEffect #-}

-- | Repackage a "Control.Foldl" 'FL.FoldM' as a Streamly 'SF.Fold'
-- (pre-0.8 Streamly branch): the step function, initial action and
-- extraction function are handed to @SF.mkFold@ unchanged.
toStreamlyFoldM :: FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM (FL.FoldM stepM beginM extractM) =
  SF.mkFold stepM beginM extractM

-- | Repackage a pure "Control.Foldl" 'FL.Fold' as a Streamly 'SF.Fold'
-- (pre-0.8 Streamly branch): the step function, initial value and
-- extraction function are handed to @SF.mkPure@ unchanged.
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold (FL.Fold stepFn begin extract) =
  SF.mkPure stepFn begin extract
#endif

-- | Apply a pure 'MRC.Unpack' to a stream over 'Identity'.
--
-- * 'MRC.Filter' keeps only the elements satisfying the predicate.
-- * 'MRC.Unpack' maps each element to a 'Foldable' container and
--   flattens the containers into the output stream.
unpackStream :: S.IsStream t => MRC.Unpack x y -> t Identity x -> t Identity y
unpackStream (MRC.Filter t) = S.filter t
unpackStream (MRC.Unpack f) = S.concatMap (S.fromFoldable . f)
{-# INLINABLE unpackStream #-}

-- | Apply an effectful 'MRC.UnpackM' to a stream.
--
-- * 'MRC.FilterM' keeps only the elements for which the monadic
--   predicate returns 'True'.
-- * 'MRC.UnpackM' runs the monadic unpacking function on each element and
--   flattens the resulting 'Foldable' containers into the output stream.
unpackStreamM :: (S.IsStream t, Monad m) => MRC.UnpackM m x y -> t m x -> t m y
unpackStreamM (MRC.FilterM t) = S.filterM t
unpackStreamM (MRC.UnpackM f) = S.concatMapM (fmap S.fromFoldable . f)
{-# INLINABLE unpackStreamM #-}

-- | Run a stream of any 'IsStream' type and collect its elements into an
-- (effectful) list.  The stream is first adapted to 'SerialT', which is
-- the type @S.toList@ consumes.
resultToList :: (Monad m, S.IsStream t) => t m a -> m [a]
resultToList = S.toList . S.adapt

-- | Combine all elements of a monoidal stream with '<>', starting from
-- 'mempty', using a strict left fold.
concatStream :: (Monad m, Monoid a) => S.SerialT m a -> m a
concatStream = S.foldl' (<>) mempty

-- | Post-process a pure fold whose result is a @SerialT Identity@ stream of
-- monoidal values: the stream is collapsed with 'concatStream' and the
-- 'Identity' wrapper is removed.
concatStreamFold :: Monoid b => FL.Fold a (S.SerialT Identity b) -> FL.Fold a b
concatStreamFold = fmap (runIdentity . concatStream)

-- | Post-process an effectful fold whose result is a stream of monoidal
-- values: the stream is adapted to 'SerialT' and collapsed with
-- 'concatStream' in the fold's monad (via @MRC.postMapM@).
concatStreamFoldM
  :: (Monad m, Monoid b, S.IsStream t) => FL.FoldM m a (t m b) -> FL.FoldM m a b
concatStreamFoldM = MRC.postMapM (concatStream . S.adapt)

-- | Post-process a pure fold whose result is a (possibly concurrent) stream
-- of monoidal values: the fold is lifted with 'FL.generalize' and then
-- handled exactly as in 'concatStreamFoldM'.
concatConcurrentStreamFold
  :: (Monad m, Monoid b, S.IsStream t) => FL.Fold a (t m b) -> FL.FoldM m a b
concatConcurrentStreamFold = concatStreamFoldM . FL.generalize

-- | Map-reduce-fold builder returning a @SerialT Identity d@ result.
--
-- The first argument is the grouping function (for example
-- 'groupByHashableKey'); the remaining arguments are the unpack, assign
-- and reduce steps.
--
-- The fold accumulates its input by prepending each element to a stream.
-- On extraction that stream is unpacked, each item is assigned a
-- @(key, value)@ pair, the pairs are grouped by key, and each group is
-- reduced with @MRE.reduceFunction@.
streamlyEngine
  :: (Foldable g, Functor g)
  => (forall z . S.SerialT Identity (k, z) -> S.SerialT Identity (k, g z))
  -> MRE.MapReduceFold y k c (SerialT Identity) x d
streamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold
  (flip S.cons)
  S.nil
  ( S.map (\(k, lc) -> MRE.reduceFunction r k lc)
  . groupByKey
  . S.map a
  . unpackStream u
  )
{-# INLINABLE streamlyEngine #-}

-- | Apply a pure 'MRC.Unpack' to a stream in an arbitrary 'MonadAsync'
-- monad (used by 'concurrentStreamlyEngine').
--
-- * 'MRC.Filter' keeps only the elements satisfying the predicate.
-- * 'MRC.Unpack' maps each element to a 'Foldable' container and
--   flattens the containers into the output stream.
unpackConcurrently
  :: (S.MonadAsync m, S.IsStream t) => MRC.Unpack x y -> t m x -> t m y
unpackConcurrently (MRC.Filter t) = S.filter t
unpackConcurrently (MRC.Unpack f) = S.concatMap (S.fromFoldable . f)
{-# INLINABLE unpackConcurrently #-}

-- | Possibly (depending on the chosen stream types) concurrent
-- map-reduce-fold builder returning an @(IsStream tOut, MonadAsync m) => tOut m d@
-- result.
--
-- @tIn@ is the stream type used for accumulating, unpacking and assigning;
-- @tOut@ is the stream type used for reducing and for the result.  @tIn@
-- does not appear in the arguments or the result, so callers have to pick
-- it with a type application.
--
-- The first argument is the grouping function, which operates on 'SerialT'
-- streams; the stream is adapted to 'SerialT' before grouping and to
-- @tOut@ afterwards.
concurrentStreamlyEngine
  :: forall tIn tOut m g y k c x d
   . (S.IsStream tIn, S.IsStream tOut, S.MonadAsync m, Foldable g, Functor g)
  => (forall z . S.SerialT m (k, z) -> S.SerialT m (k, g z))
  -> MRE.MapReduceFold y k c (tOut m) x d
concurrentStreamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold
  (\acc x -> S.consM (pure x) acc)
  S.nil
  ( S.mapM (\(k, lc) -> pure $ MRE.reduceFunction r k lc)
  . S.adapt @SerialT @tOut -- make it concurrent for reducing
  . groupByKey
  . S.adapt @tIn @SerialT -- make it serial for grouping
  . S.mapM (pure . a)
  . (S.|$) (unpackConcurrently u)
  )
{-# INLINABLE concurrentStreamlyEngine #-}


-- | Effectful map-reduce-fold engine returning an @IsStream t => t m d@ result.
-- The 'MonadAsync' constraint here more or less requires us to run in IO, or something IO like.
--
-- The first argument is the grouping function; the remaining arguments are
-- the monadic unpack, assign and reduce steps.
--
-- The fold itself is pure (it only prepends each input to a 'SerialT'
-- stream) and is lifted with 'FL.generalize'.  The monadic steps are
-- embedded in the stream produced on extraction, which is finally adapted
-- to the requested stream type @t@.
streamlyEngineM
  :: (S.IsStream t, Monad m, S.MonadAsync m, Traversable g)
  => (forall z . SerialT m (k, z) -> SerialT m (k, g z))
  -> MRE.MapReduceFoldM m y k c (t m) x d
streamlyEngineM groupByKey u (MRC.AssignM a) r =
  FL.generalize
    $ fmap S.adapt
    $ FL.Fold
        (flip S.cons)
        S.nil
        ( S.mapM (\(k, lc) -> MRE.reduceFunctionM r k lc)
        . groupByKey -- this requires a serial stream.
        . S.mapM a
        . unpackStreamM u
        )
{-# INLINABLE streamlyEngineM #-}

-- TODO: Try using Streamly folds and Map.insertWith instead of toList and fromListWith.  Probably the same.
-- | Group a Streamly stream of @(k, c)@ by a 'Hashable' key.
--
-- The whole input stream is first collected into a list, the list is
-- grouped in a strict 'HMS.HashMap' (values for the same key are combined
-- as 'Seq.Seq's with '<>'), and the map is then turned back into a stream
-- with one @(key, values)@ element per distinct key.
--
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByHashableKey
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKey s = do
  lkc <- fromEffect (S.toList s)
  let grouped = HMS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  HMS.foldrWithKey (\k lc rest -> S.cons (k, lc) rest) S.nil grouped
{-# INLINABLE groupByHashableKey #-}

-- TODO: Try using Streamly folds and Map.insertWith instead of toList and fromListWith.  Probably the same.
-- | Group a Streamly stream of @(k, c)@ by an ordered key.
--
-- The whole input stream is first collected into a list, the list is
-- grouped in a strict 'MS.Map' (values for the same key are combined as
-- 'Seq.Seq's with '<>'), and the map is then turned back into a stream
-- with one @(key, values)@ element per distinct key.
--
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByOrderedKey
  :: (Monad m, Ord k) => S.SerialT m (k, c) -> S.SerialT m (k, Seq.Seq c)
groupByOrderedKey s = do
  lkc <- fromEffect (S.toList s)
  let grouped = MS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  MS.foldrWithKey (\k lc rest -> S.cons (k, lc) rest) S.nil grouped
{-# INLINABLE groupByOrderedKey #-}

-- | Group a Streamly stream of @(k, c)@ by a 'Hashable' key.  Uses mutable
-- hashtables running in the 'ST' monad.
--
-- The whole input stream is first collected into a list.  Inside 'ST.runST'
-- the list is loaded into a cuckoo hash table with @MRE.fromListWithHT@
-- (values for the same key are combined as 'Seq.Seq's with '<>'), and the
-- table is folded into a stream with one @(key, values)@ element per entry.
--
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByHashableKeyST
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKeyST st = do
  lkc <- fromEffect (S.toList st)
  ST.runST $ do
    hm <- (MRE.fromListWithHT @HTC.HashTable) (<>)
      $ fmap (second Seq.singleton) lkc
    HT.foldM (\s' (k, sc) -> return $ S.cons (k, sc) s') S.nil hm
{-# INLINABLE groupByHashableKeyST #-}


-- | Group a Streamly stream of @(k, c)@ by key, for keys with an instance of
-- 'DG.Grouping' from <http://hackage.haskell.org/package/discrimination>.
--
-- The whole input stream is first collected into a list and partitioned
-- with @DG.groupWith fst@.  Each non-empty group becomes one
-- @(key, values)@ element, where the key is taken from the group's first
-- pair; empty groups, should any be returned, are dropped.
--
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByDiscriminatedKey
  :: (Monad m, DG.Grouping k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByDiscriminatedKey s = do
  lkc <- fromEffect (S.toList s)
  let g :: LNE.NonEmpty (k, c) -> (k, Seq.Seq c)
      g x = (fst (LNE.head x), F.foldMap (Seq.singleton . snd) x)
  S.fromFoldable $ Maybe.mapMaybe (fmap g . LNE.nonEmpty) $ DG.groupWith fst lkc
{-# INLINABLE groupByDiscriminatedKey #-}