module System.Posix.CircularBuffer (
WriteBuffer
, ReadBuffer
, Shared (..)
, putBuffer
, getBuffer
, putBufferList
, getAvailable
, sizeOfInt
) where
import System.Posix.SharedBuffer
import Control.Concurrent.MVar
import Control.Exception (try)
import Control.Monad
import Data.Bits
import Foreign.ForeignPtr
import Foreign.Ptr
import Foreign.Marshal.Array (advancePtr)
import Foreign.Storable
import System.Posix.Semaphore.Unsafe
import System.Posix (FileMode)
import Debug.Trace (traceEventIO)
-- | Writer-side handle: the underlying 'CircularBuffer' paired with an 'MVar'
-- holding the sequence number that the next write will use.  The type
-- parameter @a@ is phantom (it does not appear in the constructor fields) and
-- records the element type stored in the buffer.
data WriteBuffer a = WB CircularBuffer (MVar Int)
-- | Reader-side handle: the underlying 'CircularBuffer' paired with a
-- 'ForeignPtr' to an 'Int' holding the sequence number that the next read
-- will use.  The type parameter @a@ is phantom (it does not appear in the
-- constructor fields) and records the element type stored in the buffer.
data ReadBuffer a = RB CircularBuffer (ForeignPtr Int)
-- | Lifecycle operations shared by every buffer handle in this module.
class Shared b where
    -- | Create a buffer handle.  For the 'WriteBuffer' and 'ReadBuffer'
    -- instances the arguments are, in order: the shared-memory name, the
    -- semaphore name, the requested capacity in elements, and the file mode.
    -- Those instances pass create + exclusive flags for both the semaphore
    -- and the shared memory.
    createBuffer :: String -> String -> Int -> FileMode -> IO b
    -- | Open a buffer handle; takes the same arguments as 'createBuffer'.
    -- The 'WriteBuffer' and 'ReadBuffer' instances pass neither the create
    -- nor the exclusive flag here.
    -- NOTE(review): presumably the named objects must already exist -- confirm
    -- against the underlying open calls.
    openBuffer :: String -> String -> Int -> FileMode -> IO b
    -- | Close the handle.  For 'CircularBuffer' this calls
    -- @closeSharedBuffer@ on the shared-memory region only.
    closeBuffer :: b -> IO ()
    -- | For 'CircularBuffer': call @removeSharedBuffer@ on the region, then
    -- try to unlink the semaphore by name, ignoring an 'IOError' from the
    -- unlink.
    removeBuffer :: b -> IO ()
    -- | For 'CircularBuffer': call @unlinkSharedBuffer@ on the region and then
    -- unlink the semaphore by name, ignoring any 'IOError' from either step.
    unlinkBuffer :: b -> IO ()
-- | Writer-side handles.  Both constructors go through 'openSharedBuffer'
-- with 'makeWB', read-write shared memory (@shmReadWrite = True@),
-- 'writeProtection' as the protection list and @sizeOf@ of the element type
-- as the per-element size; they differ only in the create/exclusive flags.
-- The remaining methods delegate to the wrapped 'CircularBuffer'.
instance Storable a => Shared (WriteBuffer a) where
    -- Create the semaphore and the shared memory, both exclusively.
    createBuffer =
        openSharedBuffer makeWB semFlags shmFlags writeProtection elemBytes
      where
        semFlags = OpenSemFlags{semCreate = True, semExclusive = True}
        shmFlags = ShmOpenFlags
            { shmCreate = True
            , shmReadWrite = True
            , shmExclusive = True
            , shmTrunc = False
            }
        elemBytes = sizeOf (undefined :: a)

    -- Same call as above but with the create and exclusive flags cleared.
    openBuffer =
        openSharedBuffer makeWB semFlags shmFlags writeProtection elemBytes
      where
        semFlags = OpenSemFlags{semCreate = False, semExclusive = False}
        shmFlags = ShmOpenFlags
            { shmCreate = False
            , shmReadWrite = True
            , shmExclusive = False
            , shmTrunc = False
            }
        elemBytes = sizeOf (undefined :: a)

    closeBuffer (WB ring _) = closeBuffer ring
    removeBuffer (WB ring _) = removeBuffer ring
    unlinkBuffer (WB ring _) = unlinkBuffer ring
