-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Workflow patterns and a monad for thread state logging & recovery -- -- Transparent support for interruptible computations. A workflow can be -- seen as a persistent thread that executes a monadic computation. -- Therefore, it can be used in very time consuming computations such as -- CPU intensive calculations or procedures that are most of the time -- waiting for the action of a process or a user, that are prone to -- communication failures, timeouts or shutdowns. It also can be used if -- you like to restart your program at the point where the user left it -- last time. The computation can be restarted at the interrupted -- point thanks to its logged state in permanent storage. The thread -- state is located in files by default. It can be moved and continued in -- another computer. Besides that, the package also provides other higher -- level services associated to workflows: Workflow patterns and a -- general configuration utility. -- -- See Control.Workflow for details @package Workflow @version 0.8.0.2 -- | A workflow can be seen as a persistent thread. The workflow monad -- writes a log that permits to restore the thread at the interrupted -- point. step is the (partial) monad transformer for the Workflow -- monad. A workflow is defined by its name and, optionally by the key of -- the single parameter passed. The primitives for starting workflows -- also restart the interrupted workflow when it has been in execution -- previously. -- -- A small example that prints the sequence of integers in the console. If -- you interrupt the program, when restarted again, it will start from the -- last printed number -- --
-- module Main where -- import Control.Workflow -- import Control.Concurrent(threadDelay) -- import System.IO (hFlush,stdout) -- -- mcount n= do step $ do -- putStr (show n ++ " ") -- hFlush stdout -- threadDelay 1000000 -- mcount (n+1) -- return () -- to disambiguate the return type -- -- main= exec1 "count" $ mcount (0 :: Int) ---- --
-- >>> runghc demos\sequence.hs -- >0 1 2 3 -- >CTRL-C Pressed -- -- >>> runghc demos\sequence.hs -- >3 4 5 6 7 -- >CTRL-C Pressed -- -- >>> runghc demos\sequence.hs -- >7 8 9 10 11 -- ... ---- -- The program restarts at the last saved step. -- -- As you can see, some side effects can be re-executed after recovery if -- the log is not complete. This may happen after an unexpected shutdown -- (in this case) or due to an asynchronous log writing policy. (see -- syncWrite) -- -- When the step results are big and complex, use the -- Data.RefSerialize package to define the (de)serialization -- instances. The log size will be reduced. The printWFHistory method will -- print the structure changes in each step. -- -- If instead of RefSerialize, you use read and show instances, -- there will be no reduction, but it will still work, and the log will -- be readable for debugging purposes. The RefSerialize instance is -- automatically derived from Read, Show instances. -- -- Data.Binary instances are also fine for serialization. To use Binary, -- just define a binary instance of your data by using showpBinary -- and readpBinary. -- -- Within the RefSerialize instance of a structure, you can freely mix -- Show, Read, RefSerialize and Data.Binary instances. -- -- Control.Workflow.Patterns contains higher level workflow -- patterns of multiuser workflows -- -- Control.Workflow.Configuration permits the use of workflows for -- configuration purposes module Control.Workflow data Stat type Workflow m = WF Stat m type WorkflowList m a b = Map String (a -> Workflow m b) -- | PMonadTrans permits to define a partial monad transformer. -- They are not defined for all kinds of data but the ones that have -- instances of certain classes. That is because in the lift instance code -- there is some hidden use of these classes. This also may permit an -- accurate control of effects. 
An instance of MonadTrans is an instance -- of PMonadTrans class PMonadTrans t m a plift :: (PMonadTrans t m a, Monad m) => m a -> t m a -- | Adapted from the MonadCatchIO-mtl package. However, in this -- case it is needed to express serializable constraints about the returned -- values, so the usual class definitions for lifting IO functions are -- not suitable. class MonadCatchIO m a catch :: (MonadCatchIO m a, Exception e) => m a -> (e -> m a) -> m a block :: MonadCatchIO m a => m a -> m a unblock :: MonadCatchIO m a => m a -> m a class MonadIO io => HasFork io fork :: HasFork io => io () -> io ThreadId -- | Generalized version of throwIO throw :: (MonadIO m, Exception e) => e -> m a -- | Indexable is a utility class used to derive instances of IResource -- -- Example: -- --
-- data Person= Person{ pname :: String, cars :: [DBRef Car]} deriving (Show, Read, Typeable)
-- data Car= Car{owner :: DBRef Person , cname:: String} deriving (Show, Read, Eq, Typeable)
--
--
-- Since Person and Car are instances of Read and Show,
-- defining the Indexable instance will implicitly define the
-- IResource instance for file persistence:
--
--
-- instance Indexable Person where key Person{pname=n} = "Person " ++ n
-- instance Indexable Car where key Car{cname= n} = "Car " ++ n
--
class Indexable a
key :: Indexable a => a -> String
defPath :: Indexable a => a -> String
-- | Return the unique name of a workflow with a parameter (executed with
-- exec or start)
keyWF :: Indexable a => String -> a -> String
-- | Start or continue a workflow. WFErrors and exceptions are
-- returned as Left err (even if they were triggered as
-- exceptions). Other exceptions are returned as Left (Exception
-- e). Use killWF or delWF in case of error to clear the
-- log.
start :: (MonadCatchIO m, MonadIO m, Indexable a, Serialize a, Typeable a) => String -> (a -> Workflow m b) -> a -> m (Either WFErrors b)
-- | Start or continue a workflow with exception handling. The workflow
-- flags are updated even in case of exception. WFErrors are
-- raised as exceptions.
exec :: (Indexable a, Serialize a, Typeable a, Monad m, MonadIO m, MonadCatchIO m) => String -> (a -> Workflow m b) -> a -> m b
-- | A version of exec1 that deletes its state after complete execution or
-- when the thread is killed.
exec1d :: (MonadIO m, MonadCatchIO m) => String -> (Workflow m b) -> m b
-- | A version of exec with no seed parameter.
exec1 :: (Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m a
-- | Executes a workflow, but does not mark it as finished even if the
-- process ended. The workflow will return the last result.
exec1nc :: (Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m a
-- | Start or restart an anonymous workflow inside another workflow. Its
-- state is deleted when finished and the result is stored in the
-- parent's WF state.
wfExec :: (Serialize a, Typeable a, MonadCatchIO m, MonadIO m) => Workflow m a -> Workflow m a
-- | Start or continue a workflow from a list of workflows with exception
-- handling. See start for details about exception and error
-- handling.
startWF :: (MonadCatchIO m, MonadIO m, Serialize a, Serialize b, Typeable a, Indexable a) => String -> a -> WorkflowList m a b -> m (Either WFErrors b)
-- | Restart the unfinished workflows in the list, for all the initial
-- values with which they may have been invoked.
restartWorkflows :: (Serialize a, Typeable a) => WorkflowList IO a b -> IO ()
-- | Return conditions from the invocation of start/restart primitives
data WFErrors
NotFound :: WFErrors
AlreadyRunning :: WFErrors
Timeout :: WFErrors
WFException :: String -> WFErrors
-- | Lifts a monadic computation to the WF monad, and provides transparent
-- state logging and resuming of the computation. Note: side effects can be
-- repeated at recovery time if the log was not complete before shutdown;
-- see the integer sequence example above.
step :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a
getWFStat :: Monad m => Workflow m (DBRef Stat)
stepExec :: (Typeable t, Serialize t, MonadIO m) => DBRef Stat -> m t -> m (DBRef Stat, t)
-- | Executes a computation inside of the workflow monad whatever the monad
-- encapsulated in the workflow. Warning: this computation is executed
-- whenever the workflow restarts, no matter if it has been already
-- executed previously. This is useful for initializations or debugging.
-- To avoid re-execution when restarting use: step $
-- unsafeIOtoWF...
--
-- To perform IO actions in a workflow that encapsulates an IO monad, use
-- step over the IO action directly:
--
-- -- step $ action ---- -- instead of -- --
-- step $ unsafeIOtoWF $ action --unsafeIOtoWF :: Monad m => IO a -> Workflow m a data WFRef a -- | Log a value in the workflow log and return a reference to it. -- --
-- newWFRef x= stepWFRef (return x) >>= return . fst --newWFRef :: (Serialize a, Typeable a, MonadIO m, MonadCatchIO m) => a -> Workflow m (WFRef a) -- | Execute an step and return a reference to the result besides the -- result itself stepWFRef :: (Serialize a, Typeable a, MonadIO m) => m a -> Workflow m (WFRef a, a) -- | Read the content of a Workflow reference. Note that its result is not -- in the Workflow monad readWFRef :: (Serialize a, Typeable a) => WFRef a -> STM (Maybe a) getWFRef :: Indexable a => Int -> String -> a -> WFRef b -- | Writes a new value en in the workflow reference, that is, in the -- workflow log. Why would you use this?. Don't do that!. modifiying the -- content of the workflow log would change the excution flow when the -- workflow restarts. This metod is used internally in the package. The -- best way to communicate with a workflow is trough a persistent queue, -- using Data.Persistent.Collection: -- --
-- worflow= exec1 wf do -- r <- stepWFRef expr -- push "queue" r -- back <- pop "queueback" -- ... --writeWFRef :: (Serialize a, Typeable a) => WFRef a -> a -> STM () -- | Moves the state of workflow with a seed value to become the state of -- other seed value This may be of interest when the entry value changes -- its key value but should not initiate a new workflow but continues -- with the current one moveState :: (MonadIO m, Indexable a, Serialize a, Typeable a) => String -> a -> a -> m () -- | wait until the workflow is restarted waitWFActive :: String -> STM () -- | Return all the steps of the workflow log. The values are dynamic -- -- to get all the steps with result of type Int: all <- -- getAll let lfacts = mapMaybe safeFromIDyn all :: -- [Int] getAll :: Monad m => Workflow m [IDynamic] safeFromIDyn :: (Typeable a, Serialize a) => IDynamic -> Either String a -- | Return the keys of the workflows that are running with a given prefix getWFKeys :: String -> IO [String] -- | Return the current state of the computation, in the IO monad getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat) -- | Observe the workflow log untiil a condition is met. waitFor :: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> IO b waitForSTM :: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> STM b -- | Wait until a certain clock time has passed by monitoring its flag, in -- the STM monad. This permits to compose timeouts with locks waiting for -- data using orElse -- --
-- flag <- getTimeoutFlag $ 5 * 60 -- ap <- step . atomically $ readSomewhere >>= return . Just orElse waitUntilSTM flag >> return Nothing -- case ap of -- Nothing -> do logWF timeout ... -- Just x -> do logWF $ received ++ show x ... --waitUntilSTM :: TVar Bool -> STM () -- | Start the timeout and return the flag to be monitored by -- waitUntilSTM This timeout is persistent. This means that the -- time start to count from the first call to getTimeoutFlag on no matter -- if the workflow is restarted. The time that the worlkflow has been -- stopped count also. the wait time can exceed the time between -- failures. when timeout is 0 means no timeout. getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool) -- | Return either the result of the STM conputation or Nothing in case of -- timeout This timeout is persistent. This means that the time start to -- count from the first call to getTimeoutFlag on no matter if the -- workflow is restarted. The time that the worlkflow has been stopped -- count also. Thus, the wait time can exceed the time between failures. -- when timeout is 0 means no timeout. withTimeout :: (MonadIO m, Typeable a, Serialize a) => Integer -> STM a -> Workflow m (Maybe a) -- | Executes a computation understanding that it is inside the workflow -- identified by id. If f finish after time it -- genetates a Timeout exception which must result in the end of -- the workflow. If the workflow is restarted after time2 has -- elapsed, the workflow will restart from the beginning. If not, it will -- restart at the last checkpoint. -- -- Usually time2> time -- -- time2=0 means time2 is infinite withKillTimeout :: MonadCatchIO m => String -> Int -> Integer -> m a -> m a -- | Log a message in the workflow history. 
I can be printed out with -- printWFhistory The message is printed in the standard output -- too logWF :: MonadIO m => String -> Workflow m () clearRunningFlag :: MonadIO m => [Char] -> m (Map String (String, Maybe ThreadId), Maybe ThreadId) -- | Kill the executing thread if not killed, but not its state. -- exec start or restartWorkflows will continue the -- workflow killThreadWF :: (Indexable a, Serialize a, Typeable a, MonadIO m) => String -> a -> m () -- | Kill the process (if running) and drop it from the list of -- restart-able workflows. Its state history remains , so it can be -- inspected with getWfHistory printWFHistory and so on killWF :: (Indexable a, MonadIO m) => String -> a -> m () -- | Delete the WF from the running list and delete the workflow state from -- persistent storage. Use it to perform cleanup if the process has been -- killed. delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m () -- | A version of KillThreadWF for workflows started wit no -- parameter by exec1 killThreadWF1 :: MonadIO m => String -> m () -- | A version of KillWF for workflows started wit no parameter by -- exec1 killWF1 :: MonadIO m => String -> m () -- | A version of delWF for workflows started wit no parameter by -- exec1 delWF1 :: MonadIO m => String -> m () -- | Delete the history of a workflow. Be sure that this WF has finished. delWFHistory :: Indexable a => [Char] -> a -> IO () delWFHistory1 :: [Char] -> IO () -- | Specify the cache synchronization policy with permanent storage. See -- SyncMode for details syncWrite :: SyncMode -> IO () data SyncMode :: * -- | sync state to permanent storage when atomicallySync is invoked Synchronous :: SyncMode Asyncronous :: Int -> (Integer -> Integer -> Integer -> Bool) -> Int -> SyncMode -- | number of seconds between saves when asyncronous frecuency :: SyncMode -> Int -- | The user-defined check-for-cleanup-from-cache for each object. 
-- defaultCheck is an example check :: SyncMode -> Integer -> Integer -> Integer -> Bool -- | size of the cache when async cacheSize :: SyncMode -> Int -- | use syncCache to write the state SyncManual :: SyncMode -- | show the state changes along the workflow, that is, all the -- intermediate results showHistory :: Stat -> ByteString isInRecover :: Monad m => Workflow m Bool instance [overlap ok] Typeable WFErrors instance [overlap ok] Typeable WFInfo instance [overlap ok] Read WFErrors instance [overlap ok] Show WFErrors instance [overlap ok] Read WFInfo instance [overlap ok] Show WFInfo instance [overlap ok] Exception WFErrors instance [overlap ok] (HasFork io, MonadIO io, MonadCatchIO io) => HasFork (WF Stat io) instance [overlap ok] HasFork IO instance [overlap ok] (Serialize a, Typeable a, MonadIO m, MonadCatchIO m) => MonadCatchIO (WF Stat m) a instance [overlap ok] MonadIO m => MonadIO (WF Stat m) instance [overlap ok] (MonadTrans t, Monad m) => PMonadTrans t m a instance [overlap ok] (Monad m, MonadIO m, Serialize a, Typeable a) => PMonadTrans (WF Stat) m a instance [overlap ok] (Monad m, Functor m) => Functor (Workflow m) instance [overlap ok] Monad m => Monad (WF s m) -- | Helpers for application initialization module Control.Workflow.Configuration -- | to execute one computation once . It executes at the first run only once :: (Typeable a, Serialize a, MonadIO m) => m a -> Workflow m a -- | to execute a computation every time it is invoked. A synonimous of -- unsafeIOtoWF ever :: (Typeable a, Serialize a, MonadIO m) => IO a -> Workflow m a -- | executes a computation with once and ever statements a -- synonym of exec1nc runConfiguration :: (Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m a -- | This module contains monadic combinators that express some workflow -- patterns. see the docAprobal.hs example included in the package -- -- EXAMPLE: -- -- This fragment below describes the approbal procedure of a document. 
-- First the document reference is sent to a list of bosses through a -- queue. They return a boolean in a return queue (askUser). The -- booleans are summed up according to a monoid instance (sumUp). -- -- If the result is False, the correctWF workflow is executed. If the -- result is True, the pipeline continues to the next stage -- (checkValidated). -- -- The next stage is the same process with a new list of users -- (superbosses). There is a timeout of seven days. The results of the -- users that voted are summed up according to the same monoid instance. -- -- If the result is True, the document is added to the persistent list of -- approved documents; if the result is False, the document is added to -- the persistent list of rejected documents (checkValidated1). -- -- The program can be interrupted at any moment. The Workflow monad will -- restart it at the point where it was interrupted. -- -- This example uses queues from Data.Persistent.Queue -- --
-- docApprobal :: Document -> Workflow IO ()
-- docApprobal doc = getWFRef >>= docApprobal1
--
-- docApprobal1 rdoc=
-- return True >>=
-- log "requesting approbal from bosses" >>=
-- sumUp 0 (map (askUser doc rdoc) bosses) >>=
-- checkValidated >>=
-- log "requesting approbal from superbosses or timeout" >>=
-- sumUp (7*60*60*24) (map(askUser doc rdoc) superbosses) >>=
-- checkValidated1
--
-- askUser _ _ user False = return False
-- askUser doc rdoc user True = do
-- step $ push (quser user) rdoc
-- logWF ("wait for any response from the user: " ++ user)
-- step . pop $ qdocApprobal (title doc)
--
-- log txt x = logWF txt >> return x
--
-- checkValidated :: Bool -> Workflow IO Bool
-- checkValidated val =
-- case val of
-- False -> correctWF (title doc) rdoc >> return False
-- _ -> return True
--
-- checkValidated1 :: Bool -> Workflow IO ()
-- checkValidated1 val = step $ do
-- case val of
-- False -> push qrejected doc
-- _ -> push qapproved doc
-- mapM (\u -> deleteFromQueue (quser u) rdoc) superbosses
--
module Control.Workflow.Patterns
-- | Spawn a list of independent workflow actions with a seed
-- value a. The results are reduced by merge or
-- select.
split :: (Typeable b, Serialize b, HasFork io, MonadCatchIO io) => [a -> Workflow io b] -> a -> Workflow io [ActionWF b]
-- | wait for the results and apply the cond to produce a single output in
-- the Workflow monad
merge :: (MonadIO io, Typeable a, Typeable b, Serialize a, Serialize b) => ([a] -> io b) -> [ActionWF a] -> Workflow io b
-- | Select the outputs of the workflows produced by split
-- constrained within a timeout. The check filter can select, discard
-- or finish the entire computation before the timeout is reached. When
-- the computation finalizes, it kills all the pending workflows and
-- returns the list of selected outputs. The timeout is in seconds and it
-- is in the workflow monad, so it is possible to restart the process
-- if interrupted, so it can proceed for years.
--
-- This is necessary for the modelization of real-life institutional
-- cycles such as political elections. A timeout of 0 means no timeout.
select :: (Serialize a, Typeable a, HasFork io, MonadCatchIO io) => Integer -> (a -> STM Select) -> [ActionWF a] -> Workflow io [a]
-- | Spawn a list of workflows and reduce the results according to the
-- comp parameter within a given timeout.
--
-- -- vote timeout actions comp x= -- split actions x >>= select timeout (const $ return Select) >>= comp --vote :: (Serialize b, Typeable b, HasFork io, MonadCatchIO io) => Integer -> [a -> Workflow io b] -> ([b] -> Workflow io c) -> a -> Workflow io c -- | sum the outputs of a list of workflows according with its monoid -- definition -- --
-- sumUp timeout actions = vote timeout actions (return . mconcat) --sumUp :: (Serialize b, Typeable b, Monoid b, HasFork io, MonadCatchIO io) => Integer -> [a -> Workflow io b] -> a -> Workflow io b data Select -- | select the source output Select :: Select -- | Discard the source output Discard :: Select -- | Continue the source process Continue :: Select -- | Discard this output, kill all and return the selected outputs FinishDiscard :: Select -- | Select this output, kill all and return the selected outputs FinishSelect :: Select instance [overlap ok] Typeable Select instance [overlap ok] Read Select instance [overlap ok] Show Select instance [overlap ok] Exception Select