module Control.Concurrent.Async.Extra
(
mapConcurrentlyBounded
, mapConcurrentlyBatched
, mapConcurrentlyChunks
, mergeConcatAll
)
where
import Control.Concurrent.Async
import Control.DeepSeq
import Control.Exception
import Data.List
import Data.Sequence (Seq)
import qualified Control.Concurrent.QSem as S
import qualified Data.Foldable as F
import qualified Data.Sequence as Seq
mapConcurrentlyBounded :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapConcurrentlyBounded bound action items =
do qs <- S.newQSem bound
let wrappedAction x =
bracket_ (S.waitQSem qs) (S.signalQSem qs) (action x)
mapConcurrently wrappedAction items
mapConcurrentlyBatched ::
(NFData b, Foldable t)
=> Int -> (Seq (Seq b) -> IO r) -> (a -> IO b) -> t a -> IO r
mapConcurrentlyBatched batchSize merge action items =
do let chunks = chunkList batchSize $ F.toList items
r <- mapConcurrently (\x -> force <$> mapM action x) chunks
merge r
mapConcurrentlyChunks ::
(NFData b, Foldable t)
=> Int -> (Seq (Seq b) -> IO r) -> (t a -> Int) -> (a -> IO b) -> t a -> IO r
mapConcurrentlyChunks chunkCount merge getLength action items =
do let listSize = getLength items
batchSize :: Double
batchSize = fromIntegral listSize / fromIntegral chunkCount
mapConcurrentlyBatched (ceiling batchSize) merge action items
chunkList :: forall a. Int -> [a] -> Seq (Seq a)
chunkList chunkSize =
go 0 Seq.empty
where
go :: Int -> Seq a -> [a] -> Seq (Seq a)
go !size !chunk q
| size == chunkSize =
Seq.singleton chunk Seq.>< go 0 Seq.empty q
| otherwise =
case q of
[] -> Seq.singleton chunk
(x : xs) -> go (size + 1) (chunk Seq.|> x) xs
mergeConcatAll :: Seq (Seq a) -> [a]
mergeConcatAll = F.toList . foldl' (Seq.><) Seq.empty . F.toList