{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}

module Effectful.Ki (
    -- * Effect
    StructuredConcurrency,

    -- * Handlers
    runStructuredConcurrency,
    withCurrentScope,

    -- * Core API
    Scope,
    Thread,
    scoped,
    fork,
    forkTry,
    await,
    awaitAll,
    withAwaitAll,

    -- * Extended API
    fork_,
    forkWith,
    forkWith_,
    forkTryWith,

    -- ** Thread options
    ThreadOptions (..),
    defaultThreadOptions,
    ThreadAffinity (..),

    -- ** Byte count
    ByteCount,
    kilobytes,
    megabytes,

    -- * STM re-export
    Effectful.Ki.atomically,
    Effectful.Ki.newTVarIO,
    Effectful.Ki.newTMVarIO,
    Effectful.Ki.newEmptyTMVarIO,
) where

import Control.Concurrent.STM hiding (atomically)
import Control.Concurrent.STM qualified as STM
import Control.Exception (Exception)
import Data.Void (Void)
import Effectful
import Effectful.Dispatch.Static
import Effectful.Dispatch.Static.Primitive (cloneEnv)
import Effectful.Dispatch.Static.Unsafe (reallyUnsafeUnliftIO)

import Ki hiding (fork, forkTry, forkTryWith, forkWith, forkWith_, fork_, scoped)
import Ki qualified

-- | Effect giving a computation access to a Ki 'Scope'; the operations in
-- this module read that scope to fork threads and wait on them.
data StructuredConcurrency :: Effect

-- Statically dispatched, and flagged as performing side effects.
type instance DispatchOf StructuredConcurrency = 'Static 'WithSideEffects
-- | Static representation of the effect: the 'Scope' currently in effect.
data instance StaticRep StructuredConcurrency = StructuredConcurrency Scope

-- | Run the 'StructuredConcurrency' effect.
--
-- Opens a new scope with 'Ki.scoped' and installs it as the effect's static
-- representation while the supplied computation runs. The result of the
-- computation is returned once 'Ki.scoped' returns.
runStructuredConcurrency :: IOE :> es => Eff (StructuredConcurrency : es) a -> Eff es a
runStructuredConcurrency k = withEffToIO $ \runInIO ->
    -- 'Ki.scoped' works in 'IO', so the computation is lowered with the
    -- unlifting function supplied by 'withEffToIO'.
    Ki.scoped $ \scope ->
        runInIO $ evalStaticRep (StructuredConcurrency scope) k

-- | Run a computation inside a new nested scope.
--
-- A fresh scope is opened with 'Ki.scoped' and replaces the one stored in the
-- effect's static representation for the duration of the computation, so the
-- fork operations inside it target the new scope.
scoped :: StructuredConcurrency :> es => Eff es a -> Eff es a
scoped k = reallyUnsafeUnliftIO $ \runInIO ->
    -- NOTE(review): 'reallyUnsafeUnliftIO' hands out an unlifting function
    -- with no safety checks; this presumably relies on 'Ki.scoped' invoking
    -- its callback on the calling thread -- confirm against ki's docs.
    Ki.scoped $ \scope ->
        runInIO $ localStaticRep (const (StructuredConcurrency scope)) k

-- | Provide a callback function to run an action within the current 'Scope'.
--
-- The scope active at the call site is captured once. The function handed to
-- the callback re-installs that captured scope around any action it wraps,
-- regardless of which scope is in effect where the action is eventually run.
withCurrentScope ::
    StructuredConcurrency :> es =>
    ((forall es' a. StructuredConcurrency :> es' => Eff es' a -> Eff es' a) -> Eff es b) ->
    Eff es b
withCurrentScope f = do
    -- Snapshot the static representation (and thus the scope) right now.
    rep <- getStaticRep @StructuredConcurrency
    f (localStaticRep (const rep))

-- | Fork a thread in the current scope, returning a handle to it.
--
-- Effectful counterpart of 'Ki.fork'.
fork ::
    StructuredConcurrency :> es =>
    Eff es a ->
    Eff es (Thread a)
