{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE CPP #-} #include "fusion-phases.h" ----------------------------------------------------------------------------- -- | -- Module : Data.Array.Vector.Stream.Flat.Stream -- Copyright : (c) 2006 Roman Leshchinskiy -- License : see libraries/ndp/LICENSE -- -- Maintainer : Roman Leshchinskiy -- Stability : internal -- Portability : non-portable (existentials) -- -- Description --------------------------------------------------------------- -- -- Basic types for stream-based fusion -- module Data.Array.Vector.Stream where import Debug.Trace import Data.Array.Vector.Prim.Hyperstrict data Step s a = Done | Skip !s | Yield !a !s instance Functor (Step s) where fmap f Done = Done fmap f (Skip s) = Skip s fmap f (Yield x s) = Yield (f x) s data Stream a = forall s. Stream (s -> Step s a) !s Int newtype Box a = Box a -- is this still even needed for SpecConstr? ------------------------------------------------------------------------ -- | Empty stream -- emptyS :: Stream a emptyS = Stream (const Done) () 0 -- null nullS :: Stream a -> Bool nullS (Stream next s0 _) = loop_null s0 where loop_null s = case next s of Done -> True Yield _ _ -> False Skip s' -> s' `seq` loop_null s' {-# INLINE_STREAM nullS #-} -- | Singleton stream -- singletonS :: a -> Stream a {-# INLINE_STREAM singletonS #-} singletonS x = Stream next True 1 where {-# INLINE next #-} next True = Yield x False next False = Done -- | Construction -- consS :: a -> Stream a -> Stream a {-# INLINE_STREAM consS #-} consS x (Stream next s n) = Stream next' (JustS (Box x) :*: s) (n+1) where {-# INLINE next' #-} next' (JustS (Box x) :*: s) = Yield x (NothingS :*: s) next' (NothingS :*: s) = case next s of Yield y s' -> Yield y (NothingS :*: s') Skip s' -> Skip (NothingS :*: s') Done -> Done snocS :: Stream a -> a -> Stream a {-# INLINE_STREAM snocS #-} snocS (Stream next s n) x = Stream next' (JustS s) (n+1) where {-# INLINE next' #-} next' (JustS s) = case next s of Yield y s' -> Yield y (JustS s') Skip s' -> Skip (JustS s') Done -> Yield x NothingS next' NothingS = Done -- | Replication -- replicateS :: Int -> a -> Stream a {-# INLINE_STREAM replicateS #-} replicateS n x = Stream next 0 n where {-# INLINE next #-} next i | i == n = Done | otherwise = Yield x (i+1) -- | Given a stream of (length,value) pairs and the sum of the lengths, -- replicate each value to the given length. -- -- FIXME: This should probably produce a segmented stream but since we want to -- get rid of them anyway... -- replicateEachS :: Int -> Stream (Int :*: a) -> Stream a {-# INLINE_STREAM replicateEachS #-} replicateEachS n (Stream next s _) = Stream next' (0 :*: NothingS :*: s) n where {-# INLINE next' #-} next' (0 :*: _ :*: s) = case next s of Done -> Done Skip s' -> Skip (0 :*: NothingS :*: s') Yield (k :*: x) s' -> Skip (k :*: JustS (Box x) :*: s') next' (k :*: NothingS :*: s) = Done -- FIXME: unreachable next' (k :*: JustS (Box x) :*: s) = Yield x (k-1 :*: JustS (Box x) :*: s) {- -- -- repeat a stream a given number of times -- Duplicates work. -- repeatS :: Int -> Stream e -> Stream e {-# INLINE_STREAM repeatS #-} repeatS k (Stream next0 s0 n) = Stream next (k :*: s0) (max 0 (n*k)) where {-# INLINE next #-} next (0 :*: _) = Done next (k :*: s) = case next0 s of Done -> Skip (k-1 :*: s0) -- reset iteration state Skip s' -> Skip (k :*: s') Yield y s' -> Yield y (k :*: s') -} -- | Concatenation -- (+++) :: Stream a -> Stream a -> Stream a {-# INLINE_STREAM (+++) #-} Stream next1 s1 n1 +++ Stream next2 s2 n2 = Stream next (LeftS s1) (n1 + n2) where {-# INLINE next #-} next (LeftS s1) = case next1 s1 of Done -> Skip (RightS s2) Skip s1' -> Skip (LeftS s1') Yield x s1' -> Yield x (LeftS s1') next (RightS s2) = case next2 s2 of Done -> Done Skip s2' -> Skip (RightS s2') Yield x s2' -> Yield x (RightS s2') -- | Indexing -- ---------- indexS :: Stream a -> Int -> a {-# INLINE_STREAM indexS #-} indexS (Stream next s0 _) n0 | n0 < 0 = error "Data.Array.Vector.Stream.indexS: negative index" | otherwise = loop_index n0 s0 where loop_index n s = case next s of Yield x s' | n == 0 -> x | otherwise -> s' `seq` loop_index (n-1) s' Skip s' -> s' `seq` loop_index n s' Done -> error "Data.Array.Vector.Stream.indexS: index too large" -- | Indexing -- ---------- -- | Associate each element in the 'Stream' with its index -- indexedS :: Stream a -> Stream (Int :*: a) {-# INLINE_STREAM indexedS #-} indexedS (Stream next s n) = Stream next' (0 :*: s) n where {-# INLINE next' #-} next' (i :*: s) = case next s of Yield x s' -> Yield (i :*: x) ((i+1) :*: s') Skip s' -> Skip (i :*: s') Done -> Done -- | Substreams -- ------------ headS :: Stream a -> a {-# INLINE_STREAM headS #-} headS (Stream next s0 _) = loop_head s0 where loop_head s = case next s of Yield x _ -> x Skip s' -> s' `seq` loop_head s' Done -> errorEmptyStream "head" -- | Yield the tail of a stream -- tailS :: Stream a -> Stream a {-# INLINE_STREAM tailS #-} tailS (Stream next s n) = Stream next' (False :*: s) (n-1) where {-# INLINE next' #-} next' (False :*: s) = case next s of Yield x s' -> Skip (True :*: s') Skip s' -> Skip (False :*: s') Done -> error "Stream.tailS: empty stream" next' (True :*: s) = case next s of Yield x s' -> Yield x (True :*: s') Skip s' -> Skip (True :*: s') Done -> Done -- | Conversion to\/from lists -- -------------------------- -- | Convert a list to a 'Stream' -- toStream :: [a] -> Stream a {-# INLINE_STREAM toStream #-} toStream xs = Stream gen (Box xs) (length xs) where {-# INLINE gen #-} gen (Box []) = Done gen (Box (x:xs)) = Yield x (Box xs) -- | Generate a list from a 'Stream' -- fromStream :: Stream a -> [a] {-# INLINE_STREAM fromStream #-} fromStream (Stream next s _) = gen s where gen s = case next s of Done -> [] Skip s' -> gen s' Yield x s' -> x : gen s' ------------------------------------------------------------------------ -- XXX Box is left behind. Spec constr fail? Looks like consS though initS :: Stream a -> Stream a {-# INLINE_STREAM initS #-} initS (Stream next0 s0 n) = Stream next' (NothingS :*: s0) (n-1) where {-# INLINE next' #-} next' (NothingS :*: s) = case next0 s of Yield x s' -> Skip (JustS (Box x) :*: s') Skip s' -> Skip (NothingS :*: s') Done -> errorEmptyStream "init" next' (JustS (Box x) :*: s) = case next0 s of Yield x' s' -> Yield x (JustS (Box x') :*: s') Skip s' -> Skip (JustS (Box x) :*: s') Done -> Done -- * Substreams -- ** Extracting substreams takeS :: Int -> Stream a -> Stream a {-# INLINE_STREAM takeS #-} takeS n0 (Stream next0 s0 _) = Stream next' (n0 :*: s0) (max 0 n0) where {-# INLINE next' #-} next' (n :*: s) | n <= 0 = Done | otherwise = case next0 s of Yield x s' -> Yield x ((n-1) :*: s') Skip s' -> Skip ( n :*: s') Done -> Done dropS :: Int -> Stream a -> Stream a {-# INLINE_STREAM dropS #-} dropS n0 (Stream next0 s0 n) = Stream next' (JustS (max 0 n0) :*: s0) (max 0 (n - n0)) where {-# INLINE next' #-} next' (JustS n :*: s) | n == 0 = Skip (NothingS :*: s) | otherwise = case next0 s of Yield _ s' -> Skip (JustS (n-1) :*: s') Skip s' -> Skip (JustS n :*: s') Done -> Done next' (NothingS :*: s) = case next0 s of Yield x s' -> Yield x (NothingS :*: s') Skip s' -> Skip (NothingS :*: s') Done -> Done elemS :: Eq a => a -> Stream a -> Bool {-# INLINE_STREAM elemS #-} elemS x (Stream next s0 _) = loop_elem s0 where loop_elem s = case next s of Yield y s' | x == y -> True | otherwise -> s' `seq` loop_elem s' Skip s' -> s' `seq` loop_elem s' Done -> False lookupS :: Eq a => a -> Stream (a :*: b) -> Maybe b {-# INLINE_STREAM lookupS #-} lookupS key (Stream next s0 _) = loop_lookup s0 where loop_lookup s = case next s of Yield (x :*: y) s' | key == x -> Just y | otherwise -> s' `seq` loop_lookup s' Skip s' -> s' `seq` loop_lookup s' Done -> Nothing ------------------------------------------------------------------------ -- | Mapping -- mapS :: (a -> b) -> Stream a -> Stream b {-# INLINE_STREAM mapS #-} mapS f (Stream next s n) = Stream next' s n where {-# INLINE next' #-} next' s = case next s of Done -> Done Skip s' -> Skip s' Yield x s' -> Yield (f x) s' -- | Filtering -- filterS :: (a -> Bool) -> Stream a -> Stream a {-# INLINE_STREAM filterS #-} filterS f (Stream next s n) = Stream next' s n where {-# INLINE next' #-} next' s = case next s of Done -> Done Skip s' -> Skip s' Yield x s' | f x -> Yield x s' | otherwise -> Skip s' -- | Folding -- foldS :: (b -> a -> b) -> b -> Stream a -> b {-# INLINE_STREAM foldS #-} foldS f z (Stream next s _) = fold z s where fold z s = z `seq` case next s of -- needs to be strict! Yield x s' -> s' `seq` fold (f z x) s' Skip s' -> s' `seq` fold z s' Done -> z foldl1S :: (a -> a -> a) -> Stream a -> a {-# INLINE_STREAM foldl1S #-} foldl1S f (Stream next s0 _) = loop0_foldl1' s0 where loop0_foldl1' s = case next s of Yield x s' -> s' `seq` loop_foldl1' x s' Skip s' -> s' `seq` loop0_foldl1' s' Done -> errorEmptyStream "foldl1" loop_foldl1' z s = z `seq` case next s of Yield x s' -> s' `seq` loop_foldl1' (f z x) s' Skip s' -> s' `seq` loop_foldl1' z s' Done -> z fold1MaybeS :: (a -> a -> a) -> Stream a -> MaybeS a {-# INLINE_STREAM fold1MaybeS #-} fold1MaybeS f (Stream next s _) = fold0 s where fold0 s = case next s of Done -> NothingS Skip s' -> s' `seq` fold0 s' Yield x s' -> s' `seq` fold1 x s' fold1 z s = z `seq` case next s of Done -> JustS z Skip s' -> s' `seq` fold1 z s' Yield x s' -> s' `seq` fold1 (f z x) s' -- | Scanning -- scanS :: (b -> a -> b) -> b -> Stream a -> Stream b {-# INLINE_STREAM scanS #-} scanS f z (Stream next s n) = Stream next' (Box z :*: s) n where {-# INLINE next' #-} next' (Box z :*: s) = case next s of Done -> Done Skip s' -> Skip (Box z :*: s') Yield x s' -> Yield z (Box (f z x) :*: s') scan1S :: (a -> a -> a) -> Stream a -> Stream a {-# INLINE_STREAM scan1S #-} scan1S f (Stream next s n) = Stream next' (NothingS :*: s) n where {-# INLINE next' #-} next' (NothingS :*: s) = case next s of Yield x s' -> Yield x (JustS (Box x) :*: s') Skip s' -> Skip (NothingS :*: s') Done -> Done next' (JustS (Box z) :*: s) = case next s of Yield x s' -> let y = f z x in Yield y (JustS (Box y) :*: s') Skip s' -> Skip (JustS (Box z) :*: s) Done -> Done mapAccumS :: (acc -> a -> acc :*: b) -> acc -> Stream a -> Stream b {-# INLINE_STREAM mapAccumS #-} mapAccumS f acc (Stream step s n) = Stream step' (s :*: Box acc) n where step' (s :*: Box acc) = case step s of Done -> Done Skip s' -> Skip (s' :*: Box acc) Yield x s' -> let acc' :*: y = f acc x in Yield y (s' :*: Box acc') combineS:: Stream Bool -> Stream a -> Stream a -> Stream a {-# INLINE_STREAM combineS #-} combineS (Stream next1 s m) (Stream nextS1 t1 n1) (Stream nextS2 t2 n2) = Stream next (s :*: t1 :*: t2) m where {-# INLINE next #-} next (s :*: t1 :*: t2) = case next1 s of Done -> Done Skip s' -> Skip (s' :*: t1 :*: t2 ) Yield c s' -> if trace ("\n\t\tstream: " ++ (show c) ++ "\n") c then case nextS1 t1 of Done -> error "combineS: stream 1 terminated unexpectedly" Skip t1' -> Skip (s :*: t1' :*: t2) Yield x t1' -> Yield x (s' :*: t1' :*: t2) else case nextS2 t2 of Done -> error "combineS: stream 2 terminated unexpectedly" Skip t2' -> Skip (s :*: t1 :*: t2') Yield x t2' -> Yield x (s' :*: t1 :*: t2') -- | Zipping -- -- FIXME: The definition below duplicates work if the second stream produces -- Skips. Unfortunately, GHC tends to introduce join points which break -- SpecConstr with the correct definition. -- zipWithS :: (a -> b -> c) -> Stream a -> Stream b -> Stream c {-# INLINE_STREAM zipWithS #-} zipWithS f (Stream next1 s m) (Stream next2 t n) = Stream next (s :*: t) m where {-# INLINE next #-} next (s :*: t) = case next1 s of Done -> Done Skip s' -> Skip (s' :*: t) Yield x s' -> case next2 t of Done -> Done Skip t' -> Skip (s :*: t') Yield y t' -> Yield (f x y) (s' :*: t') {- Stream next (NothingS :*: s :*: t) m where {-# INLINE next #-} next (NothingS :*: s :*: t) = t `seq` case next1 s of Done -> Done Skip s' -> Skip (NothingS :*: s' :*: t) Yield x s' -> -- Skip (JustS x :*: s' :*: t) case next2 t of Done -> Done Skip t' -> Skip (JustS (Box x) :*: s' :*: t') Yield y t' -> Yield (f x y) (NothingS :*: s' :*: t') next (JustS (Box x) :*: s :*: t) = s `seq` case next2 t of Done -> Done Skip t' -> Skip (JustS (Box x) :*: s :*: t') Yield y t' -> Yield (f x y) (NothingS :*: s :*: t') -} zipWith3S :: (a -> b -> c -> d) -> Stream a -> Stream b -> Stream c -> Stream d {-# INLINE_STREAM zipWith3S #-} zipWith3S f (Stream next1 s1 n) (Stream next2 s2 _) (Stream next3 s3 _) = Stream next (s1 :*: s2 :*: s3) n where {-# INLINE next #-} next (s1 :*: s2 :*: s3) = case next1 s1 of Done -> Done Skip s1' -> Skip (s1' :*: s2 :*: s3) Yield x s1' -> case next2 s2 of Done -> Done Skip s2' -> Skip (s1 :*: s2' :*: s3) Yield y s2' -> case next3 s3 of Done -> Done Skip s3' -> Skip (s1 :*: s2 :*: s3') Yield z s3' -> Yield (f x y z) (s1' :*: s2' :*: s3') zipS :: Stream a -> Stream b -> Stream (a :*: b) {-# INLINE zipS #-} zipS = zipWithS (:*:) ------------------------------------------------------------------------ -- | Yield an enumerated stream -- -- FIXME: Can this be implemented polymorphically? We could just use -- enumFromThenTo here, but this won't really work for parallel arrays. -- Perhaps we have to introduce an EnumP class? -- enumFromToFracS :: (Ord a, RealFrac a) => a -> a -> Stream a {-# INLINE_STREAM enumFromToFracS #-} enumFromToFracS n m = Stream next n (truncate (m - n)) where lim = m + 1/2 -- important to float this out. {-# INLINE next #-} next s | s > lim = Done -- from GHC.Real.numericEnumFromTo | otherwise = Yield s (s+1) enumFromToS :: (Integral a, Ord a) => a -> a -> Stream a {-# INLINE_STREAM enumFromToS #-} enumFromToS start end = Stream step start (max 0 (fromIntegral (end - start + 1))) where {-# INLINE step #-} step s | s > end = Done | otherwise = Yield s (s+1) -- | Yield an enumerated stream using a specific step -- -- enumFromThenToS :: Int -> Int -> Int -> Stream Int {-# INLINE enumFromThenToS #-} enumFromThenToS start next end = enumFromStepLenS start delta len where delta = next - start diff = end - start len | start < next && start <= end = ((end-start) `div` delta) + 1 | start > next && start >= end = ((start-end) `div` (start-next)) + 1 | otherwise = 0 enumFromStepLenS :: Int -> Int -> Int -> Stream Int {-# INLINE_STREAM enumFromStepLenS #-} enumFromStepLenS s d n = Stream step (s :*: n) n where step (s :*: 0) = Done step (s :*: n) = Yield s ((s+d) :*: (n-1)) -- enumFromToEachS [k1 :*: m1, ..., kn :*: mn] = [k1,...,m1,...,kn,...,mn] -- -- FIXME: monomorphic for now because we need Rebox a otherwise! -- enumFromToEachS :: Int -> Stream (Int :*: Int) -> Stream Int {-# INLINE_STREAM enumFromToEachS #-} enumFromToEachS n (Stream next s _) = Stream next' (NothingS :*: s) n where {-# INLINE next' #-} next' (NothingS :*: s) = case next s of Yield (k :*: m) s' -> Skip (JustS (k :*: m) :*: s') Skip s' -> Skip (NothingS :*: s') Done -> Done next' (JustS (k :*: m) :*: s) | k > m = Skip (NothingS :*: s) | otherwise = Yield k (JustS (k+1 :*: m) :*: s) ------------------------------------------------------------------------ findS :: (a -> Bool) -> Stream a -> Maybe a {-# INLINE_STREAM findS #-} findS p (Stream next s _) = go s where go s = case next s of Yield x s' | p x -> Just x | otherwise -> go s' Skip s' -> go s' Done -> Nothing findIndexS :: (a -> Bool) -> Stream a -> Maybe Int {-# INLINE_STREAM findIndexS #-} findIndexS p (Stream next s _) = go 0 s where go i s = case next s of Yield x s' | p x -> Just i | otherwise -> go (i+1) s' Skip s' -> go i s' Done -> Nothing ------------------------------------------------------------------------ takeWhileS :: (a -> Bool) -> Stream a -> Stream a {-# INLINE_STREAM takeWhileS #-} takeWhileS p (Stream next0 s0 n) = Stream next s0 n where {-# INLINE next #-} next s = case next0 s of Done -> Done Skip s' -> Skip s' Yield x s' | p x -> Yield x s' | otherwise -> Done dropWhileS :: (a -> Bool) -> Stream a -> Stream a {-# INLINE_STREAM dropWhileS #-} dropWhileS p (Stream next0 s0 n) = Stream next (True :*: s0) n where {-# INLINE next #-} next (True :*: s) = case next0 s of Done -> Done Skip s' -> Skip (True :*: s') Yield x s' | p x -> Skip (True :*: s') | otherwise -> Yield x (False :*: s') next (False :*: s) = case next0 s of Done -> Done Skip s' -> Skip (False :*: s') Yield x s' -> Yield x (False :*: s') ------------------------------------------------------------------------ unfoldS :: Int -> (b -> MaybeS (a :*: b)) -> b -> Stream a {-# INLINE_STREAM unfoldS #-} unfoldS n f s0 = Stream next (JustS (0 :*: s0)) n where {-# INLINE next #-} next (JustS (i :*: s)) = case f s of NothingS -> Done JustS (w :*: s') | n == i -> Yield w NothingS | otherwise -> Yield w (JustS (i+1 :*: s')) next _ = Done ------------------------------------------------------------------------ -- Common up near identical calls to `error' to reduce the number -- constant strings created when compiled: errorEmptyStream :: String -> a errorEmptyStream fun = moduleError fun "empty vector" {-# NOINLINE errorEmptyStream #-} moduleError :: String -> String -> a moduleError fun msg = error ("Data.Array.Vector.Stream." ++ fun ++ ':':' ':msg) {-# NOINLINE moduleError #-}