-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-- | A module containing broadcast channels (one-to-many).  Whereas a one-to-any
-- channel features one writer sending a /single/ value to /one/ (of many) readers, a
-- one-to-many channel features one writer sending the /same/ value to /many/
-- readers.  So a one-to-any channel involves claiming the channel-end to ensure
-- exclusivity, but a one-to-many channel involves enrolling on the channel-end
-- (subscribing) before it can engage in communication.
--
-- A communication on a one-to-many channel only takes place when the writer
-- and all readers currently enrolled agree to communicate.  What happens when
-- the writer wants to communicate and no readers are enrolled is undefined
-- (the writer may block, or may communicate happily to no-one).
--
-- This module also contains reduce channels (added in version 1.1.1).  Because
-- in CHP channels must have the same type at both ends, we use the Monoid
-- type-class.  It is important to be aware that the order of mappends will be
-- non-deterministic, and thus you should either use an mappend that is commutative
-- or code around this restriction.
--
-- For example, a common thing to do would be to use lists as the type for
-- reduce channels, make each writer write a single item list (but more is
-- possible), then use the list afterwards, but be aware that it is unordered.
--  If it is important to have an ordered list, make each writer write a pair
-- containing a (unique) index value and the real data, then sort by the index
-- value and discard it.
--
-- Since reduce channels were added after the initial library design, there
-- is a slight complication: it is not possible to use newChannel (and all
-- similar functions) with reduce channels because it is impossible to express
-- the Monoid constraint for the Channel instance.  Instead, you must use manyToOneChannel
-- and manyToAnyChannel.
module Control.Concurrent.CHP.BroadcastChannels (BroadcastChanin, BroadcastChanout,
  OneToManyChannel, AnyToManyChannel, oneToManyChannel, anyToManyChannel, ReduceChanin,
    ReduceChanout, ManyToOneChannel, ManyToAnyChannel, manyToOneChannel, manyToAnyChannel)
      where

import Control.Concurrent.STM
import Control.Monad.Trans
import Data.Monoid

import Control.Concurrent.CHP.Barriers
import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.Channels
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Enroll
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Traces.Base


-- The general pattern of a broadcast channel is as follows:
-- SYNC -> Agreement; the readers indicate they are all willing to read, and the
-- writer indicates it is ready to write.  Either side may ALT.
--
-- After this synchronisation, the writer can write his data to the TVar, possibly
-- following an extended action
-- 
-- SYNC -> Reading; everyone syncs (no-one ALTs) to move to the reading phase
--
-- After this synchronisation, the readers can all read the data from the TVar,
-- and possibly complete an extended action
--
-- SYNC -> Neutral; everyone syncs (no-one ALTs) to indicate one communication
-- cycle has finished.  After this the writer may proceed on their way (the
-- main reason for needing a third sync).

-- The phases of a communication, named after the state that each
-- synchronisation moves the channel into (see the pattern described above).
-- Only Neutral is named explicitly by the code below, which used to trigger
-- an unused-constructor warning for the other two; they must exist all the
-- same for the derived Enum and Bounded instances.
data Phase = Agreement | Reading | Neutral deriving (Enum, Bounded, Eq)

-- An identity function whose sole purpose is to mention every Phase
-- constructor: a hack that suppresses the warning described above.
dontWarnMe :: a -> a
dontWarnMe value = const value [Agreement, Reading, Neutral]

-- The state shared by both ends of a broadcast channel: the phased barrier
-- on which the writer and the enrolled readers synchronise, paired with the
-- TVar through which the written value is handed to the readers.
newtype BroadcastChannel a = BC (PhasedBarrier Phase, TVar a)

-- | The reading end of a broadcast channel.  You must enroll on it before
-- you can read from it or poison it.  Any number of readers may be enrolled;
-- each enrolled reader receives the same value from every communication.
newtype BroadcastChanin a = BI (BroadcastChannel a)

-- | The writing end of a broadcast channel.  The writer is enrolled on the
-- channel when the channel is created, so (unlike the reading end) no
-- enrollment is needed before writing to it or poisoning it.
newtype BroadcastChanout a = BO (BroadcastChannel a)