fork action = do
    StructuredConcurrency scope <- getStaticRep
    unsafeEff $ \es -> do
        -- The child thread runs against its own clone of the environment
        -- rather than sharing the parent's.
        es' <- cloneEnv es
        Ki.fork scope (unEff action es')

-- | Fork a thread in the current scope whose result is wrapped in 'Either',
-- with exceptions of type @e@ on the 'Left'.
--
-- Effectful counterpart of 'Ki.forkTry'; see its documentation for how
-- exceptions of other types are treated.
forkTry ::
    Exception e =>
    StructuredConcurrency :> es =>
    Eff es a ->
    Eff es (Thread (Either e a))
forkTry action = do
    StructuredConcurrency scope <- getStaticRep
    unsafeEff $ \es -> do
        -- The child thread runs against its own clone of the environment
        -- rather than sharing the parent's.
        es' <- cloneEnv es
        Ki.forkTry scope (unEff action es')

-- | Fork a thread in the current scope without returning a handle to it.
--
-- The action's result type is 'Void', so it cannot return normally.
-- Effectful counterpart of 'Ki.fork_'.
fork_ ::
    StructuredConcurrency :> es =>
    Eff es Void ->
    Eff es ()
fork_ action = do
    StructuredConcurrency scope <- getStaticRep
    unsafeEff $ \es -> do
        -- The child thread runs against its own clone of the environment
        -- rather than sharing the parent's.
        es' <- cloneEnv es
        Ki.fork_ scope (unEff action es')

-- | Variant of 'fork' that takes explicit 'ThreadOptions'.
--
-- Effectful counterpart of 'Ki.forkWith'.
forkWith ::
    StructuredConcurrency :> es =>
    ThreadOptions ->
    Eff es a ->
    Eff es (Thread a)
forkWith threadOptions action = do
    StructuredConcurrency scope <- getStaticRep
    unsafeEff $ \es -> do
        -- The child thread runs against its own clone of the environment
        -- rather than sharing the parent's.
        es' <- cloneEnv es
        Ki.forkWith scope threadOptions (unEff action es')

-- | Variant of 'fork_' that takes explicit 'ThreadOptions'.
--
-- Effectful counterpart of 'Ki.forkWith_'.
forkWith_ ::
    StructuredConcurrency :> es =>
    ThreadOptions ->
    Eff es Void ->
    Eff es ()
forkWith_ threadOptions action = do
    StructuredConcurrency scope <- getStaticRep
    unsafeEff $ \es -> do
        -- The child thread runs against its own clone of the environment
        -- rather than sharing the parent's.
        es' <- cloneEnv es
        Ki.forkWith_ scope threadOptions (unEff action es')

-- | Variant of 'forkTry' that takes explicit 'ThreadOptions'.
--
-- Effectful counterpart of 'Ki.forkTryWith'.
forkTryWith ::
    Exception e =>
    StructuredConcurrency :> es =>
    ThreadOptions ->
    Eff es a ->
    Eff es (Thread (Either e a))
forkTryWith threadOptions action = do
    StructuredConcurrency scope <- getStaticRep
    unsafeEff $ \es -> do
        -- The child thread runs against its own clone of the environment
        -- rather than sharing the parent's.
        es' <- cloneEnv es
        Ki.forkTryWith scope threadOptions (unEff action es')

-- | Build an 'STM' transaction around the current scope's 'Ki.awaitAll'
-- action and run it atomically.
--
-- The callback receives @'Ki.awaitAll' scope@ for the current scope and
-- returns the transaction to execute, so the wait can be combined with other
-- 'STM' actions in a single transaction.
withAwaitAll :: StructuredConcurrency :> es => (STM () -> STM a) -> Eff es a
withAwaitAll f = do
    StructuredConcurrency scope <- getStaticRep
    unsafeEff_ $ STM.atomically $ f $ Ki.awaitAll scope

-- | 'STM.atomically' lifted into 'Eff'.
--
-- Requires only that 'StructuredConcurrency' is in scope; the scope itself
-- is not consulted.
atomically :: StructuredConcurrency :> es => STM a -> Eff es a
atomically = unsafeEff_ . STM.atomically

-- | 'STM.newTVarIO' lifted into 'Eff': create a 'TVar' holding the given
-- initial value.
newTVarIO :: StructuredConcurrency :> es => a -> Eff es (TVar a)
newTVarIO = unsafeEff_ . STM.newTVarIO

-- | 'STM.newTMVarIO' lifted into 'Eff': create a 'TMVar' that starts out
-- holding the given value.
newTMVarIO :: StructuredConcurrency :> es => a -> Eff es (TMVar a)
newTMVarIO = unsafeEff_ . STM.newTMVarIO

-- | 'STM.newEmptyTMVarIO' lifted into 'Eff': create a 'TMVar' that starts
-- out empty.
newEmptyTMVarIO :: StructuredConcurrency :> es => Eff es (TMVar a)
newEmptyTMVarIO = unsafeEff_ STM.newEmptyTMVarIO