-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


-- | The module containing all the different types of channels in CHP.  Unlike
-- JCSP and C++CSP2, CHP does not offer buffered channels directly (see the
-- "Control.Concurrent.CHP.Buffers" module).  There are four different channel types, effectively
-- all possible combinations of:
--
-- * Shared reader vs non-shared reader
--
-- * Shared writer vs non-shared writer
--
-- For most applications you probably want just 'OneToOneChannel'.
--
-- It is possible for the type system to infer which channel you want when
-- you use 'newChannel'.  If the types of the ends are known by the type system,
-- the channel-type can be inferred.  So you can usually just write 'newChannel',
-- and depending on how you use the channel, the type system will figure out
-- which one you needed.
module Control.Concurrent.CHP.Channels (
  -- * Channel Creation
  Chan, Channel(..), writeChannelStrict, newChannelWithLabel, newChannelWR, newChannelRW, ChannelTuple(..),
  newChannelList, newChannelListWithLabels, newChannelListWithStem,
  getChannelIdentifier,
  -- * Channel-Ends
  Chanin, Chanout,
  reader, writer, readers, writers,

  -- * Reading and Writing with Channels
  ReadableChannel(..), WriteableChannel(..), 

  -- * Shared Channels
  claim, Shared,

  -- * Specific Channel Types
  -- | All the functions here are equivalent to newChannel (or newChannelWithLabel), but typed.  So for
  -- example, @oneToOneChannel = newChannel :: MonadCHP m => m OneToOneChannel@.
  OneToOneChannel, oneToOneChannel, oneToOneChannelWithLabel,
  OneToAnyChannel, oneToAnyChannel, oneToAnyChannelWithLabel,
  AnyToOneChannel, anyToOneChannel, anyToOneChannelWithLabel,
  AnyToAnyChannel, anyToAnyChannel, anyToAnyChannelWithLabel
  )
  where


import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.STM
import Control.Monad.Trans
import Control.Parallel.Strategies
import Data.Maybe
import Data.Unique

import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Monad
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Poison
import Control.Concurrent.CHP.Traces.Base

-- ======
-- Types:
-- ======

-- | A reading channel-end type that allows poison to be thrown
--
-- Eq instance added in version 1.1.1
newtype Chanin a = Chanin (STMChannel a) deriving Eq

-- | A writing channel-end type that allows poison to be thrown
--
-- Eq instance added in version 1.1.1
newtype Chanout a = Chanout (STMChannel a) deriving Eq

newtype STMChannel a = STMChan (Event, TVar (WithPoison (Maybe a))) deriving
  Eq

type OneToOneChannel = Chan Chanin Chanout
type AnyToOneChannel = Chan (Chanin) (Shared Chanout)
type OneToAnyChannel = Chan (Shared Chanin) (Chanout)
type AnyToAnyChannel = Chan (Shared Chanin) (Shared Chanout)

-- ========
-- Classes:
-- ========

class ChaninC c a where
  -- Start gets the event and the transaction that will wait for data.  You
  -- sync on the event (possible extended write occurs) then wait for data
  startReadChannelC :: c a -> (Event, STM (WithPoison a))
  -- (extended read action goes here)
  -- Read releases the writer
  endReadChannelC :: c a -> STM (WithPoison ())
  poisonReadC :: c a -> IO ()
  checkPoisonReadC :: c a -> IO (WithPoison ())

class ChanoutC c a where
  -- Start checks for poison and gets the event:
  startWriteChannelC :: c a -> (Event, STM (WithPoison ()))
  -- (extended write action goes here)
  -- Send actually transmits the value:
  sendWriteChannelC :: c a -> a -> STM (WithPoison ())
  -- (extended read action goes here)
  -- End waits for the reader to tell us we're done, must be done in a different
  -- transaction to the send
  endWriteChannelC :: c a -> STM (WithPoison ())
  
  poisonWriteC :: c a -> IO ()
  checkPoisonWriteC :: c a -> IO (WithPoison ())

