module Feldspar.Stream
(Stream
,head
,tail
,map
,intersperse
,interleave
,scan
,mapAccum
,iterate
,repeat
,unfold
,drop
,dropWhile
,filter
,partition
,zip
,zipWith
,unzip
,take
,splitAt
,cycle
,recurrence
,recurrenceI
,iir
,fir
)
where
import Feldspar.Core
import qualified Prelude
import Feldspar.Range
import Feldspar.Prelude hiding (filter,repeat,iterate,cycle)
import Control.Arrow
import Feldspar.Vector (Vector, DVector
,vector
,freezeVector,indexed
,sum,length,replicate)
data Stream a = forall state . (Computable a, Computable state) =>
Stream (StepFunction state a) state
data StepFunction state a
= Continuous (state -> (a,state))
| Stuttering (state -> (a,Data Bool, state))
step :: (Computable state, Computable a) =>
StepFunction state a -> (state -> (a,state))
step (Continuous next) init = next init
step (Stuttering next) init = (a,st)
where (a,_,st) = while (not . snd3) (next . thd3) (next init)
stuttering :: StepFunction state a -> (state -> (a, Data Bool, state))
stuttering (Stuttering next) = next
stuttering (Continuous next) = \state -> let (a,st) = next state
in (a,true,st)
mapStep :: ((stateA -> (a,Data Bool, stateA)) ->
(stateB -> (b,Data Bool, stateB)))
-> StepFunction stateA a -> StepFunction stateB b
mapStep mkStep (Stuttering next) = Stuttering (mkStep next)
mapStep mkStep (Continuous next) = Continuous newStep
where newStep a = let (b,_,st) = mkStep (\a -> let (b,st) = next a
in (b,true,st)) a
in (b,st)
fst3 (a,_,_) = a
snd3 (_,b,_) = b
thd3 (_,_,c) = c
first3 f (a,b,c) = (f a,b,c)
second3 f (a,b,c) = (a,f b,c)
third3 f (a,b,c) = (a,b,f c)
head :: Computable a => Stream a -> a
head (Stream next init) = fst $ step next init
tail :: Computable a => Stream a -> Stream a
tail (Stream next init) = Stream next (snd $ step next init)
map :: (Computable a, Computable b) =>
(a -> b) -> Stream a -> Stream b
map f (Stream next init) = Stream (mapStep (first3 f .) next) init
intersperse :: a -> Stream a -> Stream a
intersperse a (Stream next init) =
Stream (mapStep newNext next) (true,init)
where newNext next (b,st) = b ? (let (e,isValid,st') = next st
in isValid ? ( (e,true,(false,st'))
, (e,false,(true,st'))
)
,(a,true,(true,st))
)
interleave :: Stream a -> Stream a -> Stream a
interleave (Stream (Continuous next1) init1) (Stream (Continuous next2) init2)
= Stream (Continuous next) (true,init1,init2)
where next (b,st1,st2) = b ? (let (a,st1') = next1 st1
in (a,(false,st1',st2))
,let (a,st2') = next2 st2
in (a,(true,st1,st2'))
)
interleave (Stream next1 init1) (Stream next2 init2)
= Stream (Stuttering next) (true,init1,init2)
where next (b,st1,st2) = b ? (let (a,isValid,st1') = stuttering next1 st1
in isValid ? ( (a,true,(false,st1',st2))
, (a,false,(true,st1',st2))
)
,let (a,isValid,st2') = stuttering next2 st2
in isValid ? ( (a,true,(true,st1,st2'))
, (a,false,(false,st1,st2'))
)
)
scan :: Computable a => (a -> b -> a) -> a -> Stream b -> Stream a
scan f a (Stream next init)
= Stream (mapStep newNext next) (a,init)
where newNext next (acc,st) = let (a,isValid,st') = next st
in isValid ? ( (acc,true, (f acc a,st') )
, (acc,false, (acc,st') )
)
scan1 :: Computable a => (a -> a -> a) -> Stream a -> Stream a
scan1 f (Stream next init)
= Stream (mapStep newNext next) (a,true,newInit)
where (a,newInit) = step next init
newNext next (a,isFirst,st)
= isFirst ? ( (a, true, (a,false,st))
, let (b,isValid,st') = next st
in isValid ? ( let elem = f a b
in (elem, true, (elem,false,st'))
, (a,false, (a,false,st'))
)
)
mapAccum :: (Computable acc, Computable b) =>
(acc -> a -> (acc,b)) -> acc -> Stream a -> Stream b
mapAccum f acc (Stream next init)
= Stream (Continuous newNext) (init,acc)
where newNext (st,acc)
= let (a,st') = step next st
(acc',b) = f acc a
in (b, (st',acc'))
iterate :: Computable a => (a -> a) -> a -> Stream a
iterate f init = Stream (Continuous next) init
where next a = (a, f a)
repeat :: Computable a => a -> Stream a
repeat a = Stream (Continuous next) unit
where next _ = (a,unit)
unfold :: (Computable a, Computable c) => (c -> (a,c)) -> c -> Stream a
unfold next init = Stream (Continuous next) init
drop :: Data Unsigned32 -> Stream a -> Stream a
drop i (Stream next init) = Stream next newState
where (newState,_) = while cond body (init,i)
cond (st,i) = i > 0
body (st,i) = let (_,b,st') = stuttering next st
in b ? ( (st',i1)
, (st',i))
dropWhile p (Stream next init) = Stream next newState
where (_,newState) = while cond body (step next init)
cond (a,st) = p a
body (_,st) = step next st
filter :: (a -> Data Bool) -> Stream a -> Stream a
filter p (Stream next init) = Stream (Stuttering newNext) init
where newNext st = let (a,isValid,st') = stuttering next st
in isValid && p a ? ( (a,true, st')
, (a,false,st')
)
partition :: (a -> Data Bool) -> Stream a -> (Stream a, Stream a)
partition p stream = (filter p stream, filter (not . p) stream)
zip :: Stream a -> Stream b -> Stream (a,b)
zip (Stream next1 init1) (Stream next2 init2)
= Stream (Continuous next) (init1,init2)
where next (st1,st2) = ( (a,b), (st1',st2') )
where (a,st1') = step next1 st1
(b,st2') = step next2 st2
zipWith :: Computable c => (a -> b -> c) -> Stream a -> Stream b -> Stream c
zipWith f (Stream next1 init1) (Stream next2 init2)
= Stream (Continuous next) (init1,init2)
where next (st1,st2) = ( f a b, (st1',st2'))
where (a,st1') = step next1 st1
(b,st2') = step next2 st2
unzip :: (Computable a, Computable b) => Stream (a,b) -> (Stream a, Stream b)
unzip stream = (map fst stream, map snd stream)
instance RandomAccess (Stream a) where
type Element (Stream a) = a
(Stream next init) ! n = fst3 $ while ((/= 0) . thd3) body (a,st,n)
where body (a,st,i) = let (a,isValid,st') = stuttering next st
in isValid ? ( (a,st',i1)
, (a,st',i)
)
(a,st) = step next init
take :: Storable a => Data Int -> Stream (Data a) -> Data [a]
take n (Stream next init)
= snd3 $ while cond body
(0,array (mapMonotonic fromIntegral (dataSize n) :> universal) [],init)
where cond (i,_ ,_ ) = i < n
body (i,arr,st) = let (a,isValid,st') = stuttering next st
in isValid ? ( (i+1,setIx arr i a,st')
, (i, arr, st')
)
splitAt :: Storable a =>
Data Int -> Stream (Data a) -> (Data [a], Stream (Data a))
splitAt n (Stream next init) = (arr,Stream next st)
where
(_,arr,st) =
while cond body
(0,array (mapMonotonic fromIntegral (dataSize n) :> universal) [],init)
cond (i,_ ,_ ) = i < n
body (i,arr,st) = let (a,isValid,st') = stuttering next st
in isValid ? ( (i+1,setIx arr i a,st')
, (i, arr, st')
)
cycle :: Computable a => Vector a -> Stream a
cycle vec = Stream (Continuous next) 0
where next i = (vec ! i, (i + 1) `rem` length vec)
recurrence :: Storable a =>
DVector a -> ((Int -> Data a) -> Data a) -> Stream (Data a)
recurrence init mkExpr = Stream (Continuous next) (buf,0)
where buf = freezeVector init
len = length init
next (buf,ix) =
let a = mkExpr (\i -> getIx buf ((value i + ix) `rem` len))
in (getIx buf (ix `rem` len), (setIx buf (ix `rem` len) a, ix + 1))
recurrenceI :: (Storable a, Storable b) =>
DVector a -> Stream (Data a) -> DVector b ->
((Data Int -> Data a) -> (Data Int -> Data b) -> Data b) ->
Stream (Data b)
recurrenceI ii (Stream (Continuous st) s) io mkExpr
= Stream (Continuous step) (ibuf,obuf,s,0)
where ibuf = freezeVector ii
obuf = freezeVector io
p = length ii
q = length io
step (ibuf,obuf,s,ix) =
let (a,s') = st s
ibuf' = p /= 0 ? (setIx ibuf (ix `rem` p) a, ibuf)
b = mkExpr (\i -> getIx ibuf' ((i + ix) `rem` p))
(\i -> getIx obuf ((i + ix 1) `rem` q))
in (q /= 0 ? (getIx obuf (ix `rem` q),b),
(ibuf'
,q /= 0 ? (setIx obuf (ix `rem` q) b,obuf)
,s'
,ix + 1))
recurrenceI ii (Stream (Stuttering st) s) io mkExpr
= Stream (Stuttering step) (ibuf,obuf,s,0)
where ibuf = freezeVector ii
obuf = freezeVector io
p = length ii
q = length io
step (ibuf,obuf,s,ix) =
let (a,isValid,s') = st s
ibuf' = p /= 0 ? (setIx ibuf (ix `rem` p) a,ibuf)
b = mkExpr (\i -> getIx ibuf' ((i + ix) `rem` p))
(\i -> getIx obuf ((i + ix 1) `rem` q))
in isValid ?( (q /= 0 ? (getIx obuf (ix `rem` q), b), true,
(ibuf'
,q /= 0 ? (setIx obuf (ix `rem` q) b,obuf)
,s'
,ix + 1))
, (q /= 0 ? (getIx obuf (ix `rem` q),b), false,
(ibuf
,obuf
,s'
,ix))
)
slidingAvg :: Data Int -> Stream (Data Int) -> Stream (Data Int)
slidingAvg n str = recurrenceI (replicate n 0) str (vector [])
(\input _ -> sum (indexed n input) `quot` n)
fir :: DVector Float ->
Stream (Data Float) -> Stream (Data Float)
fir b input =
recurrenceI (replicate n 0) input
(vector [])
(\input _ -> sum (indexed n (\i -> b!i * input!(ni))))
where n = length b
iir :: Data Float -> DVector Float -> DVector Float ->
Stream (Data Float) -> Stream (Data Float)
iir a0 a b input =
recurrenceI (replicate q 0) input
(replicate p 0)
(\input output -> 1 / a0 *
( sum (indexed p (\i -> b!i * input!(pi)))
sum (indexed q (\j -> a!j * output!(qj))))
)
where p = length b
q = length a
instance RandomAccess (Data Int -> Data a) where
type Element (Data Int -> Data a) = Data a
(!) = ($)
even n = n `rem` 2 == 0