{-# LANGUAGE DeriveDataTypeable #-}

-- |
-- Module      :  Control.Concurrent.Chan.Split.Implementation
-- Copyright   :  (c) 2012 Leon P Smith
-- License     :  MIT
-- Maintainer  :  leon@melding-monads.com

module Control.Concurrent.Chan.Split.Implementation where

import Control.Applicative
import Control.Concurrent.MVar
import Control.Exception(mask_, onException)
import Data.IORef
import Data.Typeable(Typeable)
import System.IO.Unsafe(unsafeInterleaveIO)

type List a = MVar (Item a)

data Item a = Item a {-# UNPACK #-} !(List a)

-- | @SendPorts@ represent one end of the channel.   There is only one
--   @SendPort@ per channel,  though it can be used from multiple threads.
--   Messages can be sent to the channel using 'send'.

newtype SendPort a = SendPort (MVar (List a)) deriving (Eq, Typeable)

-- | @ReceivePorts@ represent the other end of a channel.   A channel
--   can have many @ReceivePorts@,  which all receive the same messages in
--   a publish/subscribe like manner.  A single @ReceivePort@ can be used
--   from multiple threads,  where every message will be delivered to a
--   single thread in a push/pull like manner.  Use 'receive' to fetch
--   messages from the channel.

data ReceivePort a = ReceivePort
    { rp_ref  :: {-# UNPACK #-} !(IORef (List a))
    , rp_lock :: {-# UNPACK #-} !(MVar ())
    } deriving (Eq, Typeable)

-- | Creates a new channel and a  @(SendPort, ReceivePort)@ pair representing
--   the two sides of the channel.

new :: IO (SendPort a, ReceivePort a)
new = do
   hole <- newEmptyMVar
   send <- SendPort <$> newMVar hole
   recv <- ReceivePort <$> newIORef hole <*> newMVar ()
   return (send, recv)

-- | Produces a new channel that initially has zero @ReceivePorts@.
--   Any messages written to this channel before a reader is @'listen'ing@
--   will be eligible for garbage collection.   Note that one can
--   one can implement 'newSendPort' in terms of 'new' by throwing away the
--   'ReceivePort' and letting it be garbage collected,   and that one can
--   implement 'new' in terms of 'newSendPort' and 'listen'.

newSendPort :: IO (SendPort a)
newSendPort = SendPort `fmap` (newMVar =<< newEmptyMVar)

-- | Create a new @ReceivePort@ attached the same channel as a given
--   @SendPort@.  This @ReceivePort@ starts out empty, and remains so
--   until more messages are written to the @SendPort@.

listen :: SendPort a  -> IO (ReceivePort a)
listen (SendPort a) = ReceivePort <$> (newIORef =<< readMVar a) <*> newMVar ()

-- | Create a new @ReceivePort@ attached to the same channel as another
--   @ReceivePort@.  These two ports will receive the same messages.
--   Any messages in the channel that have not been consumed by the
--   existing port will also appear in the new port.

duplicate :: ReceivePort a  -> IO (ReceivePort a)
duplicate   (ReceivePort ref _) = do
    rp_ref  <- newIORef =<< readIORef ref
    rp_lock <- newMVar ()
    return $! ReceivePort rp_ref rp_lock

-- | Fetch a message from a channel.  If no message is available,  it blocks
--   until one is.  Can be used in conjunction with @System.Timeout@.

receive :: ReceivePort a -> IO a
receive (ReceivePort ref lock) =
    withLock lock $ do
      read_end <- readIORef ref
      (Item val new_read_end) <- readMVar read_end
      writeIORef ref new_read_end
      return val

-- | Send a message to a channel.   This is asynchronous and does not block.

send :: SendPort a -> a -> IO ()
send (SendPort s) a = do
    new_hole <- newEmptyMVar
    mask_ $ do
      old_hole <- takeMVar s
      putMVar old_hole (Item a new_hole)
      putMVar s new_hole

-- | A right fold over a receiver.   @fold (:)@ is quite similar to
--   @getChanContents@.  The one difference is that that @fold@ does not
--   produce any observable side-effects on the @ReceivePort@;  unlike
--   @getChanContents@ any messages observed by @fold@ are not removed
--   from the port.
--   Note that the type of 'fold' implies that the folding function needs
--   to be sufficiently non-strict, otherwise the result cannot be productive.

fold :: (a -> b -> b) -> ReceivePort a -> IO b
fold f (ReceivePort ref _) = readIORef ref >>= foldList f

-- | Traverse a 'List' directly.  Avoids the outer 'MVar' overhead of calling
-- 'receive' over and over.
foldList :: (a -> b -> b) -> List a -> IO b
foldList f list =
    unsafeInterleaveIO $ do
        Item a list' <- readMVar list
        b <- foldList f list'
        return (f a b)

-- | Atomically send many messages at once.   Note that this function
--   forces the spine of the list beforehand to minimize the critical section,
--   which also helps prevent exceptions at inopportune times.  Trying to send
--   an infinite list will never send anything,  though it will allocate and
--   retain a lot of memory trying to do so.

sendMany :: SendPort a -> [a] -> IO ()
sendMany _ [] = return ()
sendMany s (a:as) = do
    new_hole <- newEmptyMVar
    loop s (Item a new_hole) new_hole as
    loop s msgs hole (a:as) = do
       new_hole <- newEmptyMVar
       putMVar hole (Item a new_hole)
       loop s msgs new_hole as
    loop (SendPort s) msgs new_hole [] = mask_ $ do
       hole <- takeMVar s
       putMVar hole msgs
       putMVar s new_hole

-- | This function splits an existing channel in two;  associating
--   a new receive port with the old send port,  and a new send
--   port with the existing receive ports.   The new receive port
--   starts out empty,  while the existing receive ports retain
--   any unprocessed messages.
--   <<split.png>>

split :: SendPort a -> IO (ReceivePort a, SendPort a)
split (SendPort s) = do
    new_hole <- newEmptyMVar
    old_hole <- swapMVar s new_hole
    rp <- ReceivePort <$> newIORef new_hole <*> newMVar ()
    sp <- SendPort `fmap` newMVar old_hole
    return (rp, sp)

{-# INLINE withLock #-}
withLock :: MVar () -> IO b -> IO b
withLock m io =
  mask_ $ do
    _ <- takeMVar m
    b <- io `onException` putMVar m ()
    putMVar m ()
    return b