-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


-- | The module containing all the different types of channels in CHP.  Unlike
-- JCSP and C++CSP2, CHP does not offer buffered channels directly (see the
-- "Control.Concurrent.CHP.Buffers" module).  There are four different channel types, effectively
-- all possible combinations of:
--
-- * Shared reader vs non-shared reader
--
-- * Shared writer vs non-shared writer
--
-- For most applications you probably want just 'OneToOneChannel'.
--
-- It is possible for the type system to infer which channel you want when
-- you use 'newChannel'.  If the types of the ends are known by the type system,
-- the channel-type can be inferred.  So you can usually just write 'newChannel',
-- and depending on how you use the channel, the type system will figure out
-- which one you needed.
module Control.Concurrent.CHP.Channels (
  -- * Channel Creation
  Chan, Channel(..), writeChannelStrict, newChannelWithLabel, newChannelWR, newChannelRW, ChannelTuple(..),
  newChannelList, newChannelListWithLabels, newChannelListWithStem,
  getChannelIdentifier,
  -- * Channel-Ends
  Chanin, Chanout,
  reader, writer, readers, writers,

  -- * Reading and Writing with Channels
  ReadableChannel(..), WriteableChannel(..), 

  -- * Shared Channels
  claim, Shared,

  -- * Specific Channel Types
  -- | All the functions here are equivalent to 'newChannel' (or 'newChannelWithLabel'), but typed.  So for
  -- example, @oneToOneChannel = newChannel :: MonadCHP m => m (OneToOneChannel a)@.
  OneToOneChannel, oneToOneChannel, oneToOneChannelWithLabel,
  OneToAnyChannel, oneToAnyChannel, oneToAnyChannelWithLabel,
  AnyToOneChannel, anyToOneChannel, anyToOneChannelWithLabel,
  AnyToAnyChannel, anyToAnyChannel, anyToAnyChannelWithLabel
  )
  where


import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.STM
import Control.Monad.Trans
import Control.Parallel.Strategies
import Data.Maybe
import Data.Monoid
import Data.Unique

import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Guard
import Control.Concurrent.CHP.Monad
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Poison
import Control.Concurrent.CHP.Traces.Base

-- ======
-- Types:
-- ======

-- | A reading channel-end type that allows poison to be thrown.
--
-- This is a newtype wrapper around the channel's internal representation.
-- The derived 'Eq' instance compares the wrapped channels, so two reading
-- ends are equal exactly when the channels they wrap are equal.
--
-- Eq instance added in version 1.1.1
newtype Chanin a = Chanin (STMChannel a) deriving Eq

-- | A writing channel-end type that allows poison to be thrown.
--
-- This is a newtype wrapper around the channel's internal representation.
-- The derived 'Eq' instance compares the wrapped channels, so two writing
-- ends are equal exactly when the channels they wrap are equal.
--
-- Eq instance added in version 1.1.1
newtype Chanout a = Chanout (STMChannel a) deriving Eq

-- The internal representation wrapped by both kinds of channel-end: the
-- event that is synchronised on, paired with a transactional variable.
--
-- The variable holds either poison, or a pair of slots:
--
--  * the first slot carries the value in transit; it is filled by sendData
--    and emptied by consumeData;
--
--  * the second slot carries the acknowledgement; it is filled by sendAck
--    and emptied by consumeAck.
--
-- The derived Eq instance compares both the event and the variable.
newtype STMChannel a = STMChan (Event, TVar (WithPoison (Maybe a, Maybe ()))) deriving
  Eq

-- | A channel whose reading end and writing end are both unshared.
type OneToOneChannel = Chan Chanin Chanout

-- | A channel with an unshared reading end and a 'Shared' writing end.
type AnyToOneChannel = Chan Chanin (Shared Chanout)

-- | A channel with a 'Shared' reading end and an unshared writing end.
type OneToAnyChannel = Chan (Shared Chanin) Chanout

-- | A channel whose reading end and writing end are both 'Shared'.
type AnyToAnyChannel = Chan (Shared Chanin) (Shared Chanout)

