{-# LANGUAGE BangPatterns #-} {-# LANGUAGE ForeignFunctionInterface #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} {- | Create a circular buffer over shared memory that can be accessed from separate processes. This module assumes that exactly one WriteBuffer and one ReadBuffer will be used to access the same shared memory object. The ReadBuffer and WriteBuffer may exist in separate processes. to use this library, in one process > bracket (createBuffer "aBuffer" "aSemaphore" 256 0o600) (removeBuffer) > (\buffer -> doSomethingWith buffer) and in the other > bracket (openBuffer "aBuffer" "aSemaphore" 256 0o600) (closeBuffer) > (\buffer -> doSomethingWith buffer) The buffer may be opened from either the reader or writer end, but you should ensure that the buffer is created before it is opened. As the underlying objects (shm and named posix semaphores) exist in the file system, failing to call removeBuffer will leave stale objects in the filesystem. Opening multiple ReadBuffers or WriteBuffers with the same names (whether in one process or several) results in undefined behavior. -} module System.Posix.MQueue ( WriteBuffer , ReadBuffer , Shared (..) -- * normal interface , putBuffer , getBuffer , putBufferList , getAvailable ) where import System.Posix.CircularBuffer (Shared (..)) import Control.Applicative import Control.Monad import Data.Bits import Foreign.Ptr import Foreign.Marshal.Alloc import Foreign.Storable import Foreign.C.Error import Foreign.C.String import Foreign.C.Types import Data.Int import Data.Word #include #include type MQD = #{type mqd_t} data WriteBuffer a = WB String {-# UNPACK #-} !MQD data ReadBuffer a = RB String {-# UNPACK #-} !MQD foreign import ccall "mq_open" c_mq_open_c :: CString -> CInt -> CInt -> Ptr () -> IO MQD foreign import ccall "mq_open" c_mq_open :: CString -> CInt -> IO MQD foreign import ccall "mq_close" c_mq_close :: MQD -> IO CInt foreign import ccall "mq_unlink" c_mq_unlink :: CString -> IO CInt foreign import ccall "mq_send" c_mq_send :: MQD -> Ptr () -> #{type size_t} -> CInt -> IO CInt foreign import ccall "mq_receive" c_mq_receive :: MQD -> Ptr () -> #{type size_t} -> Ptr CInt -> IO CInt cREAT, eXCL, rONLY, wRONLY :: CInt cREAT = #{const O_CREAT} eXCL = #{const O_EXCL} rONLY = #{const O_RDONLY} wRONLY = #{const O_WRONLY} mkAttr :: forall a b. Storable a => a -> Int -> (Ptr () -> IO b) -> IO b mkAttr a sz f = allocaBytes #{size struct mq_attr} $ \attrP -> do #{poke struct mq_attr, mq_flags} attrP (0 :: #{type long}) #{poke struct mq_attr, mq_maxmsg} attrP sz #{poke struct mq_attr, mq_msgsize} attrP (sizeOf a) #{poke struct mq_attr, mq_curmsgs} attrP (0 :: #{type long}) f attrP instance Storable a => Shared (WriteBuffer a) where createBuffer name _ sz fmode = do x <- withCString name $ \cName -> throwErrnoIfMinus1 "createBuffer: WriteBuffer" $ mkAttr (undefined :: a) sz $ c_mq_open_c cName (wRONLY .|. cREAT .|. eXCL) (fromIntegral fmode) return $ WB name x openBuffer name _ _ _fmode = do WB name <$> withCString name (\cName -> c_mq_open cName wRONLY) closeBuffer (WB _ mqd) = void $ throwErrnoIfMinus1 "closeBuffer: WriteBuffer" $ c_mq_close mqd removeBuffer wb = closeBuffer wb >> unlinkBuffer wb unlinkBuffer (WB name _) = void $ withCString name $ throwErrnoIfMinus1 "unlinkBuffer: WriteBuffer" . c_mq_unlink instance Storable a => Shared (ReadBuffer a) where createBuffer name _ sz fmode = do x <- withCString name $ \cName -> throwErrnoIfMinus1 "createBuffer: ReadBuffer" $ mkAttr (undefined :: a) sz $ c_mq_open_c cName (rONLY .|. cREAT .|. eXCL) (fromIntegral fmode) return $ RB name x openBuffer name _ _ _fmode = do RB name <$> withCString name (\cName -> c_mq_open cName rONLY) closeBuffer (RB _ mqd) = void $ throwErrnoIfMinus1 "closeBuffer: ReadBuffer" $ c_mq_close mqd removeBuffer wb = closeBuffer wb >> unlinkBuffer wb unlinkBuffer (RB name _) = void $ withCString name $ throwErrnoIfMinus1 "unlinkBuffer: ReadBuffer" . c_mq_unlink -- | Write a value to the writer end. -- -- This function is thread-safe. putBuffer :: Storable a => WriteBuffer a -> a -> IO () putBuffer (WB _ mqd) val = alloca $ \msgP -> do poke msgP val void $ throwErrnoIfMinus1 "putBuffer" $ c_mq_send mqd (castPtr msgP) (fromIntegral $ sizeOf val) 1 {-# INLINEABLE putBuffer #-} -- | read the next value from the reader end. -- -- This function is *NOT* thread-safe. getBuffer :: forall a. Storable a => ReadBuffer a -> IO a getBuffer (RB _ mqd) = alloca $ \msgP -> do let sz = sizeOf (undefined :: a) numBytes <- throwErrnoIfMinus1 "getBuffer" $ c_mq_receive mqd (castPtr msgP) (fromIntegral sz) nullPtr when (numBytes /= fromIntegral sz) $ error "getBuffer: too few bytes" peek msgP {-# INLINEABLE getBuffer #-} -- | Write a list of values to the writer end. -- -- This function is thread-safe. putBufferList :: Storable a => WriteBuffer a -> [a] -> IO () putBufferList wb = mapM_ (putBuffer wb) {-# INLINE putBufferList #-} -- | read all currently available values from the reader end. -- -- This function is *NOT* thread-safe. getAvailable :: Storable a => ReadBuffer a -> IO [a] getAvailable (RB _ _) = error "getAvailable: not implemented"