-----------------------------------------------------------------------------
--
-- Module      :  Transient.Indeterminism
-- Copyright   :
-- License     :  GPL (Just (Version {versionBranch = [3], versionTags = []}))
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :
-- Portability :
--
-- | Combinators that feed lists into parallel Transient computations
-- ('choose', 'choose'') and that gather the results such computations
-- produce ('group', 'groupByTime', 'collect', 'collect'').
--
-----------------------------------------------------------------------------
{-# LANGUAGE BangPatterns #-}
module Transient.Indeterminism (
    choose,
    choose',
    collect,
    collect',
    group,
    groupByTime
) where

import Transient.Base
import Control.Monad.IO.Class
import Data.IORef
import Control.Applicative
import Data.Monoid
import Control.Concurrent
import Data.Typeable
import Control.Monad.State
import Control.Concurrent.STM as STM
import GHC.Conc
import Data.Time.Clock

-- | Slurp a list of values and hand them, one at a time, to `parallel` so
-- that they can be processed in parallel. To limit the number of processing
-- threads, use `threads`.
--
-- An empty list yields `empty`. Otherwise the list is kept in an 'IORef'
-- and every run of the action given to `parallel` pops the first element:
-- the final element is tagged 'SLast', every earlier one 'SMore'.
choose :: [a] -> TransientIO a
choose [] = empty
choose xs = do
    pending <- liftIO (newIORef xs)
    item    <- parallel (nextItem pending)
    return (toData item)
  where
    -- Pop the head of the pending list and tag it for `parallel`.
    nextItem ref = do
        items <- atomicModifyIORef' ref popFront
        case items of
            [x]   -> return (SLast x)
            x : _ -> return (SMore x)

    -- Store the tail and hand back the list as it was before the pop.
    -- The bang keeps the tail evaluated eagerly; like 'tail' itself, this
    -- is only defined for a non-empty list, which holds as long as the
    -- action is not run again after it has returned 'SLast'.
    popFront items = let !rest = tail items in (rest, items)

-- Unwrap the payload of an 'SMore' or 'SLast' value.
toData (SMore x) = x
toData (SLast x) = x

-- | group the output of a possible multithreaded process in groups of n elements.
--
-- Every result of @proc@ is prepended to a buffer shared by all the
-- continuations of @proc@. The continuation that adds the @num@-th element
-- empties the buffer and returns its contents (newest element first); every
-- other continuation ends with `stop`.
--
-- Counting and flushing happen in a single atomic step, so concurrent
-- producers can neither flush an oversized group nor emit an empty one.
group :: Int -> TransientIO a -> TransientIO [a]
group num proc = do
    v <- liftIO $ newIORef (0, [])
    x <- proc
    flushed <- liftIO $ atomicModifyIORef' v $ \(n, xs) ->
        let n'  = n + 1
            xs' = x : xs
        in  if n' < num
              then ((n', xs'), Nothing)   -- group still filling up
              else ((0, []), Just xs')    -- group complete: reset and emit
    maybe stop return flushed

-- | Group the results of @proc@ by time interval, measured with
-- `diffUTCTime`.
--
-- @time@ is the length of the interval in seconds. Results are buffered
-- until one arrives at least @time@ seconds after the start of the current
-- interval; that result flushes the buffer (newest element first) and
-- starts a new interval. Every other continuation ends with `stop`.
--
-- The start of the interval is stored together with the buffer and is reset
-- on every flush, so later groups span a full interval as well.
groupByTime :: Integer -> TransientIO a -> TransientIO [a]
groupByTime time proc = do
    start <- liftIO getCurrentTime
    v <- liftIO $ newIORef (start, [])
    x <- proc
    now <- liftIO getCurrentTime
    flushed <- liftIO $ atomicModifyIORef' v $ \(t0, xs) ->
        let xs' = x : xs
        in  if diffUTCTime now t0 < fromIntegral time
              then ((t0, xs'), Nothing)   -- still inside the current interval
              else ((now, []), Just xs')  -- interval over: emit, restart clock
    maybe stop return flushed

-- | Alternative definition of `choose` with more parallelism: every element
-- gets its own `parallel` computation (tagged 'SLast'), and all of them are
-- combined with '<|>'.
choose' :: [a] -> TransientIO a
choose' xs = foldl (<|>) empty $ map single xs
  where
    single x = parallel (return (SLast x)) >>= return . toData

--newtype Collect a= Collect (MVar (Int, [a])) deriving Typeable

-- Collect the results of a search done in parallel, usually initiated by
-- `choose`. The results are added to the collection with `found`.
-- NOTE(review): no `found` is defined or exported in this module; this
-- remark probably belongs to the commented-out 'Collect' type above.

-- | Execute a process and get the first @n@ solutions.
--
-- If the process ends without finding the requested number of solutions,
-- the ones found so far are returned. If it finds the requested number, the
-- non-free threads of the process are killed and the solutions are
-- returned. It works by monitoring the solutions found and the number of
-- active threads. If the first parameter is 0, 'collect' returns all the
-- results.
--
-- Defined as @collect' n 1000 0@.
collect :: Int -> TransientIO a -> TransientIO [a]
collect n = collect' n 1000 0

-- | Like 'collect', but additionally bounded by two time intervals.
-- If the first interval has passed and there is no result, it stops.
-- After the second interval, it stops unconditionally and returns the
-- current results.
-- It also stops as soon as there are enough results.
--
-- Arguments, as used by the code below:
--
-- * @n@: number of results wanted; the count test is only active when @n > 0@.
-- * @t1@: the "no child threads left" test only succeeds once more than @t1@
--   has elapsed since the call started.
-- * @t2@: unconditional limit; only active when @t2 > 0@.
-- * @search@: the computation whose results are gathered.
--
-- The returned list is built by prepending, so the newest result comes first.
collect' :: Int -> NominalDiffTime -> NominalDiffTime -> TransientIO a -> TransientIO [a]
collect' n t1 t2 search= do
  -- Shared accumulator: (number of results so far, results, newest first).
  -- `!>` is defined outside this block (presumably a debug-trace operator
  -- from Transient.Base -- TODO confirm).
  rv <- liftIO $ atomically $ newTVar (0,[]) !> "NEWMVAR"
  -- NOTE(review): endflag is created but never read or written in this function.
  endflag <- liftIO $ newTVarIO False
  -- State at the moment of the call; its `children` TVar is watched by `monitor`.
  st <- get
  -- Reference time for both interval tests.
  t <- liftIO getCurrentTime
  let -- Producer branch: record every result of `search`, then `stop`, so
      -- this branch never returns a value itself.
      any1 = do
        r <- search !> "ANY"
        liftIO $ atomically $ do
            (n1,rs) <- readTVar rv
            writeTVar rv (n1+1,r:rs) !> "MODIFY"
        stop

      -- Consumer branch: wait (via STM `retry`) until one of the three
      -- termination conditions holds, then return the results gathered so far.
      monitor= freeThreads $ do
          xs <- async $ atomically $ do
                  (n', xs) <- readTVar rv
                  ns <- readTVar $ children st
                  -- The clock is read inside the transaction.
                  -- NOTE(review): `retry` only re-runs the transaction when a
                  -- TVar it has read changes (here `rv` or `children st`), so
                  -- the two time tests appear to be re-evaluated only on such
                  -- a change, not when the interval itself expires -- confirm.
                  t' <- unsafeIOToSTM getCurrentTime
                  if (n > 0 && n' >= n) ||                    -- enough results
                     (null ns && (diffUTCTime t' t > t1)) || -- !> show (n, n', length ns)
                     (t2 > 0 && diffUTCTime t' t > t2)        -- past the second interval
                     then return xs
                     else retry

          -- NOTE(review): th is bound but never used afterwards.
          th <- liftIO $ myThreadId !> "KILL"
          stnow <- get
          -- Kill the children recorded in the captured state, then register
          -- the current state under it again (killChildren and addThread are
          -- defined outside this block; exact semantics not visible here).
          liftIO $ killChildren st
          liftIO $ addThread st stnow
          return xs

  -- Combine both branches; the only value that comes out is the one
  -- returned by `monitor`.
  monitor <|> any1