-- ========
-- Classes:
-- ========

-- Internal interface for the reading side of a channel representation.
--
-- A read is performed either in two steps (startReadChannelC, then
-- endReadChannelC, leaving room for an extended action in between) or in a
-- single step (readChannelC).  The remaining two methods deal with poison.
class ChaninC c a where
  -- Start gets the event and the transaction that will wait for data.  You
  -- sync on the event (possible extended write occurs) then wait for data
  startReadChannelC :: c a -> (Event, STM (WithPoison a))
  -- (extended read action goes here)
  -- Read releases the writer
  endReadChannelC :: c a -> STM (WithPoison ())

  -- Single-step read.  Returns the event, an action, and the transaction
  -- that yields the data (or poison).
  -- First action is to be done as part of the completion:
  readChannelC :: c a -> (Event, STM (), STM (WithPoison a))

  -- Poisons the channel from the reading side.
  poisonReadC :: c a -> IO ()
  -- Reports whether the channel is currently poisoned.
  checkPoisonReadC :: c a -> IO (WithPoison ())

-- Internal interface for the writing side of a channel representation.
--
-- A write is performed either in three steps (startWriteChannelC,
-- sendWriteChannelC, endWriteChannelC, leaving room for extended actions in
-- between) or in a single step (writeChannelC).  The remaining two methods
-- deal with poison.
class ChanoutC c a where
  -- Start checks for poison and gets the event:
  startWriteChannelC :: c a -> (Event, STM (WithPoison ()))
  -- (extended write action goes here)
  -- Send actually transmits the value:
  sendWriteChannelC :: c a -> a -> STM (WithPoison ())
  -- (extended read action goes here)
  -- End waits for the reader to tell us we're done, must be done in a different
  -- transaction to the send
  endWriteChannelC :: c a -> STM (WithPoison ())

  -- Single-step write of the given value.  Returns the event, an action, and
  -- the transaction that finishes the write (yielding poison if found).
  -- First action is to be done as part of the completion:
  writeChannelC :: c a -> a -> (Event, STM (), STM (WithPoison ()))
  
  -- Poisons the channel from the writing side.
  poisonWriteC :: c a -> IO ()
  -- Reports whether the channel is currently poisoned.
  checkPoisonWriteC :: c a -> IO (WithPoison ())

-- | A class used for allocating new channels, and getting the reading and
-- writing ends.  There is a bijective association between the channel, and
-- its pair of end types.  You can see the types in the list of instances below.
-- Thus, 'newChannel' may be used, and the compiler will infer which type of
-- channel is required based on what end-types you get from 'reader' and 'writer'.
-- Alternatively, if you explicitly type the return of 'newChannel', it will
-- be definite which ends you will use.  If you do want to fix the type of
-- the channel you are using when you allocate it, consider using one of the
-- many 'oneToOneChannel'-like shorthand functions that fix the type.
--
-- The class parameters are the type of the reading end (@r@) and the type of
-- the writing end (@w@).
class Channel r w where
  -- | Allocates a new channel.  Nothing need be done to
  -- destroy\/de-allocate the channel when it is no longer in use.
  newChannel :: MonadCHP m => m (Chan r w a)

  -- | Determines if two channel-ends refer to the same channel.  The first
  -- argument is a reading end and the second a writing end.
  --
  -- This function was added in version 1.4.0.
  sameChannel :: r a -> w a -> Bool

-- | A class indicating that a channel-end can be read from.
--
-- Minimal complete definition: 'extReadChannel'.
class ReadableChannel chanEnd where
  -- | Reads a value from the given reading channel-end.  By default this is
  -- an extended read whose extended action simply returns the value read.
  readChannel :: chanEnd a -> CHP a
  readChannel end = extReadChannel end (\val -> return val)

  -- | Performs an extended read from the channel: the given action is run on
  -- the value that was read before the writer is freed, and the action's
  -- result is returned.
  extReadChannel :: chanEnd a -> (a -> CHP b) -> CHP b

