-- | A small workflow monad with logged steps.
--
-- A workflow is a chain of 'step's threaded through the 'WF' state monad.
-- Every executed step pushes its result onto the 'Stat' record of the
-- workflow and hands the record to the TCache update list, so that a
-- workflow that is started again ('startWF', 'restartWorkflows') can replay
-- the stored results without re-running the steps that already completed.
module Control.Workflow (
Workflow
,WorkflowStep
,WorkflowList
,Stat
,step
,startWF
, restartWorkflows
, getStep
, getAll
, unsafeIOtoWF
, waitFor
, waitUntil
, syncWrite
)
where
import Control.Applicative (Applicative(..))
import Control.Concurrent (forkIO,threadDelay)
import Control.Concurrent.STM(atomically, retry, readTVar)
import Control.Monad(when,liftM)
import Data.List((\\),find,elemIndices)
import Data.RefSerialize
import Data.TCache.Dynamic
import Data.Typeable
import Debug.Trace
import System.IO.Unsafe
import System.Time
import Unsafe.Coerce
-- | 'trace' with its arguments flipped: returns the first argument and emits
-- the second as a trace message when the result is demanded.
debug :: a -> String -> a
debug = flip trace
-- | State-passing computation over a base monad @m@: given a state @s@ it
-- produces, inside @m@, the next state together with a result @l@.
data WF m s l = WF
    { st :: s -> m (s, l)  -- ^ run the computation from a given state
    }

-- | A 'WF' computation whose state is the 'Stat' log of values of type @l@
-- and whose result is itself an @l@.
type Workflow m l = WF m (Stat l) l

-- | One stage of a workflow: maps the current value to a 'Workflow' action.
type WorkflowStep m a = a -> Workflow m a

-- | Association list from workflow names to their entry points.
type WorkflowList m a = [(String, WorkflowStep m a)]
-- | Persistent bookkeeping for workflows.
data Stat a
    = -- | Registry of workflows, stored under the fixed key
      -- @\"StatWorkflows\"@; each entry is a pair of state keys.
      Workflows [(String, String)]
    | -- | State of a single workflow.
      Stat
        { wfName   :: String  -- ^ workflow name, part of the resource key
        , state    :: Int     -- ^ number of steps executed so far
        , index    :: Int     -- ^ replay cursor used while 'recover' is set
        , recover  :: Bool    -- ^ replay stored results instead of running steps
        , sync     :: Bool    -- ^ call @syncCache@ after every executed step
        , versions :: [a]     -- ^ logged values, most recent first
        }
    deriving (Typeable)
-- | Blank workflow state: empty name, no steps executed, cursor at 0, not
-- recovering, synchronous writes enabled, no logged values.
stat0 :: Stat a
stat0 = Stat "" 0 0 False True []
-- | Textual (de)serialization of 'Stat'.
--
-- * The registry is written as @StatWorkflows@ followed by the pair list.
-- * A workflow state is written as @Stat@ followed by the name, the two
--   counters and the two flags (space separated), immediately followed by
--   the output of 'rshowp' for the version list.
instance Serialize a => Serialize (Stat a) where
    showp (Workflows pairs) =
        liftM ("StatWorkflows " ++) (showp pairs)
    showp (Stat name done cursor resuming syncing history) = do
        hist <- rshowp history
        return $
            unwords
                [ "Stat"
                , show name
                , show done
                , show cursor
                , show resuming
                , show syncing
                ]
                ++ hist

    readp = choice [rStat, rWorkflows]
      where
        -- Parser for a single workflow state; mirrors the 'showp' layout.
        rStat = do
            symbol "Stat"
            name     <- stringLiteral
            done     <- integer
            cursor   <- integer
            resuming <- bool
            syncing  <- bool
            history  <- rreadp
            return $
                Stat name (fromIntegral done) (fromIntegral cursor)
                     resuming syncing history

        -- Parser for the workflow registry.
        rWorkflows =
            symbol "StatWorkflows" >> liftM Workflows readp
-- | Storage of 'Stat' values as TCache resources.
--
-- The registry always lives under the key @\"StatWorkflows\"@.  A workflow
-- state is keyed by 'prefix' plus the workflow name and, when at least one
-- value has been logged, a @#@ followed by the key of the most recent value.
instance (IResource a, Serialize a, Typeable a) => IResource (Stat a) where
    keyResource (Workflows _) = "StatWorkflows"
    keyResource stat =
        case versions stat of
            []           -> prefix ++ wfName stat
            (latest : _) -> prefix ++ wfName stat ++ "#" ++ keyResource latest

    -- Directory derived from the concrete type of the resource.
    defPath x = concat ["Workflows/", show (typeOf x), "/"]

    serialize = runW . showp

    deserialize = runR readp
