{-# LANGUAGE TupleSections #-}
-- |Functions for fusing 'Topic's based on TimeStamp fields of the
-- underlying messages. This module shadows some of the functionality
-- of the "Ros.Topic.Util" module. The difference is that the functions
-- exported by this module use time stamps to correlate two
-- 'Topic's.
-- 
-- The correlation uses a bracketing pair of values from one 'Topic'
-- to pick a correspondence for each value from the other
-- 'Topic'. This bracketing approach induces some latency. The most
-- common use case is calling the 'bothNew' function with a 'Topic'
-- that produces very quickly (faster than the minimum required update
-- rate), and another 'Topic' that imposes a rate limit.
module Ros.Topic.Stamped (everyNew, interpolate, batch) where
import Data.Time.Clock (getCurrentTime, diffUTCTime)
import System.Timeout
import qualified Ros.Topic as T
import Ros.Topic (Topic(..), metamorphM, yieldM)
import qualified Ros.Topic.Util as T
import Ros.Internal.Msg.HeaderSupport
import Ros.Internal.RosTime

-- |Choose whichever of two consecutive values has the time stamp
-- closest to that of a third, reference value. A reference stamped at
-- or before the first value selects the first value; one stamped at
-- or after the second value selects the second. In between, the first
-- value is chosen only when it is strictly nearer, so ties go to the
-- second value.
pickNearest :: (HasHeader a, HasHeader b) => a -> a -> b -> a
pickNearest earlier later ref
  | refStamp <= earlyStamp = earlier
  | lateStamp <= refStamp = later
  | otherwise = if sinceEarly < untilLate then earlier else later
  where
    refStamp = getStamp ref
    earlyStamp = getStamp earlier
    lateStamp = getStamp later
    -- Distances from the reference stamp to each end of the pair; only
    -- needed once the reference is known to lie strictly between them.
    sinceEarly = diffROSTime refStamp earlyStamp
    untilLate = diffROSTime lateStamp refStamp

-- |@findBrackets t1 t2@ pairs each element of @t2@ with the pair of
-- consecutive elements from @t1@ that brackets it in time, and the
-- time interval in seconds covered by that bracket.
--
-- For every consecutive pair @(x,y)@ drawn from @t1@, elements of
-- @t2@ stamped before @x@ are discarded, and the elements that follow
-- are collected until one stamped after @y@ is encountered. That
-- element is left at the head of the remaining 'Topic' so it can be
-- matched against a later bracket.
findBrackets :: (HasHeader a, HasHeader b) =>
                Topic IO a -> Topic IO b -> Topic IO ((a,a,Double), b)
findBrackets t1 t2 = T.concats . metamorphM (go t2) $ T.consecutive t1
  where go t (x,y) = let start = getStamp x
                         stop = getStamp y
                         bracket = (x,y, diffSeconds stop start)
                     in do -- Stop collecting at the first element stamped
                           -- past the end of the bracket. This relies on
                           -- 'T.break' following the Prelude convention of
                           -- returning the prefix whose elements do NOT
                           -- satisfy the predicate; testing @(< stop)@ here
                           -- would end the batch on the very first
                           -- in-bracket element.
                           (items, rest) <- T.break ((> stop) . getStamp) $
                                            T.dropWhile ((< start) . getStamp) t
                           let items' = map (bracket,) items
                           yieldM items' (go rest)

-- |Suppress immediate repeats in a 'Topic' of pairs. Adjacent
-- elements (as produced by 'T.consecutive') are compared, and the
-- later element of each adjacent pair is passed along unless both of
-- its components bear the same sequence IDs (as obtained by
-- 'getSequence') as the corresponding components of the earlier one.
removeDups :: (Functor m, Monad m, HasHeader a, HasHeader b) =>
              Topic m (a,b) -> Topic m (a,b)
removeDups pairs = T.catMaybes (fmap keepIfFresh (T.consecutive pairs))
  where
    keepIfFresh ((prevL, prevR), cur@(curL, curR))
      | leftRepeats && rightRepeats = Nothing
      | otherwise = Just cur
      where
        leftRepeats = getSequence prevL == getSequence curL
        rightRepeats = getSequence prevR == getSequence curR

-- |Build a 'Topic' of pairs that is updated whenever either component
-- 'Topic' produces a value. Each new value is paired with the element
-- of the other 'Topic' whose time stamp is nearest to it, chosen from
-- the two elements that bracket it in time. Both inputs are split
-- with 'T.tee' so that each can serve as the bracketing source for
-- the other; the two resulting streams are merged and immediate
-- repeats are filtered out.
everyNew :: (HasHeader a, HasHeader b) => 
            Topic IO a -> Topic IO b -> IO (Topic IO (a,b))
everyNew t1 t2 = do
  (leftBrackets, leftQueries) <- T.tee t1
  (rightQueries, rightBrackets) <- T.tee t2
  let -- Every value of t2, paired with its nearest neighbour from t1.
      nearestLeft = fmap (\((x1,x2,_), y) -> (pickNearest x1 x2 y, y))
                         (findBrackets leftBrackets rightQueries)
      -- Every value of t1, paired with its nearest neighbour from t2.
      nearestRight = fmap (\((y1,y2,_), x) -> (x, pickNearest y1 y2 x))
                          (findBrackets rightBrackets leftQueries)
  return (removeDups (T.merge nearestLeft nearestRight))

-- |The application @interpolate f t1 t2@ produces a new 'Topic' that
-- pairs every element of @t2@ with an interpolation of two temporally
-- bracketing values from @t1@. The interpolation is effected with the
-- supplied function, @f@, that is given the two values to interpolate
-- and the linear ratio to find between them. This ratio is determined
-- by the time stamp of the intervening element of @t2@: it is the
-- time elapsed since the first bracketing value divided by the
-- duration of the bracket.
--
-- If the two bracketing values carry the same time stamp the bracket
-- has no duration; a ratio of @0@ is then passed to @f@ rather than
-- the non-finite result of dividing by zero.
interpolate :: (HasHeader a, HasHeader b) => 
               (a -> a -> Double -> a) -> Topic IO a -> Topic IO b -> 
               Topic IO (a,b)
interpolate f t1 t2 = interp `fmap` findBrackets t1 t2
  where interp ((x1,x2,dt),y) = (f x1 x2 ratio, y)
          where elapsed = diffSeconds (getStamp y) (getStamp x1)
                -- Guard against a zero-width bracket, which would
                -- otherwise hand NaN or Infinity to the caller's
                -- interpolation function.
                ratio | dt > 0 = elapsed / dt
                      | otherwise = 0

-- |Batch 'Topic' values that arrive within the given time window
-- (expressed in seconds). When a value arrives, the window opens and
-- all values received within that window are returned in a list, then
-- the next value is awaited before opening the window again. Intended
-- usage is to gather approximately simultaneous events into
-- batches. Note that the times used to batch messages are arrival
-- times rather than time stamps. This is what lets us close the
-- window, rather than having to admit any message that ever arrives
-- with a compatible time stamp.
--
-- Each batch is returned in arrival order and always contains at
-- least the value that opened the window.
batch :: Double -> Topic IO a -> Topic IO [a]
batch timeWindow t0 = 
  Topic $ do (x,t') <- runTopic t0
             start <- getCurrentTime
             let go acc t = do now <- getCurrentTime
                               let dt = realToFrac (diffUTCTime now start) :: Double
                                   -- Time left in the window, in microseconds
                                   -- (the unit 'timeout' expects).
                                   remainingMicros =
                                     floor ((timeWindow - dt) * 1000000) :: Int
                               -- The window must be treated as closed when the
                               -- remaining time is negative as well as zero:
                               -- 'timeout' interprets a negative interval as
                               -- "wait indefinitely", which would block
                               -- until the next value instead of ending the
                               -- batch.
                               if remainingMicros <= 0
                                 then return (reverse acc, k t)
                                 else do r <- timeout remainingMicros $ runTopic t
                                         case r of
                                           Just (x',t'') -> go (x':acc) t''
                                           Nothing -> return (reverse acc, k t)
             go [x] t'
    where k = batch timeWindow