-- Sync.
-- Copyright (c) 2010, Neil Brown.
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * The name of Neil Brown may not be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


-- | This library supports synchronous message-passing with choice in Haskell.
-- It is similar to the CML package for Haskell
-- (<http://hackage.haskell.org/package/cml>), and shares a similar API.  It
-- avoids some of the problems the CML package has with choose on GHC 6.12.1
-- (<http://www.haskell.org/pipermail/haskell-cafe/2010-March/074134.html>),
-- and also deliberately leaves out some of the features in the CML package.
--
-- The implementation is explained in this blog post:
--  <http://chplib.wordpress.com/2010/03/04/choice-over-events-using-stm/>.
--  The algorithm uses STM rather than spawning threads to implement choice.
--
-- At the moment the library is fairly unfeatured; if you want more features I
-- would suggest using my more powerful CHP library
-- (<http://hackage.haskell.org/package/chp>) -- but then I am biased!
module Control.Concurrent.Sync (Event, Channel, choose, send, recv, newChannel, sync) where

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Data.Function

----------------------------------------------------------------------
-- Internal implementation:
----------------------------------------------------------------------

data Offer = Offer { offerThreadId :: ThreadId, offerEvents :: [IntEvent] }

instance Eq Offer where
  (==) = (==) `on` offerThreadId

data IntEvent = IntEvent { enrollCount :: Int, offersTV :: TVar [(STM (), Offer)] }

newIntEvent :: Int -> IO IntEvent
newIntEvent n = IntEvent n <$> atomically (newTVar [])

-- A ham-fisted strictifying function, but it seems to do the trick
modifyTVar' :: ([a] -> [a]) -> TVar [a] -> STM ()
modifyTVar' f tv = do xs <- f <$> readTVar tv
                      case length xs of
                        -1 -> error "modifyTVar' impossible"
                        _ -> writeTVar tv xs

-- | Executes the actions until it finds one that returns True (at which point
-- it will execute no further actions).  Returns True if an action did, False
-- if none of them did.
anyM :: Monad m => [m Bool] -> m Bool
anyM = foldM orM False
  where
    orM True _ = return True
    orM False m = m

recordOffer :: Offer -> (STM (), IntEvent) -> STM ()
recordOffer o (act, e) = modifyTVar' ((act, o):) (offersTV e)

offerAll :: [(STM (), IntEvent, a)] -> IO a
offerAll off
  = do tid <- myThreadId
       rtv <- atomically $ checkAll tid
       atomically $ readTVar rtv >>= maybe retry return    
  where
    checkAll tid
      = do rtv <- newTVar Nothing
           let offer = [(act >> writeTVar rtv (Just x), e) | (act, e, x) <- off]
           complete <- anyM (map checkComplete offer)
           unless complete $
              mapM_ (recordOffer (Offer tid [e | (_, e, _) <- off])) offer
           return rtv

checkComplete :: (STM (), IntEvent) -> STM Bool
checkComplete (act, e)
  = do offers <- readTVar (offersTV e)
       if enrollCount e /= length offers + 1
         then return False
         else do sequence_ (act : map fst offers)
                 mapM_ (revoke . snd) offers
                 return True

revoke :: Offer -> STM ()
revoke off = mapM_ (modifyTVar' (filter ((/= off) . snd)) . offersTV) (offerEvents off)

----------------------------------------------------------------------
-- Public API:
----------------------------------------------------------------------

-- | A synchronous communication channel (i.e. the writer must wait until the read
-- is willing to read the value).  Should only ever be used by one writer and one
-- reader -- the algorithm is not currently designed for anything else.
data Channel a = Channel { _chanEvent :: IntEvent, chanTVar :: TVar a }

instance Eq (Channel a) where
  (==) = (==) `on` chanTVar

-- | A synchronisation that is yet to be executed (and that returns a value).
-- The functor instance allows you to modify the value after the
-- synchronisation has occurred.
data Event a = Event {events :: [((STM (), STM a), IntEvent)]}

instance Functor Event where
  fmap f (Event es) = Event [((dur, fmap f aft), e) | ((dur, aft), e) <- es]

-- | Synchronises on an event.  This blocks the thread until the Event can occur.
-- This may be a choice of several different events, via the 'choose' function.
sync :: Event a -> IO a
sync ev = offerAll [(dur, e, aft) | ((dur, aft), e) <- events ev] >>= atomically

-- | Creates an event that represents sending the given value on the given channel.
send :: Channel a -> a -> Event ()
send (Channel e ctv) x = Event [((writeTVar ctv x, return ()), e)]

-- | Creates an event that represents receiving a value from the given channel.
recv :: Channel a -> Event a
recv (Channel e ctv) = Event [((return (), readTVar ctv), e)]

-- | Creates an event that is the choice of the given list of events.
--
-- If the list is a singleton this is equivalent to calling 'head'.  If the
-- list is empty, and you call 'sync' on the resulting event, it will block
-- forever (or GHC will throw you an exception because of it).
--
-- You should not pass more than one event from each channel in the list (nor
-- combine two events that are themselves choices, such that you end up
-- combining more than one event from a channel) or undefined behaviour will result.
choose :: [Event a] -> Event a
choose = Event . concatMap events

-- | Creates a new communication channel.
newChannel :: IO (Channel a)
newChannel = liftM2 Channel (newIntEvent 2) (atomically $ newTVar (error "sync:newChannel"))