{-
The parallel function (specialised to lists) is equivalent to:

import Control.Parallel.Strategies
parallel :: [IO [a]] -> IO [[a]]
parallel = pure . withStrategy (parList $ seqList r0) . map unsafePerformIO

However, this version performs about 10% slower with 2 processors in GHC 6.12.1
-}

module Parallel(parallel) where

import System.IO.Unsafe
import Control.Concurrent
import Control.Exception
import Control.Monad


parallel :: Int -> [IO a] -> IO [a]
parallel :: forall a. Int -> [IO a] -> IO [a]
parallel Int
j = if Int
j forall a. Ord a => a -> a -> Bool
<= Int
1 then forall a. [IO a] -> IO [a]
parallel1 else forall a. Int -> [IO a] -> IO [a]
parallelN Int
j


parallel1 :: [IO a] -> IO [a]
parallel1 :: forall a. [IO a] -> IO [a]
parallel1 [] = forall (f :: * -> *) a. Applicative f => a -> f a
pure []
parallel1 (IO a
x:[IO a]
xs) = do
    a
x2 <- IO a
x
    [a]
xs2 <- forall a. IO a -> IO a
unsafeInterleaveIO forall a b. (a -> b) -> a -> b
$ forall a. [IO a] -> IO [a]
parallel1 [IO a]
xs
    forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ a
x2forall a. a -> [a] -> [a]
:[a]
xs2


parallelN :: Int -> [IO a] -> IO [a]
parallelN :: forall a. Int -> [IO a] -> IO [a]
parallelN Int
j [IO a]
xs = do
    [MVar (Either SomeException a)]
ms <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (forall a b. a -> b -> a
const forall a. IO (MVar a)
newEmptyMVar) [IO a]
xs
    Chan (Maybe (MVar (Either SomeException a), IO a))
chan <- forall a. IO (Chan a)
newChan
    forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall a. Chan a -> a -> IO ()
writeChan Chan (Maybe (MVar (Either SomeException a), IO a))
chan forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a
Just) forall a b. (a -> b) -> a -> b
$ forall a b. [a] -> [b] -> [(a, b)]
zip [MVar (Either SomeException a)]
ms [IO a]
xs
    forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
j (forall a. Chan a -> a -> IO ()
writeChan Chan (Maybe (MVar (Either SomeException a), IO a))
chan forall a. Maybe a
Nothing forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> IO ThreadId
forkIO (forall {e} {a}.
Exception e =>
Chan (Maybe (MVar (Either e a), IO a)) -> IO ()
f Chan (Maybe (MVar (Either SomeException a), IO a))
chan))
    let throwE :: SomeException -> a
throwE SomeException
x = forall a e. Exception e => e -> a
throw (SomeException
x :: SomeException)
    forall a. [IO a] -> IO [a]
parallel1 forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall {a}. SomeException -> a
throwE forall a. a -> a
id) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. MVar a -> IO a
takeMVar) [MVar (Either SomeException a)]
ms
    where
        f :: Chan (Maybe (MVar (Either e a), IO a)) -> IO ()
f Chan (Maybe (MVar (Either e a), IO a))
chan = do
            Maybe (MVar (Either e a), IO a)
v <- forall a. Chan a -> IO a
readChan Chan (Maybe (MVar (Either e a), IO a))
chan
            case Maybe (MVar (Either e a), IO a)
v of
                Maybe (MVar (Either e a), IO a)
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                Just (MVar (Either e a)
m,IO a
x) -> do
                    forall a. MVar a -> a -> IO ()
putMVar MVar (Either e a)
m forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall e a. Exception e => IO a -> IO (Either e a)
try IO a
x
                    Chan (Maybe (MVar (Either e a), IO a)) -> IO ()
f Chan (Maybe (MVar (Either e a), IO a))
chan