module Control.Workflow
(
Workflow
, WorkflowList
, PMonadTrans (..)
, MonadCatchIO (..)
, throw
, Indexable(..)
, start
, exec
, exec1d
, exec1
, wfExec
, startWF
, restartWorkflows
, WFErrors(..)
, step
, stepControl
, unsafeIOtoWF
, WFRef
, getWFRef
, newWFRef
, stepWFRef
, readWFRef
, writeWFRef
, waitWFActive
, getAll
, safeFromIDyn
, getWFKeys
, getWFHistory
, waitFor
, waitForSTM
, waitUntilSTM
, getTimeoutFlag
, logWF
, clearRunningFlag
, killThreadWF
, killWF
, delWF
, killThreadWF1
, killWF1
, delWF1
, delWFHistory
, delWFHistory1
, syncWrite
, SyncMode(..)
, printHistory
)
where
import Prelude hiding (catch)
import System.IO.Unsafe
import Control.Monad(when,liftM)
import qualified Control.Exception as CE (Exception,AsyncException(ThreadKilled), SomeException, throwIO, handle,finally,catch,block,unblock)
import Control.Concurrent (forkIO,threadDelay, ThreadId, myThreadId, killThread)
import Control.Concurrent.STM
import GHC.Conc(unsafeIOToSTM)
import GHC.Base (maxInt)
import Data.ByteString.Lazy.Char8 as B hiding (index)
import Data.ByteString.Lazy as BL(putStrLn)
import Data.List as L
import Data.Typeable
import System.Time
import Control.Monad.Trans
import Control.Concurrent.MonadIO(HasFork(..),MVar,newMVar,takeMVar,putMVar)
import System.IO(hPutStrLn, stderr)
import Data.List(elemIndex)
import Data.Maybe(fromJust, isNothing, isJust, mapMaybe)
import Data.IORef
import System.IO.Unsafe(unsafePerformIO)
import Data.Map as M(Map,fromList,elems, insert, delete, lookup,toList, fromList,keys)
import qualified Control.Monad.CatchIO as CMC
import qualified Control.Exception.Extensible as E
import Data.TCache
import Data.TCache.DefaultPersistence
import Data.RefSerialize
import Control.Workflow.IDynamic
import Unsafe.Coerce
import Control.Workflow.Stat
type Workflow m = WF Stat m
type WorkflowList m a b= [(String, a -> Workflow m b) ]
instance Monad m => Monad (WF s m) where
return x = WF (\s -> return (s, x))
WF g >>= f = WF (\s -> do
(s1, x) <- g s
let WF fun= f x
(s3, x') <- fun s1
return (s3, x'))
instance (Monad m,Functor m) => Functor (Workflow m ) where
fmap f (WF g)= WF (\s -> do
(s1, x) <- g s
return (s1, f x))
tvRunningWfs = getDBRef $ keyRunning :: DBRef Stat
unsafeIOtoWF :: (Monad m) => IO a -> Workflow m a
unsafeIOtoWF x= let y= unsafePerformIO ( x >>= return) in y `seq` return y
class PMonadTrans t m a where
plift :: Monad m => m a -> t m a
instance (Monad m
, MonadIO m
, Serialize a
, Typeable a)
=> PMonadTrans (WF Stat) m a
where
plift = step
instance (MonadTrans t, Monad m) => PMonadTrans t m a where
plift= Control.Monad.Trans.lift
instance Monad m => MonadIO (WF Stat m) where
liftIO=unsafeIOtoWF
class MonadCatchIO m a where
catch :: E.Exception e => m a -> (e -> m a) -> m a
block :: m a -> m a
unblock :: m a -> m a
throw :: (MonadIO m, E.Exception e) => e -> m a
throw = liftIO . E.throwIO
instance (Serialize a
, Typeable a,MonadIO m, CMC.MonadCatchIO m)
=> MonadCatchIO (WF Stat m) a where
catch exp exc = do
expwf <- step $ getTempName
excwf <- step $ getTempName
step $ do
ex <- CMC.catch (exec1d expwf exp >>= return . Right
) $ \e-> return $ Left e
case ex of
Right r -> return r
Left e ->exec1d excwf (exc e)
block exp=WF $ \s -> CMC.block (st exp $ s)
unblock exp= WF $ \s -> CMC.unblock (st exp $ s)
instance (HasFork io
, CMC.MonadCatchIO io)
=> HasFork (WF Stat io) where
fork f = do
(str, finished) <- step $ getTempName >>= \n -> return(n, False)
r <- getWFRef
WF (\s ->
do th <- if finished
then fork $ return ()
else fork $ do
exec1 str f
liftIO $ do atomically $ writeWFRef r (str, True)
syncIt
return(s,th))
wfExec
:: (Indexable a, Serialize a, Typeable a
, CMC.MonadCatchIO m, MonadIO m)
=> Workflow m a -> Workflow m a
wfExec f= do
str <- step $ getTempName
step $ exec1 str f
exec1d :: (Serialize b, Typeable b
,CMC.MonadCatchIO m)
=> String -> (Workflow m b) -> m b
exec1d str f= do
r <- exec1 str f
delit
return r
`CMC.catch` (\e@CE.ThreadKilled -> delit >> throw e)
where
delit= do
delWF str ()
liftIO syncIt
exec1 :: ( Serialize a, Typeable a
, Monad m, MonadIO m, CMC.MonadCatchIO m)
=> String -> Workflow m a -> m a
exec1 str f= exec str (const f) ()
exec :: ( Indexable a, Serialize a, Serialize b, Typeable a
, Typeable b
, Monad m, MonadIO m, CMC.MonadCatchIO m)
=> String -> (a -> Workflow m b) -> a -> m b
exec str f x =
(do
v <- getState str f x
case v of
Right (name, f, stat) -> do
r <- runWF name (f x) stat
return r
Left err -> CMC.throw err)
`CMC.catch`
(\(e :: CE.SomeException) -> liftIO $ do
let name= keyWF str x
clearRunningFlag name
syncIt
CMC.throw e )
mv :: MVar Int
mv= unsafePerformIO $ newMVar 0
getTempName :: MonadIO m => m String
getTempName= liftIO $ do
seq <- takeMVar mv
putMVar mv (seq + 1)
TOD t _ <- getClockTime
return $ "anon"++ show t ++ show seq
instance Indexable () where
key= show
step :: ( Monad m
, MonadIO m
, Serialize a
, Typeable a)
=> m a
-> Workflow m a
step= stepControl1 False
stepControl :: ( Monad m
, MonadIO m
, Serialize a
, Typeable a)
=> m a
-> Workflow m a
stepControl= stepControl1 True
stepControl1 :: ( Monad m
, MonadIO m
, Serialize a
, Typeable a)
=> Bool -> m a
-> Workflow m a
stepControl1 isControl mx= WF(\s'' -> do
let stat= state s''
let ind= index s''
if recover s'' && ind < stat
then return (s''{index=ind +1 }, fromIDyn $ versions s'' !! (stat ind1) )
else do
x' <- mx
let sref = self s''
s'<- liftIO . atomically $ do
s <- if isControl
then readDBRef sref >>= unjustify ("step: readDBRef: not found:" ++ keyObjDBRef sref)
else return s''
let versionss= versions s
let dynx= toIDyn x'
let ver= dynx: versionss
let s'= s{ recover= False, versions = ver, state= state s+1}
writeDBRef sref s'
return s'
liftIO syncIt
return (s', x') )
unjustify str Nothing = error str
unjustify _ (Just x) = return x
start
:: ( Monad m
, MonadIO m
, Indexable a
, Serialize a, Serialize b
, Typeable a
, Typeable b)
=> String
-> (a -> Workflow m b)
-> a
-> m (Either WFErrors b)
start namewf f1 v = do
ei <- getState namewf f1 v
case ei of
Right (name, f, stat) ->
runWF name (f v) stat >>= return . Right
Left error -> return $ Left error
data WFErrors = NotFound | AlreadyRunning | Timeout | forall e.CE.Exception e => Exception e deriving Typeable
instance Show WFErrors where
show NotFound= "Not Found"
show AlreadyRunning= "Already Running"
show Timeout= "Timeout"
show (Exception e)= "Exception: "++ show e
instance CE.Exception WFErrors
getState :: (Monad m, MonadIO m, Indexable a, Serialize a, Typeable a)
=> String -> x -> a
-> m (Either WFErrors (String, x, Stat))
getState namewf f v= liftIO . atomically $ getStateSTM
where
getStateSTM = do
mrunning <- readDBRef tvRunningWfs
case mrunning of
Nothing -> do
writeDBRef tvRunningWfs (Running $ fromList [])
getStateSTM
Just(Running map) -> do
let key= keyWF namewf v
stat1= stat0{wfName= key,versions=[toIDyn v],self= sref}
sref= getDBRef $ keyResource stat1
case M.lookup key map of
Nothing -> do
mythread <- unsafeIOToSTM $ myThreadId
writeDBRef tvRunningWfs . Running $ M.insert key (namewf,Just mythread) map
writeDBRef sref stat1
return $ Right (key, f, stat1)
Just (wf, started) ->
if isJust started
then return $ Left AlreadyRunning
else do
mythread <- unsafeIOToSTM $ myThreadId
writeDBRef tvRunningWfs . Running $ M.insert key (namewf,Just mythread) map
mst <- readDBRef sref
let stat' = case mst of
Nothing -> error $ "Workflow not found: "++ key
Just s -> s{index=0,recover= True}
writeDBRef sref stat'
return $ Right (key, f, stat')
syncIt= do
(sync,_) <- atomically $ readTVar tvSyncWrite
when (sync ==Synchronous) syncCache
runWF :: (Monad m,MonadIO m
, Serialize b, Typeable b)
=> String -> Workflow m b -> Stat -> m b
runWF n f s= do
sync <- liftIO $! do
(sync,_) <- atomically $ readTVar tvSyncWrite
when (sync ==Synchronous) syncCache
return sync
(s', v') <- st f $ s
liftIO $! do
clearFromRunningList n
when (sync ==Synchronous) syncCache
return v'
where
clearFromRunningList n = atomically $ do
Just(Running map) <- readDBRef tvRunningWfs
writeDBRef tvRunningWfs . Running $ M.delete n map
startWF
:: ( MonadIO m
, Serialize a, Serialize b
, Typeable a
, Indexable a
, Typeable b)
=> String
-> a
-> WorkflowList m a b
-> m (Either WFErrors b)
startWF namewf v wfs=
case Prelude.lookup namewf wfs of
Nothing -> return $ Left NotFound
Just f -> start namewf f v
restartWorkflows
:: (Serialize a, Serialize b, Typeable a
, Indexable b, Typeable b)
=> WorkflowList IO a b
-> IO ()
restartWorkflows map = do
mw <- liftIO $ getResource ((Running undefined ) )
case mw of
Nothing -> return ()
Just (Running all) -> mapM_ start . mapMaybe filter . toList $ all
where
filter (a, (b,Nothing)) = Just (b, a)
filter _ = Nothing
start (key, kv)= do
let mf= Prelude.lookup key map
case mf of
Nothing -> return ()
Just f -> do
let st0= stat0{wfName = kv}
mst <- liftIO $ getResource st0
case mst of
Nothing -> error $ "restartWorkflows: workflow not found "++ keyResource st0
Just st-> do
liftIO . forkIO $ runWF key (f (fromIDyn . Prelude.last $ versions st )) st{index=0,recover=True} >> return ()
return ()
syncWrite:: (Monad m, MonadIO m) => SyncMode -> m ()
syncWrite mode= do
(_,thread) <- liftIO . atomically $ readTVar tvSyncWrite
when (isJust thread ) $ liftIO . killThread . fromJust $ thread
case mode of
Synchronous -> modeWrite
SyncManual -> modeWrite
Asyncronous time maxsize -> do
th <- liftIO $ clearSyncCacheProc time defaultCheck maxsize >> return()
liftIO . atomically $ writeTVar tvSyncWrite (mode,Just th)
where
modeWrite=
liftIO . atomically $ writeTVar tvSyncWrite (mode, Nothing)
getAll :: Monad m => Workflow m [IDynamic]
getAll= WF(\s -> return (s, versions s))
getWFKeys :: String -> IO [String]
getWFKeys wfname= do
mwfs <- atomically $ readDBRef tvRunningWfs
case mwfs of
Nothing -> return []
Just (Running wfs) -> return $ Prelude.filter (L.isPrefixOf wfname) $ M.keys wfs
getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat)
getWFHistory wfname x= getResource stat0{wfName= keyWF wfname x}
delWFHistory name1 x= do
let name= keyWF name1 x
delWFHistory1 name
delWFHistory1 name =
atomically . withSTMResources [] $ const resources{ toDelete= [stat0{wfName= name}] }
waitWFActive wf= do
r <- threadWF wf
case r of
Just (_, Nothing) -> retry
_ -> return ()
where
threadWF wf= do
Just(Running map) <- readDBRef tvRunningWfs
return $ M.lookup wf map
killThreadWF :: ( Indexable a
, Serialize a
, Typeable a
, MonadIO m)
=> String -> a -> m()
killThreadWF wfname x= do
let name= keyWF wfname x
killThreadWF1 name
killThreadWF1 :: MonadIO m => String -> m()
killThreadWF1 name= killThreadWFm name >> return ()
killThreadWFm name= do
(map,f) <- clearRunningFlag name
case f of
Just th -> liftIO $ killThread th
Nothing -> return()
return map
killWF :: (Indexable a,MonadIO m) => String -> a -> m ()
killWF name1 x= do
let name= keyWF name1 x
killWF1 name
killWF1 :: MonadIO m => String -> m ()
killWF1 name = do
map <- killThreadWFm name
liftIO . atomically . writeDBRef tvRunningWfs . Running $ M.delete name map
return ()
delWF :: ( Indexable a
, MonadIO m
, Typeable a)
=> String -> a -> m()
delWF name1 x= do
let name= keyWF name1 x
delWF1 name
delWF1 :: MonadIO m=> String -> m()
delWF1 name= liftIO $ do
mrun <- atomically $ readDBRef tvRunningWfs
case mrun of
Nothing -> return()
Just (Running map) -> do
atomically . writeDBRef tvRunningWfs . Running $! M.delete name map
delWFHistory1 name
syncIt
clearRunningFlag name= liftIO $ atomically $ do
mrun <- readDBRef tvRunningWfs
case mrun of
Nothing -> error $ "clearRunningFLag non existing workflows" ++ name
Just(Running map) -> do
case M.lookup name map of
Just(_, Nothing) -> return (map,Nothing)
Just(v, Just th) -> do
writeDBRef tvRunningWfs . Running $ M.insert name (v, Nothing) map
return (map,Just th)
Nothing ->
return (map, Nothing)
getWFRef :: ( Monad m,
MonadIO m,
Serialize a
, Typeable a)
=> Workflow m (WFRef a)
getWFRef =ret
where
ret= WF (\s -> do
let n= if recover s then index s else state s
let ref = WFRef n (self s)
let r= fromIDyn (versions s !! (state s n)) `asTypeOf` typeofRef ret
r `seq` return (s,ref))
where
typeofRef :: Workflow m (WFRef a) -> a
typeofRef= undefined
stepWFRef :: ( Serialize a
, Typeable a
, MonadIO m)
=> m a -> Workflow m (WFRef a)
stepWFRef exp= step exp >> getWFRef
newWFRef :: ( Serialize a
, Typeable a
, MonadIO m)
=> a -> Workflow m (WFRef a)
newWFRef x= step (return x) >> getWFRef
readWFRef :: ( Serialize a
, Typeable a)
=> WFRef a
-> STM (Maybe a)
readWFRef (WFRef n ref)= do
mr <- readDBRef ref
case mr of
Nothing -> return Nothing
Just s -> do
let elems= versions s
l = state s
x = elems !! (l n)
return . Just $! fromIDyn x
writeWFRef :: ( Serialize a
, Typeable a)
=> WFRef a
-> a
-> STM ()
writeWFRef r@(WFRef n ref) x= do
mr <- readDBRef ref
case mr of
Nothing -> error $ "writeWFRef: workflow does not exist: " ++ keyObjDBRef ref
Just s -> do
let elems= versions s
l = state s
p = l n
(h,t)= L.splitAt p elems
elems'= h ++ (toIDyn x:tail' t)
tail' []= []
tail' t= L.tail t
writeDBRef ref s{ versions= elems'}
logWF :: (Monad m, MonadIO m) => String -> Workflow m ()
logWF str=do
str <- step . liftIO $ do
time <- getClockTime >>= toCalendarTime >>= return . calendarTimeToString
Prelude.putStrLn str
return $ time ++ ": "++ str
WF $ \s -> str `seq` return (s, ())
waitForData :: (IResource a, Typeable a)
=> (a -> Bool)
-> a
-> IO a
waitForData f x = atomically $ waitForDataSTM f x
waitForDataSTM :: (IResource a, Typeable a)
=> (a -> Bool)
-> a
-> STM a
waitForDataSTM filter x= do
tv <- newDBRef x
do
mx <- readDBRef tv >>= \v -> return $ cast v
case mx of
Nothing -> retry
Just x ->
case filter x of
False -> retry
True -> return x
waitFor
:: ( Indexable a, Serialize a, Serialize b, Typeable a
, Indexable b, Typeable b)
=> (b -> Bool)
-> String
-> a
-> IO b
waitFor filter wfname x= atomically $ waitForSTM filter wfname x
waitForSTM
:: ( Indexable a, Serialize a, Serialize b, Typeable a
, Indexable b, Typeable b)
=> (b -> Bool)
-> String
-> a
-> STM b
waitForSTM filter wfname x= do
let name= keyWF wfname x
let tv= getDBRef . key $ stat0{wfName= name}
mmx <- readDBRef tv
case mmx of
Nothing -> error ("waitForSTM: Workflow does not exist: "++ name)
Just mx -> do
let Stat{ versions= d:_}= mx
case safeFromIDyn d of
Nothing -> retry
Just x ->
case filter x of
False -> retry
True -> return x
getTimeoutFlag
:: MonadIO m
=> Integer
-> Workflow m (TVar Bool)
getTimeoutFlag 0 = WF $ \s -> liftIO $ newTVarIO False >>= \tv -> return (s, tv)
getTimeoutFlag t = do
tnow<- step $ liftIO getTimeSeconds
flag tnow t
where
flag tnow delta = WF(\s -> do
(s', tv) <- case timeout s of
Nothing -> do
tv <- liftIO $ newTVarIO False
return (s{timeout= Just tv}, tv)
Just tv -> return (s, tv)
liftIO $ do
let t = tnow + delta
atomically $ writeTVar tv False
forkIO $ do waitUntil t ; atomically $ writeTVar tv True
return (s', tv))
getTimeSeconds :: IO Integer
getTimeSeconds= do
TOD n _ <- getClockTime
return n
waitUntilSTM :: TVar Bool -> STM()
waitUntilSTM tv = do
b <- readTVar tv
if b == False then retry else return ()
waitUntil:: Integer -> IO()
waitUntil t= getTimeSeconds >>= \tnow -> wait (ttnow)
wait :: Integer -> IO()
wait delta= do
let delay | delta < 0= 0
| delta > (fromIntegral maxInt) = maxInt
| otherwise = fromIntegral $ delta
threadDelay $ delay * 1000000
if delta <= 0 then return () else wait $ delta (fromIntegral delay )