module Data.Iteratee.Parallel (
psequence_
,parE
,parI
,liftParI
,mapReduce
)
where
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Data.Iteratee as I hiding (mapM_, zip, filter)
import qualified Data.ListLike as LL
import Data.Monoid
import Control.Concurrent
import Control.Parallel
import Control.Monad
parI :: forall s a. (Nullable s, Monoid s) => Iteratee s IO a -> Iteratee s IO a
parI = icont . firstStep
where
firstStep iter chunk = do
var <- liftIO newEmptyMVar
_ <- sideStep var chunk iter
return (icont $ go var, mempty)
go var chunk@(Chunk _) = do
iter <- liftIO $ takeMVar var
_ <- sideStep var chunk iter
return (icont $ go var, mempty)
go var e = do
iter <- liftIO $ takeMVar var
return (join . lift $ enumChunk e iter, e)
sideStep var chunk iter = liftIO . forkIO
$ runIter iter onDone onCont onErr onReq
where
onDone a = putMVar var $ idone a
onCont k = k chunk >>= \(i',_) ->
runIter i' onDone onFina onErr onReq
onErr i e = putMVar var $ ierr i e
onReq :: IO x -> (x -> Iteratee s IO a) -> IO ()
onReq mb doB = mb >>= \b ->
runIter (doB b) onDone onCont onErr onReq
onFina k = putMVar var $ icont k
parE ::
(Nullable s1, Nullable s2, Monoid s1)
=> Enumeratee s1 s2 IO r
-> Enumeratee s1 s2 IO r
parE outer inner = parI (outer inner)
psequence_ ::
(LL.ListLike s el, Nullable s)
=> [Iteratee s IO a]
-> Iteratee s IO ()
psequence_ = I.sequence_ . map parI
liftParI ::
(Nullable s, Monoid s, MonadIO m)
=> Iteratee s IO a
-> Iteratee s m a
liftParI = ilift liftIO . parI
mapReduce ::
(Monad m, Nullable s, Monoid b)
=> Int
-> (s -> b)
-> Iteratee s m b
mapReduce bufsize f = icontP (step (0, []))
where
step a@(!buf,acc) (Chunk xs)
| nullC xs = (icontP (step a), Chunk xs)
| buf >= bufsize =
let acc' = mconcat acc
b' = f xs
in b' `par` acc' `pseq` (icontP (step (0,[b' `mappend` acc'])), Chunk empty)
| otherwise =
let b' = f xs
in b' `par` (icontP (step (succ buf,b':acc)), Chunk empty)
step (_,acc) s@(EOF Nothing) =
(idone (mconcat acc), s)
step acc s@(EOF (Just err)) =
(throwRecoverableErr err (icontP $ step acc), s)