-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Workflow patterns over a monad for thread state logging & recovery -- -- Transparent support for interruptible computations. A workflow can be -- seen as a persistent thread that executes a monadic computation. -- Therefore, it can be used in very time consuming computations such are -- CPU intensive calculations or procedures that are most of the time -- waiting for the action of a process or an user, that are prone to -- comunication failures, timeouts or shutdowns. It also can be used if -- you like to restart your program at the point where the user left it -- last time . . The computation can be restarted at the interrupted -- point thanks to its logged state in permanent storage. The thread -- state is located in files by default. It can be moved and continued in -- another computer. Besides that, the package also provides other higher -- level services associated to workflows: Workflow patterns, and a -- general configuarion utility, workflow observation events and -- references to the internal state. The state can be stored maintaining -- memory references (using the RefSerialize package), so that it -- is possible to track the modifications of a big structure (for example -- a document) along the workflow execution. -- -- See Control.Workflow for details -- -- In this release: * Adaptation for MonadMask instance introduced in the -- package exceptions-0.6 @package Workflow @version 0.8.1 module Control.Workflow.Stat data WF s m l WF :: (s -> m (s, l)) -> WF s m l st :: WF s m l -> s -> m (s, l) data Stat Running :: (Map String (String, Maybe ThreadId)) -> Stat Stat :: DBRef Stat -> String -> Int -> Bool -> Maybe Integer -> Integer -> (Context, ByteString) -> [(Int, (IDynamic, Bool))] -> [IDynamic] -> Stat self :: Stat -> DBRef Stat wfName :: Stat -> String state :: Stat -> Int recover :: Stat -> Bool timeout :: Stat -> Maybe Integer lastActive :: Stat -> Integer context :: Stat -> (Context, ByteString) references :: Stat -> [(Int, (IDynamic, Bool))] versions :: Stat -> [IDynamic] stat0 :: Stat statPrefix1 :: [Char] statPrefix :: [Char] header :: Stat -> STW () getHeader :: STR ([Char], Int, Maybe Integer, Integer, [a], (Context, ByteString)) lenLen :: Int64 -- | Return the unique name of a workflow with a parameter (executed with -- exec or start) keyWF :: Indexable a => String -> a -> String data WFRef a WFRef :: !Int -> !(DBRef Stat) -> WFRef a pathWFlows :: [Char] stFName :: IResource a => a -> [Char] fr :: Key -> IO (Maybe ByteString) safe :: FilePath -> (Handle -> IO a) -> IO a hReadFile :: Handle -> IO ByteString readHeader :: ByteString -> Handle -> IO (Maybe ([Char], Int, Maybe Integer, Integer, [a], (Context, ByteString))) keyRunning :: String -- | show the state changes along the workflow, that is, all the -- intermediate results showHistory :: Stat -> ByteString showp1 :: IDynamic -> STW () wFRefStr :: [Char] instance [overlap ok] Typeable Stat instance [overlap ok] Typeable1 WFRef instance [overlap ok] Show (WFRef a) instance [overlap ok] Read (WFRef a) instance [overlap ok] Serialize (WFRef a) instance [overlap ok] (Show a, Read a) => Serialize a instance [overlap ok] Serialize ThreadId instance [overlap ok] IResource Stat instance [overlap ok] Indexable (WFRef a) instance [overlap ok] Serialize Stat -- | A workflow can be seen as a persistent thread. The workflow monad -- writes a log that permit to restore the thread at the interrupted -- point. step is the (partial) monad transformer for the Workflow -- monad. A workflow is defined by its name and, optionally by the key of -- the single parameter passed. There primitives for starting workflows -- also restart the interrupted workflow when it has been in execution -- previously. -- -- A small example that print the sequence of integers in te console if -- you interrupt the progam, when restarted again, it will start from the -- last printed number -- --
--   module Main where
--   import Control.Workflow
--   import Control.Concurrent(threadDelay)
--   import System.IO (hFlush,stdout)
--   
--   mcount n= do step $  do
--                          putStr (show n ++ " ")
--                          hFlush stdout
--                          threadDelay 1000000
--                mcount (n+1)
--                return () -- to disambiguate the return type
--   
--   main= exec1  "count"  $ mcount (0 :: Int)
--   
-- --
--   >>> runghc demos\sequence.hs
--   >0 1 2 3
--   >CTRL-C Pressed
--   
--   >>> runghc demos\sequence.hs
--   >3 4 5 6 7
--   >CTRL-C Pressed
--   
--   >>> runghc demos\sequence.hs
--   >7 8 9 10 11
--   ...
--   
-- -- The program restart at the last saved step. -- -- As you can see, some side effect can be re-executed after recovery if -- the log is not complete. This may happen after an unexpected shutdown -- (in this case) or due to an asynchronous log writing policy. (see -- syncWrite) -- -- When the step results are big and complex, use the -- Data.RefSerialize package to define the (de)serialization -- instances The log size will be reduced. showHistory` method will print -- the structure changes in each step. -- -- If instead of RefSerialize, you use read and show instances, -- there will be no reduction. but still it will work, and the log will -- be readable for debugging purposes. The RefSerialize istance is -- automatically derived from Read, Show instances. -- -- Data.Binary instances are also fine for serialization. To use Binary, -- just define a binary instance of your data by using showpBinary -- and readpBinary. -- -- Within the RefSerialize instance of a structure, you can freely mix -- Show,Read RefSerialize and Data Binary instances. -- -- Control.Workflow.Patterns contains higher level workflow -- patterns for handling multiple workflows -- -- Control.Workflow.Configuration permits the use of workflows for -- configuration purposes module Control.Workflow data Stat type Workflow m = WF Stat m type WorkflowList m a b = Map String (a -> Workflow m b) -- | PMonadTrans permits |to define a partial monad transformer. -- They are not defined for all kinds of data but the ones that have -- instances of certain classes.That is because in the lift instance code -- there are some hidden use of these classes. This also may permit an -- accurate control of effects. An instance of MonadTrans is an instance -- of PMonadTrans class PMonadTrans t m a plift :: (PMonadTrans t m a, Monad m) => m a -> t m a -- | Adapted from the MonadCatchIO-mtl package. However, in this -- case it is needed to express serializable constraints about the -- returned values, so the usual class definitions for lifting IO -- functions are not suitable. class MonadCatchIO m a catch :: (MonadCatchIO m a, Exception e) => m a -> (e -> m a) -> m a block :: MonadCatchIO m a => m a -> m a unblock :: MonadCatchIO m a => m a -> m a class MonadIO io => HasFork io fork :: HasFork io => io () -> io ThreadId -- | Generalized version of throwIO throw :: (MonadIO m, Exception e) => e -> m a -- | Indexable is an utility class used to derive instances of IResource -- -- Example: -- --
--   data Person= Person{ pname :: String, cars :: [DBRef Car]} deriving (Show, Read, Typeable)
--   data Car= Car{owner :: DBRef Person , cname:: String} deriving (Show, Read, Eq, Typeable)
--   
-- -- Since Person and Car are instances of Read ans Show, by -- defining the Indexable instance will implicitly define the -- IResource instance for file persistence: -- --
--   instance Indexable Person where  key Person{pname=n} = "Person " ++ n
--   instance Indexable Car where key Car{cname= n} = "Car " ++ n
--   
class Indexable a key :: Indexable a => a -> String defPath :: Indexable a => a -> String -- | Return the unique name of a workflow with a parameter (executed with -- exec or start) keyWF :: Indexable a => String -> a -> String -- | Start or continue a workflow . WFErrors and exceptions are -- returned as Left err (even if they were triggered as -- exceptions). Other exceptions are returned as Left (Exception -- e) use killWF or delWF in case of error to clear -- the log. start :: (MonadCatch m, MonadIO m, Indexable a, Serialize a, Typeable a) => String -> (a -> Workflow m b) -> a -> m (Either WFErrors b) -- | Start or continue a workflow with exception handling the workflow -- flags are updated even in case of exception WFerrors are -- raised as exceptions exec :: (Indexable a, Serialize a, Typeable a, Monad m, MonadIO m, MonadCatch m) => String -> (a -> Workflow m b) -> a -> m b -- | A version of exec1 that deletes its state after complete execution or -- thread killed exec1d :: (MonadIO m, MonadCatch m) => String -> (Workflow m b) -> m b -- | A version of exec with no seed parameter. exec1 :: (Monad m, MonadIO m, MonadCatch m) => String -> Workflow m a -> m a -- | executes a workflow, but does not mark it as finished even if the -- process ended. It this case, the workflow just will return the last -- result. If the workflow was gathering data from user questions for a -- configuration, then this primitive will store them in the log the -- first time, and can be retrieve it the next time. exec1nc :: (Monad m, MonadIO m, MonadMask m) => String -> Workflow m a -> m a -- | Start or restart an anonymous workflow inside another workflow. Its -- state is deleted when finished and the result is stored in the -- parent's WF state. wfExec :: (Serialize a, Typeable a, MonadCatch m, MonadIO m) => Workflow m a -> Workflow m a -- | Start or continue a workflow from a list of workflows with exception -- handling. see start for details about exception and error -- handling startWF :: (MonadCatch m, MonadIO m, Serialize a, Serialize b, Typeable a, Indexable a) => String -> a -> WorkflowList m a b -> m (Either WFErrors b) -- | Re-start the non finished workflows in the list, for all the initial -- values that they may have been invoked. The list contain he -- identifiers of the workflows and the procedures to be called. All the -- workflows initiated with exec* or start* will be restarted with all -- possible seed values. restartWorkflows :: (Serialize a, Typeable a) => Map String (a -> Workflow IO b) -> IO () -- | Return conditions from the invocation of start/restart primitives data WFErrors NotFound :: WFErrors AlreadyRunning :: WFErrors Timeout :: WFErrors WFException :: String -> WFErrors -- | Lifts a monadic computation to the WF monad, and provides transparent -- state loging and resuming the computation Note: Side effect can be -- repeated at recovery time if the log was not complete before shut down -- see the integer sequence example, above. step :: (MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a getWFStat :: Monad m => Workflow m (DBRef Stat) stepExec :: (Typeable t, Serialize t, MonadIO m) => DBRef Stat -> m t -> m (DBRef Stat, t) -- | Executes a computation inside of the workflow monad whatever the monad -- encapsulated in the workflow. Warning: this computation is executed -- whenever the workflow restarts, no matter if it has been already -- executed previously. This is useful for intializations or debugging. -- To avoid re-execution when restarting use: step $ -- unsafeIOtoWF... -- -- To perform IO actions in a workflow that encapsulates an IO monad, use -- step over the IO action directly: -- --
--   step $ action
--   
-- -- instead of -- --
--   step $ unsafeIOtoWF $ action
--   
unsafeIOtoWF :: Monad m => IO a -> Workflow m a data WFRef a -- | Log a value in the workflow log and return a reference to it. -- --
--   newWFRef x= stepWFRef (return  x) >>= return . fst
--   
newWFRef :: (Serialize a, Typeable a, MonadIO m, MonadCatch m) => a -> Workflow m (WFRef a) -- | Execute an step and return a reference to the result besides the -- result itself stepWFRef :: (Serialize a, Typeable a, MonadIO m) => m a -> Workflow m (WFRef a, a) -- | Read the content of a Workflow reference. Note that its result is not -- in the Workflow monad readWFRef :: (Serialize a, Typeable a) => WFRef a -> STM (Maybe a) -- | Writes a new value en in the workflow reference, that is, in the -- workflow log. Why would you use this?. Don't do that!. modifiying the -- content of the workflow log would change the excution flow when the -- workflow restarts. This metod is used internally in the package. The -- best way to communicate with a workflow is trough a persistent queue, -- using Data.Persistent.Collection: -- --
--   worflow= exec1 wf do
--            r <- stepWFRef  expr
--            push "queue" r
--            back <- pop "queueback"
--            ...
--   
writeWFRef :: (Serialize a, Typeable a) => WFRef a -> a -> STM () -- | Moves the state of workflow with a seed value to become the state of -- other seed value This may be of interest when the entry value changes -- its key value but should not initiate a new workflow but continues -- with the current one moveState :: (MonadIO m, Indexable a, Serialize a, Typeable a) => String -> a -> a -> m () -- | wait until the workflow is restarted waitWFActive :: String -> STM () -- | Return all the steps of the workflow log. The values are dynamic -- -- to get all the steps with result of type Int: all <- -- getAll let lfacts = mapMaybe safeFromIDyn all :: -- [Int] getAll :: Monad m => Workflow m [IDynamic] safeFromIDyn :: (Typeable a, Serialize a) => IDynamic -> Either String a -- | Return the keys of the workflows that are running with a given prefix getWFKeys :: String -> IO [String] -- | Return the current state of the computation, in the IO monad getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat) -- | Observe the workflow log until a condition is met. waitFor :: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> IO b waitForSTM :: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) => (b -> Bool) -> String -> a -> STM b -- | Wait until a certain clock time has passed by monitoring its flag, in -- the STM monad. This permits to compose timeouts with locks waiting for -- data using orElse -- -- -- --
--   flag <- getTimeoutFlag $  5 * 60
--   ap   <- step  .  atomically $  readSomewhere >>= return . Just  orElse  waitUntilSTM flag  >> return Nothing
--   case ap of
--        Nothing -> do logWF timeout ...
--        Just x -> do logWF $ received ++ show x ...
--   
waitUntilSTM :: TVar Bool -> STM () -- | Start the timeout and return the flag to be monitored by -- waitUntilSTM This timeout is persistent. This means that the -- counter is initialized in the first call to getTimeoutFlag no matter -- if the workflow is restarted. The time during which the worlkflow has -- been stopped count also. Thus, the wait time can exceed the time -- between failures. when timeout is 0 means no timeout. getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool) -- | Return either the result of the STM conputation or Nothing in case of -- timeout. The computation can retry This timeout is persistent. This -- means that the counter is initialized in the first call to -- getTimeoutFlag no matter if the workflow is restarted. The time during -- which the worlkflow has been stopped count also. Thus, the wait time -- can exceed the time between failures. when timeout is 0 it means no -- timeout. withTimeout :: (MonadIO m, Typeable a, Serialize a) => Integer -> STM a -> Workflow m (Maybe a) -- | Executes a computation understanding that it is inside the workflow -- identified by id. If f finish after time it -- genetates a Timeout exception which may result in the end of -- the workflow if the programmer does not catch it. If the workflow is -- restarted after time2 has elapsed, the workflow will restart -- from the beginning. If not, it will restart after the last logged -- step. -- -- Usually time2> time -- -- time2=0 means time2 is infinite withKillTimeout :: -- CMC.MonadCatchIO m => String -> Int -> Integer -> m a -- -> m a withKillTimeout id time time2 f = do tid <- liftIO -- myThreadId tstart <- liftIO getTimeSeconds let final= liftIO $ do -- tnow <- getTimeSeconds let ref = getDBRef $ keyResource $ -- stat0{wfName=id} -- !> (keyResource $ stat0{wfName=id} ) when -- (time2 /=0) . atomically $ do s <- readDBRef ref onNothing -- error ( withKillTimeout: Workflow not found: ++ id) writeDBRef -- ref s{lastActive= tnow,timeout= Just (time2 - fromIntegral (tnow - -- tstart))} clearRunningFlag id let proc= do twatchdog <- liftIO $ -- case time of 0 -> return tid _ -> forkIO $ threadDelay (time * -- 1000000) >> throwTo tid Timeout r <- f liftIO $ killThread -- twatchdog return r -- -- proc finally final withKillTimeout :: (MonadIO m, MonadCatch m) => String -> Int -> Integer -> m a -> m a -- | Log a message in the workflow history. I can be printed out with -- showHistory The message is printed in the standard output too logWF :: MonadIO m => String -> Workflow m () clearRunningFlag :: MonadIO m => [Char] -> m (Map String (String, Maybe ThreadId), Maybe ThreadId) -- | Kill the executing thread if not killed, but not its state. -- exec start or restartWorkflows will continue the -- workflow killThreadWF :: (Indexable a, Serialize a, Typeable a, MonadIO m) => String -> a -> m () -- | Kill the process (if running) and drop it from the list of -- restart-able workflows. Its state history remains , so it can be -- inspected with getWfHistory showHistory and so on. -- -- When the workflow has been called with no parameter, use: () killWF :: (Indexable a, MonadIO m) => String -> a -> m () -- | Delete the WF from the running list and delete the workflow state from -- persistent storage. Use it to perform cleanup if the process has been -- killed. -- -- When the workflow has been called with no parameter, use: () delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m () -- | A version of KillThreadWF for workflows started wit no -- parameter by exec1 killThreadWF1 :: MonadIO m => String -> m () -- | Delete the history of a workflow. Be sure that this WF has finished. delWFHistory :: Indexable a => [Char] -> a -> IO () delWFHistory1 :: [Char] -> IO () -- | Specify the cache synchronization policy with permanent storage. See -- SyncMode for details syncWrite :: SyncMode -> IO () data SyncMode :: * -- | sync state to permanent storage when atomicallySync is invoked Synchronous :: SyncMode Asyncronous :: Int -> (Integer -> Integer -> Integer -> Bool) -> Int -> SyncMode -- | number of seconds between saves when asyncronous frecuency :: SyncMode -> Int -- | The user-defined check-for-cleanup-from-cache for each object. -- defaultCheck is an example check :: SyncMode -> Integer -> Integer -> Integer -> Bool -- | size of the cache when async cacheSize :: SyncMode -> Int -- | use syncCache to write the state SyncManual :: SyncMode -- | show the state changes along the workflow, that is, all the -- intermediate results showHistory :: Stat -> ByteString -- | True if the workflow in recovery mode, reading the log to recover the -- process state isInRecover :: Monad m => Workflow m Bool runWF1 :: MonadIO m => String -> WF Stat m b -> Stat -> Bool -> m b getState :: (Monad m, MonadIO m, Indexable a, Serialize a, Typeable a) => String -> x -> a -> m (Either WFErrors (String, x, Stat)) instance [overlap ok] Typeable WFErrors instance [overlap ok] Typeable WFInfo instance [overlap ok] Read WFErrors instance [overlap ok] Show WFErrors instance [overlap ok] Read WFInfo instance [overlap ok] Show WFInfo instance [overlap ok] Exception WFErrors instance [overlap ok] (HasFork io, MonadIO io, MonadCatch io) => HasFork (WF Stat io) instance [overlap ok] HasFork IO instance [overlap ok] MonadIO m => MonadIO (WF Stat m) instance [overlap ok] (MonadTrans t, Monad m) => PMonadTrans t m a instance [overlap ok] (Monad m, Functor m) => Applicative (Workflow m) instance [overlap ok] (Monad m, MonadIO m, Serialize a, Typeable a) => PMonadTrans (WF Stat) m a instance [overlap ok] (Monad m, Functor m) => Functor (Workflow m) instance [overlap ok] Monad m => Monad (WF s m) -- | Helpers for application initialization module Control.Workflow.Configuration -- | to execute one computation once . It executes at the first run only once :: (Typeable a, Serialize a, MonadIO m) => m a -> Workflow m a -- | to execute a computation every time it is invoked. A synonimous of -- unsafeIOtoWF ever :: (Typeable a, Serialize a, MonadIO m) => IO a -> Workflow m a -- | executes a computation with once and ever statements a -- synonym of exec1nc runConfiguration :: (Monad m, MonadIO m, MonadMask m) => String -> Workflow m a -> m a -- | This module contains monadic combinators that express some workflow -- patterns. see the docAprobal.hs example included in the package -- -- EXAMPLE: -- -- This fragment below describes the approbal procedure of a document. -- First the document reference is sent to a list of bosses trough a -- queue. ithey return a boolean in a return queue ( askUser) the -- booleans are summed up according with a monoid instance (sumUp) -- -- if the resullt is false, the correctWF workflow is executed If the -- result is True, the pipeline continues to the next stage -- (checkValidated) -- -- the next stage is the same process with a new list of users -- (superbosses). There is a timeout of seven days. The result of the -- users that voted is summed up according with the same monoid instance -- -- if the result is true the document is added to the persistent list of -- approbed documents if the result is false, the document is added to -- the persistent list of rejectec documents (checlkValidated1) -- -- The program can be interrupted at any moment. The Workflow monad will -- restartWorkflows it at the point where it was interrupted. -- -- This example uses queues from Data.Persistent.Queue -- --
--   docApprobal :: Document -> Workflow IO ()
--   docApprobal doc =  getWFRef >>= docApprobal1
--   
--   docApprobal1 rdoc=
--       return True >>=
--       log "requesting approbal from bosses" >>=
--       sumUp 0 (map (askUser doc rdoc) bosses)  >>=
--       checkValidated >>=
--       log "requesting approbal from superbosses or timeout"  >>=
--       sumUp (7*60*60*24) (map(askUser doc rdoc) superbosses) >>=
--       checkValidated1
--   
--   askUser _ _ user False = return False
--   askUser doc rdoc user  True =  do
--         step $ push (quser user) rdoc
--         logWF ("wait for any response from the user: " ++ user)
--         step . pop $ qdocApprobal (title doc)
--   
--   log txt x = logWF txt >> return x
--   
--   checkValidated :: Bool -> Workflow IO Bool
--   checkValidated  val =
--         case val of
--           False -> correctWF (title doc) rdoc >> return False
--           _     -> return True
--   
--   checkValidated1 :: Bool -> Workflow IO ()
--   checkValidated1 val = step $ do
--         case  val of
--           False -> push qrejected doc
--           _     -> push qapproved doc
--         mapM (u ->deleteFromQueue (quser u) rdoc) superbosses
--   
module Control.Workflow.Patterns -- | spawn a list of independent workflow actions with a seed -- value a The results are reduced by merge or -- select split :: (Typeable b, Serialize b, HasFork io, MonadMask io) => [a -> Workflow io b] -> a -> Workflow io [ActionWF b] -- | wait for the results and apply the cond to produce a single output in -- the Workflow monad merge :: (MonadIO io, Typeable a, Typeable b, Serialize a, Serialize b) => ([a] -> io b) -> [ActionWF a] -> Workflow io b -- | select the outputs of the workflows produced by split -- constrained within a timeout. The check filter, can select , discard -- or finish the entire computation before the timeout is reached. When -- the computation finalizes, it kill all the pending workflows and -- return the list of selected outputs the timeout is in seconds and it -- is is in the workflow monad, so it is possible to restart the process -- if interrupted, so it can proceed for years. -- -- This is necessary for the modelization of real-life institutional -- cycles such are political elections A timeout of 0 means no timeout. select :: (Serialize a, Typeable a, HasFork io, MonadMask io) => Integer -> (a -> STM Select) -> [ActionWF a] -> Workflow io [a] -- | spawn a list of workflows and reduces the results according with the -- comp parameter within a given timeout -- --
--   vote timeout actions comp x=
--        split actions x >>= select timeout (const $ return Select)  >>=  comp
--   
vote :: (Serialize b, Typeable b, HasFork io, MonadMask io) => Integer -> [a -> Workflow io b] -> ([b] -> Workflow io c) -> a -> Workflow io c -- | sum the outputs of a list of workflows according with its monoid -- definition -- --
--   sumUp timeout actions = vote timeout actions (return . mconcat)
--   
sumUp :: (Serialize b, Typeable b, Monoid b, HasFork io, MonadMask io) => Integer -> [a -> Workflow io b] -> a -> Workflow io b data Select -- | select the source output Select :: Select -- | Discard the source output Discard :: Select -- | Continue the source process Continue :: Select -- | Discard this output, kill all and return the selected outputs FinishDiscard :: Select -- | Select this output, kill all and return the selected outputs FinishSelect :: Select instance [overlap ok] Typeable Select instance [overlap ok] Read Select instance [overlap ok] Show Select instance [overlap ok] Exception Select