-- | Leading part of the resource key of every single-workflow 'Stat'.
prefix :: String
prefix = "Stat#"

-- | Number of characters in 'prefix'; used to strip it from a stored key.
lengthPrefix :: Int
lengthPrefix = length prefix
-- | Hand the given resources to 'withDResources' with an empty lookup list,
-- so the callback ignores its argument and simply yields @xs@.
insertDResources xs = withDResources [] (const xs)
-- | Run an 'IO' action from inside an arbitrary monad via
-- 'unsafePerformIO'.  The result is forced with 'seq' before it is wrapped
-- with 'return', so the action runs when the returned computation itself is
-- evaluated rather than when its result is later inspected.
unsafeIOtoWF :: Monad m => IO a -> m a
unsafeIOtoWF action = result `seq` return result
  where
    result = unsafePerformIO action
-- 'Functor' and 'Applicative' are superclasses of 'Monad' since GHC 7.10,
-- so the three instances are given together.  All of them thread the state
-- left to right through the underlying monad @m@.

-- | Map over the result, leaving the state transition untouched.
instance Monad m => Functor (WF m s) where
    fmap f (WF g) = WF $ \s -> do
        (s1, x) <- g s
        return (s1, f x)

-- | 'pure' leaves the state unchanged; '<*>' runs the function computation
-- first and feeds the state it produces to the argument computation.
instance Monad m => Applicative (WF m s) where
    pure x = WF $ \s -> return (s, x)
    WF gf <*> WF gx = WF $ \s -> do
        (s1, f) <- gf s
        (s2, x) <- gx s1
        return (s2, f x)

-- | Sequential composition: the second computation starts from the state
-- produced by the first.
instance Monad m => Monad (WF m s) where
    return = pure
    WF g >>= f = WF $ \s -> do
        (s1, x) <- g s
        st (f x) s1
