module MFlow (
Params, getParam1, Req(..), Resp(..), Workflow, HttpData(..),Processable(..), ConvertTo(..), Token(..), getToken, Error(..), ProcList
,flushRec, receive, receiveReq, receiveReqTimeout, send, sendFlush, sendFragment, sendEndFragment
,msgScheduler, addMessageFlows,getMessageFlows, transient, stateless
)
where
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import GHC.Conc(unsafeIOToSTM)
import Data.Typeable
import Data.Maybe(isJust, isNothing, fromMaybe, fromJust)
import Data.List(isPrefixOf, elem , span, (\\))
import Control.Monad(when)
import Data.Monoid
import Control.Concurrent(forkIO,threadDelay,killThread, myThreadId, ThreadId)
import Control.Concurrent.MVar
import Unsafe.Coerce
import System.IO.Unsafe
import Data.TCache.DefaultPersistence
import Data.ByteString.Lazy.Char8(pack, unpack)
import qualified Data.Map as M
import Control.Workflow
import MFlow.Cookies
import Debug.Trace
(!>)= flip trace
data HttpData a= HttpData [Cookie] a deriving Typeable
type ProcList = WorkflowList IO Token ()
data Req = forall a.( Processable a,Typeable a)=> Req a deriving Typeable
type Params = [(String,String)]
class Processable a where
pwfname :: a -> String
puser :: a -> String
pind :: a -> String
getParams :: a -> Params
instance Processable Req where
pwfname (Req x)= pwfname x
puser (Req x)= puser x
pind (Req x)= pind x
getParams (Req x)= getParams x
data Resp = forall a c.( Typeable a,Typeable c, Monoid c, ConvertTo a c)=> Fragm a
| forall a c.( Typeable a,Typeable c, Monoid c, ConvertTo a c)=> EndFragm a
| forall a c.( Typeable a,Typeable c, ConvertTo a c) => Resp a
data Token = Token{twfname,tuser, tind :: String , q :: TChan Req, qr :: TChan Resp} deriving Typeable
instance Indexable Token where
key (Token w u i _ _ )= u ++ "#" ++ w ++ "#" ++ i
instance Show Token where
show t = "Token " ++ key t
instance Read Token where
readsPrec _ ('T':'o':'k':'e':'n':' ':str1) = [(Token w u i (newChan 0) (newChan 0), tail str2)]
where
(str,str2)= span(/=' ') str
(w,r) = span (/= '#') str
(u,_:i)= span (/= '#') $ tail r
newChan _= unsafePerformIO newTChanIO
readsPrec _ str= error $ "parse error in Token read from: "++ str
instance Serializable Token where
serialize= pack . show
deserialize= read . unpack
iorefqmap= unsafePerformIO . newMVar $ M.empty
getToken msg= do
qmap <- readMVar iorefqmap
let u= puser msg ; w= pwfname msg ; i=pind msg
let mqs = M.lookup ( i ++ w ++ u) qmap
(q,qr) <- case mqs of
Nothing -> do
q <- atomically $ newTChan
qr <- atomically $ newTChan
let qs= (q,qr)
modifyMVar_ iorefqmap $ \ map -> return $ M.insert ( i ++ w ++ u) qs map
return qs
Just qs -> return qs
return (Token w u i q qr )
send :: (Typeable a, Typeable b, ConvertTo a b) => Token -> a -> IO()
send (Token _ _ _ queue qresp) msg= atomically $ do
writeTChan qresp $ Resp msg
sendFlush t msg= flushRec t >> send t msg
sendFragment :: ( Typeable a, Typeable b, Monoid b, ConvertTo a b) => Token -> a -> IO()
sendFragment (Token _ _ _ _ qresp) msg= atomically $ writeTChan qresp $ Fragm msg
sendEndFragment :: ( Typeable a, Typeable b, Monoid b, ConvertTo a b) => Token -> a -> IO()
sendEndFragment (Token _ _ _ _ qresp ) msg= atomically $ writeTChan qresp $ EndFragm msg
receive :: (Processable a, Typeable a) => Token -> IO a
receive t= receiveReq t >>= return . fromReq
flushRec t@(Token _ _ _ queue _)= do
empty <- atomically $ isEmptyTChan queue
if empty then return() else atomically(readTChan queue) >> flushRec t
receiveReq :: Token -> IO Req
receiveReq (Token _ _ _ queue _)= atomically $ readTChan queue
fromReq :: (Processable a, Typeable a) => Req -> a
fromReq (Req x) = x' where
x'= case cast x of
Nothing -> error $ "receive: received type: "++ show (typeOf x) ++ " does not match the desired type:" ++ show (typeOf x')
Just y -> y
receiveReqTimeout :: Int
-> Integer
-> Token
-> IO Req
receiveReqTimeout 0 0 t= receiveReq t
receiveReqTimeout time time2 t@(Token _ _ _ queue qresp)=do
let id= twfname t ++ "#" ++key t
flag <- transientTimeout time
r <- atomically $ (readTChan queue >>= return . Just )
`orElse` (waitUntilSTM flag >> return Nothing)
case r of
Just r@(Req v) -> return r
Nothing -> do
clearRunningFlag id !> "killed"
forkIO $ sessionTimeout (fromIntegral time2 time) id
throw Timeout
where
sessionTimeout t id= do
flag <- transientTimeout t
r <- atomically $ (waitWFActive id >> return True)
`orElse`
(waitUntilSTM flag >> return False )
case r of
False -> delWF1 id !> "state deleted"
True -> return ()
transientTimeout 0= atomically $ newTVar False
transientTimeout t= do
flag <- atomically $ newTVar False
forkIO $ threadDelay (t * 1000000) >> atomically (writeTVar flag True) >> myThreadId >>= killThread
return flag
delMsgHistory t = do
let qnme=key t
let statKey= twfname t ++ "#" ++ qnme
delWFHistory1 statKey
where
u= undefined
stateless :: ( Typeable a, Processable a, Typeable b
, ConvertTo b c, Typeable c)
=> (a -> IO b) -> (Token -> Workflow IO ())
stateless f = transient $ \tk -> receive tk >>= f >>= send tk
transient :: (Token -> IO ()) -> (Token -> Workflow IO ())
transient f= unsafeIOtoWF . f
_messageFlows :: MVar ProcList
_messageFlows= unsafePerformIO $ newMVar []
addMessageFlows wfs= modifyMVar_ _messageFlows(\ms -> return $ ms ++ wfs)
getMessageFlows = readMVar _messageFlows
class ConvertTo a b | a -> b where
convert :: a -> b
tellToWF (Token _ _ _ queue qresp ) msg = do
atomically $ writeTChan queue $ Req msg
m <- atomically $ readTChan qresp
case m of
Resp r -> return . cast1 $ convert r
Fragm r -> do
result <- getStream $ convert r
return $ cast1 result
where
cast1 :: (Typeable a, Typeable b) => a -> b
cast1 x= y where
y= case cast x of
Nothing -> error $ "cast error: " ++ show (typeOf x) ++ " to " ++ show (typeOf y)
Just y -> y
getStream :: (Typeable a, Monoid a) => a -> IO a
getStream r = do
mr <- atomically $ readTChan qresp
case mr of
Resp _ -> error "\"send\" used instead of \"sendFragment\" or \"sendEndFragment\""
Fragm h -> do
rest <- unsafeInterleaveIO $ getStream ( convert h)
let result= mappend r (cast1 rest)
return (cast1 result )
EndFragm h -> do
let result= mappend r $ cast1 (convert h)
return (cast1 result)
data Error= Error String deriving (Read, Show, Typeable)
instance Indexable Error where
key _= error "Idexablew instance for Error"
msgScheduler
:: (Processable a, Typeable a, ConvertTo Error c, Typeable c)
=> a -> ProcList -> IO (c, ThreadId)
msgScheduler x wfs = do
token <- getToken x
th <- startMessageFlow (pwfname x) token wfs
r<- tellToWF token x
return (r,th)
where
startMessageFlow wfname token wfs=
forkIO $ do
r <- startWF wfname token wfs
case r of
Left NotFound -> error ( "procedure not found: "++ wfname)
Left AlreadyRunning -> return ()
Left Timeout -> return ()
Left (Exception e)-> send token $ Error (show e)
Right _ -> do delMsgHistory token; return ()
getParam1 :: ( Typeable a, Read a) => String -> Params -> Maybe a
getParam1 str req= r
where
r= case lookup str req of
Just x -> maybeRead x
Nothing -> Nothing
maybeRead str=
if typeOf r == (typeOf $ Just "")
then Just $ unsafeCoerce str
else case readsPrec 0 str of
[(x,"")] -> Just x
_ -> Nothing