module Language.Embedded.Concurrent.CMD (
TID, ThreadId (..),
CID, ChanBound, Chan (..),
ThreadCMD (..),
ChanCMD (..),
Closeable, Uncloseable
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
#endif
import Control.Monad
import Control.Monad.Operational.Higher
import Data.IORef
import Data.Typeable
import Language.Embedded.Expression
import qualified Control.Concurrent as CC
import qualified Control.Concurrent.BoundedChan as Bounded
import Data.Word (Word16)
type ChanBound = Word16
type TID = VarId
type CID = VarId
data Flag a = Flag (IORef Bool) (CC.MVar a)
newFlag :: IO (Flag a)
newFlag = Flag <$> newIORef False <*> CC.newEmptyMVar
setFlag :: Flag a -> a -> IO Bool
setFlag (Flag flag var) val = do
set <- atomicModifyIORef flag $ \set -> (True, set)
when (not set) $ CC.putMVar var val
return set
waitFlag :: Flag a -> IO a
waitFlag (Flag _ var) = CC.withMVar var return
data ThreadId
= TIDEval CC.ThreadId (Flag ())
| TIDComp TID
deriving (Typeable)
instance Show ThreadId where
show (TIDEval tid _) = show tid
show (TIDComp tid) = show tid
data Closeable
data Uncloseable
data Chan t a
= ChanEval (Bounded.BoundedChan a) (IORef Bool) (IORef Bool)
| ChanComp CID
data ThreadCMD (prog :: * -> *) a where
ForkWithId :: (ThreadId -> prog ()) -> ThreadCMD prog ThreadId
Kill :: ThreadId -> ThreadCMD prog ()
Wait :: ThreadId -> ThreadCMD prog ()
data ChanCMD exp (prog :: * -> *) a where
NewChan :: VarPred exp a => exp ChanBound -> ChanCMD exp prog (Chan t a)
ReadChan :: VarPred exp a => Chan t a -> ChanCMD exp prog (exp a)
WriteChan :: (VarPred exp a, VarPred exp Bool)
=> Chan t a -> exp a -> ChanCMD exp prog (exp Bool)
CloseChan :: Chan Closeable a -> ChanCMD exp prog ()
ReadOK :: VarPred exp Bool
=> Chan Closeable a -> ChanCMD exp prog (exp Bool)
instance HFunctor ThreadCMD where
hfmap f (ForkWithId p) = ForkWithId $ f . p
hfmap _ (Kill tid) = Kill tid
hfmap _ (Wait tid) = Wait tid
instance HFunctor (ChanCMD exp) where
hfmap _ (NewChan sz) = NewChan sz
hfmap _ (ReadChan c) = ReadChan c
hfmap _ (WriteChan c x) = WriteChan c x
hfmap _ (CloseChan c) = CloseChan c
hfmap _ (ReadOK c) = ReadOK c
type instance IExp (ThreadCMD :+: i) = IExp i
type instance IExp (ChanCMD e) = e
type instance IExp (ChanCMD e :+: i) = e
runThreadCMD :: ThreadCMD IO a
-> IO a
runThreadCMD (ForkWithId p) = do
f <- newFlag
tidvar <- CC.newEmptyMVar
cctid <- CC.forkIO . void $ CC.takeMVar tidvar >>= p >> setFlag f ()
let tid = TIDEval cctid f
CC.putMVar tidvar tid
return tid
runThreadCMD (Kill (TIDEval t f)) = do
setFlag f ()
CC.killThread t
return ()
runThreadCMD (Wait (TIDEval _ f)) = do
waitFlag f
runChanCMD :: forall exp a. EvalExp exp
=> ChanCMD exp IO a -> IO a
runChanCMD (NewChan sz) =
ChanEval <$> Bounded.newBoundedChan (fromIntegral $ evalExp sz)
<*> newIORef False
<*> newIORef True
runChanCMD (ReadChan (ChanEval c closedref lastread)) = do
closed <- readIORef closedref
mval <- Bounded.tryReadChan c
case mval of
Just x -> do
return $ litExp x
Nothing
| closed -> do
writeIORef lastread False
return undefined
| otherwise -> do
litExp <$> Bounded.readChan c
runChanCMD (WriteChan (ChanEval c closedref _) x) = do
closed <- readIORef closedref
if closed
then return (litExp False)
else Bounded.writeChan c (evalExp x) >> return (litExp True)
runChanCMD (CloseChan (ChanEval _ closedref _)) = do
writeIORef closedref True
runChanCMD (ReadOK (ChanEval _ _ lastread)) = do
litExp <$> readIORef lastread
instance Interp ThreadCMD IO where
interp = runThreadCMD
instance EvalExp exp => Interp (ChanCMD exp) IO where
interp = runChanCMD