-- Enrolling on the reading end means enrolling on the channel's barrier.
-- Both after enrolling and after coming back from a resignation the process
-- waits for the Neutral phase -- presumably so that it only (re-)joins
-- between communication cycles rather than part-way through one.
instance Enrollable BroadcastChanin a where
  enroll chanin@(BI (BC (bar, _))) body = enroll bar enrolledBody
    where
      -- Runs as the newly enrolled barrier member.
      enrolledBody enrolledBar = do
        waitForPhase Neutral enrolledBar
        body (Enrolled chanin)
  resign (Enrolled (BI (BC (bar, _)))) action = do
      -- Run the action via the barrier's own resign, then wait for Neutral
      -- before handing back the action's result.
      result <- resign enrolledBar action
      waitForPhase Neutral enrolledBar
      return result
    where
      enrolledBar = Enrolled bar

-- Writing follows the three-sync pattern described above.  Only the first
-- synchronisation is recorded (as a ChannelWrite); the remaining two are
-- passed (const Nothing) and so record nothing.
instance WriteableChannel BroadcastChanout where
  extWriteChannel (BO (BC (bar, var))) produce = do
      -- Sync into Agreement: everyone has agreed to communicate.
      syncBarrierWith (Just . ChannelWrite) enrolled
      -- Run the (possibly extended) write action and publish its value.
      value <- produce
      liftIO (atomically (writeTVar var value))
      -- Sync into Reading: the readers may now read the value.
      quietSync
      -- Sync into Neutral: the readers are done and the cycle is over.
      quietSync
      return ()
    where
      enrolled = Enrolled bar
      quietSync = syncBarrierWith (const Nothing) enrolled

-- Reading follows the three-sync pattern described above.  Only the first
-- synchronisation is recorded (as a ChannelRead); the remaining two are
-- passed (const Nothing) and so record nothing.
instance ReadableChannel (Enrolled BroadcastChanin) where
  extReadChannel (Enrolled (BI (BC (bar, var)))) handler = do
      -- Sync into Agreement: everyone has agreed to communicate.
      syncBarrierWith (Just . ChannelRead) enrolled
      -- Sync into Reading: the writer has stored its value by now.
      quietSync
      value <- liftIO (atomically (readTVar var))
      -- Run the (possibly extended) read action on the value.
      result <- handler value
      -- Sync into Neutral: tells the writer this reader has finished.
      quietSync
      return result
    where
      enrolled = Enrolled bar
      quietSync = syncBarrierWith (const Nothing) enrolled

-- Poison operations on the writing end are forwarded to the channel's
-- barrier, treating the writer as an enrolled member of that barrier.
instance Poisonable (BroadcastChanout a) where
  poison (BO (BC (bar, _))) = poison (Enrolled bar)
  checkForPoison (BO (BC (bar, _))) = checkForPoison (Enrolled bar)

-- Poison operations on an enrolled reading end are forwarded to the
-- channel's barrier.
instance Poisonable (Enrolled BroadcastChanin a) where
  poison (Enrolled (BI (BC (bar, _)))) = poison (Enrolled bar)
  checkForPoison (Enrolled (BI (BC (bar, _)))) = checkForPoison (Enrolled bar)

-- Creates the state shared by the two ends of a broadcast channel: a phased
-- barrier that starts in the Neutral phase, with the writer already
-- enrolled, and the TVar that will carry each written value.
--
-- dontWarnMe is just the identity; it is applied here only so that every
-- Phase constructor counts as used.
newBroadcastChannel :: CHP (BroadcastChannel a)
newBroadcastChannel = dontWarnMe $ do
    bar@(Barrier (event, _)) <- newPhasedBarrier Neutral
    -- Writer is always enrolled:
    liftIO $ atomically $ enrollEvent event
    -- Readers only read the TVar after the sync that follows the writer's
    -- store, so this placeholder should never be forced.  A descriptive
    -- error (rather than a bare undefined) makes any violation of that
    -- protocol diagnosable.
    var <- liftIO $ atomically $ newTVar $
      error "Control.Concurrent.CHP.BroadcastChannels: broadcast channel value read before any write"
    return $ BC (bar, var)

