Workflow-0.5.5: library for transparent execution of interruptible computations

Control.Workflow

Description

Transparent support for interruptible computations. A workflow can be seen as a persistent thread. The main features are:

  • Transparent state logging through a monad transformer: step :: m a -> Workflow m a.
  • Resumption of the computation state after an accidental or planned program shutdown (restartWorkflows).
  • Event handling (waitFor, waitForData).
  • Monitoring of workflows with state change display and other auxiliary features.
  • Communication with other processes, including other workflows, through persistent data objects, inspection of intermediate workflow results, and queues, so that no data is lost due to shutdowns.

In this way, a very long computation, one that may take more time than the average time between hardware or software failures, shutdowns, etc., can be defined transparently in a single monadic procedure. Besides state logging and recovery, there are a number of communication primitives that are aware of persistence across restarts, such as persistent queues, persistent timeouts, and waiting for events in the STM monad. These primitives permit inter-workflow communication and communication with external threads.

This package uses TCache for persistence and event handling.

It also uses the package RefSerialize, which reduces the workflow state load: RefSerialize can serialize and deserialize complex and self-referencing data structures without losing such references. This is critical when big, structured data, such as documents, undergo small modifications across a set of workflow steps. It is therefore also recommended to use RefSerialize for big user-defined objects whose small pieces change little during the workflow. As an added bonus, the history will show such changes in more detail.

The step primitive is the lift operation that converts a result of type m a to a type Workflow m a with automatic state logging and recovery. To allow such features, every a must be an instance of Typeable and IResource (defined in the TCache package).
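
The logging-and-replay behaviour described for step can be pictured with a small pure model. The sketch below does not use the Workflow package: resume and its list-of-strings log are hypothetical names chosen for illustration, and the only assumption is that step results can be round-tripped through Show and Read.

```haskell
module Main where

-- Hypothetical model, not the Workflow API: a log is the list of
-- serialized results of the steps that completed before a shutdown.
type Log = [String]

-- Run a list of steps from an initial value. Steps whose result is
-- already in the log are replayed from it instead of being executed.
-- Returns the final value, the updated log, and how many steps ran.
resume :: (Read a, Show a) => Log -> [a -> a] -> a -> (a, Log, Int)
resume logged steps start = go logged steps start [] 0
  where
    go _      []     x acc n = (x, reverse acc, n)
    -- A logged result exists: skip the step and restore its result.
    go (l:ls) (_:fs) _ acc n = go ls fs (read l) (l : acc) n
    -- No logged result left: execute the step and record its result.
    go []     (f:fs) x acc n =
      let y = f x in go [] fs y (show y : acc) (n + 1)

main :: IO ()
main = do
  let steps = [(+ 1), (* 2), (+ 10)] :: [Int -> Int]
  -- Fresh run: nothing logged, so all three steps execute.
  print (resume [] steps 1)
  -- Restart after two completed steps: only the last one executes.
  print (resume ["2", "4"] steps 1)
```

In this model a restart with a two-entry log reaches the same final value while executing a single step, which is the property the real step primitive is described as providing for monadic actions.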

In fact, Workflow can be considered an instance of a partial monad transformer, defined as follows:

class PMonadTrans t m a where
    plift :: Monad m => m a -> t m a

instance (Monad m, MonadIO m, IResource a, Typeable a)
      => PMonadTrans (WF Stat) m a where
    plift = step

It is partial because the lift operation is not defined for every monad m and data type a, but only for monads and data types that meet certain conditions: in this case, being instances of MonadIO, and of IResource and Typeable, respectively.
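
To make the idea of a partial lift concrete, here is a minimal self-contained sketch. The class is the one given in the text; the Traced transformer and its Show-constrained instance are hypothetical and exist only to show a lift that is available for some result types and not for others.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances #-}
module Main where

-- The class as given in the text above.
class PMonadTrans t m a where
  plift :: Monad m => m a -> t m a

-- Hypothetical toy transformer (not part of the package): it pairs a
-- result with a textual trace of the lifted values.
newtype Traced m a = Traced { runTraced :: m (a, [String]) }

-- The lift exists only for result types with a Show instance,
-- which is what makes the transformer "partial".
instance Show a => PMonadTrans Traced m a where
  plift ma = Traced (ma >>= \a -> return (a, [show a]))

main :: IO ()
main = do
  -- Int has a Show instance, so the lift is available.
  print (runTraced (plift (Just (3 :: Int)) :: Traced Maybe Int))
  -- A function result has no Show instance, so this would be rejected
  -- at compile time:
  -- runTraced (plift (Just not) :: Traced Maybe (Bool -> Bool))
```

The constraint on the instance plays the role that IResource and Typeable play for WF Stat: the lift needs those classes internally, so it is simply not offered for types that lack them.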

To avoid having to define the last two interfaces, however, Read and Show can be used to derive instances of IResource for most of the useful cases. This is the set of automatic derivations:

(Read a, Show a) =>  Serialize a

Typeable a => Indexable a    (a single key for all values. enough for workflows)

(Indexable a, Serialize a) => IResource a

Therefore, deriving instances of Read and Show is enough for every intermediate data result along the computation.
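
As a rough illustration of what these derivations give the user, the sketch below round-trips a record through Show and Read and builds one key per type from Typeable. The functions serializeVia, deserializeVia and keyVia are hypothetical stand-ins, and using the type name as the key is an assumption about how "a single key for all values" might look, not a statement about the package internals.

```haskell
module Main where

import Data.Typeable (Typeable, typeOf)

-- An intermediate result type: deriving Read and Show is all the user
-- writes. Typeable is derived automatically by modern GHC.
data Draft = Draft { title :: String, revision :: Int }
  deriving (Read, Show, Eq)

-- Hypothetical stand-ins for what the automatic derivations supply.
serializeVia :: Show a => a -> String
serializeVia = show

deserializeVia :: Read a => String -> a
deserializeVia = read

-- One key per type, mirroring "a single key for all values".
keyVia :: Typeable a => a -> String
keyVia = show . typeOf

main :: IO ()
main = do
  let d = Draft { title = "report", revision = 2 }
  putStrLn (keyVia d)
  putStrLn (serializeVia d)
  -- The round trip restores an equal value.
  print (deserializeVia (serializeVia d) == d)
```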

Because Data.TCache.Dynamic from the package TCache is used for persistence, every data type must be registered by using registerType.

Here is a complete example. This is a counter that shows a sequence of numbers, one per second:

module Main where
import Control.Concurrent(threadDelay)
import System.IO (hFlush,stdout)

count n = do
    putStr (show n ++ " ") >> hFlush stdout >> threadDelay 1000000
    count (n+1)

main = count 0

This is the same program, with the added feature of remembering the last count after being interrupted:

module Main where
import Control.Workflow
import Control.Concurrent(threadDelay)
import System.IO (hFlush,stdout)

mcount n = do
    step $ putStr (show n ++ " ") >> hFlush stdout >> threadDelay 1000000
    mcount (n+1)

main = do
    registerType :: IO ()
    registerType :: IO Int
    let start = 0 :: Int
    startWF "count" start [("count", mcount)] :: IO ()

This is the execution log:

Worflow-0.5.5demos>runghc sequence.hs
0 1 2 3 4 5 6 7 sequence.hs: win32ConsoleHandler
sequence.hs: sequence.hs: interrupted
Worflow-0.5.5demos>
Worflow-0.5.5demos>runghc sequence.hs
7 8 9 10 11 ....

Documentation

type Workflow m l = WF Stat m l

type WorkflowList m a b = [(String, a -> Workflow m b)]

class IResource a where

Interface that must be defined for every object being cached. readResource and writeResource are implemented by default as reads and writes to files with the key as the filename. serialize and deserialize are specified just to allow these defaults. If you define your own persistence, then serialize and deserialize are not needed, although the Workflow package needs them anyway.

minimal definition: keyResource, serialize, deserialize

While serialize and deserialize are agnostic about the way of conversion to strings, either binary or textual, treadp and tshowp use the monad defined in the RefSerialize package. The two ways of serialization are alternatives: one is defined by default in terms of the other. The RefSerialize monad has been introduced to permit IResource objects to be serialized as part of larger structures that embody them. This is necessary for the Workflow package.

The keyResource string must be unique, since it is used to index the object in the hash table. When accessing a resource, the user must provide a partial object from which the key can be obtained. For example:

data Person= Person{name, surname:: String, account :: Int ....}

keyResource (Person n s ...)= n++s

The data being accessed must have the fields used by keyResource filled in. For example:

  readResource Person {name="John", surname= "Adams"}

leaving the rest of the fields undefined.

IResource has default definitions for all the methods except keyResource. Either one serializer or the other must be defined for the default writeResource, readResource and delResource.
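
The partial-object lookup described above relies on laziness: the key function inspects only the fields it needs, so the remaining fields can stay undefined. A minimal sketch follows, with a three-field Person, a personKey function, and an in-memory association list standing in for the cache; all of these are hypothetical and do not use TCache.

```haskell
module Main where

data Person = Person { name, surname :: String, account :: Int }
  deriving Show

-- The key reads only name and surname, as in the text above.
personKey :: Person -> String
personKey (Person n s _) = n ++ s

-- Hypothetical in-memory store standing in for the cache.
type Store = [(String, Person)]

-- Look up the full object using a partially filled one as the probe.
lookupPerson :: Store -> Person -> Maybe Person
lookupPerson store probe = lookup (personKey probe) store

main :: IO ()
main = do
  let store = [(personKey p, p) | p <- [Person "John" "Adams" 42]]
      -- Partial object: account is left undefined and never evaluated.
      probe = Person { name = "John", surname = "Adams", account = undefined }
  print (fmap account (lookupPerson store probe))
```

Note that a key function that touched the account field would fail on such a probe, which is why the fields used by the key must always be filled in.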

Methods

keyResource

Arguments

:: a 
-> String

must be defined

serialize

Arguments

:: a 
-> String

must be defined by the user

deserialize

Arguments

:: String 
-> a

must be defined by the user

tshowp

Arguments

:: a 
-> ST String

serializer in the RefSerialize monad. Either one or other serializer must be defined to use default persistence

treadp

Arguments

:: ST a

deserializer in the RefSerialize monad.

defPath

Arguments

:: a 
-> String

additional extension for default file paths or key prefixes

readResource :: a -> IO (Maybe a)

writeResource :: a -> IO ()

delResource :: a -> IO ()

registerType

Arguments

:: DynamicInterface x 
=> IO x

registers the deserialize, readp and readResource methods for this data type

class PMonadTrans t m a where

PMonadTrans permits the definition of a partial monad transformer. Such transformers are not defined for all kinds of data, but only for those that have instances of certain classes. That is because the lift instance code makes some hidden use of these classes. This may also permit accurate control of effects. An instance of MonadTrans is an instance of PMonadTrans.

Methods

plift :: Monad m => m a -> t m a

Instances

(MonadTrans t, Monad m) => PMonadTrans t m a

An instance of MonadTrans is an instance of PMonadTrans

(Monad m, MonadIO m, IResource a, Typeable a) => PMonadTrans (WF Stat) m a

plift= step

class Indexable a where

Indexable can be used to derive instances of IResource. This is the set of automatic derivations:

  • (Read a, Show a) => Serialize a
  • Typeable a => Indexable a (a single key for all values. enough for workflows)
  • (Indexable a, Serialize a) => IResource a

Methods

key :: a -> String

Instances

step :: (Monad m, MonadIO m, IResource a, Typeable a) => m a -> Workflow m a

step lifts a monadic computation to the WF monad, and provides transparent state logging and resumption of the computation

startWF

Arguments

:: (Monad m, MonadIO m, IResource a, Typeable a, IResource b, Typeable b) 
=> String

name of workflow in the workflow list

-> a

initial value (use the same initial value even to restart the workflow)

-> WorkflowList m a b

workflow list. It is an association list of (workflow name string, workflow method) pairs

-> m b

result of the computation

start or continue a workflow.
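
The name-based selection from the workflow list amounts to an association-list lookup. The following sketch shows that idea only: plain functions stand in for workflows, FlowList and runNamed are hypothetical names, and returning Nothing for an unknown name is an illustrative choice rather than the documented behaviour of startWF.

```haskell
module Main where

-- Hypothetical simplification: plain functions stand in for workflows.
type FlowList a b = [(String, a -> b)]

-- Select a workflow by name and run it on the initial value.
runNamed :: String -> a -> FlowList a b -> Maybe b
runNamed wfName start flows = fmap ($ start) (lookup wfName flows)

main :: IO ()
main = do
  let flows = [("double", (* 2)), ("negate", negate)] :: FlowList Int Int
  print (runNamed "double" 4 flows)
  print (runNamed "missing" 4 flows)
```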

restartWorkflows :: (IResource a, Typeable a, IResource b, Typeable b) => WorkflowList IO a b -> IO ()

restart the unfinished workflows, for all initial values, that are listed in the workflow list

getStep

Arguments

:: (IResource a, Typeable a, Monad m) 
=> Int

the step number. If negative, count from the current state backwards

-> Workflow m a

return the n-th intermediate step result

getAll :: (IResource a, Typeable a, Monad m) => WF Stat m [a]

return all the intermediate results. It is assumed that all the intermediate results have the same type.

logWF :: (Monad m, MonadIO m) => String -> Workflow m ()

log a message in the workflow history. It can be printed out with printHistory

getWFKeys :: String -> IO [String]

return the list of object keys that are running

getWFHistory :: IResource a => String -> a -> IO (Maybe Stat)

return the current state of the computation, in the IO monad

delWFHistory :: IResource a => String -> a -> IO ()

delete the workflow. Make sure that the workflow is not running

printHistory :: Stat -> IO ()

print the state changes along the workflow, that is, all the intermediate results

unsafeIOtoWF :: Monad m => IO a -> Workflow m a

executes an IO computation inside the workflow monad, whatever the monad encapsulated in the workflow. Warning: this computation is executed whenever the workflow restarts, regardless of whether it has already been executed. This is useful for initializations or debugging. To avoid re-execution when restarting, use: step $ unsafeIOtoWF...

To perform IO actions in a workflow that encapsulates an IO monad, use step over the IO action directly:

 step $ action

instead of

  step $ unsafeIOtoWF $ action

waitFor

Arguments

:: (IResource a, Typeable a, IResource b, Typeable b) 
=> (b -> Bool)

The condition that the retrieved object must meet

-> String

The workflow name

-> a

the INITIAL value used in the workflow to start it

-> IO b

The first event that meets the condition

waitUntilSTM :: TVar Bool -> STM ()

wait until a certain clock time, in the STM monad. This makes it possible to compose timeouts with locks waiting for data.

  • example: wait for any response from a queue; if no response is given in 5 minutes, True is returned.
            flag <- getTimeoutFlag $ 5 * 60
            ap <- step . atomically $ readQueueSTM docQueue `orElse` (waitUntilSTM flag >> return True)
            case ap of
                    False -> logWF "False or timeout" >> correctWF doc
                    True -> do
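
The composition of a blocking read with a timeout through orElse can be tried out without the Workflow package. The sketch below is an approximation under stated assumptions: timeoutFlag, waitFlag, takeItem and awaitOrTimeout are hypothetical stand-ins for getTimeoutFlag, waitUntilSTM and readQueueSTM, the queue is a plain in-memory TVar rather than a persistent one, the delay is given in microseconds, and a timeout is reported as Nothing instead of True.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import GHC.Conc (STM, TVar, atomically, newTVarIO, orElse, readTVar, retry, writeTVar)

-- Hypothetical stand-in for getTimeoutFlag: the flag becomes True
-- after the given number of microseconds.
timeoutFlag :: Int -> IO (TVar Bool)
timeoutFlag micros = do
  flag <- newTVarIO False
  _ <- forkIO (threadDelay micros >> atomically (writeTVar flag True))
  return flag

-- Stand-in for waitUntilSTM: block until the flag is True.
waitFlag :: TVar Bool -> STM ()
waitFlag flag = do
  expired <- readTVar flag
  if expired then return () else retry

-- Stand-in for readQueueSTM: block until an item is available.
takeItem :: TVar [a] -> STM a
takeItem queue = do
  items <- readTVar queue
  case items of
    []       -> retry
    (x : xs) -> writeTVar queue xs >> return x

-- Whichever alternative can proceed first wins.
awaitOrTimeout :: TVar [a] -> TVar Bool -> IO (Maybe a)
awaitOrTimeout queue flag =
  atomically ((Just <$> takeItem queue) `orElse` (waitFlag flag >> return Nothing))

main :: IO ()
main = do
  emptyQ <- newTVarIO ([] :: [String])
  flag1 <- timeoutFlag 100000
  awaitOrTimeout emptyQ flag1 >>= print
  fullQ <- newTVarIO ["approved"]
  flag2 <- timeoutFlag 100000
  awaitOrTimeout fullQ flag2 >>= print
```

Because both alternatives live in one STM transaction, no item can be consumed from the queue in a run that ends up reporting a timeout.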

syncWrite

Arguments

:: (Monad m, MonadIO m) 
=> Bool

True means synchronous: changes are immediately saved after each step

-> Int

number of seconds between saves when asynchronous

-> Int

size of the cache when async

-> WF Stat m ()

in the workflow monad

change the logging policy (default is synchronous). Workflow uses the package TCache for logging. For very fast workflow steps, or when TCache is also used for other purposes, asynchronous is a better option

writeQueue :: (IResource a, Typeable a) => String -> a -> IO ()

insert an element on top of the Queue Stack

writeQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()

Like writeQueue, but in the STM monad

readQueue

Arguments

:: (IResource a, Typeable a) 
=> String

Queue name

-> IO a

the returned elems

delete elements from the Queue stack and return them in the IO monad

readQueueSTM :: (IResource a, Typeable a) => String -> STM a

delete elements from the Queue stack and return them, in the STM monad

getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool)

start the timeout and return the flag to be monitored by waitUntilSTM