-- | Basic concurrency primitives. -- -- To compile the C code resulting from 'Language.Embedded.Backend.C.compile' -- for programs with concurrency primitives, use something like -- -- > cc -std=c99 -Iinclude csrc/chan.c -lpthread YOURPROGRAM.c module Language.Embedded.Concurrent ( ThreadId (..) , Chan (..) , ChanSize (..) , ThreadCMD , ChanCMD , Closeable, Uncloseable , fork, forkWithId, asyncKillThread, killThread, waitThread, delayThread , timesSizeOf, timesSize, plusSize , newChan, newCloseableChan , readChan, writeChan , readChanBuf, writeChanBuf , closeChan, lastChanReadOK , newChan', newCloseableChan' , readChan', writeChan' , readChanBuf', writeChanBuf' ) where import Control.Monad.Operational.Higher import Data.Ix import Data.Typeable import Language.Embedded.Concurrent.Backend.C () import Language.Embedded.Concurrent.CMD import Language.Embedded.Expression import Language.Embedded.Imperative.CMD (Arr) -- | Fork off a computation as a new thread. fork :: (ThreadCMD :<: instr) => ProgramT instr (Param2 exp pred) m () -> ProgramT instr (Param2 exp pred) m ThreadId fork = forkWithId . const -- | Fork off a computation as a new thread, with access to its own thread ID. forkWithId :: (ThreadCMD :<: instr) => (ThreadId -> ProgramT instr (Param2 exp pred) m ()) -> ProgramT instr (Param2 exp pred) m ThreadId forkWithId = singleton . inj . ForkWithId -- | Forcibly terminate a thread, then continue execution immediately. asyncKillThread :: (ThreadCMD :<: instr) => ThreadId -> ProgramT instr (Param2 exp pred) m () asyncKillThread = singleton . inj . Kill -- | Forcibly terminate a thread. Blocks until the thread is actually dead. killThread :: (ThreadCMD :<: instr, Monad m) => ThreadId -> ProgramT instr (Param2 exp pred) m () killThread t = do singleton . inj $ Kill t waitThread t -- | Wait for a thread to terminate. waitThread :: (ThreadCMD :<: instr) => ThreadId -> ProgramT instr (Param2 exp pred) m () waitThread = singleton . inj . Wait -- | Sleep for a given amount of microseconds. Implemented with `usleep`. -- A C compiler might require a feature test macro to be defined, -- otherwise it emits a warning about an implicitly declared function. -- For more details, see: http://man7.org/linux/man-pages/man3/usleep.3.html delayThread :: (Integral i, ThreadCMD :<: instr) => exp i -> ProgramT instr (Param2 exp pred) m () delayThread = singleton . inj . Sleep -------------------------------------------------------------------------------- -- Channel frontend -------------------------------------------------------------------------------- -- | Create a new channel. Writing a reference type to a channel will copy -- contents into the channel, so modifying it post-write is completely -- safe. newChan :: forall a i exp pred instr m . (pred a, Integral i, ChanCMD :<: instr) => exp i -> ProgramT instr (Param2 exp pred) m (Chan Uncloseable a) newChan n = newChan' $ n `timesSizeOf` (Proxy :: Proxy a) newCloseableChan :: forall a i exp pred instr m . (pred a, Integral i, ChanCMD :<: instr) => exp i -> ProgramT instr (Param2 exp pred) m (Chan Closeable a) newCloseableChan n = newCloseableChan' $ n `timesSizeOf` (Proxy :: Proxy a) -- | Read an element from a channel. If channel is empty, blocks until there -- is an item available. -- If 'closeChan' has been called on the channel *and* if the channel is -- empty, @readChan@ returns an undefined value immediately. readChan :: ( Typeable a, pred a , FreeExp exp, FreePred exp a , ChanCMD :<: instr, Monad m ) => Chan t a -> ProgramT instr (Param2 exp pred) m (exp a) readChan = readChan' -- | Read an arbitrary number of elements from a channel into an array. -- The semantics are the same as for 'readChan', where "channel is empty" -- is defined as "channel contains less data than requested". -- Returns @False@ without reading any data if the channel is closed. readChanBuf :: ( Typeable a, pred a , Ix i, Integral i , FreeExp exp, FreePred exp Bool , ChanCMD :<: instr, Monad m ) => Chan t a -> exp i -- ^ Offset in array to start writing -> exp i -- ^ Elements to read -> Arr i a -> ProgramT instr (Param2 exp pred) m (exp Bool) readChanBuf = readChanBuf' -- | Write a data element to a channel. -- If 'closeChan' has been called on the channel, all calls to @writeChan@ -- become non-blocking no-ops and return @False@, otherwise returns @True@. -- If the channel is full, this function blocks until there's space in the -- queue. writeChan :: ( Typeable a, pred a , FreeExp exp, FreePred exp Bool , ChanCMD :<: instr, Monad m ) => Chan t a -> exp a -> ProgramT instr (Param2 exp pred) m (exp Bool) writeChan = writeChan' -- | Write an arbitrary number of elements from an array into an channel. -- The semantics are the same as for 'writeChan', where "channel is full" -- is defined as "channel has insufficient free space to store all written -- data". writeChanBuf :: ( Typeable a, pred a , Ix i, Integral i , FreeExp exp, FreePred exp Bool , ChanCMD :<: instr, Monad m ) => Chan t a -> exp i -- ^ Offset in array to start reading -> exp i -- ^ Elements to write -> Arr i a -> ProgramT instr (Param2 exp pred) m (exp Bool) writeChanBuf = writeChanBuf' -- | When 'readChan' was last called on the given channel, did the read -- succeed? -- Always returns @True@ unless 'closeChan' has been called on the channel. -- Always returns @True@ if the channel has never been read. lastChanReadOK :: (FreeExp exp, FreePred exp Bool, ChanCMD :<: instr, Monad m) => Chan Closeable c -> ProgramT instr (Param2 exp pred) m (exp Bool) lastChanReadOK = fmap valToExp . singleInj . ReadOK -- | Close a channel. All subsequent write operations will be no-ops. -- After the channel is drained, all subsequent read operations will be -- no-ops as well. closeChan :: (ChanCMD :<: instr) => Chan Closeable c -> ProgramT instr (Param2 exp pred) m () closeChan = singleInj . CloseChan -------------------------------------------------------------------------------- -- Unsafe channel primitives -------------------------------------------------------------------------------- newChan' :: (Integral i, ChanCMD :<: instr) => ChanSize exp pred i -> ProgramT instr (Param2 exp pred) m (Chan Uncloseable a) newChan' = singleInj . NewChan newCloseableChan' :: (Integral i, ChanCMD :<: instr) => ChanSize exp pred i -> ProgramT instr (Param2 exp pred) m (Chan Closeable a) newCloseableChan' = singleInj . NewChan readChan' :: ( Typeable a, pred a , FreeExp exp, FreePred exp a , ChanCMD :<: instr, Monad m ) => Chan t c -> ProgramT instr (Param2 exp pred) m (exp a) readChan' = fmap valToExp . singleInj . ReadOne readChanBuf' :: ( Typeable a, pred a , Ix i, Integral i , FreeExp exp, FreePred exp Bool , ChanCMD :<: instr, Monad m ) => Chan t c -> exp i -- ^ Offset in array to start writing -> exp i -- ^ Elements to read -> Arr i a -> ProgramT instr (Param2 exp pred) m (exp Bool) readChanBuf' ch off sz arr = fmap valToExp . singleInj $ ReadChan ch off sz arr writeChan' :: ( Typeable a, pred a , FreeExp exp, FreePred exp Bool , ChanCMD :<: instr, Monad m ) => Chan t c -> exp a -> ProgramT instr (Param2 exp pred) m (exp Bool) writeChan' c = fmap valToExp . singleInj . WriteOne c writeChanBuf' :: ( Typeable a, pred a , Ix i, Integral i , FreeExp exp, FreePred exp Bool , ChanCMD :<: instr, Monad m ) => Chan t c -> exp i -- ^ Offset in array to start reading -> exp i -- ^ Elements to write -> Arr i a -> ProgramT instr (Param2 exp pred) m (exp Bool) writeChanBuf' ch off sz arr = fmap valToExp . singleInj $ WriteChan ch off sz arr