{-# OPTIONS -fglasgow-exts -XOverlappingInstances -XUndecidableInstances -O2 #-}
-----------------------------------------------------------------------------
--
-- Module      :  Control.Workflow
-- Copyright   :  Alberto Gómez Corona
-- License     :  see LICENSE
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :  experimental

{- | Transparent support for interruptible computations. A workflow can be seen as a persistent thread.
The main features are:

* Transparent state logging through a monad transformer: step :: m a -> Workflow m a.

* Resume the computation state after an accidental or planned program shutdown (restartWorkflows).

* Event handling (waitFor, waitForData).

* Monitoring of workflows with state change display and other auxiliary features.

* Communication with other processes, including other workflows, through persistent data objects,
inspection of intermediate workflow results, and queues, so that no data is lost due to shutdowns.

In this way, a very long computation that may take more time than the average time between hardware or
software failures, shutdowns etc. can still run to completion.
The workflow can be defined transparently in a single monadic procedure.

Besides state logging and recovery, there are a number of communication primitives that are aware of
persistence across restarts, such as persistent queues, persistent timeouts, or waiting for events
in the STM monad. These primitives permit inter-workflow communication and communication with
external threads.

This package uses TCache for persistence and event handling.

It also uses the package RefSerialize, which reduces the workflow state load: RefSerialize can
serialize and deserialize complex and self-referencing data structures without losing those references.
This is critical when big, structured data, such as documents, undergo small modifications across
a set of workflow steps.
Therefore, it is also recommended to use RefSerialize for big user-defined objects that have small pieces
that undergo small modifications during the workflow. As an added bonus, the history will show such changes
in more detail.

The 'step' primitive is the lift operation that converts a result of type @m a@ to a type @'Workflow' m a@
with automatic state logging and recovery. To allow such features, every @a@ must be an instance of
'Typeable' and 'IResource' (defined in the @TCache@ package).

In fact, Workflow can be considered an instance of a partial monad transformer, defined as such:

@class 'PMonadTrans' t m a where
      'plift' :: Monad m => m a -> t m a

instance (Monad m,MonadIO m, IResource a, Typeable a) => PMonadTrans (WF Stat) m a where
     'plift' = 'step'
@

It is partial because the lift operation is not defined for every monad @m@ and data type @a@, but only for
monads and data types that meet certain conditions: in this case, being instances of @MonadIO@, and of
@IResource@ and @Typeable@, respectively.

To avoid having to define the last two interfaces, however, 'Read' and 'Show' can be used to derive
instances of 'IResource' for most of the useful cases. This is the set of automatic derivations:

@(Read a, Show a) => 'Serialize' a

 Typeable a => 'Indexable' a (a single key for all values.
enough for workflows)

 ('Indexable' a, 'Serialize' a) => IResource a@

Therefore deriving instances of @Read@ and @Show@ is enough for every intermediate data result
along the computation.

Because 'Data.TCache.Dynamic' from the package 'TCache' is used for persistence, every data type
must be registered by using 'registerType'.

Here is a complete example:

This is a counter that shows a sequence of numbers, one a second:

@module Main where
import Control.Concurrent(threadDelay)
import System.IO (hFlush,stdout)

count n= do
    putStr (show n ++ " ") >> hFlush stdout >> threadDelay 1000000
    count (n+1)

main= count 0@

This is the same program, with the added feature of remembering the last count after interrupted:

@module Main where
import Control.Workflow
import Control.Concurrent(threadDelay)
import System.IO (hFlush,stdout)

mcount n= do
    'step' $ putStr (show n ++ " ") >> hFlush stdout >> threadDelay 1000000
    mcount (n+1)

main= do
   registerType :: IO ()
   registerType :: IO Int
   let start= 0 :: Int
   startWF "count" start [("count", mcount)] :: IO ()@

This is the execution log:

@Worflow-0.5.5\demos>runghc sequence.hs
0 1 2 3 4 5 6 7 sequence.hs: win32ConsoleHandler
sequence.hs: sequence.hs: interrupted
Worflow-0.5.5\demos>
Worflow-0.5.5\demos>runghc sequence.hs
7 8 9 10 11 ....@
-}
-----------------------------------------------------------------------------

module Control.Workflow
  ( Workflow            -- a useful type name
  , WorkflowList
  , IResource(..)
  , registerType
  , PMonadTrans (..)
  , Indexable (key)
  , step
  , startWF
  , restartWorkflows
  , getStep
  , getAll
  , logWF
  , getWFKeys
  , getWFHistory        -- return the list of steps results
  , delWFHistory        -- delete the workflow history
  , printHistory        -- print the history
  , unsafeIOtoWF
  , waitFor
  , waitUntil
  , waitUntilSTM
  , syncWrite
  , writeQueue
  , writeQueueSTM
  , readQueue
  , readQueueSTM
  , unreadQueue
  , unreadQueueSTM
  , getTimeSeconds
  , getTimeoutFlag
  , isEmptyQueue
  , isEmptyQueueSTM
  ) where

import System.IO.Unsafe
import Control.Monad(when,liftM)
import Control.Exception(Exception, throw)
import Control.Concurrent (forkIO,threadDelay, ThreadId)
import Control.Concurrent.STM
import GHC.Conc(unsafeIOToSTM)
import GHC.Base (maxInt)
import Data.TCache.Dynamic
import Data.RefSerialize
import Data.List((\\),find,elemIndices, isPrefixOf)
import Data.Typeable
import System.Time
import Control.Monad.Trans
import Control.Monad (replicateM)
import System.IO(hPutStrLn, stderr)
import Data.List(elemIndex)
import Data.Maybe(fromJust, isNothing)
import qualified Data.Map as M(Map,fromList,elems, insert, lookup)
import System.Mem.StableName

{-
import Debug.Trace

debug a b = trace b a

report :: Exception e => IO a -> String -> e -> IO a
report f text e= catch f (\e -> throw $ userError (text++": "++ show e))

freport text f = report f text
-}

-- | State-passing computation: takes a state @s@ and produces, in the monad @m@,
-- the next state together with a result @l@.
data WF s m l = WF { st :: s -> m (s,l) }

-- | A 'WF' computation whose threaded state is the workflow log 'Stat'.
type Workflow m l = WF Stat m l -- not so scary

-- | Association list from workflow name to the workflow started from an initial value.
type WorkflowList m a b = [(String, a -> Workflow m b)]

-- | Persistent workflow bookkeeping.
--
-- 'RunningWorkflows' holds a list of workflow keys.
--
-- The 'Stat' record is the log of one workflow:
-- 'wfName' is its key, 'state' is incremented by 'step' for every result it logs,
-- 'index' is incremented for every result replayed while 'recover' is set,
-- 'sync' selects whether the cache is synchronized after each step,
-- 'versions' holds the logged results (new results are prepended),
-- and 'timeout' optionally holds a flag variable that is not serialized.
data Stat
  = RunningWorkflows [String]
  | Stat
      { wfName   :: String
      , state    :: Int
      , index    :: Int
      , recover  :: Bool
      , sync     :: Bool
      , versions :: [IDynamic]
      , timeout  :: Maybe (TVar Bool)
      }
  deriving (Typeable)

-- | Empty workflow log: no name, nothing logged, synchronous writes, no timeout flag.
stat0 :: Stat
stat0 = Stat
  { wfName   = ""
  , state    = 0
  , index    = 0
  , recover  = False
  , sync     = True
  , versions = []
  , timeout  = Nothing
  }

-- | Hash of the stable name created for the argument.
hasht :: a -> Int
hasht = hashStableName . unsafePerformIO . makeStableName

-- serialization of data is done through RefSerialize because it permits to store
-- different versions of the same object with minimum memory.
-- | Dynamic values are written and parsed with the TCache text codecs.
instance Serialize IDynamic where
  showp = tshowp
  readp = treadp

-- | Textual form of the workflow log. The 'timeout' field is not written;
-- it is restored as 'Nothing' on parsing.
instance Serialize Stat where
  showp (RunningWorkflows keys) =
    liftM ("StatWorkflows " ++) (showp keys)
  showp (Stat nm stt ix rec syn vers _) = do
    versText <- rshowp vers
    return $ unwords ["Stat", show nm, show stt, show ix, show rec, show syn] ++ versText

  readp = choice [statParser, runningParser]
    where
      statParser = do
        symbol "Stat"
        nm   <- readp   -- stringLiteral
        stt  <- readp   -- integer
        ix   <- readp   -- integer
        rec  <- readp   -- bool
        syn  <- readp   -- bool
        vers <- rreadp
        return (Stat nm stt ix rec syn vers Nothing)

      runningParser = do
        symbol "StatWorkflows"
        keys <- readp
        return (RunningWorkflows keys)

-- persistence through TCache, default persistence in files
workflowsPath :: String
workflowsPath = "Workflows/"

-- | A workflow log is stored under @Stat#\<name\>@; the list of running workflows
-- under the single key @RunningWorkflows@. Both live in 'workflowsPath'.
instance IResource Stat where
  keyResource Stat{wfName = nm}    = "Stat#" ++ nm
  keyResource (RunningWorkflows _) = "RunningWorkflows"
  defPath _   = workflowsPath      -- directory for Workflow data
  serialize   = runW . showp
  deserialize = runR readp

{- | Indexable can be used to derive instances of IResource

This is the set of automatic derivations:

* (Read a, Show a) => Serialize a

* Typeable a => Indexable a (a single key for all values. enough for workflows)

* (Indexable a, Serialize a) => IResource a
-}
class Indexable a where
  key :: a -> String

-- | The key is the printed type of the value.
instance Typeable a => Indexable a where
  key = show . typeOf

instance (Serialize a, Indexable a) => IResource a where
  keyResource = key
  tshowp      = showp
  treadp      = readp

-- | executes an IO computation inside of the workflow monad whatever the monad encapsulated in the workflow.
-- Warning: this computation is executed whenever
-- the workflow restarts, no matter if it has been already executed previously. This is useful for initializations or debugging.
-- To avoid re-execution when restarting use: @'step' $ unsafeIOtoWF...@
--
-- To perform IO actions in a workflow that encapsulates an IO monad, use step over the IO action directly:
--
-- @ 'step' $ action @
--
-- instead of
--
-- @ 'step' $ unsafeIOtoWF $ action @
unsafeIOtoWF :: (Monad m) => IO a -> Workflow m a
unsafeIOtoWF x= let y= unsafePerformIO ( x >>= return) in y `seq` return y

{- | PMonadTrans permits to define a partial monad transformer. They are not defined for all kinds of data
but the ones that have instances of certain classes. That is because in the lift instance code there are some
hidden use of these classes. This also may permit an accurate control of effects.
An instance of MonadTrans is an instance of PMonadTrans
-}
class PMonadTrans t m a where
      plift :: Monad m => m a -> t m a

-- | plift= step
instance (Monad m,MonadIO m,IResource a, Typeable a) => PMonadTrans (WF Stat) m a where
     plift = step

-- | An instance of MonadTrans is an instance of PMonadTrans
instance (MonadTrans t, Monad m) => PMonadTrans t m a where
    plift= Control.Monad.Trans.lift

-- | IO actions are lifted with 'unsafeIOtoWF', so they are not logged.
instance Monad m => MonadIO (WF Stat m) where
   liftIO=unsafeIOtoWF

-- | Threads the state through both computations, left to right.
instance Monad m => Monad (WF s m) where
    return x = WF (\s -> return (s, x))
    WF g >>= f = WF (\s -> do
                (s1, x) <- g s
                let WF fun= f x
                (s3, x') <- fun s1
                return (s3, x'))

--class (IResource a, Serialize a,Typeable a) => Workflow_ a where

-- | step lifts a monadic computation to the WF monad, and provides transparent state logging and resume of computation
--
-- While recovering and not all logged results have been replayed, the logged result is returned and
-- the action is NOT executed. Otherwise the action runs, its result is prepended to 'versions',
-- the updated 'Stat' is written to the cache, and the cache is synchronized when 'sync' is set.
step :: (Monad m,MonadIO m,IResource a, Typeable a) => m a -> Workflow m a
step mx= WF(\s -> do
        let stat= state s
        let ind= index s
        if recover s && ind < stat
          then return (s{index=ind +1 }, fromIDyn $ versions s !! (stat - ind-1) )
          else do
            x' <- mx
            let versionss= versions s
            let ver= toIDyn x': versionss
            let s'= s{recover= False, versions = ver, state= state s+1}
            liftIO $ do
                withResources ([]::[Stat]) (\_-> [s' ]) --`debug` "update cache"
                when (sync s ) syncCache
            return (s', x') )

-- | start or continue a workflow.
--
-- The workflow key is @name#keyResource initialValue@. If no log exists for that key a new one
-- is created and the key is added to the list of running workflows; otherwise the stored log is
-- replayed from the beginning.
startWF :: (Monad m,MonadIO m,IResource a, Typeable a, IResource b, Typeable b)
        => String               -- ^ name of workflow in the workflow list
        -> a                    -- ^ initial value (even use the initial value even to restart the workflow)
        -> WorkflowList m a b   -- ^ workflow list. It is an assoc-list of (workflow name string,Workflow methods)
        -> m b                  -- ^ result of the computation
startWF namewf v wfs= do
   liftIO (registerType :: IO Stat)
   liftIO (registerType :: IO Control.Workflow.Queue)
   liftIO (registerType :: IO String)
   liftIO (registerType :: IO Integer)
   case lookup namewf wfs of
     Nothing -> error $ "startWF: workflow name not found in workflow list: "++namewf
     Just f -> do
       let name= namewf ++ "#" ++ keyResource v
       let stat1= stat0{wfName= name , versions= [toIDyn v]}
       mst <- liftIO $ getResource stat1
       let (vn, stat, create) =
             case mst of
               Nothing -> (v, stat1, True)
               Just s  -> (v, s{index=0,recover=True}, False) -- the last value
       -- adds this workflow key to the running list and stores the initial log
       let addwf [wf ] = resources{ toAdd=[ toIDyn $ RunningWorkflows (name:xs) ] ++ [ toIDyn stat]}
               where
                 xs = case wf of
                        Nothing  -> []
                        Just dyn -> ys where RunningWorkflows ys = fromIDyn dyn
       when create $ liftIO . atomically $ withDSTMResources [toIDyn $ RunningWorkflows undefined ] addwf
       runWF name f vn stat -- `debug` (serialize stat)

-- | Run a workflow from the given log state and, once it finishes, remove its key from the
-- list of running workflows.
--
-- The key removed is the 'wfName' stored in the log state, which is the key under which the
-- workflow is listed both when coming from 'startWF' and from 'restartWorkflows'.
-- (The previous code rebuilt the key with a self-referential @let name = name ++ ...@,
-- which never terminates.)
--
-- Raises an 'error' if the running list does not exist or does not contain the key.
runWF :: (Monad m,MonadIO m,IResource a,Typeable a, IResource b, Typeable b)
      => String -> ( a -> Workflow m b) -> a -> Stat -> m b
runWF name f v s=do
   when (sync s) $ liftIO $ syncCache
   (s', v') <- st (f v) $ s --`debug` "runWF********"
   let wfKey = wfName s
       delWF Nothing = error $ "runWF: Workflow list not found: "
       delWF (Just (RunningWorkflows xs ))=
          case elem wfKey xs of
            False -> error $"runWF: not found state for workflow: "++ wfKey
            True  -> RunningWorkflows (xs \\ [wfKey])
   liftIO $ withResource (RunningWorkflows undefined) delWF
   when (sync s) $ liftIO $ syncCache
   return v'

-- | re-start the non finished workflows started for all initial values that are listed in the workflow list
--
-- Every key in the stored running list whose name part (the text before the first @#@) is found
-- in the given list is resumed in its own thread, replaying its log from the oldest stored value.
-- Keys whose name is not in the list are skipped.
restartWorkflows ::(IResource a, Typeable a, IResource b, Typeable b)
                 => WorkflowList IO a b -- the list of workflows that implement the module
                 -> IO ()               -- Only workflows in the IO monad can be restarted with restartWorkflows
restartWorkflows wfs = do
  liftIO (registerType :: IO Stat)
  liftIO (registerType :: IO Control.Workflow.Queue)
  liftIO (registerType :: IO String)
  liftIO (registerType :: IO Integer)
  mw <- liftIO $ getResource ((RunningWorkflows undefined ) ) -- :: IO (Maybe(Stat a))
  case mw of
    Nothing -> return ()
    Just (RunningWorkflows running) -> mapM_ start running
  where
  start key1= do
     let key= case elemIndex '#' key1 of
                Just n  -> take n key1
                Nothing -> key1
     case lookup key wfs of
       Nothing -> return ()
       Just f  -> do
         let st0= stat0{wfName = key1}
         mst <- liftIO $ getResource st0
         case mst of
           Nothing -> error $ "restartWorkflows: not found "++ keyResource st0
           Just st-> do
              liftIO . forkIO $ runWF key f (fromIDyn . last $ versions st ) st{index=0,recover=True} >> return ()
              return ()

-- | change the logging policy (default is synchronous)
-- Workflow uses the package TCache for logging
-- for very fast workflow steps or when TCache is used also for other purposes, asynchronous is a better option
syncWrite:: (Monad m, MonadIO m)
         => Bool          -- ^ True means synchronous: changes are immediately saved after each step
         -> Int           -- ^ number of seconds between saves when asynchronous
         -> Int           -- ^ size of the cache when async
         -> WF Stat m ()  -- ^ in the workflow monad
syncWrite bool time maxsize= WF(\s -> do
      when (bool== False) $ do
          liftIO $ clearSyncCacheProc time defaultCheck maxsize
          return ()
      return (s{ sync= bool},()))

-- | Return the result logged at a given step.
--
-- 'versions' stores the newest result first and the initial value last, so step @n@
-- (1 = first step result, @state@ = newest) lives at position @state - n@.
-- A negative argument counts backwards from the newest result: @-1@ is the result before the
-- newest one and @-state@ is the initial value. (Previously a negative @i@ indexed
-- @state + i@, which is the same element as @getStep (-i)@.)
getStep :: (IResource a, Typeable a, Monad m)
        => Int            -- ^ the step number. If negative, count from the current state backwards
        -> Workflow m a   -- ^ return the n-th intermediate step result
getStep i= WF(\s -> do
        let stat= state s
        return (s, if i > 0 && i <= stat then fromIDyn $ versions s !! (stat - i)
                   else if i < 0 && i >= -stat then fromIDyn $ versions s !! (negate i)
                   else error "getStep: wrong index")
     )

-- | return all the intermediate results. it is supposed that all the intermediate result have
-- the same type.
getAll :: (IResource a, Typeable a, Monad m) => WF Stat m [a]
getAll = WF(\s -> return (s, map fromIDyn . take (state s+1) $ versions s))

-- | log a message in the workflow history. It can be printed out with 'printHistory'
--
-- The message is prefixed with the current calendar time. While recovering, the already logged
-- entry is skipped instead of adding a new one.
logWF :: (Monad m, MonadIO m) => String -> Workflow m ()
logWF str= WF (\s -> do
        time <- liftIO $ getClockTime >>= toCalendarTime >>= return . calendarTimeToString
        let str2 = time ++ ": "++ str
        let (state1, index1, versions1)=
               let stat= state s ; ind= index s
               in if recover s && ind < stat
                    then (stat,ind +1, versions s)
                    else (stat +1, ind, toIDyn str2 : versions s)
        return (s{versions= versions1, state= state1, index= index1}, ()) )

-- | return the list of object keys that are running
--
-- Only entries of the form @wfname#key@ are considered, so a workflow whose name merely starts
-- with @wfname@ is not reported; the @wfname#@ prefix is removed from the results.
getWFKeys :: String -> IO [String]
getWFKeys wfname= do
      mwfs <- getResource $ RunningWorkflows undefined
      case mwfs of
       Nothing -> return []
       Just (RunningWorkflows wfs) ->
           return [ drop (length prefix) wf | wf <- wfs, prefix `isPrefixOf` wf ]
  where
  prefix = wfname ++ "#"

-- | return the current state of the computation, in the IO monad
getWFHistory :: (IResource a) => String -> a -> IO (Maybe Stat)
getWFHistory wfname x= getResource stat0{wfName= wfname ++ "#" ++ keyResource x}

-- | delete the workflow. Make sure that the workflow is not running
--
-- Removes the key from the running list, deletes the stored log and synchronizes the cache.
-- Raises an 'error' if the running list does not exist.
delWFHistory :: IResource a => String -> a -> IO ()
delWFHistory wfname1 x=do
      let wfname= wfname1 ++ "#" ++ keyResource x
      let doit [Just (RunningWorkflows wfs)] =
            resources{ toAdd = [RunningWorkflows (wfs \\ [wfname])]
                     , toDelete= [stat0{wfName= wfname}] }
          doit _ = error "delWFHistory: list of running workflows not found"
      atomically $ withSTMResources[RunningWorkflows undefined] doit
      syncCache

-- | print the state changes along the workflow, that is, all the intermediate results
printHistory :: Stat -> IO ()
printHistory stat= do
       putStrLn . runW $ showp $ Pretty stat
       putStrLn "-----------------------------------"
{-
    mapM_ f . zip [1..] . reverse $ versions stat
    where
    f :: (Int,IDynamic) -> IO()
    f (n, ( IDynamic x))= do
       putStr "Step "
       putStr $ show n
       putStr " "
       putStrLn $ serialize x
-}

-- | Wrapper used only to render a workflow log in human readable form.
data Pretty = Pretty Stat

-- | Renders the workflow name followed by one numbered line per logged value, oldest first.
-- Parsing is not supported.
instance Serialize Pretty where
     showp (Pretty (Stat wfName state index recover sync versions _))= do
            name <- showp wfName
            vers <- showElem (zip ( reverse $ take (length versions)[1..] ) versions ) ""
            return $ "Workflow name= " ++ name ++ "\n" ++ vers
         where
         -- each rendered entry is put in front of the accumulated text
         showElem [] str= return $ str ++ "\n"
         showElem ((n, IDynamic e):es) str= do
               etext <- tshowp e
               showElem es $ "Step " ++ show n ++ ": " ++ etext ++ "\n" ++ str
     readp = undefined

--------- event handling--------------

-- | Return the cache variable that holds the given object, inserting the object first
-- when the cache has no variable for it.
reference :: IDynamic -> STM (TVar IDynamic)
reference x=do
            mv <- getTVars [ x ]
            case mv of
              [Nothing] -> do
                   insertResources [x]
                   reference x
              [Just cl] -> return cl
  where
  insertResources xs= withDSTMResources [] $ const resources{toAdd= [x]}

-- | wait until a TCache object (with a certain key) meets a certain condition (useful to check external actions)
-- NOTE: if another process deletes the object from the cache, then waitForData will no longer work
-- inside the workflow, it can be used by lifting it :
--        do
--              x <- step $ ..
--              y <- step $ waitForData ...
--                   ..
waitForData :: (IResource a, Typeable a,IResource b, Typeable b)
            => (b -> Bool)  -- ^ The condition that the retrieved object must meet
            -> a            -- ^ a partially defined object for which keyResource can be extracted
            -> IO b         -- ^ return the retrieved object that meet the condition and has the given key
waitForData filter x= atomically $ waitForDataSTM filter x

-- | Like 'waitForData', but in the STM monad. Retries while the stored object is not of the
-- expected type or does not meet the condition.
waitForDataSTM :: (IResource a, Typeable a,IResource b, Typeable b)
               => (b -> Bool)  -- ^ The condition that the retrieved object must meet
               -> a            -- ^ a partially defined object for which keyResource can be extracted
               -> STM b        -- ^ return the retrieved object that meet the condition and has the given key
waitForDataSTM filter x= do
      tv  <- reference $ toIDyn x
      dyn <- readTVar tv
      case safeFromIDyn dyn of
        Nothing -> retry
        Just x -> case filter x of
                    False -> retry
                    True  -> return x

-- | Wait until the most recent logged value of a workflow has the expected type and meets a condition.
waitFor :: (IResource a, Typeable a, IResource b, Typeable b)
        => (b -> Bool)  -- ^ The condition that the retrieved object must meet
        -> String       -- ^ The workflow name
        -> a            -- ^ the INITIAL value used in the workflow to start it
        -> IO b         -- ^ The first event that meet the condition
waitFor filter wfname x= atomically $ waitForSTM filter wfname x

-- | Like 'waitFor', but in the STM monad.
-- Raises an 'error' when the cache holds no log for the workflow and initial value.
waitForSTM :: (IResource a, Typeable a, IResource b, Typeable b)
           => (b -> Bool)  -- ^ The condition that the retrieved object must meet
           -> String       -- ^ The workflow name
           -> a            -- ^ the INITIAL value used in the workflow to start it
           -> STM b        -- ^ The first event that meet the condition
waitForSTM filter wfname x= do
    mtv <- getTVars [toIDyn stat0{wfName=wfname ++ "#" ++ keyResource x}] -- `debug` "**waitFor***"
    case mtv of
      [Nothing] -> error $ "workflow "++ wfname ++" not initialized for data " ++ keyResource x
      [Just tv] -> do
         dyn <- readTVar tv
         let Stat{ versions= d: _}= fromIDyn dyn
         case safeFromIDyn d of
           Nothing -> retry                      -- `debug` "waithFor retry Nothing"
           Just x -> case filter x of
                       False -> retry            -- `debug` "waitFor false filter retry"
                       True  -> return x         -- `debug` "waitfor return"

-- | start the timeout and return the flag to be monitored by 'waitUntilSTM'
--
-- The start time is logged with 'step', so after a restart the deadline is still computed
-- from the original start time. The flag variable is created once per workflow state and reused.
getTimeoutFlag :: (MonadIO m)
               => Integer                 -- ^ wait time in seconds
               -> Workflow m (TVar Bool)  -- ^ the returned flag in the workflow monad
getTimeoutFlag t=do
     tnow<- step $ liftIO getTimeSeconds
     flag tnow t
  where
  flag tnow delta= WF(\s -> do
          (s', tv) <- case timeout s of
                Nothing -> do
                   tv <- liftIO $ newTVarIO False
                   return (s{timeout= Just tv}, tv)
                Just tv -> return (s, tv)
          liftIO $ do
             let t = tnow + delta
             -- background thread that raises the flag once the deadline has passed
             forkIO $ do waitUntil t ; atomically $ writeTVar tv True
          return (s', tv))

-- | Current clock time in whole seconds (the seconds component of 'getClockTime').
getTimeSeconds :: IO Integer
getTimeSeconds= do
      TOD n _ <- getClockTime
      return n

{- | wait until a certain clock time, in the STM monad.
   This permits to compose timeouts with locks waiting for data.

   *example: wait for any response from a Queue; if no response is given in 5 minutes, it is returned True.

  @
   flag <- getTimeoutFlag $ 5 * 60
   ap <- step .
atomically $ readQueueSTM docQueue `orElse` waitUntilSTM flag >> return True
   case ap of
        False -> 'logWF' "False or timeout" >> correctWF doc
        True -> do
  @
-}
waitUntilSTM :: TVar Bool -> STM()
waitUntilSTM tv = do
        b <- readTVar tv
        if b == False then retry else return ()

-- | Block the calling thread until the clock (in seconds, as given by 'getTimeSeconds')
-- reaches the given time. Returns immediately when that time has already passed.
--
-- 'threadDelay' takes its argument in microseconds as an 'Int', so a single sleep is capped
-- at the largest number of seconds whose microsecond count still fits in an 'Int'.
-- (Capping at 'maxInt' seconds, as before, overflowed in the multiplication.)
-- Longer waits are covered by looping: the clock is re-read after every sleep.
waitUntil:: Integer -> IO()
waitUntil t= do
      tnow <- getTimeSeconds
      let remaining = t - tnow
          maxSecs   = maxInt `div` 1000000
          delay | remaining > fromIntegral maxSecs = maxSecs
                | otherwise                        = fromIntegral remaining
      when (remaining > 0) $ do
          threadDelay $ delay * 1000000
          waitUntil t

-- | Persistent queue: a name plus two lists of elements. New elements are put in front of
-- 'imp'; elements are taken from the front of 'out'.
data Queue= Queue {name :: String, imp :: [IDynamic], out :: [IDynamic]} deriving (Typeable)

instance Serialize Queue where
    showp (Queue name imp out)= do
         sin  <- showp imp
         sout <- showp out
         return $ "Queue " ++ show name ++ " " ++ sin ++ " " ++ sout

    readp = do
         symbol "Queue"
         name <- readp
         sin  <- readp
         sout <- readp
         return $ Queue name sin sout

-- | Queues are stored under the key @Queue#\<name\>@ in 'workflowsPath'.
instance IResource Queue where
    keyResource (Queue name _ _)= "Queue#" ++ name
    serialize x= runW $ showp x
    deserialize str = runR readp str
    defPath _= workflowsPath

-- | delete elements from the Queue stack and return them in the IO monad
readQueue :: (IResource a , Typeable a)
          => String  -- ^ Queue name
          -> IO a    -- ^ the returned elems
readQueue = atomically . readQueueSTM

-- | delete elements from the Queue stack and return them.
-- in the STM monad
--
-- Retries while the queue holds no element. When only the input list has elements,
-- it is reversed into the output list before taking the first one.
readQueueSTM :: (IResource a , Typeable a) => String -> STM a
readQueueSTM queue = do
    reference marker                       -- make sure that the queue has been created
    item <- withSTMResources [blank] pop   -- otherwise, it will not retry
    releaseTVars [marker]
    return (fromIDyn item)
  where
    blank  = Queue queue [] []
    marker = toIDyn blank

    pop [Nothing]                  = Retry
    pop [Just (Queue _ [] [])]     = Retry
    pop [Just (Queue _ pending [])] =
        pop [Just (Queue queue [] (reverse pending))]
    pop [Just (Queue _ pending (next : rest))] =
        resources { toAdd    = [Queue queue pending rest]
                  , toReturn = next
                  }

-- | Put an element back at the front of the queue, in the IO monad.
unreadQueue :: (IResource a , Typeable a) => String -> a -> IO ()
unreadQueue queue x = atomically (unreadQueueSTM queue x)

-- | Put an element back at the front of the queue (it becomes the next one to be read).
-- The queue is created when it does not exist.
unreadQueueSTM :: (IResource a , Typeable a) => String -> a -> STM ()
unreadQueueSTM queue x =
    withSTMResources [Queue queue undefined undefined] pushBack
  where
    pushBack [found] = resources { toAdd = [updated] }
      where
        updated = case found of
          Nothing                      -> Queue queue [] [toIDyn x]
          Just (Queue _ pending ready) -> Queue queue pending (toIDyn x : ready)

-- | insert an element on top of the Queue Stack
writeQueue :: (IResource a, Typeable a) => String -> a -> IO ()
writeQueue queue v = atomically (writeQueueSTM queue v)

-- | Like writeQueue, but in the STM monad
-- The queue is created when it does not exist.
writeQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()
writeQueueSTM queue v =
    withSTMResources [Queue queue undefined undefined] append
  where
    append [found] = resources { toAdd = [updated] }
      where
        updated = case found of
          Nothing                      -> Queue queue [toIDyn v] []
          Just (Queue _ pending ready) -> Queue queue (toIDyn v : pending) ready

-- | Tell whether the queue has no elements, in the IO monad.
isEmptyQueue :: String -> IO Bool
isEmptyQueue queue = atomically (isEmptyQueueSTM queue)

-- | Tell whether the queue has no elements. A queue that does not exist counts as empty.
isEmptyQueueSTM :: String -> STM Bool
isEmptyQueueSTM queue =
    withDSTMResources [toIDyn (Queue queue undefined undefined)] check
  where
    check [found] = resources { toReturn = maybe True (drained . fromIDyn) found }

    drained (Queue _ [] []) = True
    drained _               = False