{-# OPTIONS_GHC -Wno-deprecations #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Top
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Top level IsStream module that can use all other lower level IsStream
-- modules.

module Streamly.Internal.Data.Stream.IsStream.Top {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
    (
    -- * Transformation
    -- ** Sampling
    -- | Value agnostic filtering.
      sampleFromThen
    , sampleIntervalStart
    , sampleIntervalEnd
    , sampleBurstStart
    , sampleBurstEnd

    -- ** Reordering
    , sortBy

    -- * Nesting
    -- ** Set like operations
    -- | These are not exactly set operations because streams are not
    -- necessarily sets, they may have duplicated elements.
    , intersectBy
    , intersectBySorted
    , differenceBy
    , mergeDifferenceBy
    , unionBy
    , mergeUnionBy

    -- ** Join operations
    , crossJoin
    , joinInner
    , joinInnerMap
    , joinInnerMerge
    -- , joinLeft
    , mergeLeftJoin
    , joinLeftMap
    -- , joinOuter
    , mergeOuterJoin
    , joinOuterMap
    )
where

#include "inline.hs"

import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
-- import Control.Monad.Trans.Class (lift)
-- import Control.Monad.Trans.State.Strict (get, put)
-- import Data.Function ((&))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
-- import Data.Maybe (isJust)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream(..), adapt, foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)

import qualified Data.List as List
import qualified Data.Map.Strict as Map
-- import qualified Streamly.Internal.Data.Array.Generic as Array
--     (fromStream, length, read)
-- import qualified Streamly.Data.Array.Mut as MA
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser as Parser
    (groupByRollingEither)
-- import qualified Streamly.Internal.Data.Stream.IsStream.Lift as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Eliminate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Generate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream as StreamD

import Prelude hiding (Foldable(..), filter, zipWith, concatMap, concat)

-- $setup
-- >>> :m
-- >>> :set -fno-warn-deprecations
-- >>> import Prelude hiding (filter, zipWith, concatMap, concat)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream

------------------------------------------------------------------------------
-- Sampling
------------------------------------------------------------------------------

-- XXX We can implement this using addition instead of "mod" to make it more
-- efficient.
--
-- | @sampleFromthen offset stride@ samples the element at @offset@ index and
-- then every element at strides of @stride@.
--
-- >>> Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
-- [2,5,8]
--
-- /Pre-release/
--
{-# INLINE sampleFromThen #-}
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) =>
    Int -> Int -> t m a -> t m a
sampleFromThen :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
Int -> Int -> t m a -> t m a
sampleFromThen Int
offset Int
stride =
    (t m a -> t m (Int, a))
-> (((Int, a) -> Bool) -> t m (Int, a) -> t m (Int, a))
-> ((Int, a) -> Bool)
-> t m a
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b s.
Functor (t m) =>
(t m a -> t m (s, a))
-> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
-> ((s, a) -> b)
-> t m a
-> t m a
Stream.with t m a -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m (Int, a)
Stream.indexed ((Int, a) -> Bool) -> t m (Int, a) -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
Stream.filter
        (\(Int
i, a
_) -> Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
offset Bool -> Bool -> Bool
&& (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
offset) Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
stride Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0)

-- | Continuously evaluate the input stream and sample the last event in time
-- window of @n@ seconds.
--
-- This is also known as @throttle@ in some libraries.
--
-- @
-- sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last
-- @
--
-- /Pre-release/
--
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalEnd :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m a
sampleIntervalEnd Double
n = t m (Maybe a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
Stream.catMaybes (t m (Maybe a) -> t m a)
-> (t m a -> t m (Maybe a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> Fold m a (Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
Double -> Fold m a b -> t m a -> t m b
Stream.intervalsOf Double
n Fold m a (Maybe a)
forall (m :: * -> *) a. Monad m => Fold m a (Maybe a)
Fold.last

-- | Like 'sampleInterval' but samples at the beginning of the time window.
--
-- @
-- sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one
-- @
--
-- /Pre-release/
--
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalStart :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m a
sampleIntervalStart Double
n = t m (Maybe a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
Stream.catMaybes (t m (Maybe a) -> t m a)
-> (t m a -> t m (Maybe a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> Fold m a (Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
Double -> Fold m a b -> t m a -> t m b
Stream.intervalsOf Double
n Fold m a (Maybe a)
forall (m :: * -> *) a. Monad m => Fold m a (Maybe a)
Fold.one

data BurstState t x =
      BurstNone
    | BurstWait !t !x
    | BurstDone !x
    | BurstDoneNext !x !t !x

{-# INLINE sampleBurst #-}
sampleBurst :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Bool -> Double -> t m a -> t m a
sampleBurst :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Bool -> Double -> t m a -> t m a
sampleBurst Bool
sampleAtEnd Double
gap t m a
xs =
    -- XXX Ideally we should schedule a timer event exactly after gap time,
    -- but the tick stream should work well as long as the timer
    -- granularity is small enough compared to the gap.
    (BurstState RelTime64 a -> Maybe a)
-> t m (BurstState RelTime64 a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Maybe b) -> t m a -> t m b
Stream.mapMaybe BurstState RelTime64 a -> Maybe a
forall {t} {a}. BurstState t a -> Maybe a
extract
        (t m (BurstState RelTime64 a) -> t m a)
-> t m (BurstState RelTime64 a) -> t m a
forall a b. (a -> b) -> a -> b
$ (BurstState RelTime64 a
 -> (RelTime64, Maybe a) -> BurstState RelTime64 a)
-> BurstState RelTime64 a
-> t m (RelTime64, Maybe a)
-> t m (BurstState RelTime64 a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> b) -> b -> t m a -> t m b
Stream.scanl' BurstState RelTime64 a
-> (RelTime64, Maybe a) -> BurstState RelTime64 a
forall {x}.
BurstState RelTime64 x
-> (RelTime64, Maybe x) -> BurstState RelTime64 x
step BurstState RelTime64 a
forall t x. BurstState t x
BurstNone
        (t m (RelTime64, Maybe a) -> t m (BurstState RelTime64 a))
-> t m (RelTime64, Maybe a) -> t m (BurstState RelTime64 a)
forall a b. (a -> b) -> a -> b
$ t m (Maybe a) -> t m (RelTime64, Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
t m a -> t m (RelTime64, a)
Stream.timeIndexed
        (t m (Maybe a) -> t m (RelTime64, Maybe a))
-> t m (Maybe a) -> t m (RelTime64, Maybe a)
forall a b. (a -> b) -> a -> b
$ Double -> m (Maybe a) -> t m (Maybe a) -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Double -> m a -> t m a -> t m a
Stream.interjectSuffix Double
0.01 (Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing) ((a -> Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Stream.map a -> Maybe a
forall a. a -> Maybe a
Just t m a
xs)

    where

    gap1 :: RelTime64
gap1 = NanoSecond64 -> RelTime64
forall a. TimeUnit64 a => a -> RelTime64
toRelTime64 (Int64 -> NanoSecond64
NanoSecond64 (Double -> Int64
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
gap Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
10Double -> Int -> Double
forall a b. (Num a, Integral b) => a -> b -> a
^(Int
9::Int))))

    {-# INLINE step #-}
    step :: BurstState RelTime64 x
-> (RelTime64, Maybe x) -> BurstState RelTime64 x
step BurstState RelTime64 x
BurstNone (RelTime64
t1, Just x
x1) = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x1
    step BurstState RelTime64 x
BurstNone (RelTime64, Maybe x)
_ = BurstState RelTime64 x
forall t x. BurstState t x
BurstNone

    step (BurstDone x
_) (RelTime64
t1, Just x
x1) = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x1
    step (BurstDone x
_) (RelTime64, Maybe x)
_ = BurstState RelTime64 x
forall t x. BurstState t x
BurstNone

    step old :: BurstState RelTime64 x
old@(BurstWait RelTime64
t0 x
x0) (RelTime64
t1, Maybe x
Nothing)
        | RelTime64
t1 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t0 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= RelTime64
gap1 = x -> BurstState RelTime64 x
forall t x. x -> BurstState t x
BurstDone x
x0
        | Bool
otherwise = BurstState RelTime64 x
old
    -- This can happen due to scheduling delays, if we received back to
    -- back events spaced by more than the timeout without an
    -- intervening timeout event then we emit the old event instead of
    -- replacing it by the new.
    step (BurstWait RelTime64
t0 x
x0) (RelTime64
t1, Just x
x1)
        | RelTime64
t1 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t0 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= RelTime64
gap1 = x -> RelTime64 -> x -> BurstState RelTime64 x
forall t x. x -> t -> x -> BurstState t x
BurstDoneNext x
x0 RelTime64
t1 x
x1
        | Bool
sampleAtEnd = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x1
        | Bool
otherwise = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x0

    step (BurstDoneNext x
_ RelTime64
t0 x
x0) (RelTime64
t1, Maybe x
Nothing)
        | RelTime64
t1 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t0 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= RelTime64
gap1 = x -> BurstState RelTime64 x
forall t x. x -> BurstState t x
BurstDone x
x0
        | Bool
otherwise =  RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t0 x
x0
    step (BurstDoneNext x
_ RelTime64
t0 x
x0) (RelTime64
t1, Just x
x1)
        | RelTime64
t1 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t0 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= RelTime64
gap1 = x -> RelTime64 -> x -> BurstState RelTime64 x
forall t x. x -> t -> x -> BurstState t x
BurstDoneNext x
x0 RelTime64
t1 x
x1
        | Bool
sampleAtEnd = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x1
        | Bool
otherwise = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x0

    {-# INLINE extract #-}
    extract :: BurstState t a -> Maybe a
extract (BurstDoneNext a
x t
_ a
_) = a -> Maybe a
forall a. a -> Maybe a
Just a
x
    extract (BurstDone a
x) = a -> Maybe a
forall a. a -> Maybe a
Just a
x
    extract BurstState t a
_ = Maybe a
forall a. Maybe a
Nothing

-- | Sample one event at the end of each burst of events.  A burst is a group
-- of events close together in time, it ends when an event is spaced by more
-- than the specified time interval (in seconds) from the previous event.
--
-- This is known as @debounce@ in some libraries.
--
-- The clock granularity is 10 ms.
--
-- /Pre-release/
--
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstEnd :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m a
sampleBurstEnd = Bool -> Double -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Bool -> Double -> t m a -> t m a
sampleBurst Bool
True

-- | Like 'sampleBurstEnd' but samples the event at the beginning of the burst
-- instead of at the end of it.
--
-- /Pre-release/
--
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstStart :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m a
sampleBurstStart = Bool -> Double -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Bool -> Double -> t m a -> t m a
sampleBurst Bool
False

------------------------------------------------------------------------------
-- Reordering
------------------------------------------------------------------------------
--
-- We could possibly choose different algorithms depending on whether the
-- input stream is almost sorted (ascending/descending) or random. We could
-- serialize the stream to an array and use quicksort.
--
-- | Sort the input stream using a supplied comparison function.
--
-- /O(n) space/
--
-- Note: this is not the fastest possible implementation as of now.
--
-- /Pre-release/
--
{-# INLINE sortBy #-}
sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a
-- sortBy f = Stream.concatPairsWith (Stream.mergeBy f) Stream.fromPure
sortBy :: forall (m :: * -> *) a.
MonadCatch m =>
(a -> a -> Ordering) -> SerialT m a -> SerialT m a
sortBy a -> a -> Ordering
cmp =
    let p :: Parser a m (Either (SerialT m a) (SerialT m a))
p =
            (a -> a -> Bool)
-> Fold m a (SerialT m a)
-> Fold m a (SerialT m a)
-> Parser a m (Either (SerialT m a) (SerialT m a))
forall (m :: * -> *) a b c.
Monad m =>
(a -> a -> Bool)
-> Fold m a b -> Fold m a c -> Parser a m (Either b c)
Parser.groupByRollingEither
                (\a
x -> (Ordering -> Ordering -> Bool
forall a. Ord a => a -> a -> Bool
< Ordering
GT) (Ordering -> Bool) -> (a -> Ordering) -> a -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> a -> Ordering
cmp a
x)
                ((StreamK m a -> SerialT m a)
-> Fold m a (StreamK m a) -> Fold m a (SerialT m a)
forall a b. (a -> b) -> Fold m a a -> Fold m a b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap StreamK m a -> SerialT m a
forall (m :: * -> *) a. StreamK m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream Fold m a (StreamK m a)
forall (m :: * -> *) a (n :: * -> *).
Monad m =>
Fold m a (StreamK n a)
Fold.toStreamKRev)
                ((StreamK m a -> SerialT m a)
-> Fold m a (StreamK m a) -> Fold m a (SerialT m a)
forall a b. (a -> b) -> Fold m a a -> Fold m a b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap StreamK m a -> SerialT m a
forall (m :: * -> *) a. StreamK m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
StreamK m a -> t m a
fromStream Fold m a (StreamK m a)
forall (m :: * -> *) a (n :: * -> *).
Monad m =>
Fold m a (StreamK n a)
Fold.toStreamK)
     in   (SerialT m a -> SerialT m a -> SerialT m a)
-> (SerialT m a -> SerialT m a)
-> SerialT m (SerialT m a)
-> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
IsStream t =>
(t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
Stream.concatPairsWith ((a -> a -> Ordering) -> SerialT m a -> SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
(a -> a -> Ordering) -> t m a -> t m a -> t m a
Stream.mergeBy a -> a -> Ordering
cmp) SerialT m a -> SerialT m a
forall a. a -> a
id
        (SerialT m (SerialT m a) -> SerialT m a)
-> (SerialT m a -> SerialT m (SerialT m a))
-> SerialT m a
-> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m (Either ParseError (SerialT m a))
-> SerialT m (SerialT m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m, Functor (t m)) =>
t m (Either a b) -> t m b
Stream.rights (SerialT m (Either ParseError (SerialT m a))
 -> SerialT m (SerialT m a))
-> (SerialT m a -> SerialT m (Either ParseError (SerialT m a)))
-> SerialT m a
-> SerialT m (SerialT m a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Parser a m (SerialT m a)
-> SerialT m a -> SerialT m (Either ParseError (SerialT m a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Parser a m b -> t m a -> t m (Either ParseError b)
Stream.parseMany ((Either (SerialT m a) (SerialT m a) -> SerialT m a)
-> Parser a m (Either (SerialT m a) (SerialT m a))
-> Parser a m (SerialT m a)
forall a b. (a -> b) -> Parser a m a -> Parser a m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((SerialT m a -> SerialT m a)
-> (SerialT m a -> SerialT m a)
-> Either (SerialT m a) (SerialT m a)
-> SerialT m a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SerialT m a -> SerialT m a
forall a. a -> a
id SerialT m a -> SerialT m a
forall a. a -> a
id) Parser a m (Either (SerialT m a) (SerialT m a))
forall {m :: * -> *} {m :: * -> *}.
Parser a m (Either (SerialT m a) (SerialT m a))
p)

------------------------------------------------------------------------------
-- SQL Joins
------------------------------------------------------------------------------
--
-- Some references:
-- * https://en.wikipedia.org/wiki/Relational_algebra
-- * https://en.wikipedia.org/wiki/Join_(SQL)

-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only
-- constraint, the best would be to use an Array with linear search. If the
-- second stream is sorted we can also use a binary search, using Ord
-- constraint or an ordering function.
--
-- For Storables we can cache the second stream into an unboxed array for
-- possibly faster access/compact representation?
--
-- If we do not want to keep the stream in memory but always read it from the
-- source (disk/network) every time we iterate through it then we can do that
-- too by reading the stream every time, the stream must have immutable state
-- in that case and the user is responsible for the behavior if the stream
-- source changes during iterations. We can also use an Unfold instead of
-- stream. We probably need a way to distinguish streams that can be read
-- mutliple times without any interference (e.g. unfolding a stream using an
-- immutable handle would work i.e. using pread/pwrite instead of maintianing
-- an offset in the handle).

-- XXX We can do this concurrently.
--
-- | This is the same as 'Streamly.Internal.Data.Unfold.outerProduct' but less
-- efficient.
--
-- The second stream is evaluated multiple times. If the second stream is
-- consume-once stream then it can be cached in an 'Data.Array.Array' before
-- calling this function. Caching may also improve performance if the stream is
-- expensive to evaluate.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE crossJoin #-}
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
crossJoin :: forall (t :: * -> * -> *) m a b.
Monad (t m) =>
t m a -> t m b -> t m (a, b)
crossJoin t m a
s1 t m b
s2 = do
    -- XXX use concatMap instead?
    a
a <- t m a
s1
    b
b <- t m b
s2
    (a, b) -> t m (a, b)
forall a. a -> t m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, b
b)

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array we could use
-- binary search if we have an Ord instance or Ordering returning function. The
-- time complexity would then become (m x log n).
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal by the given equality pedicate then return the tuple (a, b).
--
-- The second stream is evaluated multiple times. If the stream is a
-- consume-once stream then the caller should cache it (e.g. in a
-- 'Data.Array.Array') before calling this function. Caching may also improve
-- performance if the stream is expensive to evaluate.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- You should almost always use joinInnerMap instead of joinInner. joinInnerMap
-- is an order of magnitude faster. joinInner may be used when the second
-- stream is generated from a seed, therefore, need not be stored in memory and
-- the amount of memory it takes is a concern.
--
-- Space: O(n) assuming the second stream is cached in memory.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinInner #-}
joinInner ::
    forall (t :: (Type -> Type) -> Type -> Type) m a b.
    (IsStream t, Monad m) =>
        (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
joinInner :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
joinInner a -> b -> Bool
eq t m a
s1 t m b
s2 = do
    -- ConcatMap works faster than bind
    (a -> t m (a, b)) -> t m a -> t m (a, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
Stream.concatMap (\a
a ->
        (b -> t m (a, b)) -> t m b -> t m (a, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
Stream.concatMap (\b
b ->
            if a
a a -> b -> Bool
`eq` b
b
            then (a, b) -> t m (a, b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
Stream.fromPure (a
a, b
b)
            else t m (a, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
Stream.nil
            ) t m b
s2
        ) t m a
s1

-- XXX Generate error if a duplicate insertion is attempted?
toMap ::  (Monad m, Ord k) => IsStream.SerialT m (k, v) -> m (Map.Map k v)
toMap :: forall (m :: * -> *) k v.
(Monad m, Ord k) =>
SerialT m (k, v) -> m (Map k v)
toMap = (Map k v -> (k, v) -> Map k v)
-> Map k v -> SerialT m (k, v) -> m (Map k v)
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
Stream.foldl' (\Map k v
kv (k
k, v
b) -> k -> v -> Map k v -> Map k v
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k v
b Map k v
kv) Map k v
forall k a. Map k a
Map.empty

-- If the second stream is too big it can be partitioned based on hashes and
-- then we can process one parition at a time.
--
-- XXX An IntMap may be faster when the keys are Int.
-- XXX Use hashmap instead of map?
--
-- | Like 'joinInner' but uses a 'Map' for efficiency.
--
-- If the input streams have duplicate keys, the behavior is undefined.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m + n)
--
-- /Pre-release/
{-# INLINE joinInnerMap #-}
joinInnerMap :: (IsStream t, Monad m, Ord k) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, b)
joinInnerMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, Monad m, Ord k) =>
t m (k, a) -> t m (k, b) -> t m (k, a, b)
joinInnerMap t m (k, a)
s1 t m (k, b)
s2 =
    m (t m (k, a, b)) -> t m (k, a, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
Stream.concatM (m (t m (k, a, b)) -> t m (k, a, b))
-> m (t m (k, a, b)) -> t m (k, a, b)
forall a b. (a -> b) -> a -> b
$ do
        Map k b
km <- SerialT m (k, b) -> m (Map k b)
forall (m :: * -> *) k v.
(Monad m, Ord k) =>
SerialT m (k, v) -> m (Map k v)
toMap (SerialT m (k, b) -> m (Map k b))
-> SerialT m (k, b) -> m (Map k b)
forall a b. (a -> b) -> a -> b
$ t m (k, b) -> SerialT m (k, b)
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
IsStream.adapt t m (k, b)
s2
        t m (k, a, b) -> m (t m (k, a, b))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (t m (k, a, b) -> m (t m (k, a, b)))
-> t m (k, a, b) -> m (t m (k, a, b))
forall a b. (a -> b) -> a -> b
$ ((k, a) -> Maybe (k, a, b)) -> t m (k, a) -> t m (k, a, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Maybe b) -> t m a -> t m b
Stream.mapMaybe (Map k b -> (k, a) -> Maybe (k, a, b)
forall {a} {c} {b}. Ord a => Map a c -> (a, b) -> Maybe (a, b, c)
joinAB Map k b
km) t m (k, a)
s1

    where

    joinAB :: Map a c -> (a, b) -> Maybe (a, b, c)
joinAB Map a c
kvm (a
k, b
a) =
        case a
k a -> Map a c -> Maybe c
forall k a. Ord k => k -> Map k a -> Maybe a
`Map.lookup` Map a c
kvm of
            Just c
b -> (a, b, c) -> Maybe (a, b, c)
forall a. a -> Maybe a
Just (a
k, b
a, c
b)
            Maybe c
Nothing -> Maybe (a, b, c)
forall a. Maybe a
Nothing

-- | Like 'joinInner' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE joinInnerMerge #-}
joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
joinInnerMerge :: forall a b (t :: * -> * -> *) m.
(a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
joinInnerMerge = (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
forall a. HasCallStack => a
undefined

{-
-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array or a seek capable
-- stream then we could use binary search if we have an Ord instance or
-- Ordering returning function. The time complexity would then become (m x log
-- n).
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal then return the tuple @(a, Just b)@.  If @a@ is not present in @t
-- m b@ then return @(a, Nothing)@.
--
-- The second stream is evaluated multiple times. If the stream is a
-- consume-once stream then the caller should cache it in an 'Data.Array.Array'
-- before calling this function. Caching may also improve performance if the
-- stream is expensive to evaluate.
--
-- @
-- rightJoin = flip joinLeft
-- @
--
-- Space: O(n) assuming the second stream is cached in memory.
--
-- Time: O(m x n)
--
-- /Unimplemented/
{-# INLINE joinLeft #-}
joinLeft :: Monad m =>
    (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
joinLeft eq s1 s2 = Stream.evalStateT (return False) $ do
    a <- Stream.liftInner s1
    -- XXX should we use StreamD monad here?
    -- XXX Is there a better way to perform some action at the end of a loop
    -- iteration?
    lift $ put False
    let final = do
            r <- lift get
            if r
            then Stream.nil
            else Stream.fromPure Nothing
    b <- fmap Just (Stream.liftInner s2) <> final
    case b of
        Just b1 ->
            if a `eq` b1
            then do
                lift $ put True
                return (a, Just b1)
            else Stream.nil
        Nothing -> return (a, Nothing)
-}

-- | Like 'joinLeft' but uses a hashmap for efficiency.
--
-- Space: O(n)
--
-- Time: O(m + n)
--
-- /Pre-release/
{-# INLINE joinLeftMap #-}
joinLeftMap :: (IsStream t, Ord k, Monad m) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
joinLeftMap :: forall (t :: (* -> *) -> * -> *) k (m :: * -> *) a b.
(IsStream t, Ord k, Monad m) =>
t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
joinLeftMap t m (k, a)
s1 t m (k, b)
s2 =
    m (t m (k, a, Maybe b)) -> t m (k, a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
Stream.concatM (m (t m (k, a, Maybe b)) -> t m (k, a, Maybe b))
-> m (t m (k, a, Maybe b)) -> t m (k, a, Maybe b)
forall a b. (a -> b) -> a -> b
$ do
        Map k b
km <- SerialT m (k, b) -> m (Map k b)
forall (m :: * -> *) k v.
(Monad m, Ord k) =>
SerialT m (k, v) -> m (Map k v)
toMap (SerialT m (k, b) -> m (Map k b))
-> SerialT m (k, b) -> m (Map k b)
forall a b. (a -> b) -> a -> b
$ t m (k, b) -> SerialT m (k, b)
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
IsStream.adapt t m (k, b)
s2
        t m (k, a, Maybe b) -> m (t m (k, a, Maybe b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (t m (k, a, Maybe b) -> m (t m (k, a, Maybe b)))
-> t m (k, a, Maybe b) -> m (t m (k, a, Maybe b))
forall a b. (a -> b) -> a -> b
$ ((k, a) -> (k, a, Maybe b)) -> t m (k, a) -> t m (k, a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Stream.map (Map k b -> (k, a) -> (k, a, Maybe b)
forall {a} {a} {b}. Ord a => Map a a -> (a, b) -> (a, b, Maybe a)
joinAB Map k b
km) t m (k, a)
s1

            where

            joinAB :: Map a a -> (a, b) -> (a, b, Maybe a)
joinAB Map a a
km (a
k, b
a) =
                case a
k a -> Map a a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
`Map.lookup` Map a a
km of
                    Just a
b -> (a
k, b
a, a -> Maybe a
forall a. a -> Maybe a
Just a
b)
                    Maybe a
Nothing -> (a
k, b
a, Maybe a
forall a. Maybe a
Nothing)

-- | Like 'joinLeft' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE mergeLeftJoin #-}
mergeLeftJoin :: -- Monad m =>
    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin :: forall a b (t :: * -> * -> *) m.
(a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin a -> b -> Ordering
_eq t m a
_s1 t m b
_s2 = t m (a, Maybe b)
forall a. HasCallStack => a
undefined

{-
-- XXX We can do this concurrently.
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal by the given equality pedicate then return the tuple (Just a, Just
-- b).  If @a@ is not found in @t m b@ then return (a, Nothing), return
-- (Nothing, b) for vice-versa.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m x n)
--
-- /Unimplemented/
{-# INLINE joinOuter #-}
joinOuter :: MonadIO m =>
       (a -> b -> Bool)
    -> SerialT m a
    -> SerialT m b
    -> SerialT m (Maybe a, Maybe b)
joinOuter eq s1 s =
    Stream.concatM $ do
        inputArr <- Array.fromStream s
        let len = Array.length inputArr
        foundArr <-
            Stream.fold
            (MA.writeN len)
            (Stream.fromList (Prelude.replicate len False))
        return $ go inputArr foundArr <> leftOver inputArr foundArr

    where

    leftOver inputArr foundArr =
            let stream1 = IsStream.fromSerial $ Array.read inputArr
                stream2 = Stream.unfold MA.reader foundArr
            in Stream.filter
                    isJust
                    ( Stream.zipWith (\x y ->
                        if y
                        then Nothing
                        else Just (Nothing, Just x)
                        ) stream1 stream2
                    ) & Stream.catMaybes

    go inputArr foundArr = Stream.evalStateT (return False) $ do
        a <- Stream.liftInner s1
        -- XXX should we use StreamD monad here?
        -- XXX Is there a better way to perform some action at the end of a loop
        -- iteration?
        lift $ put False
        let final = do
                r <- lift get
                if r
                then Stream.nil
                else Stream.fromPure Nothing
        (i, b) <-
            let stream = IsStream.fromSerial $ Array.read inputArr
             in Stream.indexed $ fmap Just (Stream.liftInner stream) <> final

        case b of
            Just b1 ->
                if a `eq` b1
                then do
                    lift $ put True
                    MA.putIndex i True foundArr
                    return (Just a, Just b1)
                else Stream.nil
            Nothing -> return (Just a, Nothing)
-}

-- Put the b's that have been paired, in another hash or mutate the hash to set
-- a flag. At the end go through @t m b@ and find those that are not in that
-- hash to return (Nothing, b).
--
-- | Like 'joinOuter' but uses a 'Map' for efficiency.
--
-- Space: O(m + n)
--
-- Time: O(m + n)
--
-- /Pre-release/
{-# INLINE joinOuterMap #-}
joinOuterMap ::
    (IsStream t, Ord k, MonadIO m) =>
    t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
joinOuterMap :: forall (t :: (* -> *) -> * -> *) k (m :: * -> *) a b.
(IsStream t, Ord k, MonadIO m) =>
t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
joinOuterMap t m (k, a)
s1 t m (k, b)
s2 =
    m (t m (k, Maybe a, Maybe b)) -> t m (k, Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
Stream.concatM (m (t m (k, Maybe a, Maybe b)) -> t m (k, Maybe a, Maybe b))
-> m (t m (k, Maybe a, Maybe b)) -> t m (k, Maybe a, Maybe b)
forall a b. (a -> b) -> a -> b
$ do
        Map k a
km1 <- SerialT m (k, a) -> m (Map k a)
forall {a}. SerialT m (k, a) -> m (Map k a)
kvFold (SerialT m (k, a) -> m (Map k a))
-> SerialT m (k, a) -> m (Map k a)
forall a b. (a -> b) -> a -> b
$ t m (k, a) -> SerialT m (k, a)
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
IsStream.adapt t m (k, a)
s1
        Map k b
km2 <- SerialT m (k, b) -> m (Map k b)
forall {a}. SerialT m (k, a) -> m (Map k a)
kvFold (SerialT m (k, b) -> m (Map k b))
-> SerialT m (k, b) -> m (Map k b)
forall a b. (a -> b) -> a -> b
$ t m (k, b) -> SerialT m (k, b)
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
IsStream.adapt t m (k, b)
s2

        -- XXX Not sure if toList/fromList would fuse optimally. We may have to
        -- create a fused Map.toStream function.
        let res1 :: t m (k, Maybe a, Maybe b)
res1 = ((k, a) -> (k, Maybe a, Maybe b))
-> t m (k, a) -> t m (k, Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Stream.map (Map k b -> (k, a) -> (k, Maybe a, Maybe b)
forall {a} {a} {a}.
Ord a =>
Map a a -> (a, a) -> (a, Maybe a, Maybe a)
joinAB Map k b
km2) (t m (k, a) -> t m (k, Maybe a, Maybe b))
-> t m (k, a) -> t m (k, Maybe a, Maybe b)
forall a b. (a -> b) -> a -> b
$ [(k, a)] -> t m (k, a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
Stream.fromList ([(k, a)] -> t m (k, a)) -> [(k, a)] -> t m (k, a)
forall a b. (a -> b) -> a -> b
$ Map k a -> [(k, a)]
forall k a. Map k a -> [(k, a)]
Map.toList Map k a
km1
                    where
                    joinAB :: Map a a -> (a, a) -> (a, Maybe a, Maybe a)
joinAB Map a a
km (a
k, a
a) =
                        case a
k a -> Map a a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
`Map.lookup` Map a a
km of
                            Just a
b -> (a
k, a -> Maybe a
forall a. a -> Maybe a
Just a
a, a -> Maybe a
forall a. a -> Maybe a
Just a
b)
                            Maybe a
Nothing -> (a
k, a -> Maybe a
forall a. a -> Maybe a
Just a
a, Maybe a
forall a. Maybe a
Nothing)

        -- XXX We can take advantage of the lookups in the first pass above to
        -- reduce the number of lookups in this pass. If we keep mutable cells
        -- in the second Map, we can flag it in the first pass and not do any
        -- lookup in the second pass if it is flagged.
        let res2 :: t m (k, Maybe a, Maybe b)
res2 = ((k, b) -> Maybe (k, Maybe a, Maybe b))
-> t m (k, b) -> t m (k, Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Maybe b) -> t m a -> t m b
Stream.mapMaybe (Map k a -> (k, b) -> Maybe (k, Maybe a, Maybe b)
forall {a} {a} {a} {a}.
Ord a =>
Map a a -> (a, a) -> Maybe (a, Maybe a, Maybe a)
joinAB Map k a
km1) (t m (k, b) -> t m (k, Maybe a, Maybe b))
-> t m (k, b) -> t m (k, Maybe a, Maybe b)
forall a b. (a -> b) -> a -> b
$ [(k, b)] -> t m (k, b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
Stream.fromList ([(k, b)] -> t m (k, b)) -> [(k, b)] -> t m (k, b)
forall a b. (a -> b) -> a -> b
$ Map k b -> [(k, b)]
forall k a. Map k a -> [(k, a)]
Map.toList Map k b
km2
                    where
                    joinAB :: Map a a -> (a, a) -> Maybe (a, Maybe a, Maybe a)
joinAB Map a a
km (a
k, a
b) =
                        case a
k a -> Map a a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
`Map.lookup` Map a a
km of
                            Just a
_ -> Maybe (a, Maybe a, Maybe a)
forall a. Maybe a
Nothing
                            Maybe a
Nothing -> (a, Maybe a, Maybe a) -> Maybe (a, Maybe a, Maybe a)
forall a. a -> Maybe a
Just (a
k, Maybe a
forall a. Maybe a
Nothing, a -> Maybe a
forall a. a -> Maybe a
Just a
b)

        t m (k, Maybe a, Maybe b) -> m (t m (k, Maybe a, Maybe b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (t m (k, Maybe a, Maybe b) -> m (t m (k, Maybe a, Maybe b)))
-> t m (k, Maybe a, Maybe b) -> m (t m (k, Maybe a, Maybe b))
forall a b. (a -> b) -> a -> b
$ t m (k, Maybe a, Maybe b)
-> t m (k, Maybe a, Maybe b) -> t m (k, Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> t m a -> t m a
Stream.serial t m (k, Maybe a, Maybe b)
res1 t m (k, Maybe a, Maybe b)
forall {a}. t m (k, Maybe a, Maybe b)
res2

        where

        -- XXX Generate error if a duplicate insertion is attempted?
        kvFold :: SerialT m (k, a) -> m (Map k a)
kvFold = (Map k a -> (k, a) -> Map k a)
-> Map k a -> SerialT m (k, a) -> m (Map k a)
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
Stream.foldl' (\Map k a
kv (k
k, a
b) -> k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k a
b Map k a
kv) Map k a
forall k a. Map k a
Map.empty

-- | Like 'joinOuter' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE mergeOuterJoin #-}
mergeOuterJoin :: -- Monad m =>
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin :: forall a b (t :: * -> * -> *) m.
(a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin a -> b -> Ordering
_eq t m a
_s1 t m b
_s2 = t m (Maybe a, Maybe b)
forall a. HasCallStack => a
undefined

------------------------------------------------------------------------------
-- Set operations (special joins)
------------------------------------------------------------------------------
--
-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only constraint
-- the best would be to use an Array with linear search. If the second stream
-- is sorted we can also use a binary search, using Ord constraint.

-- | 'intersectBy' is essentially a filtering operation that retains only those
-- elements in the first stream that are present in the second stream.
--
-- >>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
-- [1,2,2]
--
-- >>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
-- [2,1,1]
--
-- 'intersectBy' is similar to but not the same as 'joinInner':
--
-- >>> Stream.toList $ fmap fst $ Stream.joinInner (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
-- [1,1,2,2]
--
-- Space: O(n) where @n@ is the number of elements in the second stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE intersectBy #-}
intersectBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy a -> a -> Bool
eq t m a
s1 t m a
s2 =
    m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM
        (m (t m a) -> t m a) -> m (t m a) -> t m a
forall a b. (a -> b) -> a -> b
$ do
            -- This may work well when s2 is small
            [a]
xs <- SerialT m a -> m [a]
forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
Stream.toListRev (SerialT m a -> m [a]) -> SerialT m a -> m [a]
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
(a -> a -> Bool) -> t m a -> t m a
Stream.uniqBy a -> a -> Bool
eq (SerialT m a -> SerialT m a) -> SerialT m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ t m a -> SerialT m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt t m a
s2
            t m a -> m (t m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (t m a -> m (t m a)) -> t m a -> m (t m a)
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
Stream.filter (\a
x -> (a -> Bool) -> [a] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
List.any (a -> a -> Bool
eq a
x) [a]
xs) t m a
s1

-- | Like 'intersectBy' but works only on streams sorted in ascending order.
--
-- Space: O(1)
--
-- Time: O(m+n)
--
-- /Pre-release/
{-# INLINE intersectBySorted #-}
intersectBySorted :: (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
intersectBySorted :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> Ordering) -> t m a -> t m a -> t m a
intersectBySorted a -> a -> Ordering
eq t m a
s1 =
      Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamD
    (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
StreamD.intersectBySorted a -> a -> Ordering
eq (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD t m a
s1)
    (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD

-- Roughly joinLeft s1 s2 = s1 `difference` s2 + s1 `intersection` s2

-- | Delete first occurrences of those elements from the first stream that are
-- present in the second stream. If an element occurs multiple times in the
-- second stream as many occurrences of it are deleted from the first stream.
--
-- >>> Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])
-- [2]
--
-- The following laws hold:
--
-- @
-- (s1 `serial` s2) `differenceBy eq` s1 === s2
-- (s1 `wSerial` s2) `differenceBy eq` s1 === s2
-- @
--
-- Same as the list 'Data.List.//' operation.
--
-- Space: O(m) where @m@ is the number of elements in the first stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE differenceBy #-}
differenceBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy a -> a -> Bool
eq t m a
s1 t m a
s2 =
    m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM
        (m (t m a) -> t m a) -> m (t m a) -> t m a
forall a b. (a -> b) -> a -> b
$ do
            -- This may work well if s1 is small
            -- If s1 is big we can go through s1, deleting elements from s2 and
            -- not emitting an element if it was successfully deleted from s2.
            -- we will need a deleteBy that can return whether the element was
            -- deleted or not.
            [a]
xs <- SerialT m a -> m [a]
forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
Stream.toList (SerialT m a -> m [a]) -> SerialT m a -> m [a]
forall a b. (a -> b) -> a -> b
$ t m a -> SerialT m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt t m a
s1
            ([a] -> t m a) -> m [a] -> m (t m a)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [a] -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
fromList (m [a] -> m (t m a)) -> m [a] -> m (t m a)
forall a b. (a -> b) -> a -> b
$ ([a] -> a -> [a]) -> [a] -> t m a -> m [a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> b) -> b -> t m a -> m b
foldl' ((a -> [a] -> [a]) -> [a] -> a -> [a]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((a -> a -> Bool) -> a -> [a] -> [a]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
List.deleteBy a -> a -> Bool
eq)) [a]
xs t m a
s2

-- | Like 'differenceBy' but works only on sorted streams.
--
-- Space: O(1)
--
-- /Unimplemented/
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy :: -- (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy :: forall a (t :: * -> * -> *) m.
(a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy a -> a -> Ordering
_eq t m a
_s1 t m a
_s2 = t m a
forall a. HasCallStack => a
undefined

-- | This is essentially an append operation that appends all the extra
-- occurrences of elements from the second stream that are not already present
-- in the first stream.
--
-- >>> Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3])
-- [1,2,2,4,3]
--
-- Equivalent to the following except that @s1@ is evaluated only once:
--
-- @
-- unionBy eq s1 s2 = s1 \`serial` (s2 `differenceBy eq` s1)
-- @
--
-- Similar to 'joinOuter' but not the same.
--
-- Space: O(n)
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE unionBy #-}
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Semigroup (t m a)) =>
(a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy a -> a -> Bool
eq t m a
s1 t m a
s2 =
    m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM
        (m (t m a) -> t m a) -> m (t m a) -> t m a
forall a b. (a -> b) -> a -> b
$ do
            [a]
xs <- SerialT m a -> m [a]
forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
Stream.toList (SerialT m a -> m [a]) -> SerialT m a -> m [a]
forall a b. (a -> b) -> a -> b
$ t m a -> SerialT m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt t m a
s2
            -- XXX we can use postscanlMAfter' instead of IORef
            IORef [a]
ref <- IO (IORef [a]) -> m (IORef [a])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef [a]) -> m (IORef [a]))
-> IO (IORef [a]) -> m (IORef [a])
forall a b. (a -> b) -> a -> b
$ [a] -> IO (IORef [a])
forall a. a -> IO (IORef a)
newIORef ([a] -> IO (IORef [a])) -> [a] -> IO (IORef [a])
forall a b. (a -> b) -> a -> b
$! (a -> a -> Bool) -> [a] -> [a]
forall a. (a -> a -> Bool) -> [a] -> [a]
List.nubBy a -> a -> Bool
eq [a]
xs
            let f :: a -> m a
f a
x = do
                    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef [a] -> ([a] -> [a]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [a]
ref ((a -> a -> Bool) -> a -> [a] -> [a]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
List.deleteBy a -> a -> Bool
eq a
x)
                    a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
                s3 :: t m a
s3 = m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM
                        (m (t m a) -> t m a) -> m (t m a) -> t m a
forall a b. (a -> b) -> a -> b
$ do
                            [a]
xs1 <- IO [a] -> m [a]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [a] -> m [a]) -> IO [a] -> m [a]
forall a b. (a -> b) -> a -> b
$ IORef [a] -> IO [a]
forall a. IORef a -> IO a
readIORef IORef [a]
ref
                            t m a -> m (t m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (t m a -> m (t m a)) -> t m a -> m (t m a)
forall a b. (a -> b) -> a -> b
$ [a] -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
fromList [a]
xs1
            t m a -> m (t m a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (t m a -> m (t m a)) -> t m a -> m (t m a)
forall a b. (a -> b) -> a -> b
$ (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
Stream.mapM a -> m a
forall {m :: * -> *}. MonadIO m => a -> m a
f t m a
s1 t m a -> t m a -> t m a
forall a. Semigroup a => a -> a -> a
<> t m a
s3

-- | Like 'unionBy' but works only on sorted streams.
--
-- Space: O(1)
--
-- /Unimplemented/
{-# INLINE mergeUnionBy #-}
mergeUnionBy :: -- (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy :: forall a (t :: * -> * -> *) m.
(a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy a -> a -> Ordering
_eq t m a
_s1 t m a
_s2 = t m a
forall a. HasCallStack => a
undefined