--------------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.CML
-- Copyright   :  Avik Chaudhuri 2009 (avik@cs.ucsc.edu)
-- License     :  BSD3
-- 
-- Maintainer  :  ben.franksen@online.de
-- Stability   :  provisional
-- Portability :  portable
--
-- Events and Channels as in Concurrent ML (extended with communication guards)
--
-- See /A Concurrent ML Library in Concurrent Haskell/ by Avik Chaudhuri
-- (avik\@cs.ucsc.edu). The original code as well as the papers can be
-- found at <http://www.cs.umd.edu/~avik/projects/cmllch/>.
--
-- User-visible changes to the original code:
--
-- * 'Event' and 'Channel' are now abstract types
--
-- * renamed 'new' to 'channel'
--
-- * export list, hierarchical module name & similar goodies
--------------------------------------------------------------------------------
module Control.Concurrent.CML (
  -- * Channels
  -- $channels
  Channel,
  channel,
  receive,
  transmit,
  -- * Events
  -- $events
  Event,
  sync,
  choose,
  wrap,
  guard,
  wrapabort,
  spawn
) where

import Control.Concurrent(ThreadId, forkIO)
import Control.Concurrent.MVar(MVar, newEmptyMVar, putMVar, takeMVar)
import Data.Maybe(isJust)
import Control.Monad(foldM)
import Control.Monad.Fix(fix)

--------------------------------------------------------------------------------

-- Handshake slots used between communication points, the per-channel matcher
-- ('atchan') and the per-'sync' synchronizer ('atsync').

-- Final verdict for one side of a match: 'atchan' writes whether the other
-- side also offered a commit slot; 'atsync' reads it to decide between
-- committing and running its retry action.
type Commit = MVar Bool
-- Reply from a synchronizer to the matcher: 'atsync' answers the first point
-- it sees with @Just commit@ and every later one with @Nothing@.
type Decision = MVar (Maybe Commit)
-- Reply from the matcher to a point: 'atchan' writes @Just decision@ when the
-- receiver's predicate accepts the offered value and @Nothing@ otherwise.
type Candidate = MVar (Maybe Decision)
-- Receiver-side offer slot of a channel: reply slot plus acceptance predicate.
type In a = MVar (Candidate, a -> Bool)
-- Sender-side offer slot of a channel: reply slot plus the value offered.
type Out a = MVar (Candidate, a)

-- | Values of type @a@ can be transported over channels of type @Channel a@.
--
-- The fields are, in order: the slot where receivers post their offers, the
-- slot where senders post their offers, and the slot through which the value
-- itself is handed over ('transmit' puts into it, 'receive' takes from it).
data Channel a = Channel (In a) (Out a) (MVar a)

