{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# OPTIONS_HADDOCK prune #-}

{- |
Utility functions for running 'Program' actions concurrently.

Haskell uses green threads: small lines of work that are scheduled down onto
actual execution contexts (set by default by this library to be one per core).
Haskell threads are incredibly lightweight, and you are encouraged to use them
freely. Haskell provides a rich ecosystem of tools to do work concurrently and
to communicate safely between threads.

This module provides wrappers around some of these primatives so you can use
them easily from the 'Program' monad.

Note that when you fire off a new thread the top-level application state is
/shared/; it's the same @τ@ inherited from the parent 'Program'.
-}
module Core.Program.Threads (
    -- * Concurrency
    forkThread,
    waitThread,
    waitThread_,
    waitThread',
    waitThreads',
    linkThread,
    cancelThread,

    -- * Helper functions
    concurrentThreads,
    concurrentThreads_,
    raceThreads,
    raceThreads_,

    -- * Internals
    Thread,
    unThread,
) where

import Control.Concurrent.Async (Async, AsyncCancelled, cancel)
import qualified Control.Concurrent.Async as Async (
    async,
    cancel,
    concurrently,
    concurrently_,
    link,
    race,
    race_,
    wait,
    waitCatch,
 )
import Control.Concurrent.MVar (
    newMVar,
    readMVar,
 )
import qualified Control.Exception.Safe as Safe (catch, catchAsync)
import Control.Monad (
    void,
 )
import Control.Monad.Reader.Class (MonadReader (ask))
import Core.Program.Context
import Core.Program.Logging
import Core.System.Base
import Core.Text.Rope

{- |
A thread for concurrent computation.

(this wraps __async__'s 'Async')
-}
newtype Thread α = Thread (Async α)

unThread :: Thread α -> Async α
unThread :: Thread α -> Async α
unThread (Thread Async α
a) = Async α
a

{- |
Fork a thread. The child thread will run in the same @Context@ as the calling
@Program@, including sharing the user-defined application state value.

Threads that are launched off as children are on their own! If the code in the
child thread throws an exception that is /not/ caught within that thread, the
exception will kill the thread. Threads dying without telling anyone is a bit
of an anti-pattern, so this library logs a warning-level log message if this
happens.

If you additionally want the exception to propagate back to the parent thread
(say, for example, you want your whole program to die if any of its worker
threads fail), then call 'linkThread' after forking. If you want the other
direction, that is, if you want the forked thread to be cancelled when its
parent is cancelled, then you need to be waiting on it using 'waitThread'.

(this wraps __async__\'s 'Control.Concurrent.Async.async' which in turn wraps
__base__'s 'Control.Concurrent.forkIO')

@since 0.2.7
-}
forkThread :: Program τ α -> Program τ (Thread α)
forkThread :: Program τ α -> Program τ (Thread α)
forkThread Program τ α
program = do
    Context τ
context <- Program τ (Context τ)
forall r (m :: * -> *). MonadReader r m => m r
ask
    let i :: MVar TimeStamp
i = Context τ -> MVar TimeStamp
forall τ. Context τ -> MVar TimeStamp
startTimeFrom Context τ
context
    let v :: MVar Datum
v = Context τ -> MVar Datum
forall τ. Context τ -> MVar Datum
currentDatumFrom Context τ
context

    IO (Thread α) -> Program τ (Thread α)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Thread α) -> Program τ (Thread α))
-> IO (Thread α) -> Program τ (Thread α)
forall a b. (a -> b) -> a -> b
$ do
        -- if someone calls resetTimer in the thread it should just be that
        -- thread's local duration that is affected, not the parent. We simply
        -- make a new MVar and copy the current start time into it.

        TimeStamp
start <- MVar TimeStamp -> IO TimeStamp
forall a. MVar a -> IO a
readMVar MVar TimeStamp
i
        MVar TimeStamp
i' <- TimeStamp -> IO (MVar TimeStamp)
forall a. a -> IO (MVar a)
newMVar TimeStamp
start

        -- we also need to fork the current Datum, in the same way that we do
        -- when we create a nested span. We do this simply by creating a new
        -- MVar so that when the new thread updates the attached metadata
        -- it'll be evolving a different object.

        Datum
datum <- MVar Datum -> IO Datum
forall a. MVar a -> IO a
readMVar MVar Datum
v
        MVar Datum
v' <- Datum -> IO (MVar Datum)
forall a. a -> IO (MVar a)
newMVar Datum
datum

        let context' :: Context τ
context' =
                Context τ
context
                    { $sel:startTimeFrom:Context :: MVar TimeStamp
startTimeFrom = MVar TimeStamp
i'
                    , $sel:currentDatumFrom:Context :: MVar Datum
currentDatumFrom = MVar Datum
v'
                    }

        -- fork, and run nested program

        Async α
a <- IO α -> IO (Async α)
forall a. IO a -> IO (Async a)
Async.async (IO α -> IO (Async α)) -> IO α -> IO (Async α)
forall a b. (a -> b) -> a -> b
$ do
            IO α -> (SomeException -> IO α) -> IO α
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
Safe.catch
                (Context τ -> Program τ α -> IO α
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context' Program τ α
program)
                ( \(SomeException
e :: SomeException) ->
                    let text :: Rope
text = String -> Rope
forall α. Textual α => α -> Rope
intoRope (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
e)
                     in do
                            Context τ -> Program τ () -> IO ()
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context' (Program τ () -> IO ()) -> Program τ () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                                Rope -> Program τ ()
forall τ. Rope -> Program τ ()
warn Rope
"Uncaught exception in thread"
                                Rope -> Rope -> Program τ ()
forall τ. Rope -> Rope -> Program τ ()
debug Rope
"e" Rope
text
                            SomeException -> IO α
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
                )

        Thread α -> IO (Thread α)
forall (m :: * -> *) a. Monad m => a -> m a
return (Async α -> Thread α
forall α. Async α -> Thread α
Thread Async α
a)

{- |
Wait for the completion of a thread, returning the result. This is a blocking
operation.

If the thread you are waiting on throws an exception it will be rethrown by
'waitThread'.

If the current thread making this call is cancelled (as a result of being on
the losing side of 'concurrentThreads' or 'raceThreads' for example, or due to
an explicit call to 'cancelThread'), then the thread you are waiting on will
be cancelled. This is necessary to ensure that child threads are not leaked if
you nest `forkThread`s.

(this wraps __async__\'s 'Control.Concurrent.Async.wait', taking care to
ensure the behaviour described above)

@since 0.2.7
-}
waitThread :: Thread α -> Program τ α
waitThread :: Thread α -> Program τ α
waitThread (Thread Async α
a) = IO α -> Program τ α
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO α -> Program τ α) -> IO α -> Program τ α
forall a b. (a -> b) -> a -> b
$ do
    IO α -> (AsyncCancelled -> IO α) -> IO α
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
Safe.catchAsync
        (Async α -> IO α
forall a. Async a -> IO a
Async.wait Async α
a)
        ( \(AsyncCancelled
e :: AsyncCancelled) -> do
            Async α -> IO ()
forall a. Async a -> IO ()
cancel Async α
a
            AsyncCancelled -> IO α
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw AsyncCancelled
e
        )

