{-# OPTIONS_GHC -Wno-deprecations #-}
module Streamly.Internal.Data.Stream.IsStream.Top {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
(
sampleFromThen
, sampleIntervalStart
, sampleIntervalEnd
, sampleBurstStart
, sampleBurstEnd
, sortBy
, intersectBy
, intersectBySorted
, differenceBy
, mergeDifferenceBy
, unionBy
, mergeUnionBy
, crossJoin
, joinInner
, joinInnerMap
, joinInnerMerge
, mergeLeftJoin
, joinLeftMap
, mergeOuterJoin
, joinOuterMap
)
where
#include "inline.hs"
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), adapt, foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Fold as Fold (one, last)
import qualified Streamly.Internal.Data.Fold.Type as Fold
import qualified Streamly.Internal.Data.Parser as Parser
(groupByRollingEither)
import qualified Streamly.Internal.Data.Stream.IsStream.Eliminate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Generate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.StreamD as StreamD
import Prelude hiding (filter, zipWith, concatMap, concat)
-- | @sampleFromThen offset stride@ pairs every element with the index
-- assigned by 'Stream.indexed' and keeps only the elements whose index @i@
-- satisfies @i >= offset@ and @(i - offset) `mod` stride == 0@.
--
-- The index check is skipped for @i < offset@; once an index reaches
-- @offset@, a @stride@ of @0@ makes 'mod' raise a divide-by-zero error.
{-# INLINE sampleFromThen #-}
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) =>
    Int -> Int -> t m a -> t m a
sampleFromThen offset stride = Stream.with Stream.indexed Stream.filter keep

    where

    -- Decide from the index alone; the element itself is never inspected.
    keep (ix, _)
        | ix < offset = False
        | otherwise = (ix - offset) `mod` stride == 0
-- | Split the stream into time intervals of length @n@ with
-- 'Stream.intervalsOf', reduce every interval with 'Fold.last' and drop the
-- intervals for which that fold produced 'Nothing'.
--
-- NOTE(review): @n@ is presumably in seconds, as interpreted by
-- 'Stream.intervalsOf' -- confirm against that function.
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalEnd n = Stream.catMaybes . lastPerInterval

    where

    lastPerInterval = Stream.intervalsOf n Fold.last
-- | Split the stream into time intervals of length @n@ with
-- 'Stream.intervalsOf', reduce every interval with 'Fold.one' and drop the
-- intervals for which that fold produced 'Nothing'.
--
-- NOTE(review): @n@ is presumably in seconds, as interpreted by
-- 'Stream.intervalsOf' -- confirm against that function.
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalStart n = Stream.catMaybes . firstPerInterval

    where

    firstPerInterval = Stream.intervalsOf n Fold.one
-- | State of the scan used by 'sampleBurst'. @t@ is the timestamp type and
-- @x@ the element type; all fields are strict.
data BurstState t x
    -- | No sample is being held.
    = BurstNone
    -- | Holding a sample, together with the timestamp of the latest element
    -- seen in the current burst.
    | BurstWait !t !x
    -- | The burst has ended; the carried sample is to be emitted.
    | BurstDone !x
    -- | The previous burst has ended (first field is the sample to emit) and
    -- a new burst has already started with the given timestamp and sample.
    | BurstDoneNext !x !t !x
-- | Emit one sample per burst of elements. A burst ends once at least @gap@
-- seconds have passed since its latest element (@gap@ is converted to
-- nanoseconds below).
--
-- When @sampleAtEnd@ is 'True' the held sample is replaced by every new
-- element of the burst, so the last one is emitted; otherwise the first
-- element of the burst is kept.
--
-- Implementation: elements are wrapped in 'Just' and a 'Nothing' tick is
-- interjected every @0.01@ (presumably seconds, as interpreted by
-- 'Stream.interjectSuffix' -- confirm), so that elapsed time can be noticed
-- even when no element arrives. The time-indexed events drive a
-- 'BurstState' scan from which finished samples are extracted.
{-# INLINE sampleBurst #-}
sampleBurst :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Bool -> Double -> t m a -> t m a
sampleBurst sampleAtEnd gap xs = Stream.mapMaybe extract states

    where

    -- Real elements as 'Just', periodic ticks as 'Nothing'.
    events = Stream.interjectSuffix 0.01 (return Nothing) (Stream.map Just xs)

    states = Stream.scanl' step BurstNone (Stream.timeIndexed events)

    -- The gap expressed in the time unit of the timestamps.
    gap1 = toRelTime64 . NanoSecond64 . round $ gap * 10 ^ (9 :: Int)

    {-# INLINE step #-}
    step state (now, event) =
        case state of
            BurstNone -> idle
            BurstDone _ -> idle
            BurstWait t0 x0 -> holding state t0 x0
            -- The finished sample was emitted in the previous step; carry on
            -- with the burst that had already started.
            BurstDoneNext _ t0 x0 -> holding (BurstWait t0 x0) t0 x0

        where

        -- Nothing is held: an element starts a burst, a tick changes nothing.
        idle = maybe BurstNone (BurstWait now) event

        -- A sample @x0@ is held and @t0@ is the time of the latest element.
        -- @unchanged@ is the state to return when a tick arrives before the
        -- gap has elapsed.
        holding unchanged t0 x0 =
            case event of
                Nothing
                    | expired -> BurstDone x0
                    | otherwise -> unchanged
                Just x1
                    -- No tick was seen in between, yet the gap has already
                    -- elapsed: finish the old burst and start a new one.
                    | expired -> BurstDoneNext x0 now x1
                    | sampleAtEnd -> BurstWait now x1
                    | otherwise -> BurstWait now x0

            where

            expired = now - t0 >= gap1

    {-# INLINE extract #-}
    extract state =
        case state of
            BurstDone x -> Just x
            BurstDoneNext x _ _ -> Just x
            _ -> Nothing
-- | 'sampleBurst' with its first argument set to 'True': within a burst the
-- held sample is replaced by each newer element, so the element emitted for a
-- burst is its last one. The 'Double' argument is the gap passed on to
-- 'sampleBurst'.
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstEnd = sampleBurst sampleAtEnd

    where

    sampleAtEnd = True
-- | 'sampleBurst' with its first argument set to 'False': within a burst the
-- first held sample is kept, so the element emitted for a burst is its first
-- one. The 'Double' argument is the gap passed on to 'sampleBurst'.
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstStart = sampleBurst sampleAtEnd

    where

    sampleAtEnd = False
-- | Sort a stream using the comparison function @cmp@.
--
-- The input is repeatedly parsed into runs with
-- 'Parser.groupByRollingEither', using the rolling predicate
-- @cmp x y /= GT@. One kind of run is collected in reverse
-- ('Fold.toStreamKRev') and the other in order ('Fold.toStreamK') --
-- presumably so that every collected run is ascending; confirm against
-- 'Parser.groupByRollingEither'. Parse failures are dropped by
-- 'Stream.rights', and the runs are then combined pairwise with
-- @'Stream.mergeBy' cmp@ via 'Stream.concatPairsWith'.
{-# INLINE sortBy #-}
sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a
sortBy cmp =
      Stream.concatPairsWith (Stream.mergeBy cmp) id
    . Stream.rights
    . Stream.parseMany (either id id <$> runs)

    where

    -- True when @y@ may follow @x@ in a non-descending run.
    notDescending x y = cmp x y /= GT

    runs =
        Parser.groupByRollingEither
            notDescending
            (fromStream <$> Fold.toStreamKRev)
            (fromStream <$> Fold.toStreamK)
-- | Pair every element drawn from the first argument with every element
-- drawn from the second one, using the 'Monad' instance of @t m@: the second
-- argument is bound once for each result of the first.
{-# INLINE crossJoin #-}
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
crossJoin s1 s2 =
    s1 >>= \a ->
    s2 >>= \b ->
    return (a, b)
-- | Nested-loop inner join: for every element @a@ of the first stream the
-- whole second stream is traversed, and a pair @(a, b)@ is emitted for each
-- @b@ with @a \`eq\` b@. The second stream is therefore evaluated once per
-- element of the first stream.
{-# INLINE joinInner #-}
joinInner ::
    forall (t :: (Type -> Type) -> Type -> Type) m a b.
    (IsStream t, Monad m) =>
    (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
joinInner eq s1 s2 = Stream.concatMap matchesFor s1

    where

    -- All pairs that a single left element forms with the right stream.
    matchesFor a = Stream.concatMap (pairIfEq a) s2

    pairIfEq a b
        | a `eq` b = Stream.fromPure (a, b)
        | otherwise = Stream.nil
-- | Fold a stream of key-value pairs into a strict 'Map.Map' with a strict
-- left fold. When a key occurs more than once, 'Map.insert' replaces the
-- earlier value, so the value occurring last in the stream is kept.
toMap :: (Monad m, Ord k) => IsStream.SerialT m (k, v) -> m (Map.Map k v)
toMap = Stream.foldl' insertPair Map.empty

    where

    insertPair acc (key, val) = Map.insert key val acc
-- | Inner join on keys using a 'Map.Map'. The second stream is consumed
-- entirely into a map via 'toMap' (for duplicate keys the last value wins)
-- before any output is produced. Then every @(k, a)@ of the first stream
-- whose key is present in the map yields @(k, a, b)@; elements without a
-- matching key are dropped.
{-# INLINE joinInnerMap #-}
joinInnerMap :: (IsStream t, Monad m, Ord k) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, b)
joinInnerMap s1 s2 =
    Stream.concatM $ do
        rightByKey <- toMap (IsStream.adapt s2)
        pure (Stream.mapMaybe (matchIn rightByKey) s1)

    where

    matchIn table (k, a) = (,,) k a <$> Map.lookup k table
-- | Placeholder for a merge-based inner join (presumably of streams sorted
-- with respect to the comparison function).
--
-- /Unimplemented/: the signature places no constraints on @t@ or @m@, and
-- evaluating this function raises an error naming it.
{-# INLINE joinInnerMerge #-}
joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
joinInnerMerge = error "joinInnerMerge: not implemented"
-- | Left join on keys using a 'Map.Map'. The second stream is consumed
-- entirely into a map via 'toMap' (for duplicate keys the last value wins)
-- before any output is produced. Every @(k, a)@ of the first stream then
-- yields exactly one @(k, a, mb)@, where @mb@ is @Just b@ if the key is in
-- the map and 'Nothing' otherwise.
{-# INLINE joinLeftMap #-}
joinLeftMap :: (IsStream t, Ord k, Monad m) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
joinLeftMap s1 s2 =
    Stream.concatM $ do
        rightByKey <- toMap (IsStream.adapt s2)
        return (Stream.map (attach rightByKey) s1)

    where

    -- The lookup is performed when the result tuple is evaluated, not
    -- deferred into its third component.
    attach table (k, a) =
        maybe (k, a, Nothing) (\b -> (k, a, Just b)) (Map.lookup k table)
-- | Placeholder for a merge-based left join (presumably of streams sorted
-- with respect to the comparison function).
--
-- /Unimplemented/: all arguments are ignored and evaluating an application
-- of this function raises an error naming it.
{-# INLINE mergeLeftJoin #-}
mergeLeftJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin _eq _s1 _s2 = error "mergeLeftJoin: not implemented"
-- | Full outer join on keys using two 'Map.Map's. Both streams are consumed
-- entirely into maps (for duplicate keys the last value wins) before any
-- output is produced. The result consists of two parts, composed with
-- 'Stream.serial':
--
-- 1. for every key @k@ of the first map, in the order of 'Map.toList',
--    @(k, Just a, mb)@ where @mb@ is the lookup of @k@ in the second map;
-- 2. for every key @k@ of the second map that is absent from the first,
--    @(k, Nothing, Just b)@.
{-# INLINE joinOuterMap #-}
joinOuterMap ::
    (IsStream t, Ord k, MonadIO m) =>
    t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
joinOuterMap s1 s2 =
    Stream.concatM $ do
        leftByKey <- collect $ IsStream.adapt s1
        rightByKey <- collect $ IsStream.adapt s2
        let fromLeft = Stream.map (withRight rightByKey) (entries leftByKey)
            onlyRight =
                Stream.mapMaybe (unlessIn leftByKey) (entries rightByKey)
        return $ Stream.serial fromLeft onlyRight

    where

    -- Later occurrences of a key replace earlier ones.
    collect = Stream.foldl' (\acc (k, v) -> Map.insert k v acc) Map.empty

    entries table = Stream.fromList (Map.toList table)

    -- Row for a key of the left map, with the right value if there is one.
    withRight table (k, a) =
        maybe
            (k, Just a, Nothing)
            (\b -> (k, Just a, Just b))
            (Map.lookup k table)

    -- Row for a key of the right map, unless the left pass already covered
    -- that key.
    unlessIn table (k, b)
        | Map.member k table = Nothing
        | otherwise = Just (k, Nothing, Just b)
-- | Placeholder for a merge-based outer join (presumably of streams sorted
-- with respect to the comparison function).
--
-- /Unimplemented/: all arguments are ignored and evaluating an application
-- of this function raises an error naming it.
{-# INLINE mergeOuterJoin #-}
mergeOuterJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin _eq _s1 _s2 = error "mergeOuterJoin: not implemented"
-- | Keep the elements of the first stream that are equal, according to
-- @eq@, to some element of the second stream.
--
-- The second stream is passed through @'Stream.uniqBy' eq@ and consumed
-- entirely into a list before any output is produced; every element @x@ of
-- the first stream is then tested against that list with a linear scan
-- (@eq x@ applied to each collected element).
{-# INLINE intersectBy #-}
intersectBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy eq s1 s2 =
    concatM $ do
        candidates <- Stream.toListRev (Stream.uniqBy eq (adapt s2))
        let present x = List.any (eq x) candidates
        return (Stream.filter present s1)
-- | Intersection of two streams, delegating to 'StreamD.intersectBySorted'
-- with the given comparison function. Both arguments are converted with
-- 'IsStream.toStreamD' and the result is converted back with
-- 'IsStream.fromStreamD'.
--
-- NOTE(review): as the name suggests, both streams are presumably required
-- to be sorted with respect to the comparison function -- confirm against
-- 'StreamD.intersectBySorted'.
{-# INLINE intersectBySorted #-}
intersectBySorted :: (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
intersectBySorted eq s1 =
    IsStream.fromStreamD . intersectWithFirst . IsStream.toStreamD

    where

    intersectWithFirst = StreamD.intersectBySorted eq (IsStream.toStreamD s1)
-- | Remove from the first stream one occurrence for every element of the
-- second stream that matches according to @eq@.
--
-- The first stream is consumed entirely into a list; the second stream is
-- then folded over that list, each of its elements deleting at most one
-- matching entry with 'List.deleteBy'. Whatever is left of the list is
-- streamed as the result, so no output is produced until both inputs have
-- been consumed.
{-# INLINE differenceBy #-}
differenceBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy eq s1 s2 =
    concatM $ do
        initial <- Stream.toList (adapt s1)
        fromList <$> foldl' dropOne initial s2

    where

    dropOne remaining y = List.deleteBy eq y remaining
-- | Placeholder for a merge-based difference (presumably of streams sorted
-- with respect to the comparison function).
--
-- /Unimplemented/: all arguments are ignored and evaluating an application
-- of this function raises an error naming it.
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy _eq _s1 _s2 = error "mergeDifferenceBy: not implemented"
-- | Union of two streams according to the equality @eq@: all elements of the
-- first stream, followed by those elements of the second stream (with
-- duplicates removed by @'List.nubBy' eq@) that were not matched by any
-- element of the first stream.
--
-- The second stream is consumed entirely into a list before any output is
-- produced. While the first stream is being emitted, each of its elements
-- deletes at most one matching entry from that list; what is left is emitted
-- once the first stream is exhausted.
--
-- The two parts are composed with 'Stream.serial' rather than with the '<>'
-- of the stream type: the leftover list must only be read after the first
-- stream has been fully processed, which an interleaving or concurrent '<>'
-- does not guarantee. The 'Semigroup' constraint is retained only to keep
-- the signature unchanged.
--
-- NOTE(review): the shared list lives in an 'IORef' updated with the
-- non-atomic 'modifyIORef''; if 'Stream.mapM' runs its action concurrently
-- for the chosen stream type, updates may race -- confirm.
{-# INLINE unionBy #-}
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy eq s1 s2 =
    concatM
        $ do
            xs <- Stream.toList $ adapt s2
            -- Candidates from the second stream that are still unmatched.
            ref <- liftIO $ newIORef $! List.nubBy eq xs
            let -- Pass the element through, striking it off the candidates.
                f x = do
                    liftIO $ modifyIORef' ref (List.deleteBy eq x)
                    return x
                -- The ref is read only when this stream starts being
                -- evaluated, i.e. after the first part has been exhausted.
                s3 = concatM
                        $ do
                            xs1 <- liftIO $ readIORef ref
                            return $ fromList xs1
            return $ Stream.mapM f s1 `Stream.serial` s3
-- | Placeholder for a merge-based union (presumably of streams sorted with
-- respect to the comparison function).
--
-- /Unimplemented/: all arguments are ignored and evaluating an application
-- of this function raises an error naming it.
{-# INLINE mergeUnionBy #-}
mergeUnionBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy _eq _s1 _s2 = error "mergeUnionBy: not implemented"