module Control.Concurrent.ParallelTreeSearch (
SearchQueue(..), SearchView(..), LIFO(..), FIFO(..),
parallelTreeSearch
) where
import Control.Monad.SearchTree
import Control.Concurrent
import qualified Data.Sequence as Seq
class SearchQueue q
where
emptyQ :: q a
addQ :: SearchTree a -> q a -> q a
viewQ :: q a -> SearchView q a
isEmptyQ :: SearchQueue q => q a -> Bool
isEmptyQ q = case viewQ q of EmptyQ -> True; _ -> False
data SearchView q a = EmptyQ | SearchTree a :~ q a
newtype LIFO a = LIFO [SearchTree a]
instance SearchQueue LIFO
where
emptyQ = LIFO []
addQ t (LIFO q) = LIFO (t:q)
viewQ (LIFO []) = EmptyQ
viewQ (LIFO (x:xs)) = x :~ LIFO xs
newtype FIFO a = FIFO (Seq.Seq (SearchTree a))
instance SearchQueue FIFO
where
emptyQ = FIFO Seq.empty
addQ t (FIFO q) = FIFO (q Seq.|> t)
viewQ (FIFO q) = case Seq.viewl q of
Seq.EmptyL -> EmptyQ
x Seq.:< xs -> x :~ FIFO xs
parallelTreeSearch :: SearchQueue q
=> Int
-> Int
-> q a
-> IO [a]
parallelTreeSearch tl wl q =
do counter <- newMVar 1
channel <- newChan
let env = SearchEnv tl wl counter channel
forkIO (parSearch env [] q)
xs <- getChanContents channel
return (concNonEmpty xs)
concNonEmpty :: [[a]] -> [a]
concNonEmpty [] = []
concNonEmpty ([]:_) = []
concNonEmpty (xs:xss) = xs ++ concNonEmpty xss
data SearchEnv a = SearchEnv { threadLimit :: Int
, workLimit :: Int
, threadCounter :: MVar Int
, results :: Chan [a] }
parSearch :: SearchQueue q => SearchEnv a -> [a] -> q a -> IO ()
parSearch env xs q
| isEmptyQ q = do writeResults env xs
finaliseResults env
| otherwise = do noMoreThreads <- threadLimitReached env
if noMoreThreads
then let (ys,q') = search (workLimit env) xs (viewQ q)
in do writeResults env ys
parSearch env [] q'
else do (ys,q') <- process env [] (viewQ q)
parSearch env ys q'
process :: SearchQueue q
=> SearchEnv a -> [a] -> SearchView q a -> IO ([a], q a)
process _ xs EmptyQ = return (xs,emptyQ)
process env xs (None :~ q) = process env xs (viewQ q)
process env xs (One x :~ q) = process env (x:xs) (viewQ q)
process env xs (Choice s t :~ q) =
do incThreadCounter env
forkIO (parSearch env xs (addQ s (emptyQ `withTypeOf` q)))
return ([], addQ t q)
withTypeOf :: a -> a -> a
withTypeOf = const
writeResults :: SearchEnv a -> [a] -> IO ()
writeResults _ [] = return ()
writeResults env xs = writeChan (results env) xs
incThreadCounter :: SearchEnv a -> IO ()
incThreadCounter env = modifyMVar_ (threadCounter env) (return.(+1))
threadLimitReached :: SearchEnv a -> IO Bool
threadLimitReached env = do count <- readMVar (threadCounter env)
return (count >= threadLimit env)
finaliseResults :: SearchEnv a -> IO ()
finaliseResults env = do count <- takeMVar (threadCounter env)
if count <= 1
then writeChan (results env) []
else putMVar (threadCounter env) (count1)
search :: SearchQueue q => Int -> [a] -> SearchView q a -> ([a],q a)
search _ xs EmptyQ = (xs,emptyQ)
search 0 xs (t :~ q) = (xs,addQ t q)
search n xs (None :~ q) = search (n1) xs (viewQ q)
search n xs (One x :~ q) = search (n1) (x:xs) (viewQ q)
search n xs (Choice s t :~ q) = search (n1) xs (viewQ (addQ s (addQ t q)))