module Control.Workflow.Patterns(
split, merge, select,
vote, sumUp, Select(..)
) where
import Control.Concurrent.STM
import Data.Monoid
import qualified Control.Monad.Catch as CMC
import Control.Workflow.Stat
import Control.Workflow
import Data.Typeable
import Prelude hiding (catch)
import Control.Monad
import Control.Monad.Trans
import Control.Concurrent
import Control.Exception.Extensible (Exception,SomeException)
import Data.RefSerialize
import Control.Workflow.Stat
import qualified Data.Vector as V
import Data.TCache
import Debug.Trace
import Data.Maybe
-- | Handle to one workflow spawned by 'split'.
data ActionWF a
  = ActionWF
      (WFRef (Maybe a))
      -- ^ Reference that 'split' creates holding 'Nothing' and that the
      -- spawned workflow overwrites with @Just result@ when it finishes.
      ThreadId
      -- ^ Thread returned by 'fork' for the spawned workflow; 'select'
      -- kills it when the selection ends.
-- | Fork every workflow in @actions@ on the same seed value @a@.
--
-- For each action a fresh 'WFRef' holding 'Nothing' is created, the action
-- is run in a forked workflow, and its result is written back into the
-- reference as @Just v@ inside a 'step'. The returned handles pair each
-- reference with the thread that will fill it; 'merge' and 'select' consume
-- them.
split :: ( Typeable b
         , Serialize b
         , HasFork io
         , CMC.MonadMask io)
      => [a -> Workflow io b] -> a -> Workflow io [ActionWF b]
split actions a = forM actions $ \ac -> do
  ref <- newWFRef Nothing
  tid <- fork $ do
    v <- ac a
    -- Publish the result through the synchronised transaction variant.
    step . liftIO . atomicallySync $ writeWFRef ref (Just v)
  return (ActionWF ref tid)
-- | Wait for every workflow spawned by 'split' and reduce their outputs.
--
-- The outputs are read in list order; each read blocks (via 'readWFRef1')
-- until that workflow has stored its result. The whole wait plus the call to
-- @cond@ is wrapped in a single 'step'. There is no timeout here.
merge :: ( MonadIO io
         , Typeable a
         , Typeable b
         , Serialize a, Serialize b)
      => ([a] -> io b) -> [ActionWF a] -> Workflow io b
merge cond results = step $ do
  outputs <- mapM awaitOutput results
  cond outputs
  where
    -- Block until the referenced workflow has produced its value.
    awaitOutput (ActionWF ref _) = liftIO (atomically (readWFRef1 ref))
-- | Read the payload of a result reference, blocking until it is filled.
--
-- * reference resolves to @Just (Just v)@: return @v@;
-- * reference resolves to @Just Nothing@: 'retry', i.e. block the
--   transaction until the value changes;
-- * reference does not resolve at all: fail with 'error'.
readWFRef1 :: ( Serialize a
              , Typeable a)
           => WFRef (Maybe a) -> STM a
readWFRef1 r = readWFRef r >>= maybe notFound (maybe retry return)
  where
    notFound = error $ "readWFRef1: workflow not found " ++ show r
-- | Verdict returned by the filter passed to 'select' for one workflow
-- output. The descriptions below state how 'select' reacts to each verdict.
data Select
  = Select
    -- ^ Record this output among the selected results.
  | Discard
    -- ^ Do not record this output.
  | Continue
    -- ^ Do not settle on this output yet: the checking transaction ends in
    -- 'retry', so nothing is recorded and the check is re-run if the
    -- source's reference changes.
  | FinishDiscard
    -- ^ Do not record this output and signal the selecting thread to finish.
  | FinishSelect
    -- ^ Record this output and signal the selecting thread to finish.
  deriving (Typeable, Read, Show)
-- | 'Select' values double as exceptions: 'select' delivers them with
-- 'throwTo' to the selecting thread to interrupt its wait, and catches them
-- there to return the results collected so far.
instance Exception Select
-- | Collect the outputs of the workflows spawned by 'split', filtered by
-- @check@, until a finishing condition is met.
--
-- One watcher thread is forked per action. Each watcher waits for its
-- workflow's output, runs @check@ on it and acts on the verdict (see
-- 'Select'). The calling thread waits for the timeout flag and is
-- interrupted with a 'Select' exception when
--
-- * a watcher gets 'FinishDiscard' or 'FinishSelect',
-- * every watcher has completed its check, or
-- * the timeout flag obtained from 'getTimeoutFlag' becomes 'True'.
--
-- In all cases the outputs recorded so far are returned in the order of
-- @actions@, and every watcher and every spawned workflow thread is killed.
--
-- Parameters:
--
-- * @timeout@ is handed unchanged to 'getTimeoutFlag'.
--   NOTE(review): presumably seconds, with 0 meaning "no timeout" —
--   confirm against "Control.Workflow".
-- * @check@ decides, inside the STM transaction that reads the output, what
--   to do with it. It may itself throw a 'Select' exception, which is
--   forwarded to the selecting thread.
-- * @actions@ are the handles returned by 'split'.
select ::
     ( Serialize a
     , Typeable a
     , HasFork io
     , CMC.MonadMask io)
  => Integer
  -> (a -> STM Select)
  -> [ActionWF a]
  -> Workflow io [a]
