{-# OPTIONS -fglasgow-exts -fallow-overlapping-instances  -fallow-undecidable-instances -O2  #-}
{-transparent low level support (state logging, resume of the computation state, wait for data condition) for very long time 
  computations. Workflow give the two first services to any monadic computation of type  (a-> m a)  
  
                    f x >>=\x'-> g x' >>= \x''->... z by 
                    
                    prefixing the user with the method step: 
                    
                         step f  x >>= \x'-> step g  x' >>= \x''->...  
                   
in this way, a workflow can be described with the familiar "do" notation. In principle, there is no other limitation
on the syntax but the restriction (a -> m a): All computations consume and produce the same type of data.
                             
Alberto Gomez Corona agocorona@gmail.com 2008
-}
module Control.Workflow (
    Workflow --    a useful type name
    ,WorkflowStep
    ,WorkflowList 
    ,Stat

    ,step -- :: (Monad m) =>  (a -> m a) -> ( a -> Workflow m a) 
          -- encapsulates a monadic computation into state monad  that brings persistence and
          -- recovery services

    ,startWF -- :: (Monad m) => 
             --     String ->           mame of workflow in the workflowlist
             --     a ->                initial data value 
             --     WorkflowList m a -> assoc-list of (workflow name string,Workflow methods)
             --     m a                 resulting value
             -- start or continue a workflow
             

    , restartWorkflows -- :: (IResource  a, Serialize a) =>  WorkflowList IO a -> IO ()
                       -- re-start the non finished workflows. needs the assoclist. 
   

    , getStep -- Monad m => Int -> Workflow m a  return the n-tn intermediate result
              --                                 if Int < 0 count from the current result back

    , getAll  -- :: Monad m => Workfow m  [a]  return all the intermediate results

    , unsafeIOtoWF -- executes a IO operation. this is executed  whenever re-started, no matter where is the resume point
                   -- This is useful for external IO re-initializations not controllable by the State monad.


    , waitFor -- ::(IResource a, Serialize a) => (a ->Bool) -> a -> IO a
              -- wait until a object (with a certaing key=keyResource x) meet a certain condition 
              -- (useful for checking external actions, possibly by other workflows or by direct use of TCache primitives ) 
    , waitUntil -- :: Integer -> IO()
                -- wait until the absolute time in seconds is reached  (as returned by getClockTime)
    , syncWrite -- syncWrite:: Monad m => 
               -- Bool ->  True means that changes are inmediately saved after each step
               -- Int ->  number of seconds between saves when async
               -- Int ->  max size of the cache when async
               -- WF m (Stat a) ()   in the workflow monad
               
               -- Turn on and off syncronized writing to disk
               -- select async mode only 
               --       -for very fast workflow steps  or 
               --       -when the cache policies are dictated outside of the workflow 
               --        trough SyncCacheProc (see TCache module)
               
)


where


import System.IO.Unsafe
import Control.Monad(when,liftM)
import Unsafe.Coerce
import Control.Concurrent (forkIO,threadDelay)
import Control.Concurrent.STM(atomically, retry, readTVar)
import Debug.Trace

import Data.TCache.Dynamic
import Data.RefSerialize
import Data.List((\\),find,elemIndices)
import Data.Typeable
import System.Time

debug a b = trace b a

data WF m s l = WF { st :: s -> m (s,l) }

type Workflow m l= WF m (Stat l) l  -- not so scary

type WorkflowStep m a= ( a -> Workflow m  a)

type WorkflowList m a = [(String,  WorkflowStep m a)]

data Stat a=  Workflows [(String,String)]
           |Stat{ wfName :: String, state:: Int, index :: Int, recover:: Bool, sync :: Bool , versions :: [a]} 
         
           deriving (Typeable)

stat0 = Stat{ wfName="", state=0, index=0, recover=False, versions =[], sync= True}


-- serialization of data is done trough RefSerialize because it permits to store
-- different versions of the same object with minumum memory.

instance Serialize a =>  Serialize (Stat a) where
    showp (Workflows list)= do
      str <- showp list
      return $ "StatWorkflows "++ str
              

    showp  (Stat wfName state index recover sync versions )= do
       parsea <- rshowp  versions 
       return $ "Stat "++ show wfName ++" "++ show state++" "++show index++" "++show recover++" "++ show sync ++ parsea  
       
    readp = choice [rStat, rWorkflows] where
        rStat= do
              symbol "Stat" 
              wfName  <- stringLiteral
              state   <- integer
              index   <- integer
              recover <- bool
              sync    <- bool
              versions<- rreadp
              return $ Stat wfName (fromIntegral state) (fromIntegral index) recover sync versions 
              


        rWorkflows= do
               symbol "StatWorkflows"
               list <- readp
               return $ Workflows list
        
--persistence trough TCache   , default persistence in files      
        
instance (IResource a, Serialize a,Typeable a)=> IResource (Stat a) where
   keyResource Stat{wfName=name, versions = []}= prefix ++name
   keyResource Stat{wfName=name, versions = (a:_)}= prefix ++name++"#"++keyResource a

   keyResource w@(Workflows xs)= "StatWorkflows"

 
   defPath x= "Workflows/" ++ show (typeOf x)++"/"  -- directory for Workflow data

   serialize x= runW $ showp x
   deserialize str = runR readp str

prefix= "Stat#"
lengthPrefix= length prefix

insertDResources xs=  withDResources [] (\_-> xs)

--unsafeIOtoWF ::  Monad m => IO a -> WF m (Stat b) a
unsafeIOtoWF x= let y= unsafePerformIO x in y `seq` return y

instance Monad m =>  Monad (WF m s) where
    return  x = WF (\s ->  return  (s, x)) 
    WF g >>= f = WF (\s ->do
                (s1, x) <- g s 
                
                let WF fun=  f x 
                (s3, x') <- fun s1 
                
                return (s3, x'))
  
class (IResource  a, Serialize a,Typeable a) => Workflow_ a where
  -- | step lift a monadic computation (a -> m a) in in to the WF monad, provides state loging and automatic resume
  step :: (Monad m) =>  (a -> m a) -> ( a -> Workflow m a)
  step f =  \x -> WF(\s -> do 
        let stat= state s
        let ind= index s
        if recover s && ind < stat
          then  return (s{index=ind +1 },   versions s !! (stat - ind-1) )
          else do
                x'<- f x
                let s'= s{recover= False, versions =  x': versions s, state= state s+1} 
                unsafeIOtoWF $ do

                let 
                     doit1 xs
                          | keyResource s /= keyResource s' -- `debug` ("keys:"++keyResource s ++", "++keyResource s')
                               =  
                                  let newlist= newpair :(xs \\[oldpair])
                                      newpair= (keyResource s',original)
                                      oldpair= (key,original)
                                      key= keyResource s
                                      original= case lookup key xs of
                                                    Nothing   -> error $ "workflow stat not found: " ++key
                                                    Just old  -> old
                                                    
                                  in [Insert $ toIDyn $ (Workflows newlist :: Stat a)
                                     ,Delete $ toIDyn s, Insert  $ toIDyn s'
                                     ,Insert (toIDyn x')] 
                                          -- `debug`("insert "++keyResource s'++","++keyResource x'++" delete: "++
                                          --         keyResource s++","++keyResource x)
                          | otherwise= [Insert $ toIDyn s', Insert $ toIDyn x'] 
                                          -- `debug`("insert "++keyResource s'++","++keyResource x')
                let
                  doit [Nothing] =   doit1  []
                  doit [Just d] = let Workflows xs= (fromIDyn d :: Stat a) 
                                  in doit1 xs 

                withDResourcesID [toIDyn $ (Workflows undefined :: Stat a)] doit
                
                when (sync s) $ unsafeIOtoWF $ syncCache 
                                
                return (s', x') )
                
  -- | start or continue a workflow. WorkflowList is a assoclist of (name, workflow computation)    
  startWF :: (Monad m) => String -> a -> WorkflowList m a ->m a 
  startWF name v wfs= do
          unsafeIOtoWF  (registerType  :: IO (Control.Workflow.Stat a))
          case lookup name wfs of  
           Nothing -> error $ "MonadWF.startWF: workflow not found: "++name; 
           Just f -> do

             let stat1= stat0{index=0,wfName= name,versions=[v]} ::Stat a
             let key= keyResource stat1 
             (vn, stat, found) <- do 
                   wxs <- unsafeIOtoWF $ getResource $ (Workflows undefined :: Stat a)
                   case wxs of
                    Nothing -> return (v,stat1, False)
                    Just (Workflows xs) -> 
                     case find (\(_,s)-> s == key) xs of  
                               
                        Just (key1, oldkey1) -> do 
                                -- already in course
                                mst <- unsafeIOtoWF $ getResource stat0{wfName= drop lengthPrefix key1}
                                case mst of
                                  Nothing -> error $ "no stat for key. " ++ key
                                  Just s@Stat{versions=(a:_)} -> return (a,s{index=0,recover=True}, True)  -- the last value
                        Nothing -> return (v, stat1, False)
             -- insert it in the running workflow list
             when (found == False) $ 
               let addWF [Nothing] = [Workflows [(key,key)],stat]
                   addWF [Just (Workflows xs)]=  [Workflows ((key,key):xs),stat]
                             
               in unsafeIOtoWF $ withResources [(Workflows undefined :: Stat a)] addWF 
                                     
             runWF name f vn stat  -- `debug` (serialize stat)
                



  restartWorkflows :: (IResource  a, Serialize a) =>  WorkflowList IO a -> IO ()
  restartWorkflows map = do
          unsafeIOtoWF  (registerType  :: IO (Control.Workflow.Stat a))
          mw <- getResource ((Workflows undefined ) :: Stat a) 
          case mw of
            Nothing -> return ()
            Just (Workflows all) -> mapM_ start all
          where
            start :: (String, String) -> IO ()
            start (key,_)= do
              let name= let [init,end]= elemIndices '#' key 
                            
                            rest= drop (init + 1) key
                        in take (end-init -1) rest    
              case  lookup name map of
                 Just f  -> do
                          let st0= Key $ (defPath (stat0 :: Stat a))++key
                          mst <- getDResource $ IDynamic st0
                          case mst of
                           Nothing -> error $ "getResource: not found "++ keyResource st0 
                           Just (idyn) -> do
                             let st = fromIDyn idyn :: Stat a
                             forkIO $ runWF key f (head $ versions st) st >> return ()
                             return ()
                 Nothing -> error $ "workflow not found: "++ name

 
  
  runWF :: (Monad m,IResource  a, Serialize a) => String ->( a -> Workflow m a) ->  a -> (Stat a) -> m  a
  runWF name f v s=do
 
           (s', v')  <-  st (f v) $ s   
           
           let  key= keyResource s'
           
           let  delWF [Nothing] = error $ " Workflow list not found: " 
                delWF [Just d]= let Workflows xs= (fromIDyn d :: Stat a) in
                    case lookup key xs of  
                        Nothing -> error $"runWF not found state for key: "++ key      
                        Just oldkey ->
                          (map (Delete . toIDyn) $ versions s')  ++    -- delete all intermediate objects generated                                                              
                          [Insert . toIDyn  $ (Workflows (xs \\ [(key,oldkey)]) ::Stat a)
                          ,Delete $ toIDyn s'] 

                                      
           unsafeIOtoWF $ withDResourcesID  [toIDyn $ (Workflows undefined :: Stat a)]  delWF 
           when (sync s) $ unsafeIOtoWF $ syncCache 
           return v'

  -- switch on and off syncronous write for each step (default is syncronous)
  -- for very fast steps, asyncronous is better.h
  -- when TCache is used for other purposes, is better to define the cache policy direct
  syncWrite:: (Monad m , IResource a)=> Bool -> Int  ->  Int ->  WF m (Stat a) ()
  syncWrite bool time maxsize= WF(\s ->do
                when (bool== False) $ do
                    unsafeIOtoWF $ clearSyncCacheProc  time defaultCheck maxsize
                    return ()
                return (s{ sync= bool},()))


instance (IResource  a,Serialize a,Typeable a) => Workflow_ a



--  return the result of the previous step
getStep :: Monad m => Int -> Workflow m a
getStep i=  WF(\s ->do 
                let  stat= state s 
                return (s, if i > 0 && i <= stat then versions s !! (stat - i) 
                           else if i < 0 && i >= -stat then versions s !! (stat +i) 
                           else error "getStep: wrong index")
                )
                
-- get all the step results
getAll :: Monad m => WF m (Stat a) [a]
getAll =  WF(\s ->return (s, take (state s) $ versions s))

{-

exec :: (Monad m) =>  WF m s a -> s -> m a
exec (WF f) s = do (s', x) <- f s
                   return x 

run :: (Monad m,Monad (WF m s)) =>  (a -> WF m s a) -> s -> a -> m a
run f s a= exec (f a) s

-}

     
--------- event handling--------------     
reference x=do
             mv <- getDTVars [x ]
             case mv of
               [Nothing] -> do 
                       insertDResources [x]
                       reference x
             
               [Just cl] -> return cl

type Filter a= (a -> Bool)
-- wait until a object (with a certaing key=keyResource x) meet a certain condition (useful to check external actions ) 
-- a --a -> WF m (Stat a)  a
--waitFor :: (IResource a, RefSerialize a) =>  Filter a -> a -> WF IO (Stat a) a
--waitFor filter x= (step $  waitFor1 filter) x where
--NOTE if you Delete the object from te cache, waitFor will no longuer work
waitFor ::  (IResource a, Serialize a, Typeable a) => Filter a -> a -> IO a
waitFor  filter x=  do
        tv <- reference (toIDyn $  x) 
        atomically $ do      
                dyn  <- readTVar tv
                let  x= fromIDyn dyn
                case filter x   
                 of 
                        False -> retry
                        True  -> return x 
                          


           
waitUntil:: Integer -> IO()
waitUntil t= do
                threadDelayInteger $ delay* 1000000 
                
                print ("execution at tnow="++show tnow++" ,t="++show t)
                
        where
        tnow= n where TOD n _ = unsafePerformIO getClockTime

        delay | t-tnow >0 = t-tnow 
                | otherwise  = 0
                
        threadDelayInteger:: Integer -> IO()
        threadDelayInteger time| time < imaxInt = threadDelay $ fromIntegral time
                                | otherwise=do  threadDelay maxInt
                                                threadDelayInteger ( time - imaxInt)
                where   maxInt = maxBound :: Int
                        imaxInt= fromIntegral maxInt