-- | A class used for allocating new channels, and getting the reading and
-- writing ends.  There is a bijective assocation between the channel, and
-- its pair of end types.  You can see the types in the list of instances below.
-- Thus, 'newChannel' may be used, and the compiler will infer which type of
-- channel is required based on what end-types you get from 'reader' and 'writer'.
-- Alternatively, if you explicitly type the return of 'newChannel', it will
-- be definite which ends you will use.  If you do want to fix the type of
-- the channel you are using when you allocate it, consider using one of the
-- many 'oneToOneChannel'-like shorthand functions that fix the type.
class Channel r w where
  -- | Allocates a new channel.  Nothing need be done to
  -- destroy\/de-allocate the channel when it is no longer in use.
  newChannel :: MonadCHP m => m (Chan r w a)

-- | A class indicating that a channel can be read from.
class ReadableChannel chanEnd where -- minimal implementation: extReadChannel
  -- | Reads from the given reading channel-end
  readChannel :: chanEnd a -> CHP a
  readChannel c = extReadChannel c return
  -- | Performs an extended read from the channel, performing the given action
  -- before freeing the writer
  extReadChannel :: chanEnd a -> (a -> CHP b) -> CHP b

-- | A class indicating that a channel can be written to.
class WriteableChannel chanEnd where -- minimal implementation: extWriteChannel
  -- | Writes from the given writing channel-end
  writeChannel :: chanEnd a -> a -> CHP ()
  writeChannel c x = extWriteChannel c (return x)

  -- | Starts the communication, then performs the given extended action, then
  -- sends the result of that down the channel
  extWriteChannel :: chanEnd a -> CHP a -> CHP ()

-- | A helper class for easily creating several channels of the same type.
--  The same type refers not only to what type the channel carries, but
--  also to the type of channel (one-to-one no poison, one-to-any with
--  poison, etc).  You can write code like this:
--
-- > (a, b, c, d, e) <- newChannels
--
-- To create five channels of the same type.
class ChannelTuple t where
  newChannels :: MonadCHP m => m t

-- ==========
-- Functions: 
-- ==========