-- A one-to-many channel: both ends wrap the same underlying broadcast
-- channel, and the channel is identified by its barrier's identifier.
instance Channel BroadcastChanin BroadcastChanout where
  newChannel = liftCHP (newBroadcastChannel >>= wrapEnds)
    where
      wrapEnds chan@(BC (bar, _)) =
        return (Chan (getBarrierIdentifier bar) (BI chan) (BO chan))

-- An any-to-many channel: an ordinary broadcast channel whose writing end
-- is wrapped, together with a fresh mutex, in Shared.  The identifier and
-- the reading end are taken unchanged from the underlying channel.
instance Channel BroadcastChanin (Shared BroadcastChanout) where
  newChannel = liftCHP $ do
    mutex <- newMutex
    plain <- newChannel
    let sharedWriter = Shared (mutex, writer plain)
    return (Chan (getChannelIdentifier plain) (reader plain) sharedWriter)

-- | A broadcast channel with an unshared writing end and a reading end that
-- readers enroll on.
type OneToManyChannel = Chan BroadcastChanin BroadcastChanout
-- | A broadcast channel whose writing end is 'Shared' (it is paired with a
-- mutex), with a reading end that readers enroll on.
type AnyToManyChannel = Chan BroadcastChanin (Shared BroadcastChanout)

-- | Creates a new one-to-many broadcast channel.  This is 'newChannel'
-- specialised to the 'OneToManyChannel' type.
oneToManyChannel :: CHP (OneToManyChannel a)
oneToManyChannel = newChannel

-- | Creates a new any-to-many broadcast channel.  This is 'newChannel'
-- specialised to the 'AnyToManyChannel' type.
anyToManyChannel :: CHP (AnyToManyChannel a)
anyToManyChannel = newChannel



-- The state shared by both ends of a reduce channel: the phased barrier on
-- which the reader and the enrolled writers synchronise, the TVar that
-- accumulates the written values, and a pair holding the combining function
-- together with the value the accumulator is reset to after each read
-- (mappend and mempty when built by newReduceChannel).
newtype ReduceChannel a = GC (PhasedBarrier Phase, TVar a, (a -> a -> a, a))

-- | The reading end of a reduce channel.  Unlike the writing end, it can be
-- read from and poisoned directly, without enrolling.
newtype ReduceChanin a = GI (ReduceChannel a)

-- | The writing end of a reduce channel.  You must enroll on it before
-- you can write to it or poison it.
newtype ReduceChanout a = GO (ReduceChannel a)

-- Enrolling on the writing end means enrolling on the channel's barrier.
-- Both after enrolling and after coming back from a resignation the process
-- waits for the Neutral phase -- presumably so that it only (re-)joins
-- between communication cycles rather than part-way through one.
instance Enrollable ReduceChanout a where
  enroll chanout@(GO (GC (bar, _, _))) body = enroll bar enrolledBody
    where
      -- Runs as the newly enrolled barrier member.
      enrolledBody enrolledBar = do
        waitForPhase Neutral enrolledBar
        body (Enrolled chanout)
  resign (Enrolled (GO (GC (bar, _, _)))) action = do
      -- Run the action via the barrier's own resign, then wait for Neutral
      -- before handing back the action's result.
      result <- resign enrolledBar action
      waitForPhase Neutral enrolledBar
      return result
    where
      enrolledBar = Enrolled bar

-- Writing follows the same three-sync pattern as the broadcast channels,
-- except that each writer folds its value into the shared accumulator
-- instead of overwriting it.  Only the first synchronisation is recorded
-- (as a ChannelWrite).
instance WriteableChannel (Enrolled ReduceChanout) where
  extWriteChannel (Enrolled (GO (GC (bar, var, (combine, _))))) produce = do
      -- Sync into Agreement: everyone has agreed to communicate.
      syncBarrierWith (Just . ChannelWrite) enrolled
      -- Run the (possibly extended) write action, then atomically combine
      -- its value with the accumulator.  The new value is the left argument
      -- of the combining function and the running total is the right.
      value <- produce
      liftIO $ atomically $ do
        soFar <- readTVar var
        writeTVar var (combine value soFar)
      -- Sync into Reading: the reader may now take the combined value.
      quietSync
      -- Sync into Neutral: the reader is done and the cycle is over.
      quietSync
      return ()
    where
      enrolled = Enrolled bar
      quietSync = syncBarrierWith (const Nothing) enrolled