-- | A class indicating that a channel-end can be written to.
--
-- Minimal complete definition: @extWriteChannel'@ (the other two methods have
-- defaults that are ultimately expressed in terms of it).
class WriteableChannel chanEnd where
  -- | Writes the given value to the given writing channel-end.  By default
  -- this is an extended write whose extended action just returns the value.
  writeChannel :: chanEnd a -> a -> CHP ()
  writeChannel end val = do extWriteChannel end (return val)
                            return ()

  -- | Starts the communication, then performs the given extended action, then
  -- sends the result of that down the channel.
  extWriteChannel :: chanEnd a -> CHP a -> CHP ()
  extWriteChannel end action
    = extWriteChannel' end (action >>= \val -> return (val, ()))

  -- | Like 'extWriteChannel', but the inner action yields a pair: the first
  -- component is sent down the channel and the second is returned to the
  -- caller.
  --
  -- This function was added in version 1.4.0.
  extWriteChannel' :: chanEnd a -> CHP (a, b) -> CHP b
  

-- | A helper class for easily creating several channels of the same type.
--  The same type refers not only to what type the channel carries, but
--  also to the kind of channel (for example one-to-one or any-to-one).
--  You can write code like this:
--
-- > (a, b, c, d, e) <- newChannels
--
-- To create five channels of the same type.  Instances are provided in this
-- module for tuples of two up to six channels.
class ChannelTuple t where
  -- | Allocates every channel in the tuple.
  newChannels :: MonadCHP m => m t

-- ==========
-- Functions: 
-- ==========

