Safe Haskell | None |
---|
A workflow can be seen as a persistent thread.
The workflow monad writes a log that permit to restore the thread
at the interrupted point. step
is the (partial) monad transformer for
the Workflow monad. A workflow is defined by its name and, optionally
by the key of the single parameter passed. The primitives for starting workflows
also restart the workflow when it has been in execution previously.
This is the main module that uses the RefSerialize
paclkage for serialization. Here the constraint DynSerializer w r a
is equivalent to
Data.RefSerialize a
For workflows that uses big structures, for example, documents use this module in combination with the RefSerialize package to define the (de)serialization instances The log size will be reduced. printWFHistory` method will print the structure changes in each step.
If instead of RefSerialize, you define read and show instances, there will be no reduction. but still the log will be readable for debugging purposes.
for workflows that does not care about this, use the binary alternative: Control.Workflow.Binary
A small example that print the sequence of integers in te console if you interrupt the progam, when restarted again, it will start from the last printed number
module Main where import Control.Workflow.Text import Control.Concurrent(threadDelay) import System.IO (hFlush,stdout) mcount n= dostep
$ do putStr (show n ++ " ") hFlush stdout threadDelay 1000000 mcount (n+1) return () -- to disambiguate the return type main=exec1
"count" $ mcount (0 :: Int)
- type Workflow m = WF Stat m
- type WorkflowList m a b = [(String, a -> Workflow m b)]
- class PMonadTrans t m a where
- class MonadCatchIO m a where
- throw :: (MonadIO m, Exception e) => e -> m a
- class Indexable a where
- start :: (Monad m, MonadIO m, Indexable a, Serialize a, Serialize b, Typeable a, Typeable b) => String -> (a -> Workflow m b) -> a -> m (Either WFErrors b)
- exec :: (Indexable a, Serialize a, Serialize b, Typeable a, Typeable b, Monad m, MonadIO m, MonadCatchIO m) => String -> (a -> Workflow m b) -> a -> m b
- exec1d :: (Serialize b, Typeable b, MonadCatchIO m) => String -> Workflow m b -> m b
- exec1 :: (Serialize a, Typeable a, Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m a
- wfExec :: (Indexable a, Serialize a, Typeable a, MonadCatchIO m, MonadIO m) => Workflow m a -> Workflow m a
- startWF :: (MonadIO m, Serialize a, Serialize b, Typeable a, Indexable a, Typeable b) => String -> a -> WorkflowList m a b -> m (Either WFErrors b)
- restartWorkflows :: (Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => WorkflowList IO a b -> IO ()
- data WFErrors
- = NotFound
- | AlreadyRunning
- | Timeout
- | forall e . Exception e => Exception e
- step :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a
- stepControl :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a
- unsafeIOtoWF :: Monad m => IO a -> Workflow m a
- data WFRef a
- getWFRef :: (Monad m, MonadIO m, Serialize a, Typeable a) => Workflow m (WFRef a)
- newWFRef :: (Serialize a, Typeable a, MonadIO m) => a -> Workflow m (WFRef a)
- stepWFRef :: (Serialize a, Typeable a, MonadIO m) => m a -> Workflow m (WFRef a)
- readWFRef :: (Serialize a, Typeable a) => WFRef a -> STM (Maybe a)
- writeWFRef :: (Serialize a, Typeable a) => WFRef a -> a -> STM ()
- waitWFActive :: String -> STM ()
- getAll :: Monad m => Workflow m [IDynamic]
- safeFromIDyn :: (Typeable a, Serialize a) => IDynamic -> Maybe a
- getWFKeys :: String -> IO [String]
- getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat)
- waitFor :: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> IO b
- waitForSTM :: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> STM b
- waitUntilSTM :: TVar Bool -> STM ()
- getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool)
- logWF :: (Monad m, MonadIO m) => String -> Workflow m ()
- clearRunningFlag :: MonadIO m => [Char] -> m (Map String (String, Maybe ThreadId), Maybe ThreadId)
- killThreadWF :: (Indexable a, Serialize a, Typeable a, MonadIO m) => String -> a -> m ()
- killWF :: (Indexable a, MonadIO m) => String -> a -> m ()
- delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m ()
- killThreadWF1 :: MonadIO m => String -> m ()
- killWF1 :: MonadIO m => String -> m ()
- delWF1 :: MonadIO m => String -> m ()
- delWFHistory :: Indexable a => String -> a -> IO ()
- delWFHistory1 :: String -> IO ()
- syncWrite :: (Monad m, MonadIO m) => SyncMode -> m ()
- data SyncMode
- = Synchronous
- | Asyncronous { }
- | SyncManual
- printHistory :: Stat -> IO ()
Documentation
type WorkflowList m a b = [(String, a -> Workflow m b)]Source
class PMonadTrans t m a whereSource
PMonadTrans permits |to define a partial monad transformer. They are not defined for all kinds of data but the ones that have instances of certain classes.That is because in the lift instance code there are some hidden use of these classes. This also may permit an accurate control of effects. An instance of MonadTrans is an instance of PMonadTrans
(MonadTrans t, Monad m) => PMonadTrans t m a | An instance of MonadTrans is an instance of PMonadTrans |
(Monad m, MonadIO m, Serialize a, Typeable a) => PMonadTrans (WF Stat) m a | plift= step |
class MonadCatchIO m a whereSource
adapted from MonadCatchIO-mtl. Workflow need to express serializable constraints about the returned values, so the usual class definitions for lifting IO functions are not suitable.
catch :: Exception e => m a -> (e -> m a) -> m aSource
Generalized version of catch
Generalized version of block
Generalized version of unblock
(Serialize a, Typeable a, MonadIO m, MonadCatchIO m) => MonadCatchIO (WF Stat m) a |
class Indexable a where
Indexable is an utility class used to derive instances of IResource
Example:
data Person= Person{ pname :: String, cars :: [DBRef Car]} deriving (Show, Read, Typeable) data Car= Car{owner :: DBRef Person , cname:: String} deriving (Show, Read, Eq, Typeable)
Since Person and Car are instances of Read
ans Show
, by defining the Indexable
instance
will implicitly define the IResource instance for file persistence:
instance Indexable Person where key Person{pname=n} = "Person " ++ n instance Indexable Car where key Car{cname= n} = "Car " ++ n
Start/restart workflows
exec :: (Indexable a, Serialize a, Serialize b, Typeable a, Typeable b, Monad m, MonadIO m, MonadCatchIO m) => String -> (a -> Workflow m b) -> a -> m bSource
start or continue a workflow with exception handling
| the workflow flags are updated even in case of exception
| WFerrors
are raised as exceptions
exec1d :: (Serialize b, Typeable b, MonadCatchIO m) => String -> Workflow m b -> m bSource
a version of exec1 that deletes its state after complete execution or thread killed
exec1 :: (Serialize a, Typeable a, Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m aSource
a version of exec with no seed parameter.
wfExec :: (Indexable a, Serialize a, Typeable a, MonadCatchIO m, MonadIO m) => Workflow m a -> Workflow m aSource
start or restart an anonymous workflow inside another workflow its state is deleted when finished and the result is stored in the parent's WF state.
:: (MonadIO m, Serialize a, Serialize b, Typeable a, Indexable a, Typeable b) | |
=> String | name of workflow in the workflow list |
-> a | initial value (ever use the initial value even to restart the workflow) |
-> WorkflowList m a b | function to execute |
-> m (Either WFErrors b) | result of the computation |
start or continue a workflow from a list of workflows in the IO monad with exception handling. The excepton is returned as a Left value
restartWorkflows :: (Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => WorkflowList IO a b -> IO ()Source
re-start the non finished workflows in the list, for all the initial values that they may have been called
return conditions from the invocation of start/restart primitives
NotFound | |
AlreadyRunning | |
Timeout | |
forall e . Exception e => Exception e |
Lifting to the Workflow monad
step :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m aSource
lifts a monadic computation to the WF monad, and provides transparent state loging and resuming of computation
stepControl :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m aSource
permits modification of the workflow state by the procedure being lifted if the boolean value is True. This is used internally for control purposes
unsafeIOtoWF :: Monad m => IO a -> Workflow m aSource
executes a computation inside of the workflow monad whatever the monad encapsulated in the workflow.
Warning: this computation is executed whenever
the workflow restarts, no matter if it has been already executed previously. This is useful for intializations or debugging.
To avoid re-execution when restarting use: step
$ unsafeIOtoWF...
To perform IO actions in a workflow that encapsulates an IO monad, use step over the IO action directly:
step
$ action
instead of
step
$ unsafeIOtoWF $ action
References to intermediate values in the workflow log
getWFRef :: (Monad m, MonadIO m, Serialize a, Typeable a) => Workflow m (WFRef a)Source
Return the reference to the last logged result , usually, the last result stored by step
.
wiorkflow references can be accessed outside of the workflow
. They also can be (de)serialized.
WARNING getWFRef can produce casting errors when the type demanded
do not match the serialized data. Instead, newDBRef
and stepWFRef
are type safe at runtuime.
readWFRef :: (Serialize a, Typeable a) => WFRef a -> STM (Maybe a)Source
Read the content of a Workflow reference. Note that its result is not in the Workflow monad
writeWFRef :: (Serialize a, Typeable a) => WFRef a -> a -> STM ()Source
Writes a new value en in the workflow reference, that is, in the workflow log. Why would you use this?. Don do that!. modifiying the content of the workflow log would change the excution flow when the workflow restarts. This metod is used internally in the package the best way to communicate with a workflow is trough a persistent queue:
worflow= exec1 wf do r <-stepWFRef
exprpush
"queue" r back <-pop
"queueback" ...
Workflow inspect
waitWFActive :: String -> STM ()Source
getAll :: Monad m => Workflow m [IDynamic]Source
return all the steps of the workflow log. The values are dynamic
to get all the steps with result of type Int:
all <-
getAll
let lfacts = mapMaybe safeFromIDyn
all :: [Int]
safeFromIDyn :: (Typeable a, Serialize a) => IDynamic -> Maybe aSource
getWFKeys :: String -> IO [String]Source
return the list of object keys that are running for a workflow
getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat)Source
return the current state of the computation, in the IO monad
:: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) | |
=> (b -> Bool) | The condition that the retrieved object must meet |
-> String | The workflow name |
-> a | the INITIAL value used in the workflow to start it |
-> IO b | The first event that meet the condition |
observe the workflow log untiil a condition is met.
Persistent timeouts
waitUntilSTM :: TVar Bool -> STM ()Source
Wait until a certain clock time has passed by monitoring its flag, in the STM monad.
This permits to compose timeouts with locks waiting for data using orElse
- example: wait for any respoinse from a Queue if no response is given in 5 minutes, it is returned True.
flag <-getTimeoutFlag
$ 5 * 60 ap - 'step' . atomically $ readSomewhere >= return . JustorElse
waitUntilSTM
flag >> return Nothing case ap of Nothing -> dologWF
timeout ... Just x -> dologWF
$ received ++ show x ...
getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool)Source
Start the timeout and return the flag to be monitored by waitUntilSTM
This timeout is persistent. This means that the time start to count from the first call to getTimeoutFlag on
no matter if the workflow is restarted. The time that the worlkflow has been stopped count also.
the wait time can exceed the time between failures.
when timeout is 0 means no timeout.
Trace logging
logWF :: (Monad m, MonadIO m) => String -> Workflow m ()Source
Log a message in the workflow history. I can be printed out with printWFhistory
The message is printed in the standard output too
Termination of workflows
clearRunningFlag :: MonadIO m => [Char] -> m (Map String (String, Maybe ThreadId), Maybe ThreadId)Source
killThreadWF :: (Indexable a, Serialize a, Typeable a, MonadIO m) => String -> a -> m ()Source
kill the executing thread if not killed, but not its state.
exec
start
or restartWorkflows
will continue the workflow
killWF :: (Indexable a, MonadIO m) => String -> a -> m ()Source
kill the process (if running) and drop it from the list of
restart-able workflows. Its state history remains , so it can be inspected with
getWfHistory
printWFHistory
and so on
delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m ()Source
delete the WF from the running list and delete the workflow state from persistent storage. Use it to perform cleanup if the process has been killed.
killThreadWF1 :: MonadIO m => String -> m ()Source
a version of KillThreadWF
for workflows started wit no parameter by exec1
killWF1 :: MonadIO m => String -> m ()Source
a version of KillWF
for workflows started wit no parameter by exec1
delWFHistory :: Indexable a => String -> a -> IO ()Source
delWFHistory1 :: String -> IO ()Source
Log writing policy
syncWrite :: (Monad m, MonadIO m) => SyncMode -> m ()Source
The execution log is cached in memory using the package TCache
. This procedure defines the polcy for writing the cache into permanent storage.
For fast workflows, or when TCache` is used also for other purposes , Asynchronous
is the best option
Asynchronous
mode invokes clearSyncCache
. For more complex use of the syncronization
please use this clearSyncCache
.
When interruptions are controlled, use SyncManual
mode and include a call to syncCache
in the finalizaton code
Synchronous | write state after every step |
Asyncronous | |
SyncManual | use Data.TCache.syncCache to write the state |
Print log history
printHistory :: Stat -> IO ()Source
print the state changes along the workflow, that is, all the intermediate results