module Utils.Stream where
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
data Stream m a = Terminated | Value (m (a,Stream m a))
sideEffect :: (Monad m) => (a -> m ()) -> Stream m a -> Stream m a
sideEffect p Terminated = Terminated
sideEffect p (Value next) = Value renext
where
renext = do
(r,n) <- next
p r
return (r,sideEffect p n)
listToStream [] = Terminated
listToStream (l:lst) = Value (return (l,listToStream lst))
repeatS x = Value (return (x,repeatS x))
repeatSM x = sequenceS (repeatS x)
iterateS op n = Value cont
where
cont = do
r <- op n
return $ (n,iterateS op r)
foldS op i Terminated = return i
foldS op i (Value xs) = xs >>= \(x,xn) -> foldS op (op i x) xn
foldSM op i Terminated = return i
foldSM op i (Value xs) = xs >>= \(x,xn) -> op i x >>=\opix -> foldSM op opix xn
time (a,_) = a
value (_,a) = a
mergeTimeStreams starta startb a b = mergeE (starta,startb) (mergeS a b)
mergeTimeStreamsWith sa sb op a b = fmap (\(t,(a,b)) -> (t,(op a b))) $mergeTimeStreams sa sb a b
mergeManyW starts op streams = snd $ foldl1 (\(s,m) (s1,n) -> ((op s s1),mergeTimeStreamsWith s s1 op m n)) (zip starts streams)
mergeS Terminated _ = Terminated
mergeS _ Terminated = Terminated
mergeS (Value xs) (Value ys) = Value renext
where
renext = do
(x,xn) <- xs
(y,yn) <- ys
case compare (time x) (time y) of
LT -> return (L x,mergeS xn (push y yn))
EQ -> return (B (time x,(value x,value y)),mergeS xn yn)
GT -> return (R y,mergeS (push x xn) yn)
data LRB a b c = L a | B b |R cderiving (Show)
mergeE _ Terminated = Terminated
mergeE (l,r) (Value xs) = Value renext
where
renext = do
(x,xn) <- xs
case x of
L (t,a) -> return ((t,(a,r)),mergeE (a,r) xn)
B (t,(a,b)) -> return ((t,(a,b)),mergeE (a,b) xn)
R (t,b) -> return ((t,(l,b)),mergeE (l,b) xn)
push x Terminated = Value (return (x,Terminated))
push x xs = Value (return (x,xs))
instance (Monad m) => Functor (Stream m) where
fmap _ Terminated = Terminated
fmap f (Value next) = Value renext
where
renext = do
(r,n) <- next
return (f r,fmap f n)
instance (Monad m) => Applicative (Stream m) where
pure f = repeatS f
Terminated <*> _ = Terminated
_ <*> Terminated = Terminated
(Value a) <*> (Value b) = Value renext
where
renext = do
(fun,anext) <- a
(br,bnext) <- b
return (fun br,anext<*>bnext)
zipS a b = (,) <$> a <*> b
sequenceS :: (Monad m) => Stream m (m a) -> (Stream m a)
sequenceS Terminated = Terminated
sequenceS (Value next) = Value $ do
(op,n) <- next
r <- op
return (r,sequenceS n)
mapMS :: (Monad m) => (a -> m b) -> Stream m a -> Stream m b
mapMS op s = sequenceS . fmap op $ s
dropS :: (Monad m) => Int -> Stream m a -> Stream m a
dropS _ Terminated = Terminated
dropS n next = Value renext
where
drop 0 s = return s
drop _ Terminated = return Terminated
drop n (Value next) = do
(r,ne) <- next
drop (n1) ne
renext = do
r <- drop n next
case r of
Terminated -> error "Not enough elements to drop"
Value x -> x
takeS :: (Monad m) => Int -> Stream m a -> Stream m a
takeS _ Terminated = Terminated
takeS n (Value next) = Value renext
where
renext = do
(r,ne) <- next
if n<1 then return (r,Terminated)
else return (r,takeS (n1) ne)
takeWhileS _ Terminated = Terminated
takeWhileS c (Value next) = Value renext
where
renext = do
(r,ne) <- next
if not . c $ r then return (r,Terminated)
else return (r,takeWhileS c ne)
consS a Terminated = Value (return (a, Terminated))
consS a s = Value (return (a, s))
pairS :: (Monad m) => Stream m a -> Stream m (a,a)
pairS Terminated = Terminated
pairS (Value next) = Value renext
where
renext = do
(val1,nexts2) <- next
case nexts2 of
Terminated -> return (undefined,Terminated)
(Value next2) -> do (val2,next3) <- next2
return ((val1,val2),pairS (consS val2 next3))
terminateOn :: (Monad m) => (a -> Bool) -> Stream m a -> Stream m a
terminateOn cond Terminated = Terminated
terminateOn cond (Value next) = Value renext
where
renext = do
(r,n) <- next
if cond r then return (r,Terminated)
else return (r,terminateOn cond n)
runStream Terminated = return []
runStream (Value s) = do
(n,next) <- s
r<-runStream next
return (n:r)
runStream_ Terminated = return ()
runStream_ (Value s) = do
(n,next) <- s
runStream_ next
runLast l Terminated = return l
runLast l (Value s) = do
(n,next) <- s
runLast n next
runLast1 s = runLast (error "Empty Stream") s