-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | library for transparent execution of interruptible computations -- -- Transparent support for interruptible computations. A workflow can be -- seen as a persistent thread that executes any monadic computation. -- Therefore, it can be used in very time consuming computations such as -- CPU intensive calculations or procedures that are most of the time -- waiting for the action of a process or a user, that are prone to -- communication failures, timeouts or shutdowns. -- -- The computation can be restarted at the interrupted point thanks to -- its logged state in permanent storage. Besides that, the package also -- provides other services associated to workflows -- -- New in this release, -- --
-- module Main where -- import Control.Workflow.Text -- import Control.Concurrent(threadDelay) -- import System.IO (hFlush,stdout) -- -- mcount n= do step $ do -- putStr (show n ++ " ") -- hFlush stdout -- threadDelay 1000000 -- mcount (n+1) -- return () -- to disambiguate the return type -- -- main= exec1 "count" $ mcount (0 :: Int) --module Control.Workflow type Workflow m = WF Stat m type WorkflowList m a b = [(String, a -> Workflow m b)] -- | PMonadTrans permits |to define a partial monad transformer. They are -- not defined for all kinds of data but the ones that have instances of -- certain classes.That is because in the lift instance code there are -- some hidden use of these classes. This also may permit an accurate -- control of effects. An instance of MonadTrans is an instance of -- PMonadTrans class PMonadTrans t m a plift :: (PMonadTrans t m a, Monad m) => m a -> t m a -- | adapted from MonadCatchIO-mtl. Workflow need to express serializable -- constraints about the returned values, so the usual class definitions -- for lifting IO functions are not suitable. class MonadCatchIO m a catch :: (MonadCatchIO m a, Exception e) => m a -> (e -> m a) -> m a block :: MonadCatchIO m a => m a -> m a unblock :: MonadCatchIO m a => m a -> m a -- | Generalized version of throwIO throw :: (MonadIO m, Exception e) => e -> m a -- | Indexable is an utility class used to derive instances of IResource -- -- Example: -- --
-- data Person= Person{ pname :: String, cars :: [DBRef Car]} deriving (Show, Read, Typeable)
-- data Car= Car{owner :: DBRef Person , cname:: String} deriving (Show, Read, Eq, Typeable)
--
--
-- Since Person and Car are instances of Read and Show,
-- defining the Indexable instance will implicitly define the
-- IResource instance for file persistence:
--
--
-- instance Indexable Person where key Person{pname=n} = "Person " ++ n
-- instance Indexable Car where key Car{cname= n} = "Car " ++ n
--
class Indexable a
key :: Indexable a => a -> String
defPath :: Indexable a => a -> String
-- | start or continue a workflow with no exception handling. The
-- programmer has to handle inconsistencies in the workflow state using
-- killWF or delWF in case of exception.
start :: (Monad m, MonadIO m, Indexable a, Serialize a, Serialize b, Typeable a, Typeable b) => String -> (a -> Workflow m b) -> a -> m (Either WFErrors b)
-- | start or continue a workflow with exception handling. The workflow
-- flags are updated even in case of exception. WFErrors are
-- raised as exceptions.
exec :: (Indexable a, Serialize a, Serialize b, Typeable a, Typeable b, Monad m, MonadIO m, MonadCatchIO m) => String -> (a -> Workflow m b) -> a -> m b
-- | a version of exec1 that deletes its state after complete execution or
-- when the thread is killed
exec1d :: (Serialize b, Typeable b, MonadCatchIO m) => String -> (Workflow m b) -> m b
-- | a version of exec with no seed parameter.
exec1 :: (Serialize a, Typeable a, Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m a
-- | start or restart an anonymous workflow inside another workflow. Its
-- state is deleted when finished and the result is stored in the
-- parent's WF state.
wfExec :: (Indexable a, Serialize a, Typeable a, MonadCatchIO m, MonadIO m) => Workflow m a -> Workflow m a
-- | start or continue a workflow from a list of workflows in the IO monad
-- with exception handling. The exception is returned as a Left value
startWF :: (MonadIO m, Serialize a, Serialize b, Typeable a, Indexable a, Typeable b) => String -> a -> WorkflowList m a b -> m (Either WFErrors b)
-- | restart the unfinished workflows in the list, for all the initial
-- values that they may have been called with
restartWorkflows :: (Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => WorkflowList IO a b -> IO ()
-- | return conditions from the invocation of start/restart primitives
data WFErrors
NotFound :: WFErrors
AlreadyRunning :: WFErrors
Timeout :: WFErrors
Exception :: e -> WFErrors
-- | lifts a monadic computation to the WF monad, and provides transparent
-- state logging and resuming of computation
step :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a
-- | permits modification of the workflow state by the procedure being
-- lifted if the boolean value is True. This is used internally for
-- control purposes
stepControl :: (Monad m, MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a
-- | executes a computation inside of the workflow monad whatever the monad
-- encapsulated in the workflow. Warning: this computation is executed
-- whenever the workflow restarts, no matter if it has been already
-- executed previously. This is useful for initializations or debugging.
-- To avoid re-execution when restarting use: step $
-- unsafeIOtoWF...
--
-- To perform IO actions in a workflow that encapsulates an IO monad, use
-- step over the IO action directly:
--
-- -- step $ action ---- -- instead of -- --
-- step $ unsafeIOtoWF $ action --unsafeIOtoWF :: Monad m => IO a -> Workflow m a data WFRef a -- | Return the reference to the last logged result , usually, the last -- result stored by step. wiorkflow references can be accessed -- outside of the workflow . They also can be (de)serialized. -- -- WARNING getWFRef can produce casting errors when the type demanded do -- not match the serialized data. Instead, newDBRef and -- stepWFRef are type safe at runtuime. getWFRef :: (Monad m, MonadIO m, Serialize a, Typeable a) => Workflow m (WFRef a) -- | Log a value and return a reference to it. -- --
-- newWFRef x= step $ return x >>= getWFRef --newWFRef :: (Serialize a, Typeable a, MonadIO m) => a -> Workflow m (WFRef a) -- | Execute an step but return a reference to the result instead of the -- result itself -- --
-- stepWFRef exp= step exp >>= getWFRef --stepWFRef :: (Serialize a, Typeable a, MonadIO m) => m a -> Workflow m (WFRef a) -- | Read the content of a Workflow reference. Note that its result is not -- in the Workflow monad readWFRef :: (Serialize a, Typeable a) => WFRef a -> STM (Maybe a) -- | Writes a new value in the workflow reference, that is, in the -- workflow log. Why would you use this? Don't do that! Modifying the -- content of the workflow log would change the execution flow when the -- workflow restarts. This method is used internally in the package. The -- best way to communicate with a workflow is through a persistent queue: -- --
-- worflow= exec1 wf do -- r <- stepWFRef expr -- push "queue" r -- back <- pop "queueback" -- ... --writeWFRef :: (Serialize a, Typeable a) => WFRef a -> a -> STM () waitWFActive :: String -> STM () -- | return all the steps of the workflow log. The values are dynamic -- -- to get all the steps with result of type Int: all <- -- getAll let lfacts = mapMaybe safeFromIDyn all :: -- [Int] getAll :: Monad m => Workflow m [IDynamic] safeFromIDyn :: (Typeable a, Serialize a) => IDynamic -> Maybe a -- | return the list of object keys that are running for a workflow getWFKeys :: String -> IO [String] -- | return the current state of the computation, in the IO monad getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat) -- | observe the workflow log untiil a condition is met. waitFor :: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> IO b waitForSTM :: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> STM b -- | Wait until a certain clock time has passed by monitoring its flag, in -- the STM monad. This permits to compose timeouts with locks waiting for -- data using orElse -- --
-- flag <- getTimeoutFlag $ 5 * 60 -- ap - 'step' . atomically $ readSomewhere >= return . Just orElse waitUntilSTM flag >> return Nothing -- case ap of -- Nothing -> do logWF timeout ... -- Just x -> do logWF $ received ++ show x ... --waitUntilSTM :: TVar Bool -> STM () -- | Start the timeout and return the flag to be monitored by -- waitUntilSTM This timeout is persistent. This means that the -- time start to count from the first call to getTimeoutFlag on no matter -- if the workflow is restarted. The time that the worlkflow has been -- stopped count also. the wait time can exceed the time between -- failures. when timeout is 0 means no timeout. getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool) -- | Log a message in the workflow history. I can be printed out with -- printWFhistory The message is printed in the standard output -- too logWF :: (Monad m, MonadIO m) => String -> Workflow m () clearRunningFlag :: MonadIO m => [Char] -> m (Map String (String, Maybe ThreadId), Maybe ThreadId) -- | kill the executing thread if not killed, but not its state. -- exec start or restartWorkflows will continue the -- workflow killThreadWF :: (Indexable a, Serialize a, Typeable a, MonadIO m) => String -> a -> m () -- | kill the process (if running) and drop it from the list of -- restart-able workflows. Its state history remains , so it can be -- inspected with getWfHistory printWFHistory and so on killWF :: (Indexable a, MonadIO m) => String -> a -> m () -- | delete the WF from the running list and delete the workflow state from -- persistent storage. Use it to perform cleanup if the process has been -- killed. 
delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m () -- | a version of KillThreadWF for workflows started wit no -- parameter by exec1 killThreadWF1 :: MonadIO m => String -> m () -- | a version of KillWF for workflows started wit no parameter by -- exec1 killWF1 :: MonadIO m => String -> m () -- | a version of delWF for workflows started wit no parameter by -- exec1 delWF1 :: MonadIO m => String -> m () delWFHistory :: Indexable a => String -> a -> IO () delWFHistory1 :: String -> IO () -- | The execution log is cached in memory using the package -- TCache. This procedure defines the polcy for writing the -- cache into permanent storage. -- -- For fast workflows, or when TCache` is used also for other purposes , -- Asynchronous is the best option -- -- Asynchronous mode invokes clearSyncCache. For more -- complex use of the syncronization please use this -- clearSyncCache. -- -- When interruptions are controlled, use SyncManual mode and -- include a call to syncCache in the finalizaton code syncWrite :: (Monad m, MonadIO m) => SyncMode -> m () data SyncMode -- | write state after every step Synchronous :: SyncMode Asyncronous :: Int -> Int -> SyncMode -- | number of seconds between saves when asyncronous frecuency :: SyncMode -> Int -- | size of the cache when async cacheSize :: SyncMode -> Int -- | use Data.TCache.syncCache to write the state SyncManual :: SyncMode -- | print the state changes along the workflow, that is, all the -- intermediate results printHistory :: Stat -> IO () instance [overlap ok] Typeable WFErrors instance [overlap ok] Exception WFErrors instance [overlap ok] Show WFErrors instance [overlap ok] Indexable () instance [overlap ok] (HasFork io, MonadCatchIO io) => HasFork (WF Stat io) instance [overlap ok] (Serialize a, Typeable a, MonadIO m, MonadCatchIO m) => MonadCatchIO (WF Stat m) a instance [overlap ok] Monad m => MonadIO (WF Stat m) instance [overlap ok] (MonadTrans t, Monad m) => PMonadTrans t m a instance [overlap 
ok] (Monad m, MonadIO m, Serialize a, Typeable a) => PMonadTrans (WF Stat) m a instance [overlap ok] (Monad m, Functor m) => Functor (Workflow m) instance [overlap ok] Monad m => Monad (WF s m) -- | This module contains monadic combinators that express some workflow -- patterns. see the docAprobal.hs example included in the package -- -- Here the constraint `DynSerializer w r a` is equivalent to -- `Data.Refserialize a` This version permits optimal (de)serialization -- if you store in the queue different versions of largue structures, for -- example, documents. You must define the right RefSerialize instance -- however. See an example in docAprobal.hs incuded in the paclkage. -- Alternatively you can use Data.Binary serlializatiion with -- Control.Workflow.Binary.Patterns -- -- EXAMPLE: -- -- This fragment below describes the approbal procedure of a document. -- First the document reference is sent to a list of bosses trough a -- queue. ithey return a boolean in a return queue ( askUser) the -- booleans are summed up according with a monoid instance (sumUp) -- -- if the resullt is false, the correctWF workflow is executed If the -- result is True, the pipeline continues to the next stage -- (checkValidated) -- -- the next stage is the same process with a new list of users -- (superbosses). This time, there is a timeout of 7 days. the result of -- the users that voted is summed up according with the same monoid -- instance -- -- if the result is true the document is added to the persistent list of -- approbed documents if the result is false, the document is added to -- the persistent list of rejectec documents (checlkValidated1) -- --
-- docApprobal :: Document -> Workflow IO () -- docApprobal doc = getWFRef >>= docApprobal1 -- -- docApprobal1 rdoc= -- return True >>= -- log "requesting approbal from bosses" >>= -- sumUp 0 (map (askUser doc rdoc) bosses) >>= -- checkValidated >>= -- log "requesting approbal from superbosses or timeout" >>= -- sumUp (7*60*60*24) (map(askUser doc rdoc) superbosses) >>= -- checkValidated1 -- -- askUser _ _ user False = return False -- askUser doc rdoc user True = do -- step $ push (quser user) rdoc -- logWF (wait for any response from the user: ++ user) -- step . pop $ qdocApprobal (title doc) -- -- log txt x = logWF txt >> return x -- -- checkValidated :: Bool -> Workflow IO Bool -- checkValidated val = -- case val of -- False -> correctWF (title doc) rdoc >> return False -- _ -> return True -- -- checkValidated1 :: Bool -> Workflow IO () -- checkValidated1 val = step $ do -- case val of -- False -> push qrejected doc -- _ -> push qapproved doc -- mapM (u ->deleteFromQueue (quser u) rdoc) superbosses --module Control.Workflow.Patterns -- | spawn a list of independent workflows (the first argument) with a seed -- value (the second argument). Their results are reduced by merge -- or select split :: (Typeable b, Serialize b, HasFork io, MonadCatchIO io) => [a -> Workflow io b] -> a -> Workflow io [ActionWF b] -- | wait for the results and apply the cond to produce a single output in -- the Workflow monad merge :: (MonadIO io, Typeable a, Typeable b, Serialize a, Serialize b) => ([a] -> io b) -> [ActionWF a] -> Workflow io b -- | select the outputs of the workflows produced by split -- constrained within a timeout. The check filter, can select , discard -- or finish the entire computation before the timeout is reached. When -- the computation finalizes, it stop all the pending workflows and -- return the list of selected outputs the timeout is in seconds and is -- no limited to Int values, so it can last for years. 
-- -- This is necessary for the modelization of real-life institutional -- cycles such are political elections timeout of 0 means no timeout. select :: (Serialize a, Serialize [a], Typeable a, HasFork io, MonadCatchIO io) => Integer -> (a -> io Select) -> [ActionWF a] -> Workflow io [a] -- | spawn a list of workflows and reduces the results according with the -- comp parameter within a given timeout -- --
-- vote timeout actions comp x= -- split actions x >>= select timeout (const $ return Select) >>= comp --vote :: (Serialize b, Serialize [b], Typeable b, HasFork io, MonadCatchIO io) => Integer -> [a -> Workflow io b] -> ([b] -> Workflow io c) -> a -> Workflow io c -- | sum the outputs of a list of workflows according with its monoid -- definition -- --
-- sumUp timeout actions = vote timeout actions (return . mconcat) --sumUp :: (Serialize b, Serialize [b], Typeable b, Monoid b, HasFork io, MonadCatchIO io) => Integer -> [a -> Workflow io b] -> a -> Workflow io b data Select Select :: Select Discard :: Select FinishDiscard :: Select FinishSelect :: Select instance Typeable Select instance Read Select instance Show Select instance Exception Select