{- | Non-monadic, low-level support for the MFlow application server. It implements a scheduler of queued 'Processable' messages that are served according to the source identification and the verb invoked. The scheduler executes the appropriate workflow (using the workflow package); the workflow may send additional messages to the source, identified by a 'Token'. The computation state is logged and can be recovered. Message communication is through polymorphic, monoidal queues. There is no assumption about message encoding, so this scheduler can be instantiated for many different infrastructures. "MFlow.Hack" is an instantiation for the Hack interface in a Web context. In order to manage resources, the serving process may die after a timeout, as may the logged state, usually after a longer timeout. All these details are hidden in the monad of "MFlow.Forms", which provides a higher-level interface, although fragment streaming ('sendFragment', 'sendEndFragment') is only provided at this level. 'stateless' and 'transient' serving processes are possible. 'stateless' processes are request-response with no intermediate messaging dialog. 'transient' processes have no persistent state, so they restart anew after a timeout or a crash.
-}
{-# LANGUAGE DeriveDataTypeable
           , ExistentialQuantification
           , FlexibleContexts
           , FlexibleInstances
           , FunctionalDependencies
           , MultiParamTypeClasses
           , TypeSynonymInstances
           , UndecidableInstances #-}
module MFlow
  ( Params, getParam1, Req(..), Resp(..), Workflow, HttpData(..), Processable(..)
  , ConvertTo(..), Token(..), getToken, Error(..), ProcList
  , flushRec, receive, receiveReq, receiveReqTimeout
  , send, sendFlush, sendFragment, sendEndFragment
  , msgScheduler, addMessageFlows, getMessageFlows, transient, stateless
  ) where

import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import GHC.Conc(unsafeIOToSTM)
import Data.Typeable
import Data.Maybe(isJust, isNothing, fromMaybe, fromJust)
import Data.List(isPrefixOf, elem , span, (\\))
import Control.Monad(when)
import Data.Monoid
import Control.Concurrent(forkIO,threadDelay,killThread, myThreadId, ThreadId)
import Control.Concurrent.MVar
import Unsafe.Coerce
import System.IO.Unsafe
import Data.TCache.DefaultPersistence
import Data.ByteString.Lazy.Char8(pack, unpack)
import qualified Data.Map as M
import Control.Workflow.Text
import MFlow.Cookies
import Debug.Trace

-- | Debugging aid: @x !> msg@ is @x@, with @msg@ emitted through 'trace'
-- when the value is demanded.
(!>) :: a -> String -> a
(!>) = flip trace

-- | A payload paired with a list of cookies.
data HttpData a = HttpData [Cookie] a deriving Typeable

-- | List of (wfname, workflow) pairs, to be scheduled depending on the message's pwfname
type ProcList = WorkflowList IO Token ()

-- | A request of any 'Processable' and 'Typeable' type, hidden behind an
-- existential wrapper so that differently typed requests share one queue.
data Req = forall a. (Processable a, Typeable a) => Req a deriving Typeable

-- | Request parameters as (name, value) pairs.
type Params = [(String, String)]

-- | Messages that the scheduler can route: each one names the workflow it
-- targets ('pwfname'), the user ('puser'), an index ('pind') and carries
-- its parameters ('getParams').
class Processable a where
  pwfname   :: a -> String
  puser     :: a -> String
  pind      :: a -> String
  getParams :: a -> Params
-- getServer ::a -> String
-- getPath :: a -> String
-- getPort :: a -> Int

-- | A wrapped request delegates every method to the value it wraps.
instance Processable Req where
  pwfname   (Req x) = pwfname x
  puser     (Req x) = puser x
  pind      (Req x) = pind x
  getParams (Req x) = getParams x
-- getServer (Req x)= getServer x
-- getPath (Req x)= getPath x
-- getPort (Req x)= getPort x

-- | A response sent back through a 'Token'.  'Fragm' and 'EndFragm' carry
-- pieces of a streamed response (their converted type must be a 'Monoid');
-- 'Resp' carries a complete response.
data Resp
  = forall a c. (Typeable a, Typeable c, Monoid c, ConvertTo a c) => Fragm a
  | forall a c. (Typeable a, Typeable c, Monoid c, ConvertTo a c) => EndFragm a
  | forall a c. (Typeable a, Typeable c, ConvertTo a c) => Resp a

--instance Typeable a => Typeable (Workflow IO a) where
--  typeOf = \_ -> mkTyConApp (mkTyCon "Control.Workflow.Workflow IO") [Data.Typeable.typeOf (undefined ::a)]

-- | Identifies a conversation: workflow name, user and index, plus the
-- request queue ('q') and the response queue ('qr').
data Token = Token
  { twfname, tuser, tind :: String
  , q  :: TChan Req
  , qr :: TChan Resp
  } deriving Typeable

-- | The key is @user#wfname#ind@.
instance Indexable Token where
  key (Token w u i _ _) = u ++ "#" ++ w ++ "#" ++ i

-- | Shown as @Token user#wfname#ind@; the queues are not shown.
instance Show Token where
  show t = "Token " ++ key t

-- | Inverse of the 'Show' instance.  The queues cannot be recovered from
-- the text, so a parsed token gets freshly allocated channels.
--
-- Input that does not start with @"Token "@ raises an 'error', as the
-- original implementation did.
instance Read Token where
  readsPrec _ ('T':'o':'k':'e':'n':' ':rest) =
      [(Token w u i (newChan rest) (newChan rest), drop 1 remainder)]
    where
      -- The key runs up to the first space; 'drop 1' skips that space
      -- without failing when the key is the last thing in the input.
      (keyStr, remainder) = span (/= ' ') rest
      -- Fields are read in the order 'key' writes them: user, wfname, ind.
      (u, afterUser) = span (/= '#') keyStr
      (w, afterWf)   = span (/= '#') (drop 1 afterUser)
      i              = drop 1 afterWf
      -- readsPrec is pure, so the channels are created with
      -- unsafePerformIO.  The argument ties each allocation to this
      -- particular parse and NOINLINE keeps the call opaque, as a
      -- precaution against the allocation being shared between tokens.
      newChan :: String -> TChan x
      newChan seed = unsafePerformIO (seed `seq` newTChanIO)
      {-# NOINLINE newChan #-}
  readsPrec _ str = error $ "parse error in Token read from: " ++ str

-- | Tokens are serialized through their 'Show' and 'Read' instances.
instance Serializable Token where
  serialize   = pack . show
  deserialize = read . unpack

-- | Process-wide registry of the queue pair belonging to each
-- (ind, wfname, user) triple.  NOINLINE ensures the unsafePerformIO
-- action is not duplicated by inlining, so there is a single registry.
iorefqmap :: MVar (M.Map (String, String, String) (TChan Req, TChan Resp))
iorefqmap = unsafePerformIO $ newMVar M.empty
{-# NOINLINE iorefqmap #-}

-- | Return the 'Token' for a message, creating and registering its queue
-- pair the first time the (ind, wfname, user) triple is seen.
--
-- Lookup and insertion happen inside one 'modifyMVar', so concurrent
-- callers for the same triple always obtain the same queues.
getToken :: Processable a => a -> IO Token
getToken msg = do
    (reqQ, respQ) <- modifyMVar iorefqmap $ \qmap ->
      case M.lookup k qmap of
        Just qs -> return (qmap, qs)
        Nothing -> do
          newReqQ  <- newTChanIO
          newRespQ <- newTChanIO
          let qs = (newReqQ, newRespQ)
          return (M.insert k qs qmap, qs)
    return (Token w u i reqQ respQ)
  where
    u = puser msg
    w = pwfname msg
    i = pind msg
    -- A tuple key cannot collide the way a plain concatenation can.
    k = (i, w, u)

{-
instance (Monad m, Show a) => Traceable (Workflow m a) where
  debugf iox str = do
    x <- iox
    return $ debug x (str++" => Workflow "++ show x)
-}

-- | send a complete response
send :: (Typeable a, Typeable b, ConvertTo a b) => Token -> a -> IO ()
send (Token _ _ _ _ qresp) msg = atomically $ writeTChan qresp $ Resp msg

-- | Discard any pending requests of the token, then 'send' the response.
sendFlush :: (Typeable a, Typeable b, ConvertTo a b) => Token -> a -> IO ()
sendFlush t msg = flushRec t >> send t msg

-- | send a response fragment. Useful for streaming. The stream must be
-- terminated with 'sendEndFragment'.
sendFragment :: (Typeable a, Typeable b, Monoid b, ConvertTo a b) => Token -> a -> IO ()
sendFragment (Token _ _ _ _ qresp) msg = atomically $ writeTChan qresp $ Fragm msg

-- | Send the last fragment of a streamed response.
sendEndFragment :: (Typeable a, Typeable b, Monoid b, ConvertTo a b) => Token -> a -> IO ()
sendEndFragment (Token _ _ _ _ qresp) msg = atomically $ writeTChan qresp $ EndFragm msg

--emptyReceive (Token queue _ _)= emptyQueue queue

-- | Wait for the next request of the token and return it at the expected
-- type.  Calls 'error' (see 'fromReq') if the request has another type.
receive :: (Processable a, Typeable a) => Token -> IO a
receive t = fmap fromReq (receiveReq t)

-- | Discard every request currently waiting in the token's request queue.
--
-- The emptiness test and the read run in the same transaction, so this
-- never blocks when another reader empties the queue concurrently.
flushRec :: Token -> IO ()
flushRec t@(Token _ _ _ queue _) = do
    drained <- atomically $ do
      empty <- isEmptyTChan queue
      if empty
        then return True
        else readTChan queue >> return False
    when (not drained) $ flushRec t

-- | Wait for the next request of the token.
receiveReq :: Token -> IO Req
receiveReq (Token _ _ _ queue _) = atomically $ readTChan queue

-- | Unwrap a 'Req' at the type expected by the caller; calls 'error' when
-- the wrapped value has a different type.
fromReq :: (Processable a, Typeable a) => Req -> a
fromReq (Req x) = x'
  where
    x' = case cast x of
      Nothing -> error $ "receive: received type: " ++ show (typeOf x)
                      ++ " does not match the desired type:" ++ show (typeOf x')
      Just y  -> y

-- | Like 'receiveReq', but gives up after @time@ seconds.
--
-- On timeout the workflow's running flag is cleared, a helper thread is
-- forked that deletes the workflow state after the remaining
-- @time2 - time@ seconds unless the workflow becomes active again, and
-- 'Timeout' is thrown.  With both timeouts 0 this is plain 'receiveReq'.
--
-- NOTE(review): @time2 - time@ is not checked for being negative; confirm
-- that callers always pass @time2 >= time@.
receiveReqTimeout :: Int -> Integer -> Token -> IO Req
receiveReqTimeout 0 0 t = receiveReq t
receiveReqTimeout time time2 t@(Token _ _ _ queue _) = do
    flag <- transientTimeout time
    mreq <- atomically $
              (readTChan queue >>= return . Just)
                `orElse` (waitUntilSTM flag >> return Nothing)
    case mreq of
      Just req -> return req
      Nothing  -> do
        clearRunningFlag wfId !> "killed"
        forkIO $ sessionTimeout (fromIntegral time2 - time) wfId
        throw Timeout
  where
    wfId = twfname t ++ "#" ++ key t

    -- This thread is dedicated to the second timeout: it deletes the
    -- state unless the workflow is reactivated first.
    sessionTimeout secs sid = do
      flag <- transientTimeout secs
      reactivated <- atomically $
                       (waitWFActive sid >> return True)
                         `orElse` (waitUntilSTM flag >> return False)
      case reactivated of
        False -> delWF1 sid !> "state deleted"
        True  -> return ()

-- | Create a flag that a background thread sets to 'True' after the given
-- number of seconds.  A delay of 0 yields a flag that is never set.
transientTimeout :: Int -> IO (TVar Bool)
transientTimeout 0 = atomically $ newTVar False
transientTimeout t = do
    flag <- atomically $ newTVar False
    -- The timer thread simply ends after setting the flag.
    forkIO $ threadDelay (t * 1000000) >> atomically (writeTVar flag True)
    return flag

-- | Delete the logged history of the workflow attached to the token; the
-- history key is @wfname#@ followed by the token 'key'.
delMsgHistory t = delWFHistory1 (twfname t ++ "#" ++ key t)

-- | to add a simple monadic computation of type (a -> IO b) to the list
-- of the scheduler
stateless :: ( Typeable a, Processable a, Typeable b
             , ConvertTo b c, Typeable c)
          => (a -> IO b) -> (Token -> Workflow IO ())
stateless f = transient $ \tk -> receive tk >>= f >>= send tk

-- | to add a monadic computation that send and receive messages, but does
-- not store its state in permanent storage.
transient :: (Token -> IO ()) -> (Token -> Workflow IO ())
transient f = unsafeIOtoWF . f

-- | Process-wide list of registered message flows.  NOINLINE ensures the
-- unsafePerformIO action is not duplicated by inlining.
_messageFlows :: MVar ProcList
_messageFlows = unsafePerformIO $ newMVar []
{-# NOINLINE _messageFlows #-}

-- | Append flows to the registered list.
addMessageFlows :: ProcList -> IO ()
addMessageFlows wfs = modifyMVar_ _messageFlows (\ms -> return $ ms ++ wfs)

-- | Read the registered list of flows.
getMessageFlows :: IO ProcList
getMessageFlows = readMVar _messageFlows

-- | Conversion of a response value into the type expected by the
-- requester; the source type determines the target type.
class ConvertTo a b | a -> b where
  convert :: a -> b

-- | Queue a message for the workflow behind the token and wait for its
-- answer.  A 'Resp' is returned as is; a 'Fragm' starts a stream that is
-- read lazily and concatenated with 'mappend' until an 'EndFragm'
-- arrives; an 'EndFragm' received first is a one-fragment stream.
-- Calls 'error' if the converted answer is not of the requested type.
--tellToWF :: (Typeable a, Typeable c, Processable a) => Token -> a -> IO c
tellToWF (Token _ _ _ queue qresp) msg = do
    atomically $ writeTChan queue $ Req msg
    m <- atomically $ readTChan qresp
    case m of
      Resp r     -> return . cast1 $ convert r
      EndFragm r -> return . cast1 $ convert r
      Fragm r    -> do
        result <- getStream $ convert r
        return $ cast1 result
  where
    -- Cast between two types that are expected to be the same at run time.
    cast1 :: (Typeable a, Typeable b) => a -> b
    cast1 x = y
      where
        y = case cast x of
          Nothing -> error $ "cast error: " ++ show (typeOf x) ++ " to " ++ show (typeOf y)
          Just v  -> v

    -- Append the remaining fragments to the part accumulated so far; the
    -- tail of the stream is only read when it is demanded.
    getStream :: (Typeable a, Monoid a) => a -> IO a
    getStream r = do
      mr <- atomically $ readTChan qresp
      case mr of
        Resp _ -> error "\"send\" used instead of \"sendFragment\" or \"sendEndFragment\""
        Fragm h -> do
          rest <- unsafeInterleaveIO $ getStream (convert h)
          let result = mappend r (cast1 rest)
          return (cast1 result)
        EndFragm h -> do
          let result = mappend r $ cast1 (convert h)
          return (cast1 result)

-- | An error report that can be sent as a response.
data Error = Error String deriving (Read, Show, Typeable)

instance Indexable Error where
  key _ = error "Idexablew instance for Error"

-- | Deliver a message to the workflow named by 'pwfname', starting that
-- workflow in a new thread, and wait for the response.  Returns the
-- response together with the id of the forked thread.
--
-- A workflow exception, or a workflow name that is not in the list, is
-- reported to the caller as an 'Error' response instead of leaving it
-- blocked on the response queue.
msgScheduler
  :: (Processable a, Typeable a, ConvertTo Error c, Typeable c)
  => a -> ProcList -> IO (c, ThreadId)
msgScheduler x wfs = do
    token <- getToken x
    th <- startMessageFlow (pwfname x) token wfs
    r <- tellToWF token x
    return (r, th)
  where
    startMessageFlow wfname token flows = forkIO $ do
      r <- startWF wfname token flows
      case r of
        Left NotFound       -> send token $ Error ("procedure not found: " ++ wfname)
        Left AlreadyRunning -> return ()
        Left Timeout        -> return ()
        Left (Exception e)  -> send token $ Error (show e)
        Right _             -> do delMsgHistory token; return ()

-- | Look up a parameter and convert it to the requested type.
--
-- When the requested type is 'String' the raw value is returned as is;
-- otherwise the value must be consumed entirely by 'readsPrec'.  Returns
-- 'Nothing' if the parameter is missing or does not parse.
getParam1 :: (Typeable a, Read a) => String -> Params -> Maybe a
getParam1 str req = lookup str req >>= maybeRead
  where
    -- 'cast' succeeds exactly when the target type is String.
    maybeRead raw = case cast raw of
      Just asIs -> Just asIs
      Nothing   -> case readsPrec 0 raw of
        [(v, "")] -> Just v
        _         -> Nothing