-- | Operations of the workflow engine.
--
-- They are class methods with default implementations so that the class
-- variable @a@ (the type of the values flowing through the workflow) and
-- its 'IResource', 'Serialize' and 'Typeable' instances are available in
-- every body, including the local @:: Stat a@ annotations.
class (IResource a, Serialize a, Typeable a) => Workflow_ a where

    -- | Lift a computation @a -> m a@ into a logged workflow step.
    --
    -- While the state is in recovery mode ('recover') and the replay cursor
    -- ('index') is below the number of executed steps ('state'), the
    -- computation is not run: the value logged for that step is returned
    -- and the cursor advances.  Otherwise the computation runs, its result
    -- is pushed onto 'versions', recovery mode is switched off and the new
    -- state and result are passed to 'withDResourcesID' as inserts.
    step :: (Monad m) => (a -> m a) -> (a -> Workflow m a)
    step f = \x -> WF (\s -> do
        let stat = state s
        let ind  = index s
        if recover s && ind < stat
          -- 'versions' is newest-first and starts with the initial value, so
          -- the result of step number @ind@ (0-based) sits @stat - ind - 1@
          -- positions from the head.
          then return (s{index = ind + 1}, versions s !! (stat - ind - 1))
          else do
            x' <- f x
            let s' = s{ recover  = False
                      , versions = x' : versions s
                      , state    = state s + 1
                      }
            unsafeIOtoWF $ do
                -- Build the update list.  The key of a state depends on its
                -- newest version, so when the key changed the registry entry
                -- is re-pointed to the new key and the old state is deleted.
                let doit1 xs
                      | keyResource s /= keyResource s' =
                          let key      = keyResource s
                              original = case lookup key xs of
                                  Nothing  -> error $ "workflow stat not found: " ++ key
                                  Just old -> old
                              oldpair  = (key, original)
                              newpair  = (keyResource s', original)
                              newlist  = newpair : (xs \\ [oldpair])
                          in [ Insert $ toIDyn $ (Workflows newlist :: Stat a)
                             , Delete $ toIDyn s
                             , Insert $ toIDyn s'
                             , Insert (toIDyn x')
                             ]
                      | otherwise = [Insert $ toIDyn s', Insert $ toIDyn x']
                let doit [Nothing] = doit1 []
                    doit [Just d]  =
                        let Workflows xs = (fromIDyn d :: Stat a)
                        in doit1 xs
                withDResourcesID [toIDyn $ (Workflows undefined :: Stat a)] doit
            when (sync s) $ unsafeIOtoWF $ syncCache
            return (s', x'))

    -- | Start the workflow called @name@ on the value @v@, or continue it.
    --
    -- Calls 'error' when @name@ is not in the workflow list.  If the
    -- registry already holds an entry whose second component equals the key
    -- of the initial state, the stored state is loaded and the workflow is
    -- run in recovery mode with the newest logged value as input; otherwise
    -- the workflow is added to the registry and run from a fresh state.
    startWF :: (Monad m) => String -> a -> WorkflowList m a -> m a
    startWF name v wfs = do
        unsafeIOtoWF (registerType :: IO (Control.Workflow.Stat a))
        case lookup name wfs of
          Nothing -> error $ "MonadWF.startWF: workflow not found: " ++ name
          Just f -> do
            let stat1 = stat0{index = 0, wfName = name, versions = [v]} :: Stat a
            let key = keyResource stat1
            (vn, stat, found) <- do
                wxs <- unsafeIOtoWF $ getResource $ (Workflows undefined :: Stat a)
                case wxs of
                  Nothing -> return (v, stat1, False)
                  Just (Workflows xs) ->
                    case find (\(_, s) -> s == key) xs of
                      Just (key1, _) -> do
                        -- A version-less state whose name is the stored key
                        -- minus 'prefix' has exactly @key1@ as resource key.
                        mst <- unsafeIOtoWF $
                                 getResource stat0{wfName = drop lengthPrefix key1}
                        case mst of
                          Nothing -> error $ "no stat for key. " ++ key
                          Just s@Stat{versions = (latest : _)} ->
                            return (latest, s{index = 0, recover = True}, True)
                      Nothing -> return (v, stat1, False)
            when (not found) $
                let addWF [Nothing]             = [Workflows [(key, key)], stat]
                    addWF [Just (Workflows xs)] = [Workflows ((key, key) : xs), stat]
                in unsafeIOtoWF $ withResources [(Workflows undefined :: Stat a)] addWF
            runWF name f vn stat

    -- | Launch, each in its own thread ('forkIO'), every workflow listed in
    -- the registry.  The workflow name is the part of the stored key
    -- between its two @#@ characters; 'error' is called when that name is
    -- not in the workflow list or when the stored state cannot be read.
    restartWorkflows :: (IResource a, Serialize a) => WorkflowList IO a -> IO ()
    restartWorkflows wfs = do
        unsafeIOtoWF (registerType :: IO (Control.Workflow.Stat a))
        mw <- getResource ((Workflows undefined) :: Stat a)
        case mw of
          Nothing                  -> return ()
          Just (Workflows running) -> mapM_ start running
      where
        start :: (String, String) -> IO ()
        start (key, _) = do
            -- The key has the shape @Stat#<name>#<key of newest version>@.
            let name = let [firstHash, secondHash] = elemIndices '#' key
                           rest = drop (firstHash + 1) key
                       in take (secondHash - firstHash - 1) rest
            case lookup name wfs of
              Just f -> do
                let st0 = Key $ (defPath (stat0 :: Stat a)) ++ key
                mst <- getDResource $ IDynamic st0
                case mst of
                  Nothing -> error $ "getResource: not found " ++ keyResource st0
                  Just idyn -> do
                    let saved = fromIDyn idyn :: Stat a
                    forkIO $ runWF key f (head $ versions saved) saved >> return ()
                    return ()
              Nothing -> error $ "workflow not found: " ++ name

    -- | Run a workflow to completion from the given value and state, then
    -- remove its registry entry, its state and all its logged versions.
    -- The @name@ argument is not used by this implementation.
    runWF :: (Monad m, IResource a, Serialize a)
          => String -> (a -> Workflow m a) -> a -> (Stat a) -> m a
    runWF name f v s = do
        (s', v') <- st (f v) $ s
        let key = keyResource s'
        let delWF [Nothing]  = error $ " Workflow list not found: "
            delWF [Just d] =
                let Workflows xs = (fromIDyn d :: Stat a)
                in case lookup key xs of
                     Nothing -> error $ "runWF not found state for key: " ++ key
                     Just oldkey ->
                       (map (Delete . toIDyn) $ versions s') ++
                         [ Insert . toIDyn $ (Workflows (xs \\ [(key, oldkey)]) :: Stat a)
                         , Delete $ toIDyn s'
                         ]
        unsafeIOtoWF $ withDResourcesID [toIDyn $ (Workflows undefined :: Stat a)] delWF
        when (sync s) $ unsafeIOtoWF $ syncCache
        return v'

    -- | Set the 'sync' flag of the workflow state.  When the flag is being
    -- switched off, 'clearSyncCacheProc' is started first with the given
    -- @time@ and @maxsize@ arguments and 'defaultCheck'.
    syncWrite :: (Monad m, IResource a) => Bool -> Int -> Int -> WF m (Stat a) ()
    syncWrite bool time maxsize = WF (\s -> do
        when (not bool) $ do
            unsafeIOtoWF $ clearSyncCacheProc time defaultCheck maxsize
            return ()
        return (s{sync = bool}, ()))
-- | Blanket instance: every type that is 'IResource', 'Serialize' and
-- 'Typeable' is a 'Workflow_'.  The instance declares no methods, so the
-- default implementations given in the class are the ones used.
instance (IResource a,Serialize a,Typeable a) => Workflow_ a
-- | Return a previously logged value without changing the workflow state.
--
-- With @n@ the number of executed steps ('state'):
--
-- * @1 <= i <= n@ selects the result of the @i@-th step
--   (the log is newest-first, hence position @n - i@);
-- * @-n <= i <= -1@ selects position @n + i@ of the log;
-- * any other index yields @error \"getStep: wrong index\"@ when the result
--   is demanded.
--
-- NOTE(review): as written, a negative index selects the same entry as its
-- absolute value; confirm whether it was meant to count back from the most
-- recent step instead.
getStep :: Monad m => Int -> Workflow m a
getStep i = WF (\s -> do
    let stat = state s
    return (s, if i > 0 && i <= stat then versions s !! (stat - i)
               else if i < 0 && i >= negate stat then versions s !! (stat + i)
               else error "getStep: wrong index"))
-- | Return the first 'state' entries of the version log (newest first),
-- leaving the workflow state unchanged.
getAll :: Monad m => WF m (Stat a) [a]
getAll = WF $ \s ->
    let logged = take (state s) (versions s)
    in return (s, logged)
-- | Obtain the value that 'getDTVars' returns for @x@.  When it reports
-- 'Nothing', @x@ is first inserted with 'insertDResources' and the lookup is
-- tried again.
reference x =
    getDTVars [x] >>= \found ->
        case found of
            [Just cell] -> return cell
            [Nothing]   -> insertDResources [x] >> reference x
-- | A predicate on values of type @a@.
type Filter a = a -> Bool
-- | Block until the stored value obtained through 'reference' for @x@
-- satisfies the predicate, then return that stored value.  The check runs
-- inside an STM transaction that 'retry's while the predicate is 'False'.
waitFor :: (IResource a, Serialize a, Typeable a) => Filter a -> a -> IO a
waitFor filter x =
    reference (toIDyn $ x) >>= \tv -> atomically (poll tv)
  where
    -- One attempt: read the current value and either accept it or retry.
    poll tv = do
        dyn <- readTVar tv
        let current = fromIDyn dyn
        if filter current
            then return current
            else retry
-- | Sleep until the clock reaches @t@ seconds (same scale as the first
-- field of 'TOD' returned by 'getClockTime'), then print a trace line with
-- the start time and the target.  Returns without sleeping when @t@ is not
-- in the future.
--
-- The clock is read inside 'IO' on every call, so the start time can never
-- be shared between calls.
waitUntil :: Integer -> IO ()
waitUntil t = do
    TOD tnow _ <- getClockTime
    -- Seconds still to wait; never negative.
    let delay = max 0 (t - tnow)
    -- 'threadDelayInteger' takes microseconds.
    threadDelayInteger $ delay * 1000000
    print ("execution at tnow=" ++ show tnow ++ " ,t=" ++ show t)
-- | 'threadDelay' for an 'Integer' number of microseconds.  Delays that do
-- not fit in an 'Int' are split into chunks of @maxBound :: Int@
-- microseconds, recursing on the remainder.
threadDelayInteger :: Integer -> IO ()
threadDelayInteger time
    | time < imaxInt = threadDelay $ fromIntegral time
    | otherwise = do
        threadDelay maxInt
        threadDelayInteger (time - imaxInt)
  where
    maxInt  = maxBound :: Int
    imaxInt = fromIntegral maxInt