module Simulation.Aivika.Stream
       (
        Stream(..),
        
        emptyStream,
        mergeStreams,
        mergeQueuedStreams,
        mergePriorityStreams,
        concatStreams,
        concatQueuedStreams,
        concatPriorityStreams,
        splitStream,
        splitStreamQueuing,
        splitStreamPrioritising,
        
        streamUsingId,
        
        prefetchStream,
        
        memoStream,
        zipStreamSeq,
        zipStreamParallel,
        zip3StreamSeq,
        zip3StreamParallel,
        unzipStream,
        streamSeq,
        streamParallel,
        
        consumeStream,
        sinkStream,
        
        repeatProcess,
        mapStream,
        mapStreamM,
        apStreamDataFirst,
        apStreamDataLater,
        apStreamParallel,
        filterStream,
        filterStreamM,
        
        leftStream,
        rightStream,
        replaceLeftStream,
        replaceRightStream,
        partitionEitherStream) where
import Data.IORef
import Data.Maybe
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Simulation
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Resource
import Simulation.Aivika.QueueStrategy
-- | A stream of values of type @a@.  A stream is a 'Process' that, when run,
-- yields the next value together with the remainder of the stream.  There is
-- no constructor for an exhausted stream.
newtype Stream a = Cons
  { runStream :: Process (a, Stream a)
    -- ^ The process that yields the head of the stream and its tail.
  }
-- | 'fmap' applies a pure function to every value yielded by the stream.
instance Functor Stream where
  fmap f (Cons source) = Cons (source >>= transform)
    where
      -- The pair is matched lazily, so it is not forced before the
      -- transformed head or tail is actually demanded.
      transform ~(x, rest) = return (f x, fmap f rest)
-- | Streams form a monoid whose operations are the stream merging functions
-- of this module.
--
-- NOTE(review): newer versions of @base@ require a 'Semigroup' instance as a
-- superclass of 'Monoid'; none is visible in this part of the file -- confirm
-- against the supported compiler range.
instance Monoid (Stream a) where
  -- The identity element is 'emptyStream'.
  mempty  = emptyStream
  -- Two streams are combined with 'mergeStreams'.
  mappend = mergeStreams
  -- A list of streams is combined in one step with 'concatStreams'.
  mconcat = concatStreams
