-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | library for transparent execution of interruptible computations -- -- Transparent support for interruptible computations. A workflow can be -- seen as a persistent thread that executes any monadic computation. -- Therefore, it can be used in very time consuming computations such as -- CPU intensive calculations or procedures that are most of the time -- waiting for the action of a process or a user, that are prone to -- communication failures, timeouts or shutdowns. -- -- The computation can be restarted at the interrupted point thanks to -- its logged state in permanent storage. Besides that, the package also -- provides other services associated with workflows -- -- New in this release, -- --
-- module Main where -- import Control.Workflow.Text -- import Control.Concurrent(threadDelay) -- import System.IO (hFlush,stdout) -- -- mcount n= do step $ do -- putStr (show n ++ " ") -- hFlush stdout -- threadDelay 1000000 -- mcount (n+1) -- return () -- to disambiguate the return type -- -- main= exec1 "count" $ mcount (0 :: Int) --module Control.Workflow.Text type Workflow m l = WF Stat m l type WorkflowList m a b = [(String, a -> Workflow m b)] -- | PMonadTrans permits |to define a partial monad transformer. They are -- not defined for all kinds of data but the ones that have instances of -- certain classes.That is because in the lift instance code there are -- some hidden use of these classes. This also may permit an accurate -- control of effects. An instance of MonadTrans is an instance of -- PMonadTrans class PMonadTrans t m a plift :: (PMonadTrans t m a, Monad m) => m a -> t m a -- | adapted from MonadCatchIO-mtl. Workflow need to express serializable -- constraints about the returned values, so the usual class definitions -- for lifting IO functions are not suitable. class MonadCatchIO m a catch :: (MonadCatchIO m a, Exception e) => m a -> (e -> m a) -> m a block :: MonadCatchIO m a => m a -> m a unblock :: MonadCatchIO m a => m a -> m a -- | Generalized version of throwIO throw :: (MonadIO m, Exception e) => e -> m a -- | Indexable is an utility class used to derive instances of IResource -- -- Example: -- --
-- data Person= Person{ pname :: String, cars :: [DBRef Car]} deriving (Show, Read, Typeable)
-- data Car= Car{owner :: DBRef Person , cname:: String} deriving (Show, Read, Eq, Typeable)
--
--
-- Since Person and Car are instances of Read and Show,
-- defining the Indexable instance will implicitly define the
-- IResource instance for file persistence:
--
--
-- instance Indexable Person where key Person{pname=n} = "Person " ++ n
-- instance Indexable Car where key Car{cname= n} = "Car " ++ n
--
class Indexable a
key :: Indexable a => a -> String
defPath :: Indexable a => a -> String
-- | Monads in which IO computations may be embedded. Any monad
-- built by applying a sequence of monad transformers to the IO
-- monad will be an instance of this class.
--
-- Instances should satisfy the following laws, which state that
-- liftIO is a transformer of monads:
--
--
class Monad m => MonadIO m :: (* -> *)
liftIO :: MonadIO m => IO a -> m a
-- | Start or continue a workflow with no exception handling. The
-- programmer has to handle inconsistencies in the workflow state using
-- killWF or delWF in case of exception.
start :: (Monad m, MonadIO m, Indexable a, TwoSerializer w r a b, Typeable a, Indexable b, Typeable b) => String -> (a -> Workflow m b) -> a -> m (Either WFErrors b)
-- | Start or continue a workflow with exception handling. The workflow
-- flags are updated even in case of exception. WFErrors are
-- raised as exceptions.
exec :: (Indexable a, TwoSerializer w r a b, Typeable a, Typeable b, Monad m, MonadIO m, MonadCatchIO m) => String -> (a -> Workflow m b) -> a -> m b
-- | a version of exec1 that deletes its state after the execution
-- completes or the thread is killed
exec1d :: (TwoSerializer w r () b, Typeable b, MonadCatchIO m) => String -> (Workflow m b) -> m b
-- | a version of exec with no seed parameter.
exec1 :: (TwoSerializer w r () a, Typeable a, Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m a
-- | Start or restart an anonymous workflow inside another workflow. Its
-- state is deleted when finished and the result is stored in the
-- parent's WF state.
wfExec :: (Indexable a, TwoSerializer w r () a, Typeable a, MonadCatchIO m, MonadIO m) => Workflow m a -> Workflow m a
-- | restart the unfinished workflows in the list, for all the initial
-- values with which they may have been called
restartWorkflows :: (TwoSerializer w r a b, Typeable a, Indexable b, Typeable b) => WorkflowList IO a b -> IO ()
-- | return conditions from the invocation of start/restart primitives
data WFErrors
NotFound :: WFErrors
AlreadyRunning :: WFErrors
Timeout :: WFErrors
Exception :: e -> WFErrors
-- | lifts a monadic computation to the WF monad, and provides transparent
-- state logging and resuming of computation
step :: (Monad m, MonadIO m, DynSerializer w r a, Typeable a) => m a -> Workflow m a
-- | permits modification of the workflow state by the procedure being
-- lifted if the boolean value is True. This is used internally for
-- control purposes
stepControl :: (Monad m, MonadIO m, DynSerializer w r a, Typeable a) => m a -> Workflow m a
-- | executes a computation inside of the workflow monad whatever the monad
-- encapsulated in the workflow. Warning: this computation is executed
-- whenever the workflow restarts, no matter if it has been already
-- executed previously. This is useful for initializations or debugging.
-- To avoid re-execution when restarting use: step $
-- unsafeIOtoWF...
--
-- To perform IO actions in a workflow that encapsulates an IO monad, use
-- step over the IO action directly:
--
-- -- step $ action ---- -- instead of -- --
-- step $ unsafeIOtoWF $ action --unsafeIOtoWF :: Monad m => IO a -> Workflow m a data WFRef a -- | Return the the workflow reference to the last logged result , usually, -- the last result stored by step. wiorkflow references can be can -- be accessed (to its content) outside of the workflow . They also can -- be (de)serialized. -- -- WARNING getWFRef can produce casting errors when the type demanded do -- not match the serialized data. Instead, newDBRef and -- stepWFRef are type safe at runtuime. getWFRef :: (DynSerializer w r a, Typeable a, MonadIO m) => Workflow m (WFRef a) -- | Log a value and return a reference to it. -- --
-- newWFRef x= step $ return x >>= getWFRef --newWFRef :: (DynSerializer w r a, Typeable a, MonadIO m) => a -> Workflow m (WFRef a) -- | Execute an step but return a reference to the result instead of the -- result itself -- --
-- stepWFRef exp= step exp >>= getWFRef --stepWFRef :: (DynSerializer w r a, Typeable a, MonadIO m) => m a -> Workflow m (WFRef a) -- | Read the content of a Workflow reference. Note that its result is not -- in the Workflow monad readWFRef :: (DynSerializer w r a, Typeable a) => WFRef a -> STM (Maybe a) -- | Writes a new value en in the workflow reference, that is, in the -- workflow log. Why would you use this?. Don do that!. modifiying the -- content of the workflow log would change the excution flow when the -- workflow restarts. This metod is used internally in the package the -- best way to communicate with a workflow is trough a persistent queue: -- --
-- worflow= exec1 wf do -- r <- stepWFRef expr -- push "queue" r -- back <- pop "queueback" -- ... --writeWFRef :: (DynSerializer w r a, Typeable a) => WFRef a -> a -> STM () -- | return all the steps of the workflow log. The values are dynamic -- -- to get all the steps with result of type Int: all <- -- getAll let lfacts = mapMaybe safeFromIDyn all :: -- [Int] getAll :: Monad m => Workflow m [IDynamic] safeFromIDyn :: (Typeable a, DynSerializer m n a) => IDynamic -> Maybe a -- | return the list of object keys that are running for a workflow getWFKeys :: String -> IO [String] -- | return the current state of the computation, in the IO monad getWFHistory :: (Indexable a, DynSerializer w r a) => String -> a -> IO (Maybe Stat) -- | observe the workflow log untiil a condition is met. waitFor :: (Indexable a, TwoSerializer w r a b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> IO b waitForSTM :: (Indexable a, TwoSerializer w r a b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> STM b -- | Wait until a certain clock time has passed by monitoring its flag, in -- the STM monad. This permits to compose timeouts with locks waiting for -- data using orElse -- --
-- flag <- getTimeoutFlag $ 5 * 60 -- ap - 'step' . atomically $ readSomewhere >= return . Just orElse waitUntilSTM flag >> return Nothing -- case ap of -- Nothing -> do logWF timeout ... -- Just x -> do logWF $ received ++ show x ... --waitUntilSTM :: TVar Bool -> STM () -- | Start the timeout and return the flag to be monitored by -- waitUntilSTM This timeout is persistent. This means that the -- time start to count from the first call to getTimeoutFlag on no matter -- if the workflow is restarted. The time that the worlkflow has been -- stopped count also. the wait time can exceed the time between -- failures. when timeout is 0 means no timeout. getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool) -- | Log a message in the workflow history. I can be printed out with -- printWFhistory The message is printed in the standard output -- too logWF :: (Monad m, MonadIO m) => String -> Workflow m () -- | kill the executing thread if not killed, but not its state. -- exec start or restartWorkflows will continue the -- workflow killThreadWF :: (Indexable a, DynSerializer w r a, Typeable a, MonadIO m) => String -> a -> m () -- | kill the process (if running) and drop it from the list of -- restart-able workflows. Its state history remains , so it can be -- inspected with getWfHistory printWFHistory and so on killWF :: (Indexable a, MonadIO m) => String -> a -> m () -- | delete the WF from the running list and delete the workflow state from -- persistent storage. Use it to perform cleanup if the process has been -- killed. 
delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m () -- | a version of KillThreadWF for workflows started wit no -- parameter by exec1 killThreadWF1 :: MonadIO m => String -> m () -- | a version of KillWF for workflows started wit no parameter by -- exec1 killWF1 :: MonadIO m => String -> m () -- | a version of delWF for workflows started wit no parameter by -- exec1 delWF1 :: MonadIO m => String -> m () -- | The execution log is cached in memory using the package -- TCache. This procedure defines the polcy for writing the -- cache into permanent storage. -- -- For fast workflows, or when TCache` is used also for other purposes , -- Asynchronous is the best option -- -- Asynchronous mode invokes clearSyncCache. For more -- complex use of the syncronization please use this -- clearSyncCache. -- -- When interruptions are controlled, use SyncManual mode and -- include a call to syncCache in the finalizaton code syncWrite :: (Monad m, MonadIO m) => SyncMode -> m () data SyncMode -- | write state after every step Synchronous :: SyncMode Asyncronous :: Int -> Int -> SyncMode -- | number of seconds between saves when asyncronous frecuency :: SyncMode -> Int -- | size of the cache when async cacheSize :: SyncMode -> Int -- | use Data.TCache.syncCache to write the state SyncManual :: SyncMode -- | print the state changes along the workflow, that is, all the -- intermediate results printHistory :: Stat -> IO () instance [overlap ok] Typeable WFErrors instance [overlap ok] Exception WFErrors instance [overlap ok] Show WFErrors instance [overlap ok] Indexable () instance [overlap ok] Indexable Integer instance [overlap ok] Indexable Int instance [overlap ok] Indexable String instance [overlap ok] (HasFork io, MonadCatchIO io) => HasFork (WF Stat io) instance [overlap ok] (TwoSerializer w r () a, Typeable a, MonadIO m, MonadCatchIO m) => MonadCatchIO (WF Stat m) a instance [overlap ok] Monad m => MonadIO (WF Stat m) instance [overlap ok] (MonadTrans t, Monad m) 
=> PMonadTrans t m a instance [overlap ok] (Monad m, MonadIO m, DynSerializer w r a, Typeable a) => PMonadTrans (WF Stat) m a instance [overlap ok] Monad m => Monad (WF s m) -- | This module contains monadic combinators that express some workflow -- patterns. see the docAprobal.hs example included in the package -- -- Here the constraint `DynSerializer w r a` is equivalent to -- `Data.Refserialize a` This version permits optimal (de)serialization -- if you store in the queue different versions of largue structures, for -- example, documents. You must define the right RefSerialize instance -- however. See an example in docAprobal.hs incuded in the paclkage. -- Alternatively you can use Data.Binary serlializatiion with -- Control.Workflow.Binary.Patterns -- -- EXAMPLE: -- -- This fragment below describes the approbal procedure of a document. -- First the document reference is sent to a list of bosses trough a -- queue. ithey return a boolean in a return queue ( askUser) the -- booleans are summed up according with a monoid instance (sumUp) -- -- if the resullt is false, the correctWF workflow is executed If the -- result is True, the pipeline continues to the next stage -- (checkValidated) -- -- the next stage is the same process with a new list of users -- (superbosses). This time, there is a timeout of 7 days. the result of -- the users that voted is summed up according with the same monoid -- instance -- -- if the result is true the document is added to the persistent list of -- approbed documents if the result is false, the document is added to -- the persistent list of rejectec documents (checlkValidated1) -- --
-- docApprobal :: Document -> Workflow IO () -- docApprobal doc = getWFRef >>= docApprobal1 -- -- docApprobal1 rdoc= -- return True >>= -- log "requesting approbal from bosses" >>= -- sumUp 0 (map (askUser doc rdoc) bosses) >>= -- checkValidated >>= -- log "requesting approbal from superbosses or timeout" >>= -- sumUp (7*60*60*24) (map(askUser doc rdoc) superbosses) >>= -- checkValidated1 -- -- askUser _ _ user False = return False -- askUser doc rdoc user True = do -- step $ push (quser user) rdoc -- logWF (wait for any response from the user: ++ user) -- step . pop $ qdocApprobal (title doc) -- -- log txt x = logWF txt >> return x -- -- checkValidated :: Bool -> Workflow IO Bool -- checkValidated val = -- case val of -- False -> correctWF (title doc) rdoc >> return False -- _ -> return True -- -- checkValidated1 :: Bool -> Workflow IO () -- checkValidated1 val = step $ do -- case val of -- False -> push qrejected doc -- _ -> push qapproved doc -- mapM (u ->deleteFromQueue (quser u) rdoc) superbosses --module Control.Workflow.Text.Patterns -- | spawn a list of independent workflows (the first argument) with a seed -- value (the second argument). Their results are reduced by merge -- or select split :: (Typeable b, DynSerializer w r (Maybe b), HasFork io, MonadCatchIO io) => [a -> Workflow io b] -> a -> Workflow io [ActionWF b] -- | wait for the results and apply the cond to produce a single output in -- the Workflow monad merge :: (MonadIO io, Typeable a, Typeable b, TwoSerializer w r (Maybe a) b) => ([a] -> io b) -> [ActionWF a] -> Workflow io b -- | select the outputs of the workflows produced by split -- constrained within a timeout. The check filter, can select , discard -- or finish the entire computation before the timeout is reached. When -- the computation finalizes, it stop all the pending workflows and -- return the list of selected outputs the timeout is in seconds and is -- no limited to Int values, so it can last for years. 
-- -- This is necessary for the modelization of real-life institutional -- cycles such are political elections timeout of 0 means no timeout. select :: (TwoSerializer w r (Maybe a) [a], Typeable a, HasFork io, MonadCatchIO io) => Integer -> (a -> io Select) -> [ActionWF a] -> Workflow io [a] -- | spawn a list of workflows and reduces the results according with the -- comp parameter within a given timeout -- --
-- vote timeout actions comp x= -- split actions x >>= select timeout (const $ return Select) >>= comp --vote :: (TwoSerializer w r (Maybe b) [b], Typeable b, HasFork io, MonadCatchIO io) => Integer -> [a -> Workflow io b] -> ([b] -> Workflow io c) -> a -> Workflow io c -- | sum the outputs of a list of workflows according with its monoid -- definition -- --
-- sumUp timeout actions = vote timeout actions (return . mconcat) --sumUp :: (TwoSerializer w r (Maybe b) [b], Typeable b, Monoid b, HasFork io, MonadCatchIO io) => Integer -> [a -> Workflow io b] -> a -> Workflow io b data Select Select :: Select Discard :: Select FinishDiscard :: Select FinishSelect :: Select instance Typeable Select instance Read Select instance Show Select instance Exception Select -- | This module implements a persistent, transactional collection with -- Queue interface as well as indexed access by key -- -- use this version if you store in the queue different versions of -- largue structures, for example, documents and define a -- Data.RefSerialize instance. If not, use -- Data.Persistent.Queue.Binary Instead. -- -- Here QueueConstraints w r a means -- Data.RefSerlialize.Serialize a module Data.Persistent.Queue.Text -- | a queue reference type RefQueue a = DBRef (Queue a) -- | get the reference to new or existing queue trough its name getQRef :: (Typeable a, QueueConstraints w r a) => String -> RefQueue a -- | read the first element in the queue and delete it (pop) pop :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO a -- | version in the STM monad popSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM a pick :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO a -- | empty the queue (factually, it is deleted) flush :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO () -- | version in the STM monad flushSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM () -- | return the list of all elements in the queue. 
The queue remains -- unchanged pickAll :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO [a] -- | version in the STM monad pickAllSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM [a] -- | push an element in the queue push :: (Typeable a, QueueConstraints w r a) => RefQueue a -> a -> IO () -- | version in the STM monad pushSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> a -> STM () -- | return the first element in the queue that has the given key pickElem :: (Indexable a, Typeable a, QueueConstraints w r a) => RefQueue a -> String -> IO (Maybe a) -- | version in the STM monad pickElemSTM :: (Indexable a, Typeable a, QueueConstraints w r a) => RefQueue a -> String -> STM (Maybe a) -- | return the list of all elements in the queue and empty it readAll :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO [a] -- | a version in the STM monad readAllSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM [a] -- | delete all the elements of the queue that has the key of the parameter -- passed deleteElem :: (Indexable a, Typeable a, QueueConstraints w r a) => RefQueue a -> a -> IO () -- | verison in the STM monad deleteElemSTM :: (Typeable a, Indexable a, QueueConstraints w r a) => RefQueue a -> a -> STM () unreadSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> a -> STM () -- | check if the queue is empty isEmpty :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO Bool isEmptySTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM Bool instance [incoherent] Typeable1 Queue instance [incoherent] QueueConstraints w r a => Serializable (Queue a) instance [incoherent] (Serializer w r [a], SerialiserString w r) => Serializer w r (Queue a) instance [incoherent] (Serializer w r [a], SerialiserString w r, RunSerializer w r) => QueueConstraints w r a instance [incoherent] Indexable (Queue a) instance [incoherent] SerialiserString ST ST -- | This module implements a persistent, transactional 
collection with -- Queue interface as well as indexed access by key This module uses -- Data.Binary for serialization. -- -- Here QueueConstraints w r a means Data.Binary.Binary -- a -- -- For optimal (de)serialization if you store in the queue different -- versions of largue structures , for example, documents you better use -- Data.RefSerialize and Data.Persistent.Queue.Text -- Instead. module Data.Persistent.Queue.Binary -- | a queue reference type RefQueue a = DBRef (Queue a) -- | get the reference to new or existing queue trough its name getQRef :: (Typeable a, QueueConstraints w r a) => String -> RefQueue a -- | read the first element in the queue and delete it (pop) pop :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO a -- | version in the STM monad popSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM a pick :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO a -- | empty the queue (factually, it is deleted) flush :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO () -- | version in the STM monad flushSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM () -- | return the list of all elements in the queue. 
The queue remains -- unchanged pickAll :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO [a] -- | version in the STM monad pickAllSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM [a] -- | push an element in the queue push :: (Typeable a, QueueConstraints w r a) => RefQueue a -> a -> IO () -- | version in the STM monad pushSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> a -> STM () -- | return the first element in the queue that has the given key pickElem :: (Indexable a, Typeable a, QueueConstraints w r a) => RefQueue a -> String -> IO (Maybe a) -- | version in the STM monad pickElemSTM :: (Indexable a, Typeable a, QueueConstraints w r a) => RefQueue a -> String -> STM (Maybe a) -- | return the list of all elements in the queue and empty it readAll :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO [a] -- | a version in the STM monad readAllSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM [a] -- | delete all the elements of the queue that has the key of the parameter -- passed deleteElem :: (Indexable a, Typeable a, QueueConstraints w r a) => RefQueue a -> a -> IO () -- | verison in the STM monad deleteElemSTM :: (Typeable a, Indexable a, QueueConstraints w r a) => RefQueue a -> a -> STM () unreadSTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> a -> STM () -- | check if the queue is empty isEmpty :: (Typeable a, QueueConstraints w r a) => RefQueue a -> IO Bool isEmptySTM :: (Typeable a, QueueConstraints w r a) => RefQueue a -> STM Bool instance [incoherent] Typeable1 Queue instance [incoherent] QueueConstraints w r a => Serializable (Queue a) instance [incoherent] (Serializer w r [a], SerialiserString w r) => Serializer w r (Queue a) instance [incoherent] (Serializer w r [a], SerialiserString w r, RunSerializer w r) => QueueConstraints w r a instance [incoherent] Indexable (Queue a) instance [incoherent] SerialiserString PutM Get -- | A workflow can be seen as a persistent thread. 
The workflow monad -- writes a log that permit to restore the thread at the interrupted -- point. step is the (partial) monad transformer for the Workflow -- monad. A workflow is defined by its name and, optionally by the key of -- the single parameter passed. The primitives for starting workflows -- also restart the workflow when it has been in execution previously. -- -- Thiis module uses Data.Binary serialization. Here the constraint -- DynSerializer w r a is equivalent to Data.Binary a -- -- If you need to debug the workflow by reading the log or if you use -- largue structures that are subject of modifications along the -- workflow, as is the case typically of multiuser workflows with -- documents, then use Text seriialization with -- Control.Workflow.Text instead -- -- A small example that print the sequence of integers in te console if -- you interrupt the progam, when restarted again, it will start from the -- last printed number -- --
-- module Main where -- import Control.Workflow.Binary -- import Control.Concurrent(threadDelay) -- import System.IO (hFlush,stdout) -- -- mcount n= do step $ do -- putStr (show n ++ " ") -- hFlush stdout -- threadDelay 1000000 -- mcount (n+1) -- return () -- to disambiguate the return type -- -- main= exec1 "count" $ mcount (0 :: Int) --module Control.Workflow.Binary type Workflow m l = WF Stat m l type WorkflowList m a b = [(String, a -> Workflow m b)] -- | PMonadTrans permits |to define a partial monad transformer. They are -- not defined for all kinds of data but the ones that have instances of -- certain classes.That is because in the lift instance code there are -- some hidden use of these classes. This also may permit an accurate -- control of effects. An instance of MonadTrans is an instance of -- PMonadTrans class PMonadTrans t m a plift :: (PMonadTrans t m a, Monad m) => m a -> t m a -- | adapted from MonadCatchIO-mtl. Workflow need to express serializable -- constraints about the returned values, so the usual class definitions -- for lifting IO functions are not suitable. class MonadCatchIO m a catch :: (MonadCatchIO m a, Exception e) => m a -> (e -> m a) -> m a block :: MonadCatchIO m a => m a -> m a unblock :: MonadCatchIO m a => m a -> m a -- | Generalized version of throwIO throw :: (MonadIO m, Exception e) => e -> m a -- | Indexable is an utility class used to derive instances of IResource -- -- Example: -- --
-- data Person= Person{ pname :: String, cars :: [DBRef Car]} deriving (Show, Read, Typeable)
-- data Car= Car{owner :: DBRef Person , cname:: String} deriving (Show, Read, Eq, Typeable)
--
--
-- Since Person and Car are instances of Read and Show,
-- defining the Indexable instance will implicitly define the
-- IResource instance for file persistence:
--
--
-- instance Indexable Person where key Person{pname=n} = "Person " ++ n
-- instance Indexable Car where key Car{cname= n} = "Car " ++ n
--
class Indexable a
key :: Indexable a => a -> String
defPath :: Indexable a => a -> String
-- | Monads in which IO computations may be embedded. Any monad
-- built by applying a sequence of monad transformers to the IO
-- monad will be an instance of this class.
--
-- Instances should satisfy the following laws, which state that
-- liftIO is a transformer of monads:
--
--
class Monad m => MonadIO m :: (* -> *)
liftIO :: MonadIO m => IO a -> m a
-- | Start or continue a workflow with no exception handling. The
-- programmer has to handle inconsistencies in the workflow state using
-- killWF or delWF in case of exception.
start :: (Monad m, MonadIO m, Indexable a, TwoSerializer w r a b, Typeable a, Indexable b, Typeable b) => String -> (a -> Workflow m b) -> a -> m (Either WFErrors b)
-- | Start or continue a workflow with exception handling. The workflow
-- flags are updated even in case of exception. WFErrors are
-- raised as exceptions.
exec :: (Indexable a, TwoSerializer w r a b, Typeable a, Typeable b, Monad m, MonadIO m, MonadCatchIO m) => String -> (a -> Workflow m b) -> a -> m b
-- | a version of exec1 that deletes its state after the execution
-- completes or the thread is killed
exec1d :: (TwoSerializer w r () b, Typeable b, MonadCatchIO m) => String -> (Workflow m b) -> m b
-- | a version of exec with no seed parameter.
exec1 :: (TwoSerializer w r () a, Typeable a, Monad m, MonadIO m, MonadCatchIO m) => String -> Workflow m a -> m a
-- | Start or restart an anonymous workflow inside another workflow. Its
-- state is deleted when finished and the result is stored in the
-- parent's WF state.
wfExec :: (Indexable a, TwoSerializer w r () a, Typeable a, MonadCatchIO m, MonadIO m) => Workflow m a -> Workflow m a
-- | restart the unfinished workflows in the list, for all the initial
-- values with which they may have been called
restartWorkflows :: (TwoSerializer w r a b, Typeable a, Indexable b, Typeable b) => WorkflowList IO a b -> IO ()
-- | return conditions from the invocation of start/restart primitives
data WFErrors
NotFound :: WFErrors
AlreadyRunning :: WFErrors
Timeout :: WFErrors
Exception :: e -> WFErrors
-- | lifts a monadic computation to the WF monad, and provides transparent
-- state logging and resuming of computation
step :: (Monad m, MonadIO m, DynSerializer w r a, Typeable a) => m a -> Workflow m a
-- | permits modification of the workflow state by the procedure being
-- lifted if the boolean value is True. This is used internally for
-- control purposes
stepControl :: (Monad m, MonadIO m, DynSerializer w r a, Typeable a) => m a -> Workflow m a
-- | executes a computation inside of the workflow monad whatever the monad
-- encapsulated in the workflow. Warning: this computation is executed
-- whenever the workflow restarts, no matter if it has been already
-- executed previously. This is useful for initializations or debugging.
-- To avoid re-execution when restarting use: step $
-- unsafeIOtoWF...
--
-- To perform IO actions in a workflow that encapsulates an IO monad, use
-- step over the IO action directly:
--
-- -- step $ action ---- -- instead of -- --
-- step $ unsafeIOtoWF $ action --unsafeIOtoWF :: Monad m => IO a -> Workflow m a data WFRef a -- | Return the the workflow reference to the last logged result , usually, -- the last result stored by step. wiorkflow references can be can -- be accessed (to its content) outside of the workflow . They also can -- be (de)serialized. -- -- WARNING getWFRef can produce casting errors when the type demanded do -- not match the serialized data. Instead, newDBRef and -- stepWFRef are type safe at runtuime. getWFRef :: (DynSerializer w r a, Typeable a, MonadIO m) => Workflow m (WFRef a) -- | Log a value and return a reference to it. -- --
-- newWFRef x= step $ return x >>= getWFRef --newWFRef :: (DynSerializer w r a, Typeable a, MonadIO m) => a -> Workflow m (WFRef a) -- | Execute an step but return a reference to the result instead of the -- result itself -- --
-- stepWFRef exp= step exp >>= getWFRef --stepWFRef :: (DynSerializer w r a, Typeable a, MonadIO m) => m a -> Workflow m (WFRef a) -- | Read the content of a Workflow reference. Note that its result is not -- in the Workflow monad readWFRef :: (DynSerializer w r a, Typeable a) => WFRef a -> STM (Maybe a) -- | Writes a new value en in the workflow reference, that is, in the -- workflow log. Why would you use this?. Don do that!. modifiying the -- content of the workflow log would change the excution flow when the -- workflow restarts. This metod is used internally in the package the -- best way to communicate with a workflow is trough a persistent queue: -- --
-- worflow= exec1 wf do -- r <- stepWFRef expr -- push "queue" r -- back <- pop "queueback" -- ... --writeWFRef :: (DynSerializer w r a, Typeable a) => WFRef a -> a -> STM () -- | return all the steps of the workflow log. The values are dynamic -- -- to get all the steps with result of type Int: all <- -- getAll let lfacts = mapMaybe safeFromIDyn all :: -- [Int] getAll :: Monad m => Workflow m [IDynamic] safeFromIDyn :: (Typeable a, DynSerializer m n a) => IDynamic -> Maybe a -- | return the list of object keys that are running for a workflow getWFKeys :: String -> IO [String] -- | return the current state of the computation, in the IO monad getWFHistory :: (Indexable a, DynSerializer w r a) => String -> a -> IO (Maybe Stat) -- | observe the workflow log untiil a condition is met. waitFor :: (Indexable a, TwoSerializer w r a b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> IO b waitForSTM :: (Indexable a, TwoSerializer w r a b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> STM b -- | Wait until a certain clock time has passed by monitoring its flag, in -- the STM monad. This permits to compose timeouts with locks waiting for -- data using orElse -- --
-- flag <- getTimeoutFlag $ 5 * 60 -- ap <- step . atomically $ readSomewhere >>= return . Just `orElse` waitUntilSTM flag >> return Nothing -- case ap of -- Nothing -> do logWF "timeout" ... -- Just x -> do logWF $ "received" ++ show x ... --waitUntilSTM :: TVar Bool -> STM () -- | Start the timeout and return the flag to be monitored by -- waitUntilSTM. This timeout is persistent. This means that the -- time starts to count from the first call to getTimeoutFlag onwards, no matter -- if the workflow is restarted. The time that the workflow has been -- stopped counts also. The wait time can exceed the time between -- failures. A timeout of 0 means no timeout. getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool) -- | Log a message in the workflow history. It can be printed out with -- printWFHistory. The message is printed in the standard output -- too logWF :: (Monad m, MonadIO m) => String -> Workflow m () -- | kill the executing thread if not killed, but not its state. -- exec, start or restartWorkflows will continue the -- workflow killThreadWF :: (Indexable a, DynSerializer w r a, Typeable a, MonadIO m) => String -> a -> m () -- | kill the process (if running) and drop it from the list of -- restart-able workflows. Its state history remains, so it can be -- inspected with getWFHistory, printWFHistory and so on killWF :: (Indexable a, MonadIO m) => String -> a -> m () -- | delete the WF from the running list and delete the workflow state from -- persistent storage. Use it to perform cleanup if the process has been -- killed. 
delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m () -- | a version of killThreadWF for workflows started with no -- parameter by exec1 killThreadWF1 :: MonadIO m => String -> m () -- | a version of killWF for workflows started with no parameter by -- exec1 killWF1 :: MonadIO m => String -> m () -- | a version of delWF for workflows started with no parameter by -- exec1 delWF1 :: MonadIO m => String -> m () -- | The execution log is cached in memory using the package -- TCache. This procedure defines the policy for writing the -- cache into permanent storage. -- -- For fast workflows, or when TCache is used also for other purposes, -- Asynchronous is the best option -- -- Asynchronous mode invokes clearSyncCache. For more -- complex use of the synchronization please use this -- clearSyncCache. -- -- When interruptions are controlled, use SyncManual mode and -- include a call to syncCache in the finalization code syncWrite :: (Monad m, MonadIO m) => SyncMode -> m () data SyncMode -- | write state after every step Synchronous :: SyncMode Asyncronous :: Int -> Int -> SyncMode -- | number of seconds between saves when asynchronous frecuency :: SyncMode -> Int -- | size of the cache when async cacheSize :: SyncMode -> Int -- | use Data.TCache.syncCache to write the state SyncManual :: SyncMode instance [overlap ok] Typeable WFErrors instance [overlap ok] Exception WFErrors instance [overlap ok] Show WFErrors instance [overlap ok] Indexable () instance [overlap ok] Indexable Integer instance [overlap ok] Indexable Int instance [overlap ok] Indexable String instance [overlap ok] (HasFork io, MonadCatchIO io) => HasFork (WF Stat io) instance [overlap ok] (TwoSerializer w r () a, Typeable a, MonadIO m, MonadCatchIO m) => MonadCatchIO (WF Stat m) a instance [overlap ok] Monad m => MonadIO (WF Stat m) instance [overlap ok] (MonadTrans t, Monad m) => PMonadTrans t m a instance [overlap ok] (Monad m, MonadIO m, DynSerializer w r a, Typeable a) => PMonadTrans (WF 
Stat) m a instance [overlap ok] Monad m => Monad (WF s m) -- | This module contains monadic combinators that express some workflow -- patterns. See the docAprobal.hs example included in the package -- -- This version uses Data.Binary serialization. Here the constraint -- `DynSerializer w r a` is equivalent to `Data.Binary a`. -- -- EXAMPLE: -- -- The fragment below describes the approval procedure of a document. -- First the document reference is sent to a list of bosses through a -- queue. They return a boolean in a return queue (askUser). The -- booleans are summed up according to a monoid instance (sumUp) -- -- If the result is False, the correctWF workflow is executed. If the -- result is True, the pipeline continues to the next stage -- (checkValidated) -- -- The next stage is the same process with a new list of users -- (superbosses). This time, there is a timeout of 7 days. The result of -- the users that voted is summed up according to the same monoid -- instance -- -- If the result is True the document is added to the persistent list of -- approved documents; if the result is False, the document is added to -- the persistent list of rejected documents (checkValidated1) -- --
-- docApprobal :: Document -> Workflow IO ()
-- docApprobal doc = getWFRef >>= docApprobal1
--
-- docApprobal1 rdoc=
-- return True >>=
-- log "requesting approbal from bosses" >>=
-- sumUp 0 (map (askUser doc rdoc) bosses) >>=
-- checkValidated >>=
-- log "requesting approbal from superbosses or timeout" >>=
-- sumUp (7*60*60*24) (map(askUser doc rdoc) superbosses) >>=
-- checkValidated1
--
-- askUser _ _ user False = return False
-- askUser doc rdoc user True = do
-- step $ push (quser user) rdoc
-- logWF ("wait for any response from the user: " ++ user)
-- step . pop $ qdocApprobal (title doc)
--
-- log txt x = logWF txt >> return x
--
-- checkValidated :: Bool -> Workflow IO Bool
-- checkValidated val =
-- case val of
-- False -> correctWF (title doc) rdoc >> return False
-- _ -> return True
--
-- checkValidated1 :: Bool -> Workflow IO ()
-- checkValidated1 val = step $ do
-- case val of
-- False -> push qrejected doc
-- _ -> push qapproved doc
-- mapM (\u -> deleteFromQueue (quser u) rdoc) superbosses
--
module Control.Workflow.Binary.Patterns
-- | Spawn a list of independent workflows (the first argument) with a seed
-- value (the second argument). Their results are reduced by merge
-- or select.
split :: (Typeable b, DynSerializer w r (Maybe b), HasFork io, MonadCatchIO io) => [a -> Workflow io b] -> a -> Workflow io [ActionWF b]
-- | Wait for the results and apply the cond to them to produce a single
-- output in the Workflow monad.
merge :: (MonadIO io, Typeable a, Typeable b, TwoSerializer w r (Maybe a) b) => ([a] -> io b) -> [ActionWF a] -> Workflow io b
-- | Select the outputs of the workflows produced by split,
-- constrained within a timeout. The check filter can select, discard
-- or finish the entire computation before the timeout is reached. When
-- the computation finalizes, it stops all the pending workflows and
-- returns the list of selected outputs. The timeout is in seconds and is
-- not limited to Int values, so it can last for years.
--
-- This is necessary for the modelization of real-life institutional
-- cycles such as political elections. A timeout of 0 means no timeout.
select :: (TwoSerializer w r (Maybe a) [a], Typeable a, HasFork io, MonadCatchIO io) => Integer -> (a -> io Select) -> [ActionWF a] -> Workflow io [a]
-- | Spawn a list of workflows and reduce the results according to the
-- comp parameter within a given timeout
--
-- -- vote timeout actions comp x= -- split actions x >>= select timeout (const $ return Select) >>= comp --vote :: (TwoSerializer w r (Maybe b) [b], Typeable b, HasFork io, MonadCatchIO io) => Integer -> [a -> Workflow io b] -> ([b] -> Workflow io c) -> a -> Workflow io c -- | Sum the outputs of a list of workflows according to their monoid -- definition -- --
-- sumUp timeout actions = vote timeout actions (return . mconcat) --sumUp :: (TwoSerializer w r (Maybe b) [b], Typeable b, Monoid b, HasFork io, MonadCatchIO io) => Integer -> [a -> Workflow io b] -> a -> Workflow io b data Select Select :: Select Discard :: Select FinishDiscard :: Select FinishSelect :: Select instance Typeable Select instance Read Select instance Show Select instance Exception Select instance Binary Select