module Unfork.Async.Core where
import Prelude (Eq ((==)), IO, pure)
import Control.Applicative ((<|>))
import Control.Concurrent.Async (concurrently)
import Control.Monad (guard, join)
import Control.Monad.STM (STM, atomically)
import Data.Functor (($>), (<&>))
import qualified Control.Concurrent.STM as STM
-- | A recipe for "unforking": funnelling requests through a queue that is
-- serviced by a single worker loop (see 'unforkAsync').
--
-- The queue element type @q@ is existentially quantified, so it is chosen by
-- whoever builds the 'Unfork' value and is invisible to its consumer. Because
-- both fields mention @q@, they cannot be used as ordinary selector functions;
-- access them by pattern matching on the constructor.
data Unfork a c = forall q. Unfork
    { unforkedAction :: !(Ctx q -> a -> c)
        -- ^ Given the context of the current run, produces the function that
        --   'unforkAsync' hands to its continuation.
    , executeOneTask :: !(q -> IO ())
        -- ^ What the worker loop in 'unforkAsync' runs for each item it takes
        --   from the queue.
    }
-- | The shared state of one 'unforkAsync' run.
data Ctx q = Ctx
    { queue :: !(STM.TQueue q)
        -- ^ Pending items: written by 'enqueue', read by 'next'.
    , stopper :: !(STM.TVar Status)
        -- ^ Shutdown flag: set to 'Stop' by 'stop', inspected by
        --   'checkStopped'.
    }
-- | Whether the worker loop has been asked to finish ('Stop') or should keep
-- waiting for work ('Go'). 'Eq' is needed by 'checkStopped', which compares
-- the current value against 'Stop'.
data Status = Stop | Go deriving Eq
-- | Append an item to the context's queue.
enqueue :: Ctx q -> q -> STM ()
enqueue Ctx{ queue } = STM.writeTQueue queue
-- | Take the item at the front of the context's queue. This is a plain
-- 'STM.readTQueue', so the transaction retries while the queue is empty.
next :: Ctx q -> STM q
next Ctx{ queue } = STM.readTQueue queue
-- | Set the context's shutdown flag to 'Stop' in its own transaction.
stop :: Ctx q -> IO ()
stop Ctx{ stopper } =
    atomically (STM.writeTVar stopper Stop)
-- | Succeed only once the shutdown flag is 'Stop'.
--
-- The check is a 'guard', which fails via the 'Alternative' instance of
-- 'STM' when the flag is still 'Go'; the caller can therefore combine this
-- with other transactions using '<|>'.
checkStopped :: Ctx q -> STM ()
checkStopped Ctx{ stopper } = do
    s <- STM.readTVar stopper
    guard (s == Stop)
-- | Run a continuation alongside a worker loop that services a queue.
--
-- A fresh 'Ctx' (empty queue, flag set to 'Go') is created, and then two
-- actions are run with 'concurrently':
--
--   * the worker loop, which repeatedly takes an item from the queue and
--     runs 'executeOneTask' on it;
--
--   * the continuation, applied to @'unforkedAction' ctx@. Once it returns,
--     'stop' is called to signal the worker loop.
--
-- The result is whatever the continuation returned.
unforkAsync ::
    Unfork a c
    -> ((a -> c) -> IO b)
    -> IO b
unforkAsync Unfork{ unforkedAction, executeOneTask } continue =
  do
    ctx <- do
        queue <- STM.newTQueueIO
        stopper <- STM.newTVarIO Go
        pure Ctx{ queue, stopper }

    let
        -- Each transaction decides what the loop does next and returns it
        -- as an IO action; 'join' then runs that action.
        loop :: IO ()
        loop = join (atomically (act <|> done))
          where
            -- Tried first, so items already in the queue are processed
            -- before the stop flag is consulted.
            act :: STM (IO ())
            act = next ctx <&> \x -> do{ executeOneTask x; loop }

            -- Only reached when 'act' could not proceed; ends the loop once
            -- the flag is 'Stop'.
            done :: STM (IO ())
            done = checkStopped ctx $> pure ()

    ((), c) <- concurrently loop do
        x <- continue (unforkedAction ctx)
        stop ctx
        pure x

    pure c