module Control.Concurrent.CHP.Channels.Base where
import Control.Concurrent.STM
import Control.Monad
import Data.Unique (Unique)
import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Poison
newtype Chanin a = Chanin (STMChannel a) deriving Eq
newtype Chanout a = Chanout (STMChannel a) deriving Eq
newtype STMChannel a = STMChan (Event, TVar (WithPoison (Maybe a, Maybe ())))
deriving Eq
data Chan r w a = Chan {
getChannelIdentifier :: Unique,
reader :: r a,
writer :: w a}
instance Poisonable (Chanin a) where
poison (Chanin c) = liftCHP . liftIO_CHP $ poisonReadC c
checkForPoison (Chanin c) = liftCHP $ liftIO_CHP (checkPoisonReadC c) >>= checkPoison
instance Poisonable (Chanout a) where
poison (Chanout c) = liftCHP . liftIO_CHP $ poisonWriteC c
checkForPoison (Chanout c) = liftCHP $ liftIO_CHP (checkPoisonWriteC c) >>= checkPoison
stmChannel :: Int -> (a -> String) -> IO (Unique, STMChannel a)
stmChannel pri sh =
do c <- atomically $ newTVar $ NoPoison (Nothing, Nothing)
e <- newEventPri (liftM (ChannelComm . maybe "" sh . getVal) $ readTVar c) 2 pri
return (getEventUnique e, STMChan (e,c))
where
getVal PoisonItem = Nothing
getVal (NoPoison (x, _)) = x
consumeData :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison a)
consumeData tv = do d <- readTVar tv
case d of
PoisonItem -> return PoisonItem
NoPoison (Nothing, _) -> retry
NoPoison (Just x, a) -> do writeTVar tv $ NoPoison (Nothing, a)
return $ NoPoison x
sharedError :: String -> String
sharedError s
= unlines ["CHP: Assumption violated; found " ++ s ++ " when placing " ++ s ++ "."
,"This is typically because you have multiple writers or multiple readers accessing a unshared (one-to-one) channel."
,"If you want to have many writers or many readers, use an appropriate (any-to-one, one-to-any or any-to-any) shared channel, using the claim function."
]
sendData :: TVar (WithPoison (Maybe a, Maybe ())) -> a -> STM (WithPoison ())
sendData tv x = do y <- readTVar tv
case y of
PoisonItem -> return PoisonItem
NoPoison (Just _, _) -> error $ sharedError "data"
NoPoison (Nothing, a) -> do writeTVar tv $ NoPoison (Just x, a)
return $ NoPoison ()
consumeAck :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison ())
consumeAck tv = do d <- readTVar tv
case d of
PoisonItem -> return PoisonItem
NoPoison (_, Nothing) -> retry
NoPoison (x, Just _) -> do writeTVar tv $ NoPoison (x, Nothing)
return $ NoPoison ()
sendAck :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison ())
sendAck tv = do d <- readTVar tv
case d of
PoisonItem -> return PoisonItem
NoPoison (_, Just _) -> error $ sharedError "ack"
NoPoison (x, Nothing) -> do writeTVar tv $ NoPoison (x, Just ())
return $ NoPoison ()
startReadChannelC :: STMChannel a -> (Event, STM (WithPoison a))
startReadChannelC (STMChan (e,tv)) = (e, consumeData tv)
endReadChannelC :: STMChannel a -> STM (WithPoison ())
endReadChannelC (STMChan (_,tv)) = sendAck tv
readChannelC :: STMChannel a -> (Event, STM (), STM (WithPoison a))
readChannelC (STMChan (e, tv))
= (e, sendAck tv >> return (), consumeData tv)
poisonReadC :: STMChannel a -> IO ()
poisonReadC (STMChan (e,tv))
= atomically $ do poisonEvent e
writeTVar tv PoisonItem
checkPoisonReadC :: STMChannel a -> IO (WithPoison ())
checkPoisonReadC (STMChan (e,_)) = atomically $ checkEventForPoison e
startWriteChannelC :: STMChannel a -> (Event, STM (WithPoison ()))
startWriteChannelC (STMChan (e,tv))
= (e, do x <- readTVar tv
case x of
PoisonItem -> return PoisonItem
NoPoison _ -> return $ NoPoison ())
sendWriteChannelC :: STMChannel a -> a -> STM (WithPoison ())
sendWriteChannelC (STMChan (_, tv)) = sendData tv
endWriteChannelC :: STMChannel a -> STM (WithPoison ())
endWriteChannelC (STMChan (_, tv))
= consumeAck tv
writeChannelC :: STMChannel a -> a -> (Event, STM (), STM (WithPoison ()))
writeChannelC (STMChan (e, tv)) val
= (e, sendData tv val >> return (), consumeAck tv)
poisonWriteC :: STMChannel a -> IO ()
poisonWriteC (STMChan (e,tv))
= atomically $ do poisonEvent e
writeTVar tv PoisonItem
checkPoisonWriteC :: STMChannel a -> IO (WithPoison ())
checkPoisonWriteC (STMChan (e,_)) = atomically $ checkEventForPoison e