-- | Reader-side handles.  Both constructors go through 'openSharedBuffer'
-- with 'makeRB', shared memory opened with @shmReadWrite = False@,
-- @[ProtRead]@ as the protection list and @sizeOf@ of the element type as the
-- per-element size; they differ only in the create/exclusive flags.  The
-- remaining methods delegate to the wrapped 'CircularBuffer'.
instance Storable a => Shared (ReadBuffer a) where
    -- Create the semaphore and the shared memory, both exclusively.
    createBuffer =
        openSharedBuffer makeRB semFlags shmFlags [ProtRead] elemBytes
      where
        semFlags = OpenSemFlags{semCreate = True, semExclusive = True}
        shmFlags = ShmOpenFlags
            { shmCreate = True
            , shmReadWrite = False
            , shmExclusive = True
            , shmTrunc = False
            }
        elemBytes = sizeOf (undefined :: a)

    -- Same call as above but with the create and exclusive flags cleared.
    openBuffer =
        openSharedBuffer makeRB semFlags shmFlags [ProtRead] elemBytes
      where
        semFlags = OpenSemFlags{semCreate = False, semExclusive = False}
        shmFlags = ShmOpenFlags
            { shmCreate = False
            , shmReadWrite = False
            , shmExclusive = False
            , shmTrunc = False
            }
        elemBytes = sizeOf (undefined :: a)

    closeBuffer (RB ring _) = closeBuffer ring
    removeBuffer (RB ring _) = removeBuffer ring
    unlinkBuffer (RB ring _) = unlinkBuffer ring
-- | Assemble a 'ReadBuffer' from the pieces 'openSharedBuffer' produces.
-- The reader keeps the 'ForeignPtr' sequence counter; the 'MVar' is dropped.
makeRB :: CircularBuffer -> MVar Int -> ForeignPtr Int -> ReadBuffer a
makeRB ring _unusedSeqVar readCursor = RB ring readCursor
-- | Assemble a 'WriteBuffer' from the pieces 'openSharedBuffer' produces.
-- The writer keeps the 'MVar' sequence counter; the 'ForeignPtr' is dropped.
makeWB :: CircularBuffer -> MVar Int -> ForeignPtr Int -> WriteBuffer a
makeWB ring writeCursor _unusedSeqPtr = WB ring writeCursor
-- | Common implementation behind 'createBuffer' and 'openBuffer' for the
-- reader and writer handles.
--
-- Arguments, in order:
--
-- * the function that wraps the pieces into the final handle ('makeRB' or
--   'makeWB');
-- * flags for opening the semaphore;
-- * flags for opening the shared memory;
-- * the protection list handed to @openSBuffer@;
-- * the size of one element in bytes (callers pass @sizeOf@ of the element
--   type);
-- * the shared-memory name;
-- * the semaphore name;
-- * the requested capacity in elements;
-- * the file mode used for both the shared memory and the semaphore.
--
-- The slot count is the smallest power of two that is at least one more than
-- the requested capacity.  It is computed with integer arithmetic; the
-- previous @ceiling (logBase 2 ...)@ on 'Double' could round an exact power
-- of two up to the next one and failed for a negative request.
openSharedBuffer :: (CircularBuffer -> MVar Int -> ForeignPtr Int -> b)
                 -> OpenSemFlags
                 -> ShmOpenFlags
                 -> [Protection]
                 -> Int
                 -> String
                 -> String
                 -> Int
                 -> FileMode
                 -> IO b
openSharedBuffer maker semFlags shmFlags prot elemBytes shmName cbSemName reqCbSize mode = do
    let cbSize = nextPowerOfTwo (1 + reqCbSize)
        bufsz = fromIntegral $ elemBytes * cbSize
    cbBuf <- openSBuffer shmName bufsz shmFlags prot mode
    -- NOTE(review): the trailing 0 is presumably the semaphore's initial
    -- value (nothing written yet) -- confirm against semOpen.
    cbSem <- semOpen cbSemName semFlags mode 0
    -- Both sequence counters start at 0; the maker keeps whichever it needs.
    seqref <- newMVar 0
    seqptr <- mallocForeignPtr
    withForeignPtr seqptr $ flip poke 0
    return $ maker (CircularBuffer{cbBuf,cbSize,cbSem,cbSemName}) seqref seqptr

-- | Smallest power of two that is greater than or equal to the argument;
-- 1 for any argument below 2.  Doubling stops before it would overflow 'Int',
-- so the function always terminates.
nextPowerOfTwo :: Int -> Int
nextPowerOfTwo wanted = go 1
  where
    go p
        | p >= wanted = p
        | p > maxBound `div` 2 = p  -- doubling again would overflow
        | otherwise = go (p `shiftL` 1)
-- | Write one element at the writer's current sequence number, then advance
-- that number by one.  The 'MVar' is held for the whole operation, and
-- 'writeSeqBlocking' waits until the buffer has room.
putBuffer :: Storable a => WriteBuffer a -> a -> IO ()
putBuffer (WB ring writeCursor) item =
    modifyMVar_ writeCursor $ \pos -> do
        writeSeqBlocking ring pos item
        let advanced = pos + 1
        -- Force the new counter so no thunk is stored in the MVar.
        advanced `seq` return advanced
