module Transient.Indeterminism (
choose, choose', collect, collect', group, groupByTime
) where
import Transient.Base
import Control.Monad.IO.Class
import Data.IORef
import Control.Applicative
import Data.Monoid
import Control.Concurrent
import Data.Typeable
import Control.Monad.State
import Control.Concurrent.STM as STM
import GHC.Conc
import Data.Time.Clock
choose :: [a] -> TransientIO a
choose []= empty
choose xs = do
evs <- liftIO $ newIORef xs
r <- parallel $ do
es <- atomicModifyIORef' evs $ \es -> let !tes= tail es in (tes,es)
case es of
[x] -> return $ SLast $ head es
x:_ -> return $ SMore x
return $ toData r
toData r= case r of
SMore x -> x
SLast x -> x
group :: Int -> TransientIO a -> TransientIO [a]
group num proc = do
v <- liftIO $ newIORef (0,[])
x <- proc
n <- liftIO $ atomicModifyIORef' v $ \(n,xs) -> let !n'=n +1 in ((n', x:xs),n')
if n < num
then stop
else liftIO $ atomicModifyIORef v $ \(n,xs) -> ((0,[]),xs)
groupByTime :: Integer -> TransientIO a -> TransientIO [a]
groupByTime time proc = do
v <- liftIO $ newIORef (0,[])
t <- liftIO getCurrentTime
x <- proc
n <- liftIO $ atomicModifyIORef' v $ \(n,xs) -> let !n'=n +1 in ((n', x:xs),n')
t' <- liftIO getCurrentTime
if diffUTCTime t' t < fromIntegral time
then stop
else liftIO $ atomicModifyIORef v $ \(n,xs) -> ((0,[]),xs)
choose' :: [a] -> TransientIO a
choose' xs = foldl (<|>) empty $ map (\x -> parallel (return (SLast x)) >>= return . toData) xs
collect :: Int -> TransientIO a -> TransientIO [a]
collect n = collect' n 1000 0
collect' :: Int -> NominalDiffTime -> NominalDiffTime -> TransientIO a -> TransientIO [a]
collect' n t1 t2 search= do
rv <- liftIO $ atomically $ newTVar (0,[]) !> "NEWMVAR"
endflag <- liftIO $ newTVarIO False
st <- get
t <- liftIO getCurrentTime
let any1 = do
r <- search !> "ANY"
liftIO $ atomically $ do
(n1,rs) <- readTVar rv
writeTVar rv (n1+1,r:rs) !> "MODIFY"
stop
monitor= freeThreads $ do
xs <- async $ atomically $
do (n', xs) <- readTVar rv
ns <- readTVar $ children st
t' <- unsafeIOToSTM getCurrentTime
if
(n > 0 && n' >= n) ||
(null ns && (diffUTCTime t' t > t1)) ||
(t2 > 0 && diffUTCTime t' t > t2)
then return xs else retry
th <- liftIO $ myThreadId !> "KILL"
stnow <- get
liftIO $ killChildren st
liftIO $ addThread st stnow
return xs
monitor <|> any1