{-# LANGUAGE TupleSections, ConstraintKinds #-}

-- | Extra functions for "Control.Concurrent".
--
--   This module includes three new types of 'MVar', namely 'Lock' (no associated value),
--   'Var' (never empty) and 'Barrier' (filled at most once). See
--   <https://neilmitchell.blogspot.co.uk/2012/06/flavours-of-mvar_04.html this blog post>
--   for examples and justification.
--
--   If you need greater control of exceptions and threads
--   see the <https://hackage.haskell.org/package/slave-thread slave-thread> package.
--   If you need elaborate relationships between threads
--   see the <https://hackage.haskell.org/package/async async> package.
module Control.Concurrent.Extra(
    module Control.Concurrent,
    withNumCapabilities,
    once, onceFork,
    -- * Lock
    Lock, newLock, withLock, withLockTry,
    -- * Var
    Var, newVar, readVar,
    writeVar, writeVar',
    modifyVar, modifyVar',
    modifyVar_, modifyVar_',
    withVar,
    -- * Barrier
    Barrier, newBarrier, signalBarrier, waitBarrier, waitBarrierMaybe,
    ) where

import Control.Concurrent
import Control.Exception.Extra
import Control.Monad.Extra
import Data.Maybe
import Data.Either.Extra
import Data.Functor
import Prelude
import Data.Tuple.Extra (dupe)


-- | On GHC 7.6 and above with the @-threaded@ flag, brackets a call to 'setNumCapabilities'.
--
--   When 'rtsSupportsBoundThreads' is 'True', the current capability count is read first.
--   If it already equals the requested count the action is run directly; otherwise the
--   count is set to the requested value for the duration of the action and the previous
--   count is restored afterwards, even if the action throws.
--   When 'rtsSupportsBoundThreads' is 'False' this function just runs the argument action.
withNumCapabilities :: Int -> IO a -> IO a
withNumCapabilities new act
    | rtsSupportsBoundThreads = do
        old <- getNumCapabilities
        -- Skip the set/restore round trip when nothing would change.
        if old == new
            then act
            else bracket_ (setNumCapabilities new) (setNumCapabilities old) act
    | otherwise = act


-- | Given an action, produce a wrapped action that runs at most once.
--   If the function raises an exception, the same exception will be reraised each time.
--
--   The first caller of the wrapped action runs the original action; callers that
--   arrive while it is still running wait for it to finish, and later callers reuse
--   the stored outcome (result or exception).
--
-- > let x ||| y = do t1 <- onceFork x; t2 <- onceFork y; t1; t2
-- > \(x :: IO Int) -> void (once x) == pure ()
-- > \(x :: IO Int) -> join (once x) == x
-- > \(x :: IO Int) -> (do y <- once x; y; y) == x
-- > \(x :: IO Int) -> (do y <- once x; y ||| y) == x
once :: IO a -> IO (IO a)
once act = do
    var <- newVar OncePending
    -- Replay a stored outcome: rethrow a captured exception or return the value.
    let run = either throwIO pure
    -- 'modifyVar' only selects what to do next; the selected action is executed by
    -- 'join' after the 'Var' has been released, so 'act' never runs while holding it.
    -- Exceptions are masked around the bookkeeping and unmasked around the work.
    pure $ mask $ \unmask -> join $ modifyVar var $ \v -> case v of
        OnceDone x -> pure (v, unmask $ run x)
        OnceRunning x -> pure (v, unmask $ run =<< waitBarrier x)
        OncePending -> do
            b <- newBarrier
            pure $ (OnceRunning b,) $ do
                res <- try_ $ unmask act
                -- Release the waiters first, then record the final state.
                signalBarrier b res
                modifyVar_ var $ \_ -> pure $ OnceDone res
                run res

-- | Internal state used by 'once': the action has not been started ('OncePending'),
--   is in progress with a 'Barrier' that will receive its outcome ('OnceRunning'),
--   or has finished with the given outcome ('OnceDone').
data Once a = OncePending | OnceRunning (Barrier a) | OnceDone a


-- | Like 'once', but immediately starts running the computation on a background thread.
--
--   The returned action waits for the background thread to finish, then returns its
--   result or rethrows the exception it ended with.
--
-- > \(x :: IO Int) -> join (onceFork x) == x
-- > \(x :: IO Int) -> (do a <- onceFork x; a; a) == x
onceFork :: IO a -> IO (IO a)
onceFork act = do
    bar <- newBarrier
    -- The forked thread's outcome, success or exception, is delivered through the barrier.
    _ <- forkFinally act $ signalBarrier bar
    pure $ eitherM throwIO pure $ waitBarrier bar


---------------------------------------------------------------------
-- LOCK

-- | Like an 'MVar', but has no value.
--   Used to guarantee single-threaded access, typically to some system resource.
--   As an example:
--
-- @
-- lock <- 'newLock'
-- let output = 'withLock' lock . putStrLn
-- forkIO $ do ...; output \"hello\"
-- forkIO $ do ...; output \"world\"
-- @
--
--   Here we are creating a lock to ensure that when writing output our messages
--   do not get interleaved. This use of 'MVar' never blocks on a put. It is permissible,
--   but rare, for one 'withLock' to contain another 'withLock' inside it - but if so,
--   watch out for deadlocks.

newtype Lock = Lock (MVar ())

-- | Create a new 'Lock'. The lock starts out available (its 'MVar' is full).
newLock :: IO Lock
newLock = Lock <$> newMVar ()

-- | Perform some operation while holding 'Lock'. Will prevent all other
--   operations from using the 'Lock' while the action is ongoing.
--   The lock is released when the action finishes, including when it throws.
withLock :: Lock -> IO a -> IO a
withLock (Lock x) act = withMVar x $ \_ -> act

-- | Like 'withLock' but will never block. If the operation cannot be executed
--   immediately it will return 'Nothing'; otherwise the action is run while
--   holding the 'Lock' and its result is returned in 'Just'.
withLockTry :: Lock -> IO a -> IO (Maybe a)
withLockTry (Lock m) act = bracket
    (tryTakeMVar m)
    -- Only give the lock back if we actually managed to take it.
    (\v -> when (isJust v) $ putMVar m ())
    (\v -> if isJust v then fmap Just act else pure Nothing)


---------------------------------------------------------------------
-- VAR

-- | Like an 'MVar', but must always be full.
--   Used to operate on a mutable variable in a thread-safe way.
--   As an example:
--
-- @
-- hits <- 'newVar' 0
-- forkIO $ do ...; 'modifyVar_' hits (+1); ...
-- i <- 'readVar' hits
-- print (\"HITS\",i)
-- @
--
--   Here we have a variable which we modify atomically, so modifications are
--   not interleaved. This use of 'MVar' never blocks on a put. No modifyVar
--   operation should ever block, and they should always complete in a reasonable
--   timeframe. A 'Var' should not be used to protect some external resource, only
--   the variable contained within. Information from a 'readVar' should not be subsequently
--   inserted back into the 'Var'; use one of the modifyVar operations for read-modify-write.
newtype Var a = Var (MVar a)

-- | Create a new 'Var' holding the given initial value.
newVar :: a -> IO (Var a)
newVar = fmap Var . newMVar

-- | Read the current value of the 'Var', leaving it in place.
readVar :: Var a -> IO a
readVar (Var x) = readMVar x

-- | Write a value to become the new value of 'Var', discarding the old value.
--   The new value is stored without being evaluated; see @writeVar'@ for a strict variant.
writeVar :: Var a -> a -> IO ()
writeVar v x = modifyVar_ v $ const $ pure x

-- | Strict variant of 'writeVar': the new value is evaluated (via @modifyVar_'@)
--   once it has been stored.
writeVar' :: Var a -> a -> IO ()
writeVar' v x = modifyVar_' v $ const $ pure x

-- | Modify a 'Var' producing a new value and a return result.
--   The function receives the current contents and returns the replacement
--   contents paired with the result to hand back to the caller.
modifyVar :: Var a -> (a -> IO (a, b)) -> IO b
modifyVar (Var x) f = modifyMVar x f

-- | Strict variant of 'modifyVar'. After the new contents have been stored in the
--   'Var' they are forced with 'evaluate', so the 'Var' does not accumulate
--   unevaluated thunks across repeated modifications.
modifyVar' :: Var a -> (a -> IO (a, b)) -> IO b
modifyVar' x f = do
    -- Smuggle the new contents out alongside the result so they can be forced
    -- after the 'Var' has been released.
    (newContents, res) <- modifyVar x $ \v -> do
        (new, out) <- f v
        pure (new, (new, out))
    _ <- evaluate newContents
    pure res

-- | Modify a 'Var', a restricted version of 'modifyVar' that only replaces
--   the contents and produces no separate result.
modifyVar_ :: Var a -> (a -> IO a) -> IO ()
modifyVar_ (Var x) f = modifyMVar_ x f

-- | Strict variant of 'modifyVar_'. After the new contents have been stored in the
--   'Var' they are forced with 'evaluate'.
modifyVar_' :: Var a -> (a -> IO a) -> IO ()
modifyVar_' x f = do
    -- 'dupe' returns the new contents both as the stored value and as the result,
    -- so they can be forced once the 'Var' has been released.
    newContents <- modifyVar x (fmap dupe . f)
    _ <- evaluate newContents
    pure ()

-- | Perform some operation using the value in the 'Var',
--   a restricted version of 'modifyVar' that leaves the contents unchanged.
withVar :: Var a -> (a -> IO b) -> IO b
withVar (Var x) f = withMVar x f


---------------------------------------------------------------------
-- BARRIER

-- | Starts out empty, then is filled at most once. As an example:
--
-- @
-- bar <- 'newBarrier'
-- forkIO $ do ...; val <- ...; 'signalBarrier' bar val
-- print =<< 'waitBarrier' bar
-- @
--
--   Here we create a barrier which will contain some computed value.
--   A thread is forked to fill the barrier, while the main thread waits
--   for it to complete. A barrier has similarities to a future or promise
--   from other languages, has been known as an IVar in other Haskell work,
--   and in some ways is like a manually managed thunk.
newtype Barrier a = Barrier (MVar a)

-- | Create a new 'Barrier'. The barrier starts out empty (not yet signaled).
newBarrier :: IO (Barrier a)
newBarrier = Barrier <$> newEmptyMVar

-- | Write a value into the Barrier, releasing anyone at 'waitBarrier'.
--   Any subsequent attempts to signal the 'Barrier' will throw an exception.
signalBarrier :: Partial => Barrier a -> a -> IO ()
signalBarrier (Barrier var) v = do
    -- 'tryPutMVar' reports whether the barrier was still empty; it never blocks.
    b <- tryPutMVar var v
    unless b $ errorIO "Control.Concurrent.Extra.signalBarrier, attempt to signal a barrier that has already been signaled"


-- | Wait until a barrier has been signaled with 'signalBarrier', then return its value.
--   The value is read without being removed, so it stays available to other waiters.
waitBarrier :: Barrier a -> IO a
waitBarrier (Barrier var) = readMVar var


-- | A version of 'waitBarrier' that never blocks, returning 'Nothing'
--   if the barrier has not yet been signaled.
waitBarrierMaybe :: Barrier a -> IO (Maybe a)
waitBarrierMaybe (Barrier bar) = tryReadMVar bar