-- | Wrap the process that yields the head of the stream (and, with it, the
-- tail) in 'processUsingId' with the given process identifier.  The tail
-- stream itself is returned unchanged.
streamUsingId :: ProcessId -> Stream a -> Stream a
streamUsingId pid = Cons . processUsingId pid . runStream
-- | Memoize the stream: the head process is passed through 'memoProcess',
-- and the tail it yields is memoized recursively in the same way.
memoStream :: Stream a -> Simulation (Stream a)
memoStream (Cons source) = liftM Cons (memoProcess memoized)
  where
    -- Yield the original head paired with a memoized version of the tail.
    memoized =
      do ~(x, rest) <- source
         rest' <- liftSimulation (memoStream rest)
         return (x, rest')
-- | Zip two streams, requesting the next value from the first stream and
-- only then from the second one.
zipStreamSeq :: Stream a -> Stream b -> Stream (a, b)
zipStreamSeq (Cons sa) (Cons sb) = Cons $
  sa >>= \ ~(a, restA) ->
  sb >>= \ ~(b, restB) ->
  return ((a, b), zipStreamSeq restA restB)
-- | Zip two streams, obtaining the next value of each one through
-- 'zipProcessParallel'.
zipStreamParallel :: Stream a -> Stream b -> Stream (a, b)
zipStreamParallel (Cons sa) (Cons sb) = Cons (zipProcessParallel sa sb >>= pair)
  where
    -- The whole result is matched lazily.
    pair ~((a, restA), (b, restB)) =
      return ((a, b), zipStreamParallel restA restB)
-- | Zip three streams, requesting the next value from the first, then the
-- second, then the third stream.
zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq (Cons sa) (Cons sb) (Cons sc) = Cons $
  sa >>= \ ~(a, restA) ->
  sb >>= \ ~(b, restB) ->
  sc >>= \ ~(c, restC) ->
  return ((a, b, c), zip3StreamSeq restA restB restC)
-- | Zip three streams, obtaining the next value of each one through
-- 'zip3ProcessParallel'.
zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel (Cons sa) (Cons sb) (Cons sc) =
  Cons (zip3ProcessParallel sa sb sc >>= combine)
  where
    -- The whole result is matched lazily.
    combine ~((a, restA), (b, restB), (c, restC)) =
      return ((a, b, c), zip3StreamParallel restA restB restC)
-- | Split a stream of pairs into a stream of first components and a stream
-- of second components.  The input is memoized with 'memoStream' first and
-- both results are derived from that memoized stream.
unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream s = liftM project (memoStream s)
  where
    project shared = (mapStream fst shared, mapStream snd shared)
-- | Turn a list of streams into a stream of lists, requesting the next value
-- of each input stream one after another in list order.
streamSeq :: [Stream a] -> Stream [a]
streamSeq xs = Cons $
  mapM runStream xs >>= \steps ->
  return (map fst steps, streamSeq (map snd steps))
-- | Turn a list of streams into a stream of lists, obtaining the next value
-- of every input stream through 'processParallel'.
streamParallel :: [Stream a] -> Stream [a]
streamParallel xs = Cons $
  processParallel (map runStream xs) >>= \steps ->
  return (map fst steps, streamParallel (map snd steps))
-- | Build a stream in which every value is produced by running the given
-- process again.
repeatProcess :: Process a -> Stream a
repeatProcess p = Cons $
  p >>= \a -> return (a, repeatProcess p)
-- | Apply a pure function to every value of the stream; this is 'fmap'
-- specialised to 'Stream'.
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f = fmap f
-- | Transform every value of the stream with a function whose result is
-- computed within the 'Process' computation.
mapStreamM :: (a -> Process b) -> Stream a -> Stream b
mapStreamM f (Cons s) = Cons $
  s >>= \(a, rest) ->
  f a >>= \b ->
  return (b, mapStreamM f rest)
-- | Apply a function obtained from a process to each stream value.  For
-- every element, the value is requested from the stream first and the
-- function-producing process is run afterwards.
apStreamDataFirst :: Process (a -> b) -> Stream a -> Stream b
apStreamDataFirst f (Cons s) = Cons $
  s >>= \ ~(a, rest) ->
  f >>= \g ->
  return (g a, apStreamDataFirst f rest)
-- | Apply a function obtained from a process to each stream value.  For
-- every element, the function-producing process is run first and the value
-- is requested from the stream afterwards.
apStreamDataLater :: Process (a -> b) -> Stream a -> Stream b
apStreamDataLater f (Cons s) = Cons $
  f >>= \g ->
  s >>= \ ~(a, rest) ->
  return (g a, apStreamDataLater f rest)
-- | Apply a function obtained from a process to each stream value, where
-- the function and the value are obtained together through
-- 'zipProcessParallel'.
apStreamParallel :: Process (a -> b) -> Stream a -> Stream b
apStreamParallel f (Cons s) = Cons (zipProcessParallel f s >>= apply)
  where
    -- The whole result is matched lazily.
    apply ~(g, (a, rest)) = return (g a, apStreamParallel f rest)
-- | Keep only the values that satisfy the predicate.  A rejected value is
-- dropped and the next value of the filtered tail is yielded instead.
filterStream :: (a -> Bool) -> Stream a -> Stream a
filterStream p (Cons s) = Cons (s >>= select)
  where
    select (a, rest)
      | p a       = return (a, kept)
      | otherwise = runStream kept
      where
        kept = filterStream p rest
-- | Keep only the values for which the predicate, computed within the
-- 'Process' computation, returns 'True'.  A rejected value is dropped and
-- the next value of the filtered tail is yielded instead.
filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a
filterStreamM p (Cons s) = Cons $
  do (a, rest) <- s
     let kept = filterStreamM p rest
     accepted <- p a
     if accepted then return (a, kept) else runStream kept
-- | Yield the contents of the 'Left' values of the stream, skipping every
-- 'Right' value.
leftStream :: Stream (Either a b) -> Stream a
leftStream (Cons s) = Cons (s >>= pick)
  where
    pick (e, rest) = either keep (const skip) e
      where
        keep a = return (a, leftStream rest)
        skip   = runStream (leftStream rest)
-- | Yield the contents of the 'Right' values of the stream, skipping every
-- 'Left' value.
rightStream :: Stream (Either a b) -> Stream b
rightStream (Cons s) = Cons (s >>= pick)
  where
    pick (e, rest) = either (const skip) keep e
      where
        keep b = return (b, rightStream rest)
        skip   = runStream (rightStream rest)
-- | Replace every 'Left' value of the first stream with the next value taken
-- from the second stream; 'Right' values pass through unchanged.  The second
-- stream is advanced only when a 'Left' value is encountered.
replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b)
-- The as-pattern is written without whitespace around @\@@, which newer GHC
-- versions require.  No lazy-pattern marker is needed on @Cons sc@: 'Stream'
-- is a newtype, so matching on its constructor never forces the argument.
replaceLeftStream (Cons sab) ys0@(Cons sc) = Cons z where
  z = do (e, xs) <- sab
         case e of
           Left _ ->
             -- Substitute the next value of the replacement stream.
             do (c, ys) <- sc
                return (Left c, replaceLeftStream xs ys)
           Right b ->
             -- Keep the value; the replacement stream is not consumed.
             return (Right b, replaceLeftStream xs ys0)
