{- |
Module      :  <File name or $Header$ to be replaced automatically>
Description :  This module implements a barrier using futures.
Maintainer  :  willig@ki.informatik.uni-frankfurt.de
Stability   :  experimental
Portability :  non-portable (requires Futures)

This module implements a barrier using futures.
A rendezvous ensures that two threads meet at a specific point before
continuing their computation. The rendezvous idiom blocks at this point
until both threads have arrived. A barrier is a rendezvous for a
group of processes. Assume that an application is divided into phases in which
several threads compute intermediate results that serve as the input for the
next phase. Then all threads must complete the current phase before entering
the next. To achieve this behaviour, a barrier is placed at the end of a phase.
Note that a barrier is purely for synchronisation and not for the exchange of
data.

Warning: All operations on a barrier should only be used within the global wrapper function
'Futures.withFuturesDo'!
-}
module Control.Concurrent.Futures.Barrier (
            Bar,
            newBar,
            syncBar
) where
import Control.Concurrent.Futures.Futures as Futures
import Control.Concurrent.Futures.Buffer as Buffer

-- | A barrier is a triple consisting of
--
-- * a buffer holding a counter ('newBar' stores the initial count there and
--   'syncBar' decrements it on every arrival that is not the last one),
--
-- * a buffer holding the list of handlers that 'syncBar' has registered for
--   the arrivals that are currently waiting, and
--
-- * the capacity, i.e. the value the counter is reset to once the barrier
--   has been opened.
type Bar a = (Buffer a, Buffer [Bool -> IO ()], Int)

-- | Creates a new barrier whose counter starts at @n@.
--
-- Two fresh buffers are allocated; the handler buffer is filled with the
-- empty list and the counter buffer with @n@. The result bundles both
-- buffers with @n@ as the barrier's capacity.
newBar :: Int -> IO (Buffer Int, Buffer [Bool -> IO ()], Int)
newBar n = do
  remainingBuf <- Buffer.newBuf
  handlersBuf <- Buffer.newBuf
  putBuf handlersBuf []
  putBuf remainingBuf n
  return (remainingBuf, handlersBuf, n)

-- | Synchronises the calling thread on the barrier.
--
-- The current counter and the list of registered handlers are taken out of
-- their buffers. If the counter is 1, the caller is the last arrival: every
-- registered handler is invoked via 'openBar', the counter buffer is reset to
-- the capacity, the handler buffer is reset to the empty list, and 'True' is
-- returned. Otherwise the decremented counter is written back, a fresh
-- handle/future pair is created, the handle is pushed onto the handler list,
-- and the result is whatever 'wait' yields for the future.
--
-- NOTE(review): presumably 'Buffer.getBuf' empties the buffer and blocks
-- while it is empty, so the get/put order below also serialises concurrent
-- callers - confirm against the Buffer module.
syncBar :: (Buffer Int, Buffer [Bool -> IO ()], Int) -> IO Bool
syncBar (remainingBuf, handlersBuf, capacity) = do
  remaining <- Buffer.getBuf remainingBuf
  handlers <- Buffer.getBuf handlersBuf
  if remaining == 1
    then do
      -- Last arrival: release everybody, then restore the initial state.
      openBar handlers
      Buffer.putBuf remainingBuf capacity
      Buffer.putBuf handlersBuf []
      return True
    else do
      -- Not the last arrival: register a handler and block on its future.
      Buffer.putBuf remainingBuf (remaining - 1)
      (release, released) <- Futures.newhandled
      Buffer.putBuf handlersBuf (release : handlers)
      wait released

-- | Opens the barrier for all registered handlers.
--
-- Every handler in the list is invoked with 'True', front to back; an empty
-- list results in no action. 'syncBar' passes the handles it collected for
-- the waiting arrivals.
openBar :: [Bool -> IO ()] -> IO ()
openBar handlers = mapM_ ($ True) handlers
 
-- | Forces its argument to weak head normal form and returns it unchanged.
--
-- The result is the argument itself, whether it is 'True' or 'False'.
-- 'syncBar' applies this to the future obtained from 'Futures.newhandled';
-- presumably evaluating that future blocks until its handler has been
-- called, which is what turns the forcing into waiting - confirm against the
-- Futures module.
wait :: Bool -> IO Bool
wait x = return $! x  -- strict application: x is evaluated before it is returned