------------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Chan.Split.Implementation
-- Copyright   :  (c) 2012 Leon P Smith
-- License     :  MIT
--
-- Maintainer  :  leon@melding-monads.com
--
------------------------------------------------------------------------------

module Control.Concurrent.Chan.Split.Implementation where

import Control.Concurrent.MVar
import Control.Exception(mask_)
import System.IO.Unsafe(unsafeInterleaveIO)

-- | A singly linked stream of messages.  Each cell is an 'MVar' that is
--   empty while it marks the current end of the stream,  and is filled with
--   an 'Item' once a message has been sent into it.
type List a = MVar (Item a)

-- | One filled stream cell: the message itself together with the cell that
--   follows it.  The reference to the following cell is a strict field.
data Item a = Item a !(List a)

-- | A @SendPort@ represents the writing end of a channel.   There is only
--   one @SendPort@ per channel,  though it can be used from multiple threads.
--   Messages can be sent to the channel using 'send'.
--
--   Internally it wraps an 'MVar' holding the channel's current write end,
--   that is,  the empty cell that the next 'send' will fill.

newtype SendPort a = SendPort (MVar (List a))


-- | A @ReceivePort@ represents the reading end of a channel.   A channel
--   can have many @ReceivePorts@,  which all receive the same messages in
--   a publish-subscribe manner.  A single @ReceivePort@ can be used
--   from multiple threads,  in which case every message is delivered to
--   exactly one of those threads.  Use 'receive' to fetch messages from
--   the channel.
--
--   Internally it wraps an 'MVar' holding this port's current read
--   position,  which 'receive' advances by one cell per message.

newtype ReceivePort a = ReceivePort (MVar (List a))


-- | Builds a fresh channel and returns both of its ends as a
--   @(SendPort, ReceivePort)@ pair.  The two ports start out referring to
--   the same empty cell,  so the returned receiver observes every message
--   sent through the returned sender.

new :: IO (SendPort a, ReceivePort a)
new = newEmptyMVar >>= \tailCell -> do
    writeRef <- newMVar tailCell
    readRef  <- newMVar tailCell
    return (SendPort writeRef, ReceivePort readRef)


-- | Creates a channel that starts out with no @ReceivePorts@ at all.
--   Elements written to it before any reader has been attached with
--   'listen' are not retained for later readers and so become eligible
--   for garbage collection.

newSendPort :: IO (SendPort a)
newSendPort = do
    tailCell <- newEmptyMVar
    writeRef <- newMVar tailCell
    return (SendPort writeRef)


-- | Attaches a new @ReceivePort@ to the same channel as the given
--   @SendPort@.  The new port is positioned at the channel's current write
--   end,  so it starts out empty and only sees elements that are sent
--   after this call.

listen :: SendPort a  -> IO (ReceivePort a)
listen (SendPort writeRef) = do
    readRef <- withMVar writeRef $ \tailCell -> newMVar tailCell
    return (ReceivePort readRef)


-- | Creates a second @ReceivePort@ on the same channel as an existing
--   @ReceivePort@.  The copy starts at the existing port's current read
--   position,  so messages not yet consumed by the existing port also
--   appear in the new one,  and from then on both ports receive the same
--   messages independently of each other.

duplicate :: ReceivePort a  -> IO (ReceivePort a)
duplicate (ReceivePort readRef) =
    withMVar readRef newMVar >>= \copyRef ->
    return (ReceivePort copyRef)


-- | Takes the next element from a channel,  blocking until one becomes
--   available.  Can be used in conjunction with @System.Timeout@.

receive :: ReceivePort a -> IO a
receive (ReceivePort readRef) = modifyMVar readRef advance
  where
    -- Wait for the cell at the read position to be filled,  then move the
    -- read position to the following cell and hand back the element.
    advance cell =
        readMVar cell >>= \(Item x rest) ->
        return (rest, x)


-- | Appends an element to a channel.   This is asynchronous: it does not
--   wait for any receiver to pick the element up.

send :: SendPort a -> a -> IO ()
send (SendPort writeRef) msg =
    newEmptyMVar >>= \freshTail -> mask_ (append freshTail)
  where
    -- Runs with asynchronous exceptions masked,  so that once the write
    -- end has been taken it is always put back.  Fills the current end
    -- cell with the message,  then installs the fresh empty cell as the
    -- new write end.
    append freshTail = do
        endCell <- takeMVar writeRef
        putMVar endCell (Item msg freshTail)
        putMVar writeRef freshTail

-- | A right fold over a receiver,  generalizing @getChanContents@ in the
--   sense that @getChanContents = fold (:)@.  The fold runs over a
--   'duplicate' of the given port,  so the port passed in is not itself
--   consumed.  Note that the type of 'fold' implies that the folding
--   function needs to be sufficiently non-strict,  otherwise the result
--   cannot be productive.

fold :: (a -> b -> b) -> ReceivePort a -> IO b
fold f recv = do
    private <- duplicate recv
    unsafeFold f private


-- | Lazily right-folds over the elements of a receiver,  reading from the
--   given port directly.  Each 'receive' is deferred with
--   'unsafeInterleaveIO' and therefore only happens when the corresponding
--   part of the result is demanded.
--
--   'unsafeFold' should usually be called only on readers that are not
--   subsequently used in other channel operations.  Otherwise the
--   (non-)evaluation of pure values may cause race conditions inside IO
--   computations.  The safer 'fold' satisfies this condition by working on
--   a 'duplicate' of the port.

unsafeFold :: (a -> b -> b) -> ReceivePort a -> IO b
unsafeFold step = deferred
  where
    -- Postpone one fold step until its result is actually needed.
    deferred port = unsafeInterleaveIO (oneStep port)

    -- Read one element and combine it with the (again deferred) fold of
    -- everything that follows.
    oneStep port =
        receive port >>= \x ->
        deferred port >>= \rest ->
        return (step x rest)