module Simulation.Aivika.Trans.Stream
       (
        Stream(..),
        
        emptyStream,
        mergeStreams,
        mergeQueuedStreams,
        mergePriorityStreams,
        concatStreams,
        concatQueuedStreams,
        concatPriorityStreams,
        splitStream,
        splitStreamQueueing,
        splitStreamPrioritising,
        splitStreamFiltering,
        splitStreamFilteringQueueing,
        
        streamUsingId,
        
        prefetchStream,
        delayStream,
        
        arrivalStream,
        
        memoStream,
        zipStreamSeq,
        zipStreamParallel,
        zip3StreamSeq,
        zip3StreamParallel,
        unzipStream,
        streamSeq,
        streamParallel,
        
        consumeStream,
        sinkStream,
        
        repeatProcess,
        mapStream,
        mapStreamM,
        accumStream,
        apStream,
        apStreamM,
        filterStream,
        filterStreamM,
        takeStream,
        takeStreamWhile,
        takeStreamWhileM,
        dropStream,
        dropStreamWhile,
        dropStreamWhileM,
        singletonStream,
        joinStream,
        
        failoverStream,
        
        signalStream,
        streamSignal,
        
        leftStream,
        rightStream,
        replaceLeftStream,
        replaceRightStream,
        partitionEitherStream,
        
        cloneStream,
        firstArrivalStream,
        lastArrivalStream,
        assembleAccumStream,
        
        traceStream) where
import Data.Maybe
import Data.Monoid
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Parameter
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Resource.Base
import Simulation.Aivika.Trans.QueueStrategy
import Simulation.Aivika.Trans.Queue.Infinite.Base
import Simulation.Aivika.Arrival (Arrival(..))
-- | A stream of values of type @a@ within the computation @m@.
--
-- A stream is represented by a process which, when run, yields the next
-- value together with the remainder of the stream.
newtype Stream m a = Cons { runStream :: Process m (a, Stream m a)
                            -- ^ Run the stream: obtain its head value and its tail.
                          }
-- | Mapping over a stream is delegated to 'mapStream'.
instance MonadDES m => Functor (Stream m) where

  fmap f xs = mapStream f xs
-- | 'pure' builds a stream that yields the same value at every step (its tail
-- is the stream itself); '<*>' is delegated to 'apStream'.
instance MonadDES m => Applicative (Stream m) where

  pure a = self
    where
      -- The tail refers back to the stream itself, so the value repeats forever.
      self = Cons (return (a, self))

  fs <*> xs = apStream fs xs
-- | 'empty' is 'emptyStream' and '<|>' merges two streams with 'mergeStreams'.
instance MonadDES m => Alternative (Stream m) where

  empty = emptyStream

  xs <|> ys = mergeStreams xs ys
-- | Streams form a monoid under merging: 'mempty' is 'emptyStream',
-- 'mappend' is 'mergeStreams' and 'mconcat' is 'concatStreams'.
--
-- NOTE(review): no 'Semigroup' instance is visible in this part of the file;
-- newer versions of @base@ require one as a superclass of 'Monoid' -- confirm
-- against the supported compiler range.
instance MonadDES m => Monoid (Stream m a) where

  mempty = emptyStream

  mappend xs ys = mergeStreams xs ys

  mconcat xss = concatStreams xss
-- | Wrap the process that produces the head of the stream with
-- 'processUsingId', passing it the given process identifier.
streamUsingId :: MonadDES m => ProcessId m -> Stream m a -> Stream m a
streamUsingId pid stream =
  Cons (processUsingId pid (runStream stream))
