{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecursiveDo #-}

{-# OPTIONS_GHC -Wall #-}
-- | This is an internal module.
module FRP.Ordrea.Base
  ( SignalGen
  , Signal, Event, Discrete

  , ExternalEvent
  , newExternalEvent, triggerExternalEvent, listenToExternalEvent

  , generatorE, filterE, stepClockE, dropStepE, eventFromList
  , scanE, mapAccumE, mapAccumEM
  , accumE, scanAccumE, scanAccumEM
  , mapMaybeE, justE, flattenE, expandE, externalE
  , takeWhileE, delayE

  , joinDD, joinDE, joinDS

  , start, externalS, joinS, delayS, signalFromList, networkToList
  , networkToListGC

  , accumD, changesD, preservesD, delayD

  , eventToSignal, signalToEvent, applySE

  , discreteToSignal

  , TimeFunction(..), (<@>), (<@)

  , OrderingViolation (..)

  -- * internal stuff
  , newRef, readRef, writeRef, modifyRef
  , weakToLike
  ) where

import Control.Applicative
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.Trans.Reader
import Data.IORef
import Data.List
import qualified Data.Map as M
import Data.Maybe
import Data.Monoid
import Data.Ord (comparing)
import qualified Data.Semigroup as Semigroup
import Data.Typeable
import qualified Data.Vector.Unboxed as U
import Data.Word
import System.IO.Unsafe
import System.Mem (performGC)
import System.Mem.Weak

import FRP.Ordrea.Weak

-- Phases
--
-- Execution of an ordrea program can be broken down into the
-- following phases:
--
-- * Construction (SignalGen monad)
--   The construction phase is the first step to construct a new
--   (sub)network. In this phase, a fresh location is assigned to each dynamic
--   node to be constructed. 'delay' nodes are created in this phase.
--   This is the only phase the user describes directly.
-- * Initialization (Initialize monad)
--   This phase completes construction of a new (sub)network. Non-delay nodes
--   are created and all nodes get connected together. Initialization happens
--   when a SignalGen run for a (sub)network is completed, or when 'snapshot'
--   is called in a SignalGen run.
-- * Execution step (Run monad)
--   This phase updates internal states of the network, moving the
--   computation one step forward.
-- * Clean-up step (Cleanup monad)
--   This phase comes after each execution step. The 'current state' of
--   an event node is reset to [] in this phase.
--
-- For the toplevel network the construction phase comes before the first
-- execution step. On the other hand, a subnetwork is constructed during
-- an execution step and is immediately updated as part of the current
-- execution step.
--
-- (Rationale)
-- + Why is a separate initialization phase needed?
-- - Because functions in the SignalGen monad cannot examine given signals.
--   Doing so would result in a NonTermination exception in case of a circular
--   circuit.

-- | The monad of the construction phase (see the phase overview above):
-- a reader over 'GEnv' layered on 'IO'.
newtype SignalGen a = SignalGen (ReaderT GEnv IO a)
  deriving (Monad, Functor, Applicative, MonadIO, MonadFix)
-- | The monad of the initialization phase: a reader over 'IEnv' on 'IO'.
type Initialize = ReaderT IEnv IO
-- | The monad of an execution step: a reader over 'REnv' on 'IO'.
type Run = ReaderT REnv IO
-- | Clean-up actions are plain 'IO' actions.
type Cleanup = IO

-- Signal, Event and Discrete are represented as a pair of a priority (see Note
-- [Priority]) and an initialization action that returns the `core' of the
-- node. The initialization action is idempotent.

-- A signal: a priority and an initializer that yields the node's Pull.
data Signal a   = Sig !Priority !(Initialize (Pull a))
  --- ^ The pull contains the current value.
-- An event: a priority and an initializer that yields a Pull of the
-- occurrence list together with a Push.
data Event a    = Evt !Priority !(Initialize (Pull [a], Push))
  --- ^ The pull contains the list of the current occurrences.
  -- The Push is active iff the list is non-empty.
-- A discrete: a priority and an initializer that yields a Pull of the
-- value together with a Push.
data Discrete a = Dis !Priority !(Initialize (Pull a, Push))
  --- ^ The pull contains the current value.
  -- The Push is active iff the value might have changed.

-- | A callback that accepts a value and performs an 'IO' action with it.
type Consumer a = a -> IO ()

----------------------------------------------------------------------
-- locations and priorities

-- Note [Priority]
-- ~~~~~~~~~~~~~~~
--
-- Each node in a network has a "Priority". A Priority tells when in an
-- execution step the node will be updated. A smaller Priority means the node
-- gets updated earlier. To be precise:
--
-- An execution step is divided into substeps, one for each priority in the
-- network. An execution step begins with a substep for the minimum priority
-- (bottomPrio bottomLocation) and ends with a substep for the maximum priority
-- in the network. The following rules apply.
--
-- * A 'Pull' for a node with priority p will be ready after the substep for p
--   is complete or after the accompanying Push is triggered. The user
--   should not try to use the value before that.
-- * A 'Notifier' for a node with priority p, if it's active, is triggered
--   before the substep for p is complete. If it hasn't been triggered after
--   the substep, the user can be sure that it's inactive in the current step.

-- Location of a dynamic node. Each dynamic node gets a Location when it is
-- created. A Location is not necessarily unique, i.e. two dynamic nodes may
-- have the same Location.
--
-- A child location is formed by appending one number to its parent's
-- location (see 'newLocationGen').
type Location = U.Vector Word

-- Priority of updates. The derived 'Ord' compares the location first and the
-- number second; see Note [Priority] for how priorities schedule updates.
data Priority = Priority
  { priLoc :: {-- UNPACK #-} !Location
      -- NOTE(review): the text in front of the bang above is an ordinary
      -- block comment, not an UNPACK pragma, so no unpacking is requested for
      -- this field -- confirm that this is intended.
  , priNum :: {-# UNPACK #-} !Int
  }
  deriving (Eq, Ord) -- The default lexicographical ordering is appropriate

-- Only used for debugging output: renders the location as a list of numbers,
-- followed by a slash and the priority number.
instance Show Priority where
  show (Priority loc num) = shows (U.toList loc) ('/' : show num)

-- | Exception thrown by the ordering check ('shouldBeGreaterThan') when a
-- priority is not strictly greater than the one it is checked against.
-- The 'String' is the rendered pair of priorities involved.
data OrderingViolation = OrderingViolation String
  deriving (Show, Typeable)

instance Exception OrderingViolation

-- | The priority immediately following the given one: same location,
-- number incremented by one.
nextPrio :: Priority -> Priority
nextPrio (Priority loc n) = Priority loc (n + 1)

-- | The empty location. It is never handed out to a dynamic node; nodes that
-- do not depend on any dynamic node live here.
bottomLocation :: Location
bottomLocation = U.fromList []

-- | The smallest priority at the given location (priority number 0).
bottomPrio :: Location -> Priority
bottomPrio loc = Priority loc 0

-- | Create a generator of child locations under @parentLoc@. Each run of the
-- returned action yields @parentLoc@ extended with the next counter value,
-- starting from 0.
newLocationGen :: Location -> IO (IO Location)
newLocationGen parentLoc = do
  nextIndex <- newRef 0
  let fresh = do
        ix <- readRef nextIndex
        writeRef nextIndex $! ix + 1
        return $! U.snoc parentLoc ix
  return fresh

-- Check the ordering condition: throws 'OrderingViolation' unless the first
-- priority is strictly greater than the second.
shouldBeGreaterThan :: Priority -> Priority -> Initialize ()
shouldBeGreaterThan x y = do
  debug $ "shouldBeGreaterThan: " ++ rendered
  when (x <= y) $ liftIO $ throwIO $ OrderingViolation rendered
  where
    rendered = show (x, y)

----------------------------------------------------------------------
-- weak pointers

-- | A Type-erasing wrapper for IORef. Its sole purpose is to serve as a key
-- of a weak pointer (see 'mkWeakWithKey'); the type of the referenced value
-- is hidden existentially.
data WeakKey = forall a. WeakKey {-# UNPACK #-} !(IORef a)

-- | Make a weak pointer to @v@ keyed on the IORef hidden inside the
-- 'WeakKey'. No finalizer is attached.
mkWeakWithKey :: WeakKey -> v -> IO (Weak v)
mkWeakWithKey key v = case key of
  WeakKey ref -> mkWeakWithIORef ref v Nothing

-- | Anything that can behave like a weak pointer: an 'IO' action that either
-- yields the referenced value or reports (with 'Nothing') that it is gone.
newtype WeakLike a = WeakLike (IO (Maybe a))
  deriving (Functor)

-- | View a real weak pointer as a 'WeakLike'; dereferencing goes through
-- 'deRefWeak'.
weakToLike :: Weak a -> WeakLike a
weakToLike w = WeakLike (deRefWeak w)

-- | Dereference a 'WeakLike' by running the action it wraps.
deRefWeakLike :: WeakLike a -> IO (Maybe a)
deRefWeakLike wl = case wl of
  WeakLike deref -> deref

----------------------------------------------------------------------
-- SignalGen monad

-- | Environment of the 'SignalGen' monad.
data GEnv = GEnv
  { envRegisterInit :: Consumer (Initialize ())
    -- ^ registers an initialization action (used by 'registerInit')
  , envGRegisterPrep :: Consumer (Run ())
    -- ^ adds to the global preparation accumulator
    -- (see Note [Global preparation accumulator])
  , envGenLocation :: IO Location
    -- ^ yields a new location each time it is run (used by 'genLocation')
  , envGCurrentStep :: Maybe REnv
    -- ^ the Run environment, if construction happens inside an execution step
  }

-- Note [Global preparation accumulator]
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--
-- You can add actions to the global preparation accumulator, and they will be
-- executed at the beginning of the next step. Added actions will be
-- executed only once. It is global in the sense that subnetworks see the same
-- accumulator as their parent does.
--
-- Actions can be added anytime anywhere, even in different threads.

-- | Run a 'SignalGen' action in 'IO': perform the construction phase, then
-- run all initialization actions it registered, and return its result.
runSignalGen
  :: Consumer (Run ()) -- ^ see Note [Global preparation accumulator]
  -> Location -- ^ the location of the parent node of this subnetwork
  -> Push -- ^ the global clock notifier
  -> Maybe REnv -- ^ the Run env if we are in an execution step
  -> SignalGen a -- ^ an action to create a (sub)network
  -> IO a
runSignalGen regPrep parentLoc clock curStep (SignalGen gen) = do
  (registerI, runAccumI) <- newActionAccum
  locGen <- newLocationGen parentLoc
  result <- runReaderT gen GEnv
    { envRegisterInit = registerI
    , envGRegisterPrep = regPrep
    , envGenLocation = locGen
    , envGCurrentStep = curStep
    }
  -- The accumulated initializers run only after construction has finished.
  runInit parentLoc clock curStep regPrep runAccumI
  return result

-- | Obtain a function that runs a 'SignalGen' action from within the 'Run'
-- monad, i.e. as part of the execution step in progress. The function takes
-- the parent location and the clock of the subnetwork to construct.
runSignalGenInStep :: SignalGen (Location -> Push -> SignalGen a -> Run a)
runSignalGenInStep = do
  regPrep <- getPreparationAdder
  let runHere parentLoc clock sgen = debugFrame "SGenInStep" $ do
        renv <- ask
        liftIO $ runSignalGen regPrep parentLoc clock (Just renv) sgen
  return runHere

-- | Run a SignalGen action that constructs an independent network.
-- Returns a pair of the result and a \"preparation\" action to be performed
-- before each step. Running the preparation action takes whatever has been
-- accumulated so far and resets the accumulator to just the clock trigger.
runSignalGenToplevel :: SignalGen (Initialize a) -> IO (a, Run ())
runSignalGenToplevel gen = do
  (clock, clockTrigger) <- newPush
  let initialPrep = registerUpd (bottomPrio bottomLocation) clockTrigger
  prepVar <- newMVar initialPrep
  -- Newly added actions are sequenced in front of the existing ones.
  let addToPrep x = modifyMVar_ prepVar $ \r -> return (x >> r)
  val <- debugFrame "toplevel" $ do
    ref <- newRef undefined
    runSignalGen addToPrep bottomLocation clock Nothing $
      gen >>= \i -> registerInit (i >>= writeRef ref)
    readRef ref
  return (val, join $ liftIO $ swapMVar prepVar initialPrep)

-- | Produce a new location by running the generator stored in the
-- environment.
genLocation :: SignalGen Location
genLocation = SignalGen $ asks envGenLocation >>= lift

-- | Queue an initialization action; it is performed once the enclosing
-- 'SignalGen' run has finished. The current debug frame is captured and
-- re-established around the action.
registerInit :: Initialize () -> SignalGen ()
registerInit ini = SignalGen $ do
  reg <- asks envRegisterInit
  frm <- debugGetFrame
  lift (reg (debugPutFrame "init" frm ini))

-- | Fetch the function that adds an action to the global preparation
-- accumulator. See Note [Global preparation accumulator].
getPreparationAdder :: SignalGen (Run () -> IO ())
getPreparationAdder = SignalGen (fmap envGRegisterPrep ask)

----------------------------------------------------------------------
-- Initialize monad

-- | Environment of the 'Initialize' monad.
data IEnv = IEnv
  { envClock :: Push
    -- ^ the global clock (see 'getClock')
  , envParentLocation :: Location
    -- ^ location of the parent node of the (sub)network being initialized
  , envIRegisterPrep :: Consumer (Run ())
    -- ^ adds to the global preparation accumulator
  , envICurrentStep :: Maybe REnv
    -- ^ the Run environment, if initialization happens inside a step
  }

-- | The global clock push stored in the initialization environment.
getClock :: Initialize Push
getClock = fmap envClock ask

-- | The parent location stored in the initialization environment.
_getParentLocation :: Initialize Location
_getParentLocation = fmap envParentLocation ask

-- | Run an 'Initialize' action in 'IO' with an environment built from the
-- given parent location, clock, current step (if any) and preparation adder.
runInit :: Location -> Push -> Maybe REnv -> Consumer (Run ()) -> Initialize a -> IO a
runInit parentLoc clock curStep regPrep i =
  runReaderT i IEnv
    { envClock = clock
    , envParentLocation = parentLoc
    , envIRegisterPrep = regPrep
    , envICurrentStep = curStep
    }

-- | Build a function that runs an @Initialize@ action from inside @Run@.
-- The sub-initialization uses @loc@ as its parent location, inherits the
-- clock and preparation adder of the current initialization, and sees the
-- calling step as its current step.
makeSubinitializer :: Location -> Initialize (Initialize a -> Run a)
makeSubinitializer loc = do
  ienv <- ask
  let clock = envClock ienv
      regPrep = envIRegisterPrep ienv
  return $ \sub -> do
    renv <- ask
    liftIO $ runInit loc clock (Just renv) regPrep sub

-- | Choose between two actions depending on whether initialization is taking
-- place inside an execution step; in the latter case the 'Run' action is
-- executed in that step's environment.
runInCurrentStep
  :: Initialize a -- ^ action to perform if we're not inside a step
  -> Run a -- ^ action to perform if we are inside a step
  -> Initialize a
runInCurrentStep no yes =
  asks envICurrentStep >>= maybe no (liftIO . runReaderT yes)

-- | Run the action in the current step if there is one; otherwise schedule
-- it for the beginning of the next step.
runInStep :: Run () -> Initialize ()
runInStep action = runInCurrentStep deferred action
  where
    deferred = registerNextStep action

-- | Add the action to the global preparation accumulator.
registerNextStep :: Run () -> Initialize ()
registerNextStep x =
  asks envIRegisterPrep >>= \addPrep -> liftIO (addPrep x)

-- | 'Initialize' counterpart of 'getPreparationAdder'.
getPreparationAdderI :: Initialize (Run () -> IO ())
getPreparationAdderI = fmap envIRegisterPrep ask

----------------------------------------------------------------------
-- Run monad

-- | Environment of the 'Run' monad, created afresh by every 'runRun'.
data REnv = REnv
  { envRegisterFini :: Consumer (Cleanup ())
    -- ^ registers an action for the clean-up that follows the step
  , envPendingUpdates :: IORef (M.Map Priority (Run ())) -- TODO: use heap?
    -- ^ updates still to be run in this step, keyed by priority; 'runUpdates'
    -- repeatedly removes and runs the entry with the smallest key
  }

-- | Execute a 'Run' action as one complete step: run the action, then all
-- pending updates in priority order, and finally the registered clean-up
-- actions.
runRun :: Run a -> IO a
runRun run = debugFrame "runRun" $ do
  (registerF, runAccumF) <- liftIO newActionAccum
  pqueueRef <- newRef M.empty
  result <- runReaderT (run <* runUpdates) REnv
    { envRegisterFini = registerF
    , envPendingUpdates = pqueueRef
    }
  debugFrame "fini" runAccumF
  return result

-- | Run pending updates until none remain, always picking the entry with the
-- smallest priority. The entry is removed from the queue before it runs, so
-- an update may register further updates.
runUpdates :: Run ()
runUpdates = debugFrame "runUpdates" $ do
  pqueueRef <- asks envPendingUpdates
  let
    drain :: Run ()
    drain = do
      pending <- readRef pqueueRef
      case M.minViewWithKey pending of
        Just ((prio, upd), rest) -> do
          debug $ "running substep for prio " ++ show prio
          writeRef pqueueRef rest
          upd
          drain
        Nothing -> return ()
  drain

-- | Queue an action for the next clean-up step. The current debug frame is
-- captured and re-established around the action.
registerFini :: IO () -> Run ()
registerFini fini = do
  reg <- asks envRegisterFini
  frm <- debugGetFrame
  lift (reg (debugPutFrame "fini" frm fini))

-- | Register an action to be executed in the substep for the specified
-- priority. See Note [Priority].
--
-- If an action is already pending for that priority, the new action is
-- sequenced in front of it.
registerUpd :: Priority -> Run () -> Run ()
registerUpd prio upd = do
  pqueueRef <- asks envPendingUpdates
  -- 'M.insertWith'' is deprecated and absent from newer versions of
  -- containers, so the strict insert-or-combine is spelled out with functions
  -- that exist in every version. The combined action is forced to WHNF before
  -- it is stored, as the primed function did.
  modifyRef pqueueRef $ \pending ->
    let merged = maybe upd (upd >>) (M.lookup prio pending)
    in merged `seq` M.insert prio merged pending

----------------------------------------------------------------------
-- push

-- | Push is a time-dependent boolean in a network. It is either True (active)
-- or False (inactive) in a given step. If you register a listener function
-- beforehand, it will be invoked once if the Push is active in the step.
-- If your function is not invoked during the time window it would be (see
-- Note [Priority]), you can know that the Push is inactive in this step.
-- Thus Push is capable of communicating 'False' in O(0) time, which is
-- the key to the asymptotic efficiency of the library.
--
-- The first field registers listeners. The IORef contains whether this push
-- has been triggered this generation.
data Push = Push !(NotifierG Run) {-# UNPACK #-} !(IORef Bool)

-- | Create a new 'Push' together with the action that triggers it.
-- Triggering sets the \"triggered\" flag, notifies the listeners, and
-- schedules the flag to be cleared in the clean-up step.
newPush :: IO (Push, Run ())
newPush = do
  (notifier, triggerPush) <- newNotifier
  activeRef <- newRef False
  return (Push notifier activeRef, fire activeRef triggerPush)
  where
    fire flag notify = do
      writeRef flag True
      notify
      registerFini $ writeRef flag False

-- | Subscribe a handler to a push. The handler is held through a weak
-- pointer keyed on the given 'WeakKey', so the subscription lapses once that
-- key is invalidated.
listenToPush :: (MonadIO m) => WeakKey -> Push -> Run () -> m ()
listenToPush key (Push register _) handler = do
  frm <- debugGetFrame
  liftIO $ do
    weak <- mkWeakWithKey key (debugPutFrame "notifier" frm handler)
    register (weakToLike weak)

-- | Subscribe a handler that runs at most once. A counter records that the
-- handler has run; from then on the registered 'WeakLike' reports itself as
-- dead, which makes the notifier drop it.
listenToPushOnce :: (MonadIO m) => Push -> Run () -> m ()
listenToPushOnce (Push register _) handler = liftIO $ do
  firedCount <- newIORef (0 :: Int)
  let
    countedHandler = liftIO (modifyIORef firedCount (+1)) >> handler
    lookupHandler = do
      n <- readIORef firedCount
      return $ if n > 0 then Nothing else Just countedHandler
  register (WeakLike lookupHandler)

-- | A push that is always inactive: its notifier ignores every registration
-- and nothing ever triggers it.
emptyPush :: Push
emptyPush = Push emptyNotifier emptyPushRef

-- | The \"triggered\" flag of 'emptyPush', created holding 'False'. It is a
-- single top-level cell allocated with 'unsafePerformIO', hence the NOINLINE
-- pragma below.
emptyPushRef :: IORef Bool
emptyPushRef = unsafePerformIO $ newRef False
  -- no one should write to this
{-# NOINLINE emptyPushRef #-}

-- | Read the flag that tells whether the push has been triggered.
pushHasBeenTriggered :: Push -> Run Bool
pushHasBeenTriggered push = case push of
  Push _ flag -> readRef flag

-- | @NotifierG m@ lets you know when a particular type of event happens,
-- if you register a callback, which is an action in the Monad @m@.
-- The callback is supplied as a 'WeakLike', so it can expire.
type NotifierG m = WeakLike (m ()) -> IO ()

-- | Create a new notifier. It returns a pair of the notifier and
-- a function to trigger it.
--
-- Triggering dereferences every registered listener, forgets those that have
-- expired, and runs the live ones in registration order. A listener that is
-- registered while the notifier is being triggered is kept for later
-- triggers but is not run in the ongoing one.
newNotifier :: (Functor m, MonadIO m) => IO (NotifierG m, m ())
newNotifier = do
  listenersRef <- newRef []
  return (register listenersRef, invoke listenersRef)
  where
    -- New listeners are prepended, so the stored list is in reverse
    -- registration order.
    register ref listenerWeak = modifyRef ref (listenerWeak:)

    invoke ref = do
      weaks <- readRef ref
      (weaks', listeners) <- unzip . catMaybes <$> mapM run1 weaks
      -- Store the pruned list BEFORE running the listeners. Writing it
      -- afterwards would overwrite, and thereby silently drop, any listener
      -- that one of the callbacks registers on this same notifier.
      writeRef ref weaks'
      sequence_ $ reverse listeners
      where
        run1 weak = liftIO $ fmap ((,) weak) <$> deRefWeakLike weak

-- | A notifier that discards every registration.
emptyNotifier :: NotifierG m
emptyNotifier = \_ -> return ()

----------------------------------------------------------------------
-- pull

-- | A Pull reads the current value of a node. It must be idempotent
-- within a step. That is, calling it twice in a single step should
-- result in the same value, without much repeated overhead. A Pull
-- should not be called when it's not ready; See Note [Priority] for
-- details.
--
-- Representationally a Pull is just an action in the 'Run' monad.
type Pull a = Run a

-- | Read through a cache cell. If the cell holds a value, return it.
-- Otherwise run the underlying pull, store its result in the cell, run the
-- given hook, and return the result.
pullFromCache
  :: IORef (Maybe a)
  -> Run a
  -> Run ()
  -> Pull a
pullFromCache ref pull onWrite = readRef ref >>= maybe refill return
  where
    refill = do
      val <- pull
      writeRef ref (Just val)
      onWrite
      return val

-- Caching and memoization
--
-- In this module, the terms 'caching' and 'memoization' refer to two different
-- things:
--
-- * Caching is a state manipulation to make sure that a node has only one
--   copy of its internal state, even if it's referenced from multiple places.
-- * Memoization is a state manipulation to avoid calculating the same value
--   twice, even if it's repeatedly queried. For example,
--   in the expression (f <$> d) where d is a Discrete, it's important not to
--   call f multiple times when the value of d hasn't changed.
--
-- In general, omitting caching is safe if the node is referenced from only
-- one place, but omitting memoization is only safe if it has just one consumer
-- AND that consumer only asks the current value when the value has been updated
-- since the last read.

-- | Memoize a @Pull@. The underlying @Pull@ will be called at most once per
-- step: its result is cached on first use, and the cache is emptied again in
-- the clean-up step.
primStepMemo :: Pull a -> Initialize (Pull a)
primStepMemo pull = do
  memoRef <- newRef Nothing
  let invalidateAtCleanup = registerFini (writeRef memoRef Nothing)
  return (pullFromCache memoRef pull invalidateAtCleanup)

----------------------------------------------------------------------
-- common push-pull operations

-- | Run @base@, pass its result through @protect@, and make the combined
-- initialization action idempotent with 'unsafeCache'. Like 'unsafeCache',
-- this is not referentially transparent.
unsafeProtectFromDup :: (a -> Initialize a) -> Initialize a -> Initialize a
unsafeProtectFromDup protect base = unsafeCache (base >>= protect)

-- | @unsafeCache a@ is an idempotent initialization action made from @a@.
-- When it's run for the first time, @a@ is executed. For subsequent executions
-- the return value from the first call will be returned without causing
-- any effects.
--
-- Note that this function is not referentially transparent.
unsafeCache :: Initialize a -> Initialize a
unsafeCache action = cacheWith cacheRef action
  where
    -- The cache cell is allocated as a side effect of evaluating this
    -- binding. The initial contents mention @action@ through 'const'';
    -- NOTE(review): presumably this keeps the optimizer from treating the
    -- cell as independent of @action@ and sharing one cell between all
    -- calls -- confirm before touching this definition or its pragmas.
    cacheRef = unsafeDupablePerformIO $ newIORef (const' Nothing action)
    {-# NOINLINE cacheRef #-}
{-# NOINLINE unsafeCache #-}

-- | Non-inlinable version of const, only useful to prevent optimization.
-- Returns its first argument and ignores the second; 'unsafeCache' uses it
-- to make a value mention another one without changing the result.
const' :: a -> b -> a
const' x _ = x
{-# NOINLINE const' #-}

-- | Make an initialization action idempotent by memoizing its result in the
-- given IORef: the action runs only while the cell is empty, and its result
-- is stored for all later runs.
cacheWith :: IORef (Maybe a) -> Initialize a -> Initialize a
cacheWith cacheRef action = readRef cacheRef >>= maybe compute return
  where
    compute = do
      val <- action
      writeRef cacheRef (Just val)
      return val

-- | Wrap the initializer of a discrete so that it runs at most once and its
-- pull is memoized with 'primDiscreteMemo'.
transparentMemoD
  :: Initialize (Pull a, Push)
  -> Initialize (Pull a, Push)
transparentMemoD orig = unsafeProtectFromDup primDiscreteMemo orig

-- | Wrap the initializer of an event so that it runs at most once and its
-- pull is memoized with 'primEventMemo'.
transparentMemoE
  :: Initialize (Pull [a], Push)
  -> Initialize (Pull [a], Push)
transparentMemoE orig = unsafeProtectFromDup primEventMemo orig

-- | Wrap the initializer of a signal so that it runs at most once and its
-- pull is memoized with 'primStepMemo'.
transparentMemoS :: Initialize (Pull a) -> Initialize (Pull a)
transparentMemoS orig = unsafeProtectFromDup primStepMemo orig

-- | Memoize the pull of a discrete. The cached value is recomputed whenever
-- the push fires; a read that finds the cache empty computes and stores the
-- value itself. The push is passed through unchanged.
primDiscreteMemo :: (Pull a, Push) -> Initialize (Pull a, Push)
primDiscreteMemo (pull, notifier) = do
  ref <- newRef Nothing
  let refresh = pull >>= writeRef ref . Just
  -- The cache cell doubles as the weak key of the subscription.
  listenToPush (WeakKey ref) notifier refresh
  return (pullFromCache ref pull (return ()), notifier)

-- | Memoize the pull of an event per step (see 'primStepMemo'); the push is
-- passed through unchanged.
primEventMemo :: (Pull [a], Push) -> Initialize (Pull [a], Push)
primEventMemo (pull, notifier) =
  fmap (\pull' -> (pull', notifier)) (primStepMemo pull)

-- | Feed the values of a pull/push pair to a handler. The handler first
-- runs once in the substep for @prio@ (of the current step if there is one,
-- otherwise of the next step); at that point a preparation action is queued
-- which subscribes the handler to the push, so that afterwards it runs
-- whenever the push fires.
listenToPullPush
  :: WeakKey
  -> Pull a
  -> Push
  -> Priority
  -> (a -> Run ())
  -> Initialize ()
listenToPullPush key pull notifier prio handler = do
  addPrep <- getPreparationAdderI
  let react = pull >>= handler
  runInStep . registerUpd prio $ do
    react
    liftIO . addPrep $ listenToPush key notifier react

-- | Create a new node in the SignalGen monad from the given initialization
-- action. The action is wrapped so that it is idempotent, and the wrapped
-- action is also registered as an initializer. Hence the original action
-- runs exactly once, no later than the end of this construction step.
newNode
  :: Initialize a
  -> SignalGen (Initialize a)
newNode action = do
  cell <- newRef Nothing
  let cached = cacheWith cell action
  registerInit (void cached)
  return cached

----------------------------------------------------------------------
-- external events

-- | Push-based asynchronous events.
--
-- The 'MVar' guards a triple: the function that registers listeners, the
-- action that invokes them, and a cell holding the value being delivered.
newtype ExternalEvent a = ExternalEvent (MVar (NotifierG IO, IO (), IORef a))

-- | Placeholder kept in an external event's value cell while no value is
-- being delivered. Forcing it raises an error, since that would be a bug.
eeVoid :: a
eeVoid = error "bug: ExternalEvent: void"

-- | Create an external event that has no listeners yet.
newExternalEvent :: IO (ExternalEvent a)
newExternalEvent = do
  (add, invoke) <- newNotifier
  valueRef <- newRef eeVoid
  fmap ExternalEvent (newMVar (add, invoke, valueRef))

-- | Register a (weakly held) handler on an external event. When the event
-- is triggered, the handler receives the value stored in the event's cell.
listenToExternalEvent :: ExternalEvent a -> WeakLike (a -> IO ()) -> IO ()
listenToExternalEvent (ExternalEvent var) handlerW =
  withMVar var $ \(add, _, ref) ->
    add (fmap (\handler -> readRef ref >>= handler) handlerW)

-- | Fire an external event with the given value. While holding the event's
-- 'MVar', the value is placed in the cell, the listeners are invoked, and
-- the cell is reset to the placeholder.
triggerExternalEvent :: ExternalEvent a -> a -> IO ()
triggerExternalEvent (ExternalEvent var) val =
  withMVar var $ \(_, invoke, ref) ->
    writeRef ref val >> invoke >> writeRef ref eeVoid

----------------------------------------------------------------------
-- events

-- | Mapping a function over an event applies it to every occurrence.
instance Functor Event where
  fmap = transformEvent1 . map

-- | Events form a semigroup under merging (see 'mergeEvents').
--
-- Since base 4.11 'Semigroup' is a superclass of 'Monoid', so the 'Monoid'
-- instance below is rejected unless this instance exists. The class is
-- referred to through a qualified import so that no additional unqualified
-- names enter scope.
instance Semigroup.Semigroup (Event a) where
  x <> y = mergeEvents [x, y]

-- | 'mempty' is the event that never occurs; 'mappend' and 'mconcat' merge
-- events.
instance Monoid (Event a) where
  mempty = emptyEvent
  mappend x y = mergeEvents [x, y]
  mconcat = mergeEvents

-- | Subscribe a handler to an event, after checking that the listening
-- priority is strictly greater than the event's own. The handler is called
-- only with non-empty lists of occurrences.
listenToEvent
  :: WeakKey
  -> Event a
  -> Priority
  -> ([a] -> Run ())
  -> Initialize ()
listenToEvent key (Evt evtprio evt) prio handler = debugFrame "listenToEvent" $ do
  prio `shouldBeGreaterThan` evtprio
  (evtPull, evtNot) <- evt
  listenToPullPush key evtPull evtNot prio nonEmptyOnly
  where
    nonEmptyOnly occs = unless (null occs) (handler occs)

-- | Create an event node in 'SignalGen'. Returns the event (with the given
-- priority), the function that makes it occur, and a weak key tied to the
-- event's occurrence buffer.
newEventSG :: Priority -> SignalGen (Event a, [a] -> Run (), WeakKey)
newEventSG prio = do
  buf <- newRef []
  (push, notify) <- liftIO newPush
  return
    ( Evt prio (return (eventPull buf, push))
    , eventTrigger buf notify
    , WeakKey buf
    )

-- | Create the core of an event node in 'Initialize'. Returns its pull/push
-- pair, the function that makes it occur, and a weak key tied to the
-- occurrence buffer.
newEventInit :: Initialize ((Pull [a], Push), [a] -> Run (), WeakKey)
newEventInit = do
  buf <- newRef []
  (push, notify) <- liftIO newPush
  let pullpush = (eventPull buf, push)
  return (pullpush, eventTrigger buf notify, WeakKey buf)

-- | The pull of an event simply reads its occurrence buffer.
eventPull :: IORef [a] -> Pull [a]
eventPull = readRef

-- | Make an event occur: store the occurrences in the buffer, schedule the
-- buffer to be emptied in the clean-up step, then notify the listeners.
eventTrigger :: IORef [a] -> Run () -> [a] -> Run ()
eventTrigger buf notify occs = do
  writeRef buf occs
  registerFini clearBuffer
  notify
  where
    clearBuffer = debug "clearing event ref" >> writeRef buf []

-- | Transform the occurrence list of an event with a function that may
-- return an empty list. The result is a new event node that occurs only
-- when the transformed list is non-empty.
transformEvent :: ([a] -> [b]) -> Event a -> Event b
transformEvent f parent@(Evt evprio _) = Evt prio $ debugFrame "transformEvent" $ unsafeCache $ do
  (pullpush, trigger, key) <- newEventInit
  listenToEvent key parent prio $ \xs -> do
    let ys = f xs
    if null ys
      then debug $ "transformEvent: prio=" ++ show prio ++ " -> []"
      else do
        debug $ "transformEvent: prio=" ++ show prio ++ " -> len:" ++ show (length ys)
        trigger ys
  return pullpush
  where
    prio = nextPrio evprio

-- | Transform the occurrence list of an event with a function that maps
-- non-empty lists to non-empty lists. No new push is needed: the parent's
-- push is reused and only the pull is mapped (and memoized).
transformEvent1 :: ([a] -> [b]{-non-empty-}) -> Event a -> Event b
transformEvent1 f (Evt evprio evt) =
  Evt (nextPrio evprio) $ debugFrame "transformEvent1" $ transparentMemoE $
    fmap (\(pull, notifier) -> (fmap f pull, notifier)) evt

-- | Run the 'SignalGen' actions carried by an event. Whenever the argument
-- occurs, each carried action is run inside the current step (under a
-- location created for this node), and the results become the occurrences
-- of the resulting event.
generatorE :: Event (SignalGen a) -> SignalGen (Event a)
generatorE evt = do
  here <- genLocation
  runSG <- runSignalGenInStep
  let prio = bottomPrio here
  Evt prio <$> newNode (do
    (pullpush, trigger, key) <- newEventInit
    clock <- getClock
    listenToEvent key evt prio $ \gens ->
      mapM (runSG here clock) gens >>= trigger
    return pullpush)

-- | Merge a list of events into one. Occurrences that happen in the same
-- step are ordered by the position of their source event in the list.
mergeEvents :: [Event a] -> Event a
mergeEvents [] = emptyEvent
mergeEvents evts = Evt prio $ unsafeCache $ do
  (pullpush, trigger, key) <- newEventInit
  occListRef <- newRef []
  let
    -- Emit everything collected in this step, ordered by source index.
    flush = do
      collected <- readRef occListRef
      debug $ "mergeEvents: upd: prio=" ++ show prio ++ "; total occs=" ++ show (length $ concatMap snd collected)
      unless (null collected) $ do
        writeRef occListRef []
        trigger $ concatMap snd $ sortBy (comparing fst) collected
    -- Record the occurrences of one source and schedule a flush.
    subscribe (num, evt) =
      listenToEvent key evt prio $ \occs -> do
        debug $ "mergeEvents: listen: noccs=" ++ show (length occs)
        modifyRef occListRef ((num, occs):)
        registerUpd prio flush
  mapM_ subscribe (zip [0::Int ..] evts)
  return pullpush
  where
    prio = nextPrio . maximum $ [p | Evt p _ <- evts]

-- | The event that never occurs: its pull always yields the empty list and
-- its push is never active.
emptyEvent :: Event a
emptyEvent = Evt (bottomPrio bottomLocation) (pure (pure [], emptyPush))

-- | Keep only the occurrences that satisfy the predicate.
filterE :: (a -> Bool) -> Event a -> Event a
filterE = transformEvent . filter

-- | An event driven by the global clock: its pull always yields a single
-- unit occurrence and its push is the clock itself.
stepClockE :: Event ()
stepClockE = Evt (bottomPrio bottomLocation) (fmap withUnitOccurrence getClock)
  where
    withUnitOccurrence clock = (pure [()], clock)

-- | Re-emit the occurrences of an event, except that the subscription to the
-- source is only set up by a preparation action, i.e. at the beginning of a
-- later step than the one in which this node is initialized.
dropStepE :: Event a -> SignalGen (Event a)
dropStepE ~(Evt evtprio evt) = do
  addPrep <- getPreparationAdder
  node <- newNode $ do
    (result, trigger, key) <- newEventInit
    (getoccs, evtnotifier) <- evt
    let forward = do
          occs <- getoccs
          unless (null occs) $ trigger occs
    runInStep $ liftIO $ addPrep $ listenToPush key evtnotifier forward
    return result
  return (Evt (nextPrio evtprio) node)

-- | Build an event from a list of per-step occurrence lists; once the list
-- is exhausted the event no longer occurs.
eventFromList :: [[a]] -> SignalGen (Event a)
eventFromList occs = fmap signalToEvent (signalFromList (occs ++ repeat []))

-- | Accumulate a state by applying the functions carried by an event, in
-- order. Every application produces an occurrence carrying the new state.
scanE :: a -> Event (a -> a) -> SignalGen (Event a)
scanE initial evt@(~(Evt evtprio _)) = fmap (Evt prio) $ newNode $ do
  (pullpush, trigger, key) <- newEventInit
  stateRef <- newRef initial
  listenToEvent key evt prio $ \occs -> do
    debug $ "accumE: occs=" ++ show (length occs)
    oldVal <- readRef stateRef
    -- All intermediate states, without the starting one.
    let vals = drop 1 (scanl (flip ($)) oldVal occs)
    writeRef stateRef $ last vals
    trigger vals
  return pullpush
  where
    prio = nextPrio evtprio

-- | Thread a state through the functions carried by an event. Each function
-- receives the current state and returns the next state together with a
-- value, which becomes an occurrence of the resulting event.
mapAccumE :: s -> Event (s -> (s, a)) -> SignalGen (Event a)
mapAccumE initial evt@(~(Evt evtprio _)) = fmap (Evt prio) $ newNode $ do
  (myevt, trigger, key) <- newEventInit
  stateRef <- newRef initial
  listenToEvent key evt prio $ \occs -> do
    debug $ "mapAccumE: occs=" ++ show (length occs)
    oldVal <- readRef stateRef
    let stepped = mapAccumL (\s f -> f s) oldVal occs
    writeRef stateRef (fst stepped)
    trigger (snd stepped)
  return myevt
  where
    prio = nextPrio evtprio

-- | Monadic variant of 'mapAccumE': every occurrence carries a function that
-- takes the current state to a 'SignalGen' action producing the next state
-- and an output value. The outputs become the occurrences of the result.
mapAccumEM :: s -> Event (s -> SignalGen (s, a)) -> SignalGen (Event a)
mapAccumEM initial evt = mdo
  e <- generatorE $ go <$> prevState <@> expandE evt
  state <- accumD initial (const . fst <$> e)
  prevState <- delayD initial state
  return . flattenE $ snd <$> e
  where
    -- Run all functions of one step in order, threading the state through.
    -- Outputs are consed and reversed once at the end, which keeps the fold
    -- linear in the number of occurrences instead of appending repeatedly.
    go :: s -> [s -> SignalGen (s, a)] -> SignalGen (s, [a])
    go s0 fs = do
      (sFinal, revOuts) <- foldM step (s0, []) fs
      return (sFinal, reverse revOuts)

    step (s, acc) f = do
      (s', a) <- f s
      return (s', a : acc)

{-# DEPRECATED accumE "accumE has been renamed to scanE" #-}
-- | Deprecated alias of 'scanE'.
accumE :: a -> Event (a -> a) -> SignalGen (Event a)
accumE = scanE

{-# DEPRECATED scanAccumE "scanAccumE has been renamed to mapAccumE" #-}
-- | Deprecated alias of 'mapAccumE'.
scanAccumE :: s -> Event (s -> (s, a)) -> SignalGen (Event a)
scanAccumE = mapAccumE

{-# DEPRECATED scanAccumEM "scanAccumEM has been renamed to mapAccumEM" #-}
-- | Deprecated alias of 'mapAccumEM'.
scanAccumEM :: s -> Event (s -> SignalGen (s, a)) -> SignalGen (Event a)
scanAccumEM = mapAccumEM

-- | Map a partial function over the occurrences, dropping those for which
-- it yields 'Nothing'.
mapMaybeE :: (a -> Maybe b) -> Event a -> Event b
mapMaybeE = transformEvent . mapMaybe

-- | Keep the 'Just' occurrences, unwrapped, and drop the 'Nothing' ones.
justE :: Event (Maybe a) -> Event a
justE = transformEvent (mapMaybe id)

-- | Turn every list-valued occurrence into one occurrence per element.
flattenE :: Event [a] -> Event a
flattenE = transformEvent (concatMap id)

-- | Collect all occurrences of a step into a single list-valued occurrence.
expandE :: Event a -> Event [a]
expandE = transformEvent1 (\occs -> [occs])

-- | Create an event that occurs whenever the given external event is
-- triggered. Values arriving between steps are buffered; the first value
-- after a flush queues a preparation action, which at the next step emits
-- everything buffered so far in arrival order.
externalE :: ExternalEvent a -> SignalGen (Event a)
externalE ee = do
  occsVar <- liftIO $ newMVar []
  (evt, trigger, key) <- newEventSG prio
  addToPrep <- getPreparationAdder
  let
    -- Drain the buffer; it is kept newest-first, hence the reverse.
    flush = do
      occs <- liftIO $ swapMVar occsVar []
      trigger $ reverse occs
    -- Buffer one value and schedule a flush if the buffer was empty.
    enqueue occ = do
      firstTime <- modifyMVar occsVar $ \occs -> return (occ:occs, null occs)
      when firstTime $ addToPrep $ registerUpd prio flush
  liftIO $ do
    weak <- mkWeakWithKey key enqueue
    listenToExternalEvent ee (weakToLike weak)
  return evt
  where
    prio = bottomPrio bottomLocation

-- | Pass occurrences through for as long as they satisfy the predicate.
-- Within a step, the leading occurrences satisfying the predicate are
-- emitted; once an occurrence fails it, the node stops handling input.
takeWhileE :: (a -> Bool) -> Event a -> SignalGen (Event a)
takeWhileE cond ~(Evt evtprio evt) = fmap (Evt prio) $ newNode $ do
  (push, trigger) <- liftIO $ newPush
  -- @ref@ holds the occurrences of the current step paired with
  -- @Just subref@ while the node is still taking input, or 'Nothing' once it
  -- has finished.
  ref <- newRef $ error "takeWhileE"
  (evtPull, evtNot) <- evt
  -- @subref@ serves as the weak key of the subscription below.
  -- NOTE(review): presumably @ref@ is what keeps @subref@ alive, so that
  -- replacing it by 'Nothing' lets the subscription expire -- confirm.
  subref <- newRef evtPull
  writeRef ref ([], Just subref)
  listenToPullPush (WeakKey subref) evtPull evtNot prio $ \occs -> do
    (_, eventRef) <- readRef ref
    when (isJust eventRef) $ do
      let !(occs', rest) = span cond occs
      when (not $ null occs') $ do
        modifyRef ref $ \(_, y) -> (occs', y)
        trigger
        -- the occurrences are cleared again in the clean-up step
        registerFini $ modifyRef ref $ \(_, y) -> ([], y)
      -- an occurrence failed the predicate: finish at the end of this step
      when (not $ null rest) $ registerFini $ writeRef ref ([], Nothing)
  return (fst <$> readRef ref, push)
  where
    prio = nextPrio evtprio

-- | @delayE evt@ creates an event whose occurrences at step N
-- are equal to the occurrences of @evt@ at step N-1.
delayE :: Event a -> SignalGen (Event a)
delayE evt = fmap resample (delayS [] (eventToSignal evt))
  where
    resample occsS = flattenE (occsS <@ stepClockE)

----------------------------------------------------------------------
-- discretes

-- | Mapping over a discrete is 'mapDiscrete'.
instance Functor Discrete where
  fmap f = mapDiscrete f

-- | 'pure' is a discrete that never changes; '<*>' applies the current
-- function to the current argument (see 'apDiscrete').
instance Applicative Discrete where
  pure x = pureDiscrete x
  df <*> dx = apDiscrete df dx

-- | Create the core of a discrete node in 'Initialize', starting with the
-- given value. Returns its pull/push pair, the function that sets a new
-- value, and a weak key tied to the value cell.
newDiscreteInit
  :: a
  -> Initialize ((Pull a, Push), a -> Run (), WeakKey)
newDiscreteInit initial = do
  cell <- newRef initial
  (push, notify) <- liftIO newPush
  let setValue = discreteTrigger cell notify
  return ((readRef cell, push), setValue, WeakKey cell)

-- | Create a discrete node in 'SignalGen', starting with the given value.
-- Returns the discrete (with the given priority), an action that reads its
-- current value, the function that sets a new value, and a weak key tied to
-- the value cell.
newDiscreteSG
  :: a
  -> Priority
  -> SignalGen (Discrete a, Run a, a -> Run (), WeakKey)
newDiscreteSG initial prio = do
  cell <- newRef initial
  (push, notify) <- liftIO newPush
  let current = readRef cell
  return
    ( Dis prio (return (current, push))
    , current
    , discreteTrigger cell notify
    , WeakKey cell
    )

-- | Set a new value of a discrete: store it, then notify the listeners.
discreteTrigger :: IORef a -> Run () -> a -> Run ()
discreteTrigger buf notify val = writeRef buf val >> notify

-- | Apply a function to the value of a discrete. The parent's push is
-- reused; the mapped pull is memoized.
mapDiscrete :: (a -> b) -> Discrete a -> Discrete b
mapDiscrete f (Dis dprio dis) =
  Dis (nextPrio dprio) $ debugFrame "mapDiscrete" $ transparentMemoD $
    fmap (\(pull, notifier) -> (fmap f pull, notifier)) dis

-- | A discrete that always holds the given value; its push is never active.
pureDiscrete :: a -> Discrete a
pureDiscrete value =
  Dis (bottomPrio bottomLocation) (return (return value, emptyPush))

-- | Apply a discrete function to a discrete argument. The result is
-- recomputed at most once per step, in the substep for its own priority,
-- whenever either input has notified a change.
apDiscrete :: Discrete (a -> b) -> Discrete a -> Discrete b
-- both arguments must have been memoized
apDiscrete (Dis fprio fun) (Dis aprio arg)
    = Dis prio $ debugFrame "apDiscrete" $ unsafeCache $ do
  dirtyRef <- newRef False
  (pullpush, set, key) <- newDiscreteInit (error "apDiscrete: uninitialized")
  (funPull, funNot) <- fun
  (argPull, argNot) <- arg
  let
    -- Recompute the value if an input has changed since the last time.
    recompute = do
      debug $ "apDiscrete.upd; prio=" ++ show prio
      dirty <- readRef dirtyRef
      when dirty $ do
        writeRef dirtyRef False
        (funPull <*> argPull) >>= set
    -- Shared by both inputs; the value passed in is ignored.
    markDirty _ = do
      debug $ "apDiscrete.handler: prio=" ++ show prio
      writeRef dirtyRef True
      registerUpd prio recompute
  listenToPullPush key funPull funNot prio markDirty
  listenToPullPush key argPull argNot prio markDirty
  return pullpush
  where
    prio = nextPrio (max fprio aprio)

-- | Subscribe @handler@ to a 'Discrete'.
--
-- First checks, via 'shouldBeGreaterThan', the listener priority against
-- the discrete's own priority; then runs the discrete's initialization
-- action and hands the resulting pull and push to 'listenToPullPush',
-- together with the key, the priority and the handler.
listenToDiscrete
  :: WeakKey
  -> Discrete a
  -> Priority
  -> (a -> Run ())
  -> Initialize ()
listenToDiscrete key (Dis srcPrio srcInit) prio handler = do
  shouldBeGreaterThan prio srcPrio
  (srcPull, srcPush) <- srcInit
  listenToPullPush key srcPull srcPush prio handler

-- | Flatten a discrete of discretes.
--
-- A reference tracks the value cell of the currently selected inner
-- discrete. Each time the outer discrete delivers a new inner discrete, a
-- fresh cell is allocated and installed, and a listener on that inner
-- discrete is set up through the sub-initializer. An inner listener only
-- stores its value and fires the result's push while its own cell is still
-- the installed one, so listeners of superseded inner discretes are
-- ignored.
joinDD :: Discrete (Discrete a) -> SignalGen (Discrete a)
joinDD outer@ ~(Dis outerprio _) = do
  loc <- genLocation
  let joinPrio = bottomPrio loc
  currentRef <- newRef (error "joinDD: outerRef not initialized")
  (notifier, fire) <- liftIO newPush
  fmap (Dis joinPrio) $ newNode $ do
    joinPrio `shouldBeGreaterThan` outerprio
    subinit <- makeSubinitializer loc
    listenToDiscrete (WeakKey currentRef) outer joinPrio $ \inner -> do
      debug "joinDD: outer"
      valueRef <- newRef (error "joinDD: innerRef not initialized")
      writeRef currentRef valueRef
      subinit $
        listenToDiscrete (WeakKey valueRef) inner joinPrio $ \val -> do
          activeRef <- readRef currentRef
          -- Only the most recently selected inner discrete may publish.
          when (activeRef == valueRef) $ do
            debug "joinDD: inner"
            writeRef valueRef val
            fire
    return (readRef currentRef >>= readRef, notifier)

-- | Flatten a discrete of events.
--
-- A reference tracks the occurrence cell of the currently selected inner
-- event. Each time the outer discrete delivers a new inner event, a fresh
-- cell (initially empty) is allocated and installed, and a listener on that
-- event is set up through the sub-initializer. An inner listener only acts
-- while its own cell is still the installed one: it stores the occurrences,
-- registers a finalizer that resets the cell to the empty list, and fires
-- the result's push.
joinDE :: Discrete (Event a) -> SignalGen (Event a)
joinDE outer@ ~(Dis outerprio _) = do
  loc <- genLocation
  let joinPrio = bottomPrio loc
  currentRef <- newRef (error "joinDE: outerRef not initialized")
  (notifier, fire) <- liftIO newPush
  fmap (Evt joinPrio) $ newNode $ do
    joinPrio `shouldBeGreaterThan` outerprio
    subinit <- makeSubinitializer loc
    listenToDiscrete (WeakKey currentRef) outer joinPrio $ \inner -> do
      debug "joinDE: outer"
      occsRef <- newRef []
      writeRef currentRef occsRef
      subinit $
        listenToEvent (WeakKey occsRef) inner joinPrio $ \occs -> do
          activeRef <- readRef currentRef
          -- Only the most recently selected inner event may publish.
          when (activeRef == occsRef) $ do
            debug ("joinDE: inner noccs=" ++ show (length occs))
            writeRef occsRef occs
            registerFini (writeRef occsRef [])
            fire
    return (readRef currentRef >>= readRef, notifier)

-- | Flatten a discrete of signals.
--
-- A reference holds the pull of the currently selected inner signal. Each
-- time the outer discrete delivers a new signal, that signal is initialized
-- through the sub-initializer (after a priority check against this node)
-- and its pull replaces the stored one. Sampling the result runs whichever
-- pull is currently stored.
joinDS :: Discrete (Signal a) -> SignalGen (Signal a)
joinDS outer@ ~(Dis outerprio _) = do
  loc <- genLocation
  let joinPrio = bottomPrio loc
  pullRef <- newRef (error "joinDS: outerRef not initialized")
  fmap (Sig joinPrio) $ newNode $ do
    joinPrio `shouldBeGreaterThan` outerprio
    subinit <- makeSubinitializer loc
    listenToDiscrete (WeakKey pullRef) outer joinPrio $
      \(Sig innerprio innerInit) -> do
        debug "joinDS: outer"
        innerPull <- subinit $ do
          joinPrio `shouldBeGreaterThan` innerprio
          innerInit
        writeRef pullRef innerPull
    return (readRef pullRef >>= id)

----------------------------------------------------------------------
-- signals

-- | Mapping keeps the signal's priority and maps the function over the
-- pull produced by initialization, wrapped in 'transparentMemoS'.
instance Functor Signal where
  fmap f (Sig prio initPull) =
    Sig prio (transparentMemoS (fmap (fmap f) initPull))

-- | 'pure' is a constant signal at the bottom priority. Application
-- initializes the function signal first and the argument signal second,
-- combines their pulls with '<*>', and takes the greater of the two
-- priorities.
instance Applicative Signal where
  pure x = Sig (bottomPrio bottomLocation) (return (pure x))
  Sig funPrio funInit <*> Sig argPrio argInit =
    Sig (max funPrio argPrio) $
      transparentMemoS (liftM2 (<*>) funInit argInit)

-- | Build the network described by the given generator and return an 'IO'
-- action that advances it.
--
-- Each run of the returned action performs, in order: the preparation
-- action returned by 'runSignalGenToplevel', 'runUpdates', and finally the
-- read of the output signal, whose value is the result.
start :: SignalGen (Signal a) -> IO (IO a)
start gensig = do
  (readOutput, prepareStep) <- runSignalGenToplevel $ do
    Sig _ outputInit <- gensig
    return outputInit
  let step = debugFrame "step" $ do
        debug "step"
        prepareStep
        runUpdates
        debugFrame "getval" readOutput
  return (runRun step)

-- | A signal whose value is obtained by running the given 'IO' action.
--
-- The action is wrapped in 'primStepMemo' and the signal sits at the bottom
-- priority of the bottom location.
externalS :: IO a -> SignalGen (Signal a)
externalS get = do
  node <- newNode (primStepMemo (liftIO get))
  return (Sig (bottomPrio bottomLocation) node)

-- | Flatten a signal of signals.
--
-- The pull of the result (wrapped in 'primStepMemo') samples the outer
-- signal, initializes the inner signal it yields through the
-- sub-initializer, and then samples that inner signal.
joinS :: Signal (Signal a) -> SignalGen (Signal a)
joinS ~(Sig _outerPrio outerInit) = do
  loc <- genLocation
  let joinPrio = bottomPrio loc
  fmap (Sig joinPrio) $ newNode $ do
    debug ("joinS: making pull; prio=" ++ show joinPrio)
    subinit <- makeSubinitializer loc
    outerPull <- outerInit
    primStepMemo $ do
      Sig _innerPrio innerInit <- outerPull
      subinit innerInit >>= debugFrame "pull"

-- | @delayS initial sig@ builds a signal backed by a mutable reference that
-- starts out holding @initial@; sampling the result simply reads that
-- reference.
--
-- The reference is refreshed from @sig@ by a listener on the clock push;
-- see the comments in the body for how the write is scheduled.
delayS :: a -> Signal a -> SignalGen (Signal a)
-- The argument pattern is lazy, so the signal argument is not forced by the
-- pattern match itself.
delayS initial ~(Sig sigprio sig) = do
  ref <- newRef initial
  registerInit $ do
    clock <- getClock
    pull <- sig
    -- The clock listener is installed through 'registerNextStep';
    -- presumably this keeps it from firing during the step in which the
    -- node is created -- TODO confirm against 'registerNextStep'.
    registerNextStep $ listenToPush (WeakKey ref) clock $
      registerUpd (nextPrio sigprio) $ do
        debug "delayS: pull"
        newVal <- pull
        -- The write is handed to 'registerFini' instead of being performed
        -- here, so running this update does not itself change what the
        -- reference holds.
        registerFini $ writeRef ref newVal
  return $ Sig prio $ return $ readRef ref
  where
    -- The delayed signal does not inherit the priority of its source.
    prio = bottomPrio bottomLocation

-- | Build a signal from a list of values.
--
-- The not-yet-consumed suffix of the list is kept in a discrete that drops
-- its first element at every occurrence of the clock event obtained from
-- @dropStepE stepClockE@; the value of the signal is the head of the
-- current suffix.
--
-- Sampling the signal once the list is exhausted calls 'error' with a
-- message naming this function.
signalFromList :: [a] -> SignalGen (Signal a)
signalFromList xs = debugFrame "signalFromList" $ do
  clock <- dropStepE stepClockE
  suffixD <- accumD xs $ drop 1 <$ clock
  return $ discreteToSignal $ hd <$> suffixD
  where
    -- Total replacement for 'head' that reports which function ran dry.
    hd = fromMaybe (error "signalFromList: list exhausted") .
      listToMaybe

-- | Start the network and sample it @count@ times, returning the samples
-- in the order they were taken.
networkToList :: Int -> SignalGen (Signal a) -> IO [a]
networkToList count network = start network >>= replicateM count

-- | Like 'networkToList', but runs 'performGC' before taking each sample.
networkToListGC :: Int -> SignalGen (Signal a) -> IO [a]
networkToListGC count network =
  start network >>= \sampler -> replicateM count (performGC >> sampler)

----------------------------------------------------------------------
-- events and discretes

-- | A discrete that starts at @initial@ and is updated by an event of
-- functions.
--
-- When the event delivers occurrences, they are applied to the current
-- value from left to right, and the result is forced to weak head normal
-- form before it is stored.
accumD :: a -> Event (a -> a) -> SignalGen (Discrete a)
accumD initial evt@(~(Evt evtprio _)) = fmap (Dis prio) $ newNode $ do
  (result@(readCurrent, _), store, key) <- newDiscreteInit initial
  listenToEvent key evt prio $ \updates -> do
    debug $ "accumD: prio=" ++ show prio ++ "; occs=" ++ show (length updates)
    current <- readCurrent
    store $! foldl' (\acc update -> update acc) current updates
  return result
  where
    prio = nextPrio evtprio

-- | The event of changes of a 'Discrete'.
--
-- A listener on the discrete's push reads the discrete's current value and
-- publishes it as a single occurrence. The resulting event reuses the
-- discrete's own push as its notifier, and its priority is the one
-- following the discrete's priority.
changesD :: Discrete a -> Event a
changesD (Dis disprio dis) = Evt prio $ unsafeCache $ do
  -- Holds the occurrence list exposed by the event's pull.
  ref <- newRef []
  (disPull, disPush) <- dis
  -- Read the discrete's current value and hand it, as a one-element list,
  -- to 'eventTrigger', with @return ()@ as the action argument.
  let upd = eventTrigger ref (return ()) . (:[]) =<< disPull
  listenToPush (WeakKey ref) disPush upd
  runInCurrentStep (return ()) $ do
    -- If we are in a step, we need to set up the ref now.
    active <- pushHasBeenTriggered disPush
    when active upd
  return (readRef ref, disPush)
  where
    prio = nextPrio disprio

-- | An event that fires with a single occurrence carrying the value that
-- the handler registered through 'listenToDiscrete' receives from the
-- discrete. Its priority is the one following the discrete's priority.
preservesD :: Discrete a -> SignalGen (Event a)
preservesD dis@ ~(Dis disprio _) = fmap (Evt evtPrio) $ newNode $ do
  (result, fire, key) <- newEventInit
  listenToDiscrete key dis evtPrio (fire . (: []))
  return result
  where
    evtPrio = nextPrio disprio

-- | @delayD initial dis@ builds a discrete that starts at @initial@ and
-- takes on the values of @dis@ with a delay.
--
-- Every value delivered by @dis@ is not stored immediately; instead a
-- one-shot listener on the clock push is registered, and the value is
-- stored when that listener runs.
delayD :: a -> Discrete a -> SignalGen (Discrete a)
delayD initial dis@ ~(Dis disprio _) = do
  (delayed, _read, store, key) <-
    newDiscreteSG initial (bottomPrio bottomLocation)
  registerInit $ do
    clock <- getClock
    listenToDiscrete key dis (nextPrio disprio) $
      listenToPushOnce clock . store
  return delayed

----------------------------------------------------------------------
-- events and signals

-- | View an event as a signal of its occurrence lists: the signal keeps the
-- event's priority and its pull, and discards the event's push.
eventToSignal :: Event a -> Signal [a]
eventToSignal (Evt prio evt) = Sig prio pullOnly
  where
    pullOnly = do
      (occsPull, _) <- evt
      return occsPull

-- | Turn a signal of occurrence lists into an event.
--
-- A listener on the clock registers an update at the result's priority.
-- That update samples the signal and triggers the event with the sampled
-- list, unless the list is empty.
signalToEvent :: Signal [a] -> Event a
signalToEvent (Sig sigprio sig) = Evt prio $ unsafeCache $ do
  debug "signalToEvent"
  sigpull <- sig
  (pullpush, trigger, key) <- newEventInit
    -- Here we create a fresh event, even though its pull component
    -- will be identical to sigpull. This is because we want a new key
    -- to keep the new notifier alive as long as the new pull, rather
    -- than the original pull, is alive.
  clock <- getClock
  -- The pull handed to 'listenToPullPush' is a dummy (@return ()@): the
  -- handler ignores its argument and samples @sigpull@ itself inside the
  -- registered update.
  listenToPullPush key (return ()) clock prio $ \_ ->
    registerUpd prio $ do
      occs <- sigpull
      debug $ "signalToEvent: onclock prio=" ++ show prio
        ++ "; noccs=" ++ show (length occs)
      -- An empty list means no occurrence, so the event is not triggered.
      when (not $ null occs) $ trigger occs
  return pullpush
  where
    prio = nextPrio sigprio

-- | Apply the current value of a function signal to every occurrence of an
-- event.
--
-- When the argument event delivers occurrences, an update is registered at
-- the result's priority; the function signal is sampled when that update
-- runs, not when the occurrences arrive. The result's priority is the one
-- following the greater of the two argument priorities.
applySE :: Signal (a -> b) -> Event a -> Event b
applySE (Sig fprio fun) arg@(Evt aprio _)
    = Evt prio $ debugFrame "applySE" $ unsafeCache $ do
  (result, fire, key) <- newEventInit
  funPull <- fun
  let
    -- Sample the function and publish the transformed occurrences.
    applyNow occs = do
      debug ("applySE; prio=" ++ show prio)
      f <- funPull
      fire (map f occs)
  listenToEvent key arg prio $ \occs -> do
    debug ("applySE: prio=" ++ show prio)
    registerUpd prio (applyNow occs)
  return result
  where
    prio = nextPrio (max fprio aprio)

----------------------------------------------------------------------
-- discretes and signals

-- | View a 'Discrete' as a 'Signal': same priority, and the pull component
-- of the discrete's pull\/push pair as the signal's pull.
discreteToSignal :: Discrete a -> Signal a
discreteToSignal (Dis prio dis) = Sig prio (fmap fst dis)

----------------------------------------------------------------------
-- classes

-- | Functors whose values can be viewed as a 'Signal'.
--
-- 'Signal' and 'Discrete' are the instances defined next to this class; the
-- class lets '<@>' and '<@' accept either of them on the left-hand side.
class Functor s => TimeFunction s where
  -- | Convert the value to a 'Signal'.
  toSignal :: s a -> Signal a

-- | A 'Signal' is already a signal, so it is returned unchanged.
instance TimeFunction Signal where
  toSignal sig = sig

-- | A 'Discrete' is converted with 'discreteToSignal'.
instance TimeFunction Discrete where
  toSignal dis = discreteToSignal dis

infixl 4 <@> -- same as <$> and <*>

-- | Apply a time-varying function to the occurrences of an event: the
-- left-hand side is converted with 'toSignal' and the work is done by
-- 'applySE'.
(<@>) :: (TimeFunction s) => s (a -> b) -> Event a -> Event b
timeFn <@> evt = applySE (toSignal timeFn) evt

-- | Like '<@>', but the occurrence values of the event are discarded and
-- replaced by the value of the left-hand side.
--
-- NOTE(review): no fixity declaration for this operator is visible next to
-- its definition; if there is none elsewhere in the module it defaults to
-- @infixl 9@, unlike '<@>' (@infixl 4@) -- confirm this is intended.
(<@) :: (TimeFunction s) => s b -> Event a -> Event b
v <@ a = fmap const v <@> a

----------------------------------------------------------------------
-- utils

-- | 'newIORef' lifted to any 'MonadIO'.
newRef :: (MonadIO m) => a -> m (IORef a)
newRef initial = liftIO (newIORef initial)

-- | 'readIORef' lifted to any 'MonadIO'.
readRef :: (MonadIO m) => IORef a -> m a
readRef ref = liftIO (readIORef ref)

-- | 'writeIORef' lifted to any 'MonadIO'.
writeRef :: (MonadIO m) => IORef a -> a -> m ()
writeRef ref val = liftIO (writeIORef ref val)

-- | Replace the contents of a reference by applying a function to them.
--
-- The new value is forced to weak head normal form before it is written.
-- The read and the write are two separate operations.
modifyRef :: (MonadIO m) => IORef a -> (a -> a) -> m ()
modifyRef x f = readRef x >>= \old -> writeRef x $! f old

-- TODO: specialize
-- | Create an accumulator of actions.
--
-- The result is a pair of a consumer that adds an action to the
-- accumulator and an action that runs everything accumulated so far.
--
-- Actions are prepended to the stored list, so running executes them
-- most-recently-added first. Running does not clear the list.
newActionAccum :: (MonadIO m) => IO (Consumer (m ()), m ())
newActionAccum = do
  actions <- newRef []
  return (add actions, run actions)
  where
    -- Use the strict 'modifyRef' so that no chain of unevaluated @(act:)@
    -- applications builds up inside the reference.
    add ref act = modifyRef ref (act:)
    run ref = readRef ref >>= sequence_

----------------------------------------------------------------------
-- internal debugging

-- | Print a trace message, prefixed with @-@ and indented by the current
-- depth of the debug frame stack. Does nothing unless 'debugTraceEnabled'
-- is set.
debug :: (MonadIO m) => String -> m ()
debug str
  | debugTraceEnabled = liftIO $ do
      depth <- length <$> readRef debugStackRef
      debugPrintWith depth ('-' : str)
  | otherwise = return ()

-- | Global stack of the names of the currently open debug frames, innermost
-- first: 'debugFrame' conses the frame name onto it for the duration of its
-- body, and 'debug' uses its length as the indentation level.
--
-- It is a top-level 'IORef' created with 'unsafePerformIO'. The NOINLINE
-- pragma below is the usual guard for such a definition: it keeps the
-- binding from being inlined, so that every use site refers to the same
-- reference.
debugStackRef :: IORef [String]
debugStackRef = unsafePerformIO $ newRef []
{-# NOINLINE debugStackRef #-}

-- | Print a message on its own line, indented by @level@ spaces.
debugPrintWith :: (MonadIO m) => Int -> String -> m ()
debugPrintWith level msg = liftIO (putStrLn (indent ++ msg))
  where
    indent = replicate level ' '

-- | Run an action inside a named debug frame.
--
-- With tracing disabled this is just the action itself. With tracing
-- enabled, the frame name is printed at the current indentation, pushed
-- onto the debug stack while the action runs, and the previous stack is
-- written back afterwards.
--
-- There is no exception handling here: the stack is only written back when
-- the action completes normally.
debugFrame :: (MonadIO m) => String -> m a -> m a
debugFrame loc body
  | debugTraceEnabled = do
      saved <- readRef debugStackRef
      debugPrintWith (length saved) loc
      writeRef debugStackRef (loc : saved)
      result <- body
      writeRef debugStackRef saved
      return result
  | otherwise = body

-- | Capture the current contents of the debug frame stack.
debugGetFrame :: (MonadIO m) => m DebugFrame
debugGetFrame = liftM DF (readRef debugStackRef)

-- | Run an action inside a debug frame whose name is @loc@ followed by the
-- comma-separated entries of a previously captured frame stack in
-- parentheses.
debugPutFrame :: (MonadIO m) => String -> DebugFrame -> m a -> m a
debugPutFrame loc (DF frame) = debugFrame label
  where
    label = concat [loc, "(", intercalate "," frame, ")"]

-- | A snapshot of the debug frame stack, as captured by 'debugGetFrame'
-- and rendered by 'debugPutFrame'.
newtype DebugFrame = DF [String]

-- | Compile-time switch for the internal tracing helpers: while this is
-- 'False', 'debug' prints nothing and 'debugFrame' just runs its body.
debugTraceEnabled :: Bool
debugTraceEnabled = False

----------------------------------------------------------------------
-- tests

-- vim: sw=2 ts=2 sts=2