-- | Writes a value to a channel, making sure that the value is completely
-- evaluated by the sender before it is transmitted.  The evaluation is done
-- with 'rnf' from the parallel strategies library (see the paper:
-- \"Algorithm + Strategy = Parallelism\", P.W. Trinder et al, JFP 8(1) 1998,
-- <http://www.macs.hw.ac.uk/~dsg/gph/papers/html/Strategies/strategies.html>).
--
-- This is useful for worker processes that compute a result and send it back
-- to some \"harvester\" process.  With a plain 'writeChannel' the value sent
-- may still be unevaluated, in which case the harvester ends up doing the
-- evaluation.  With this function the work is done on the sending side.
--
-- Added in version 1.0.2.
writeChannelStrict :: (NFData a, WriteableChannel chanEnd) => chanEnd a -> a -> CHP ()
writeChannelStrict c x = rnf x `seq` writeChannel c x

-- Runs the given allocation action, then builds a 'Chan' from the identifier
-- it produced and from the two ends obtained by applying the given wrapping
-- functions to the underlying channel.
chan :: Monad m => m (Unique, c a) -> (c a -> r a) -> (c a -> w a) -> m (Chan r w a)
chan m r w = liftM build m
  where
    build (ident, inner) = Chan ident (r inner) (w inner)

-- | Allocates a channel exactly as 'newChannel' does, and additionally
-- associates the given label with that channel in a trace.  This function
-- can be used whether tracing is turned on or not, so if you ever use
-- tracing you should use this rather than 'newChannel'.
newChannelWithLabel :: (Channel r w, MonadCHP m) => String -> m (Chan r w a)
newChannelWithLabel l =
  newChannel >>= \c ->
    attach (labelUnique (getChannelIdentifier c) l) >> return c
  where
    -- Lifts the labelling action up into the caller's monad.
    attach = liftCHP . liftPoison . liftTrace


-- | Allocates a channel like 'newChannel', but instead of the channel itself
-- returns its two ends directly: the reading end first, then the writing end.
newChannelRW :: (Channel r w, MonadCHP m) => m (r a, w a)
newChannelRW = liftM ends newChannel
  where
    ends c = (reader c, writer c)

-- | Allocates a channel like 'newChannel', but instead of the channel itself
-- returns its two ends directly: the writing end first, then the reading end.
newChannelWR :: (Channel r w, MonadCHP m) => m (w a, r a)
newChannelWR = liftM ends newChannel
  where
    ends c = (writer c, reader c)

-- | Allocates the given number of channels, all of the same type, and
-- returns them as a list.  Use this function if you need to access channels
-- by index; otherwise 'newChannels' may be more convenient.
newChannelList :: (Channel r w, MonadCHP m) => Int -> m [Chan r w a]
newChannelList n = sequence (replicate n newChannel)

-- | Like 'newChannelList', but each channel is labelled with the given stem
-- followed by its index in the list.  For example, with the stem foo the
-- channels are named foo0, foo1, foo2, and so on.
newChannelListWithStem :: (Channel r w, MonadCHP m) => Int -> String -> m [Chan r w a]
newChannelListWithStem n s = mapM (newChannelWithLabel . (s ++) . show) [0 .. n - 1]

-- | Like 'newChannelList', but allocates one channel per label in the given
-- list, attaching that label to it.  The resulting list therefore has the
-- same length as the list of labels.
newChannelListWithLabels :: (Channel r w, MonadCHP m) => [String] -> m [Chan r w a]
newChannelListWithLabels labels = forM labels newChannelWithLabel

-- | Extracts the reading end of every channel in the list, preserving
-- order.  A shorthand for @map reader@.
readers :: [Chan r w a] -> [r a]
readers cs = [reader c | c <- cs]

-- | Extracts the writing end of every channel in the list, preserving
-- order.  A shorthand for @map writer@.
writers :: [Chan r w a] -> [w a]
writers cs = [writer c | c <- cs]

-- Allocates the underlying representation of a channel: a fresh event
-- together with a fresh variable that starts out unpoisoned with both of its
-- slots empty.  The result is paired with the event's unique identifier.
stmChannel :: MonadIO m => m (Unique, STMChannel a)
stmChannel = liftIO $
  newEvent ChannelComm 2 >>= \ev ->
    atomically (newTVar (NoPoison (Nothing, Nothing))) >>= \var ->
      return (getEventUnique ev, STMChan (ev, var))

-- | 'newChannel' with its result fixed to be a 'OneToOneChannel'.
oneToOneChannel :: MonadCHP m => m (OneToOneChannel a)
oneToOneChannel = newChannel

-- | 'newChannelWithLabel' with its result fixed to be a 'OneToOneChannel'.
--
-- Added in version 1.2.0.
oneToOneChannelWithLabel :: MonadCHP m => String -> m (OneToOneChannel a)
oneToOneChannelWithLabel label = newChannelWithLabel label


-- | Claims the given shared channel-end, runs the given block with the
-- underlying channel-end, then releases the claim and returns the block's
-- result.  If poison or an IO exception is thrown inside the block, the
-- channel is released and the poison\/exception re-thrown.
claim :: Shared c a -> (c a -> CHP b) -> CHP b
claim (Shared (lock, c)) body
  = scopeBlock acquire run (releaseMutex lock)
  where
    -- Take the mutex, and hand the protected channel-end to the block.
    acquire = claimMutex lock >> return c
    -- Normal path: run the block, release the mutex, pass on the result.
    run end = body end >>= \result ->
                liftIO (releaseMutex lock) >> return result

-- | 'newChannel' with its result fixed to be an 'AnyToOneChannel'.
anyToOneChannel :: MonadCHP m => m (AnyToOneChannel a)
anyToOneChannel = newChannel

-- | 'newChannel' with its result fixed to be a 'OneToAnyChannel'.
oneToAnyChannel :: MonadCHP m => m (OneToAnyChannel a)
oneToAnyChannel = newChannel

-- | 'newChannel' with its result fixed to be an 'AnyToAnyChannel'.
anyToAnyChannel :: MonadCHP m => m (AnyToAnyChannel a)
anyToAnyChannel = newChannel

-- | 'newChannelWithLabel' with its result fixed to be an 'AnyToOneChannel'.
--
-- Added in version 1.2.0.
anyToOneChannelWithLabel :: MonadCHP m => String -> m (AnyToOneChannel a)
anyToOneChannelWithLabel label = newChannelWithLabel label

-- | 'newChannelWithLabel' with its result fixed to be a 'OneToAnyChannel'.
--
-- Added in version 1.2.0.
oneToAnyChannelWithLabel :: MonadCHP m => String -> m (OneToAnyChannel a)
oneToAnyChannelWithLabel label = newChannelWithLabel label

-- | 'newChannelWithLabel' with its result fixed to be an 'AnyToAnyChannel'.
--
-- Added in version 1.2.0.
anyToAnyChannelWithLabel :: MonadCHP m => String -> m (AnyToAnyChannel a)
anyToAnyChannelWithLabel label = newChannelWithLabel label

-- ==========
-- Instances: 
-- ==========

-- Reading from an unshared reading end.  Both methods unwrap the 'Chanin'
-- and work through the ChaninC interface of the underlying channel.
instance ReadableChannel Chanin where
  -- Simple read, built from the three components of readChannelC: the event
  -- is handed to buildOnEventPoison, the second component is supplied inside
  -- the EventActions (for an STMChannel it places the acknowledgement for
  -- the writer), and the third (for an STMChannel it takes the data out of
  -- the channel) is the action passed afterwards.  The outcome is passed
  -- through checkPoison.
  readChannel (Chanin c)
    = let (e, mdur, mafter) = readChannelC c in
      buildOnEventPoison (indivRecJust ChannelRead) e
        (EventActions (return ()) mdur)
        (liftSTM mafter) >>= checkPoison

  -- Extended read, expressed as a scopeBlock with three parts:
  --
  --  1. engage on the event (with no extra event actions) and wait for the
  --     data, passing the outcome through checkPoison;
  --
  --  2. run the caller's extended action on the value, then perform
  --     endReadChannelC (its result is discarded) and return the extended
  --     action's result;
  --
  --  3. an action that poisons the channel from the reading side.
  --     NOTE(review): presumably scopeBlock runs this only when the block is
  --     left abnormally (compare 'claim', which passes its mutex release in
  --     the same position) -- confirm against scopeBlock's definition.
  extReadChannel (Chanin c) body
    = let (e, m) = startReadChannelC c in
      scopeBlock
        (buildOnEventPoison (indivRecJust ChannelRead) e mempty (liftSTM m) >>= checkPoison)
        (\val -> do x <- body val
                    liftSTM $ endReadChannelC c
                    return x)
        (poisonReadC c)

-- Writing to an unshared writing end.  Both methods unwrap the 'Chanout'
-- and work through the ChanoutC interface of the underlying channel.
instance WriteableChannel Chanout where
  -- Simple write, built from the three components of writeChannelC: the
  -- event is handed to buildOnEventPoison, the second component is supplied
  -- inside the EventActions (for an STMChannel it places the value in the
  -- channel), and the third (for an STMChannel it waits for the reader's
  -- acknowledgement) is the action passed afterwards.  The outcome is
  -- passed through checkPoison.
  writeChannel (Chanout c) x
    = let (e, mdur, mafter) = writeChannelC c x in
        buildOnEventPoison (indivRecJust ChannelWrite) e
          (EventActions (return ()) mdur) (liftSTM mafter)
        >>= checkPoison
  -- Extended write, expressed as a scopeBlock with three parts:
  --
  --  1. engage on the event (with no extra event actions) and run the
  --     poison check from startWriteChannelC, passing the outcome through
  --     checkPoison;
  --
  --  2. run the caller's extended action to obtain the value to send and
  --     the value to return; then send the value and wait for the end of
  --     the write as two separately lifted STM actions (the ChanoutC class
  --     requires the end to be in a different transaction to the send),
  --     merge their poison results and check them; finally return the
  --     second component of the extended action's result;
  --
  --  3. an action that poisons the channel from the writing side.
  --     NOTE(review): presumably scopeBlock runs this only when the block is
  --     left abnormally (compare 'claim', which passes its mutex release in
  --     the same position) -- confirm against scopeBlock's definition.
  extWriteChannel' (Chanout c) body
    = let (e, m) = startWriteChannelC c in
      scopeBlock
        (buildOnEventPoison (indivRecJust ChannelWrite)
          e mempty (liftSTM m) >>= checkPoison)
        (const $ do (x, r) <- body
                    sequence [liftSTM $ sendWriteChannelC c x
                             ,liftSTM (endWriteChannelC c)]
                      >>= checkPoison . mergeWithPoison
                    return r)
        (poisonWriteC c)
        

-- Poison support for reading ends, delegating to the ChaninC interface of
-- the wrapped channel.
instance Poisonable (Chanin a) where
  poison (Chanin c) = liftIO (poisonReadC c)
  checkForPoison (Chanin c)
    = liftCHP $ do status <- liftIO (checkPoisonReadC c)
                   checkPoison status

-- Poison support for writing ends, delegating to the ChanoutC interface of
-- the wrapped channel.
instance Poisonable (Chanout a) where
  poison (Chanout c) = liftIO (poisonWriteC c)
  checkForPoison (Chanout c)
    = liftCHP $ do status <- liftIO (checkPoisonWriteC c)
                   checkPoison status

-- Allocates two channels of the same type, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a) where
  newChannels = liftM2 (,) newChannel newChannel