{- |
Wait for the completion of a thread, discarding its result. This is
particularly useful at the end of a do-block if you're waiting on a worker
thread to finish but don't need its return value, if any; otherwise you have
to explicily deal with the unused return value:

@
    _ <- 'waitThread' t1
    'return' ()
@

which is a bit tedious. Instead, you can just use this convenience function:

@
    'waitThread_' t1
@

The trailing underscore in the name of this function follows the same
convetion as found in "Control.Monad", which has 'Control.Monad.mapM_' which
does the same as 'Control.Monad.mapM' but which likewise discards the return
value.

@since 0.2.7
-}
waitThread_ :: Thread α -> Program τ ()
waitThread_ :: Thread α -> Program τ ()
waitThread_ = Program τ α -> Program τ ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Program τ α -> Program τ ())
-> (Thread α -> Program τ α) -> Thread α -> Program τ ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Thread α -> Program τ α
forall α τ. Thread α -> Program τ α
waitThread

{- |
Wait for a thread to complete, returning the result if the computation was
successful or the exception if one was thrown by the child thread.

This basically is convenience for calling `waitThread` and putting `catch`
around it, but as with all the other @wait*@ functions this ensures that if
the thread waiting is cancelled the cancellation is propagated to the thread
being watched as well.

(this wraps __async__\'s 'Control.Concurrent.Async.waitCatch')

@since 0.4.5
-}
waitThread' :: Thread α -> Program τ (Either SomeException α)
waitThread' :: Thread α -> Program τ (Either SomeException α)
waitThread' (Thread Async α
a) = IO (Either SomeException α) -> Program τ (Either SomeException α)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException α) -> Program τ (Either SomeException α))
-> IO (Either SomeException α)
-> Program τ (Either SomeException α)
forall a b. (a -> b) -> a -> b
$ do
    IO (Either SomeException α)
-> (AsyncCancelled -> IO (Either SomeException α))
-> IO (Either SomeException α)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
Safe.catchAsync
        ( do
            Either SomeException α
result <- Async α -> IO (Either SomeException α)
forall a. Async a -> IO (Either SomeException a)
Async.waitCatch Async α
a
            Either SomeException α -> IO (Either SomeException α)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either SomeException α
result
        )
        ( \(AsyncCancelled
e :: AsyncCancelled) -> do
            Async α -> IO ()
forall a. Async a -> IO ()
cancel Async α
a
            AsyncCancelled -> IO (Either SomeException α)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw AsyncCancelled
e
        )

