module Data.Poolboy
  ( -- * Configuration
    PoolboySettings (..),
    WorkersCountSettings (..),
    defaultPoolboySettings,
    poolboySettingsWith,
    simpleSerializedLogger,

    -- * Running
    WorkQueue,
    withPoolboy,
    newPoolboy,
    stopWorkQueue,
    isStopWorkQueue,

    -- * Driving
    changeDesiredWorkersCount,
    waitReadyQueue,

    -- * Enqueueing
    enqueue,
    enqueueAfter,
  )
where

import Control.Concurrent
import Control.Concurrent.STM.TQueue
import Control.Exception.Safe (bracket, tryAny)
import Control.Monad
import Control.Monad.STM
import Data.IORef

-- | Initial settings
data PoolboySettings = PoolboySettings
  { -- | How many worker threads the pool is asked to start with
    workersCount :: WorkersCountSettings,
    -- | Callback receiving the pool's diagnostic messages
    log :: String -> IO ()
  }

-- | Initial number of threads
data WorkersCountSettings
  = -- | 'getNumCapabilities' based number
    CapabilitiesWCS
  | -- | Arbitrary, caller-chosen number
    FixedWCS Int
  deriving stock (Eq, Show)

-- | Usual configuration: 'CapabilitiesWCS' and no log
-- (every log message is discarded).
defaultPoolboySettings :: PoolboySettings
defaultPoolboySettings =
  PoolboySettings
    { workersCount = CapabilitiesWCS,
      log = \_ -> return ()
    }

-- | Arbitrary-numbered settings: 'defaultPoolboySettings' with the workers
-- count replaced by a fixed number ('FixedWCS').
poolboySettingsWith :: Int -> PoolboySettings
poolboySettingsWith c = defaultPoolboySettings {workersCount = FixedWCS c}

-- | Simple (but not particularly performant) serialized logger
--
-- Builds a logging function that prints each message with 'putStrLn' while
-- holding a lock private to the returned function, so lines written through
-- that same function from several threads are not interleaved.
simpleSerializedLogger :: IO (String -> IO ())
simpleSerializedLogger = do
  logLock <- newMVar ()
  return $ \x ->
    -- 'withMVar' gives the lock back even if 'putStrLn' throws.
    withMVar logLock $ \() -> putStrLn x

-- | 'bracket'-based usage (recommended)
--
-- Creates a pool with 'newPoolboy', hands it to the given action and, once
-- the action returns or throws, requests a stop and waits for the workers
-- to finish.
withPoolboy :: PoolboySettings -> (WorkQueue -> IO a) -> IO a
withPoolboy settings =
  bracket
    (newPoolboy settings)
    (\wq -> stopWorkQueue wq >> waitStopWorkQueue wq)

-- | Standalone/manual usage
--
-- Allocates the queues and bookkeeping of a new pool, requests the initial
-- number of workers and forks the controller thread that serves such
-- requests. The caller is responsible for stopping the pool
-- (see 'stopWorkQueue').
newPoolboy :: PoolboySettings -> IO WorkQueue
newPoolboy settings = do
  wq <-
    WorkQueue
      <$> newTQueueIO
      <*> newTQueueIO
      <*> newIORef 0
      <*> newEmptyMVar
      <*> return settings.log

  count <-
    case settings.workersCount of
      CapabilitiesWCS -> getNumCapabilities
      FixedWCS x -> return x

  -- The request is only queued here; it is served once the controller runs.
  changeDesiredWorkersCount wq count
  void $ forkIO $ controller wq

  return wq

-- | Request a worker number adjustment
--
-- Non-blocking: the request is only pushed on the pool's command queue.
changeDesiredWorkersCount :: WorkQueue -> Int -> IO ()
changeDesiredWorkersCount wq =
  atomically . writeTQueue wq.commands . ChangeDesiredWorkersCount

-- | Request stopping workers
--
-- Non-blocking: the request is only pushed on the pool's command queue.
stopWorkQueue :: WorkQueue -> IO ()
stopWorkQueue wq =
  atomically $ writeTQueue wq.commands Stop

-- | Non-blocking check of the work queue's running status
--
-- Returns 'True' once the pool's @stopped@ flag has been set.
isStopWorkQueue :: WorkQueue -> IO Bool
isStopWorkQueue wq =
  not <$> isEmptyMVar wq.stopped

-- | Block until the queue is totally stopped (no more running worker)
--
-- Any synchronous exception raised while waiting is discarded, so this
-- function returns normally in that case too.
-- NOTE(review): presumably this is meant to absorb the runtime's
-- @BlockedIndefinitelyOnMVar@ when the flag can never be set -- confirm.
waitStopWorkQueue :: WorkQueue -> IO ()
waitStopWorkQueue wq =
  void $ tryAny $ readMVar wq.stopped

