{-# OPTIONS_GHC -Wno-deprecations #-}
module Streamly.Internal.Data.Stream.IsStream.Combinators {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
( maxThreads
, maxBuffer
, maxYields
, rate
, avgRate
, minRate
, maxRate
, constRate
, inspectMode
, printState
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Int (Int64)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream, mkStream, foldStreamShared)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.StreamK (Stream)
import Streamly.Internal.Data.SVar
{-# INLINE_NORMAL maxThreads #-}
-- | Run the stream @m@ with the thread limit in its 'State' set to @n@.
--
-- The returned stream, when evaluated, receives the current state @st@,
-- replaces its thread limit via 'setMaxThreads' and then evaluates @m@ with
-- the modified state using 'foldStreamShared', forwarding the continuations
-- it was given unchanged.
--
-- NOTE(review): how particular values of @n@ (zero, negative) are
-- interpreted is decided by 'setMaxThreads' and its consumers, which are not
-- visible in this module.
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads n m = mkStream $ \st stp sng yld ->
    foldStreamShared (setMaxThreads n st) stp sng yld m
{-# INLINE_NORMAL maxBuffer #-}
-- | Run the stream @m@ with the buffer limit in its 'State' set to @n@.
--
-- The returned stream, when evaluated, receives the current state @st@,
-- replaces its buffer limit via 'setMaxBuffer' and then evaluates @m@ with
-- the modified state using 'foldStreamShared', forwarding the continuations
-- it was given unchanged.
--
-- NOTE(review): how particular values of @n@ (zero, negative) are
-- interpreted is decided by 'setMaxBuffer' and its consumers, which are not
-- visible in this module.
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer n m = mkStream $ \st stp sng yld ->
    foldStreamShared (setMaxBuffer n st) stp sng yld m
{-# INLINE_NORMAL rate #-}
-- | Run the stream @m@ with the rate setting in its 'State' set to @r@.
--
-- When @r@ is @Just (Rate low goal high _)@ the three rates are sanity
-- checked first, in this order:
--
-- 1. @goal < low@  - the target rate is below the minimum rate
-- 2. @goal > high@ - the target rate is above the maximum rate
-- 3. @low > high@  - the minimum rate is above the maximum rate
--
-- The first check that fails calls 'error' with a descriptive message. The
-- checks sit inside the stream continuation, so an invalid 'Rate' is only
-- reported once the resulting stream is evaluated, not when 'rate' is
-- applied.
--
-- If @r@ is 'Nothing', or all checks pass, the state is updated with
-- 'setStreamRate' and @m@ is evaluated with it via 'foldStreamShared'.
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate r m = mkStream $ \st stp sng yld ->
    case r of
        Just (Rate low goal _ _) | goal < low ->
            error "rate: Target rate cannot be lower than minimum rate."
        Just (Rate _ goal high _) | goal > high ->
            error "rate: Target rate cannot be greater than maximum rate."
        Just (Rate low _ high _) | low > high ->
            error "rate: Minimum rate cannot be greater than maximum rate."
        -- Nothing, or a Rate that passed every check above.
        _ -> foldStreamShared (setStreamRate r st) stp sng yld m
-- | Shorthand for 'rate' with a 'Rate' derived from a single value @r@:
--
-- > avgRate r = rate (Just (Rate (r / 2) r (2 * r) maxBound))
--
-- Going by the pattern names used in 'rate', the first three fields are the
-- minimum, target and maximum rates, so the target is @r@ and the stream may
-- vary between half and double of it.
--
-- NOTE(review): the meaning of the final 'Int' field (set to 'maxBound'
-- here) is defined by 'Rate', which is not visible in this module.
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate r = rate (Just $ Rate (r / 2) r (2 * r) maxBound)
-- | Shorthand for 'rate' with a 'Rate' derived from a single value @r@:
--
-- > minRate r = rate (Just (Rate r r (2 * r) maxBound))
--
-- Going by the pattern names used in 'rate', the first three fields are the
-- minimum, target and maximum rates, so both the minimum and the target are
-- @r@ and the maximum is double of it.
--
-- NOTE(review): the meaning of the final 'Int' field (set to 'maxBound'
-- here) is defined by 'Rate', which is not visible in this module.
minRate :: IsStream t => Double -> t m a -> t m a
minRate r = rate (Just $ Rate r r (2 * r) maxBound)
-- | Shorthand for 'rate' with a 'Rate' derived from a single value @r@:
--
-- > maxRate r = rate (Just (Rate (r / 2) r r maxBound))
--
-- Going by the pattern names used in 'rate', the first three fields are the
-- minimum, target and maximum rates, so both the target and the maximum are
-- @r@ and the minimum is half of it.
--
-- NOTE(review): the meaning of the final 'Int' field (set to 'maxBound'
-- here) is defined by 'Rate', which is not visible in this module.
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate r = rate (Just $ Rate (r / 2) r r maxBound)
-- | Shorthand for 'rate' with a 'Rate' derived from a single value @r@:
--
-- > constRate r = rate (Just (Rate r r r 0))
--
-- Going by the pattern names used in 'rate', the first three fields are the
-- minimum, target and maximum rates; here all three are pinned to @r@.
--
-- NOTE(review): the meaning of the final 'Int' field (set to @0@ here,
-- unlike the 'maxBound' used by the sibling helpers) is defined by 'Rate',
-- which is not visible in this module.
constRate :: IsStream t => Double -> t m a -> t m a
constRate r = rate (Just $ Rate r r r 0)
{-# INLINE_NORMAL _serialLatency #-}
-- | Run the stream @m@ with the latency setting in its 'State' set to @n@.
--
-- The returned stream, when evaluated, receives the current state @st@,
-- updates it via 'setStreamLatency' and then evaluates @m@ with the modified
-- state using 'foldStreamShared', forwarding the continuations it was given
-- unchanged.
--
-- This combinator is not in the module's export list; the leading
-- underscore keeps the unused-binding warning quiet.
--
-- NOTE(review): the unit and interpretation of @n@ are decided by
-- 'setStreamLatency', which is not visible in this module.
_serialLatency :: IsStream t => Int -> t m a -> t m a
_serialLatency n m = mkStream $ \st stp sng yld ->
    foldStreamShared (setStreamLatency n st) stp sng yld m
{-# INLINE_NORMAL maxYields #-}
-- | Run the stream @m@ with the yield limit in its 'State' set to @n@.
--
-- The returned stream, when evaluated, receives the current state @st@,
-- updates it via 'setYieldLimit' and then evaluates @m@ with the modified
-- state using 'foldStreamShared', forwarding the continuations it was given
-- unchanged.
--
-- NOTE(review): presumably 'Nothing' means "no limit"; the exact
-- interpretation is decided by 'setYieldLimit', which is not visible in
-- this module.
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields n m = mkStream $ \st stp sng yld ->
    foldStreamShared (setYieldLimit n st) stp sng yld m
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
-- | Specialisation of 'maxYields' for 'SerialT': the limit argument is
-- ignored and the stream is returned unchanged.
--
-- The rewrite rule above lets GHC substitute this function for 'maxYields'
-- at call sites where the stream type is 'SerialT'.
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial _ = id
-- | Print a dump of the 'SVar' attached to the given stream 'State' to
-- standard output.
--
-- If the state carries an 'SVar' (as reported by 'streamVar'), its textual
-- dump is produced with 'dumpSVar' and printed with 'putStrLn'; otherwise
-- the literal line @No SVar@ is printed.
printState :: MonadIO m => State Stream m a -> m ()
printState st = liftIO $
    case streamVar st of
        Just sv -> dumpSVar sv >>= putStrLn
        Nothing -> putStrLn "No SVar"
-- | Run the stream @m@ with inspect mode switched on in its 'State'.
--
-- The returned stream, when evaluated, receives the current state @st@,
-- updates it via 'setInspectMode' and then evaluates @m@ with the modified
-- state using 'foldStreamShared', forwarding the continuations it was given
-- unchanged.
--
-- NOTE(review): what inspect mode actually reports, and when, is decided by
-- the consumers of the flag set by 'setInspectMode'; that is not visible in
-- this module.
inspectMode :: IsStream t => t m a -> t m a
inspectMode m = mkStream $ \st stp sng yld ->
    foldStreamShared (setInspectMode st) stp sng yld m