{-# LINE 1 "src/System/Posix/MQueue.hsc" #-}
{-# LANGUAGE BangPatterns #-}
{-# LINE 2 "src/System/Posix/MQueue.hsc" #-}
{-# LANGUAGE ForeignFunctionInterface #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}

{- | Create a circular buffer over shared memory that can be accessed from
   separate processes.

   This module assumes that exactly one WriteBuffer and one ReadBuffer will be
   used to access the same shared memory object.  The ReadBuffer and
   WriteBuffer may exist in separate processes.

   to use this library, in one process

   > bracket (createBuffer "aBuffer" "aSemaphore" 256 0o600) (removeBuffer)
   >         (\buffer -> doSomethingWith buffer)

   and in the other

   > bracket (openBuffer "aBuffer" "aSemaphore" 256 0o600) (closeBuffer)
   >         (\buffer -> doSomethingWith buffer)

   The buffer may be opened from either the reader or writer end, but you
   should ensure that the buffer is created before it is opened.

   As the underlying objects (shm and named posix semaphores) exist in the file
   system, failing to call removeBuffer will leave stale objects in the
   filesystem.

   Opening multiple ReadBuffers or WriteBuffers with the same names (whether in
   one process or several) results in undefined behavior.
 -}
module System.Posix.MQueue (
  WriteBuffer
, ReadBuffer
, Shared (..)
-- * normal interface
, putBuffer
, getBuffer
, putBufferList
, getAvailable
) where

import System.Posix.CircularBuffer (Shared (..))

import Control.Applicative
import Control.Monad
import Data.Bits
import Foreign.Ptr
import Foreign.Marshal.Alloc
import Foreign.Storable

import Foreign.C.Error
import Foreign.C.String
import Foreign.C.Types
import Data.Int
import Data.Word


{-# LINE 62 "src/System/Posix/MQueue.hsc" #-}

{-# LINE 63 "src/System/Posix/MQueue.hsc" #-}

type MQD = Int32
{-# LINE 65 "src/System/Posix/MQueue.hsc" #-}

data WriteBuffer a = WB String {-# UNPACK #-} !MQD
data ReadBuffer a  = RB String {-# UNPACK #-} !MQD

foreign import ccall "mq_open"
  c_mq_open_c :: CString -> CInt -> CInt -> Ptr () -> IO MQD

foreign import ccall "mq_open"
  c_mq_open :: CString -> CInt -> IO MQD

foreign import ccall "mq_close"
  c_mq_close :: MQD -> IO CInt

foreign import ccall "mq_unlink"
  c_mq_unlink :: CString -> IO CInt

foreign import ccall "mq_send"
  c_mq_send :: MQD -> Ptr () -> Word64 -> CInt -> IO CInt
{-# LINE 83 "src/System/Posix/MQueue.hsc" #-}

foreign import ccall "mq_receive"
  c_mq_receive :: MQD -> Ptr () -> Word64 -> Ptr CInt -> IO CInt
{-# LINE 86 "src/System/Posix/MQueue.hsc" #-}


cREAT, eXCL, rONLY, wRONLY :: CInt
cREAT = 64
{-# LINE 90 "src/System/Posix/MQueue.hsc" #-}
eXCL = 128
{-# LINE 91 "src/System/Posix/MQueue.hsc" #-}
rONLY  = 0
{-# LINE 92 "src/System/Posix/MQueue.hsc" #-}
wRONLY = 1
{-# LINE 93 "src/System/Posix/MQueue.hsc" #-}

mkAttr :: forall a b. Storable a => a -> Int -> (Ptr () -> IO b) -> IO b
mkAttr a sz f = allocaBytes (64) $ \attrP -> do
{-# LINE 96 "src/System/Posix/MQueue.hsc" #-}
    (\hsc_ptr -> pokeByteOff hsc_ptr 0) attrP (0 :: Int64)
{-# LINE 97 "src/System/Posix/MQueue.hsc" #-}
    (\hsc_ptr -> pokeByteOff hsc_ptr 8) attrP sz
{-# LINE 98 "src/System/Posix/MQueue.hsc" #-}
    (\hsc_ptr -> pokeByteOff hsc_ptr 16) attrP (sizeOf a)
{-# LINE 99 "src/System/Posix/MQueue.hsc" #-}
    (\hsc_ptr -> pokeByteOff hsc_ptr 24) attrP (0 :: Int64)
{-# LINE 100 "src/System/Posix/MQueue.hsc" #-}
    f attrP


instance Storable a => Shared (WriteBuffer a) where
  createBuffer name _ sz fmode = do
      x <- withCString name $ \cName -> throwErrnoIfMinus1 "createBuffer: WriteBuffer" $ mkAttr (undefined :: a) sz $ c_mq_open_c cName (wRONLY .|. cREAT .|. eXCL) (fromIntegral fmode)
      return $ WB name x
  openBuffer name _ _ _fmode = do
      WB name <$> withCString name (\cName -> c_mq_open cName wRONLY)
  closeBuffer (WB _ mqd) = void $ throwErrnoIfMinus1 "closeBuffer: WriteBuffer" $ c_mq_close mqd
  removeBuffer wb = closeBuffer wb >> unlinkBuffer wb
  unlinkBuffer (WB name _) = void $ withCString name $ throwErrnoIfMinus1 "unlinkBuffer: WriteBuffer" . c_mq_unlink 

instance Storable a => Shared (ReadBuffer a) where
  createBuffer name _ sz fmode = do
      x <- withCString name $ \cName -> throwErrnoIfMinus1 "createBuffer: ReadBuffer" $ mkAttr (undefined :: a) sz $ c_mq_open_c cName (rONLY .|. cREAT .|. eXCL) (fromIntegral fmode)
      return $ RB name x
  openBuffer name _ _ _fmode = do
      RB name <$> withCString name (\cName -> c_mq_open cName rONLY)
  closeBuffer (RB _ mqd) = void $ throwErrnoIfMinus1 "closeBuffer: ReadBuffer" $ c_mq_close mqd
  removeBuffer wb = closeBuffer wb >> unlinkBuffer wb
  unlinkBuffer (RB name _) = void $ withCString name $ throwErrnoIfMinus1 "unlinkBuffer: ReadBuffer" . c_mq_unlink 


-- | Write a value to the writer end.
--
-- This function is thread-safe.
putBuffer :: Storable a => WriteBuffer a -> a -> IO ()
putBuffer (WB _ mqd) val = alloca $ \msgP -> do
    poke msgP val
    void $ throwErrnoIfMinus1 "putBuffer" $ c_mq_send mqd (castPtr msgP) (fromIntegral $ sizeOf val) 1

{-# INLINEABLE putBuffer #-}

-- | read the next value from the reader end.
--
-- This function is *NOT* thread-safe.
getBuffer :: forall a. Storable a => ReadBuffer a -> IO a
getBuffer (RB _ mqd) = alloca $ \msgP -> do
    let sz = sizeOf (undefined :: a)
    numBytes <- throwErrnoIfMinus1 "getBuffer" $ c_mq_receive mqd (castPtr msgP) (fromIntegral sz) nullPtr
    when (numBytes /= fromIntegral sz) $ error "getBuffer: too few bytes"
    peek msgP

{-# INLINEABLE getBuffer #-}


-- | Write a list of values to the writer end.
--
-- This function is thread-safe.
putBufferList :: Storable a => WriteBuffer a -> [a] -> IO ()
putBufferList wb = mapM_ (putBuffer wb)
{-# INLINE putBufferList #-}

-- | read all currently available values from the reader end.
--
-- This function is *NOT* thread-safe.
getAvailable :: Storable a => ReadBuffer a -> IO [a]
getAvailable (RB _ _) = error "getAvailable: not implemented"