-- | Write a list of elements starting at the writer's current sequence
-- number, then advance that number by the count 'writeSeqList' reports.  The
-- 'MVar' is held for the whole operation.
putBufferList :: Storable a => WriteBuffer a -> [a] -> IO ()
putBufferList (WB ring writeCursor) items =
    modifyMVar_ writeCursor $ \pos ->
        writeSeqList ring pos items >>= \written ->
            -- Strict application keeps the stored counter evaluated.
            return $! pos + written
-- | Read the element at the reader's current sequence number, waiting via
-- 'readSeqBlocking' until one is available, then advance the stored sequence
-- number by one.
getBuffer :: Storable a => ReadBuffer a -> IO a
getBuffer (RB ring readCursor) =
    withForeignPtr readCursor $ \cursorPtr -> do
        pos <- peek cursorPtr
        -- Run the read first, then bump the counter, and return the element.
        readSeqBlocking ring pos <* poke cursorPtr (pos + 1)
-- | Read whatever 'readSeqReady' returns starting at the reader's current
-- sequence number, then advance the stored sequence number by the number of
-- elements obtained.
getAvailable :: Storable a => ReadBuffer a -> IO [a]
getAvailable (RB ring readCursor) =
    withForeignPtr readCursor $ \cursorPtr ->
        peek cursorPtr >>= \pos -> do
            items <- readSeqReady ring pos
            poke cursorPtr (pos + length items)
            return items
-- | State shared by the reader and writer handles.
data CircularBuffer = CircularBuffer
    { cbSize :: !Int
      -- ^ Number of element slots; 'openSharedBuffer' sets it to a power of
      -- two.
    , cbBuf :: !SharedBuffer
      -- ^ The shared-memory region holding the elements.
    , cbSem :: !Semaphore
      -- ^ Semaphore posted by the writer after each element is written and
      -- decremented by the reader for each element it takes.
    , cbSemName :: String
      -- ^ Name of the semaphore, kept so it can be unlinked later.
    }
-- | A bare 'CircularBuffer' can only be torn down through this instance;
-- 'createBuffer' and 'openBuffer' are error placeholders.
instance Shared CircularBuffer where
    createBuffer = error "can't create a CircularBuffer directly"
    openBuffer = error "can't open a CircularBuffer directly"

    closeBuffer ring = closeSharedBuffer (cbBuf ring)

    -- Remove the shared region, then make a best-effort attempt to unlink
    -- the semaphore: an IOError from the unlink is discarded.
    removeBuffer ring = do
        removeSharedBuffer (cbBuf ring)
        _ <- (try (semUnlink (cbSemName ring)) :: IO (Either IOError ()))
        return ()

    -- Both steps run inside a single 'try', so an IOError from unlinking the
    -- shared region also skips the semaphore unlink; either error is
    -- discarded.
    unlinkBuffer ring = do
        _ <- (try unlinkBoth :: IO (Either IOError ()))
        return ()
      where
        unlinkBoth = unlinkSharedBuffer (cbBuf ring) >> semUnlink (cbSemName ring)
-- | Wait until the semaphore can be decremented (see 'waitAndLock'), then
-- read the element stored for the given sequence number.
readSeqBlocking :: Storable a => CircularBuffer -> Int -> IO a
readSeqBlocking ring pos = waitAndLock (cbSem ring) >> readSeq ring pos
-- | Non-waiting bulk read: sample the semaphore's current value, read that
-- many elements starting at the given sequence number, and only afterwards
-- decrement the semaphore once per element read.
readSeqReady :: Storable a => CircularBuffer -> Int -> IO [a]
readSeqReady ring@CircularBuffer{cbSem = sem} pos = do
    pending <- unsafeSemGetValue sem
    items <- readSeqs ring pos pending
    replicateM_ pending (unsafeSemLock sem)
    return items
-- | Write one element at the given sequence number once the buffer has room,
-- then post the semaphore to publish it.
--
-- "Room" means the semaphore value (elements written but not yet taken by the
-- reader) is below @cbSize - 1@.  While it is not, the function polls the
-- semaphore in a tight loop, emitting a trace event on every retry.
writeSeqBlocking :: Storable a => CircularBuffer -> Int -> a -> IO ()
writeSeqBlocking cb@CircularBuffer{..} writePos val = do
    let waitUntilAvailable = do
            curLag <- unsafeSemGetValue cbSem
            when (curLag >= cbSize - 1) $ do
                traceEventIO "writeSeqBlocking: waitUntilAvailable spinning"
                waitUntilAvailable
    waitUntilAvailable
    writeSeq cb writePos val
    -- Post only after the element is stored, so the reader never sees the
    -- count rise before the data is in place.
    unsafeSemPost cbSem
