module Language.Embedded.Concurrent.CMD (
TID, ThreadId (..),
CID, ChanBound, Chan (..),
ThreadCMD (..),
ChanCMD (..),
Closeable, Uncloseable
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
#endif
import Control.Monad.Operational.Higher
import Control.Monad.Reader
import Data.IORef
import Data.Typeable
import Language.Embedded.Expression
import qualified Control.Concurrent as CC
import qualified Control.Concurrent.BoundedChan as Bounded
import Data.Word (Word16)
type ChanBound = Word16
type TID = VarId
type CID = VarId
data Flag a = Flag (IORef Bool) (CC.MVar a)
newFlag :: IO (Flag a)
newFlag = Flag <$> newIORef False <*> CC.newEmptyMVar
setFlag :: Flag a -> a -> IO Bool
setFlag (Flag flag var) val = do
set <- atomicModifyIORef flag $ \set -> (True, set)
when (not set) $ CC.putMVar var val
return set
waitFlag :: Flag a -> IO a
waitFlag (Flag _ var) = CC.withMVar var return
data ThreadId
= TIDEval CC.ThreadId (Flag ())
| TIDComp TID
deriving (Typeable)
instance Show ThreadId where
show (TIDEval tid _) = show tid
show (TIDComp tid) = tid
data Closeable
data Uncloseable
data Chan t a
= ChanEval (Bounded.BoundedChan a) (IORef Bool) (IORef Bool)
| ChanComp CID
data ThreadCMD fs a where
ForkWithId :: (ThreadId -> prog ()) -> ThreadCMD (Param3 prog exp pred) ThreadId
Kill :: ThreadId -> ThreadCMD (Param3 prog exp pred) ()
Wait :: ThreadId -> ThreadCMD (Param3 prog exp pred) ()
data ChanCMD fs a where
NewChan :: pred a => exp ChanBound -> ChanCMD (Param3 prog exp pred) (Chan t a)
ReadChan :: pred a => Chan t a -> ChanCMD (Param3 prog exp pred) (Val a)
WriteChan :: pred a
=> Chan t a -> exp a -> ChanCMD (Param3 prog exp pred) (Val Bool)
CloseChan :: Chan Closeable a -> ChanCMD (Param3 prog exp pred) ()
ReadOK :: Chan Closeable a -> ChanCMD (Param3 prog exp pred) (Val Bool)
instance HFunctor ThreadCMD where
hfmap f (ForkWithId p) = ForkWithId $ f . p
hfmap _ (Kill tid) = Kill tid
hfmap _ (Wait tid) = Wait tid
instance HBifunctor ThreadCMD where
hbimap f _ (ForkWithId p) = ForkWithId $ f . p
hbimap _ _ (Kill tid) = Kill tid
hbimap _ _ (Wait tid) = Wait tid
instance (ThreadCMD :<: instr) => Reexpressible ThreadCMD instr where
reexpressInstrEnv reexp (ForkWithId p) = ReaderT $ \env ->
singleInj $ ForkWithId (flip runReaderT env . p)
reexpressInstrEnv reexp (Kill tid) = lift $ singleInj $ Kill tid
reexpressInstrEnv reexp (Wait tid) = lift $ singleInj $ Wait tid
instance HFunctor ChanCMD where
hfmap _ (NewChan sz) = NewChan sz
hfmap _ (ReadChan c) = ReadChan c
hfmap _ (WriteChan c x) = WriteChan c x
hfmap _ (CloseChan c) = CloseChan c
hfmap _ (ReadOK c) = ReadOK c
instance HBifunctor ChanCMD where
hbimap _ f (NewChan sz) = NewChan (f sz)
hbimap _ _ (ReadChan c) = ReadChan c
hbimap _ f (WriteChan c x) = WriteChan c (f x)
hbimap _ _ (CloseChan c) = CloseChan c
hbimap _ _ (ReadOK c) = ReadOK c
instance (ChanCMD :<: instr) => Reexpressible ChanCMD instr where
reexpressInstrEnv reexp (NewChan sz) = lift . singleInj . NewChan =<< reexp sz
reexpressInstrEnv reexp (ReadChan c) = lift $ singleInj $ ReadChan c
reexpressInstrEnv reexp (WriteChan c x) = lift . singleInj . WriteChan c =<< reexp x
reexpressInstrEnv reexp (CloseChan c) = lift $ singleInj $ CloseChan c
reexpressInstrEnv reexp (ReadOK c) = lift $ singleInj $ ReadOK c
runThreadCMD :: ThreadCMD (Param3 IO exp pred) a
-> IO a
runThreadCMD (ForkWithId p) = do
f <- newFlag
tidvar <- CC.newEmptyMVar
cctid <- CC.forkIO . void $ CC.takeMVar tidvar >>= p >> setFlag f ()
let tid = TIDEval cctid f
CC.putMVar tidvar tid
return tid
runThreadCMD (Kill (TIDEval t f)) = do
setFlag f ()
CC.killThread t
return ()
runThreadCMD (Wait (TIDEval _ f)) = do
waitFlag f
runChanCMD :: ChanCMD (Param3 IO IO pred) a -> IO a
runChanCMD (NewChan sz) = do
sz' <- sz
ChanEval <$> Bounded.newBoundedChan (fromIntegral sz')
<*> newIORef False
<*> newIORef True
runChanCMD (ReadChan (ChanEval c closedref lastread)) = do
closed <- readIORef closedref
mval <- Bounded.tryReadChan c
case mval of
Just x -> do
return $ ValEval x
Nothing
| closed -> do
writeIORef lastread False
return undefined
| otherwise -> do
ValEval <$> Bounded.readChan c
runChanCMD (WriteChan (ChanEval c closedref _) x) = do
closed <- readIORef closedref
x' <- x
if closed
then return (ValEval False)
else Bounded.writeChan c x' >> return (ValEval True)
runChanCMD (CloseChan (ChanEval _ closedref _)) = do
writeIORef closedref True
runChanCMD (ReadOK (ChanEval _ _ lastread)) = do
ValEval <$> readIORef lastread
instance InterpBi ThreadCMD IO (Param1 pred) where
interpBi = runThreadCMD
instance InterpBi ChanCMD IO (Param1 pred) where
interpBi = runChanCMD