{-# LANGUAGE CPP #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE UndecidableInstances #-}
module Language.Embedded.Concurrent.CMD (
TID, ThreadId (..),
CID, Chan (..),
ChanSize (..),
timesSizeOf, timesSize, plusSize,
ThreadCMD (..),
ChanCMD (..),
Closeable, Uncloseable
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Typeable
#endif
import qualified Control.Chan as Chan
import qualified Control.Concurrent as CC
import Control.Monad.Operational.Higher
import Control.Monad.Reader
import Data.Dynamic
import Data.IORef
import Data.Ix (Ix)
import Data.Maybe (fromMaybe)
import Control.Monads
import Language.Embedded.Traversal
import Language.Embedded.Expression
import Language.Embedded.Imperative.CMD
import Language.Embedded.Imperative (getArr, setArr)
type TID = VarId
type CID = VarId
data Flag a = Flag (IORef Bool) (CC.MVar a)
newFlag :: IO (Flag a)
newFlag = Flag <$> newIORef False <*> CC.newEmptyMVar
setFlag :: Flag a -> a -> IO Bool
setFlag (Flag flag var) val = do
set <- atomicModifyIORef flag $ \set -> (True, set)
when (not set) $ CC.putMVar var val
return set
waitFlag :: Flag a -> IO a
waitFlag (Flag _ var) = CC.withMVar var return
data ThreadId
= TIDRun CC.ThreadId (Flag ())
| TIDComp TID
deriving (Typeable)
instance Show ThreadId where
show (TIDRun tid _) = show tid
show (TIDComp tid) = tid
data Closeable
data Uncloseable
data Chan t a
= ChanRun (Chan.Chan Dynamic)
| ChanComp CID
data ChanSize exp pred i where
OneSize :: (pred a, Integral i) => proxy a -> exp i -> ChanSize exp pred i
TimesSize :: Integral i => exp i -> ChanSize exp pred i -> ChanSize exp pred i
PlusSize :: Integral i => ChanSize exp pred i -> ChanSize exp pred i -> ChanSize exp pred i
mapSizeExp :: (exp i -> exp' i) -> ChanSize exp pred i -> ChanSize exp' pred i
mapSizeExp f (OneSize t sz) = OneSize t (f sz)
mapSizeExp f (TimesSize n sz) = TimesSize (f n) (mapSizeExp f sz)
mapSizeExp f (PlusSize a b) = PlusSize (mapSizeExp f a) (mapSizeExp f b)
mapSizeExpA :: (Functor m, Monad m)
=> (exp i -> m (exp' i))
-> ChanSize exp pred i
-> m (ChanSize exp' pred i)
mapSizeExpA f (OneSize t sz) = OneSize t <$> f sz
mapSizeExpA f (TimesSize n sz) = do
n' <- f n
sz' <- mapSizeExpA f sz
return $ TimesSize n' sz'
mapSizeExpA f (PlusSize a b) = do
a' <- mapSizeExpA f a
b' <- mapSizeExpA f b
return $ PlusSize a' b'
timesSizeOf :: (pred a, Integral i) => exp i -> proxy a -> ChanSize exp pred i
timesSizeOf = flip OneSize
timesSize :: Integral i => exp i -> ChanSize exp pred i -> ChanSize exp pred i
timesSize = TimesSize
plusSize :: Integral i => ChanSize exp pred i -> ChanSize exp pred i -> ChanSize exp pred i
plusSize = PlusSize
data ThreadCMD fs a where
ForkWithId :: (ThreadId -> prog ()) -> ThreadCMD (Param3 prog exp pred) ThreadId
Kill :: ThreadId -> ThreadCMD (Param3 prog exp pred) ()
Wait :: ThreadId -> ThreadCMD (Param3 prog exp pred) ()
Sleep :: Integral i => exp i -> ThreadCMD (Param3 prog exp pred) ()
data ChanCMD fs a where
NewChan :: ChanSize exp pred i -> ChanCMD (Param3 prog exp pred) (Chan t c)
CloseChan :: Chan Closeable c -> ChanCMD (Param3 prog exp pred) ()
ReadOK :: Chan Closeable c -> ChanCMD (Param3 prog exp pred) (Val Bool)
ReadOne :: (Typeable a, pred a)
=> Chan t c -> ChanCMD (Param3 prog exp pred) (Val a)
WriteOne :: (Typeable a, pred a)
=> Chan t c -> exp a -> ChanCMD (Param3 prog exp pred) (Val Bool)
ReadChan :: (Typeable a, pred a, Ix i, Integral i)
=> Chan t c -> exp i -> exp i
-> Arr i a -> ChanCMD (Param3 prog exp pred) (Val Bool)
WriteChan :: (Typeable a, pred a, Ix i, Integral i)
=> Chan t c -> exp i -> exp i
-> Arr i a -> ChanCMD (Param3 prog exp pred) (Val Bool)
instance HFunctor ThreadCMD where
hfmap f (ForkWithId p) = ForkWithId $ f . p
hfmap _ (Kill tid) = Kill tid
hfmap _ (Wait tid) = Wait tid
hfmap _ (Sleep tid) = Sleep tid
instance HBifunctor ThreadCMD where
hbimap f _ (ForkWithId p) = ForkWithId $ f . p
hbimap _ _ (Kill tid) = Kill tid
hbimap _ _ (Wait tid) = Wait tid
hbimap _ g (Sleep us) = Sleep $ g us
instance DryInterp ThreadCMD where
dryInterp (ForkWithId _) = liftM TIDComp $ freshStr "t"
dryInterp (Kill _) = return ()
dryInterp (Wait _) = return ()
dryInterp (Sleep _) = return ()
instance (ThreadCMD :<: instr) => Reexpressible ThreadCMD instr env where
reexpressInstrEnv reexp (ForkWithId p) = ReaderT $ \env ->
singleInj $ ForkWithId (flip runReaderT env . p)
reexpressInstrEnv reexp (Kill tid) = lift $ singleInj $ Kill tid
reexpressInstrEnv reexp (Wait tid) = lift $ singleInj $ Wait tid
reexpressInstrEnv reexp (Sleep us) = (lift . singleInj . Sleep) =<< reexp us
instance HFunctor ChanCMD where
hfmap _ (NewChan sz) = NewChan sz
hfmap _ (ReadOne c) = ReadOne c
hfmap _ (ReadChan c f t a) = ReadChan c f t a
hfmap _ (WriteOne c x) = WriteOne c x
hfmap _ (WriteChan c f t a) = WriteChan c f t a
hfmap _ (CloseChan c) = CloseChan c
hfmap _ (ReadOK c) = ReadOK c
instance HBifunctor ChanCMD where
hbimap _ f (NewChan sz) = NewChan (mapSizeExp f sz)
hbimap _ _ (ReadOne c) = ReadOne c
hbimap _ f (ReadChan c n n' a) = ReadChan c (f n) (f n') a
hbimap _ f (WriteOne c x) = WriteOne c (f x)
hbimap _ f (WriteChan c n n' a) = WriteChan c (f n) (f n') a
hbimap _ _ (CloseChan c ) = CloseChan c
hbimap _ _ (ReadOK c) = ReadOK c
instance DryInterp ChanCMD where
dryInterp (NewChan _) = liftM ChanComp $ freshStr "chan"
dryInterp (ReadOne _) = liftM ValComp $ freshStr "v"
dryInterp (ReadChan _ _ _ _) = liftM ValComp $ freshStr "v"
dryInterp (WriteOne _ _) = liftM ValComp $ freshStr "v"
dryInterp (WriteChan _ _ _ _) = liftM ValComp $ freshStr "v"
dryInterp (CloseChan _) = return ()
dryInterp (ReadOK _) = liftM ValComp $ freshStr "v"
instance (ChanCMD :<: instr) => Reexpressible ChanCMD instr env where
reexpressInstrEnv reexp (NewChan sz) =
lift . singleInj . NewChan =<< mapSizeExpA reexp sz
reexpressInstrEnv reexp (ReadOne c) = lift $ singleInj $ ReadOne c
reexpressInstrEnv reexp (ReadChan c f t a) = do
rf <- reexp f
rt <- reexp t
lift $ singleInj $ ReadChan c rf rt a
reexpressInstrEnv reexp (WriteOne c x) = lift . singleInj . WriteOne c =<< reexp x
reexpressInstrEnv reexp (WriteChan c f t a) = do
rf <- reexp f
rt <- reexp t
lift $ singleInj $ WriteChan c rf rt a
reexpressInstrEnv reexp (CloseChan c) = lift $ singleInj $ CloseChan c
reexpressInstrEnv reexp (ReadOK c) = lift $ singleInj $ ReadOK c
runThreadCMD :: ThreadCMD (Param3 IO IO pred) a -> IO a
runThreadCMD (ForkWithId p) = do
f <- newFlag
tidvar <- CC.newEmptyMVar
cctid <- CC.forkIO . void $ CC.takeMVar tidvar >>= p >> setFlag f ()
let tid = TIDRun cctid f
CC.putMVar tidvar tid
return tid
runThreadCMD (Kill (TIDRun t f)) = do
setFlag f ()
CC.killThread t
return ()
runThreadCMD (Wait (TIDRun _ f)) = do
waitFlag f
runThreadCMD (Sleep us) = do
us' <- us
CC.threadDelay $ fromIntegral us'
runChanCMD :: forall pred a. ChanCMD (Param3 IO IO pred) a -> IO a
runChanCMD (NewChan sz) = do
sz' <- evalChanSize sz
ChanRun <$> Chan.newChan sz'
runChanCMD (ReadOne (ChanRun c)) =
ValRun . convertDynamic . head <$> Chan.readChan c 1
runChanCMD (ReadChan (ChanRun c) off len arr) = do
off' <- off
len' <- len
xs <- Chan.readChan c $ fromIntegral (len' - off')
let xs' = map convertDynamic xs
interpretBi id $ forM_ (zip [off' .. ] xs') $ \(i, x) -> do
setArr arr (return i) (return x) :: Program ArrCMD (Param2 IO pred) ()
ValRun <$> Chan.lastReadOK c
runChanCMD (WriteOne (ChanRun c) x) =
ValRun <$> (Chan.writeChan c . return . toDyn =<< x)
runChanCMD (WriteChan (ChanRun c) off len (arr :: Arr ix el)) = do
off' <- off
len' <- len
xs <- interpretBi id $ forM [off' .. off' + len' - 1] $ \i -> do
getArr arr (return i) :: Program ArrCMD (Param2 IO pred) (IO el)
ValRun <$> (Chan.writeChan c =<< map toDyn <$> sequence xs)
runChanCMD (CloseChan (ChanRun c)) = Chan.closeChan c
runChanCMD (ReadOK (ChanRun c)) = ValRun <$> Chan.lastReadOK c
instance InterpBi ThreadCMD IO (Param1 pred) where
interpBi = runThreadCMD
instance InterpBi ChanCMD IO (Param1 pred) where
interpBi = runChanCMD
evalChanSize :: ChanSize IO pred i -> IO Int
evalChanSize (OneSize _ sz) = do
sz' <- sz
return $ fromIntegral sz'
evalChanSize (TimesSize n sz) = do
n' <- n
sz' <- evalChanSize sz
return $ fromIntegral n' * sz'
evalChanSize (PlusSize a b) = do
a' <- evalChanSize a
b' <- evalChanSize b
return $ a' + b'
convertDynamic :: Typeable a => Dynamic -> a
convertDynamic = fromMaybe (error "readChan: unknown element") . fromDynamic
instance FreeExp IO
where
type FreePred IO = Typeable
constExp = return
varExp = error "varExp: unimplemented over IO"