module Control.Workflow
( Workflow
, WorkflowList
, IResource(..)
, registerType
, PMonadTrans (..)
, Indexable (key)
, step
, startWF
, restartWorkflows
, getStep
, getAll
, logWF
, getWFKeys
, getWFHistory
, delWFHistory
, printHistory
, unsafeIOtoWF
, waitFor
, waitUntil
, waitUntilSTM
, syncWrite
, writeQueue
, writeQueueSTM
, readQueue
, readQueueSTM
, unreadQueue
, unreadQueueSTM
, getTimeSeconds
, getTimeoutFlag
, isEmptyQueue
, isEmptyQueueSTM
)
where
import System.IO.Unsafe
import Control.Monad(when,liftM)
import Control.Exception(Exception, throw)
import Control.Concurrent (forkIO,threadDelay, ThreadId)
import Control.Concurrent.STM
import GHC.Conc(unsafeIOToSTM)
import GHC.Base (maxInt)
import Data.TCache.Dynamic
import Data.RefSerialize
import Data.List((\\),find,elemIndices, isPrefixOf)
import Data.Typeable
import System.Time
import Control.Monad.Trans
import Control.Monad (replicateM)
import System.IO(hPutStrLn, stderr)
import Data.List(elemIndex)
import Data.Maybe(fromJust, isNothing)
import qualified Data.Map as M(Map,fromList,elems, insert, lookup)
import System.Mem.StableName
data WF s m l = WF { st :: s -> m (s,l) }
type Workflow m l= WF Stat m l
type WorkflowList m a b= [(String, a -> Workflow m b) ]
data Stat = RunningWorkflows [String]
| Stat{ wfName :: String, state:: Int, index :: Int, recover:: Bool, sync :: Bool
, versions ::[IDynamic], timeout :: Maybe (TVar Bool)}
deriving (Typeable)
stat0 = Stat{ wfName="", state=0, index=0, recover=False, versions = []
, sync= True, timeout= Nothing}
hasht x= (hashStableName . unsafePerformIO . makeStableName) x
instance Serialize IDynamic where
showp= tshowp
readp = treadp
instance Serialize Stat where
showp (RunningWorkflows list)= do
str <- showp list
return $ "StatWorkflows "++ str
showp (Stat wfName state index recover sync versions _)= do
parsea <- rshowp versions
return $ "Stat "++ show wfName ++" "++ show state++" "++show index++" "++show recover++" "
++ show sync ++ parsea
readp = choice [rStat, rWorkflows] where
rStat= do
symbol "Stat"
wfName <- readp
state <- readp
index <- readp --integer
recover <- readp --bool
sync <- readp --bool
versions <- rreadp
return $ Stat wfName state index recover sync versions Nothing
rWorkflows= do
symbol "StatWorkflows"
list <- readp
return $ RunningWorkflows list
workflowsPath= "Workflows/"
instance IResource Stat where
keyResource s@Stat{wfName=name}= "Stat#" ++ name
keyResource (RunningWorkflows _)= "RunningWorkflows"
defPath _= workflowsPath
serialize x= runW $ showp x
deserialize str = runR readp str
class Indexable a where
key:: a -> String
instance Typeable a => Indexable a where
key x= show $ typeOf x
instance (Serialize a, Indexable a) => IResource a where
keyResource x=key x
tshowp= showp
treadp= readp
unsafeIOtoWF :: (Monad m) => IO a -> Workflow m a
unsafeIOtoWF x= let y= unsafePerformIO ( x >>= return) in y `seq` return y
class PMonadTrans t m a where
plift :: Monad m => m a -> t m a
instance (Monad m,MonadIO m,IResource a, Typeable a) => PMonadTrans (WF Stat) m a where
plift = step
instance (MonadTrans t, Monad m) => PMonadTrans t m a where
plift= Control.Monad.Trans.lift
instance Monad m => MonadIO (WF Stat m) where
liftIO=unsafeIOtoWF
instance Monad m => Monad (WF s m) where
return x = WF (\s -> return (s, x))
WF g >>= f = WF (\s -> do
(s1, x) <- g s
let WF fun= f x
(s3, x') <- fun s1
return (s3, x'))
step :: (Monad m,MonadIO m,IResource a, Typeable a) => m a -> Workflow m a
step mx= WF(\s -> do
let stat= state s
let ind= index s
if recover s && ind < stat
then return (s{index=ind +1 }, fromIDyn $ versions s !! (stat ind1) )
else do
x' <- mx
let versionss= versions s
let ver= toIDyn x': versionss
let s'= s{recover= False, versions = ver, state= state s+1}
liftIO $ do
withResources ([]::[Stat]) (\_-> [s' ])
when (sync s ) syncCache
return (s', x') )
startWF
:: (Monad m,MonadIO m,IResource a, Typeable a, IResource b, Typeable b)
=> String
-> a
-> WorkflowList m a b
-> m b
startWF namewf v wfs= do
liftIO (registerType :: IO Stat)
liftIO (registerType :: IO Control.Workflow.Queue)
liftIO (registerType :: IO String)
liftIO (registerType :: IO Integer)
case lookup namewf wfs of
Nothing -> error $ "startWF: workflow name not found in workflow list: "++namewf;
Just f -> do
let name= namewf ++ "#" ++ keyResource v
let stat1= stat0{wfName= name , versions= [toIDyn v]}
mst <- liftIO $ getResource stat1
let (vn, stat,create)= case mst of
Nothing -> (v, stat1, True)
Just s-> (v,s{index=0,recover=True},False)
let
addwf [wf ] = resources{ toAdd=[ toIDyn $ RunningWorkflows (name:xs) ]
++ [ toIDyn stat]}
where xs= case wf of Nothing -> []; Just dyn -> xs where RunningWorkflows xs = fromIDyn dyn
when create $ liftIO . atomically $ withDSTMResources [toIDyn $ RunningWorkflows undefined ] addwf
runWF name f vn stat
runWF :: (Monad m,MonadIO m,IResource a,Typeable a, IResource b, Typeable b)
=> String -> ( a -> Workflow m b) -> a -> Stat -> m b
runWF name f v s=do
when (sync s) $ liftIO $ syncCache
(s', v') <- st (f v) $ s
let delWF Nothing = error $ "runWF: Workflow list not found: "
delWF (Just (RunningWorkflows xs ))=
let name= name++ "#" ++ keyResource v
in case elem name xs of
False -> error $"runWF: not found state for workflow: "++ name
True -> RunningWorkflows (xs \\ [name])
liftIO $ withResource (RunningWorkflows undefined) delWF
when (sync s) $ liftIO $ syncCache
return v'
restartWorkflows
::(IResource a, Typeable a, IResource b, Typeable b)
=> WorkflowList IO a b
-> IO ()
restartWorkflows map = do
liftIO (registerType :: IO Stat)
liftIO (registerType :: IO Control.Workflow.Queue)
liftIO (registerType :: IO String)
liftIO (registerType :: IO Integer)
mw <- liftIO $ getResource ((RunningWorkflows undefined ) )
case mw of
Nothing -> return ()
Just (RunningWorkflows all) -> mapM_ start all
where
start key1= do
let key= case elemIndex '#' key1 of
Just n -> take n key1
Nothing -> key1
let mf= lookup key map
if isNothing mf then return ()
else do
let f= fromJust mf
let st0= stat0{wfName = key1}
mst <- liftIO $ getResource st0
case mst of
Nothing -> error $ "restartWorkflows: not found "++ keyResource st0
Just st-> do
liftIO . forkIO $ runWF key f (fromIDyn . last $ versions st ) st{index=0,recover=True} >> return ()
return ()
syncWrite:: (Monad m, MonadIO m)
=> Bool
-> Int
-> Int
-> WF Stat m ()
syncWrite bool time maxsize= WF(\s -> do
when (bool== False) $ do
liftIO $ clearSyncCacheProc time defaultCheck maxsize
return ()
return (s{ sync= bool},()))
getStep
:: (IResource a, Typeable a, Monad m)
=> Int
-> Workflow m a
getStep i= WF(\s -> do
let stat= state s
return (s, if i > 0 && i <= stat then fromIDyn $ versions s !! (stat i)
else if i < 0 && i >= stat then fromIDyn $ versions s !! (stat +i)
else error "getStep: wrong index")
)
getAll :: (IResource a, Typeable a, Monad m) => WF Stat m [a]
getAll = WF(\s -> return (s, map fromIDyn . take (state s+1) $ versions s))
logWF :: (Monad m, MonadIO m) => String -> Workflow m ()
logWF str= WF (\s -> do
time <- liftIO $ getClockTime >>= toCalendarTime >>= return . calendarTimeToString
let str2 = time ++ ": "++ str
let (state1, index1, versions1)=
let stat= state s ; ind= index s in
if recover s && ind < stat
then (stat,ind +1, versions s)
else (stat +1, ind, toIDyn str2 : versions s)
return (s{versions= versions1, state= state1, index= index1}, ()) )
getWFKeys :: String -> IO [String]
getWFKeys wfname= do
mwfs <- getResource $ RunningWorkflows undefined
case mwfs of
Nothing -> return []
Just (RunningWorkflows wfs) -> return $ map (tail . dropWhile (/= '#')) $ filter (isPrefixOf wfname) wfs
getWFHistory :: (IResource a) => String -> a -> IO (Maybe Stat)
getWFHistory wfname x= getResource stat0{wfName= wfname ++ "#" ++ keyResource x}
delWFHistory :: IResource a => String -> a -> IO ()
delWFHistory wfname1 x=do
let wfname= wfname1 ++ "#" ++ keyResource x
let
doit [Just (RunningWorkflows wfs)] =
resources{ toAdd = [RunningWorkflows (wfs \\ [wfname])]
, toDelete= [stat0{wfName= wfname}] }
doit _ = error "delWFHistory: list of running workflows not found"
atomically $ withSTMResources[RunningWorkflows undefined] doit
syncCache
printHistory :: Stat -> IO ()
printHistory stat= do
putStrLn . runW $ showp $ Pretty stat
putStrLn "-----------------------------------"
data Pretty = Pretty Stat
instance Serialize Pretty where
showp (Pretty (Stat wfName state index recover sync versions _))= do
name <- showp wfName
vers <- showElem (zip ( reverse $ take (length versions)[1..] ) versions ) ""
return $ "Workflow name= " ++ name ++ "\n" ++ vers
where
showElem [] str= return $ str ++ "\n"
showElem ((n, IDynamic e):es) str= do
etext <- tshowp e
showElem es $ "Step " ++ show n ++ ": " ++ etext ++ "\n" ++ str
readp = undefined
reference :: IDynamic -> STM (TVar IDynamic)
reference x=do
mv <- getTVars [ x ]
case mv of
[Nothing] -> do
insertResources [x]
reference x
[Just cl] -> return cl
where
insertResources xs= withDSTMResources [] $ const resources{toAdd= [x]}
waitForData :: (IResource a, Typeable a,IResource b, Typeable b)
=> (b -> Bool)
-> a
-> IO b
waitForData filter x= atomically $ waitForDataSTM filter x
waitForDataSTM :: (IResource a, Typeable a,IResource b, Typeable b)
=> (b -> Bool)
-> a
-> STM b
waitForDataSTM filter x= do
tv <- reference $ toIDyn x
do
dyn <- readTVar tv
case safeFromIDyn dyn of
Nothing -> retry
Just x ->
case filter x of
False -> retry
True -> return x
waitFor
:: (IResource a, Typeable a, IResource b, Typeable b)
=> (b -> Bool)
-> String
-> a
-> IO b
waitFor filter wfname x= atomically $ waitForSTM filter wfname x
waitForSTM
:: (IResource a, Typeable a, IResource b, Typeable b)
=> (b -> Bool)
-> String
-> a
-> STM b
waitForSTM filter wfname x= do
mtv <- getTVars [toIDyn stat0{wfName=wfname ++ "#" ++ keyResource x}]
case mtv of
[Nothing] -> error $ "workflow "++ wfname ++" not initialized for data " ++ keyResource x
[Just tv] ->
do
dyn <- readTVar tv
let Stat{ versions= d: _}= fromIDyn dyn
case safeFromIDyn d of
Nothing -> retry
Just x ->
case filter x of
False -> retry
True -> return x
getTimeoutFlag :: (MonadIO m)
=> Integer
-> Workflow m (TVar Bool)
getTimeoutFlag t=do
tnow<- step $ liftIO getTimeSeconds
flag tnow t
where
flag tnow delta= WF(\s -> do
(s', tv) <- case timeout s of
Nothing -> do
tv <- liftIO $ newTVarIO False
return (s{timeout= Just tv}, tv)
Just tv -> return (s, tv)
liftIO $ do
let t = tnow + delta
forkIO $ do waitUntil t ; atomically $ writeTVar tv True
return (s', tv))
getTimeSeconds :: IO Integer
getTimeSeconds= do
TOD n _ <- getClockTime
return n
waitUntilSTM :: TVar Bool -> STM()
waitUntilSTM tv = do
b <- readTVar tv
if b == False then retry else return ()
waitUntil:: Integer -> IO()
waitUntil t= do
tnow <- getTimeSeconds
let delay | ttnow < 0= 0
| ttnow > (fromIntegral maxInt) = maxInt
| otherwise = fromIntegral $ t tnow
threadDelay $ delay * 1000000
if t tnow <= 0 then return () else waitUntil t
data Queue= Queue {name :: String, imp :: [IDynamic], out :: [IDynamic]} deriving (Typeable)
instance Serialize Queue where
showp (Queue name imp out)= do
sin<- showp imp
sout <- showp out
return $ "Queue " ++ show name ++ " " ++ sin ++ " " ++ sout
readp = do
symbol "Queue"
name <- readp
sin <- readp
sout <- readp
return $ Queue name sin sout
instance IResource Queue where
keyResource (Queue name _ _)= "Queue#" ++ name
serialize x= runW $ showp x
deserialize str = runR readp str
defPath _= workflowsPath
readQueue
:: (IResource a , Typeable a)
=> String
-> IO a
readQueue = atomically . readQueueSTM
readQueueSTM :: (IResource a , Typeable a) => String -> STM a
readQueueSTM queue = do
let qempty= Queue queue [] []
let empty= toIDyn qempty
reference empty
d <- withSTMResources [qempty] doit
releaseTVars [empty]
return $ fromIDyn d
where
doit [ Nothing] = Retry
doit [Just(Queue _ [] [])] = Retry
doit [Just(Queue _ imp [])] = doit [Just (Queue queue [] $ reverse imp)]
doit [Just (Queue _ imp list)] =
resources { toAdd= [ Queue queue imp (tail list)]
, toReturn= head list }
unreadQueue :: (IResource a , Typeable a) => String -> a -> IO ()
unreadQueue queue x= atomically $ unreadQueueSTM queue x
unreadQueueSTM :: (IResource a , Typeable a) => String -> a -> STM ()
unreadQueueSTM queue x=
withSTMResources [Queue queue undefined undefined] $ \[r]-> resources{ toAdd= doit r}
where
doit Nothing = [Queue queue [] [ toIDyn x] ]
doit (Just(Queue _ imp out)) = [Queue queue imp ( toIDyn x : out) ]
writeQueue :: (IResource a, Typeable a) => String -> a -> IO ()
writeQueue queue v = atomically $ writeQueueSTM queue v
writeQueueSTM :: (IResource a, Typeable a) => String -> a -> STM ()
writeQueueSTM queue v=
withSTMResources [Queue queue undefined undefined] $ \[r]-> resources{ toAdd= doit r}
where
doit Nothing = [Queue queue [toIDyn v] []]
doit (Just(Queue _ imp out)) = [Queue queue ( toIDyn v : imp) out]
isEmptyQueue = atomically . isEmptyQueueSTM
isEmptyQueueSTM :: String -> STM Bool
isEmptyQueueSTM queue= do
withDSTMResources [toIDyn $ Queue queue undefined undefined] doit
where
doit [ r]= resources{toReturn= ret} where
ret=case r of
Nothing -> True
Just x -> case fromIDyn x of
Queue _ [] [] -> True
_ -> False