-- Allocates three channels of the same type, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a) where
  newChannels = liftM3 (,,) newChannel newChannel newChannel

-- Allocates four channels of the same type, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a) where
  newChannels = liftM4 (,,,) newChannel newChannel newChannel newChannel

-- Allocates five channels of the same type, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a, Chan r w a) where
  newChannels
    = liftM5 (,,,,) newChannel newChannel newChannel newChannel newChannel

-- Allocates six channels of the same type, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a, Chan r w a, Chan r w a) where
  newChannels
    = return (,,,,,) `ap` newChannel `ap` newChannel `ap` newChannel
                     `ap` newChannel `ap` newChannel `ap` newChannel

-- Some of this is defensive programming -- the writer should never be able
-- to discover poison in the channel variable, for example

-- Takes the value out of the data slot.  Yields poison if the variable is
-- poisoned, retries the transaction while the data slot is empty, and
-- otherwise empties the data slot (leaving the ack slot as it was) and
-- yields the value.
consumeData :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison a)
consumeData tv = readTVar tv >>= step
  where
    step PoisonItem = return PoisonItem
    step (NoPoison (Nothing, _)) = retry
    step (NoPoison (Just val, ack))
      = writeTVar tv (NoPoison (Nothing, ack)) >> return (NoPoison val)

