module MFlow (
Params, Workflow, HttpData(..),Processable(..), ToHttpData(..)
, Token(..), ProcList
,flushRec, receive, receiveReq, receiveReqTimeout, send, sendFlush, sendFragment
, sendEndFragment
,addMessageFlows,getMessageFlows, transient, stateless,anonymous
,noScript,hlog, setNotFoundResponse,getNotFoundResponse,
btag, bhtml, bbody,Attribs
,addTokenToList,deleteTokenInList, msgScheduler)
where
import Control.Concurrent.MVar
import Data.IORef
import GHC.Conc(unsafeIOToSTM)
import Data.Typeable
import Data.Maybe(isJust, isNothing, fromMaybe, fromJust)
import Data.Char(isSeparator)
import Data.List(isPrefixOf, elem , span, (\\))
import Control.Monad(when)
import Data.Monoid
import Control.Concurrent(forkIO,threadDelay,killThread, myThreadId, ThreadId)
import Unsafe.Coerce
import System.IO.Unsafe
import Data.TCache.DefaultPersistence hiding(Indexable(..))
import Data.ByteString.Lazy.Char8 as B (ByteString, concat,pack, unpack,empty,append,cons,fromChunks)
import qualified Data.Map as M
import System.IO
import System.Time
import Control.Workflow
import MFlow.Cookies
import Control.Monad.Trans
data HttpData = HttpData Params [Cookie] ByteString | Error WFErrors ByteString deriving (Typeable, Show)
instance ToHttpData HttpData where
toHttpData= id
instance ToHttpData ByteString where
toHttpData bs= HttpData [] [] bs
instance Monoid HttpData where
mempty= HttpData [] [] empty
mappend (HttpData h c s) (HttpData h' c' s')= HttpData (h++h') (c++ c') $ mappend s s'
type ProcList = WorkflowList IO Token ()
data Req = forall a.( Processable a,Typeable a)=> Req a deriving Typeable
type Params = [(String,String)]
class Processable a where
pwfname :: a -> String
puser :: a -> String
pind :: a -> String
getParams :: a -> Params
instance Processable Req where
pwfname (Req x)= pwfname x
puser (Req x)= puser x
pind (Req x)= pind x
getParams (Req x)= getParams x
data Resp = Fragm HttpData
| EndFragm HttpData
| Resp HttpData
anonymous= "anon#"
noScript = "noscript"
data Token = Token{twfname,tuser, tind :: String , q :: MVar Req, qr :: MVar Resp} deriving Typeable
instance Indexable Token where
key (Token w u i _ _ )=
if u== anonymous then u++ i
else u
instance Show Token where
show t = "Token " ++ key t
instance Read Token where
readsPrec _ ('T':'o':'k':'e': 'n':' ':str1)
| anonymous `isPrefixOf` str1= [(Token w anonymous i (newVar 0) (newVar 0), tail str2)]
| otherwise = [(Token w ui "0" (newVar 0) (newVar 0), tail str2)]
where
(ui,str')= span(/='@') str1
i = drop (length anonymous) ui
(w,str2) = span (not . isSeparator) $ tail str'
newVar _= unsafePerformIO $ newEmptyMVar
readsPrec _ str= error $ "parse error in Token read from: "++ str
instance Serializable Token where
serialize = pack . show
deserialize= read . unpack
iorefqmap= unsafePerformIO . newMVar $ M.empty
addTokenToList t@Token{..} =
modifyMVar_ iorefqmap $ \ map ->
return $ M.insert ( tind ++ twfname ++ tuser ) t map
deleteTokenInList t@Token{..} =
modifyMVar_ iorefqmap $ \ map ->
return $ M.delete (tind ++ twfname ++ tuser) map
getToken msg= do
qmap <- readMVar iorefqmap
let u= puser msg ; w= pwfname msg ; i=pind msg
let mqs = M.lookup ( i ++ w ++ u) qmap
case mqs of
Nothing -> do
q <- newEmptyMVar
qr <- newEmptyMVar
let token= Token w u i q qr
addTokenToList token
return token
Just token-> return token
send :: ToHttpData a => Token -> a -> IO()
send (Token _ _ _ queue qresp) msg= do
putMVar qresp . Resp $ toHttpData msg
sendFlush t msg= flushRec t >> send t msg
sendFragment :: ToHttpData a => Token -> a -> IO()
sendFragment (Token _ _ _ _ qresp) msg= putMVar qresp . Fragm $ toHttpData msg
sendEndFragment :: ToHttpData a => Token -> a -> IO()
sendEndFragment (Token _ _ _ _ qresp ) msg= putMVar qresp . EndFragm $ toHttpData msg
receive :: Typeable a => Token -> IO a
receive t= receiveReq t >>= return . fromReq
flushRec t@(Token _ _ _ queue _)= do
empty <- isEmptyMVar queue
when (not empty) $ takeMVar queue >> return ()
receiveReq :: Token -> IO Req
receiveReq (Token _ _ _ queue _)= readMVar queue
fromReq :: Typeable a => Req -> a
fromReq (Req x) = x' where
x'= case cast x of
Nothing -> error $ "receive: received type: "++ show (typeOf x) ++ " does not match the desired type:" ++ show (typeOf x')
Just y -> y
receiveReqTimeout :: Int
-> Integer
-> Token
-> IO Req
receiveReqTimeout 0 0 t= receiveReq t
receiveReqTimeout time time2 t=
let id= keyWF (twfname t) t in withKillTimeout id time time2 (receiveReq t)
delMsgHistory t = do
let statKey= keyWF (twfname t) t
delWFHistory1 statKey
stateless :: (ToHttpData b) => (Params -> IO b) -> (Token -> Workflow IO ())
stateless f = transient $ \tk ->do
req <- receiveReq tk
resp <- f (getParams req)
sendFlush tk resp
transient :: (Token -> IO ()) -> (Token -> Workflow IO ())
transient f= unsafeIOtoWF . f
_messageFlows :: MVar (M.Map String (Token-> Workflow IO ()))
_messageFlows= unsafePerformIO $ newMVar M.empty
addMessageFlows wfs= modifyMVar_ _messageFlows(\ms -> return $ M.union ms (M.fromList $ map flt wfs))
where flt ("",f)= (noScript,f)
flt e= e
getMessageFlows = readMVar _messageFlows
class ToHttpData a where
toHttpData :: a -> HttpData
tellToWF (Token _ _ _ queue qresp ) msg = do
putMVar queue $ Req msg
m <- takeMVar qresp
case m of
Resp r -> return r
Fragm r -> do
result <- getStream r
return result
where
getStream r = do
mr <- takeMVar qresp
case mr of
Resp _ -> error "\"send\" used instead of \"sendFragment\" or \"sendEndFragment\""
Fragm h -> do
rest <- unsafeInterleaveIO $ getStream h
let result= mappend r rest
return result
EndFragm h -> do
let result= mappend r h
return result
instance ToHttpData String where
toHttpData= HttpData [] [] . pack
msgScheduler
:: (Typeable a,Processable a)
=> a -> IO (HttpData, ThreadId)
msgScheduler x = do
token <- getToken x
th <- startMessageFlow (pwfname x) token
r<- tellToWF token x
return (r,th)
where
startMessageFlow wfname token =
forkIO $ do
wfs <- getMessageFlows
r <- startWF wfname token wfs
case r of
Left NotFound -> sendFlush token (Error NotFound $ "Not found: " <> pack wfname)
Left AlreadyRunning -> return ()
Left Timeout -> return()
Left (WFException e)-> do
let user= key token
print e
logError user wfname e
moveState wfname token token{tuser= "error/"++tuser token}
case user of
"admin" -> sendFlush token (show e)
_ -> sendFlush token ("An Error has ocurred." :: ByteString)
Right _ -> do
delMsgHistory token; return ()
logError u wf e= do
hSeek hlog SeekFromEnd 0
TOD t _ <- getClockTime
hPutStrLn hlog (","++show [u,show t,wf,e]) >> hFlush hlog
logFileName= "errlog"
hlog= unsafePerformIO $ openFile logFileName ReadWriteMode
defNotFoundResponse msg=
"<html><h4>Error 404: Page not found or error ocurred:</h4><h3>" <> msg <>
"</h3><br/>" <> opts <> "<br/><a href=\"/\" >press here to go home</a></html>"
where
paths= Prelude.map B.pack . M.keys $ unsafePerformIO getMessageFlows
opts= "options: " <> B.concat (Prelude.map (\s ->
"<a href=\""<> s <>"\">"<> s <>"</a>, ") paths)
notFoundResponse= unsafePerformIO $ newIORef defNotFoundResponse
setNotFoundResponse f= liftIO $ writeIORef notFoundResponse f
getNotFoundResponse= unsafePerformIO $ readIORef notFoundResponse
type Attribs= [(String,String)]
btag :: String -> Attribs -> ByteString -> ByteString
btag t rs v= "<" `append` pt `append` attrs rs `append` ">" `append` v `append`"</"`append` pt `append` ">"
where
pt= pack t
attrs []= B.empty
attrs rs= pack $ concatMap(\(n,v) -> (' ' : n) ++ "=" ++ v ) rs
bhtml :: Attribs -> ByteString -> ByteString
bhtml ats v= btag "html" ats v
bbody :: Attribs -> ByteString -> ByteString
bbody ats v= btag "body" ats v