select timeout check actions = do
  -- One slot per action; a slot becomes 'Just' once its output is selected.
  res <- liftIO $ newTVarIO $ V.generate (length actions) (const Nothing)
  flag <- getTimeoutFlag timeout
  parent <- liftIO myThreadId
  -- Starts as an empty list (never an empty MVar) so that 'killall' cannot
  -- block if the parent is interrupted while it is still forking watchers.
  checThreads <- liftIO $ newMVar []
  -- Number of the next watcher to complete, starting at 1.
  count <- liftIO $ newMVar 1
  let -- Store an accepted output in the slot of its action.
      addRes i r = do
        l <- readTVar res
        writeTVar res $ l V.// [(i, Just r)]

      -- Watcher for the action at position i.
      check' (ActionWF ac _) i =
        CMC.catch
          ( do
              -- The transaction only records the decision. The parent is
              -- signalled after it has committed, so the parent always sees
              -- the output of a 'FinishSelect', and no IO runs inside STM
              -- (where it could be repeated or block).
              verdict <- liftIO . atomically $ do
                r <- readWFRef1 ac
                b <- check r
                case b of
                  Discard -> return ()
                  Select -> addRes i r
                  -- 'retry' rolls back the whole transaction, so recording
                  -- the output first would have no effect; just wait for the
                  -- reference to change and check again.
                  Continue -> retry
                  FinishDiscard -> return ()
                  FinishSelect -> addRes i r
                return b
              case verdict of
                FinishDiscard -> liftIO $ throwTo parent FinishDiscard
                FinishSelect -> liftIO $ throwTo parent FinishDiscard
                _ -> return ()
              n <- liftIO $ do
                k <- takeMVar count
                putMVar count (k + 1)
                return k
              -- The last watcher to complete ends the selection.
              when (n == length actions) $
                liftIO $ throwTo parent FinishDiscard
          )
          (\(e :: Select) -> liftIO $ throwTo parent e)

      process = do
        forM_ (zip actions [0 ..]) $ \(ac, i) -> do
          w <- fork $ check' ac i
          -- Register immediately: a watcher may interrupt the parent before
          -- the remaining watchers have been forked.
          liftIO $ modifyMVar_ checThreads (return . (w :))
        -- Block until the timeout flag is raised (or a watcher interrupts).
        liftIO $ atomically $ do
          timedOut <- readTVar flag
          unless timedOut retry
        throw FinishDiscard

      killall = liftIO $ do
        ws <- readMVar checThreads
        mapM_ killThread ws
        mapM_ (\(ActionWF _ th) -> killThread th) actions
  step $
    CMC.catch
      process
      (\(_ :: Select) -> liftIO $ fmap (catMaybes . V.toList) (readTVarIO res))
      `CMC.finally` killall
-- | Unwrap a 'Maybe' into a monadic value: @Just x@ yields @return x@, while
-- 'Nothing' yields a call to 'error' with the supplied message.
justify :: Monad m => String -> Maybe a -> m a
justify str = maybe (error str) return
-- | Spawn @actions@ on the seed @x@ with 'split', gather their outputs with
-- 'select' under the given @timeout@, and reduce the gathered outputs with
-- @comp@.
--
-- Every output that arrives is accepted. The gathering ends when all
-- workflows have answered or when the timeout flag of 'select' is raised,
-- whichever happens first; @comp@ then receives the outputs collected so
-- far, in the order of @actions@.
vote
  :: ( Serialize b
     , Typeable b
     , HasFork io
     , CMC.MonadMask io)
  => Integer
  -> [a -> Workflow io b]
  -> ([b] -> Workflow io c)
  -> a
  -> Workflow io c
vote timeout actions comp x =
  -- The filter must answer 'Select'. With 'Continue' the checking
  -- transaction in 'select' ends in STM 'retry', which discards the
  -- transaction, so no output would ever be recorded and @comp@ would only
  -- ever see an empty list.
  split actions x >>= select timeout (const $ return Select) >>= comp
-- | Run @actions@ on a seed value through 'vote' and combine whatever
-- outputs were collected with 'mconcat'.
sumUp
  :: ( Serialize b
     , Typeable b
     , Monoid b
     , HasFork io
     , CMC.MonadMask io)
  => Integer
  -> [a -> Workflow io b]
  -> a
  -> Workflow io b
sumUp timeout actions seed =
  vote timeout actions (\outputs -> return (mconcat outputs)) seed
-- | Demo: run 'sumUp' over two delayed workflows under the workflow name
-- @"sumup"@ and print the combined result.
--
-- Cache writes are switched to manual mode first. If anything throws, the
-- exception is reported and the cache is flushed with 'syncCache' instead of
-- the failure being dropped without a trace.
main :: IO ()
main =
  CMC.catch
    ( do
        syncWrite SyncManual
        r <- exec1 "sumup" $ sumUp 0 [f 1, f 2] "0"
        print r
    )
    ( \(e :: SomeException) -> do
        -- Say why the run stopped before persisting what is cached.
        putStrLn $ "main: aborted by exception: " ++ show e
        syncCache
    )
-- | Demo workflow: sleep for @5000000 * n@ microseconds inside a 'step',
-- then return the input string with @"1"@ appended.
f :: Int -> String -> Workflow IO String
f n s = do
  step $ threadDelay (5000000 * n)
  return (s ++ "1")
-- | Demo: spawn ten workflows that each print the seed @"hi"@ in a 'step',
-- wait for all of them with 'merge', and yield 'True'.
main2 = do
  syncWrite SyncManual
  exec1 "split" $ do
    spawned <- split (replicate 10 (step . print)) "hi"
    merge (const (return True)) spawned
-- | Demo: create twenty empty references inside the workflow @"WFRef"@, fork
-- one writer per reference that stores a fixed string, then read every
-- reference back (blocking until it is filled) and print its content.
main3 = do
  refs <- exec1 "WFRef" $ do
    created <- replicateM 20 (newWFRef Nothing)
    forM_ created $ \ref ->
      fork . unsafeIOtoWF . atomically $ writeWFRef ref (Just "hi final value")
    return created
  forM refs $ \ref -> do
    value <- liftIO (atomically (readWFRef1 ref))
    print value