-- Puts a value into the data slot.  Yields poison if the variable is
-- poisoned; it is an error for the data slot to be occupied already.
-- Otherwise the value is stored (leaving the ack slot as it was).
sendData :: TVar (WithPoison (Maybe a, Maybe ())) -> a -> STM (WithPoison ())
sendData tv x = readTVar tv >>= step
  where
    step PoisonItem = return PoisonItem
    step (NoPoison (Just _, _)) = error "CHP: Found data while sending data"
    step (NoPoison (Nothing, ack))
      = writeTVar tv (NoPoison (Just x, ack)) >> return (NoPoison ())

-- Takes the acknowledgement out of the ack slot.  Yields poison if the
-- variable is poisoned, retries the transaction while the ack slot is empty,
-- and otherwise empties the ack slot (leaving the data slot as it was).
consumeAck :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison ())
consumeAck tv = readTVar tv >>= step
  where
    step PoisonItem = return PoisonItem
    step (NoPoison (_, Nothing)) = retry
    step (NoPoison (dat, Just _))
      = writeTVar tv (NoPoison (dat, Nothing)) >> return (NoPoison ())

-- Puts an acknowledgement into the ack slot.  Yields poison if the variable
-- is poisoned; it is an error for the ack slot to be occupied already.
-- Otherwise the ack is stored (leaving the data slot as it was).
sendAck :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison ())
sendAck tv = readTVar tv >>= step
  where
    step PoisonItem = return PoisonItem
    step (NoPoison (_, Just _)) = error "CHP: Found ack while placing ack!"
    step (NoPoison (dat, Nothing))
      = writeTVar tv (NoPoison (dat, Just ())) >> return (NoPoison ())

