{-

    This module constitutes the main contribution of the library

-}

module Unfork.Async.Core where

import Prelude (Eq ((==)), IO, pure)

import Control.Applicative ((<|>))
import Control.Concurrent.Async (concurrently)
import Control.Monad (guard, join)
import Control.Monad.STM (STM, atomically)
import Data.Functor (($>), (<&>))

import qualified Control.Concurrent.STM as STM

data Unfork a c = forall q. Unfork
  { ()
unforkedAction ::         --   The unforked action that we give
      !( Ctx q -> a -> c )    --  to the user-provided continuation
  , ()
executeOneTask ::
      !( q -> IO () )   -- How the queue worker processes each item
  }

data Ctx q = Ctx          -- Mutable context for an async unforking
  { forall q. Ctx q -> TQueue q
queue :: !(STM.TQueue q)
  , forall q. Ctx q -> TVar Status
stopper :: !(STM.TVar Status)
  }

data Status = Stop | Go          deriving Status -> Status -> Bool
(Status -> Status -> Bool)
-> (Status -> Status -> Bool) -> Eq Status
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Status -> Status -> Bool
== :: Status -> Status -> Bool
$c/= :: Status -> Status -> Bool
/= :: Status -> Status -> Bool
Eq

enqueue :: Ctx q -> q -> STM ()              --  Write to the queue
enqueue :: forall q. Ctx q -> q -> STM ()
enqueue Ctx{ TQueue q
queue :: forall q. Ctx q -> TQueue q
queue :: TQueue q
queue } = TQueue q -> q -> STM ()
forall a. TQueue a -> a -> STM ()
STM.writeTQueue TQueue q
queue

next :: Ctx q -> STM q                      --  Read from the queue
next :: forall q. Ctx q -> STM q
next Ctx{ TQueue q
queue :: forall q. Ctx q -> TQueue q
queue :: TQueue q
queue } = TQueue q -> STM q
forall a. TQueue a -> STM a
STM.readTQueue TQueue q
queue

stop :: Ctx q -> IO ()   --  Indicate to the queue loop thread that
stop :: forall q. Ctx q -> IO ()
stop Ctx{ TVar Status
stopper :: forall q. Ctx q -> TVar Status
stopper :: TVar Status
stopper } =    --  it should stop once all tasks are done
    STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar Status -> Status -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar Status
stopper Status
Stop)

checkStopped :: Ctx q -> STM ()     --     STM action that succeeds
checkStopped :: forall q. Ctx q -> STM ()
checkStopped Ctx{ TVar Status
stopper :: forall q. Ctx q -> TVar Status
stopper :: TVar Status
stopper } = do    --  only if 'stop' has been run
    Status
s <- TVar Status -> STM Status
forall a. TVar a -> STM a
STM.readTVar TVar Status
stopper
    Bool -> STM ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (Status
s Status -> Status -> Bool
forall a. Eq a => a -> a -> Bool
== Status
Stop)

unforkAsync ::                   --        This is the basis of all
    Unfork a c                   --  four async unforking functions
    -> ((a -> c) -> IO b)
    -> IO b
unforkAsync :: forall a c b. Unfork a c -> ((a -> c) -> IO b) -> IO b
unforkAsync Unfork{ Ctx q -> a -> c
unforkedAction :: ()
unforkedAction :: Ctx q -> a -> c
unforkedAction, q -> IO ()
executeOneTask :: ()
executeOneTask :: q -> IO ()
executeOneTask } (a -> c) -> IO b
continue =
  do
    Ctx q
ctx <- do
        TQueue q
queue <- IO (TQueue q)
forall a. IO (TQueue a)
STM.newTQueueIO
        TVar Status
stopper <- Status -> IO (TVar Status)
forall a. a -> IO (TVar a)
STM.newTVarIO Status
Go
        Ctx q -> IO (Ctx q)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Ctx{ TQueue q
queue :: TQueue q
queue :: TQueue q
queue, TVar Status
stopper :: TVar Status
stopper :: TVar Status
stopper }

    let
      loop :: IO ()
loop = IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM (IO ()) -> IO (IO ())
forall a. STM a -> IO a
atomically (STM (IO ())
act STM (IO ()) -> STM (IO ()) -> STM (IO ())
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM (IO ())
done))
        where
          act :: STM (IO ())
act = Ctx q -> STM q
forall q. Ctx q -> STM q
next Ctx q
ctx STM q -> (q -> IO ()) -> STM (IO ())
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \q
x -> do{ q -> IO ()
executeOneTask q
x; IO ()
loop }
          done :: STM (IO ())
done = Ctx q -> STM ()
forall q. Ctx q -> STM ()
checkStopped Ctx q
ctx STM () -> IO () -> STM (IO ())
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    ((), b
c) <- IO () -> IO b -> IO ((), b)
forall a b. IO a -> IO b -> IO (a, b)
concurrently IO ()
loop do
        b
x <- (a -> c) -> IO b
continue (Ctx q -> a -> c
unforkedAction Ctx q
ctx)
        Ctx q -> IO ()
forall q. Ctx q -> IO ()
stop Ctx q
ctx
        b -> IO b
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
x

    b -> IO b
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
c