-- Reading follows the same three-sync pattern as the broadcast channels.
-- Once the read action has completed, the accumulator is reset so that the
-- next communication starts again from the initial value.  Only the first
-- synchronisation is recorded (as a ChannelRead).
instance ReadableChannel ReduceChanin where
  extReadChannel (GI (GC (bar, var, (_, initial)))) handler = do
      -- Sync into Agreement: everyone has agreed to communicate.
      syncBarrierWith (Just . ChannelRead) enrolled
      -- Sync into Reading: all writers have contributed by now.
      quietSync
      combined <- liftIO (atomically (readTVar var))
      -- Run the (possibly extended) read action on the combined value.
      result <- handler combined
      -- Reset the accumulator ready for the next communication.
      liftIO (atomically (writeTVar var initial))
      -- Sync into Neutral: releases the writers.
      quietSync
      return result
    where
      enrolled = Enrolled bar
      quietSync = syncBarrierWith (const Nothing) enrolled

-- Poison operations on an enrolled writing end are forwarded to the
-- channel's barrier.
instance Poisonable (Enrolled ReduceChanout a) where
  poison (Enrolled (GO (GC (bar, _, _)))) = poison (Enrolled bar)
  checkForPoison (Enrolled (GO (GC (bar, _, _)))) = checkForPoison (Enrolled bar)

-- Poison operations on the reading end are forwarded to the channel's
-- barrier, treating the reader as an enrolled member of that barrier.
instance Poisonable (ReduceChanin a) where
  poison (GI (GC (bar, _, _))) = poison (Enrolled bar)
  checkForPoison (GI (GC (bar, _, _))) = checkForPoison (Enrolled bar)

-- Creates the state shared by the two ends of a reduce channel: a phased
-- barrier that starts in the Neutral phase, an accumulator TVar holding
-- mempty, and the Monoid operations used to combine and reset it.
--
-- dontWarnMe is just the identity; it is applied here only so that every
-- Phase constructor counts as used.
newReduceChannel :: Monoid a => CHP (ReduceChannel a)
newReduceChannel = dontWarnMe $ do
    bar@(Barrier (event, _)) <- newPhasedBarrier Neutral
    -- The end that never enrolls explicitly (the reader, for a reduce
    -- channel) is enrolled on the barrier's event up front:
    liftIO $ atomically $ enrollEvent event
    var <- liftIO $ atomically $ newTVar mempty
    return $ GC (bar, var, (mappend, mempty))

-- | A reduce channel with an unshared reading end and a writing end that
-- writers enroll on.
type ManyToOneChannel = Chan ReduceChanin ReduceChanout
-- | A reduce channel whose reading end is 'Shared' (it is paired with a
-- mutex), with a writing end that writers enroll on.
type ManyToAnyChannel = Chan (Shared ReduceChanin) ReduceChanout

-- | Creates a new many-to-one reduce channel.  Both ends wrap the same
-- underlying reduce channel, and the channel is identified by its barrier's
-- identifier.  Use this instead of newChannel, which cannot express the
-- 'Monoid' constraint.
manyToOneChannel :: Monoid a => CHP (ManyToOneChannel a)
manyToOneChannel = newReduceChannel >>= wrapEnds
  where
    wrapEnds chan@(GC (bar, _, _)) =
      return (Chan (getBarrierIdentifier bar) (GI chan) (GO chan))


-- | Creates a new many-to-any reduce channel: a many-to-one channel whose
-- reading end is wrapped, together with a fresh mutex, in 'Shared'.  The
-- identifier and the writing end are taken unchanged from the underlying
-- channel.
manyToAnyChannel :: Monoid a => CHP (ManyToAnyChannel a)
manyToAnyChannel = do
    mutex <- newMutex
    plain <- manyToOneChannel
    let sharedReader = Shared (mutex, reader plain)
    return (Chan (getChannelIdentifier plain) sharedReader (writer plain))