-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


-- | Various processes that act like buffers.  Poisoning either end of a buffer
-- process is immediately passed on to the other side, in contrast to C++CSP2
-- and JCSP.
module Control.Concurrent.CHP.Buffers (fifoBuffer, infiniteBuffer, overflowingBuffer, overwritingBuffer)
  where

import Data.Sequence (Seq, viewl, ViewL(..))
import qualified Data.Sequence as Seq

import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as Common

-- | Acts like a limited capacity FIFO buffer of the given size.  When it is
-- full it accepts no input, and when it is empty it offers no output.
fifoBuffer :: forall a. Int -> Chanin a -> Chanout a -> CHP ()
fifoBuffer n in_ out
  | n < 0     = return ()
  | n == 0    = Common.id in_ out
  | otherwise = fifo Seq.empty `onPoisonRethrow` (poison in_ >> poison out)
  where
    fifo :: Seq a -> CHP ()
    fifo s | Seq.null s = takeIn
           | Seq.length s == n = sendOut
           | otherwise = takeIn <-> sendOut
      where
        takeIn = readChannel in_ >>= fifo . addLast s
        sendOut = do writeChannel out (seqHead s)
                     fifo (removeHead s)

-- | Acts like a FIFO buffer with unlimited capacity.  Use with caution; make
-- sure you do not let the buffer grow so large that it eats up all your memory.
--  When it is empty, it offers no output.  It always accepts input.
infiniteBuffer :: forall a. Chanin a -> Chanout a -> CHP ()
infiniteBuffer in_ out
  = buff Seq.empty `onPoisonRethrow` (poison in_ >> poison out)
  where
    buff :: Seq a -> CHP ()
    buff s | Seq.null s = takeIn
           | otherwise = sendOut </> takeIn
      where
        takeIn = readChannel in_ >>= buff . addLast s
        sendOut = do writeChannel out (seqHead s)
                     buff (removeHead s)

-- | Acts like a FIFO buffer of limited capacity, except that when it is full,
-- it always accepts input and discards it.  When it is empty, it does not offer output.
overflowingBuffer :: forall a. Int -> Chanin a -> Chanout a -> CHP ()
overflowingBuffer n in_ out
  | n < 0     = return ()
  | n == 0    = Common.id in_ out
  | otherwise = flow Seq.empty `onPoisonRethrow` (poison in_ >> poison out)
  where
    flow :: Seq a -> CHP ()
    flow s | Seq.null s = takeIn
           | Seq.length s == n = sendOut <-> dropItem
           | otherwise = takeIn <-> sendOut
      where
        takeIn = readChannel in_ >>= flow . addLast s
        dropItem = readChannel in_ >> flow s
        sendOut = do writeChannel out (seqHead s)
                     flow (removeHead s)

-- | Acts like a FIFO buffer of limited capacity, except that when it is full,
-- it always accepts input and pushes out the oldest item in the buffer.  When
-- it is empty, it does not offer output.
overwritingBuffer :: forall a. Int -> Chanin a -> Chanout a -> CHP ()
overwritingBuffer n in_ out
  | n < 0     = return ()
  | n == 0    = Common.id in_ out
  | otherwise = over Seq.empty `onPoisonRethrow` (poison in_ >> poison out)
  where
    over :: Seq a -> CHP ()
    over s | Seq.null s = takeIn
           | Seq.length s == n = sendOut <-> takeInOver
           | otherwise = takeIn <-> sendOut
      where
        takeIn = readChannel in_ >>= over . addLast s
        takeInOver = readChannel in_ >>= over . removeHead . addLast s
        sendOut = do writeChannel out (seqHead s)
                     over (removeHead s)

seqHead :: Seq a -> a
seqHead s = case viewl s of
  EmptyL -> error "Internal code logic error in buffer"
  x :< _ -> x

removeHead :: Seq a -> Seq a
removeHead = Seq.drop 1

addLast :: Seq a -> a -> Seq a
addLast = (Seq.|>)