{- |
Wait for many threads to complete. This function is intended for the scenario
where you fire off a number of worker threads with `forkThread` but rather
than leaving them to run independantly, you need to wait for them all to
complete.

The results of the threads that complete successfully will be returned as
'Right' values. Should any of the threads being waited upon throw an
exception, those exceptions will be returned as 'Left' values.

If you don't need to analyse the failures individually, then you can just
collect the successes using "Data.Either"'s 'Data.Either.rights':

@
    responses <- 'waitThreads''

    'info' "Aggregating results..."
    combineResults ('Data.Either.rights' responses)
@

Likewise, if you /do/ want to do something with all the failures, you might
find 'Data.Either.lefts' useful:

@
    'mapM_' ('warn' . 'intoRope' . 'displayException') ('Data.Either.lefts' responses)
@

If the thread calling 'waitThreads'' is cancelled, then all the threads being
waited upon will also be cancelled. This often occurs within a timeout or
similar control measure implemented using 'raceThreads_'. Should the thread
that spawned all the workers and is waiting for their results be told to
cancel because it lost the "race", the child threads need to be told in turn
to cancel so as to avoid those threads being leaked and continuing to run as
zombies. This function takes care of that.

(this extends __async__\'s 'Control.Concurrent.Async.waitCatch' to work
across a list of Threads, taking care to ensure the cancellation behaviour
described throughout this module)

@since 0.4.5
-}
waitThreads' :: [Thread α] -> Program τ [Either SomeException α]
waitThreads' :: [Thread α] -> Program τ [Either SomeException α]
waitThreads' [Thread α]
ts = IO [Either SomeException α] -> Program τ [Either SomeException α]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Either SomeException α] -> Program τ [Either SomeException α])
-> IO [Either SomeException α]
-> Program τ [Either SomeException α]
forall a b. (a -> b) -> a -> b
$ do
    let as :: [Async α]
as = (Thread α -> Async α) -> [Thread α] -> [Async α]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Thread α -> Async α
forall α. Thread α -> Async α
unThread [Thread α]
ts
    IO [Either SomeException α]
-> (AsyncCancelled -> IO [Either SomeException α])
-> IO [Either SomeException α]
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
Safe.catchAsync
        ( do
            [Either SomeException α]
results <- (Async α -> IO (Either SomeException α))
-> [Async α] -> IO [Either SomeException α]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Async α -> IO (Either SomeException α)
forall a. Async a -> IO (Either SomeException a)
Async.waitCatch [Async α]
as
            [Either SomeException α] -> IO [Either SomeException α]
forall (f :: * -> *) a. Applicative f => a -> f a
pure [Either SomeException α]
results
        )
        ( \(AsyncCancelled
e :: AsyncCancelled) -> do
            (Async α -> IO ()) -> [Async α] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Async α -> IO ()
forall a. Async a -> IO ()
cancel [Async α]
as
            AsyncCancelled -> IO [Either SomeException α]
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw AsyncCancelled
e
        )

{- |
Ordinarily if an exception is thrown in a forked thread that exception is
silently swollowed. If you instead need the exception to propegate back to the
parent thread, you can \"link\" the two together using this function.

(this wraps __async__\'s 'Control.Concurrent.Async.link')

@since 0.4.2
-}
linkThread :: Thread α -> Program τ ()
linkThread :: Thread α -> Program τ ()
linkThread (Thread Async α
a) = do
    IO () -> Program τ ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Program τ ()) -> IO () -> Program τ ()
forall a b. (a -> b) -> a -> b
$ do
        Async α -> IO ()
forall a. Async a -> IO ()
Async.link Async α
a

{- |
Cancel a thread.

(this wraps __async__\'s 'Control.Concurrent.Async.cancel'. The underlying
mechanism used is to throw the 'AsyncCancelled' to the other thread. That
exception is asynchronous, so will not be trapped by a 'catch' block and will
indeed cause the thread receiving the exception to come to an end)

@since 0.4.5
-}
cancelThread :: Thread α -> Program τ ()
cancelThread :: Thread α -> Program τ ()
cancelThread (Thread Async α
a) = do
    IO () -> Program τ ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Program τ ()) -> IO () -> Program τ ()
forall a b. (a -> b) -> a -> b
$ do
        Async α -> IO ()
forall a. Async a -> IO ()
Async.cancel Async α
a

