{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RecordWildCards #-}
module Control.Monad.Schedule.OSThreadPool where

-- base
import Control.Concurrent
import Control.Monad ( void, forM, replicateM )
import Control.Monad.IO.Class
import Data.List.NonEmpty hiding (zip, cycle)
import Data.Proxy
import GHC.TypeLits
import Prelude hiding (take)

-- stm
import Control.Concurrent.STM.TChan

-- rhine
import Control.Monad.Schedule.Class
import Control.Concurrent.STM
import Data.Either (partitionEithers)

newtype OSThreadPool (n :: Nat) a = OSThreadPool { forall (n :: Nat) a. OSThreadPool n a -> IO a
unOSThreadPool :: IO a }
  deriving (forall (n :: Nat) a b. a -> OSThreadPool n b -> OSThreadPool n a
forall (n :: Nat) a b.
(a -> b) -> OSThreadPool n a -> OSThreadPool n b
forall a b. a -> OSThreadPool n b -> OSThreadPool n a
forall a b. (a -> b) -> OSThreadPool n a -> OSThreadPool n b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> OSThreadPool n b -> OSThreadPool n a
$c<$ :: forall (n :: Nat) a b. a -> OSThreadPool n b -> OSThreadPool n a
fmap :: forall a b. (a -> b) -> OSThreadPool n a -> OSThreadPool n b
$cfmap :: forall (n :: Nat) a b.
(a -> b) -> OSThreadPool n a -> OSThreadPool n b
Functor, forall (n :: Nat). Functor (OSThreadPool n)
forall (n :: Nat) a. a -> OSThreadPool n a
forall (n :: Nat) a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n a
forall (n :: Nat) a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n b
forall (n :: Nat) a b.
OSThreadPool n (a -> b) -> OSThreadPool n a -> OSThreadPool n b
forall (n :: Nat) a b c.
(a -> b -> c)
-> OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n c
forall a. a -> OSThreadPool n a
forall a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n a
forall a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n b
forall a b.
OSThreadPool n (a -> b) -> OSThreadPool n a -> OSThreadPool n b
forall a b c.
(a -> b -> c)
-> OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n c
forall (f :: * -> *).
Functor f
-> (forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
<* :: forall a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n a
$c<* :: forall (n :: Nat) a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n a
*> :: forall a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n b
$c*> :: forall (n :: Nat) a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n b
liftA2 :: forall a b c.
(a -> b -> c)
-> OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n c
$cliftA2 :: forall (n :: Nat) a b c.
(a -> b -> c)
-> OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n c
<*> :: forall a b.
OSThreadPool n (a -> b) -> OSThreadPool n a -> OSThreadPool n b
$c<*> :: forall (n :: Nat) a b.
OSThreadPool n (a -> b) -> OSThreadPool n a -> OSThreadPool n b
pure :: forall a. a -> OSThreadPool n a
$cpure :: forall (n :: Nat) a. a -> OSThreadPool n a
Applicative, forall (n :: Nat). Applicative (OSThreadPool n)
forall (n :: Nat) a. a -> OSThreadPool n a
forall (n :: Nat) a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n b
forall (n :: Nat) a b.
OSThreadPool n a -> (a -> OSThreadPool n b) -> OSThreadPool n b
forall a. a -> OSThreadPool n a
forall a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n b
forall a b.
OSThreadPool n a -> (a -> OSThreadPool n b) -> OSThreadPool n b
forall (m :: * -> *).
Applicative m
-> (forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
return :: forall a. a -> OSThreadPool n a
$creturn :: forall (n :: Nat) a. a -> OSThreadPool n a
>> :: forall a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n b
$c>> :: forall (n :: Nat) a b.
OSThreadPool n a -> OSThreadPool n b -> OSThreadPool n b
>>= :: forall a b.
OSThreadPool n a -> (a -> OSThreadPool n b) -> OSThreadPool n b
$c>>= :: forall (n :: Nat) a b.
OSThreadPool n a -> (a -> OSThreadPool n b) -> OSThreadPool n b
Monad, forall (n :: Nat). Monad (OSThreadPool n)
forall (n :: Nat) a. IO a -> OSThreadPool n a
forall a. IO a -> OSThreadPool n a
forall (m :: * -> *).
Monad m -> (forall a. IO a -> m a) -> MonadIO m
liftIO :: forall a. IO a -> OSThreadPool n a
$cliftIO :: forall (n :: Nat) a. IO a -> OSThreadPool n a
MonadIO)

data WorkerLink a = WorkerLink
  { forall a. WorkerLink a -> TChan (Maybe (IO a))
jobTChan :: TChan (Maybe (IO a))
  , forall a. WorkerLink a -> TChan a
resultTChan :: TChan a
  }

putJob :: WorkerLink a -> OSThreadPool n a -> IO ()
putJob :: forall a (n :: Nat). WorkerLink a -> OSThreadPool n a -> IO ()
putJob WorkerLink { TChan a
TChan (Maybe (IO a))
resultTChan :: TChan a
jobTChan :: TChan (Maybe (IO a))
resultTChan :: forall a. WorkerLink a -> TChan a
jobTChan :: forall a. WorkerLink a -> TChan (Maybe (IO a))
.. } OSThreadPool { IO a
unOSThreadPool :: IO a
unOSThreadPool :: forall (n :: Nat) a. OSThreadPool n a -> IO a
.. }
  = forall a. STM a -> IO a
atomically
  forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> a -> STM ()
writeTChan TChan (Maybe (IO a))
jobTChan
  forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just IO a
unOSThreadPool

makeWorkerLink :: IO (WorkerLink a)
makeWorkerLink :: forall a. IO (WorkerLink a)
makeWorkerLink = do
  TChan (Maybe (IO a))
jobTChan <- forall a. STM a -> IO a
atomically forall a. STM (TChan a)
newTChan
  TChan a
resultTChan <- forall a. STM a -> IO a
atomically forall a. STM (TChan a)
newTChan
  let worker :: IO ()
worker = do
        Maybe (IO a)
job <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM a
readTChan TChan (Maybe (IO a))
jobTChan
        case Maybe (IO a)
job of
          Maybe (IO a)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
          Just IO a
action -> do
            a
result <- IO a
action
            forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> a -> STM ()
writeTChan TChan a
resultTChan a
result
            IO ()
worker
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkOS IO ()
worker
  forall (m :: * -> *) a. Monad m => a -> m a
return WorkerLink { TChan a
TChan (Maybe (IO a))
resultTChan :: TChan a
jobTChan :: TChan (Maybe (IO a))
resultTChan :: TChan a
jobTChan :: TChan (Maybe (IO a))
.. }

proxyForActions :: NonEmpty (OSThreadPool n a) -> Proxy n
proxyForActions :: forall (n :: Nat) a. NonEmpty (OSThreadPool n a) -> Proxy n
proxyForActions NonEmpty (OSThreadPool n a)
_ = forall {k} (t :: k). Proxy t
Proxy

instance (KnownNat n, (1 <=? n) ~ True) => MonadSchedule (OSThreadPool n) where
  schedule :: forall a.
NonEmpty (OSThreadPool n a)
-> OSThreadPool n (NonEmpty a, [OSThreadPool n a])
schedule NonEmpty (OSThreadPool n a)
actions = forall (n :: Nat) a. IO a -> OSThreadPool n a
OSThreadPool forall a b. (a -> b) -> a -> b
$ do
    let n :: Integer
n = forall (n :: Nat) (proxy :: Nat -> *).
KnownNat n =>
proxy n -> Integer
natVal forall a b. (a -> b) -> a -> b
$ forall (n :: Nat) a. NonEmpty (OSThreadPool n a) -> Proxy n
proxyForActions NonEmpty (OSThreadPool n a)
actions
    [WorkerLink a]
workerLinks <- forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM (forall a. Num a => Integer -> a
fromInteger Integer
n) forall a. IO (WorkerLink a)
makeWorkerLink
    [TChan a]
backgroundActions <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM (forall a b. [a] -> [b] -> [(a, b)]
zip (forall a. [a] -> [a]
cycle [WorkerLink a]
workerLinks) (forall a. NonEmpty a -> [a]
toList NonEmpty (OSThreadPool n a)
actions))
      forall a b. (a -> b) -> a -> b
$ \(WorkerLink a
link, OSThreadPool n a
action) -> do
        forall a (n :: Nat). WorkerLink a -> OSThreadPool n a -> IO ()
putJob WorkerLink a
link OSThreadPool n a
action
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. WorkerLink a -> TChan a
resultTChan WorkerLink a
link
    forall a (n :: Nat).
[TChan a] -> IO (NonEmpty a, [OSThreadPool n a])
pollPools [TChan a]
backgroundActions
    where
      pollPools :: [TChan a] -> IO (NonEmpty a, [OSThreadPool n a])
      pollPools :: forall a (n :: Nat).
[TChan a] -> IO (NonEmpty a, [OSThreadPool n a])
pollPools [TChan a]
chans = do
        [Either (TChan a) a]
results <- forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. TChan a -> IO (Either (TChan a) a)
pollPool [TChan a]
chans
        case forall a b. [Either a b] -> ([a], [b])
partitionEithers [Either (TChan a) a]
results of
          ([TChan a]
_, []) -> do
            Int -> IO ()
threadDelay Int
1000
            forall a (n :: Nat).
[TChan a] -> IO (NonEmpty a, [OSThreadPool n a])
pollPools [TChan a]
chans
          ([TChan a]
remainingChans, a
a : [a]
as) -> forall (m :: * -> *) a. Monad m => a -> m a
return
            ( a
a forall a. a -> [a] -> NonEmpty a
:| [a]
as
            , forall (n :: Nat) a. IO a -> OSThreadPool n a
OSThreadPool forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TChan a -> STM a
readTChan forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TChan a]
remainingChans
            )

      pollPool :: TChan a -> IO (Either (TChan a) a)
      pollPool :: forall a. TChan a -> IO (Either (TChan a) a)
pollPool TChan a
chan = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall a b. a -> Either a b
Left TChan a
chan) forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. STM a -> IO a
atomically (forall a. TChan a -> STM (Maybe a)
tryReadTChan TChan a
chan)