module Control.Workflow
(
Workflow
, WorkflowList
, PMonadTrans (..)
, MonadCatchIO (..)
, HasFork(..)
, throw
, Indexable(..)
, keyWF
, start
, exec
, exec1d
, exec1
, exec1nc
, wfExec
, startWF
, restartWorkflows
, WFErrors(..)
, step
, unsafeIOtoWF
, WFRef
, newWFRef
, stepWFRef
, readWFRef
, writeWFRef
, moveState
, waitWFActive
, getAll
, safeFromIDyn
, getWFKeys
, getWFHistory
, waitFor
, waitForSTM
, waitUntilSTM
, getTimeoutFlag
, withTimeout
, withKillTimeout
, logWF
, clearRunningFlag
, killThreadWF
, killWF
, delWF
, killThreadWF1
, killWF1
, delWF1
, delWFHistory
, delWFHistory1
, syncWrite
, SyncMode(..)
, showHistory
, isInRecover
)
where
import Prelude hiding (catch)
import System.IO.Unsafe
import Control.Monad(when,liftM)
import qualified Control.Exception as CE (Exception,AsyncException(ThreadKilled), SomeException, ErrorCall, throwIO, handle,finally,catch,block,unblock)
import Control.Concurrent
import Control.Concurrent.STM
import GHC.Conc(unsafeIOToSTM)
import GHC.Base (maxInt)
import Data.ByteString.Lazy.Char8 as B hiding (index)
import Data.ByteString.Lazy as BL(putStrLn)
import Data.List as L
import Data.Typeable
import System.Time
import Control.Monad.Trans
import System.IO(hPutStrLn, stderr)
import Data.List(elemIndex)
import Data.Maybe
import Data.IORef
import System.IO.Unsafe(unsafePerformIO)
import Data.Map as M(Map,fromList,elems, insert, delete, lookup,toList, fromList,keys)
import qualified Control.Monad.CatchIO as CMC
import qualified Control.Exception.Extensible as E
import Data.TCache
import Data.TCache.Defs
import Data.RefSerialize
import Control.Workflow.IDynamic
import Unsafe.Coerce
import System.Mem.StableName
import Control.Workflow.Stat
type Workflow m = WF Stat m
type WorkflowList m a b= M.Map String (a -> Workflow m b)
instance Monad m => Monad (WF s m) where
return x = WF (\s -> return (s, x))
WF g >>= f = WF (\s -> do
(s1, x) <- g s
let WF fun= f x
fun s1)
instance (Monad m,Functor m) => Functor (Workflow m ) where
fmap f (WF g)= WF (\s -> do
(s1, x) <- g s
return (s1, f x))
tvRunningWfs = getDBRef $ keyRunning :: DBRef Stat
unsafeIOtoWF :: (Monad m) => IO a -> Workflow m a
unsafeIOtoWF x= let y= unsafePerformIO ( x >>= return) in y `seq` return y
class PMonadTrans t m a where
plift :: Monad m => m a -> t m a
instance (Monad m
, MonadIO m
, Serialize a
, Typeable a)
=> PMonadTrans (WF Stat) m a
where
plift = step
instance (MonadTrans t, Monad m) => PMonadTrans t m a where
plift= Control.Monad.Trans.lift
instance MonadIO m => MonadIO (WF Stat m) where
liftIO= unsafeIOtoWF
class MonadCatchIO m a where
catch :: E.Exception e => m a -> (e -> m a) -> m a
block :: m a -> m a
unblock :: m a -> m a
throw :: (MonadIO m, E.Exception e) => e -> m a
throw = liftIO . E.throwIO
instance (Serialize a
, Typeable a,MonadIO m, CMC.MonadCatchIO m)
=> MonadCatchIO (WF Stat m) a where
catch exp exc = do
expwf <- step $ getTempName
excwf <- step $ getTempName
step $ do
ex <- CMC.catch (exec1d expwf exp >>= return . Right
) $ \e-> return $ Left e
case ex of
Right r -> return r
Left e ->exec1d excwf (exc e)
block exp=WF $ \s -> CMC.block (st exp $ s)
unblock exp= WF $ \s -> CMC.unblock (st exp $ s)
data WFInfo= WFInfo{ name :: String
, finished :: Bool
, haserror :: Maybe WFErrors }
deriving (Typeable,Read, Show)
class MonadIO io => HasFork io where
fork :: io () -> io ThreadId
instance HasFork IO where
fork= forkIO
instance (HasFork io, MonadIO io
, CMC.MonadCatchIO io)
=> HasFork (WF Stat io) where
fork f = do
(r,info@(WFInfo str finished status)) <- stepWFRef $ getTempName >>= \n -> return(WFInfo n False Nothing)
WF $ \s -> do
th <- if finished then fork $ return()
else
fork $
exec1 str f >> labelFinish r str Nothing
`CMC.catch` \(e :: E.SomeException) -> do
liftIO . atomicallySync $ writeWFRef r (WFInfo str True . Just . WFException $ show e)
killWF1 $ keyWF str ()
return (s,th)
where
labelFinish r str err= liftIO . atomicallySync $ writeWFRef r (WFInfo str True err)
wfExec
:: (Serialize a, Typeable a
, CMC.MonadCatchIO m, MonadIO m)
=> Workflow m a -> Workflow m a
wfExec f= do
str <- step $ getTempName
step $ exec1 str f
exec1d :: (MonadIO m, CMC.MonadCatchIO m)
=> String -> (Workflow m b) -> m b
exec1d str f= do
r <- exec1 str f
delit
return r
`CMC.catch` (\e@CE.ThreadKilled -> delit >> throw e)
where
delit= do
delWF str ()
exec1 :: ( Monad m, MonadIO m, CMC.MonadCatchIO m)
=> String -> Workflow m a -> m a
exec1 str f= exec str (const f) ()
exec :: ( Indexable a, Serialize a, Typeable a
, Monad m, MonadIO m, CMC.MonadCatchIO m)
=> String -> (a -> Workflow m b) -> a -> m b
exec str f x =
(do
v <- getState str f x
case v of
Right (name, f, stat) -> do
r <- runWF name (f x) stat
return r
Left err -> CMC.throw err)
`CMC.catch`
(\(e :: CE.SomeException) -> liftIO $ do
let name= keyWF str x
clearRunningFlag name
CMC.throw e )
exec1nc :: ( Monad m, MonadIO m, CMC.MonadCatchIO m)
=> String -> Workflow m a -> m a
exec1nc str f =
(do
v <- getState str f ()
case v of
Right (name, f, stat) -> do
r <- runWF1 name f stat False
return r
Left err -> CMC.throw err)
`CMC.catch`
(\(e :: CE.SomeException) -> liftIO $ do
let name= keyWF str ()
clearRunningFlag name
CMC.throw e )
mv :: MVar Int
mv= unsafePerformIO $ newMVar 0
getTempName :: MonadIO m => m String
getTempName= liftIO $ do
seq <- takeMVar mv
putMVar mv (seq + 1)
TOD t _ <- getClockTime
return $ "anon"++ show t ++ show seq
step :: ( Monad m
, MonadIO m
, Serialize a
, Typeable a)
=> m a
-> Workflow m a
step mx= WF(\s -> do
let stat= state s
recovers= recover s
versionss= versions s
if recovers && not (L.null versionss)
then
return (s{versions=L.tail versionss }, fromIDyn $ L.head versionss )
else do
let ref= self s
when (recovers && L.null versionss) $ do
liftIO $ atomically $ do
s' <- readDBRef ref `justifyM` error ("step: not found: "++ wfName s)
writeDBRef ref s'{recover= False,references= references s}
stepExec ref mx)
stepExec sref mx= do
x' <- mx
liftIO . atomicallySync $ do
s <- readDBRef sref >>= return . fromMaybe (error $ "step: readDBRef: not found:" ++ keyObjDBRef sref)
let versionss= versions s
dynx= toIDyn x'
ver= dynx: versionss
s'= s{ recover= False, versions = ver, state= state s+1}
writeDBRef sref s'
return (s', x')
isInRecover :: Monad m => Workflow m Bool
isInRecover = WF(\s@Stat{..} ->
if recover && not (L.null versions ) then return(s,True )
else if recover== True then return(s{recover=False}, False)
else return (s,False))
stepDebug :: ( Monad m
, MonadIO m
, Serialize a
, Typeable a)
=> m a
-> Workflow m a
stepDebug f = r
where
r= do
WF(\s ->
let stat= state s
in case recover s && not(L.null $ versions s) of
True -> f >>= \x -> return (s{versions= L.tail $ versions s},x)
False -> stepExec (self s) f)
--while
start
:: ( CMC.MonadCatchIO m
, MonadIO m
, Indexable a
, Serialize a
, Typeable a)
=> String
-> (a -> Workflow m b)
-> a
-> m (Either WFErrors b)
start namewf f1 v = do
ei <- getState namewf f1 v
case ei of
Left error -> return $ Left error
Right (name, f, stat) ->
runWF name (f v) stat >>= return . Right
`CMC.catch`
(\(e :: WFErrors) -> do
let name= keyWF namewf v
clearRunningFlag name
return $ Left e)
`CMC.catch`
(\(e :: CE.SomeException) -> liftIO $ do
let name= keyWF namewf v
clearRunningFlag name
return . Left $ WFException $ show e )
data WFErrors = NotFound | AlreadyRunning | Timeout | WFException String deriving (Typeable, Read, Show)
instance CE.Exception WFErrors
getState :: (Monad m, MonadIO m, Indexable a, Serialize a, Typeable a)
=> String -> x -> a
-> m (Either WFErrors (String, x, Stat))
getState namewf f v= liftIO . atomically $ getStateSTM
where
getStateSTM = do
mrunning <- readDBRef tvRunningWfs
case mrunning of
Nothing -> do
writeDBRef tvRunningWfs (Running $ fromList [])
getStateSTM
Just(Running map) -> do
let key= keyWF namewf v
dynv= toIDyn v
stat1= stat0{wfName= key,versions=[dynv],state=1, self= sref`seq`sref}
sref= getDBRef $ keyResource stat1
case M.lookup key map of
Nothing -> do
mythread <- unsafeIOToSTM $ myThreadId
safeIOToSTM $ delResource stat1 >> writeResource stat1
writeDBRef tvRunningWfs . Running $ M.insert key (namewf,Just mythread) map
writeDBRef sref stat1
return $ Right (key, f, stat1)
Just (wf, started) ->
if isJust started
then return $ Left AlreadyRunning
else do
mst <- readDBRef sref
stat' <- case mst of
Nothing -> error $ "getState: Workflow not found: "++ key
Just s -> do
when(not $ recover s) $ error ("flow "++key++ "found in a wrong state: report it")
if isJust (timeout s)
then do
tnow <- unsafeIOToSTM getTimeSeconds
if lastActive s+ fromJust(timeout s) > tnow
then
return s{recover= True,timeout=Nothing}
else
return stat1
else return s{recover= True}
writeDBRef sref stat'
mythread <- unsafeIOToSTM myThreadId
writeDBRef tvRunningWfs . Running $ M.insert key (namewf,Just mythread) map
return $ Right (key, f, stat')
runWF :: ( Monad m, MonadIO m)
=> String -> Workflow m b -> Stat -> m b
runWF n f s = runWF1 n f s True
runWF1 n f s clear= do
(s', v') <- st f s{versions= L.tail $ versions s}
liftIO $ if clear then clearFromRunningList n
else clearRunningFlag n >> return ()
return v'
where
clearFromRunningList n = atomicallySync $ do
Just(Running map) <- readDBRef tvRunningWfs
writeDBRef tvRunningWfs . Running $ M.delete n map
flushDBRef (getDBRef n :: DBRef Stat)
startWF
:: ( CMC.MonadCatchIO m, MonadIO m
, Serialize a, Serialize b
, Typeable a
, Indexable a
, Typeable b)
=> String
-> a
-> WorkflowList m a b
-> m (Either WFErrors b)
startWF namewf v wfs=
case M.lookup namewf wfs of
Nothing -> return $ Left NotFound
Just f -> start namewf f v
restartWorkflows
:: (Serialize a, Serialize b, Typeable a
, Indexable b, Typeable b)
=> WorkflowList IO a b
-> IO ()
restartWorkflows map = do
mw <- liftIO $ getResource ((Running undefined ) )
case mw of
Nothing -> return ()
Just (Running all) -> mapM_ start . mapMaybe filter . toList $ all
where
filter (a, (b,Nothing)) = Just (b, a)
filter _ = Nothing
start (key, kv)= do
let mf= M.lookup key map
case mf of
Nothing -> return ()
Just f -> do
let st0= stat0{wfName = kv}
mst <- liftIO $ getResource st0
case mst of
Nothing -> error $ "restartWorkflows: workflow not found "++ keyResource st0
Just st-> do
liftIO . forkIO $ runWF key (f (fromIDyn . L.head $ versions st )) st{recover=True} >> return ()
return ()
getAll :: Monad m => Workflow m [IDynamic]
getAll= WF(\s -> return (s, versions s))
--getStep
getWFKeys :: String -> IO [String]
getWFKeys wfname= do
mwfs <- atomically $ readDBRef tvRunningWfs
case mwfs of
Nothing -> return []
Just (Running wfs) -> return $ Prelude.filter (L.isPrefixOf wfname) $ M.keys wfs
getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat)
getWFHistory wfname x= getResource stat0{wfName= keyWF wfname x}
delWFHistory name1 x = do
let name= keyWF name1 x
delWFHistory1 name
delWFHistory1 name = do
let proto= stat0{wfName= name}
atomically . withSTMResources [] $ const resources{ toDelete= [proto] }
waitWFActive wf= do
r <- threadWF wf
case r of
Just (_, Nothing) -> retry
_ -> return ()
where
threadWF wf= do
Just(Running map) <- readDBRef tvRunningWfs
return $ M.lookup wf map
killThreadWF :: ( Indexable a
, Serialize a
, Typeable a
, MonadIO m)
=> String -> a -> m()
killThreadWF wfname x= do
let name= keyWF wfname x
killThreadWF1 name
killThreadWF1 :: MonadIO m => String -> m()
killThreadWF1 name= killThreadWFm name >> return ()
killThreadWFm name= do
(map,f) <- clearRunningFlag name
case f of
Just th -> liftIO $ killThread th
Nothing -> return()
return map
killWF :: (Indexable a,MonadIO m) => String -> a -> m ()
killWF name1 x= do
let name= keyWF name1 x
killWF1 name
killWF1 :: MonadIO m => String -> m ()
killWF1 name = do
map <- killThreadWFm name
liftIO . atomically . writeDBRef tvRunningWfs . Running $ M.delete name map
return ()
delWF :: ( Indexable a
, MonadIO m
, Typeable a)
=> String -> a -> m()
delWF name1 x= do
let name= keyWF name1 x
delWF1 name
delWF1 :: MonadIO m=> String -> m()
delWF1 name= liftIO $ do
mrun <- atomically $ readDBRef tvRunningWfs
case mrun of
Nothing -> return()
Just (Running map) -> do
atomicallySync . writeDBRef tvRunningWfs . Running $! M.delete name map
delWFHistory1 name
clearRunningFlag name= liftIO $ atomically $ do
mrun <- readDBRef tvRunningWfs
case mrun of
Nothing -> error $ "clearRunningFLag: non existing workflows" ++ name
Just(Running map) -> do
case M.lookup name map of
Just(_, Nothing) -> return (map,Nothing)
Just(v, Just th) -> do
writeDBRef tvRunningWfs . Running $ M.insert name (v, Nothing) map
flushDBRef (getDBRef $ keyResource stat0{wfName=name} :: DBRef Stat)
return (map,Just th)
Nothing ->
return (map, Nothing)
newWFRef :: ( Serialize a
, Typeable a
, MonadIO m
, CMC.MonadCatchIO m)
=> a -> Workflow m (WFRef a)
newWFRef x= stepWFRef (return x) >>= return . fst
stepWFRef :: ( Serialize a
, Typeable a
, MonadIO m)
=> m a -> Workflow m (WFRef a,a)
stepWFRef exp= do
r <- step exp
WF(\s@Stat{..} -> do
let (n,flag)= if recover then (state (L.length versions) 1 ,False)
else (state 1 ,True)
let ref = WFRef n self
let s'= s{references= (n,(toIDyn r,flag)):references }
liftIO $ atomically $ writeDBRef self s'
r `seq` return (s',(ref,r)) )
readWFRef :: ( Serialize a
, Typeable a)
=> WFRef a
-> STM (Maybe a)
readWFRef (WFRef n ref)= do
mr <- readDBRef ref
case mr of
Nothing -> return Nothing
Just st ->
case L.lookup n $! references st of
Just (r,_) -> return . Just $ fromIDyn r
Nothing -> do
let n1= state st n
return . Just . fromIDyn $ versions st !! n1
justifyM io y= io >>= return . fromMaybe y
writeWFRef :: ( Serialize a
, Typeable a)
=> WFRef a
-> a
-> STM ()
writeWFRef r@(WFRef n ref) x= do
mr <- readDBRef ref
case mr of
Nothing -> error $ "writeWFRef: workflow does not exist: " ++ show ref
Just st@Stat{..} ->
writeDBRef ref st{references= add x references}
where
add x xs= (n,(toIDyn x,False)) : L.filter (\(n',_) -> n/=n') xs
moveState :: (MonadIO m
, Indexable a
, Serialize a
, Typeable a)
=>String -> a -> a -> m ()
moveState wf t t'= liftIO $ do
atomicallySync $ do
withSTMResources[stat0{wfName= n}] $ change n
mrun <- readDBRef tvRunningWfs
case mrun of
Nothing -> return()
Just (Running map) -> do
let mr= M.lookup n map
let th= case mr of Nothing -> Nothing; Just(_,mt)-> mt
let map'= M.insert n' (wf,th) $ M.delete n map
writeDBRef tvRunningWfs $ Running map'
where
n = keyWF wf t
n'= keyWF wf t'
change m [Nothing] = error $ "changeState: Workflow not found: "++ show n
change n [Just s] = resources{toAdd= [ s{wfName=n',versions = toIDyn t': L.tail( versions s) }]
,toDelete=[s]}
doit n [Nothing]= error $ "moveState: state not found for: " ++ n
logWF :: MonadIO m => String -> Workflow m ()
logWF str=do
str <- step . liftIO $ do
time <- getClockTime >>= toCalendarTime >>= return . calendarTimeToString
Prelude.putStrLn str
return $ time ++ ": "++ str
WF $ \s -> str `seq` return (s, ())
waitForData :: (IResource a, Typeable a)
=> (a -> Bool)
-> a
-> IO a
waitForData f x = atomically $ waitForDataSTM f x
waitForDataSTM :: (IResource a, Typeable a)
=> (a -> Bool)
-> a
-> STM a
waitForDataSTM filter x= do
tv <- newDBRef x
do
mx <- readDBRef tv >>= \v -> return $ cast v
case mx of
Nothing -> retry
Just x ->
case filter x of
False -> retry
True -> return x
waitFor
:: ( Indexable a, Serialize a, Serialize b, Typeable a
, Indexable b, Typeable b)
=> (b -> Bool)
-> String
-> a
-> IO b
waitFor filter wfname x= atomically $ waitForSTM filter wfname x
waitForSTM
:: ( Indexable a, Serialize a, Serialize b, Typeable a
, Indexable b, Typeable b)
=> (b -> Bool)
-> String
-> a
-> STM b
waitForSTM filter wfname x= do
let name= keyWF wfname x
let tv= getDBRef . keyResource $ stat0{wfName= name}
mmx <- readDBRef tv
case mmx of
Nothing -> error ("waitForSTM: Workflow does not exist: "++ name)
Just mx -> do
let Stat{ versions= d:_}= mx
case safeFromIDyn d of
Nothing -> retry
Just x ->
case filter x of
False -> retry
True -> return x
getTimeoutFlag
:: MonadIO m
=> Integer
-> Workflow m (TVar Bool)
getTimeoutFlag 0 = WF $ \s -> liftIO $ newTVarIO False >>= \tv -> return (s, tv)
getTimeoutFlag t = do
tnow <- step $ liftIO getTimeSeconds
flag tnow t
where
flag tnow delta = WF $ \s -> do
tv <- liftIO $ newTVarIO False
liftIO $ do
let t = tnow + delta
atomically $ writeTVar tv False
forkIO $ do waitUntil t ; atomically $ writeTVar tv True
return (s, tv)
getTimeSeconds :: IO Integer
getTimeSeconds= do
TOD n _ <- getClockTime
return n
waitUntilSTM :: TVar Bool -> STM()
waitUntilSTM tv = do
b <- readTVar tv
if b == False then retry else return ()
waitUntil:: Integer -> IO()
waitUntil t= getTimeSeconds >>= \tnow -> wait (ttnow)
wait :: Integer -> IO()
wait delta= do
let delay | delta < 0= 0
| delta > (fromIntegral maxInt) = maxInt
| otherwise = fromIntegral $ delta
threadDelay $ delay * 1000000
if delta <= 0 then return () else wait $ delta (fromIntegral delay )
withTimeout :: ( MonadIO m, Typeable a, Serialize a)=> Integer -> STM a -> Workflow m (Maybe a)
withTimeout time f = do
flag <- getTimeoutFlag time
step . liftIO . atomically $ (f >>= return . Just )
`orElse`
(waitUntilSTM flag >> return Nothing)
withKillTimeout :: CMC.MonadCatchIO m => String -> Int -> Integer -> m a -> m a
withKillTimeout id time time2 f = do
tid <- liftIO myThreadId
twatchdog <- liftIO $ forkIO $ threadDelay (time * 1000000) >> throwTo tid Timeout
r <- f
liftIO $ killThread twatchdog
return r
`CMC.catch` \(e :: WFErrors) ->
case e of
Timeout -> liftIO $ do
tnow <- getTimeSeconds
let ref = getDBRef $ keyResource $ stat0{wfName=id}
when (time2 /=0) $ atomically $ do
s <- readDBRef ref `onNothing` error ( "withKillTimeout: Workflow not found: "++ id)
writeDBRef ref s{lastActive= tnow,timeout= Just (time2fromIntegral time)}
syncCache
clearRunningFlag id
throw Timeout
_ -> throw e
transientTimeout 0= atomically $ newTVar False
transientTimeout t= do
flag <- atomically $ newTVar False
forkIO $ threadDelay (t * 1000000) >> atomically (writeTVar flag True)
return flag