-- The reading side of an STMChannel.
instance ChaninC STMChannel a where
  -- Two-step read: wait for the data first, place the ack afterwards.
  startReadChannelC (STMChan (ev, var)) = (ev, consumeData var)
  endReadChannelC (STMChan (_, var)) = sendAck var

  -- One-step read: the ack is placed (its poison result dropped) by the
  -- second component, and the data is taken by the third.
  readChannelC (STMChan (ev, var)) = (ev, placeAck, consumeData var)
    where
      placeAck = do _ <- sendAck var
                    return ()

  -- Poisoning marks the event first and then overwrites the variable.
  poisonReadC (STMChan (ev, var))
    = liftSTM (poisonEvent ev >> writeTVar var PoisonItem)
  -- The poison check consults the event only.
  checkPoisonReadC (STMChan (ev, _)) = liftSTM (checkEventForPoison ev)

-- The writing side of an STMChannel.
instance ChanoutC STMChannel a where
  -- Starting a write only inspects the variable for poison; whatever the
  -- slots hold is ignored.
  startWriteChannelC (STMChan (ev, var)) = (ev, liftM poisonOnly (readTVar var))
    where
      poisonOnly PoisonItem = PoisonItem
      poisonOnly (NoPoison _) = NoPoison ()
  sendWriteChannelC (STMChan (_, var)) val = sendData var val
  endWriteChannelC (STMChan (_, var)) = consumeAck var

  -- One-step write: the value is placed (its poison result dropped) by the
  -- second component, and the ack is awaited by the third.
  writeChannelC (STMChan (ev, var)) val = (ev, placeData, consumeAck var)
    where
      placeData = do _ <- sendData var val
                     return ()

  -- Poisoning marks the event first and then overwrites the variable.
  poisonWriteC (STMChan (ev, var))
    = liftSTM (poisonEvent ev >> writeTVar var PoisonItem)
  -- The poison check consults the event only.
  checkPoisonWriteC (STMChan (ev, _)) = liftSTM (checkEventForPoison ev)

-- One-to-one channel: both ends wrap the same freshly allocated STMChannel.
instance Channel Chanin Chanout where
  newChannel = do (ident, inner) <- stmChannel
                  return (Chan ident (Chanin inner) (Chanout inner))
  sameChannel (Chanin rd) (Chanout wr) = rd == wr

-- One-to-any channel: a one-to-one channel whose reading end is paired with
-- a fresh mutex.  The mutex is allocated before the channel.
instance Channel (Shared Chanin) Chanout where
  newChannel = liftM2 build newMutex newChannel
    where
      build lock c
        = Chan (getChannelIdentifier c) (Shared (lock, reader c)) (writer c)
  -- The mutex plays no part in the comparison.
  sameChannel (Shared (_, Chanin rd)) (Chanout wr) = rd == wr

-- Any-to-one channel: a one-to-one channel whose writing end is paired with
-- a fresh mutex.  The mutex is allocated before the channel.
instance Channel Chanin (Shared Chanout) where
  newChannel = liftM2 build newMutex newChannel
    where
      build lock c
        = Chan (getChannelIdentifier c) (reader c) (Shared (lock, writer c))
  -- The mutex plays no part in the comparison.
  sameChannel (Chanin rd) (Shared (_, Chanout wr)) = rd == wr

-- Any-to-any channel: a one-to-one channel with each end paired with its own
-- fresh mutex.  The reader's mutex is allocated first, then the writer's,
-- then the channel.
instance Channel (Shared Chanin) (Shared Chanout) where
  newChannel = liftM3 build newMutex newMutex newChannel
    where
      build rdLock wrLock c
        = Chan (getChannelIdentifier c)
               (Shared (rdLock, reader c))
               (Shared (wrLock, writer c))
  -- The mutexes play no part in the comparison.
  sameChannel (Shared (_, Chanin rd)) (Shared (_, Chanout wr)) = rd == wr