-- | Build a stream whose every step goes through 'memoProcess'; the tail
-- obtained at each step is itself wrapped with 'memoStream' recursively.
memoStream :: MonadDES m => Stream m a -> Simulation m (Stream m a)
memoStream stream = liftM Cons (memoProcess step)
  where
    -- One step of the source whose tail is wrapped again; the pattern is
    -- kept lazy exactly as in the pair produced by the source.
    step =
      do ~(x, rest) <- runStream stream
         rest' <- liftSimulation (memoStream rest)
         return (x, rest')
-- | Zip two streams, requesting the next item from the first stream and
-- only then from the second one.
zipStreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
zipStreamSeq s1 s2 = Cons $
  do ~(a, s1') <- runStream s1
     ~(b, s2') <- runStream s2
     return ((a, b), zipStreamSeq s1' s2')
-- | Zip two streams, obtaining the next item of both through
-- 'zipProcessParallel'.
zipStreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel s1 s2 = Cons $
  do ~((a, s1'), (b, s2')) <- zipProcessParallel (runStream s1) (runStream s2)
     return ((a, b), zipStreamParallel s1' s2')
-- | Zip three streams, requesting the next item from each of them one
-- after another, in argument order.
zip3StreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamSeq s1 s2 s3 = Cons $
  do ~(a, s1') <- runStream s1
     ~(b, s2') <- runStream s2
     ~(c, s3') <- runStream s3
     return ((a, b, c), zip3StreamSeq s1' s2' s3')
-- | Zip three streams, obtaining the next item of all of them through
-- 'zip3ProcessParallel'.
zip3StreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamParallel s1 s2 s3 = Cons $
  do ~((a, s1'), (b, s2'), (c, s3')) <-
       zip3ProcessParallel (runStream s1) (runStream s2) (runStream s3)
     return ((a, b, c), zip3StreamParallel s1' s2' s3')
-- | Unzip a stream of pairs into a pair of streams. The source is first
-- passed through 'memoStream', and both results are projections of that
-- single shared stream.
unzipStream :: MonadDES m => Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
unzipStream s = liftM project (memoStream s)
  where
    project shared = (mapStream fst shared, mapStream snd shared)
-- | Combine a list of streams into a stream of lists, requesting the next
-- item from every input stream one after another, in list order.
streamSeq :: MonadDES m => [Stream m a] -> Stream m [a]
streamSeq streams = Cons $
  do steps <- mapM runStream streams
     return (map fst steps, streamSeq (map snd steps))
-- | Combine a list of streams into a stream of lists, obtaining the next
-- item of every input stream through 'processParallel'.
streamParallel :: MonadDES m => [Stream m a] -> Stream m [a]
streamParallel streams = Cons $
  do steps <- processParallel (map runStream streams)
     return (map fst steps, streamParallel (map snd steps))
-- | Build a stream that produces every item by running the given process
-- once more.
repeatProcess :: MonadDES m => Process m a -> Stream m a
repeatProcess p =
  Cons $ p >>= \a -> return (a, repeatProcess p)
-- | Apply a pure function to every item of the stream.
mapStream :: MonadDES m => (a -> b) -> Stream m a -> Stream m b
mapStream f stream = Cons $
  runStream stream >>= \(a, rest) ->
  return (f a, mapStream f rest)
-- | Apply a function returning a process to every item of the stream; the
-- process runs after the item is received and before the result is yielded.
mapStreamM :: MonadDES m => (a -> Process m b) -> Stream m a -> Stream m b
mapStreamM f stream = Cons $
  do (a, rest) <- runStream stream
     b <- f a
     return (b, mapStreamM f rest)
-- | Transform the stream while threading an accumulator through it: for
-- every input item the function receives the current accumulator and returns
-- the updated accumulator together with the output item.
accumStream :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
accumStream f acc xs = go acc xs
  where
    go state stream = Cons $
      do (a, rest) <- runStream stream
         (state', b) <- f state a
         return (b, go state' rest)
-- | Apply a stream of functions to a stream of arguments position by
-- position; the function is requested first, then the argument.
apStream :: MonadDES m => Stream m (a -> b) -> Stream m a -> Stream m b
apStream fs xs = Cons $
  do (f, fs') <- runStream fs
     (x, xs') <- runStream xs
     return (f x, apStream fs' xs')
-- | Like 'apStream', but the functions return processes: the function is
-- requested first, then the argument, and then the resulting process is run.
apStreamM :: MonadDES m => Stream m (a -> Process m b) -> Stream m a -> Stream m b
apStreamM fs xs = Cons $
  do (f, fs') <- runStream fs
     (x, xs') <- runStream xs
     b <- f x
     return (b, apStreamM fs' xs')
-- | Keep only the items satisfying the predicate; a rejected item is
-- skipped by continuing straight into the filtered tail.
filterStream :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
filterStream p stream = Cons $
  do (a, rest) <- runStream stream
     let next = filterStream p rest
     if p a
       then return (a, next)
       else runStream next
-- | Keep only the items for which the predicate process returns 'True'; a
-- rejected item is skipped by continuing straight into the filtered tail.
filterStreamM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
filterStreamM p stream = Cons $
  do (a, rest) <- runStream stream
     keep <- p a
     let next = filterStreamM p rest
     if keep
       then return (a, next)
       else runStream next
-- | Yield the payloads of the 'Left' items only, skipping every 'Right' item.
leftStream :: MonadDES m => Stream m (Either a b) -> Stream m a
leftStream stream = Cons $
  do (item, rest) <- runStream stream
     let next = leftStream rest
     either (\a -> return (a, next)) (\_ -> runStream next) item
-- | Yield the payloads of the 'Right' items only, skipping every 'Left' item.
rightStream :: MonadDES m => Stream m (Either a b) -> Stream m b
rightStream stream = Cons $
  do (item, rest) <- runStream stream
     let next = rightStream rest
     either (\_ -> runStream next) (\b -> return (b, next)) item
-- | Replace every 'Left' item of the first stream with the next item taken
-- from the second stream; 'Right' items are passed through unchanged and do
-- not consume anything from the second stream.
replaceLeftStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
-- The as-pattern is written without whitespace around '@' (and with the lazy
-- pattern parenthesised) so that it is accepted both by older compilers and
-- by compilers with whitespace-sensitive operator parsing.
replaceLeftStream (Cons sab) ys0@(~(Cons sc)) = Cons z where
  z = do (a, xs) <- sab
         case a of
           Left _ ->
             -- Substitute the next item of the replacement stream.
             do (b, ys) <- sc
                return (Left b, replaceLeftStream xs ys)
           Right a ->
             -- Nothing consumed: continue with the same replacement stream.
             return (Right a, replaceLeftStream xs ys0)
-- | Replace every 'Right' item of the first stream with the next item taken
-- from the second stream; 'Left' items are passed through unchanged and do
-- not consume anything from the second stream.
replaceRightStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
-- The as-pattern is written without whitespace around '@' (and with the lazy
-- pattern parenthesised) so that it is accepted both by older compilers and
-- by compilers with whitespace-sensitive operator parsing.
replaceRightStream (Cons sab) ys0@(~(Cons sc)) = Cons z where
  z = do (a, xs) <- sab
         case a of
           Right _ ->
             -- Substitute the next item of the replacement stream.
             do (b, ys) <- sc
                return (Right b, replaceRightStream xs ys)
           Left a ->
             -- Nothing consumed: continue with the same replacement stream.
             return (Left a, replaceRightStream xs ys0)
-- | Partition a stream of 'Either' values into the stream of 'Left' payloads
-- and the stream of 'Right' payloads. The source is first passed through
-- 'memoStream', and both results read from that single shared stream.
partitionEitherStream :: MonadDES m => Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
partitionEitherStream s = liftM separate (memoStream s)
  where
    separate shared = (leftStream shared, rightStream shared)
-- | Split the input stream into the specified number of output streams;
-- this is 'splitStreamQueueing' with the 'FCFS' strategy.
splitStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
splitStream n x = splitStreamQueueing FCFS n x
-- | Split the input stream into the specified number of output streams that
-- all pull their items from the same shared source. Access to the source is
-- guarded by a resource created with the given strategy.
splitStreamQueueing :: (MonadDES m, EnqueueStrategy m s)
                       => s
                       -- ^ the strategy of the resource guarding the source
                       -> Int
                       -- ^ the number of output streams
                       -> Stream m a
                       -- ^ the input stream
                       -> Simulation m [Stream m a]
                       -- ^ the output streams
splitStreamQueueing s n x =
  do source <- newRef x
     guard' <- newResource s 1
     let -- Take the next item from the shared source while holding the
         -- resource, and store the remaining stream back.
         takeNext =
           usingResource guard' $
           do current <- liftEvent (readRef source)
              (a, rest) <- runStream current
              liftEvent (writeRef source rest)
              return a
     return (replicate n (repeatProcess takeNext))
-- | Split the input stream into one output stream per priority stream. Each
-- time an output stream is asked for an item, it first takes the next
-- priority from its own priority stream and then uses that priority to
-- acquire the resource guarding the shared source.
splitStreamPrioritising :: (MonadDES m, PriorityQueueStrategy m s p)
                           => s
                           -- ^ the strategy of the resource guarding the source
                           -> [Stream m p]
                           -- ^ the streams of priorities, one per output stream
                           -> Stream m a
                           -- ^ the input stream
                           -> Simulation m [Stream m a]
                           -- ^ the output streams
splitStreamPrioritising s ps x =
  do source <- newRef x
     guard' <- newResource s 1
     let -- Take the next item from the shared source and store the rest back.
         takeNext =
           do current <- liftEvent (readRef source)
              (a, rest) <- runStream current
              liftEvent (writeRef source rest)
              return a
         -- The output stream driven by one stream of priorities.
         output priorities = Cons $
           do (priority, priorities') <- runStream priorities
              a <- usingResourceWithPriority guard' priority takeNext
              return (a, output priorities')
     return (map output ps)
-- | Split the input stream into one output stream per predicate; this is
-- 'splitStreamFilteringQueueing' with the 'FCFS' strategy.
splitStreamFiltering :: MonadDES m => [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
splitStreamFiltering preds x = splitStreamFilteringQueueing FCFS preds x
-- | Split the input stream into one output stream per predicate. All output
-- streams pull from the same shared source, guarded by a resource created
-- with the given strategy. An output stream accepts an item only if its
-- predicate holds for it; otherwise the item is put back at the head of the
-- shared source and the output stream tries again.
splitStreamFilteringQueueing :: (MonadDES m, EnqueueStrategy m s)
                                => s
                                -- ^ the strategy of the resource guarding the source
                                -> [a -> Event m Bool]
                                -- ^ the predicates, one per output stream
                                -> Stream m a
                                -- ^ the input stream
                                -> Simulation m [Stream m a]
                                -- ^ the output streams
splitStreamFilteringQueueing s preds x =
  do ref <- liftSimulation $ newRef x
     res <- newResource s 1
     let reader pred =
           do a <-
                -- The source is read and updated only while the resource is held.
                usingResource res $
                do p <- liftEvent $ readRef ref
                   (a, xs) <- runStream p
                   liftEvent $
                     do f <- pred a
                        if f
                          then do writeRef ref xs
                                  return $ Just a
                          -- Rejected: rebuild a stream that yields the same
                          -- item again, so that it is not lost.
                          else do writeRef ref $ Cons (return (a, xs))
                                  return Nothing
              case a of
                Just a  -> return a
                -- The resource has been released by now; retry from scratch.
                Nothing -> reader pred
     return $ map (repeatProcess . reader) preds
-- | Concatenate the input streams into one output stream; this is
-- 'concatQueuedStreams' with the 'FCFS' strategy.
concatStreams :: MonadDES m => [Stream m a] -> Stream m a
concatStreams streams = concatQueuedStreams FCFS streams
-- | Concatenate the input streams into one output stream. One writer
-- process is spawned per input stream; the writers hand their items over to
-- the reader one at a time through a shared reference cell, coordinated by
-- three resources. The given strategy is used for the resource on which the
-- writers wait for their turn to write.
--
-- All of the set-up (resources, reference cell, spawning of the writers)
-- happens when the head of the resulting stream is run.
concatQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
                       => s
                       -- ^ the strategy of the @writing@ resource
                       -> [Stream m a]
                       -- ^ the input streams
                       -> Stream m a
                       -- ^ the combined output stream
concatQueuedStreams s streams = Cons z where
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         -- The cell through which a single item is handed over.
         ref <- liftSimulation $ newRef Nothing
         let writer p =
               -- Get the next item from the input stream first, then wait for
               -- the turn to write, publish the item and wake up the reader.
               do (a, xs) <- runStream p
                  requestResource writing
                  liftEvent $ writeRef ref (Just a)
                  releaseResource reading
                  -- Wait until the consumer asks for one more item.
                  requestResource conting
                  writer xs
             reader =
               do requestResource reading
                  -- The pattern assumes a writer has filled the cell before
                  -- releasing @reading@; the match fails otherwise.
                  Just a <- liftEvent $ readRef ref
                  liftEvent $ writeRef ref Nothing
                  -- Let the next writer publish its item.
                  releaseResource writing
                  return a
         forM_ streams $ spawnProcess . writer
         a <- reader
         -- Every subsequent item first releases @conting@ and then reads.
         let xs = repeatProcess (releaseResource conting >> reader)
         return (a, xs)
-- | Concatenate the input streams of prioritised items into one output
-- stream. One writer process is spawned per input stream; each writer uses
-- the priority that accompanies an item when requesting its turn to write,
-- and the priority is dropped from the output. The hand-over goes through a
-- shared reference cell coordinated by three resources.
--
-- All of the set-up (resources, reference cell, spawning of the writers)
-- happens when the head of the resulting stream is run.
concatPriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
                         => s
                         -- ^ the strategy of the @writing@ resource
                         -> [Stream m (p, a)]
                         -- ^ the input streams of (priority, item) pairs
                         -> Stream m a
                         -- ^ the combined output stream
concatPriorityStreams s streams = Cons z where
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         -- The cell through which a single item is handed over.
         ref <- liftSimulation $ newRef Nothing
         let writer p =
               -- Get the next prioritised item first, then wait for the turn
               -- to write using that priority, publish the item and wake up
               -- the reader.
               do ((priority, a), xs) <- runStream p
                  requestResourceWithPriority writing priority
                  liftEvent $ writeRef ref (Just a)
                  releaseResource reading
                  -- Wait until the consumer asks for one more item.
                  requestResource conting
                  writer xs
             reader =
               do requestResource reading
                  -- The pattern assumes a writer has filled the cell before
                  -- releasing @reading@; the match fails otherwise.
                  Just a <- liftEvent $ readRef ref
                  liftEvent $ writeRef ref Nothing
                  -- Let the next writer publish its item.
                  releaseResource writing
                  return a
         forM_ streams $ spawnProcess . writer
         a <- reader
         -- Every subsequent item first releases @conting@ and then reads.
         let xs = repeatProcess (releaseResource conting >> reader)
         return (a, xs)
-- | Merge two streams into one; this is 'mergeQueuedStreams' with the
-- 'FCFS' strategy.
mergeStreams :: MonadDES m => Stream m a -> Stream m a -> Stream m a
mergeStreams x y = mergeQueuedStreams FCFS x y
-- | Merge two streams into one with the given strategy; this is
-- 'concatQueuedStreams' applied to the two-element list of the inputs.
mergeQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
                      => s
                      -- ^ the strategy passed on to 'concatQueuedStreams'
                      -> Stream m a
                      -- ^ the first input stream
                      -> Stream m a
                      -- ^ the second input stream
                      -> Stream m a
                      -- ^ the merged output stream
mergeQueuedStreams s x y = concatQueuedStreams s inputs
  where
    inputs = [x, y]
-- | Merge two streams of prioritised items into one with the given strategy;
-- this is 'concatPriorityStreams' applied to the two-element list of the
-- inputs.
mergePriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
                        => s
                        -- ^ the strategy passed on to 'concatPriorityStreams'
                        -> Stream m (p, a)
                        -- ^ the first input stream of (priority, item) pairs
                        -> Stream m (p, a)
                        -- ^ the second input stream of (priority, item) pairs
                        -> Stream m a
                        -- ^ the merged output stream
mergePriorityStreams s x y = concatPriorityStreams s inputs
  where
    inputs = [x, y]
-- | The stream whose head is 'neverProcess' (presumably a process that never
-- completes, so the stream never yields an item -- confirm against the
-- definition of 'neverProcess').
emptyStream :: MonadDES m => Stream m a
emptyStream = Cons neverProcess
-- | Consume the stream indefinitely, running the given action for every
-- item before requesting the next one.
consumeStream :: MonadDES m => (a -> Process m ()) -> Stream m a -> Process m ()
consumeStream f stream =
  runStream stream >>= \(a, rest) ->
  f a >> consumeStream f rest
-- | Consume the stream indefinitely, discarding every item.
sinkStream :: MonadDES m => Stream m a -> Process m ()
sinkStream stream =
  runStream stream >>= \(_, rest) -> sinkStream rest
  
-- | Fetch items of the input stream ahead of demand. A separate writer
-- process pulls the next item from the input stream and then hands it over to
-- the reader through a one-item reference cell, so the writer can already be
-- obtaining the following item while the previous one is waiting to be read.
--
-- The set-up (resources, reference cell, spawning of the writer) happens when
-- the head of the resulting stream is run.
prefetchStream :: MonadDES m => Stream m a -> Stream m a
prefetchStream s = Cons z where
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount FCFS 1 (Just 1)
         -- The cell holding at most one prefetched item.
         ref <- liftSimulation $ newRef Nothing
         let writer p =
               -- Obtain the next item first, then wait until the cell is free.
               do (a, xs) <- runStream p
                  requestResource writing
                  liftEvent $ writeRef ref (Just a)
                  -- Signal the reader that an item is available.
                  releaseResource reading
                  writer xs
             reader =
               do requestResource reading
                  -- The pattern assumes the writer has filled the cell before
                  -- releasing @reading@; the match fails otherwise.
                  Just a <- liftEvent $ readRef ref
                  liftEvent $ writeRef ref Nothing
                  -- Signal the writer that the cell is free again.
                  releaseResource writing
                  return a
         spawnProcess $ writer s
         runStream $ repeatProcess reader
-- | Turn a signal into a stream: every value of the signal is enqueued into
-- a freshly created queue and the stream dequeues from it. The signal
-- subscription is disposed of when the current process is being cancelled.
signalStream :: MonadDES m => Signal m a -> Process m (Stream m a)
signalStream s =
  do queue <- liftSimulation newFCFSQueue
     subscription <- liftEvent (handleSignal s (enqueue queue))
     whenCancellingProcess (disposeEvent subscription)
     return (repeatProcess (dequeue queue))
-- | Turn a stream into a signal: a spawned process consumes the stream and
-- triggers a freshly created signal source with every item.
streamSignal :: MonadDES m => Stream m a -> Process m (Signal m a)
streamSignal z =
  do source <- liftSimulation newSignalSource
     let notify a = liftEvent (triggerSignal source a)
     spawnProcess (consumeStream notify z)
     return (publishSignal source)
-- | Annotate every item of the stream with arrival information: the item
-- itself, the modeling time at which it was received, and the delay since the
-- previous item was received ('Nothing' for the very first item).
arrivalStream :: MonadDES m => Stream m a -> Stream m (Arrival a)
arrivalStream s = Cons $ loop s Nothing where
  -- @t0@ is the time at which the previous item was received, if any.
  loop s t0 = do (a, xs) <- runStream s
                 t <- liftDynamics time
                 let b = Arrival { arrivalValue = a,
                                   arrivalTime  = t,
                                   arrivalDelay =
                                     case t0 of
                                       Nothing -> Nothing
                                       -- The delay is the time elapsed since
                                       -- the previous arrival.
                                       Just t0 -> Just (t - t0) }
                 return (b, Cons $ loop xs (Just t))
-- | Prepend an initial value to the stream: the result immediately yields
-- the given value and then continues with the original stream.
delayStream :: MonadDES m => a -> Stream m a -> Stream m a
delayStream a0 s = Cons (return (a0, s))
-- | The stream that immediately yields the given value and then continues
-- as 'emptyStream'.
singletonStream :: MonadDES m => a -> Stream m a
singletonStream a = Cons (return (a, emptyStream))
-- | Flatten a process that returns a stream into a stream: the process is
-- run when the head is requested, and the stream it returns is then run.
joinStream :: MonadDES m => Process m (Stream m a) -> Stream m a
joinStream m = Cons (runStream =<< m)
-- | Take items from the first input stream, switching over to the next input
-- stream in the list when the sub-process working with the current one gets
-- cancelled. The consumer and the current writer sub-process exchange items
-- through a reference cell coordinated by two resources.
--
-- The set-up happens when the head of the resulting stream is run; the
-- identifier of that running process is captured as the parent @pid@.
failoverStream :: MonadDES m => [Stream m a] -> Stream m a
failoverStream ps = Cons z where
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         -- The cell through which a single item is handed over.
         ref <- liftSimulation $ newRef Nothing
         -- The process that runs the head of the resulting stream.
         pid <- processId
         let writer p =
               -- Wait for a demand from the reader before touching the input.
               do requestResource writing
                  pid' <- processId
                  (a, xs) <-
                    finallyProcess (runStream p) $
                    liftEvent $
                    -- If this sub-process was cancelled while obtaining the
                    -- item, give the demand back so that the next writer can
                    -- pick it up.
                    do cancelled' <- processCancelled pid'
                       when cancelled' $
                         releaseResourceWithinEvent writing
                  liftEvent $ writeRef ref (Just a)
                  releaseResource reading
                  writer xs
             reader =
               -- Announce the demand, then wait for an item to be published.
               do releaseResource writing
                  requestResource reading
                  -- The pattern assumes a writer has filled the cell before
                  -- releasing @reading@; the match fails otherwise.
                  Just a <- liftEvent $ readRef ref
                  liftEvent $ writeRef ref Nothing
                  return a
             loop [] = return ()
             loop (p: ps) =
               do pid' <- processId
                  -- Cancel this sub-process when the parent is being cancelled.
                  h' <- liftEvent $
                        handleSignal (processCancelling pid) $ \() ->
                        cancelProcessWithId pid'
                  finallyProcess (writer p) $
                    liftEvent $
                    do disposeEvent h'
                       cancelled <- processCancelled pid
                       -- Fail over only if the parent itself is still alive.
                       unless cancelled $
                         do cancelled' <- processCancelled pid'
                            unless cancelled' $
                              error "Expected the sub-process to be cancelled: failoverStream"
                            -- Continue with the remaining input streams.
                            runProcess $ loop ps
         liftEvent $ runProcess $ loop ps
         runStream $ repeatProcess reader
-- | Yield only the first @n@ items of the stream; after that (or at once, if
-- @n@ is not positive) the result continues as 'emptyStream'.
takeStream :: MonadDES m => Int -> Stream m a -> Stream m a
takeStream n s
  | n <= 0    = emptyStream
  | otherwise =
    Cons $
    do (a, xs) <- runStream s
       -- One item has been yielded, so one fewer remains to be taken.
       return (a, takeStream (n - 1) xs)
-- | Yield items while the predicate holds; at the first item that fails the
-- predicate the stream continues as 'neverProcess'.
takeStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
takeStreamWhile p s = Cons $
  runStream s >>= \(a, rest) ->
    if p a
      then return (a, takeStreamWhile p rest)
      else neverProcess
-- | Yield items while the predicate process returns 'True'; at the first
-- item for which it returns 'False' the stream continues as 'neverProcess'.
takeStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
takeStreamWhileM p s = Cons $
  runStream s >>= \(a, rest) ->
  p a >>= \keep ->
    if keep
      then return (a, takeStreamWhileM p rest)
      else neverProcess
-- | Skip the first @n@ items of the stream and yield the rest; the stream is
-- returned unchanged if @n@ is not positive.
dropStream :: MonadDES m => Int -> Stream m a -> Stream m a
dropStream n s
  | n <= 0    = s
  | otherwise =
    Cons $
    do (a, xs) <- runStream s
       -- One item has been skipped, so one fewer remains to be dropped.
       runStream $ dropStream (n - 1) xs
-- | Skip items while the predicate holds; the first item that fails the
-- predicate is yielded and the rest of the stream follows unchanged.
dropStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
dropStreamWhile p s = Cons $
  runStream s >>= \(a, rest) ->
    if p a
      then runStream (dropStreamWhile p rest)
      else return (a, rest)
-- | Skip items while the predicate process returns 'True'; the first item
-- for which it returns 'False' is yielded and the rest of the stream follows
-- unchanged.
dropStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
dropStreamWhileM p s = Cons $
  runStream s >>= \(a, rest) ->
  p a >>= \skip ->
    if skip
      then runStream (dropStreamWhileM p rest)
      else return (a, rest)
-- | Create the specified number of output streams that each yield the items
-- of the input stream. Every output stream has its own queue: when an output
-- stream finds its queue empty, it pulls the next item from the shared input
-- itself and enqueues a copy for every other output stream.
cloneStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
cloneStream n s =
  do qs  <- forM [1..n] $ \i -> newFCFSQueue
     -- The resource guarding access to the shared input stream.
     rs  <- newFCFSResource 1
     ref <- newRef s
     let reader m q =
           -- First look into the own queue without taking the resource.
           do a <- liftEvent $ tryDequeue q
              case a of
                Just a  -> return a
                Nothing ->
                  usingResource rs $
                  -- Look again: another reader may have enqueued an item
                  -- while this one was waiting for the resource.
                  do a <- liftEvent $ tryDequeue q
                     case a of
                       Just a  -> return a
                       Nothing ->
                         do s <- liftEvent $ readRef ref
                            (a, xs) <- runStream s
                            liftEvent $ writeRef ref xs
                            -- Hand a copy to every output stream but this one.
                            forM_ (zip [1..] qs) $ \(i, q) ->
                              unless (i == m) $
                              liftEvent $ enqueue q a
                            return a
     forM (zip [1..] qs) $ \(i, q) ->
       return $ repeatProcess $ reader i q
-- | Group the input into consecutive runs of @n@ items and, once the last
-- item of a run has been received, yield the first item of that run.
firstArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
firstArrivalStream n s = assembleAccumStream step (1, Nothing) s
  where
    -- The accumulator is the position within the current run together with
    -- the first item of the run seen so far, if any.
    step (i, first) a
      | i `mod` n == 0 = return ((1, Nothing), first')
      | otherwise      = return ((i + 1, first'), Nothing)
      where
        first' = Just (fromMaybe a first)
-- | Group the input into consecutive runs of @n@ items and yield only the
-- last item of every run.
lastArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
lastArrivalStream n s = assembleAccumStream step 1 s
  where
    -- The accumulator is the position of the current item within its run.
    step i a
      | i `mod` n == 0 = return (1, Just a)
      | otherwise      = return (i + 1, Nothing)
-- | Like 'accumStream', but the step function may decide not to emit
-- anything for an input item: only the 'Just' results are yielded, unwrapped.
assembleAccumStream :: MonadDES m => (acc -> a -> Process m (acc, Maybe b)) -> acc -> Stream m a -> Stream m b
assembleAccumStream f acc s = unwrapped
  where
    produced  = accumStream f acc s
    present   = filterStream isJust produced
    -- Only 'Just' values pass the filter above, so 'fromJust' is applied to
    -- 'Just' values only.
    unwrapped = mapStream fromJust present
-- | Wrap the steps of the stream with 'traceProcess': the request message,
-- if given, wraps the request for the next item, and the response message,
-- if given, wraps the return of the received item.
traceStream :: MonadDES m
               => Maybe String
               -- ^ the message used when requesting the next item, if any
               -> Maybe String
               -- ^ the message used when returning the received item, if any
               -> Stream m a
               -- ^ the stream to trace
               -> Stream m a
               -- ^ the traced stream
traceStream request response s = go s
  where
    go stream = Cons $
      do (a, rest) <- maybe id traceProcess request (runStream stream)
         maybe id traceProcess response (return (a, go rest))