-- Identity of one potential commit point. 'atsync' fills it with @()@ once
-- the point has been selected; 'atpointI' / 'atpointO' wait on it before
-- running their post-commit action.
type Point = MVar ()
-- The commit points contained in an event: 'receive' and 'transmit' publish a
-- singleton list, 'choose' publishes the concatenation of its alternatives.
type Name = MVar [Point]
-- Queue of abort registrations posted by 'wrapabort': the points covered by
-- the wrapped event, and the action 'atsync' runs when the selected point is
-- not among them.
type Abort = MVar ([Point], IO ())
-- Slot through which points that found a potential partner announce
-- themselves (with the matcher's 'Decision' slot) to 'atsync'.
type Synchronizer = MVar (Point, Decision)

-- | Events return a value on synchronization.
--
-- Note that by construction, an event can synchronize at exactly one
-- /commit point/, where a message is either sent or accepted on a
-- channel. This commit point may be selected among several other,
-- potential commit points. Some code may be run before
-- synchronization, as specified by 'guard' functions throughout the
-- event. Some more code may be run after synchronization, as specified
-- by 'wrap' functions that surround the commit point, and by 'wrapabort'
-- functions that do not surround the commit point.
--
-- Internally an event is a function of the three slots that 'sync' creates
-- for every synchronization attempt: the 'Synchronizer' to which commit
-- points announce themselves, the 'Abort' slot in which 'wrapabort'
-- registers its actions, and the 'Name' slot in which the event publishes
-- the list of its commit points.
newtype Event a = Event (Synchronizer -> Abort -> Name -> IO a)

--------------------------------------------------------------------------------

-- Match one receiver offer against one sender offer on a channel.
--
-- Takes a pending receiver offer (reply slot and predicate) from @i@ and a
-- pending sender offer (reply slot and value) from @o@.
--
-- * If the predicate accepts the value, each side is handed a fresh
--   'Decision' slot, the answers of both synchronizers are collected, and
--   every side that offered a 'Commit' slot is told whether the /other/ side
--   offered one too.
--
-- * Otherwise /both/ sides are answered with @Nothing@ (so that each of them
--   re-posts its offer) and matching starts over.
atchan :: In a -> Out a -> IO ()
atchan i o = do {
  (ei,patt) <- takeMVar i;
  (eo,y) <- takeMVar o;
  if (patt y) then do {
    -- ask the receiver's synchronizer
    si <- newEmptyMVar;
    putMVar ei (Just si);
    ki <- takeMVar si;
    -- ask the sender's synchronizer
    so <- newEmptyMVar;
    putMVar eo (Just so);
    ko <- takeMVar so;
    -- a side may commit only if the other side is willing as well
    maybe (return ()) (\ci -> putMVar ci (isJust ko)) ki;
    maybe (return ()) (\co -> putMVar co (isJust ki)) ko
  }
  else do {
    -- Reject both offers. Each reply slot is read exactly once by its owner,
    -- so the receiver and the sender must each get their own answer; writing
    -- twice to the receiver's slot would block this thread forever and leave
    -- the sender waiting.
    putMVar ei Nothing;
    putMVar eo Nothing;
    atchan i o
  }
}

-- Arbiter for a single synchronization attempt.
--
-- Waits for the first commit point announced on @r@ and offers it a fresh
-- 'Commit' slot; every point announced afterwards is refused with @Nothing@.
-- If the commit succeeds, the point is released and the registered abort
-- actions are processed: each one runs unless the selected point is among
-- the points it covers. If the commit fails, the retry action @x@ is run.
atsync :: Synchronizer -> Abort -> IO () -> IO ()
atsync r a x = do
  (point, decision) <- takeMVar r
  _ <- forkIO refuseLater
  commit <- newEmptyMVar
  putMVar decision (Just commit)
  committed <- takeMVar commit
  if committed
    then do
      putMVar point ()
      drainAborts point
    else x
  where
    -- Answer every further announcement on this synchronizer with a refusal;
    -- the next listener is forked before replying.
    refuseLater = do
      (_, laterDecision) <- takeMVar r
      _ <- forkIO refuseLater
      putMVar laterDecision Nothing

    -- Take one abort registration, fork a handler for the next one, then run
    -- the action only if it does not cover the selected point.
    drainAborts chosen = do
      (covered, action) <- takeMVar a
      _ <- forkIO (drainAborts chosen)
      if chosen `elem` covered
        then return ()
        else action

-- Receiving commit point.
--
-- Posts an offer (fresh reply slot plus predicate) on the channel's receiver
-- slot and waits for the matcher's answer. On @Nothing@ the offer is posted
-- again. On @Just decision@ the point announces itself to the synchronizer,
-- waits until it is selected, and then runs the post-commit action @x@.
atpointI :: Synchronizer -> Point -> In a -> (a -> Bool) -> IO a -> IO a
atpointI r t i patt x = do
  reply <- newEmptyMVar
  putMVar i (reply, patt)
  answer <- takeMVar reply
  case answer of
    Nothing -> atpointI r t i patt x
    Just decision -> do
      putMVar r (t, decision)
      takeMVar t
      x

-- Sending commit point.
--
-- Posts an offer (fresh reply slot plus the value @y@) on the channel's
-- sender slot and waits for the matcher's answer. On @Nothing@ the offer is
-- posted again. On @Just decision@ the point announces itself to the
-- synchronizer, waits until it is selected, and then runs the post-commit
-- action @x@.
atpointO :: Synchronizer -> Point -> Out a -> a -> IO () -> IO ()
atpointO r t o y x = do
  reply <- newEmptyMVar
  putMVar o (reply, y)
  answer <- takeMVar reply
  case answer of
    Nothing -> atpointO r t o y x
    Just decision -> do
      putMVar r (t, decision)
      takeMVar t
      x

--------------------------------------------------------------------------------

-- $channels
-- Channels transport a single value at a time. The operations on channels are:
-- creation, transmit, and receive. None of them block the calling thread, in
-- fact transmit and receive are pure functions, not IO actions. Blocking may
-- occur only when a thread explicitly synchronizes on the resulting event.

-- | Create a new channel.
--
-- A background thread is forked that keeps pairing receiver offers with
-- sender offers on the new channel.
channel :: IO (Channel a)
channel = do
  inbox <- newEmptyMVar
  outbox <- newEmptyMVar
  let matcher = atchan inbox outbox >> matcher
  _ <- forkIO matcher
  slot <- newEmptyMVar
  return (Channel inbox outbox slot)

-- | Receive a message from a channel.
--
-- @receive c cond@ is an event that, when synchronized, accepts a message
-- @m@ on channel @c@ and yields @m@. It can only be paired with a
-- @transmit c m@ for which @cond m@ holds.
receive :: Channel a -> (a -> Bool) -> Event a
receive (Channel inbox _ slot) patt = Event $ \r _ n -> do
  point <- newEmptyMVar
  -- publish this event's single commit point without blocking on the reader
  _ <- forkIO (putMVar n [point])
  atpointI r point inbox patt (takeMVar slot)

-- | Transmit a message over a channel.
--
-- @transmit c m@ is an event that, when synchronized, sends the message @m@
-- on channel @c@ and yields @()@. It has to be paired with a @receive c@
-- event in order to synchronize.
transmit :: Channel a -> a -> Event ()
transmit (Channel _ outbox slot) y = Event $ \r _ n -> do
  point <- newEmptyMVar
  -- publish this event's single commit point without blocking on the reader
  _ <- forkIO (putMVar n [point])
  atpointO r point outbox y (putMVar slot y)

-- $events
-- Events encapsulate a potentially blocking point of synchronization between
-- threads, together with possible pre- and post-synchronization code as well
-- as code that is executed (in a separate thread) when an event is /not/
-- selected (aborted).

-- | Non-deterministically pick one event out of a list and synchronize it;
-- the events that were not picked are /aborted/.
--
-- Every alternative is started in its own thread; the commit points of all
-- alternatives together form the commit points of the resulting event.
choose :: [Event a] -> Event a
choose vL = Event $ \r a n -> do
  result <- newEmptyMVar
  let -- start one alternative and collect the commit points it publishes
      launch acc (Event v) = do
        name <- newEmptyMVar
        _ <- forkIO (v r a name >>= putMVar result)
        points <- takeMVar name
        -- put the list back so that other readers of the name still see it
        putMVar name points
        return (points ++ acc)
  allPoints <- foldM launch [] vL
  _ <- forkIO (putMVar n allPoints)
  takeMVar result

-- | Attach a post-synchronization action to an event.
--
-- @wrap v f@ is an event that, when synchronized, synchronizes @v@ and then
-- runs the action obtained by applying @f@ to the result of @v@.
wrap :: Event a -> (a -> IO b) -> Event b
wrap (Event v) f = Event $ \r a n -> v r a n >>= f

-- | Attach a pre-synchronization action to an event.
--
-- @guard a@ is an event that, when synchronized, first runs @a@ and then
-- synchronizes the event @a@ returned. The action @a@ is run anew on every
-- /attempt/ to synchronize @guard a@.
guard :: IO (Event a) -> Event a
guard vs = Event $ \r a n ->
  vs >>= \(Event v) -> v r a n

-- | Attach an action that is spawned when an event is /not/ selected by a
-- 'choose'.
--
-- @wrapabort a v@ is an event that, when synchronized, synchronizes @v@;
-- if it is aborted instead, a thread running @a@ is spawned. When @v@ is
-- itself a @choose vs@ and one of the events in @vs@ is selected, @v@ counts
-- as selected and @a@ is not spawned.
wrapabort :: IO () -> Event a -> Event a
wrapabort f (Event v) = Event $ \r a n -> do
  _ <- forkIO $ do
    -- read the commit points of the wrapped event, leaving them in place
    points <- takeMVar n
    putMVar n points
    -- register the abort action together with the points it covers
    putMVar a (points, f)
  v r a n

-- | Synchronize an event.
--
-- The calling thread blocks until a matching event is available. Each
-- attempt runs in a separate thread with fresh synchronizer, abort and name
-- slots; a failed commit starts a new attempt.
sync :: Event a -> IO a
sync (Event v) = do
  result <- newEmptyMVar
  let attempt = do
        r <- newEmptyMVar
        a <- newEmptyMVar
        n <- newEmptyMVar
        -- the arbiter re-runs the whole attempt when the commit fails
        _ <- forkIO (atsync r a attempt)
        v r a n >>= putMVar result
  _ <- forkIO attempt
  takeMVar result

-- | Run an action in a new thread; this is just 'forkIO' under another name.
spawn :: IO () -> IO ThreadId
spawn action = forkIO action