{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE Rank2Types #-}
module Transient.Internals where
import Control.Applicative
import Control.Monad.State
import Data.Dynamic
import qualified Data.Map as M
import Data.Monoid
import Debug.Trace
import System.IO.Unsafe
import Unsafe.Coerce
import Control.Exception
import Control.Concurrent
import Control.Concurrent.STM
import System.Mem.StableName
import Data.Maybe
import GHC.Conc
import Data.List
import Data.IORef
import System.Environment
import System.IO (hFlush,stdout)
import System.Exit
-- | Debug helper: evaluates to its left operand and, when that result is
-- forced, emits @show@ of the right operand through 'trace'.
{-# INLINE (!>) #-}
(!>) :: Show a => b -> a -> b
value !> note = trace (show note) value

infixr 0 !>
-- | A transient computation: a state transformer over 'EventF' running in
-- 'IO' whose result is 'Nothing' when the computation yields no value.
data TransIO x = Transient
  { runTrans :: StateT EventF IO (Maybe x)
    -- ^ unwrap the underlying state action
  }
-- | Placeholder type under which arbitrary values are stored after being
-- passed through 'unsafeCoerce' (see 'setData' and the 'event' field).
type SData= ()
-- | Numeric identifier type (an 'Int').
type EventId= Int
-- | Synonym for 'TransIO'.
type TransientIO= TransIO
-- | Context threaded through every 'TransIO' computation.
--
-- The constructor is applied and matched positionally throughout this
-- module, so the field order below must not change. Fields whose type
-- mentions the existential variables ('xcomp', 'fcomp') cannot be changed
-- with record-update syntax.
data EventF = forall a b . EventF
  { meffects   :: Effects
    -- ^ effects handler; 'runTransient' initialises it to 'baseEffects'
  , event      :: Maybe SData
    -- ^ pending event payload, consumed by 'parallel' and 'react'
  , xcomp      :: TransIO a
    -- ^ current closure, executed by 'runCont' and 'runClosure'
  , fcomp      :: [b -> TransIO b]
    -- ^ stack of pending continuations
  , mfData     :: M.Map TypeRep SData
    -- ^ session data keyed by type (see 'getData' and 'setData')
  , mfSequence :: Int
    -- ^ counter handed out by 'genId'
  , threadId   :: ThreadId
    -- ^ thread recorded for this state
  , freeTh     :: Bool
    -- ^ when 'True', 'hangThread' does not register forked threads
  , parent     :: Maybe EventF
    -- ^ state this one was derived from, if any
  , children   :: TVar [EventF]
    -- ^ states registered under this one by 'hangThread'
  , maxThread  :: Maybe (IORef Int)
    -- ^ counter limiting thread creation, if any (see 'threads')
  }
  deriving Typeable
-- | Shape of an effects handler: given two computations and a continuation
-- it produces, inside 'StateIO', a wrapper to apply to the rest of the bind
-- together with the result of the first computation.
type Effects =
  forall a b c.
     TransIO a
  -> TransIO a
  -> (a -> TransIO b)
  -> StateIO (StateIO (Maybe c) -> StateIO (Maybe c), Maybe a)
-- | State access for 'TransIO': every operation is lifted from the
-- underlying 'StateT' and always produces a value ('Just').
instance MonadState EventF TransIO where
  get = Transient (fmap Just get)

  put st = Transient (Just <$> put st)

  state step = Transient $ do
    current <- get
    -- Lazy pattern: the step function is only evaluated on demand.
    let ~(result, next) = step current
    put next
    return (Just result)
-- | The monad underneath 'TransIO': 'EventF' state over 'IO'.
type StateIO= StateT EventF IO
-- | Run a 'TransIO' computation from a fresh initial state.
--
-- The initial state records the calling thread, has no parent, no thread
-- limit, an empty data map and an empty continuation stack.
--
-- Returns the (optional) result together with the final state.
runTransient :: TransIO x -> IO (Maybe x, EventF)
runTransient t = do
  th  <- myThreadId
  -- Allocate the children list as a real IO step. Building it with
  -- 'unsafePerformIO' inside a @let@ would let the compiler float the
  -- expression out and share a single 'TVar' between unrelated runs.
  chs <- newTVarIO []
  let eventf0 = EventF baseEffects Nothing empty [] M.empty 0
                       th False Nothing chs Nothing
  runStateT (runTrans t) eventf0
-- | Run a 'TransIO' computation starting from the given state.
runTransState :: EventF -> TransIO x -> IO (Maybe x, EventF)
runTransState st x = runTrans x `runStateT` st
-- | Obtain the current state, which carries the closure and continuations.
getCont :: TransIO EventF
getCont = Transient (fmap Just get)
-- | Execute the closure stored in the state and feed its result through
-- the stored continuation stack.
--
-- The closure is coerced with 'unsafeCoerce' because its result type is
-- hidden behind the existential in 'EventF'.
runCont :: EventF -> StateIO (Maybe a)
runCont (EventF _ _ closure conts _ _ _ _ _ _ _) =
  runTrans (unsafeCoerce closure >>= \r -> compose conts r)
-- | Like 'runCont', but runs in 'IO' using the given state as the initial
-- state as well.
runCont' :: EventF -> IO (Maybe a, EventF)
runCont' st = runCont st `runStateT` st
-- | Read the continuation stack from the current state, coerced to the
-- type the caller asks for.
getContinuations :: StateIO [a -> TransIO b]
getContinuations = do
  st <- get
  case st of
    EventF _ _ _ conts _ _ _ _ _ _ _ -> return (unsafeCoerce conts)
-- | Chain a stack of continuations with '>>='. The chain always ends in
-- 'empty', so the composed computation never yields a final value itself.
compose :: (Alternative m, Monad m) => [a -> m a] -> a -> m b
compose conts =
  case conts of
    []         -> const empty
    (k : rest) -> \x -> k x >>= compose rest
-- | Execute only the closure stored in the state, ignoring the
-- continuation stack. The result type is coerced to the one requested.
runClosure :: EventF -> StateIO (Maybe a)
runClosure st =
  case st of
    EventF _ _ closure _ _ _ _ _ _ _ _ -> unsafeCoerce (runTrans closure)
-- | Execute only the continuation stack stored in the state, starting
-- from the supplied value. The composed chain is coerced to the type the
-- caller asks for.
runContinuation :: EventF -> a -> StateIO (Maybe b)
runContinuation (EventF _ _ _ conts _ _ _ _ _ _ _) x =
  runTrans (unsafeCoerce (compose conts) x)
-- | Replace the closure with @b@ and the continuation stack with @c@
-- pushed on top of @fs@. All other state fields are kept.
--
-- The state is rebuilt positionally because record update is not
-- available for the existentially typed fields.
setContinuation :: TransIO a -> (a -> TransIO b) -> [c -> TransIO c] -> StateIO ()
setContinuation b c fs = do
  st <- get
  case st of
    EventF eff ev _ _ d e f g h i j ->
      put (EventF eff ev b (unsafeCoerce c : fs) d e f g h i j)
-- | Run @mx@ with @c@ pushed on top of the current continuation stack and
-- @mx@ installed as the closure; afterwards the previous stack is put back
-- with 'restoreStack' and the result of @mx@ is returned.
withContinuation c mx = do
  st <- get
  case st of
    EventF eff ev _ fs d e f g h i j -> do
      put (EventF eff ev mx (unsafeCoerce c : fs) d e f g h i j)
      r <- mx
      restoreStack fs
      return r
-- | Run a list of continuations (coerced to a uniform type) on a value.
runContinuations :: [a -> TransIO b] -> c -> TransIO d
runContinuations fs x = compose (unsafeCoerce fs) x
-- | 'fmap' is defined through the monadic bind of 'TransIO'.
instance Functor TransIO where
  fmap f mx = mx >>= \x -> return (f x)
-- | Applicative instance. Both operands are run with a continuation
-- installed that remembers the latest result of each side in an 'IORef',
-- so that whichever side produces a value later (for example from another
-- thread) can be combined with the most recent value of the other side.
instance Applicative TransIO where
pure a = Transient . return $ Just a
f <*> g = Transient $ do
-- rf / rg hold the latest (result, log) pair of the function side and of
-- the argument side respectively.
rf <- liftIO $ newIORef (Nothing,[])
rg <- liftIO $ newIORef (Nothing,[])
-- Continuation stack to be put back once both operands have been run.
fs <- getContinuations
let
-- True when the second element of a log is 'Wait'.
hasWait (_:Wait:_)= True
hasWait _ = False
-- Continuation installed while running @f@: records the function value,
-- reads the latest argument and combines them in 'Maybe'.
appf k = Transient $ do
Log rec _ full <- getData `onNothing` return (Log False [] [])
(liftIO $ writeIORef rf (Just k,full))
(x, full2)<- liftIO $ readIORef rg
-- When the log is in the Wait shape, store a log built from the head of
-- this side's log followed by the other side's log.
when (hasWait full ) $
let full'= head full: full2
in (setData $ Log rec full' full')
return $ Just k <*> x
-- Continuation installed while running @g@: mirror image of 'appf'.
appg x = Transient $ do
Log rec _ full <- getData `onNothing` return (Log False [] [])
liftIO $ writeIORef rg (Just x, full)
(k,full1) <- liftIO $ readIORef rf
when (hasWait full) $
let full'= head full: full1
in (setData $ Log rec full' full')
return $ k <*> Just x
setContinuation f appf fs
k <- runTrans f
-- Status left behind by running @f@; a 'WasParallel' mark is reset here,
-- but the value read into @was@ is still used below.
was <- getData `onNothing` return NoRemote
when (was == WasParallel) $ setData NoRemote
Log recovery _ full <- getData `onNothing` return (Log False [] [])
-- Give up without running @g@ when @f@ was remote, or when @f@ produced
-- nothing, was not parallel, and the log is not in recovery mode.
if was== WasRemote || (not recovery && was == NoRemote && isNothing k )
then do
restoreStack fs
return Nothing
else do
when (isJust k) $ liftIO $ writeIORef rf (k,full)
setContinuation g appg fs
x <- runTrans g
Log recovery _ full' <- getData `onNothing` return (Log False [] [])
liftIO $ writeIORef rg (x,full')
restoreStack fs
-- If @f@ was parallel, use whatever value is currently stored in rf
-- instead of the synchronous result of running @f@.
k'' <- if was== WasParallel
then do
(k',_) <- liftIO $ readIORef rf
return k'
else return k
return $ k'' <*> x
-- | Put the given continuation stack back into the state and clear any
-- pending event. Every other field is left as it is.
restoreStack fs = modify reset
  where
    reset (EventF eff _ comp _ dat sq th fr par chs sem) =
      EventF eff Nothing comp fs dat sq th fr par chs sem
-- | Parse a value with 'readsPrec' at precedence 0, expecting exactly one
-- parse. Any exception raised while forcing the parsed value (including a
-- failed match on the single-parse pattern) is replaced by an 'error' that
-- names the expected type and quotes the offending input.
readWithErr :: (Typeable a, Read a) => String -> IO [(a, String)]
readWithErr line = attempt `catch` report
  where
    [(v, left)] = readsPrec 0 line

    -- Forcing @v@ inside the action lets 'catch' see parse failures.
    attempt = v `seq` return [(v, left)]

    report :: SomeException -> IO b
    report _ =
      error ("read error of " ++ show( typeOf v) ++ " in: "++ "\""++line++"\"")
-- | Pure front end for 'readWithErr'; the precedence argument is ignored.
-- The IO action is run with 'unsafePerformIO'.
readsPrec' _ str = unsafePerformIO (readWithErr str)
-- | A loggable value: either still in textual form ('IDyns') or an actual
-- value of any readable, showable, typeable type ('IDynamic').
data IDynamic
  = IDyns String
  | forall a. (Read a, Show a, Typeable a) => IDynamic a
-- | A wrapped value is shown as the quoted form of its own 'show' output;
-- a textual value is shown as a quoted string.
instance Show IDynamic where
  show dyn =
    case dyn of
      IDynamic x -> show (show x)
      IDyns s    -> show s
-- | Reading always produces the textual form ('IDyns').
instance Read IDynamic where
  readsPrec n str = [ (IDyns x, rest) | (x, rest) <- readsPrec' n str ]
-- | Flag carried as the first component of 'Log'.
type Recover= Bool
-- | List of log elements carried as the second component of 'Log'.
type CurrentPointer= [LogElem]
-- | List of log elements carried as the third component of 'Log'.
type LogEntries= [LogElem]
-- | One log entry: a 'Wait' or 'Exec' marker, or a recorded value.
data LogElem= Wait | Exec | Var IDynamic deriving (Read,Show)
-- | Log stored in the session data (see its use in the 'Applicative'
-- instance of 'TransIO').
data Log= Log Recover CurrentPointer LogEntries deriving Typeable
-- | 'empty' is the computation without a result; choice is 'mplus'.
instance Alternative TransIO where
  empty = Transient (return Nothing)
  x <|> y = mplus x y
-- | Marker stored in the session data; 'parallel' and 'react' set
-- 'WasParallel' unless the current value is 'WasRemote'.
data RemoteStatus= WasRemote | WasParallel | NoRemote deriving (Typeable, Eq, Show)
-- | Choice: run the first alternative; the second one is run only when the
-- first produced nothing and the status is not 'WasRemote'.
instance MonadPlus TransIO where
  mzero = empty

  mplus x y = Transient $ do
    mx  <- runTrans x
    was <- getData `onNothing` return NoRemote
    case (was, mx) of
      (WasRemote, _) -> return Nothing
      (_, Nothing)   -> runTrans y
      (_, justx)     -> return justx
-- | Synonym for 'empty': a computation that produces no result.
stop :: Alternative m => m stopped
stop= empty
-- | Sequencing operators beyond the standard applicative ones.
class AdditionalOperators m where
  -- | Run both operands and keep the result of the second.
  (**>) :: m a -> m b -> m b

  -- | Run both operands and keep the result of the first.
  (<**) :: m a -> m b -> m a

  -- | Named form of '<**'.
  atEnd' :: m a -> m b -> m a
  atEnd' ma mb = ma <** mb

  -- | Like '<**'; see the instance for how the second operand is hooked in.
  (<***) :: m a -> m b -> m a

  -- | Named form of '<***'.
  atEnd :: m a -> m b -> m a
  atEnd ma mb = ma <*** mb
instance AdditionalOperators TransIO where
  -- Runs the second operand whatever the first one produced.
  x **> y = Transient (runTrans x >> runTrans y)

  -- The second operand is also installed as a continuation of the first
  -- one, so it is part of the continuation stack while @ma@ runs.
  ma <*** mb = Transient $ do
    saved <- getContinuations
    setContinuation ma (\x -> mb >> return x) saved
    result <- runTrans ma
    _ <- runTrans mb
    restoreStack saved
    return result

  -- Runs the second operand once, after the first, and discards its result.
  ma <** mb = Transient (runTrans ma <* runTrans mb)

infixr 1 <*** , <**, **>
-- | Run @ma@ with a continuation that executes @mb@ the first time it is
-- reached and does nothing on later passes; the value of @ma@ is passed
-- through unchanged and the result of @mb@ is discarded.
(<|) :: TransIO a -> TransIO b -> TransIO a
(<|) ma mb = Transient $ do
  saved <- getContinuations
  fired <- liftIO (newIORef False)
  setContinuation ma (once fired) saved
  result <- runTrans ma
  restoreStack saved
  return result
  where
    -- Continuation guarded by a flag so that @mb@ runs only once.
    once fired x = Transient $ do
      alreadyRun <- liftIO (readIORef fired)
      unless alreadyRun $ do
        liftIO (writeIORef fired True)
        _ <- runTrans mb
        return ()
      return (Just x)
-- | Results are combined with the 'Monoid' of the result type, using the
-- applicative combination of 'TransIO'.
instance Monoid a => Monoid (TransIO a) where
  mappend x y = liftA2 mappend x y
  mempty = return mempty
-- | Install @x@ as the closure and push @f@ on the continuation stack.
-- Returns the state that was stored.
setEventCont :: TransIO a -> (a -> TransIO b) -> StateIO EventF
setEventCont x f = do
  st <- get
  case st of
    EventF eff e _ fs d n r applic ch rc bs -> do
      let updated = EventF eff e x (unsafeCoerce f : fs) d n r applic ch rc bs
      put updated
      return updated
-- | Pop the top continuation from the stack. The closure becomes that
-- continuation applied to the given value, or 'empty' when there is no
-- value. The second argument is ignored and the wrapper returned is 'id'.
--
-- The closure is built lazily, so an empty stack only matters if the
-- closure is later executed with a 'Just' value.
resetEventCont mx _ = do
  st <- get
  case st of
    EventF eff e _ fs d n r nr ch rc bs -> do
      let closure = maybe empty (unsafeCoerce (head fs)) mx
      put (EventF eff e closure (tailsafe fs) d n r nr ch rc bs)
      return id
-- | Total version of 'tail': the empty list is returned unchanged.
tailsafe :: [a] -> [a]
tailsafe = drop 1
-- | Default effects handler used by the monadic bind: install the
-- continuation, run the first computation, then pop the continuation.
-- Returns the wrapper produced by 'resetEventCont' and the result.
{-# INLINE baseEffects #-}
baseEffects :: Effects
baseEffects x x' f' = do
  cont   <- setEventCont x' f'
  result <- runTrans x
  wrap   <- resetEventCont result cont
  return (wrap, result)
-- | Bind runs the left computation through 'baseEffects' and continues
-- with @f@ only when a value was produced.
instance Monad TransIO where
  return = pure

  x >>= f = Transient $ do
    (wrap, mk) <- baseEffects x x f
    wrap (maybe (return Nothing) (runTrans . f) mk)
-- | Lifted IO actions always produce a value.
instance MonadIO TransIO where
  liftIO io = Transient (fmap Just (liftIO io))
-- | Non-blocking acquire on a counter: decrements it and returns 'True'
-- when it is positive, otherwise leaves it unchanged and returns 'False'.
waitQSemB :: (Ord a, Num a) => IORef a -> IO Bool
waitQSemB sem = atomicModifyIORef sem acquire
  where
    acquire n
      | n > 0     = (n - 1, True)
      | otherwise = (n, False)
-- | Release on a counter: atomically increments it by one.
signalQSemB :: Num a => IORef a -> IO ()
signalQSemB sem = atomicModifyIORef sem release
  where
    release n = (n + 1, ())
-- | Run a computation with a fresh thread counter initialised to @n@.
-- The previous limit (or absence of one) is put back afterwards.
threads :: Int -> TransIO a -> TransIO a
threads n proc = Transient $ do
  previous <- gets maxThread
  budget   <- liftIO (newIORef n)
  modify (\s -> s { maxThread = Just budget })
  runTrans proc <* modify (\s -> s { maxThread = previous })
-- | Run @comp@ and then, in its continuation, install a children list
-- created before @comp@ ran and kill whatever threads are registered in it.
--
-- NOTE(review): the list is empty the first time the continuation runs, so
-- the kill presumably targets threads registered under it by an earlier
-- pass through this continuation -- confirm against the intended semantics.
oneThread :: TransIO a -> TransientIO a
oneThread comp= do
-- One list is created per call; it is shared by every later pass through
-- the statements below.
chs <- liftIO $ newTVarIO []
r <- comp
modify $ \ s -> s{children= chs}
killChilds
return r
-- | Print the tree of thread states, starting from the parent of the
-- current state, and then stop the computation.
--
-- When the current state has no parent, the tree is printed from the
-- current state itself instead of failing on a missing parent.
showThreads :: TransIO empty
showThreads = do
  st' <- gets (\st -> fromMaybe st (parent st))
  liftIO $ showTree 0 st'
  stop
  where
    -- Print every state registered in a children list.
    showThreads' n rchs = do
      chs <- atomically $ readTVar rchs
      mapM_ (showTree n) chs

    -- Print one thread id indented by @n@ spaces, then its children
    -- indented four more.
    showTree n ch = do
      putStr $ take n $ repeat ' '
      print $ threadId ch
      showThreads' (n+4) $ children ch
-- | Add @n@ to the current thread counter, creating a counter initialised
-- to @n@ when none is set.
--
-- The counter is shared with worker threads that change it through
-- 'waitQSemB' and 'signalQSemB', so the update is done atomically; a plain
-- read-modify-write could lose concurrent updates.
addThreads' :: Int -> TransIO ()
addThreads' n = Transient $ do
  msem <- gets maxThread
  case msem of
    Just sem -> liftIO $ atomicModifyIORef' sem $ \n' -> (n + n', ())
    Nothing  -> do
      sem <- liftIO (newIORef n)
      modify $ \s -> s { maxThread = Just sem }
  return $ Just ()
-- | Raise the current thread counter to at least @n@. Does nothing when
-- no counter is set.
--
-- The counter is shared with worker threads that change it through
-- 'waitQSemB' and 'signalQSemB', so the update is done atomically; a plain
-- read-modify-write could lose concurrent updates.
addThreads :: Int -> TransIO ()
addThreads n = Transient $ do
  msem <- gets maxThread
  case msem of
    Nothing  -> return ()
    Just sem -> liftIO $ atomicModifyIORef' sem $ \n' -> (max n' n, ())
  return $ Just ()
-- | Run a computation with the 'freeTh' flag set, so that threads it forks
-- are not registered as children; the previous flag is put back afterwards.
freeThreads :: TransIO a -> TransIO a
freeThreads proc = Transient $ do
  previous <- gets freeTh
  modify (\s -> s { freeTh = True })
  outcome <- runTrans proc
  modify (\s -> s { freeTh = previous })
  return outcome
-- | Run a computation with the 'freeTh' flag cleared, so that threads it
-- forks are registered as children; the flag that was in effect before is
-- put back afterwards.
hookedThreads :: TransIO a -> TransIO a
hookedThreads proc = Transient $ do
  st <- get
  put st { freeTh = False }
  r <- runTrans proc
  -- The lambda variable must not be called @st@: shadowing the saved state
  -- would turn this restore into a no-op.
  modify $ \s -> s { freeTh = freeTh st }
  return r
-- | Kill every thread registered in the children list of the current
-- state.
killChilds :: TransientIO()
killChilds = Transient $ do
  registered <- gets children
  liftIO (killChildren registered)
  return (Just ())
-- | Look up the session value stored under the result type requested by
-- the caller. Returns 'Nothing' when no value of that type is stored.
getData :: (MonadState EventF m,Typeable a) => m (Maybe a)
getData = result
  where
    -- @result@ is deliberately a monomorphic binding: passing it to
    -- @witness@ is what selects the type used as the lookup key.
    result = do
      store <- gets mfData
      return (unsafeCoerce <$> M.lookup (typeOf (witness result)) store)

    -- Never evaluated; only carries the element type for 'typeOf'.
    witness :: m (Maybe x) -> x
    witness = undefined
-- | 'getData' as a 'TransIO' computation: it produces no result (and so
-- stops) when no value of the requested type is stored.
getSData :: Typeable a => TransIO a
getSData= Transient getData
-- | Store a value in the session data under its own type, replacing any
-- value of that type stored before.
setData :: (MonadState EventF m, Typeable a) => a -> m ()
setData x = modify store
  where
    store st = st { mfData = M.insert (typeOf x) (unsafeCoerce x) (mfData st) }
-- | Remove the session value stored under the type of the argument. The
-- argument is used only for its type.
delData :: ( MonadState EventF m,Typeable a) => a -> m ()
delData x = modify (\st -> st { mfData = M.delete key (mfData st) })
  where
    key = typeOf x
-- | Return the current value of the sequence counter and increment it.
genId :: MonadState EventF m => m Int
genId= do
st <- get
let n= mfSequence st
-- Store the incremented counter; the value returned is the old one.
put st{mfSequence= n+1}
return n
-- | Return the current value of the sequence counter without changing it.
getPrevId :: MonadState EventF m => m Int
getPrevId= do
n <- gets mfSequence
return n
-- | Read an exception from a string literal: the parsed text becomes the
-- message of an 'ErrorCall' wrapped in 'SomeException'.
--
-- The parse is delegated to the 'String' reader with the caller's
-- precedence, so input that is not a string literal yields no parse
-- instead of failing at run time, and unconsumed input is returned as the
-- remainder.
instance Read SomeException where
  readsPrec n str =
    [ (SomeException (ErrorCall msg), rest) | (msg, rest) <- readsPrec n str ]
-- | Element of a stream delivered by 'parallel': a value with more to
-- come ('SMore'), a final value ('SLast'), end of stream without a value
-- ('SDone'), or an exception raised by the producing action ('SError').
data StreamData a= SMore a | SLast a | SDone | SError SomeException deriving (Typeable, Show,Read)
-- | Run the IO action repeatedly through 'parallel' and deliver each
-- result to the continuation. An exception captured by 'parallel' is
-- rethrown.
waitEvents :: IO b -> TransIO b
waitEvents io =
  parallel (fmap SMore io) >>= \mr ->
    case mr of
      SMore x  -> return x
      SError e -> throw e
-- | Same definition as 'waitEvents': results of the repeated IO action
-- are delivered to the continuation and captured exceptions are rethrown.
waitEvents' :: IO b -> TransIO b
waitEvents' io =
  parallel (fmap SMore io) >>= \mr ->
    case mr of
      SMore x  -> return x
      SError e -> throw e
-- | Run the IO action once through 'parallel' (its result is tagged as
-- the last one) and deliver it to the continuation. An exception captured
-- by 'parallel' is rethrown.
async :: IO b -> TransIO b
async io =
  parallel (fmap SLast io) >>= \mr ->
    case mr of
      SLast x  -> return x
      SError e -> throw e
-- | Like 'waitEvents', but run under 'freeThreads', so the threads it
-- forks are not registered as children.
spawn :: IO b -> TransIO b
spawn io = freeThreads $
  parallel (fmap SMore io) >>= \mr ->
    case mr of
      SMore x  -> return x
      SError e -> throw e
-- | Deliver the value of an IO action now and then again every time it is
-- observed to have changed. The action is polled after each delay of
-- @interval@ (passed to 'threadDelay').
sample :: Eq a => IO a -> Int -> TransIO a
sample action interval = do
  initial <- liftIO action
  lastSeen <- liftIO (newIORef initial)
  waitEvents (poll lastSeen) <|> async (return initial)
  where
    -- Blocks until the action returns something different from the value
    -- remembered in the reference, then remembers and returns it.
    poll ref = do
      threadDelay interval
      current <- action
      remembered <- readIORef ref
      if current /= remembered
        then writeIORef ref current >> return current
        else poll ref
-- | Run an IO action in the background and re-enter the continuation with
-- each value it produces.
--
-- When called with no pending event, the background loop is started, the
-- status is marked 'WasParallel' (unless it is 'WasRemote') and the current
-- pass ends with no result. When the continuation is re-entered with an
-- event pending, that event is consumed and returned.
parallel :: IO (StreamData b) -> TransIO (StreamData b)
parallel ioaction = Transient $ do
  cont <- get
  case event cont of
    Nothing -> do
      liftIO (loop cont ioaction)
      was <- getData `onNothing` return NoRemote
      when (was /= WasRemote) (setData WasParallel)
      return Nothing
    pending@(Just _) -> do
      put cont { event = Nothing }
      return (unsafeCoerce pending)
-- | Background driver used by 'parallel': repeatedly runs the IO action
-- @rec@ and re-enters the continuation stored in the given state with each
-- value produced.
loop :: EventF -> IO (StreamData t) -> IO ()
loop (cont'@(EventF eff e x fs a b c d _ childs g)) rec = do
-- Fresh children list for the derived state.
chs <- liftIO $ newTVarIO []
-- Derived state: same as the caller's, but with the caller as parent and
-- the fresh children list.
let cont = EventF eff e x fs a b c d (Just cont') chs g
-- Re-enter the continuation with the given datum as pending event.
iocont dat= do
runStateT (runCont cont) cont{event= Just $ unsafeCoerce dat}
return ()
-- Worker: the thread limit is consulted before forking.
loop'= forkMaybe False cont $ do
-- `catch` in backticks binds tighter than (>>), so only @rec@ is guarded;
-- any exception is turned into an 'SError' value.
mdat <- threadDelay 0 >> rec `catch` \(e :: SomeException) -> return $ SError e
case mdat of
-- Error, end of stream and last value: deliver once and finish.
se@(SError _) -> iocont se
SDone -> iocont SDone
last@(SLast _) -> iocont last
-- More to come: deliver (forking if the limit allows), then go round again.
more@(SMore _) -> do
forkMaybe False cont $ iocont more
loop'
loop'
return ()
where
-- First argument 'True' forces a fork; 'False' asks the thread counter,
-- if any, via 'waitQSemB'.
forkMaybe True cont proc = forkMaybe' True cont proc
forkMaybe False cont proc = do
dofork <- case maxThread cont of
Nothing -> return True
Just sem -> do
dofork <- waitQSemB sem
if dofork then return True else return False
forkMaybe' dofork cont proc
-- Either fork @proc@ with a finaliser, or run it in the current thread.
forkMaybe' dofork cont proc=
if dofork
then do
forkFinally1 (do
th <- myThreadId
-- Register the new thread under the caller's state (no-op when the
-- caller has 'freeTh' set).
hangThread cont' cont{threadId=th}
proc)
$ \me -> do
case me of
-- On an exception: report it unless it is 'ThreadKilled', then kill the
-- threads registered under the derived state.
Left e -> do
when (fromException e /= Just ThreadKilled)$ liftIO $ print e
killChildren $ children cont
-- On normal termination: unregister this thread, unless threads are free.
Right _ -> when(not $ freeTh cont') $ do
th <- myThreadId
mparent <- free th cont
return ()
-- Give the slot back to the thread counter, if there is one.
case maxThread cont of
Just sem -> signalQSemB sem
Nothing -> return ()
return ()
else proc
-- Fork a thread that runs the action with asynchronous exceptions
-- restored to the caller's masking state and always runs the finaliser
-- with the outcome.
forkFinally1 :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally1 action and_then =
mask $ \restore -> forkIO $ try (restore action) >>= and_then
-- | Remove the entry for thread @th@ from the children list of the parent
-- of @env@. When it is not found there, the search continues one level up.
--
-- Returns 'Nothing' when a state without parent is reached, otherwise the
-- state whose parent's list contained the thread.
free :: ThreadId -> EventF -> IO (Maybe EventF)
free th env =
  case parent env of
    Nothing  -> return Nothing
    Just par -> do
      let siblings = children par
      removed <- atomically $ do
        current <- readTVar siblings
        let (remaining, hit) = dropThread [] current
        when hit $ writeTVar siblings remaining
        return hit
      if removed
        then return (Just env)
        else free th par
  where
    -- Remove the first entry owned by @th@; entries inspected before the
    -- match end up in reverse order in front of the rest.
    dropThread seen [] = (seen, False)
    dropThread seen (ev : rest)
      | th == threadId ev = (seen ++ rest, True)
      | otherwise         = dropThread (ev : seen) rest
-- | Register @child@ at the front of the children list of the first
-- state, unless that state has 'freeTh' set.
hangThread :: EventF -> EventF -> IO ()
hangThread owner child =
  unless (freeTh owner) $
    atomically $ do
      registered <- readTVar (children owner)
      writeTVar (children owner) (child : registered)
-- | Atomically empty a children list and then kill the thread recorded in
-- each state that was in it.
killChildren :: TVar [EventF] -> IO ()
killChildren childs = do
  victims <- atomically (readTVar childs <* writeTVar childs [])
  mapM_ (killThread . threadId) victims
-- | Function that installs a handler: it receives the handler (from event
-- data to a response in 'IO') and performs the installation.
type EventSetter eventdata response= (eventdata -> IO response) -> IO ()
-- | IO action producing the response the handler gives back.
type ToReturn response= IO response
-- | Turn a callback-based event source into a 'TransIO' computation.
--
-- With no pending event, a handler is installed through @setHandler@; each
-- time it is invoked it re-enters the continuation with the event data and
-- then answers with @iob@. The status is marked 'WasParallel' (unless it is
-- 'WasRemote') and the current pass ends with no result. When re-entered
-- with an event pending, that event is consumed and returned.
react
  :: Typeable eventdata
  => EventSetter eventdata response
  -> ToReturn response
  -> TransIO eventdata
react setHandler iob = Transient $ do
  cont <- get
  case event cont of
    pending@(Just _) -> do
      put cont { event = Nothing }
      return (unsafeCoerce pending)
    Nothing -> do
      liftIO $ setHandler $ \dat -> do
        _ <- runStateT (runCont cont) cont { event = Just (unsafeCoerce dat) }
        iob
      was <- getData `onNothing` return NoRemote
      when (was /= WasRemote) (setData WasParallel)
      return Nothing
-- | Global slot holding the latest piece of console input not yet
-- consumed: written by 'processLine', read and cleared by 'input' and
-- 'getLine''.
--
-- NOINLINE is required for a top-level 'unsafePerformIO' reference:
-- without it the definition may be inlined at use sites, giving each its
-- own variable.
{-# NOINLINE getLineRef #-}
getLineRef :: TVar (Maybe String)
getLineRef = unsafePerformIO $ newTVarIO Nothing
-- | Global list of the option keys registered so far by 'option'.
--
-- NOINLINE is required for a top-level 'unsafePerformIO' reference:
-- without it the definition may be inlined at use sites, giving each its
-- own variable.
{-# NOINLINE roption #-}
roption :: MVar [String]
roption = unsafePerformIO $ newMVar []
-- | Offer a console option: print how to select it, register its key, and
-- continue (returning @ret@) each time the user's input reads as a value
-- equal to @ret@.
option :: (Typeable b, Show b, Read b, Eq b) =>
     b -> String -> TransIO b
option ret message = do
  let key = show ret
  liftIO (putStrLn ("Enter "++key++"\tto: " ++ message))
  liftIO (modifyMVar_ roption (\registered -> return (key : registered)))
  waitEvents (getLine' (== ret))
  liftIO (putStrLn (show ret ++ " chosen"))
  return ret
-- | Print a prompt and wait until a piece of console input is available.
-- If it reads as a value satisfying @cond@, the value is echoed, the input
-- is consumed and the value returned; otherwise the input is left in place
-- and the computation produces no result.
input :: (Typeable a, Read a,Show a) => (a -> Bool) -> String -> TransIO a
input cond prompt = Transient . liftIO $ do
  putStr prompt >> hFlush stdout
  atomically (readTVar getLineRef >>= maybe retry accept)
  where
    accept line =
      case reads1 line of
        (value, _) : _ | cond value -> do
          -- The echo is an IO action run from inside the transaction.
          unsafeIOToSTM (print value)
          writeTVar getLineRef Nothing
          return (Just value)
        _ -> return Nothing
-- | Block until the pending console input reads as a value satisfying
-- @cond@; then consume the input and return the value. Input that does not
-- parse or does not satisfy the condition is left in place.
getLine' :: (Typeable a, Read a) => (a -> Bool) -> IO a
getLine' cond = atomically (readTVar getLineRef >>= maybe retry consume)
  where
    consume line =
      case reads1 line of
        (value, _) : _ | cond value ->
          writeTVar getLineRef Nothing >> return value
        _ -> retry
-- | Parse console input at the type the caller asks for. When that type
-- is 'String', the whole input is returned as is (no quotes needed);
-- otherwise the input goes through 'readsPrec''.
reads1 s=x where
-- @x@ refers to itself only to obtain its own element type for 'typeOf'.
x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec' 0 s
-- Never evaluated; only carries the element type.
typeOfr :: [(a,String)] -> a
typeOfr = undefined
-- | Read lines from standard input forever, passing each one to
-- 'processLine'.
inputLoop :: IO a
inputLoop = forever (getLine >>= processLine)
-- | Split a line into pieces and publish them one at a time in
-- 'getLineRef', waiting 100000 microseconds before each one.
--
-- Pieces are separated by @/@; a piece that starts with a double quote
-- extends up to the next double quote instead.
processLine r =
  forM_ (breakSlash [] r) $ \piece -> do
    threadDelay 100000
    atomically (writeTVar getLineRef (Just piece))
  where
    breakSlash :: [String] -> String -> [String]
    breakSlash []  "" = [""]
    breakSlash acc "" = acc
    breakSlash acc str =
      case str of
        '"' : quoted -> chop '"' quoted
        _            -> chop '/' str
      where
        -- Take everything up to the delimiter, skip the delimiter itself,
        -- and continue with the rest.
        chop delim body =
          let (piece, rest) = span (/= delim) body
          in breakSlash (acc ++ [piece]) (drop 1 rest)
-- | Block on the exit variable until a final answer arrives: a
-- @Right (Just r)@ is returned, a @Right Nothing@ is ignored, and a
-- @Left msg@ prints the message and ends the program successfully.
stay :: MVar (Either String (Maybe a)) -> IO a
stay rexit = takeMVar rexit >>= either quit (maybe (stay rexit) return)
  where
    quit msg = putStrLn msg >> exitWith ExitSuccess
-- | Wrapper under which 'keep' and 'keep'' store the exit variable in the
-- session data, so that 'exit' and 'exit'' can find it by type.
newtype Exit a= Exit a deriving Typeable
-- | Run a 'TransIO' computation in a forked thread together with the
-- console input loop and an "end" option, and block the calling thread
-- until the computation calls 'exit' (whose value is returned) or the
-- user selects "end" (which terminates the program).
keep :: Typeable a => TransIO a -> IO a
keep mx = do
rexit <- newEmptyMVar
forkIO $ do
-- A first @Right Nothing@ is posted before the computation starts; 'stay'
-- ignores it and keeps waiting.
liftIO $ putMVar rexit $ Right Nothing
runTransient $ do
-- Make the exit variable available to 'exit' / 'exit''.
setData $ Exit rexit
-- Three alternatives: the console reader, the user's computation, and
-- the "end" option that kills the children and reports termination.
async inputLoop
<|> do mx
<|> do
option "end" "exit"
killChilds
exit' (Left "terminated by user" `asTypeOf` (type1 mx))
return ()
threadDelay 10000
-- Feed any path given on the command line as if it had been typed.
execCommandLine
stay rexit
where
-- Never evaluated; only fixes the type of the value put in the exit
-- variable.
type1 :: TransIO a -> Either String (Maybe a)
type1= undefined
-- | Run a 'TransIO' computation in a forked thread without the console
-- loop, and block the calling thread until the computation calls 'exit'.
-- Each time the computation produces a value, a @Right Nothing@ is posted
-- to the exit variable, which 'stay' ignores.
keep' :: Typeable a => TransIO a -> IO a
keep' mx = do
  rexit <- newEmptyMVar
  let body = do
        setData $ Exit rexit
        mx >> liftIO (putMVar rexit $ Right Nothing)
  _ <- forkIO (void (runTransient body))
  threadDelay 10000
  execCommandLine
  stay rexit
-- | Look for @-p@ or @--path@ among the program arguments and, when it is
-- followed by another argument, announce that argument and pass it to
-- 'processLine'.
--
-- A flag given as the very last argument has no value to execute and is
-- ignored.
execCommandLine :: IO ()
execCommandLine = do
  args <- getArgs
  -- Skip everything before the first flag; the argument right after it,
  -- if present, is the path.
  case dropWhile (`notElem` ["-p", "--path"]) args of
    _flag : path : _ -> do
      putStr "Executing: " >> print path
      processLine path
    _ -> return ()
-- | Finish the program started with 'keep' or 'keep'': post @x@ as the
-- final answer in the exit variable and stop the current computation.
exit :: Typeable a => a -> TransIO a
exit x= do
-- The alternative raises an error when no exit variable of the expected
-- type is stored; `asTypeOf` pins the type that 'getSData' looks up.
Exit rexit <- getSData <|> error "exit: not the type expected" `asTypeOf` type1 x
liftIO $ putMVar rexit . Right $ Just x
stop
where
-- Never evaluated; only relates the type of @x@ to the type of the
-- stored exit variable.
type1 :: a -> TransIO (Exit (MVar (Either String (Maybe a))))
type1= undefined
-- | Post an arbitrary value in the exit variable stored in the session
-- data and stop the current computation. Raises an error when no exit
-- variable of the matching type is stored.
exit' x= do
Exit rexit <- getSData <|> error "exit: not type expected"
liftIO $ putMVar rexit x ; stop
-- | Run the first action; when it yields 'Nothing', run the second one
-- instead, otherwise return the value it carried.
onNothing :: Monad m => m (Maybe b) -> m b -> m b
onNothing iox iox' = iox >>= maybe iox' return