Workflow-0.5.5: library for transparent execution of interruptible computations
Control.Workflow
Description

Transparent support for interruptible computations. A workflow can be seen as a persistent thread. The main features are:

  • Transparent state logging through a monad transformer: step :: m a -> Workflow m a.
  • Resumption of the computation state after an accidental or planned program shutdown (restartWorkflows).
  • Event handling (waitFor, waitForData).
  • Monitoring of workflows with state change display and other auxiliary features.
  • Communication with other processes, including other workflows, through persistent data objects, inspection of intermediate workflow results, and queues, so that no data is lost due to shutdowns.

In this way, a very long computation, one that may take more time than the average time between hardware or software failures or shutdowns, can be defined transparently in a single monadic procedure. Besides state logging and recovery, there are a number of communication primitives that are aware of persistence across restarts, such as persistent queues, persistent timeouts, and waiting for events in the STM monad. These primitives permit inter-workflow communication and communication with external threads.

This package uses TCache for persistence and event handling.

It also uses the package RefSerialize, which reduces the workflow state load: RefSerialize can serialize and deserialize complex and self-referencing data structures without losing such references. This is critical when big, structured data, such as documents, undergo small modifications across a set of workflow steps. Therefore, it is also recommended to use RefSerialize for big user-defined objects that have small pieces that undergo small modifications during the workflow. As an added bonus, the history will show such changes in more detail.

The step primitive is the lift operation that converts a computation of type m a to one of type Workflow m a with automatic state logging and recovery. To allow such features, every a must be an instance of Typeable and IResource (defined in the TCache package).
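To illustrate the idea of logging and recovery behind step, here is a minimal sketch that does not use the Workflow package at all. It assumes an in-memory journal of shown results instead of TCache persistence; Journal, newJournal and stepSketch are hypothetical names introduced only for this illustration, and the real implementation may differ.

```haskell
module Main where

import Data.IORef

-- Hypothetical in-memory journal (the real library persists through TCache).
data Journal = Journal
  { pending  :: IORef [String]  -- recorded results still to be replayed
  , recorded :: IORef [String]  -- full log so far, oldest first
  }

-- Build a journal from the log left behind by a previous run.
newJournal :: [String] -> IO Journal
newJournal old = Journal <$> newIORef old <*> newIORef old

-- Replay a recorded result if one is pending; otherwise run the action
-- and append its shown result to the log.
stepSketch :: (Read a, Show a) => Journal -> IO a -> IO a
stepSketch j act = do
  p <- readIORef (pending j)
  case p of
    (s : rest) -> do
      writeIORef (pending j) rest
      return (read s)
    [] -> do
      x <- act
      modifyIORef (recorded j) (++ [show x])
      return x

main :: IO ()
main = do
  counter <- newIORef (0 :: Int)  -- counts how often an effect really runs
  let effect :: Int -> IO Int
      effect n = modifyIORef counter (+ 1) >> return (n * 10)
  -- First run: two steps execute, then the program "stops".
  j1 <- newJournal []
  a <- stepSketch j1 (effect 1)
  b <- stepSketch j1 (effect 2)
  saved <- readIORef (recorded j1)
  -- Simulated restart: the first two steps are replayed, the third executes.
  j2 <- newJournal saved
  a' <- stepSketch j2 (effect 1)
  b' <- stepSketch j2 (effect 2)
  c' <- stepSketch j2 (effect 3)
  runs <- readIORef counter
  print (a, b)        -- (10,20)
  print (a', b', c')  -- (10,20,30)
  print runs          -- 3
```

The effects of the first two steps run only once across both runs, which is the behaviour the text describes as recovery.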

In fact, Workflow can be considered an instance of a partial monad transformer, defined as follows:

class PMonadTrans t m a where
    plift :: Monad m => m a -> t m a

instance (Monad m, MonadIO m, IResource a, Typeable a)
      => PMonadTrans (WF Stat) m a where
    plift = step

It is partial because the lift operation is not defined for every monad m and data type a, but only for monads and data types that meet certain conditions. In this case, m must be an instance of MonadIO, and a must be an instance of IResource and Typeable.
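The following self-contained sketch shows the same pattern with a toy transformer, so that the role of the extra class parameter is visible without the Workflow package. The class has the shape given in the text; Logged and its Show constraint are hypothetical stand-ins for WF Stat and the IResource/Typeable constraints.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances #-}
module Main where

-- Same shape as the class in the text: the lifted type a is a class parameter.
class PMonadTrans t m a where
  plift :: Monad m => m a -> t m a

-- Hypothetical transformer that carries a log of shown results.
newtype Logged m a = Logged { runLogged :: m (a, [String]) }

-- "Partial": lifting is only available when the result type can be shown.
instance Show a => PMonadTrans Logged m a where
  plift act = Logged $ do
    x <- act
    return (x, [show x])

main :: IO ()
main = do
  (x, logLines) <- runLogged (plift (return (42 :: Int)))
  print x         -- 42
  print logLines  -- ["42"]
```

Trying to plift a computation whose result has no Show instance (a function, for example) is rejected at compile time, which is the sense in which the transformer is partial.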

To avoid having to define these instances by hand, however, Read and Show can be used to derive instances of IResource for most of the useful cases. This is the set of automatic derivations:

(Read a, Show a) =>  Serialize a

Typeable a => Indexable a    (a single key for all values. enough for workflows)

(Indexable a, Serialize a) => IResource a

Therefore, deriving Read and Show is enough for every intermediate result of the computation.

Because Data.TCache.Dynamic from the package TCache is used for persistence, every data type must be registered by using registerType.

Here is a complete example. This is a counter that shows a sequence of numbers, one per second:
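As a rough illustration of why Read and Show suffice, the sketch below round-trips a user-defined value through a String. It does not use TCache; Order, serializeSketch and deserializeSketch are hypothetical names, and the actual Serialize class may be defined differently.

```haskell
module Main where

-- Hypothetical intermediate result of a workflow step.
data Order = Order { orderId :: Int, items :: [String] }
  deriving (Read, Show, Eq)

-- Assumed shape of a Show-based serializer.
serializeSketch :: Show a => a -> String
serializeSketch = show

-- Assumed shape of a Read-based deserializer (fails on malformed input).
deserializeSketch :: Read a => String -> a
deserializeSketch = read

main :: IO ()
main = do
  let o = Order 7 ["pen", "ink"]
      s = serializeSketch o
  putStrLn s                        -- the textual form that would be stored
  print (deserializeSketch s == o)  -- True
```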

module Main where
import Control.Concurrent(threadDelay)
import System.IO (hFlush,stdout)

count n = do
    -- print the number, flush, and wait one second
    putStr (show n ++ " ") >> hFlush stdout >> threadDelay 1000000
    count (n+1)

main= count 0

This is the same program, with the added feature of remembering the last count after interrupted:

module Main where
import Control.Workflow
import Control.Concurrent(threadDelay)
import System.IO (hFlush,stdout)

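mcount n = do
    -- the action is lifted with step, so its execution is logged
    step $ putStr (show n ++ " ") >> hFlush stdout >> threadDelay 1000000
    mcount (n+1)

main = do
    -- register the types of the intermediate results
    registerType :: IO ()
    registerType :: IO Int
    let start = 0 :: Int
    startWF "count" start [("count", mcount)] :: IO ()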

This is the execution log:

Worflow-0.5.5demos>runghc sequence.hs
0 1 2 3 4 5 6 7 sequence.hs: win32ConsoleHandler
sequence.hs: sequence.hs: interrupted
Worflow-0.5.5demos>
Worflow-0.5.5demos>runghc sequence.hs
7 8 9 10 11 ....
Synopsis
type Workflow m l = WF Stat m l
type WorkflowList m a b = [(String, a -> Workflow m b)]
IResource (keyResource, serialize, deserialize, tshowp, treadp, defPath, readResource, writeResource, delResource)
registerType
class PMonadTrans t m a where
    plift :: Monad m => m a -> t m a
class Indexable a where
    key :: a -> String
step :: (Monad m, MonadIO m, IResource a, Typeable a) => m a -> Workflow m a
startWF :: (Monad m, MonadIO m, IResource a, Typeable a, IResource b, Typeable b) => String -> a -> WorkflowList m a b -> m b
restartWorkflows :: (IResource a, Typeable a, IResource b, Typeable b) => WorkflowList IO a b -> IO ()
getStep :: (IResource a, Typeable a, Monad m) => Int -> Workflow m a
getAll :: (IResource a, Typeable a, Monad m) => WF Stat m [a]
logWF :: (Monad m, MonadIO m) => String -> Workflow m ()
getWFKeys :: String -> IO [String]
getWFHistory :: IResource a => String -> a -> IO (Maybe Stat)
delWFHistory :: IResource a => String -> a -> IO ()
printHistory :: Stat -> IO ()
unsafeIOtoWF :: Monad m => IO a -> Workflow m a
waitFor :: (IResource a, Typeable a, IResource b, Typeable b) => (b -> Bool) -> String -> a -> IO b
waitUntil :: Integer -> IO ()
waitUntilSTM :: TVar Bool -> STM ()
syncWrite :: (Monad m, MonadIO m) => Bool -> Int -> Int -> WF Stat m ()
writeQueue :: (IResource a, Typeable a) => String -> a -> IO ()
writeQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()
readQueue :: (IResource a, Typeable a) => String -> IO a
readQueueSTM :: (IResource a, Typeable a) => String -> STM a
unreadQueue :: (IResource a, Typeable a) => String -> a -> IO ()
unreadQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()
getTimeSeconds :: IO Integer
getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool)
isEmptyQueue
isEmptyQueueSTM :: String -> STM Bool
Documentation
type Workflow m l = WF Stat m l
type WorkflowList m a b = [(String, a -> Workflow m b)]
IResource (keyResource, serialize, deserialize, tshowp, treadp, defPath, readResource, writeResource, delResource)
registerType
class PMonadTrans t m a where
PMonadTrans permits defining a partial monad transformer. Such transformers are not defined for all kinds of data, only for those that have instances of certain classes. That is because the lift instance code makes some hidden use of these classes. This may also permit accurate control of effects. An instance of MonadTrans is an instance of PMonadTrans.
Methods
plift :: Monad m => m a -> t m a
Instances
(Monad m, MonadIO m, IResource a, Typeable a) => PMonadTrans (WF Stat) m a
class Indexable a where

Indexable can be used to derive instances of IResource. This is the set of automatic derivations:

  • (Read a, Show a) => Serialize a
  • Typeable a => Indexable a (a single key for all values. enough for workflows)
  • (Indexable a, Serialize a) => IResource a
Methods
key :: a -> String
step :: (Monad m, MonadIO m, IResource a, Typeable a) => m a -> Workflow m a
step lifts a monadic computation to the WF monad, and provides transparent state logging and resumption of the computation.
startWF
:: (Monad m, MonadIO m, IResource a, Typeable a, IResource b, Typeable b)
=> String    -- ^ name of the workflow in the workflow list
-> a    -- ^ initial value (use the initial value even to restart the workflow)
-> WorkflowList m a b    -- ^ workflow list. It is an association list of (workflow name string, workflow method)
-> m b    -- ^ result of the computation
Start or continue a workflow.
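The role of the workflow name can be pictured as a lookup in an association list. The sketch below shows only that dispatch step, with plain IO functions in place of workflows; WorkflowListSketch and startSketch are hypothetical names, and the real startWF additionally restores any logged state, which is not modelled here.

```haskell
module Main where

-- Hypothetical stand-in for WorkflowList, with plain IO actions as workflows.
type WorkflowListSketch a b = [(String, a -> IO b)]

-- Look the name up and, if present, run the workflow on the initial value.
startSketch :: String -> a -> WorkflowListSketch a b -> IO (Maybe b)
startSketch name x wfs =
  case lookup name wfs of
    Nothing -> return Nothing
    Just wf -> Just <$> wf x

main :: IO ()
main = do
  let wfs :: WorkflowListSketch Int Int
      wfs = [("double", return . (* 2)), ("negate", return . negate)]
  r1 <- startSketch "double" 21 wfs
  r2 <- startSketch "missing" 21 wfs
  print r1  -- Just 42
  print r2  -- Nothing
```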
restartWorkflows :: (IResource a, Typeable a, IResource b, Typeable b) => WorkflowList IO a b -> IO ()
Restart the unfinished workflows, for every initial value they were started with, that are listed in the workflow list.
getStep
:: (IResource a, Typeable a, Monad m)
=> Int    -- ^ the step number. If negative, count from the current state backwards
-> Workflow m a    -- ^ return the n-th intermediate step result
getAll :: (IResource a, Typeable a, Monad m) => WF Stat m [a]
Return all the intermediate results. It is assumed that all the intermediate results have the same type.
logWF :: (Monad m, MonadIO m) => String -> Workflow m ()
Log a message in the workflow history. It can be printed out with printHistory.
getWFKeys :: String -> IO [String]
return the list of object keys that are running
getWFHistory :: IResource a => String -> a -> IO (Maybe Stat)
return the current state of the computation, in the IO monad
delWFHistory :: IResource a => String -> a -> IO ()
Delete the workflow. Make sure that the workflow is not running.
printHistory :: Stat -> IO ()
print the state changes along the workflow, that is, all the intermediate results
unsafeIOtoWF :: Monad m => IO a -> Workflow m a

Executes an IO computation inside the workflow monad, whatever the monad encapsulated in the workflow. Warning: this computation is executed whenever the workflow restarts, whether or not it has already been executed previously. This is useful for initializations or debugging. To avoid re-execution when restarting, use: step $ unsafeIOtoWF...

To perform IO actions in a workflow that encapsulates an IO monad, use step over the IO action directly:

 step $ action

instead of

  step $ unsafeIOtoWF $ action
waitFor
:: (IResource a, Typeable a, IResource b, Typeable b)
=> (b -> Bool)    -- ^ the condition that the retrieved object must meet
-> String    -- ^ the workflow name
-> a    -- ^ the INITIAL value used in the workflow to start it
-> IO b    -- ^ the first event that meets the condition
waitUntil :: Integer -> IO ()
waitUntilSTM :: TVar Bool -> STM ()

Wait until a certain clock time, in the STM monad. This permits composing timeouts with locks waiting for data.

  • example: wait for any response from a Queue; if no response is given within 5 minutes, True is returned.
            flag <- getTimeoutFlag $ 5 * 60
            ap <- step . atomically $ readQueueSTM docQueue `orElse` (waitUntilSTM flag >> return True)
            case ap of
                    False -> logWF "False or timeout" >> correctWF doc
                    True -> do
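The composition of a blocking read with a timeout can be tried out with the stm package alone. The sketch below is not the Workflow API: the queue is a plain in-memory TVar, the timeout flag is set by a forked thread, and QueueSketch, readQueueSketch, waitFlagSketch and receiveOrTimeout are hypothetical names chosen for this illustration.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- Hypothetical in-memory queue: a TVar holding a list, next element first.
type QueueSketch a = TVar [a]

-- Block (retry) until an element is available, then remove and return it.
readQueueSketch :: QueueSketch a -> STM a
readQueueSketch q = do
  xs <- readTVar q
  case xs of
    []         -> retry
    (x : rest) -> writeTVar q rest >> return x

-- Block until the flag becomes True.
waitFlagSketch :: TVar Bool -> STM ()
waitFlagSketch flag = readTVar flag >>= check

-- Just x if data is available, Nothing once the timeout flag is set.
receiveOrTimeout :: QueueSketch a -> TVar Bool -> STM (Maybe a)
receiveOrTimeout q flag =
  (Just <$> readQueueSketch q) `orElse` (waitFlagSketch flag >> return Nothing)

main :: IO ()
main = do
  q <- newTVarIO ([] :: [String])
  flag <- newTVarIO False
  -- Set the flag after 0.2 seconds, playing the role of a timeout.
  _ <- forkIO (threadDelay 200000 >> atomically (writeTVar flag True))
  r1 <- atomically (receiveOrTimeout q flag)
  print r1  -- Nothing (the queue stayed empty until the timeout)
  atomically (writeTVar q ["doc"])
  r2 <- atomically (receiveOrTimeout q flag)
  print r2  -- Just "doc" (data wins because the left branch is tried first)
```

Because orElse only falls through to its right branch when the left one retries, available data takes priority over an expired timeout.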
syncWrite
:: (Monad m, MonadIO m)
=> Bool    -- ^ True means synchronous: changes are immediately saved after each step
-> Int    -- ^ number of seconds between saves when asynchronous
-> Int    -- ^ size of the cache when asynchronous
-> WF Stat m ()    -- ^ in the workflow monad
Change the logging policy (the default is synchronous). Workflow uses the package TCache for logging. For very fast workflow steps, or when TCache is also used for other purposes, asynchronous is a better option.
writeQueue :: (IResource a, Typeable a) => String -> a -> IO ()
insert an element on top of the Queue Stack
writeQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()
Like writeQueue, but in the STM monad
readQueue
:: (IResource a, Typeable a)
=> String    -- ^ Queue name
-> IO a    -- ^ the returned elems
Delete elements from the Queue stack and return them in the IO monad.
readQueueSTM :: (IResource a, Typeable a) => String -> STM a
Delete elements from the Queue stack and return them, in the STM monad.
unreadQueue :: (IResource a, Typeable a) => String -> a -> IO ()
unreadQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()
getTimeSeconds :: IO Integer
getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool)
start the timeout and return the flag to be monitored by waitUntilSTM
isEmptyQueue
isEmptyQueueSTM :: String -> STM Bool
Produced by Haddock version 2.4.2