-- | Write a list of elements starting at the given sequence number and
-- return how many were written (the length of the list; 0 for an empty list).
--
-- Each round samples the semaphore value, writes as many of the remaining
-- elements as currently fit, posts the semaphore once per element written,
-- and repeats with the rest.  When nothing fits, the round writes nothing and
-- the loop polls again immediately.
--
-- The room available is @cbSize - 1 - lag@, the same one-slot reserve that
-- 'writeSeqBlocking' keeps.  'readSeqBlocking' decrements the semaphore
-- before it reads, so without the reserve a batch could overwrite the slot
-- the reader is in the middle of reading.
writeSeqList :: Storable a => CircularBuffer -> Int -> [a] -> IO Int
writeSeqList cb@CircularBuffer{..} writePos = go 0
  where
    go !len [] = return len
    go !prevWritten xs = do
        currentLag <- unsafeSemGetValue cbSem
        let numReady = cbSize - 1 - currentLag
            -- splitAt treats a non-positive count as "take nothing".
            (toWrite, toWait) = splitAt numReady xs
            offset = writePos + prevWritten
        numWritten <- foldM (\(!ix) x -> writeSeq cb (offset + ix) x >> return (ix + 1)) 0 toWrite
        replicateM_ numWritten (unsafeSemPost cbSem)
        go (prevWritten + numWritten) toWait
-- | Read the element stored for the given sequence number.  The slot index is
-- the sequence number masked with @cbSize - 1@, which wraps correctly because
-- 'openSharedBuffer' makes @cbSize@ a power of two.  No synchronisation is
-- done here.
readSeq :: Storable a => CircularBuffer -> Int -> IO a
readSeq CircularBuffer{..} rseq = do
    let cbPtr = castPtr $ sbPtr $ cbBuf
        offset = rseq .&. (cbSize - 1)
    peek (cbPtr `advancePtr` offset)
-- | Read @count@ consecutive elements starting at sequence number @rseq@ and
-- return them in sequence order; a non-positive count yields an empty list.
-- Slots are visited from the last index down to the first so the result list
-- can be built by consing without a final reverse.  No synchronisation is
-- done here.
readSeqs :: Storable a => CircularBuffer -> Int -> Int -> IO [a]
readSeqs CircularBuffer{..} rseq count
    | count <= 0 = return []
    | otherwise = go (count - 1) []
  where
    cbPtr = castPtr $ sbPtr cbBuf
    -- Valid as a wrap-around mask because cbSize is a power of two.
    mask = cbSize - 1
    -- Element stored for sequence number rseq + i.
    readAt i = peek (cbPtr `advancePtr` ((rseq + i) .&. mask))
    go 0 acc = do
        x <- readAt 0
        return (x:acc)
    go n acc = do
        x <- readAt n
        go (n - 1) (x:acc)
-- | Store an element in the slot for the given sequence number.  The slot
-- index is the sequence number masked with @cbSize - 1@, which wraps
-- correctly because 'openSharedBuffer' makes @cbSize@ a power of two.  No
-- synchronisation is done here and the semaphore is not touched.
writeSeq :: Storable a => CircularBuffer -> Int -> a -> IO ()
writeSeq CircularBuffer{..} wseq val = do
    let cbPtr = castPtr $ sbPtr $ cbBuf
        offset = wseq .&. (cbSize - 1)
    poke (cbPtr `advancePtr` offset) val
-- | Decrement the semaphore, waiting as long as it takes.  A non-blocking
-- attempt is made first; if that fails a timed wait is used, and if the timed
-- wait also reports failure the whole procedure starts over.
-- NOTE(review): @semTimedWait 10 0@ presumably means a 10-second timeout --
-- confirm the argument units.
waitAndLock :: Semaphore -> IO ()
waitAndLock sem = do
    acquired <- unsafeSemTryWait sem
    unless acquired retryWithTimeout
  where
    retryWithTimeout = do
        acquiredLater <- semTimedWait 10 0 sem
        unless acquiredLater (waitAndLock sem)
-- | Size of an 'Int' as reported by 'sizeOf'.  The value is evaluated inside
-- a Template Haskell splice and embedded in the module as a constant.
sizeOfInt :: Int
sizeOfInt = $(let intBytes = sizeOf (0 :: Int) in [| intBytes |])