module Control.Workflow.Patterns(
split, merge, select,
vote, sumUp, Select(..)
) where
import Control.Concurrent.STM
import Data.Monoid
import qualified Control.Monad.CatchIO as CMC
import Control.Workflow.Stat
import Control.Workflow
import Data.Typeable
import Prelude hiding (catch)
import Control.Monad
import Control.Monad.Trans
import Control.Concurrent
import Control.Exception.Extensible (Exception,SomeException)
import Data.RefSerialize
import Control.Workflow.Stat
import Data.TCache
import Debug.Trace
newtype ActionWF a= ActionWF (WFRef(Maybe a))
split :: ( Typeable b
, Serialize b
, HasFork io
, CMC.MonadCatchIO io)
=> [a -> Workflow io b] -> a -> Workflow io [ActionWF b]
split actions a = mapM (\ac ->
do
mv <- newWFRef Nothing
fork (ac a >>= \v -> (step . liftIO . atomicallySync . writeWFRef mv . Just) v )
return $ ActionWF mv )
actions
merge :: ( MonadIO io
, Typeable a
, Typeable b
, Serialize a, Serialize b)
=> ([a] -> io b) -> [ActionWF a] -> Workflow io b
merge cond results= step $ mapM (\(ActionWF mv ) -> readWFRef1 mv ) results >>= cond
readWFRef1 :: ( MonadIO io
, Serialize a
, Typeable a)
=> WFRef (Maybe a) -> io a
readWFRef1 r = liftIO . atomically $ do
mv <- readWFRef r
case mv of
Just(Just v) -> return v
Just Nothing -> retry
Nothing -> error $ "readWFRef1: workflow not found "++ show r
data Select
= Select
| Discard
| FinishDiscard
| FinishSelect
deriving(Typeable, Read, Show)
instance Exception Select
select ::
( Serialize a
, Serialize [a]
, Typeable a
, HasFork io
, CMC.MonadCatchIO io)
=> Integer
-> (a -> io Select)
-> [ActionWF a]
-> Workflow io [a]
select timeout check actions= do
res <- liftIO $ newMVar []
flag <- getTimeoutFlag timeout
parent <- liftIO myThreadId
checThreads <- liftIO $ newEmptyMVar
count <- liftIO $ newMVar 1
let process = do
let check' (ActionWF ac ) = do
r <- readWFRef1 ac
b <- check r
case b of
Discard -> return ()
Select -> addRes r
FinishDiscard -> do
liftIO $ throwTo parent FinishDiscard
FinishSelect -> do
addRes r
liftIO $ throwTo parent FinishDiscard
n <- liftIO $ CMC.block $ do
n <- takeMVar count
putMVar count (n+1)
return n
if ( n == length actions)
then liftIO $ throwTo parent FinishDiscard
else return ()
`CMC.catch` (\(e :: Select) -> liftIO $ throwTo parent e)
ws <- mapM ( fork . check') actions
liftIO $ putMVar checThreads ws
liftIO $ atomically $ do
v <- readTVar flag
case v of
False -> retry
True -> return ()
throw FinishDiscard
where
addRes r= liftIO $ CMC.block $ do
l <- takeMVar res
putMVar res $ r : l
let killall = liftIO $ do
ws <- readMVar checThreads
liftIO $ mapM_ killThread ws
step $ CMC.catch process
(\(e :: Select)-> do
liftIO $ readMVar res
)
`CMC.finally` killall
justify str Nothing = error str
justify _ (Just x) = return x
vote
:: ( Serialize b
, Serialize [b]
, Typeable b
, HasFork io
, CMC.MonadCatchIO io)
=> Integer
-> [a -> Workflow io b]
-> ([b] -> Workflow io c)
-> a
-> Workflow io c
vote timeout actions comp x=
split actions x >>= select timeout (const $ return Select) >>= comp
sumUp
:: ( Serialize b
, Serialize [b]
, Typeable b
, Monoid b
, HasFork io
, CMC.MonadCatchIO io)
=> Integer
-> [a -> Workflow io b]
-> a
-> Workflow io b
sumUp timeout actions = vote timeout actions (return . mconcat)
main= do
syncWrite SyncManual
r <- exec1 "sumup" $ sumUp 0 [f 1, f 2] "0"
print r
`CMC.catch` \(e::SomeException) -> syncCache
f :: Int -> String -> Workflow IO String
f n s= step ( threadDelay ( 5000000 * n)) >> return ( s ++"1")
main2=do
syncWrite SyncManual
exec1 "split" $ split (take 10 $ repeat (step . print)) "hi" >>= merge (const $ return True)
main3=do
refs <- exec1 "WFRef" $ do
refs <- replicateM 20 $ newWFRef Nothing
mapM (\r -> fork $ unsafeIOtoWF $ atomically $ writeWFRef r $ Just "hi final value") refs
return refs
mapM (\r -> readWFRef1 r >>= print) refs