{-# LANGUAGE   DeriveDataTypeable
             , ScopedTypeVariables
             , FlexibleContexts

              #-}
{-# OPTIONS -IControl/Workflow       #-}

{- | This module contains monadic combinators that express some workflow patterns.
see the docAprobal.hs example included in the package

Here  the constraint `DynSerializer w r a` is equivalent to  `Data.Refserialize a`
This version permits optimal (de)serialization if you store in the queue different versions of largue structures, for
example, documents.  You must  define the right RefSerialize instance however.
See an example in docAprobal.hs incuded in the paclkage.
Alternatively you can use  Data.Binary serlializatiion with Control.Workflow.Binary.Patterns

EXAMPLE:

This fragment below describes the approbal procedure of a document.
First the document reference is sent to a list of bosses trough a queue.
ithey return a boolean in a  return queue ( askUser)
the booleans are summed up according with a monoid instance (sumUp)

if the resullt is false, the correctWF workflow is executed
If the result is True, the pipeline continues to the next stage  (checkValidated)

the next stage is the same process with a new list of users (superbosses).
This time, there is a timeout of 7 days. the result of the users that voted is summed
up according with the same monoid instance

if the result is true the document is added to the persistent list of approbed documents
if the result is false, the document is added to the persistent list of rejectec documents (checlkValidated1)


@docApprobal :: Document -> Workflow IO ()
docApprobal doc =  `getWFRef` \>>= docApprobal1


docApprobal1 rdoc=
    return True \>>=
    log \"requesting approbal from bosses\" \>>=
    `sumUp` 0 (map (askUser doc rdoc) bosses)  \>>=
    checkValidated \>>=
    log \"requesting approbal from superbosses or timeout\"  \>>=
    `sumUp` (7*60*60*24) (map(askUser doc rdoc) superbosses) \>>=
    checkValidated1


askUser _ _ user False = return False
askUser doc rdoc user  True =  do
      `step` $ `push` (quser user) rdoc
      `logWF` ("wait for any response from the user: " ++ user)
      `step` . `pop` $ qdocApprobal (title doc)

log txt x = `logWF` txt >> return x

checkValidated :: Bool -> `Workflow` IO Bool
checkValidated  val =
      case val of
        False -> correctWF (title doc) rdoc >> return False
        _     -> return True


checkValidated1 :: Bool -> Workflow IO ()
checkValidated1 val = step $ do
      case  val of
        False -> `push` qrejected doc
        _     -> `push` qapproved doc
      mapM (\u ->deleteFromQueue (quser u) rdoc) superbosses@

-}

module Control.Workflow.Patterns(
-- * Low level combinators
split, merge, select,
-- * High level conbinators
vote, sumUp, Select(..)
) where
import Control.Concurrent.STM
import Data.Monoid
import Control.Concurrent.MonadIO
import qualified Control.Monad.CatchIO as CMC
import Control.Workflow.Stat
import Control.Workflow
import Data.Typeable
import Prelude hiding (catch)
import Control.Monad(when)
import Control.Exception.Extensible (Exception)
import Data.RefSerialize
import Control.Workflow.Stat
import Debug.Trace
import Data.TCache

a !> b = trace b a

data ActionWF a= ActionWF (WFRef(Maybe a))  (WFRef (String, Bool))

-- | spawn a list of independent workflows (the first argument) with a seed value (the second argument).
-- Their results are reduced by `merge` or `select`
split :: ( Typeable b
           , Serialize b
           , HasFork io
           , CMC.MonadCatchIO io)
          => [a -> Workflow io b] -> a  -> Workflow io [ActionWF b]
split actions a = mapM (\ac ->
     do
         mv <- newWFRef Nothing
         fork  (ac a >>= step . liftIO . atomically . writeWFRef mv . Just)
         r <- getWFRef
         return  $ ActionWF mv  r)

     actions



-- | wait for the results and apply the cond to produce a single output in the Workflow monad
merge :: ( MonadIO io
           , Typeable a
           , Typeable b
           , Serialize a, Serialize b)
           => ([a] -> io b) -> [ActionWF a] -> Workflow io b
merge  cond actions= mapM (\(ActionWF mv _) -> readWFRef1 mv ) actions >>= step . cond

readWFRef1 :: ( MonadIO io
              , Serialize a
              , Typeable a)
              => WFRef (Maybe a) -> io  a
readWFRef1 mv = liftIO . atomically $ do
      v <- readWFRef mv
      case v of
       Just(Just v)  -> return v
       Just Nothing  -> retry
       Nothing -> error $ "readWFRef1: workflow not found "++ show mv


data Select
            = Select
            | Discard
            | FinishDiscard
            | FinishSelect
            deriving(Typeable, Read, Show)

instance Exception Select

-- | select the outputs of the workflows produced by `split` constrained within a timeout.
-- The check filter, can select , discard or finish the entire computation before
-- the timeout is reached. When the computation finalizes, it stop all
-- the pending workflows and return the list of selected outputs
-- the timeout is in seconds and is no limited to Int values, so it can last for years.
--
-- This is necessary for the modelization of real-life institutional cycles such are political elections
-- timeout of 0 means no timeout.
select ::
         ( Serialize a
         , Serialize [a]
         , Typeable a
         , HasFork io
         , CMC.MonadCatchIO io)
         => Integer
         -> (a ->   io Select)
         -> [ActionWF a]
         -> Workflow io [a]
select timeout check actions=   do
 res  <- newMVar []
 flag <- getTimeoutFlag timeout
 parent <- myThreadId
 checks <- newEmptyMVar
 count <- newMVar 1
 let process = do
        let check'  (ActionWF ac _) =  do
               r <- readWFRef1 ac
               b <- check r
               case b of
                  Discard -> return ()
                  Select  -> addRes r
                  FinishDiscard -> do
                       throwTo parent FinishDiscard
                  FinishSelect -> do
                       addRes r
                       throwTo parent FinishDiscard

               n <- CMC.block $ do
                     n <- takeMVar count
                     putMVar count (n+1)
                     return n

               if ( n == length actions)
                     then throwTo parent FinishDiscard
                     else return ()

              `CMC.catch` (\(e :: Select) -> throwTo parent e)

        do
             ws <- mapM ( fork . check') actions
             putMVar checks  ws

        liftIO $ atomically $ do
           v <- readTVar flag -- wait fo timeout
           case v of
             False -> retry
             True  -> return ()
        throw FinishDiscard
        where

        addRes r=  CMC.block $ do
            l <- takeMVar  res
            putMVar  res $ r : l

 let killall  = do
       mapM_ (\(ActionWF _ th) -> killWFP th) actions
       ws <- readMVar checks
       liftIO $ mapM_ killThread ws

 stepControl $ CMC.catch   process -- (WF $ \s -> process >>= \ r -> return (s, r))
              (\(e :: Select)-> do
                 readMVar res
                 )
       `CMC.finally`   killall

killWFP r= liftIO $ do
    s <-  atomically $ do
              (s,_)<- readWFRef r >>= justify ("wfSelect " ++ show r)
              writeWFRef r (s, True)
              return s

    killWF  s ()

justify str Nothing = error str
justify _ (Just x) = return x

-- | spawn a list of workflows and reduces the results according with the comp parameter within a given timeout
--
-- @
--   vote timeout actions comp x=
--        split actions x >>= select timeout (const $ return Select)  >>=  comp
-- @
vote
      :: ( Serialize b
         , Serialize [b]
         , Typeable b
         , HasFork io
         , CMC.MonadCatchIO io)
      => Integer
      -> [a -> Workflow io  b]
      -> ([b] -> Workflow io c)
      -> a
      -> Workflow io c
vote timeout actions comp x=
  split actions x >>= select timeout (const $ return Select)  >>=  comp


-- | sum the outputs of a list of workflows  according with its monoid definition
--
-- @ sumUp timeout actions = vote timeout actions (return . mconcat) @
sumUp
  :: ( Serialize b
     , Serialize [b]
     , Typeable b
     , Monoid b
     , HasFork io
     , CMC.MonadCatchIO io)
     => Integer
     -> [a -> Workflow io b]
     -> a
     -> Workflow io b
sumUp timeout actions = vote timeout actions (return . mconcat)