-- | Replace every 'Right' value of the first stream with the next value
-- taken from the second stream; 'Left' values pass through unchanged.  The
-- second stream is advanced only when a 'Right' value is encountered.
replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c)
-- The as-pattern is written without whitespace around @\@@, which newer GHC
-- versions require.  No lazy-pattern marker is needed on @Cons sc@: 'Stream'
-- is a newtype, so matching on its constructor never forces the argument.
replaceRightStream (Cons sab) ys0@(Cons sc) = Cons z where
  z = do (e, xs) <- sab
         case e of
           Right _ ->
             -- Substitute the next value of the replacement stream.
             do (c, ys) <- sc
                return (Right c, replaceRightStream xs ys)
           Left a ->
             -- Keep the value; the replacement stream is not consumed.
             return (Left a, replaceRightStream xs ys0)
-- | Partition a stream of 'Either' values into the stream of 'Left' contents
-- and the stream of 'Right' contents.  The input is memoized with
-- 'memoStream' first and both results are derived from that memoized stream.
partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream s = liftM partition (memoStream s)
  where
    partition shared = (leftStream shared, rightStream shared)
-- | Split the input stream into the given number of output streams, using
-- the 'FCFS' strategy; see 'splitStreamQueuing'.
splitStream :: Int -> Stream a -> Simulation [Stream a]
splitStream n input = splitStreamQueuing FCFS n input
-- | Split the input stream into the given number of output streams.  All
-- outputs draw from the same underlying stream: the current position is kept
-- in a shared reference, and each read runs inside 'usingResource' on a
-- resource created with the given strategy.
splitStreamQueuing :: EnqueueStrategy s q
                      => s
                      -- ^ the strategy passed to 'newResource'
                      -> Int
                      -- ^ the number of output streams
                      -> Stream a
                      -- ^ the input stream
                      -> Simulation [Stream a]
                      -- ^ the output streams
splitStreamQueuing s n x =
  do cursor <- liftIO (newIORef x)
     lock   <- newResource s 1
     let -- Take the next value of the shared input and advance the cursor.
         takeNext =
           usingResource lock $
           liftIO (readIORef cursor) >>= runStream >>= \(a, rest) ->
           liftIO (writeIORef cursor rest) >> return a
     return (replicate n (repeatProcess takeNext))
-- | Split the input stream into one output stream per priority stream.  All
-- outputs draw from the same underlying stream.  Before each read an output
-- takes the next value of its own priority stream and passes it to
-- 'usingResourceWithPriority' on a resource created with the given strategy.
splitStreamPrioritising :: PriorityQueueStrategy s q p
                           => s
                           -- ^ the strategy passed to 'newResource'
                           -> [Stream p]
                           -- ^ the streams of priorities, one per output
                           -> Stream a
                           -- ^ the input stream
                           -> Simulation [Stream a]
                           -- ^ the output streams