-- | Enqueue one action in the work queue (non-blocking)
enqueue :: WorkQueue -> IO () -> IO ()
enqueue wq =
  atomically . writeTQueue wq.queue . Right

-- | Block until one worker is available
--
-- Enqueues a marker action and waits until it has been run; the marker is
-- queued behind any work that was already enqueued.
waitReadyQueue :: WorkQueue -> IO ()
waitReadyQueue wq = do
  ready <- newEmptyMVar
  enqueue wq $ putMVar ready ()
  readMVar ready

-- | Enqueue action and some actions to be run after it
--
-- The follow-up actions are not run inline: they are enqueued once the first
-- action has returned. If the first action throws, they are never enqueued.
enqueueAfter :: Foldable f => WorkQueue -> IO () -> f (IO ()) -> IO ()
enqueueAfter wq x xs =
  enqueue wq $ do
    x
    forM_ xs $ enqueue wq

-- Support (internal)

-- | Handle on a pool: its queues, bookkeeping and logger.
data WorkQueue = WorkQueue
  { -- | Requests addressed to the controller thread
    commands :: TQueue Commands,
    -- | Items consumed by workers: @Right@ carries an action to run,
    -- @Left ()@ asks the receiving worker to exit
    queue :: TQueue (Either () (IO ())),
    -- | Number of workers currently registered
    currentWorkersCount :: IORef Int,
    -- | Filled once the registered workers count drops to zero
    stopped :: MVar (),
    -- | Callback receiving the pool's diagnostic messages
    log :: String -> IO ()
  }

-- | Requests understood by the controller thread.
data Commands
  = -- | Grow or shrink the pool to the given number of workers
    ChangeDesiredWorkersCount Int
  | -- | Ask every worker to exit and terminate the controller
    Stop
  deriving stock (Show)

-- | Controller thread: serves the requests pushed on the command queue
-- until a 'Stop' is received.
--
-- The controller keeps its own tally of the workers it has asked for
-- (forked, minus stop markers already sent) instead of reading
-- @currentWorkersCount@. That counter is updated asynchronously by the
-- workers themselves, so reading it right after forking or right after
-- sending stop markers yields a stale value and leads to too few (or too
-- many) stop markers being sent.
controller :: WorkQueue -> IO ()
controller wq = go 0
  where
    -- Ask one (arbitrary) worker to exit.
    stopOneWorker :: IO ()
    stopOneWorker = atomically $ writeTQueue wq.queue $ Left ()

    startOneWorker :: IO ()
    startOneWorker = do
      wq.log "Pre-fork"
      void $ forkIO $ worker wq

    -- @expected@ is the number of workers that will remain once every
    -- stop marker sent so far has been consumed.
    go :: Int -> IO ()
    go expected = do
      command <- atomically $ readTQueue wq.commands
      wq.log $ "Command: " <> show command
      case command of
        ChangeDesiredWorkersCount n -> do
          -- A negative target is treated as zero: sending more stop markers
          -- than there are workers would leave markers in the queue that
          -- kill workers started later.
          let target = max 0 n
              diff = expected - target
          if diff > 0
            then replicateM_ diff stopOneWorker
            else replicateM_ (negate diff) startOneWorker
          go target
        Stop -> do
          wq.log $ "Stopping " <> show expected <> " workers"
          replicateM_ expected stopOneWorker
          -- With no worker left to consume a stop marker, nobody else would
          -- ever flag the pool as stopped.
          when (expected == 0) $
            void $
              tryPutMVar wq.stopped ()

-- | Worker thread: registers itself in @currentWorkersCount@, then runs the
-- queued actions one at a time until it receives a stop marker (@Left ()@).
--
-- Synchronous exceptions thrown by an action do not kill the worker; they
-- are reported through the pool's logger and the worker moves on.
worker :: WorkQueue -> IO ()
worker wq = do
  wq.log "New worker"
  newCount <- atomicModifyIORef' wq.currentWorkersCount $ \n -> (n + 1, n + 1)
  wq.log $ "New worker count " <> show newCount
  let loop :: IO ()
      loop = do
        command <- atomically $ readTQueue wq.queue
        case command of
          Left () -> do
            wq.log "Stopping"
            -- Unregister, never letting the counter go below zero.
            remaining <-
              atomicModifyIORef' wq.currentWorkersCount $ \n ->
                let count = max 0 (n - 1) in (count, count)
            wq.log $ "Remaining: " <> show remaining
            -- The last worker out flags the pool as stopped; 'tryPutMVar'
            -- keeps this a no-op if the flag is already set.
            when (remaining == 0) $
              void $
                tryPutMVar wq.stopped ()
          Right act -> do
            wq.log "pop"
            outcome <- tryAny act
            case outcome of
              Left e -> wq.log $ "Action failed: " <> show e
              Right () -> return ()
            wq.log "poped"
            loop
  loop