-- |Description: Async Combinators
module Polysemy.Conc.Async where

import qualified Control.Concurrent.Async as Base
import Polysemy.Time (MilliSeconds (MilliSeconds), TimeUnit)

import Polysemy.Conc.Effect.Race (Race)
import qualified Polysemy.Conc.Effect.Sync as Sync
import Polysemy.Conc.Effect.Sync (ScopedSync, Sync)
import Polysemy.Conc.Interpreter.Sync (interpretSync)
import qualified Polysemy.Conc.Race as Race
import Polysemy.Conc.Sync (withSync)

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
-- Passes the handle into the action to allow it to await its result.
--
-- The thread is spawned and cancelled via 'bracket', so cancellation is performed when the second action finishes.
--
-- When cancelling, this variant will wait indefinitely for the thread to be gone.
withAsyncBlock ::
  Members [Resource, Async] r =>
  Sem r b ->
  (Base.Async (Maybe b) -> Sem r a) ->
  Sem r a
withAsyncBlock mb =
  bracket (async mb) cancel

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
-- Passes the handle into the action to allow it to await its result.
--
-- The thread is spawned and cancelled via 'bracket'; the cancellation itself is wrapped in 'Race.timeoutU' using the
-- supplied interval.
--
-- When cancelling, this variant will wait for the specified interval for the thread to be gone.
withAsyncWait ::
  TimeUnit u =>
  Members [Resource, Race, Async] r =>
  u ->
  Sem r b ->
  (Base.Async (Maybe b) -> Sem r a) ->
  Sem r a
withAsyncWait interval mb =
  bracket (async mb) (Race.timeoutU interval . cancel)

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
-- Passes the handle into the action to allow it to await its result.
--
-- This is 'withAsyncWait' with a fixed interval of @MilliSeconds 500@.
--
-- When cancelling, this variant will wait for 500ms for the thread to be gone.
withAsync ::
  Members [Resource, Race, Async] r =>
  Sem r b ->
  (Base.Async (Maybe b) -> Sem r a) ->
  Sem r a
withAsync =
  withAsyncWait (MilliSeconds 500)

-- |Run the first action asynchronously while the second action executes, then cancel the first action.
-- Discards the handle, expecting the async action to either terminate or be cancelled.
--
-- This is 'withAsync' with a callback that ignores the 'Base.Async' handle.
--
-- When cancelling, this variant will wait for 500ms for the thread to be gone.
withAsync_ ::
  Members [Resource, Race, Async] r =>
  Sem r b ->
  Sem r a ->
  Sem r a
withAsync_ mb ma =
  withAsync mb (const ma)

-- |Run an action with 'async', but don't start it right away, so the thread handle can be processed before the action
-- executes.
--
-- Takes a callback function that is invoked after spawning the thread.
-- The callback receives the 'Base.Async' handle and a unit action that starts the computation.
--
-- The spawned thread first blocks on 'Sync.readBlock' and only then runs the action; the start action handed to the
-- callback is 'Sync.putBlock', which releases it.
--
-- This is helpful if the 'Base.Async' has to be stored in state and the same state is written when the action finishes.
-- If the action were started immediately, a race condition could cause the handle to be written over the finished
-- state.
--
-- @
-- makeRequest = put Nothing
--
-- main = scheduleAsync makeRequest \\ handle start -> do
--   put (Just handle)
--   start -- now makeRequest is executed
-- @
scheduleAsync ::
  forall res b r a .
  Members [ScopedSync res (), Async, Race] r =>
  Sem r b ->
  (Base.Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a) ->
  Sem r a
scheduleAsync mb f =
  withSync @() @res do
    h <- async do
      -- Wait for the start signal before running the scheduled action.
      Sync.readBlock @()
      raise mb
    f h (Sync.putBlock ())

-- |Variant of 'scheduleAsync' that directly interprets the 'Control.Concurrent.MVar' used for signalling.
--
-- The 'Sync' effect is handled locally with 'interpretSync' instead of requiring 'ScopedSync' in the stack.
-- As in 'scheduleAsync', the spawned thread blocks on 'Sync.readBlock' until the callback runs the start action,
-- which is 'Sync.putBlock'.
scheduleAsyncIO ::
  forall b r a .
  Members [Resource, Async, Race, Embed IO] r =>
  Sem r b ->
  (Base.Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a) ->
  Sem r a
scheduleAsyncIO mb f =
  interpretSync @() do
    h <- async do
      -- Wait for the start signal before running the scheduled action.
      Sync.readBlock @()
      raise mb
    f h (Sync.putBlock ())