{- |
Fork two threads and wait for both to finish. The return value is the pair of
each action's return types.

This is the same as calling 'forkThread' and 'waitThread' twice, except that
if either sub-program fails with an exception the other program which is still
running will be cancelled and the original exception is then re-thrown.

@
    (a,b) <- 'concurrentThreads' one two

    -- continue, doing something with both results.
@

For a variant that ingores the return values and just waits for both see
'concurrentThreads_' below.

(this wraps __async__\'s 'Control.Concurrent.Async.concurrently')

@since 0.4.0
-}
concurrentThreads :: Program τ α -> Program τ β -> Program τ (α, β)
concurrentThreads :: Program τ α -> Program τ β -> Program τ (α, β)
concurrentThreads Program τ α
one Program τ β
two = do
    Context τ
context <- Program τ (Context τ)
forall r (m :: * -> *). MonadReader r m => m r
ask
    IO (α, β) -> Program τ (α, β)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (α, β) -> Program τ (α, β)) -> IO (α, β) -> Program τ (α, β)
forall a b. (a -> b) -> a -> b
$ do
        IO α -> IO β -> IO (α, β)
forall a b. IO a -> IO b -> IO (a, b)
Async.concurrently
            (Context τ -> Program τ α -> IO α
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context Program τ α
one)
            (Context τ -> Program τ β -> IO β
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context Program τ β
two)

{- |
Fork two threads and wait for both to finish.

This is the same as calling 'forkThread' and 'waitThread_' twice, except that
if either sub-program fails with an exception the other program which is still
running will be cancelled and the original exception is then re-thrown.

(this wraps __async__\'s 'Control.Concurrent.Async.concurrently_')

@since 0.4.0
-}
concurrentThreads_ :: Program τ α -> Program τ β -> Program τ ()
concurrentThreads_ :: Program τ α -> Program τ β -> Program τ ()
concurrentThreads_ Program τ α
one Program τ β
two = do
    Context τ
context <- Program τ (Context τ)
forall r (m :: * -> *). MonadReader r m => m r
ask
    IO () -> Program τ ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Program τ ()) -> IO () -> Program τ ()
forall a b. (a -> b) -> a -> b
$ do
        IO α -> IO β -> IO ()
forall a b. IO a -> IO b -> IO ()
Async.concurrently_
            (Context τ -> Program τ α -> IO α
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context Program τ α
one)
            (Context τ -> Program τ β -> IO β
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context Program τ β
two)

{- |
Fork two threads and race them against each other. This blocks until one or
the other of the threads finishes. The return value will be 'Left' @α@ if the
first program (@one@) completes first, and 'Right' @β@ if it is the second
program (@two@) which finishes first. The sub program which is still running
will be cancelled with an exception.

@
    result <- 'raceThreads' one two
    case result of
        Left a -> do
            -- one finished first
        Right b -> do
            -- two finished first
@

For a variant that ingores the return value and just races the threads see
'raceThreads_' below.

(this wraps __async__\'s 'Control.Concurrent.Async.race')

@since 0.4.0
-}
raceThreads :: Program τ α -> Program τ β -> Program τ (Either α β)
raceThreads :: Program τ α -> Program τ β -> Program τ (Either α β)
raceThreads Program τ α
one Program τ β
two = do
    Context τ
context <- Program τ (Context τ)
forall r (m :: * -> *). MonadReader r m => m r
ask
    IO (Either α β) -> Program τ (Either α β)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either α β) -> Program τ (Either α β))
-> IO (Either α β) -> Program τ (Either α β)
forall a b. (a -> b) -> a -> b
$ do
        IO α -> IO β -> IO (Either α β)
forall a b. IO a -> IO b -> IO (Either a b)
Async.race
            (Context τ -> Program τ α -> IO α
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context Program τ α
one)
            (Context τ -> Program τ β -> IO β
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context Program τ β
two)

{- |
Fork two threads and race them against each other. When one action completes
the other will be cancelled with an exception. This is useful for enforcing
timeouts:

@
    'raceThreads_'
        ('sleepThread' 300)
        (do
            -- We expect this to complete within 5 minutes.
            performAction
        )
@

(this wraps __async__\'s 'Control.Concurrent.Async.race_')

@since 0.4.0
-}
raceThreads_ :: Program τ α -> Program τ β -> Program τ ()
raceThreads_ :: Program τ α -> Program τ β -> Program τ ()
raceThreads_ Program τ α
one Program τ β
two = do
    Context τ
context <- Program τ (Context τ)
forall r (m :: * -> *). MonadReader r m => m r
ask
    IO () -> Program τ ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Program τ ()) -> IO () -> Program τ ()
forall a b. (a -> b) -> a -> b
$ do
        IO α -> IO β -> IO ()
forall a b. IO a -> IO b -> IO ()
Async.race_
            (Context τ -> Program τ α -> IO α
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context Program τ α
one)
            (Context τ -> Program τ β -> IO β
forall τ α. Context τ -> Program τ α -> IO α
subProgram Context τ
context Program τ β
two)