-- | A helper function that uses the parallel strategies library (see the
-- paper: \"Algorithm + Strategy = Parallelism\", P.W. Trinder et al, JFP
-- 8(1) 1998,
-- <http://www.macs.hw.ac.uk/~dsg/gph/papers/html/Strategies/strategies.html>)
-- to make sure that the value sent down a channel is strictly evaluated
-- by the sender before transmission.
--
-- This is useful when you want to write worker processes that evaluate data
--  and send it back to some \"harvester\" process.  By default the values sent
-- back may be unevaluated, and thus the harvester might end up doing the evaluation.
--  If you use this function, the value is guaranteed to be completely evaluated
-- before sending.
--
-- Added in version 1.0.2.
writeChannelStrict :: (NFData a, WriteableChannel chanEnd) => chanEnd a -> a -> CHP ()
writeChannelStrict c x = (writeChannel c $| rnf) x

chan :: Monad m => m (Unique, c a) -> (c a -> r a) -> (c a -> w a) -> m (Chan r w a)
chan m r w = do (u, x) <- m
                return $ Chan u (r x) (w x)

-- | Like 'newChannel' but also associates a label with that channel in a
-- trace.  You can use this function whether tracing is turned on or not,
-- so if you ever use tracing, you should use this rather than 'newChannel'.
newChannelWithLabel :: (Channel r w, MonadCHP m) => String -> m (Chan r w a)
newChannelWithLabel l
  = do c <- newChannel
       liftCHP . liftPoison . liftTrace $ labelUnique (getChannelIdentifier c) l
       return c


-- | A helper that is like 'newChannel' but returns the reading and writing
-- end of the channels directly.
newChannelRW :: (Channel r w, MonadCHP m) => m (r a, w a)
newChannelRW = do c <- newChannel
                  return (reader c, writer c)

-- | A helper that is like 'newChannel' but returns the writing and reading
-- end of the channels directly.
newChannelWR :: (Channel r w, MonadCHP m) => m (w a, r a)
newChannelWR = do c <- newChannel
                  return (writer c, reader c)

-- | Creates a list of channels of the same type with the given length.  If
-- you need to access some channels by index, use this function.  Otherwise
-- you may find using 'newChannels' to be easier.
newChannelList :: (Channel r w, MonadCHP m) => Int -> m [Chan r w a]
newChannelList n = replicateM n newChannel

-- | A helper that is like 'newChannelList', but labels the channels according
-- to a pattern.  Given a stem such as foo, it names the channels in the list
-- foo0, foo1, foo2, etc.
newChannelListWithStem :: (Channel r w, MonadCHP m) => Int -> String -> m [Chan r w a]
newChannelListWithStem n s = sequence [newChannelWithLabel (s ++ show i) | i <- [0 .. (n - 1)]]

-- | A helper that is like 'newChannelList', but labels the channels with the
-- given list.  The number of channels returned is the same as the length of
-- the list of labels
newChannelListWithLabels :: (Channel r w, MonadCHP m) => [String] -> m [Chan r w a]
newChannelListWithLabels = mapM newChannelWithLabel

-- | Gets all the reading ends of a list of channels.  A shorthand for @map
-- reader@.
readers :: [Chan r w a] -> [r a]
readers = map reader

-- | Gets all the writing ends of a list of channels.  A shorthand for @map
-- writer@.
writers :: [Chan r w a] -> [w a]
writers = map writer

stmChannel :: MonadIO m => m (Unique, STMChannel a)
stmChannel = liftIO $
  do e <- newEvent ChannelComm 2
     c <- atomically $ newTVar $ NoPoison Nothing
     return (getEventUnique e, STMChan (e,c))

-- | A type-constrained version of newChannel.
oneToOneChannel :: MonadCHP m => m (OneToOneChannel a)
oneToOneChannel = newChannel

-- | A type-constrained version of newChannelWithLabel.
--
-- Added in version 1.2.0.
oneToOneChannelWithLabel :: MonadCHP m => String -> m (OneToOneChannel a)
oneToOneChannelWithLabel = newChannelWithLabel


-- | Claims the given channel-end, executes the given block, then releases
-- the channel-end and returns the output value.  If poison or an IO
-- exception is thrown inside the block, the channel is released and the
-- poison\/exception re-thrown.
claim :: Shared c a -> (c a -> CHP b) -> CHP b
claim (Shared (lock, c)) body
  = scopeBlock
       (claimMutex lock >> return c)
       (\y -> do x <- body y
                 liftIO $ releaseMutex lock
                 return x)
       (releaseMutex lock)

-- | A type-constrained version of newChannel.
anyToOneChannel :: MonadCHP m => m (AnyToOneChannel a)
anyToOneChannel = newChannel

-- | A type-constrained version of newChannel.
oneToAnyChannel :: MonadCHP m => m (OneToAnyChannel a)
oneToAnyChannel = newChannel

-- | A type-constrained version of newChannel.
anyToAnyChannel :: MonadCHP m => m (AnyToAnyChannel a)
anyToAnyChannel = newChannel

-- | A type-constrained version of newChannelWithLabel.
--
-- Added in version 1.2.0.
anyToOneChannelWithLabel :: MonadCHP m => String -> m (AnyToOneChannel a)
anyToOneChannelWithLabel = newChannelWithLabel

-- | A type-constrained version of newChannelWithLabel.
--
-- Added in version 1.2.0.
oneToAnyChannelWithLabel :: MonadCHP m => String -> m (OneToAnyChannel a)
oneToAnyChannelWithLabel = newChannelWithLabel

-- | A type-constrained version of newChannelWithLabel.
--
-- Added in version 1.2.0.
anyToAnyChannelWithLabel :: MonadCHP m => String -> m (AnyToAnyChannel a)
anyToAnyChannelWithLabel = newChannelWithLabel

-- ==========
-- Instances: 
-- ==========

instance ReadableChannel Chanin where
  readChannel (Chanin c)
    = let (e, m) = startReadChannelC c in
      buildOnEventPoison (indivRecJust ChannelRead) e (return ()) (liftSTM $
        do x <- m
           endReadChannelC c
           return x) >>= checkPoison

  extReadChannel (Chanin c) body
    = let (e, m) = startReadChannelC c in
      scopeBlock
        (buildOnEventPoison (indivRecJust ChannelRead) e (return ()) (liftSTM m) >>= checkPoison)
        (\val -> do x <- body val
                    liftSTM $ endReadChannelC c
                    return x)
        (poisonReadC c)

instance WriteableChannel Chanout where
  writeChannel (Chanout c) x
    = let (e, m) = startWriteChannelC c in
        buildOnEventPoison (indivRecJust ChannelWrite) e (return ())
          (liftM2 (++)
            (liftSTM $ sequence [m, sendWriteChannelC c x])
            (liftSTM $ sequence [endWriteChannelC c]))
        >>= checkPoison . mergeWithPoison
  extWriteChannel (Chanout c) body
    = let (e, m) = startWriteChannelC c in
      scopeBlock
        (buildOnEventPoison (indivRecJust ChannelWrite)
          e (return ()) (liftSTM m) >>= checkPoison)
        (const $ sequence [body >>= liftSTM . sendWriteChannelC c
                          ,liftSTM (endWriteChannelC c)]
                   >>= checkPoison . mergeWithPoison)
        (poisonWriteC c)
        

instance Poisonable (Chanin a) where
  poison (Chanin c) = liftIO $ poisonReadC c
  checkForPoison (Chanin c) = liftCHP $ liftIO (checkPoisonReadC c) >>= checkPoison

instance Poisonable (Chanout a) where
  poison (Chanout c) = liftIO $ poisonWriteC c
  checkForPoison (Chanout c) = liftCHP $ liftIO (checkPoisonWriteC c) >>= checkPoison

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a) where
  newChannels = do c0 <- newChannel
                   c1 <- newChannel
                   return (c0, c1)

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a) where
  newChannels = do c0 <- newChannel
                   c1 <- newChannel
                   c2 <- newChannel
                   return (c0, c1, c2)

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a) where
  newChannels = do c0 <- newChannel
                   c1 <- newChannel
                   c2 <- newChannel
                   c3 <- newChannel
                   return (c0, c1, c2, c3)

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a, Chan r w a) where
  newChannels = do c0 <- newChannel
                   c1 <- newChannel
                   c2 <- newChannel
                   c3 <- newChannel
                   c4 <- newChannel
                   return (c0, c1, c2, c3, c4)

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a, Chan r w a, Chan r w a) where
  newChannels = do c0 <- newChannel
                   c1 <- newChannel
                   c2 <- newChannel
                   c3 <- newChannel
                   c4 <- newChannel
                   c5 <- newChannel
                   return (c0, c1, c2, c3, c4, c5)