splitStreamPrioritising s ps x =
  do cursor <- liftIO (newIORef x)
     lock   <- newResource s 1
     let -- Take the next value of the shared input and advance the cursor.
         takeNext =
           do current <- liftIO (readIORef cursor)
              (a, rest) <- runStream current
              liftIO (writeIORef cursor rest)
              return a
         -- Build one output stream driven by its stream of priorities.
         output (Cons priorities) = Cons $
           do (priority, priorities') <- priorities
              a <- usingResourceWithPriority lock priority takeNext
              return (a, output priorities')
     return (map output ps)
-- | Combine a list of streams into one stream, using the 'FCFS' strategy;
-- see 'concatQueuedStreams'.
concatStreams :: [Stream a] -> Stream a
concatStreams streams = concatQueuedStreams FCFS streams
-- | Combine a list of input streams into a single output stream.  One writer
-- process is spawned per input stream; the writers hand their values over to
-- the consumer one at a time through a shared single-slot reference, and the
-- hand-off is coordinated with three resources.
concatQueuedStreams :: EnqueueStrategy s q
                       => s
                       -- ^ the strategy of the resource the writers request before storing a value
                       -> [Stream a]
                       -- ^ the input streams
                       -> Stream a
                       -- ^ the combined output stream
concatQueuedStreams s streams = Cons z where
  -- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
  -- presumably the initial count and the optional maximum count -- confirm
  -- against its definition.
  --
  -- reading: requested by the reader before taking a value from the slot,
  --          released by a writer after storing one.
  -- writing: requested by a writer before storing a value, released by the
  --          reader after emptying the slot; it uses the given strategy.
  -- conting: requested by a writer after handing a value over, released by
  --          the output stream before each read other than the first.
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         -- The slot through which a single value is handed over.
         ref <- liftIO $ newIORef Nothing
         let -- Loop over one input stream: obtain its next value, store it in
             -- the slot, and request 'conting' before moving on to the tail.
             writer p =
               do (a, xs) <- runStream p
                  requestResource writing
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  requestResource conting
                  writer xs
             -- Take one value out of the slot.  The 'Just' pattern relies on
             -- a writer having stored a value before releasing 'reading'.
             reader =
               do requestResource reading
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         -- Start one writer per input stream, spawned with 'CancelTogether'.
         forM_ streams $ spawnProcess CancelTogether . writer
         -- The first value is read directly; each later value is read only
         -- after 'conting' has been released.
         a <- reader
         let xs = repeatProcess (releaseResource conting >> reader)
         return (a, xs)
-- | Combine a list of input streams of prioritised values into a single
-- output stream.  One writer process is spawned per input stream; each value
-- arrives paired with a priority, which the writer passes to
-- 'requestResourceWithPriority' before storing the value in a shared
-- single-slot reference.  The output stream yields the values without their
-- priorities.
concatPriorityStreams :: PriorityQueueStrategy s q p
                         => s
                         -- ^ the strategy of the resource the writers request before storing a value
                         -> [Stream (p, a)]
                         -- ^ the input streams of (priority, value) pairs
                         -> Stream a
                         -- ^ the combined output stream
concatPriorityStreams s streams = Cons z where
  -- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
  -- presumably the initial count and the optional maximum count -- confirm
  -- against its definition.
  --
  -- reading: requested by the reader before taking a value from the slot,
  --          released by a writer after storing one.
  -- writing: requested with a priority by a writer before storing a value,
  --          released by the reader after emptying the slot.
  -- conting: requested by a writer after handing a value over, released by
  --          the output stream before each read other than the first.
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         -- The slot through which a single value is handed over.
         ref <- liftIO $ newIORef Nothing
         let -- Loop over one input stream: obtain its next prioritised value,
             -- store the value in the slot, and request 'conting' before
             -- moving on to the tail.
             writer p =
               do ((priority, a), xs) <- runStream p
                  requestResourceWithPriority writing priority
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  requestResource conting
                  writer xs
             -- Take one value out of the slot.  The 'Just' pattern relies on
             -- a writer having stored a value before releasing 'reading'.
             reader =
               do requestResource reading
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         -- Start one writer per input stream, spawned with 'CancelTogether'.
         forM_ streams $ spawnProcess CancelTogether . writer
         -- The first value is read directly; each later value is read only
         -- after 'conting' has been released.
         a <- reader
         let xs = repeatProcess (releaseResource conting >> reader)
         return (a, xs)
-- | Merge two streams into one, using the 'FCFS' strategy; see
-- 'mergeQueuedStreams'.
mergeStreams :: Stream a -> Stream a -> Stream a
mergeStreams left right = mergeQueuedStreams FCFS left right
-- | Merge two streams into one by passing them, as a two-element list, to
-- 'concatQueuedStreams' with the given strategy.
mergeQueuedStreams :: EnqueueStrategy s q
                      => s
                      -- ^ the strategy passed on to 'concatQueuedStreams'
                      -> Stream a
                      -- ^ the first input stream
                      -> Stream a
                      -- ^ the second input stream
                      -> Stream a
                      -- ^ the merged output stream
mergeQueuedStreams strategy left right =
  concatQueuedStreams strategy [left, right]
-- | Merge two streams of prioritised values into one by passing them, as a
-- two-element list, to 'concatPriorityStreams' with the given strategy.
mergePriorityStreams :: PriorityQueueStrategy s q p
                        => s
                        -- ^ the strategy passed on to 'concatPriorityStreams'
                        -> Stream (p, a)
                        -- ^ the first input stream of (priority, value) pairs
                        -> Stream (p, a)
                        -- ^ the second input stream of (priority, value) pairs
                        -> Stream a
                        -- ^ the merged output stream
mergePriorityStreams strategy left right =
  concatPriorityStreams strategy [left, right]
-- | A stream that is not expected to yield any value: its process runs
-- 'passivateProcess' under a freshly created process identifier.  Should
-- that process ever continue, an error is raised.
emptyStream :: Stream a
emptyStream = Cons $
  liftSimulation newProcessId >>= \pid ->
  processUsingId pid passivateProcess >>
  error "It should never happen: emptyStream."
-- | Consume the stream: run the given action on every value, one after
-- another, without ever returning by itself.
consumeStream :: (a -> Process ()) -> Stream a -> Process ()
consumeStream f = loop
  where
    loop (Cons s) = s >>= \(a, rest) -> f a >> loop rest
-- | Request the values of the stream one after another and discard them,
-- without ever returning by itself.
sinkStream :: Stream a -> Process ()
sinkStream = drain
  where
    drain (Cons s) = s >>= \(_, rest) -> drain rest
  
-- | Prefetch the input stream: a spawned writer process requests values from
-- the input and hands them over to the consumer one at a time through a
-- shared single-slot reference.  Because the writer requests the next input
-- value before waiting for the slot, it can already hold one value while the
-- previous one is still waiting to be read.
prefetchStream :: Stream a -> Stream a
prefetchStream s = Cons z where
  -- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
  -- presumably the initial count and the optional maximum count -- confirm
  -- against its definition.
  --
  -- reading: requested by the reader before taking a value from the slot,
  --          released by the writer after storing one.
  -- writing: requested by the writer before storing a value, released by
  --          the reader after emptying the slot.
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount FCFS 1 (Just 1)
         -- The slot through which a single value is handed over.
         ref <- liftIO $ newIORef Nothing
         let -- Loop over the input: obtain the next value first, then wait
             -- for the slot and store the value in it.
             writer p =
               do (a, xs) <- runStream p
                  requestResource writing
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  writer xs
             -- Take one value out of the slot.  The 'Just' pattern relies on
             -- the writer having stored a value before releasing 'reading'.
             reader =
               do requestResource reading
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         -- Start the writer, spawned with 'CancelTogether'.
         spawnProcess CancelTogether $ writer s
         -- Every value of the output stream is obtained by running the reader.
         runStream $ repeatProcess reader