Transparent support for interruptible computations. A workflow can be seen as a persistent thread whose state is logged and recovered across restarts.
In this way, a very long computation, one that may take more time than the average time between hardware or software failures, shutdowns etc., can still run to completion. The workflow can be defined transparently in a single monadic procedure. Besides state logging and recovery, there are a number of communication primitives that are aware of persistence across restarts, such as persistent queues, persistent timeouts, and waiting for events in the STM monad. These primitives permit inter-workflow communication and communication with external threads.
This package uses TCache for persistence and event handling.
It also uses the RefSerialize package, which reduces the workflow state load: RefSerialize can serialize and deserialize complex and self-referencing data structures without losing such references. This is critical when big, structured data, such as documents, undergo small modifications across a set of workflow steps. It is therefore also recommended to use RefSerialize for big user-defined objects made of small pieces that change little during the workflow. As an added bonus, the history will show such changes in more detail.
The step primitive is the lift operation that converts a result of type m a into a type Workflow m a with automatic state logging and recovery. To allow such features, every a must be an instance of Typeable and IResource (defined in the TCache package).
In fact, Workflow can be considered an instance of a partial monad transformer, defined as follows:
    class PMonadTrans t m a where
        plift :: Monad m => m a -> t m a

    instance (Monad m, MonadIO m, IResource a, Typeable a) => PMonadTrans (WF Stat) m a where
        plift = step
It is partial because the lift operation is not defined for every monad m and data type a, but only for monads and data types that meet certain conditions: in this case, being instances of MonadIO, and of IResource and Typeable, respectively.
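The idea of a partial lift can be tried out without the Workflow package. The following is only a sketch: the class is the one quoted above, while Logged and its Show-based instance are hypothetical stand-ins invented for illustration, not the package's WF type or its step function.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances #-}
module Main where

-- The class as quoted in the text: lifting is declared per (t, m, a) triple,
-- so an instance can restrict which monads and result types may be lifted.
class PMonadTrans t m a where
    plift :: Monad m => m a -> t m a

-- Hypothetical stand-in for a workflow transformer (NOT the package's WF):
-- it pairs each result with a list of textual log entries.
newtype Logged m a = Logged { runLogged :: m (a, [String]) }

-- The lift exists only for result types that can be shown; this plays the
-- role that the IResource and Typeable constraints play for step.
instance Show a => PMonadTrans Logged m a where
    plift action = Logged $ do
        x <- action
        return (x, [show x])

main :: IO ()
main = do
    (x, entries) <- runLogged (plift (return (42 :: Int)))
    print x        -- prints 42
    print entries  -- prints ["42"]
```

Trying to plift an action whose result type has no Show instance (a function, for example) is rejected at compile time, which is the sense in which the transformer is partial.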
    (Read a, Show a) => Serialize a
    Typeable a => Indexable a    (a single key for all values; enough for workflows)
    (Indexable a, Serialize a) => IResource a
Therefore, deriving instances of Read and Show is enough for every intermediate result along the computation.
Because Data.TCache.Dynamic from the TCache package is used for persistence, every data type must be registered by using registerType.
Here is a complete example. This is a counter that shows a sequence of numbers, one per second:
    module Main where

    import Control.Concurrent (threadDelay)
    import System.IO (hFlush, stdout)

    count n = do
        putStr (show n ++ " ") >> hFlush stdout >> threadDelay 1000000
        count (n+1)

    main = count 0
This is the same program, with the added feature of remembering the last count after being interrupted:
    module Main where

    import Control.Workflow
    import Control.Concurrent (threadDelay)
    import System.IO (hFlush, stdout)

    mcount n = do
        step $ putStr (show n ++ " ") >> hFlush stdout >> threadDelay 1000000
        mcount (n+1)

    main = do
        registerType :: IO ()
        registerType :: IO Int
        let start = 0 :: Int
        startWF "count" start [("count", mcount)] :: IO ()
This is the execution log:
    Worflow-0.5.5demos>runghc sequence.hs
    0 1 2 3 4 5 6 7 sequence.hs: win32ConsoleHandler
    sequence.hs: sequence.hs: interrupted
    Worflow-0.5.5demos>
    Worflow-0.5.5demos>runghc sequence.hs
    7 8 9 10 11 ....
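The resume behaviour seen in this log can be pictured as log-and-replay. The sketch below uses only base and makes no claim about how the package implements it: stepSketch, loadLog and the file name steps.log are hypothetical, and a plain text file stands in for TCache persistence.

```haskell
module Main where

import Control.Exception (IOException, evaluate, try)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical log file name, chosen for this sketch only.
logFile :: FilePath
logFile = "steps.log"

-- Read the results logged by a previous run, one shown value per line.
-- A missing or unreadable file is treated as an empty log.
loadLog :: IO [String]
loadLog = do
    r <- try (readFile logFile >>= \s -> evaluate (length s) >> return s)
    return $ case r of
        Left e  -> const [] (e :: IOException)
        Right s -> lines s

-- Toy counterpart of step: replay a logged result while one is pending,
-- otherwise run the action and append its result to the log.
stepSketch :: (Read a, Show a) => IORef [String] -> IO a -> IO a
stepSketch pending action = do
    entries <- readIORef pending
    case entries of
        (e:rest) -> writeIORef pending rest >> return (read e)
        []       -> do
            x <- action
            appendFile logFile (show x ++ "\n")
            return x

main :: IO ()
main = do
    pending <- loadLog >>= newIORef
    a <- stepSketch pending (putStrLn "running step 1" >> return (1 :: Int))
    b <- stepSketch pending (putStrLn "running step 2" >> return (a + 1))
    print (a, b)
```

Starting from an empty directory, the first run executes both actions and writes their results; a second run finds them in steps.log and prints (1,2) without executing either action again. An action run outside stepSketch would execute on every run, which is the behaviour the text describes for unsafeIOtoWF.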
|type Workflow m l = WF Stat m l|
|type WorkflowList m a b = [(String, a -> Workflow m b)]|
|IResource (keyResource, serialize, deserialize, tshowp, treadp, defPath, readResource, writeResource, delResource)|
|class PMonadTrans t m a where|
|class Indexable a where|
|step :: (Monad m, MonadIO m, IResource a, Typeable a) => m a -> Workflow m a|
|step lifts a monadic computation to the WF monad, and provides transparent state logging and resumption of the computation|
|restartWorkflows :: (IResource a, Typeable a, IResource b, Typeable b) => WorkflowList IO a b -> IO ()|
|restart the unfinished workflows listed in the workflow list, for all the initial values they were started with|
|getAll :: (IResource a, Typeable a, Monad m) => WF Stat m [a]|
|return all the intermediate results. All the intermediate results are assumed to have the same type.|
|logWF :: (Monad m, MonadIO m) => String -> Workflow m ()|
|log a message in the workflow history. It can be printed out with printHistory|
|getWFKeys :: String -> IO [String]|
|return the list of keys of the objects whose workflows are running|
|getWFHistory :: IResource a => String -> a -> IO (Maybe Stat)|
|return the current state of the computation, in the IO monad|
|delWFHistory :: IResource a => String -> a -> IO ()|
|delete the workflow. Make sure that the workflow is not running|
|printHistory :: Stat -> IO ()|
|print the state changes along the workflow, that is, all the intermediate results|
|unsafeIOtoWF :: Monad m => IO a -> Workflow m a|
Executes an IO computation inside the workflow monad, whatever the monad encapsulated in the workflow. Warning: this computation is executed every time the workflow restarts, regardless of whether it has already been executed. This is useful for initializations or debugging. To avoid re-execution when restarting, use: step $ unsafeIOtoWF...
To perform IO actions in a workflow that encapsulates an IO monad, use step over the IO action directly:
    step $ action
instead of
    step $ unsafeIOtoWF $ action
|waitUntil :: Integer -> IO ()|
|waitUntilSTM :: TVar Bool -> STM ()|
Wait until a certain clock time, in the STM monad. This makes it possible to compose timeouts with locks waiting for data.
    flag <- getTimeoutFlag $ 5 * 60
    ap <- step . atomically $ readQueueSTM docQueue `orElse` waitUntilSTM flag >> return True
    case ap of
        False -> logWF "False or timeout" >> correctWF doc
        True -> do
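The composition of a blocking read with a timeout through orElse can be tried in isolation using only the STM primitives that base exports from GHC.Conc. This is a sketch under stated assumptions: timeoutFlag, waitFlag, takeBox and takeOrTimeout are hypothetical names that merely play the roles of getTimeoutFlag, waitUntilSTM and readQueueSTM, and a one-slot TVar mailbox stands in for a persistent queue.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import GHC.Conc (STM, TVar, atomically, newTVarIO, orElse, readTVar, retry, writeTVar)

-- Hypothetical analogue of getTimeoutFlag: the flag turns True after the
-- given number of microseconds.
timeoutFlag :: Int -> IO (TVar Bool)
timeoutFlag micros = do
    flag <- newTVarIO False
    _ <- forkIO (threadDelay micros >> atomically (writeTVar flag True))
    return flag

-- Hypothetical analogue of waitUntilSTM: block until the flag is set.
waitFlag :: TVar Bool -> STM ()
waitFlag flag = readTVar flag >>= \done -> if done then return () else retry

-- Take a value out of a one-slot mailbox, blocking while it is empty.
takeBox :: TVar (Maybe a) -> STM a
takeBox box = do
    m <- readTVar box
    case m of
        Nothing -> retry
        Just x  -> writeTVar box Nothing >> return x

-- Just x if data is available, Nothing once the timeout has fired.
-- The left branch is tried first, so available data wins over the timeout.
takeOrTimeout :: TVar (Maybe a) -> TVar Bool -> STM (Maybe a)
takeOrTimeout box flag =
    (Just <$> takeBox box) `orElse` (waitFlag flag >> return Nothing)

main :: IO ()
main = do
    box  <- newTVarIO (Nothing :: Maybe String)
    flag <- timeoutFlag 200000                 -- 0.2 seconds
    r1   <- atomically (takeOrTimeout box flag)
    print r1                                   -- prints Nothing (timed out)
    atomically (writeTVar box (Just "doc"))
    r2   <- atomically (takeOrTimeout box flag)
    print r2                                   -- prints Just "doc"
```

Returning a Maybe keeps the two outcomes distinguishable by the caller, and the parentheses around each branch make the grouping of orElse and >> explicit.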
|writeQueue :: (IResource a, Typeable a) => String -> a -> IO ()|
|insert an element on top of the Queue Stack|
|writeQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()|
|Like writeQueue, but in the STM monad|
|readQueueSTM :: (IResource a, Typeable a) => String -> STM a|
|delete an element from the Queue stack and return it, in the STM monad|
|unreadQueue :: (IResource a, Typeable a) => String -> a -> IO ()|
|unreadQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()|
|getTimeSeconds :: IO Integer|
|getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool)|
|start the timeout and return the flag to be monitored by waitUntilSTM|
|isEmptyQueueSTM :: String -> STM Bool|