Workflow-0.6.0.0: library for transparent execution of interruptible computations

Safe Haskell: None

Control.Workflow

Description

A workflow can be seen as a persistent thread. The workflow monad writes a log that makes it possible to restore the thread at the point where it was interrupted. step is the (partial) monad transformer for the Workflow monad. A workflow is defined by its name and, optionally, by the key of the single parameter passed to it. The primitives for starting workflows also restart a workflow that was already in execution previously.

This is the main module, which uses the RefSerialize package for serialization. Here the constraint DynSerializer w r a is equivalent to Data.RefSerialize a.

For workflows that use big structures (for example, documents), use this module in combination with the RefSerialize package to define the (de)serialization instances; the log size will be reduced. The printWFHistory method will print the structure changes in each step.

If, instead of RefSerialize instances, you define Read and Show instances, there will be no size reduction, but the log will still be readable for debugging purposes.

For workflows that do not need this, use the binary alternative: Control.Workflow.Binary

A small example that prints the sequence of integers to the console. If you interrupt the program, it will continue from the last printed number when restarted:

module Main where
import Control.Workflow.Text
import Control.Concurrent(threadDelay)
import System.IO (hFlush,stdout)

mcount n= do step $  do
                       putStr (show n ++ " ")
                       hFlush stdout
                       threadDelay 1000000
             mcount (n+1)
             return () -- to disambiguate the return type

main= exec1  "count"  $ mcount (0 :: Int)

Documentation

type Workflow m = WF Stat m

type WorkflowList m a b = [(String, a -> Workflow m b)]

class PMonadTrans t m a where

PMonadTrans permits the definition of a partial monad transformer. Such transformers are not defined for all kinds of data, only for types that have instances of certain classes, because the lift instance code makes hidden use of those classes. This may also permit precise control of effects. An instance of MonadTrans is an instance of PMonadTrans.

Methods

plift :: Monad m => m a -> t m a

Instances

(MonadTrans t, Monad m) => PMonadTrans t m a

An instance of MonadTrans is an instance of PMonadTrans

(Monad m, MonadIO m, Serialize a, Typeable a) => PMonadTrans (WF Stat) m a

plift= step
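
The idea of a partial lift can be illustrated with a toy class. This is only a sketch under stated assumptions: PLift, pliftToy and Traced are hypothetical names invented for illustration and are not part of this package. It shows why the lifted type appears as a class parameter: the instance can then require a class (here Show) that the lifting code uses internally.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, ConstrainedClassMethods #-}
module Main where

-- Hypothetical stand-in for PMonadTrans: the lifted type `a` is a class
-- parameter, so each instance can demand extra constraints on it.
class PLift t m a where
  pliftToy :: Monad m => m a -> t m a

-- Toy transformer that pairs a result with a textual trace of it.
newtype Traced m a = Traced { runTraced :: m (a, [String]) }

-- Lifting is only defined for results that can be shown, because the
-- lift itself records `show x`.
instance Show a => PLift Traced m a where
  pliftToy action = Traced $ do
    x <- action
    return (x, [show x])

main :: IO ()
main = print (runTraced (pliftToy (Just (3 :: Int)) :: Traced Maybe Int))
```

In the same spirit, the WF Stat instance above requires Serialize and Typeable on the lifted value, since step has to write that value to the log.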

class MonadCatchIO m a where

Adapted from MonadCatchIO-mtl. Workflow needs to express serializability constraints on the returned values, so the usual class definitions for lifting IO functions are not suitable.

Methods

catch :: Exception e => m a -> (e -> m a) -> m a

Generalized version of catch

block :: m a -> m a

Generalized version of block

unblock :: m a -> m a

Generalized version of unblock

Instances

(Serialize a, Typeable a, MonadIO m, MonadCatchIO m) => MonadCatchIO (WF Stat m) a 

throw :: (MonadIO m, Exception e) => e -> m a

Generalized version of throwIO

class Indexable a where

Indexable is a utility class used to derive instances of IResource

Example:

data Person= Person{ pname :: String, cars :: [DBRef Car]} deriving (Show, Read, Typeable)
data Car= Car{owner :: DBRef Person , cname:: String} deriving (Show, Read, Eq, Typeable)

Since Person and Car are instances of Read and Show, defining the Indexable instance implicitly defines the IResource instance for file persistence:

instance Indexable Person where  key Person{pname=n} = "Person " ++ n
instance Indexable Car where key Car{cname= n} = "Car " ++ n

Methods

key :: a -> String

defPath

Arguments

:: a 
-> String

additional extension for default file paths. The default value is data/.

Start/restart workflows

start

Arguments

:: (Monad m, MonadIO m, Indexable a, Serialize a, Serialize b, Typeable a, Typeable b) 
=> String

name that identifies the workflow.

-> (a -> Workflow m b)

workflow to execute

-> a

initial value (always use the same initial value when restarting the workflow)

-> m (Either WFErrors b)

result of the computation

Start or continue a workflow with no exception handling. The programmer has to handle inconsistencies in the workflow state, using killWF or delWF in case of exception.

exec :: (Indexable a, Serialize a, Serialize b, Typeable a, Typeable b, Monad m, MonadIO m, MonadCatchIO m) => String -> (a -> Workflow m b) -> a -> m b

Start or continue a workflow with exception handling. The workflow flags are updated even in case of exception. WFErrors are raised as exceptions.

exec1d :: (Serialize b, Typeable b, MonadCatchIO m) => String -> Workflow m b -> m b

a version of exec1 that deletes its state after complete execution or when the thread is killed

exec1 :: (Serialize a, Typeable a, Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m a

a version of exec with no seed parameter.

wfExec :: (Indexable a, Serialize a, Typeable a, MonadCatchIO m, MonadIO m) => Workflow m a -> Workflow m a

Start or restart an anonymous workflow inside another workflow. Its state is deleted when finished and the result is stored in the parent's WF state.

startWF

Arguments

:: (MonadIO m, Serialize a, Serialize b, Typeable a, Indexable a, Typeable b) 
=> String

name of workflow in the workflow list

-> a

initial value (always use the same initial value, even to restart the workflow)

-> WorkflowList m a b

function to execute

-> m (Either WFErrors b)

result of the computation

Start or continue a workflow from a list of workflows in the IO monad with exception handling. The exception is returned as a Left value.

restartWorkflows :: (Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => WorkflowList IO a b -> IO ()

Restart the unfinished workflows in the list, for all the initial values with which they may have been called

data WFErrors

return conditions from the invocation of start/restart primitives

Constructors

NotFound 
AlreadyRunning 
Timeout 
forall e . Exception e => Exception e 

Lifting to the Workflow monad

step :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a

lifts a monadic computation to the WF monad, and provides transparent state logging and resuming of the computation
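
The log-and-resume behaviour can be pictured with a small toy model. This is only a sketch of the general idea, not the library's implementation: Log, stepToy and demo are hypothetical names, the "log" is an in-memory list of shown values rather than persistent storage, and Show/Read stand in for the Serialize constraint. A step whose result is already in the log returns the stored value without running its action; a step with no stored result runs the action and records the result.

```haskell
module Main where

import Data.IORef (IORef, newIORef, readIORef, writeIORef, modifyIORef')

-- Toy log: results recovered from a previous run, and results seen so far.
data Log = Log
  { pending  :: [String]  -- stored results not yet replayed, oldest first
  , recorded :: [String]  -- results replayed or produced so far, newest first
  }

-- Replay the stored result if one exists; otherwise run the action
-- and record what it returned.
stepToy :: (Show a, Read a) => IORef Log -> IO a -> IO a
stepToy ref action = do
  l <- readIORef ref
  case pending l of
    (r:rs) -> do
      writeIORef ref l { pending = rs, recorded = r : recorded l }
      return (read r)
    [] -> do
      x <- action
      modifyIORef' ref (\s -> s { recorded = show x : recorded s })
      return x

-- Run three steps against a log that already holds two results and
-- return (results, number of actions actually executed).
demo :: IO ([Int], Int)
demo = do
  ref   <- newIORef (Log ["1", "2"] [])
  count <- newIORef (0 :: Int)
  let act n = modifyIORef' count (+ 1) >> return n
  a <- stepToy ref (act 10)
  b <- stepToy ref (act 20)
  c <- stepToy ref (act 30)
  n <- readIORef count
  return ([a, b, c], n)

main :: IO ()
main = demo >>= print
```

Here the first two steps are answered from the log and only the third action runs, so the program prints ([1,2,30],1).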

stepControl :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a

permits modification of the workflow state by the procedure being lifted if the boolean value is True. This is used internally for control purposes

unsafeIOtoWF :: Monad m => IO a -> Workflow m a

Executes a computation inside the workflow monad, whatever the monad encapsulated in the workflow. Warning: this computation is executed whenever the workflow restarts, no matter whether it has already been executed previously. This is useful for initializations or debugging. To avoid re-execution when restarting use: step $ unsafeIOtoWF...

To perform IO actions in a workflow that encapsulates an IO monad, use step over the IO action directly:

 step $ action

instead of

  step $ unsafeIOtoWF $ action

References to intermediate values in the workflow log

getWFRef :: (Monad m, MonadIO m, Serialize a, Typeable a) => Workflow m (WFRef a)

Return the reference to the last logged result, usually the last result stored by step. Workflow references can be accessed outside of the workflow. They can also be (de)serialized.

WARNING: getWFRef can produce casting errors when the type demanded does not match the serialized data. Instead, newDBRef and stepWFRef are type safe at runtime.

newWFRef :: (Serialize a, Typeable a, MonadIO m) => a -> Workflow m (WFRef a)

Log a value and return a reference to it.

newWFRef x= step $ return x >>= getWFRef

stepWFRef :: (Serialize a, Typeable a, MonadIO m) => m a -> Workflow m (WFRef a)

Execute a step but return a reference to the result instead of the result itself

stepWFRef exp= step exp >>= getWFRef

readWFRef :: (Serialize a, Typeable a) => WFRef a -> STM (Maybe a)

Read the content of a Workflow reference. Note that its result is not in the Workflow monad

writeWFRef :: (Serialize a, Typeable a) => WFRef a -> a -> STM ()

Writes a new value into the workflow reference, that is, into the workflow log. Why would you use this? Don't do that! Modifying the content of the workflow log would change the execution flow when the workflow restarts. This method is used internally in the package. The best way to communicate with a workflow is through a persistent queue:

workflow= exec1 wf $ do
         r <- stepWFRef  expr
         push "queue" r
         back <- pop "queueback"
         ...

Workflow inspect

getAll :: Monad m => Workflow m [IDynamic]

return all the steps of the workflow log. The values are dynamic

To get all the steps with a result of type Int:

 all <- getAll
 let lfacts = mapMaybe safeFromIDyn all :: [Int]

safeFromIDyn :: (Typeable a, Serialize a) => IDynamic -> Maybe a

getWFKeys :: String -> IO [String]

return the list of object keys that are running for a workflow

getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat)

return the current state of the computation, in the IO monad

waitFor

Arguments

:: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) 
=> (b -> Bool)

The condition that the retrieved object must meet

-> String

The workflow name

-> a

The INITIAL value used in the workflow to start it

-> IO b

The first event that meets the condition

observe the workflow log until a condition is met.

waitForSTM

Arguments

:: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) 
=> (b -> Bool)

The condition that the retrieved object must meet

-> String

The workflow name

-> a

The INITIAL value used in the workflow to start it

-> STM b

The first event that meets the condition

Persistent timeouts

waitUntilSTM :: TVar Bool -> STM ()

Wait until a certain clock time has passed by monitoring its flag, in the STM monad. This makes it possible to compose timeouts with blocking waits for data using orElse

  • example: wait for any response from a queue; if no response is given in 5 minutes, the timeout branch is taken and Nothing is returned.
   flag <- getTimeoutFlag $  5 * 60
   ap <- step  .  atomically $  (readSomewhere >>= return . Just)  `orElse`  (waitUntilSTM flag  >> return Nothing)
   case ap of
        Nothing -> do logWF "timeout" ...
        Just x -> do logWF $ "received" ++ show x ...
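
The orElse composition used in this example can be tried in isolation with plain STM. The following is a minimal sketch that does not use this package at all: waitFlag, takeBox and receiveOrTimeout are hypothetical helpers, the mailbox is an ordinary TVar rather than a persistent queue, and the timer is a forked thread, so unlike getTimeoutFlag it is not persistent across restarts. The STM primitives are imported from GHC.Conc in base; the stm package exports the same names.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import GHC.Conc (STM, TVar, atomically, newTVarIO, readTVar, writeTVar, retry, orElse)

-- Block until the flag becomes True (a stand-in for waitUntilSTM).
waitFlag :: TVar Bool -> STM ()
waitFlag flag = do
  expired <- readTVar flag
  if expired then return () else retry

-- Take a value from a one-place mailbox, blocking while it is empty.
takeBox :: TVar (Maybe a) -> STM a
takeBox box = do
  m <- readTVar box
  case m of
    Nothing -> retry
    Just x  -> writeTVar box Nothing >> return x

-- Either the mailbox yields a value, or the flag fires first.
receiveOrTimeout :: TVar (Maybe a) -> TVar Bool -> STM (Maybe a)
receiveOrTimeout box flag =
  (Just <$> takeBox box) `orElse` (waitFlag flag >> return Nothing)

main :: IO ()
main = do
  box  <- newTVarIO (Nothing :: Maybe String)
  flag <- newTVarIO False
  -- Non-persistent timer: set the flag after 0.2 seconds.
  _ <- forkIO (threadDelay 200000 >> atomically (writeTVar flag True))
  r <- atomically (receiveOrTimeout box flag)
  putStrLn (maybe "timeout" ("received " ++) r)
```

Because nothing is ever written to the mailbox, the transaction blocks until the timer sets the flag and the program prints timeout. The parentheses around both alternatives matter, since the backquoted orElse would otherwise bind more tightly than >>= and >>.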

getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool)

Start the timeout and return the flag to be monitored by waitUntilSTM. This timeout is persistent: the time starts counting from the first call to getTimeoutFlag, no matter whether the workflow is restarted. The time during which the workflow has been stopped also counts. The wait time can exceed the time between failures. A timeout of 0 means no timeout.
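
The arithmetic behind a persistent timeout can be sketched as a pure function. This is an illustration of the behaviour described above, not the library's code: remainingSeconds is a hypothetical helper, and all times are assumed to be in seconds. The key point is that the start time is recorded once, on the first call, and reused after every restart, so downtime is counted as elapsed time.

```haskell
module Main where

-- Seconds still to wait for a persistent timeout. `started` is the clock
-- reading saved by the first call; it is reused after every restart.
remainingSeconds :: Integer -> Integer -> Integer -> Maybe Integer
remainingSeconds timeout started now
  | timeout == 0 = Nothing  -- 0 means "no timeout"
  | otherwise    = Just (max 0 (timeout - (now - started)))

main :: IO ()
main = do
  print (remainingSeconds 300 1000 1120)  -- 120 s elapsed, downtime included
  print (remainingSeconds 300 1000 1400)  -- deadline already passed
  print (remainingSeconds 0   1000 1400)  -- no timeout configured
```

With a 300 second timeout started at time 1000, a workflow restarted at time 1120 has 180 seconds left, and one restarted at time 1400 has none, so its flag would be set immediately.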

Trace logging

logWF :: (Monad m, MonadIO m) => String -> Workflow m ()

Log a message in the workflow history. It can be printed out with printWFhistory. The message is also printed to standard output.

Termination of workflows

killThreadWF :: (Indexable a, Serialize a, Typeable a, MonadIO m) => String -> a -> m ()

Kill the executing thread, if it has not been killed already, but not its state. exec, start or restartWorkflows will continue the workflow.

killWF :: (Indexable a, MonadIO m) => String -> a -> m ()

Kill the process (if running) and drop it from the list of restartable workflows. Its state history remains, so it can be inspected with getWfHistory, printWFHistory and so on.

delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m ()

delete the WF from the running list and delete the workflow state from persistent storage. Use it to perform cleanup if the process has been killed.

killThreadWF1 :: MonadIO m => String -> m ()

a version of killThreadWF for workflows started with no parameter by exec1

killWF1 :: MonadIO m => String -> m ()

a version of killWF for workflows started with no parameter by exec1

delWF1 :: MonadIO m => String -> m ()

a version of delWF for workflows started with no parameter by exec1

Log writing policy

syncWrite :: (Monad m, MonadIO m) => SyncMode -> m ()

The execution log is cached in memory using the package TCache. This procedure defines the policy for writing the cache into permanent storage.

For fast workflows, or when TCache is also used for other purposes, Asynchronous is the best option.

Asynchronous mode invokes clearSyncCache. For more complex uses of the synchronization, call clearSyncCache directly.

When interruptions are controlled, use SyncManual mode and include a call to syncCache in the finalization code.

data SyncMode

Constructors

Synchronous

write state after every step

Asyncronous 

Fields

frecuency :: Int

number of seconds between saves when asynchronous

cacheSize :: Int

size of the cache when async

SyncManual

use Data.TCache.syncCache to write the state

Print log history

printHistory :: Stat -> IO ()

print the state changes along the workflow, that is, all the intermediate results