-- |Description: Async Combinators
module Polysemy.Conc.Async where

import qualified Control.Concurrent.Async as Base
import Polysemy.Time (MilliSeconds (MilliSeconds), TimeUnit)

import Polysemy.Conc.Effect.Gate (Gate, gate, withGate)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Conc.Effect.Scoped (Scoped_)
import qualified Polysemy.Conc.Effect.Sync as Sync
import Polysemy.Conc.Effect.Sync (ScopedSync, Sync)
import Polysemy.Conc.Interpreter.Sync (interpretSync)
import qualified Polysemy.Conc.Race as Race
import Polysemy.Conc.Sync (withSync)

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
-- Passes the handle into the action to allow it to await its result.
--
-- When cancelling, this variant will wait indefinitely for the thread to be gone.
withAsyncBlock ::
  Members [Resource, Async] r =>
  Sem r b ->
  (Base.Async (Maybe b) -> Sem r a) ->
  Sem r a
withAsyncBlock :: forall (r :: [(* -> *) -> * -> *]) b a.
Members '[Resource, Async] r =>
Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
withAsyncBlock Sem r b
mb Async (Maybe b) -> Sem r a
use = do
  Async (Maybe b)
handle <- Sem r b -> Sem r (Async (Maybe b))
forall (r :: [(* -> *) -> * -> *]) a.
MemberWithError Async r =>
Sem r a -> Sem r (Async (Maybe a))
async Sem r b
mb
  Sem r a -> Sem r () -> Sem r a
forall (r :: [(* -> *) -> * -> *]) a b.
Member Resource r =>
Sem r a -> Sem r b -> Sem r a
finally (Async (Maybe b) -> Sem r a
use Async (Maybe b)
handle) (Async (Maybe b) -> Sem r ()
forall (r :: [(* -> *) -> * -> *]) a.
MemberWithError Async r =>
Async a -> Sem r ()
cancel Async (Maybe b)
handle)

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
-- Passes the handle into the sync action to allow it to await the async action's result.
--
-- When cancelling, this variant will wait for the specified interval for the thread to be gone.
withAsyncWait ::
  TimeUnit u =>
  Members [Resource, Race, Async] r =>
  u ->
  Sem r b ->
  (Base.Async (Maybe b) -> Sem r a) ->
  Sem r a
