module Control.Concurrent.Async.Extra where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent
import Control.Concurrent.MSem (new, with)
import Data.Traversable
import Data.Foldable (Foldable, traverse_)
import Control.Applicative
import Control.Monad
sequencePool :: Traversable t => Int -> t (IO a) -> IO (t a)
sequencePool max xs = do
sem <- new max
runConcurrently $ traverse (Concurrently . with sem) xs
mapPool :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapPool max f xs = do
sem <- new max
mapConcurrently (with sem . f) xs
sequenceConcurrently :: Traversable t => t (IO a) -> IO (t a)
sequenceConcurrently = runConcurrently . traverse Concurrently
mapConcurrently_ :: Foldable t => (a -> IO b) -> t a -> IO ()
mapConcurrently_ f = runConcurrently . traverse_ (Concurrently . f)
forConcurrently_ :: Foldable t => t a -> (a -> IO b) -> IO ()
forConcurrently_ = flip mapConcurrently_
fixAsync :: (Async a -> IO a) -> IO (Async a)
fixAsync f = mdo
this <- async $ f this
return this
fixAsyncBound :: (Async a -> IO a) -> IO (Async a)
fixAsyncBound f = mdo
this <- asyncBound $ f this
return this
fixAsyncOn :: Int -> (Async a -> IO a) -> IO (Async a)
fixAsyncOn cpu f = mdo
this <- asyncOn cpu $ f this
return this
fixAsyncWithUnmask :: (Async a -> (forall b . IO b -> IO b) -> IO a) -> IO (Async a)
fixAsyncWithUnmask f = mdo
this <- asyncWithUnmask $ f this
return this
fixAsyncOnWithUnmask :: Int -> (Async a -> (forall b . IO b -> IO b) -> IO a) -> IO (Async a)
fixAsyncOnWithUnmask cpu f = mdo
this <- asyncWithUnmask $ f this
return this
withParent :: Async a -> IO b -> IO (Async b)
withParent parent act = async $ link parent >> act
newtype Promise a = Promise { unPromise :: IO a }
deriving (Functor)
instance Applicative Promise where
pure = Promise . return
Promise f <*> Promise x = Promise $ uncurry ($) <$> concurrently f x
instance Alternative Promise where
empty = Promise $ forever (threadDelay maxBound)
Promise x <|> Promise y = Promise $ either id id <$> race x y
instance Monad Promise where
return = pure
Promise m >>= f = Promise $ async m >>= wait >>= unPromise . f
instance MonadPlus Promise where
mzero = empty
mplus = (<|>)