-- Some of this is defensive programming -- the writer should never be able
-- to discover poison in the channel variable, for example

instance ChaninC STMChannel a where
  startReadChannelC (STMChan (e,tv)) = (e, waitForJustOrPoison tv)
  endReadChannelC (STMChan (_,tv))
    = do x <- readTVar tv
         case x of
           PoisonItem -> return PoisonItem
           NoPoison _ -> do writeTVar tv $ NoPoison Nothing
                            return $ NoPoison ()
  poisonReadC (STMChan (e,tv))
    = liftSTM $ do poisonEvent e
                   writeTVar tv PoisonItem
  checkPoisonReadC (STMChan (e,_)) = liftSTM $ checkEventForPoison e

instance ChanoutC STMChannel a where
  startWriteChannelC (STMChan (e,tv))
    = (e, do x <- readTVar tv
             case x of
               PoisonItem -> return PoisonItem
               NoPoison _ -> return $ NoPoison ())
  sendWriteChannelC (STMChan (_, tv)) val
    =     do x <- readTVar tv
             case x of
               PoisonItem -> return PoisonItem
               NoPoison _ -> do writeTVar tv $ NoPoison $ Just val
                                return $ NoPoison ()
  endWriteChannelC (STMChan (_, tv))
    = waitForNothingOrPoison tv


  poisonWriteC (STMChan (e,tv))
    = liftSTM $ do poisonEvent e
                   writeTVar tv PoisonItem
  checkPoisonWriteC (STMChan (e,_)) = liftSTM $ checkEventForPoison e

instance Channel Chanin Chanout where
  newChannel = chan stmChannel Chanin Chanout

instance Channel (Shared Chanin) Chanout where
  newChannel = do m <- newMutex
                  c <- newChannel
                  return $ Chan (getChannelIdentifier c) (Shared (m, reader c)) (writer c)

instance Channel Chanin (Shared Chanout) where
  newChannel = do m <- newMutex
                  c <- newChannel
                  return $ Chan (getChannelIdentifier c) (reader c) (Shared (m, writer c))

instance Channel (Shared Chanin) (Shared Chanout) where
  newChannel = do m <- newMutex
                  m' <- newMutex
                  c <- newChannel
                  return $ Chan (getChannelIdentifier c) (Shared (m, reader c)) (Shared (m', writer c))