-- | Evaluating promises in parallel module Control.Concurrent.STM.Promise.Workers (workers,worker,evaluatePromise) where import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.STM.Promise import Control.Monad maybeIO :: Maybe a -> (a -> IO b) -> IO (Maybe b) maybeIO m f = maybe (return Nothing) (fmap Just . f) m -- | Evaluates a single promise (in the calling thread), -- maybe using a timeout in microseconds. evaluatePromise :: Maybe Int -> Promise a -> IO () evaluatePromise m_t promise = do m_thr <- maybeIO m_t $ \ timeout -> forkIO $ do threadDelay timeout cancel promise spawn promise atomically $ do status <- result promise when (isUnfinished status) retry void $ maybeIO m_thr killThread -- | Evaluates a channel of promises, maybe using a timeout in microseconds. -- Stops when the channel is empty. worker :: Maybe Int -> TChan (Promise a) -> IO () worker m_t ch = go where go = do m_promise <- atomically $ tryReadTChan ch case m_promise of Just promise -> evaluatePromise m_t promise >> go Nothing -> return () -- | Evaluate these promises on n processors, maybe using a timeout in microseconds. workers :: Maybe Int -> Int -> [Promise a] -> IO () workers m_t n xs = do ch <- newTChanIO atomically $ mapM_ (writeTChan ch) xs replicateM_ n $ forkIO $ worker m_t ch