{-# LANGUAGE Trustworthy, DeriveDataTypeable, DeriveFunctor, ScopedTypeVariables, RecursiveDo #-}

module FRP.Reactivity.AlternateEvent (Event(..), eFromML, nonDispatchedIO, runEvent,
-- ** A minimal set of combinators
EventStream(..), filterMaybes, delay, delays,
-- ** Optimizer rules
fmap', scan') where

import qualified Data.Map as M
import Data.Unique
import Data.IORef
import Data.Typeable
import Data.Time.Clock.POSIX
import Data.Maybe
import Data.Monoid
import Data.Traversable
import Control.Monad hiding (mapM)
import Control.Monad.Reader hiding (mapM)
import Control.Applicative
import Control.Concurrent
import Control.Monad.Catch
import System.IO.Unsafe
import FRP.Reactivity.Measurement
import FRP.Reactivity.RPC
import Prelude hiding (mapM)

-- | Event streams are here presented using the publisher-subscriber model (push-based handling
--   in contrast to the pull-based handling of 'MeasurementWrapper'). Such an event
--   is represented by a subscription function and a callback. The subscription function finishes fast allowing
--   the caller to continue.
--
--   The motivation for introducing this data type is that, while the 'Measurement'/'MeasurementWrapper'
--   system is fast, its intensive use of memory cells that need to be garbage collected means
--   that it may not be fast enough for some purposes.
newtype Event t = Event { unEvent :: (t -> POSIXTime -> RPC ()) -> RPC (RPC ()) } deriving (Typeable, Functor)

-- avoid inlining until rules have fired
{-# INLINE[0] eFromML #-}
-- | Extracts an 'Event' from a list of measurements. No attempt has been made to sync up
-- the time order of different calls to 'eFromML'. For this reason, it is not true that
-- "eFromML ml `mplus` eFromML ml2" equals "eFromML (ml `merge` ml2)"; please do any necessary
-- syncing before calling 'eFromML'.
eFromML :: [Measurement t] -> Event t
eFromML ls = Event (\f -> liftM (_nonDispatchedIO . killThread) $ rpcFork (mapM_ (\meas -> _nonDispatchedIO (getValue meas) >>= uncurry f) ls))

instance Monad Event where
	return x = Event (\f -> f x 0 >> return (return ()))
	Event f >>= g = Event (\h -> do
		-- Lots of machinery to track all subscriptions...
		ref <- _nonDispatchedIO (newIORef (return ()))
		liftM (\m -> m >> join (_nonDispatchedIO (readIORef ref))) (f (get2 ref h))) where
		get h t x' t' = h x' (max t t')
		get2 ref h x t = unEvent (g x) (get h t) >>= \m -> _nonDispatchedIO (atomicModifyIORef ref (\m2 -> (m2 >> m, ())))
	fail _ = mzero

-- All the properties (associativity, unitality etc.) fall out of the monad axioms.
instance MonadPlus Event where
	mzero = Event (\_ -> return (return ())) -- Return immediately
	mplus e e2 = Event (\f -> liftM2 (>>) (unEvent e f) (unEvent e2 f))

instance Monoid (Event t) where
	mempty = mzero
	mappend = mplus

instance MonadIO Event where
	liftIO m = Event (\f -> liftIO m >>= \x ->
		_nonDispatchedIO (measure (return ())) >>= \meas ->
		f x (time meas) >> return (return ()))

-- I/O can be useful...
nonDispatchedIO m = Event (\f -> _nonDispatchedIO (measure m >>= getValue) >>= uncurry f >> return (return ()))

-- | Run an event -- see .Basic module for a way to run with a Windows event loop.
runEvent :: Event t -> (t -> POSIXTime -> IO ()) -> IO ()
runEvent e f = void $ do
	mv <- newEmptyMVar
	forkIO $ void $ runReaderT (unRPC $ unEvent e (\x t -> liftIO (f x t))) (False, mv)
	rpcServer mv

instance Applicative Event where
	pure = return
	(<*>) = ap

instance Alternative Event where
	empty = mzero
	(<|>) = mplus

class (Functor e, MonadPlus e) => EventStream e where
	-- | Prooty self-explanatory.
	eventFromList :: [(t, POSIXTime)] -> e t
	-- | The "scan" primitive is analogous to "scanl" for lists.
	scan :: (t -> u -> (t, v)) -> t -> e u -> e v
	-- | A primitive like "switch" is the main way of implementing behaviors that can be switched
	-- in and out as required.
	switch :: e (e t) -> e t
	-- | The main use of "withRemainder" is to implement manual aging of inputs. It helps prevent space
	-- and time leaks.
	withRemainder :: e t -> e (t, e t)
	-- | Construct a channel in order to receive external events.
	channel :: IO (t -> IO (), e t)
	-- | Gets the current time along with every occurrence.
	adjoinTime :: e t -> e (t, POSIXTime)

retryLoop ref = -- It's a comin'
	_nonDispatchedIO (readIORef ref) >>= either (\_ -> _nonDispatchedIO yield >> retryLoop ref) id

eventSwitch e = Event (\f -> do
	-- Just keep track of the most recent subscription...
	ref <- _nonDispatchedIO (newIORef (Right (return ())))
	liftM (\m -> m >> retryLoop ref) (unEvent e (handler f ref))) where
	handler f ref e' t = do
		ei <- _nonDispatchedIO (atomicModifyIORef ref (\m -> (Left (), m)))
		either
			(\_ -> _nonDispatchedIO yield >> handler f ref e' t)
			(\m -> do
			-- Unsubscribe from it...
			m
			-- ... and switch in the new one
			m' <- rpcFinally
				(unEvent e' f)
				(_nonDispatchedIO (writeIORef ref (Right (return ()))))
			_nonDispatchedIO (writeIORef ref (Right m')))
			ei

instance EventStream Event where
	eventFromList = eFromML . fromList

	scan f x e = Event (\g -> do
		ref <- _nonDispatchedIO (newIORef x)
		unEvent e (\y t -> _nonDispatchedIO (atomicModifyIORef ref (\x -> let (x', z) = f x y in (x', z))) >>= \z -> g z t))

	switch = eventSwitch

	withRemainder e = Event (\f -> unEvent e (\x t -> f (x, e) t))

	channel = do
		ref <- newIORef M.empty
		return (add ref, e ref)
		where
		-- Machinery to keep track of subscriptions. This will maintain a set of callbacks
		-- that want to receive messages. When a subscriber unsubscribes, it will be
		-- removed from the set.
		add ref x = measure (return ()) >>= \meas -> readIORef ref >>= \mp -> void $ mapM (\f -> f x (time meas)) mp
		unsub ref un = _nonDispatchedIO $ atomicModifyIORef ref (\mp -> (M.delete un mp, ()))
		e ref = Event (\f -> _nonDispatchedIO newUnique >>= \un ->
			RPC ask >>= \(_, mv) ->
			_nonDispatchedIO (atomicModifyIORef ref (\mp -> (M.insert un (\x t -> runReaderT (unRPC (f x t)) (False, mv)) mp, ())))
			>> return (unsub ref un))

	adjoinTime e = Event (\f -> unEvent e (\x t -> f (x, t) t))

filterMaybes :: (MonadPlus m) => m (Maybe t) -> m t
filterMaybes e = e >>= \x -> guard (isJust x) >> return (fromJust x)

delay t e = filterMaybes $ adjoinTime (return Nothing `mplus` fmap Just e) >>= \(x, t') -> eventFromList [(x,t+t')]

delays :: (EventStream e, MonadIO e) => e (t, POSIXTime) -> e t
delays e = e >>= \(x, t) -> eventFromList [(x, t)]

----------------------------------------
-- Optimization

{-# INLINE fmap' #-}
fmap' :: (Measurement t -> Measurement u) -> [Measurement t] -> Event u
fmap' f e = scan' (\_ x -> let y = f x in (y, y)) (fmap (const undefined) (head e)) e

{-# INLINE scan' #-}
scan' f x e = delays $ fmap (\x -> (copoint x, time x)) $ scan (\y z -> f y (unsafePerformIO $ assertMeasurement (return z)))
	x
	(adjoinTime (eFromML e))

{-# RULES
"eFromML/map" forall f e. eFromML (map f e) = fmap' f e
"eFromML/scanl" forall f x e. eFromML (scanl f x e) = scan' (\x y -> let z = f x y in (z, z)) x e
"eFromML/mzero" eFromML [] = mzero
  #-}