{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RecordWildCards #-}
module Control.Monad.Schedule.OSThreadPool where

-- base
import Control.Concurrent
import Control.Monad ( void, forM, replicateM )
import Control.Monad.IO.Class
import Data.List.NonEmpty hiding (zip, cycle)
import Data.Proxy
import GHC.TypeLits
import Prelude hiding (take)

-- stm
import Control.Concurrent.STM.TChan

-- rhine
import Control.Monad.Schedule.Class
import Control.Concurrent.STM
import Data.Either (partitionEithers)

-- | An 'IO' action tagged with a type-level natural number @n@.
--
-- The wrapper adds no runtime structure: it is a newtype around 'IO', and
-- its 'Functor', 'Applicative', 'Monad' and 'MonadIO' instances are the
-- ones of 'IO', obtained by newtype deriving. The index @n@ is phantom; the
-- 'MonadSchedule' instance in this module reads it with 'natVal' to decide
-- how many worker threads to start.
newtype OSThreadPool (n :: Nat) a = OSThreadPool
  { unOSThreadPool :: IO a
    -- ^ Unwrap to the underlying 'IO' action.
  }
  deriving (Functor, Applicative, Monad, MonadIO)

-- | The pair of channels used to communicate with one worker thread.
data WorkerLink a = WorkerLink
  { jobTChan :: TChan (Maybe (IO a))
    -- ^ Incoming jobs. The worker loop started by 'makeWorkerLink' runs
    -- every @Just action@ it reads and stops when it reads 'Nothing'.
  , resultTChan :: TChan a
    -- ^ Outgoing results; the worker writes one value here after each job
    -- it has run.
  }

-- | Enqueue an action on a worker's job channel.
--
-- This only performs the channel write (wrapping the action in 'Just').
-- The action itself is run by whichever thread reads from 'jobTChan'.
putJob :: WorkerLink a -> OSThreadPool n a -> IO ()
putJob link (OSThreadPool job) =
  atomically $ writeTChan (jobTChan link) (Just job)

-- | Create a fresh job/result channel pair and start a bound OS thread
-- (via 'forkOS') that serves it.
--
-- The worker repeatedly takes the next entry from the job channel:
--
-- * @Just action@: run the action, write its result to the result channel,
--   then continue with the next entry.
-- * @Nothing@: stop; the worker thread finishes.
--
-- The 'ThreadId' of the worker is discarded, so the channels are the only
-- handle on it. No exception handler is installed in the loop: an action
-- that throws ends the worker before any result is written for that job.
makeWorkerLink :: IO (WorkerLink a)
makeWorkerLink = do
  jobs <- newTChanIO
  results <- newTChanIO
  let worker = do
        next <- atomically $ readTChan jobs
        case next of
          Nothing -> return ()
          Just action -> do
            result <- action
            atomically $ writeTChan results result
            worker
  void $ forkOS worker
  return WorkerLink { jobTChan = jobs, resultTChan = results }

-- | Produce a 'Proxy' carrying the pool-size index @n@ of a list of actions.
--
-- The argument is never inspected; it only fixes the type of the result so
-- that it can be handed to 'natVal'.
proxyForActions :: NonEmpty (OSThreadPool n a) -> Proxy n
proxyForActions _ = Proxy

-- | Run the given actions on @n@ freshly started worker threads.
--
-- 'schedule' starts @n@ workers with 'makeWorkerLink', hands the actions to
-- them round-robin (action @i@ goes to worker @i mod n@), and then blocks
-- until at least one result is available. It returns every result that was
-- available at that moment, together with one continuation per outstanding
-- action; each continuation blocks on the result channel of the worker the
-- action was assigned to.
--
-- When several actions share a worker they also share its result channel,
-- so results are matched to continuations by count, not by identity.
instance (KnownNat n, 1 <= n) => MonadSchedule (OSThreadPool n) where
  schedule actions = OSThreadPool $ do
    let n = natVal $ proxyForActions actions
    workerLinks <- replicateM (fromInteger n) makeWorkerLink
    -- @1 <= n@ guarantees 'workerLinks' is non-empty, so 'cycle' is safe.
    backgroundActions <- forM (zip (cycle workerLinks) (toList actions))
      $ \(link, action) -> do
        putJob link action
        return $ resultTChan link
    -- Queue the stop signal behind the jobs. The channel is FIFO, so each
    -- worker first finishes everything it was given and then terminates,
    -- instead of staying blocked on its job channel after its last job.
    mapM_ (\link -> atomically $ writeTChan (jobTChan link) Nothing) workerLinks
    pollPools backgroundActions
    where
      -- Block until at least one channel has a result, then collect every
      -- result currently available in a single transaction. 'retry' suspends
      -- the transaction until one of the channels it read is written to, so
      -- no sleep-and-poll loop is needed.
      pollPools :: [TChan a] -> IO (NonEmpty a, [OSThreadPool n a])
      pollPools chans = atomically $ do
        results <- traverse pollPool chans
        case partitionEithers results of
          (_, []) -> retry
          (remainingChans, a : as) -> return
            ( a :| as
            , OSThreadPool . atomically . readTChan <$> remainingChans
            )

      -- Non-blocking read: 'Right' the result if one is ready, otherwise
      -- 'Left' the channel so the caller can keep waiting on it.
      pollPool :: TChan a -> STM (Either (TChan a) a)
      pollPool chan = maybe (Left chan) Right <$> tryReadTChan chan