withAsyncWait :: forall u (r :: [(* -> *) -> * -> *]) b a.
(TimeUnit u, Members '[Resource, Race, Async] r) =>
u -> Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
withAsyncWait u
interval Sem r b
mb Async (Maybe b) -> Sem r a
use = do
  Async (Maybe b)
handle <- Sem r b -> Sem r (Async (Maybe b))
forall (r :: [(* -> *) -> * -> *]) a.
MemberWithError Async r =>
Sem r a -> Sem r (Async (Maybe a))
async Sem r b
mb
  Sem r a -> Sem r () -> Sem r a
forall (r :: [(* -> *) -> * -> *]) a b.
Member Resource r =>
Sem r a -> Sem r b -> Sem r a
finally (Async (Maybe b) -> Sem r a
use Async (Maybe b)
handle) (u -> Sem r () -> Sem r ()
forall u (r :: [(* -> *) -> * -> *]).
(TimeUnit u, Member Race r) =>
u -> Sem r () -> Sem r ()
Race.timeoutU u
interval (Async (Maybe b) -> Sem r ()
forall (r :: [(* -> *) -> * -> *]) a.
MemberWithError Async r =>
Async a -> Sem r ()
cancel Async (Maybe b)
handle))

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
-- Passes the handle into the sync action to allow it to await the async action's result.
--
-- When cancelling, this variant will wait for 500ms for the thread to be gone.
withAsync ::
  Members [Resource, Race, Async] r =>
  Sem r b ->
  (Base.Async (Maybe b) -> Sem r a) ->
  Sem r a
withAsync :: forall (r :: [(* -> *) -> * -> *]) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
withAsync =
  MilliSeconds -> Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
forall u (r :: [(* -> *) -> * -> *]) b a.
(TimeUnit u, Members '[Resource, Race, Async] r) =>
u -> Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
withAsyncWait (Int64 -> MilliSeconds
MilliSeconds Int64
500)

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
-- Discards the handle, expecting the async action to either terminate or be cancelled.
--
-- When cancelling, this variant will wait for 500ms for the thread to be gone.
withAsync_ ::
  Members [Resource, Race, Async] r =>
  Sem r b ->
  Sem r a ->
  Sem r a
withAsync_ :: forall (r :: [(* -> *) -> * -> *]) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> Sem r a -> Sem r a
withAsync_ Sem r b
mb =
  Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
forall (r :: [(* -> *) -> * -> *]) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
withAsync Sem r b
mb ((Async (Maybe b) -> Sem r a) -> Sem r a)
-> (Sem r a -> Async (Maybe b) -> Sem r a) -> Sem r a -> Sem r a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem r a -> Async (Maybe b) -> Sem r a
forall a b. a -> b -> a
const

-- |Run an action with 'async', but don't start it right away, so the thread handle can be processed before the action
-- executes.
--
-- Takes a callback function that is invoked after spawning the thread.
-- The callback receives the 'Base.Async' handle and a unit action that starts the computation.
--
-- This is helpful if the 'Base.Async' has to be stored in state and the same state is written when the action finishes.
-- In that case, the race condition causes the handle to be written over the finished state.
--
-- @
-- makeRequest = put Nothing
--
-- main = scheduleAsync makeRequest \ handle start -> do
--   put (Just handle)
--   start -- now makeRequest is executed
-- @
scheduleAsync ::
   b r a .
  Members [ScopedSync (), Async, Race] r =>
  Sem r b ->
  (Base.Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a) ->
  Sem r a
scheduleAsync :: forall b (r :: [(* -> *) -> * -> *]) a.
Members '[ScopedSync (), Async, Race] r =>
Sem r b
-> (Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a)
-> Sem r a
scheduleAsync Sem r b
mb Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a
f =
  forall d (r :: [(* -> *) -> * -> *]).
Member (ScopedSync d) r =>
InterpreterFor (Sync d) r
withSync @() do
    Async (Maybe b)
h <- Sem (Sync () : r) b -> Sem (Sync () : r) (Async (Maybe b))
forall (r :: [(* -> *) -> * -> *]) a.
MemberWithError Async r =>
Sem r a -> Sem r (Async (Maybe a))
async do
      forall d (r :: [(* -> *) -> * -> *]).
MemberWithError (Sync d) r =>
Sem r d
Sync.block @()
      Sem r b -> Sem (Sync () : r) b
forall (e :: (* -> *) -> * -> *) (r :: [(* -> *) -> * -> *]) a.
Sem r a -> Sem (e : r) a
raise Sem r b
mb
    Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a
f Async (Maybe b)
h (() -> Sem (Sync () : r) ()
forall d (r :: [(* -> *) -> * -> *]).
MemberWithError (Sync d) r =>
d -> Sem r ()
Sync.putBlock ())

-- |Variant of 'scheduleAsync' that directly interprets the 'Control.Concurrent.MVar' used for signalling.
scheduleAsyncIO ::
   b r a .
  Members [Resource, Async, Race, Embed IO] r =>
  Sem r b ->
  (Base.Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a) ->
  Sem r a
scheduleAsyncIO :: forall b (r :: [(* -> *) -> * -> *]) a.
Members '[Resource, Async, Race, Embed IO] r =>
Sem r b
-> (Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a)
-> Sem r a
scheduleAsyncIO Sem r b
mb Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a
f =
  forall d (r :: [(* -> *) -> * -> *]).
Members '[Race, Embed IO] r =>
InterpreterFor (Sync d) r
interpretSync @() do
    Async (Maybe b)
h <- Sem (Sync () : r) b -> Sem (Sync () : r) (Async (Maybe b))
forall (r :: [(* -> *) -> * -> *]) a.
MemberWithError Async r =>
Sem r a -> Sem r (Async (Maybe a))
async do
      forall d (r :: [(* -> *) -> * -> *]).
MemberWithError (Sync d) r =>
Sem r d
Sync.block @()
      Sem r b -> Sem (Sync () : r) b
forall (e :: (* -> *) -> * -> *) (r :: [(* -> *) -> * -> *]) a.
Sem r a -> Sem (e : r) a
raise Sem r b
mb
    Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a
f Async (Maybe b)
h (() -> Sem (Sync () : r) ()
forall d (r :: [(* -> *) -> * -> *]).
MemberWithError (Sync d) r =>
d -> Sem r ()
Sync.putBlock ())

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
--
-- The second action will only start when the first action calls 'Polysemy.Conc.Gate.signal'.
--
-- Passes the handle into the sync action to allow it to await the async action's result.
--
-- This can be used to ensure that the async action has acquired its resources before the main action starts.
withAsyncGated ::
   b r a .
  Members [Scoped_ Gate, Resource, Race, Async] r =>
  Sem (Gate : r) b ->
  (Base.Async (Maybe b) -> Sem r a) ->
  Sem r a
withAsyncGated :: forall b (r :: [(* -> *) -> * -> *]) a.
Members '[Scoped_ Gate, Resource, Race, Async] r =>
Sem (Gate : r) b -> (Async (Maybe b) -> Sem r a) -> Sem r a
withAsyncGated Sem (Gate : r) b
mb Async (Maybe b) -> Sem r a
use =
  Sem (Gate : r) a -> Sem r a
forall (r :: [(* -> *) -> * -> *]).
Member (Scoped_ Gate) r =>
InterpreterFor Gate r
withGate (Sem (Gate : r) a -> Sem r a) -> Sem (Gate : r) a -> Sem r a
forall a b. (a -> b) -> a -> b
$ Sem (Gate : r) b
-> (Async (Maybe b) -> Sem (Gate : r) a) -> Sem (Gate : r) a
forall (r :: [(* -> *) -> * -> *]) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
withAsync Sem (Gate : r) b
mb \ Async (Maybe b)
h -> do
    Sem (Gate : r) ()
forall (r :: [(* -> *) -> * -> *]).
MemberWithError Gate r =>
Sem r ()
gate
    Sem r a -> Sem (Gate : r) a
forall (e :: (* -> *) -> * -> *) (r :: [(* -> *) -> * -> *]) a.
Sem r a -> Sem (e : r) a
raise (Async (Maybe b) -> Sem r a
use Async (Maybe b)
h)

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
--
-- The second action will only start when the first action calls 'Polysemy.Conc.Gate.signal'.
--
-- This can be used to ensure that the async action has acquired its resources before the main action starts.
withAsyncGated_ ::
   b r a .
  Members [Scoped_ Gate, Resource, Race, Async] r =>
  Sem (Gate : r) b ->
  Sem r a ->
  Sem r a
withAsyncGated_ :: forall b (r :: [(* -> *) -> * -> *]) a.
Members '[Scoped_ Gate, Resource, Race, Async] r =>
Sem (Gate : r) b -> Sem r a -> Sem r a
withAsyncGated_ Sem (Gate : r) b
mb Sem r a
use =
  Sem (Gate : r) a -> Sem r a
forall (r :: [(* -> *) -> * -> *]).
Member (Scoped_ Gate) r =>
InterpreterFor Gate r
withGate (Sem (Gate : r) a -> Sem r a) -> Sem (Gate : r) a -> Sem r a
forall a b. (a -> b) -> a -> b
$ Sem (Gate : r) b -> Sem (Gate : r) a -> Sem (Gate : r) a
forall (r :: [(* -> *) -> * -> *]) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> Sem r a -> Sem r a
withAsync_ Sem (Gate : r) b
mb do
    Sem (Gate : r) ()
forall (r :: [(* -> *) -> * -> *]).
MemberWithError Gate r =>
Sem r ()
gate
    Sem r a -> Sem (Gate : r) a
forall (e :: (* -> *) -> * -> *) (r :: [(* -> *) -> * -> *]) a.
Sem r a -> Sem (e : r) a
raise Sem r a
use