module System.Posix.CircularBuffer (
WriteBuffer
, ReadBuffer
, Shared (..)
, WaitStrategy (..)
, putBuffer
, getBuffer
, tryGetBuffer
, putBufferList
, getAvailable
, sizeOfInt
) where
import System.Posix.SharedBuffer
import Control.Concurrent.MVar
import Control.Exception (try, finally)
import Control.Monad
import Data.Bits
import Data.Maybe (isJust)
import Foreign.ForeignPtr
import Foreign.Ptr
import Foreign.Marshal.Array (advancePtr)
import Foreign.Storable
import System.Posix.Semaphore.Unsafe
import System.Posix (FileMode)
import Debug.Trace (traceEventIO)
import Control.Concurrent (threadDelay,yield)
-- | Writer-side handle for a circular buffer of elements of type @a@
-- (@a@ is a phantom parameter; the element type is fixed by the 'Storable'
-- constraints on the functions that use the handle).
--
-- The 'MVar' holds the sequence number of the next element to write. It is
-- initialised to 0 by 'openSharedBuffer' and advanced under 'modifyMVar_' by
-- 'putBuffer' and 'putBufferList'.
data WriteBuffer a = WB CircularBuffer (MVar Int)
-- | Reader-side handle for a circular buffer of elements of type @a@
-- (@a@ is a phantom parameter; the element type is fixed by the 'Storable'
-- constraints on the functions that use the handle).
--
-- The 'ForeignPtr' points at an 'Int' holding the sequence number of the next
-- element to read. It is initialised to 0 by 'openSharedBuffer' and advanced
-- with plain 'peek' / 'poke' by 'getBuffer', 'tryGetBuffer' and 'getAvailable'.
data ReadBuffer a = RB CircularBuffer (ForeignPtr Int)
-- | Lifecycle operations common to every handle type in this module.
class Shared b where
  -- | 'createBuffer' and 'openBuffer' build a handle. For the 'WriteBuffer'
  -- and 'ReadBuffer' instances the arguments are, in order: the shared-memory
  -- object name, the semaphore name, the requested capacity in elements, and
  -- the file mode handed to the underlying open calls.
  createBuffer, openBuffer :: String -> String -> Int -> FileMode -> IO b

  -- | Tear-down operations. What each one releases is decided by the
  -- instance; see the 'CircularBuffer' instance for the calls they map to.
  closeBuffer, removeBuffer, unlinkBuffer :: b -> IO ()
-- | Writer handles: the shared memory is opened with @shmReadWrite = True@
-- and mapped with 'writeProtection'. The element size passed to
-- 'openSharedBuffer' is @sizeOf@ of the element type.
instance Storable a => Shared (WriteBuffer a) where
  -- Semaphore and shared memory are both opened with their create and
  -- exclusive flags set.
  createBuffer = openSharedBuffer makeWB semFlags shmFlags writeProtection elemBytes
    where
      semFlags = OpenSemFlags{semCreate = True, semExclusive = True}
      shmFlags =
        ShmOpenFlags
          { shmCreate = True
          , shmReadWrite = True
          , shmExclusive = True
          , shmTrunc = False
          }
      elemBytes = sizeOf (undefined :: a)

  -- Same as 'createBuffer' but with the create and exclusive flags cleared.
  openBuffer = openSharedBuffer makeWB semFlags shmFlags writeProtection elemBytes
    where
      semFlags = OpenSemFlags{semCreate = False, semExclusive = False}
      shmFlags =
        ShmOpenFlags
          { shmCreate = False
          , shmReadWrite = True
          , shmExclusive = False
          , shmTrunc = False
          }
      elemBytes = sizeOf (undefined :: a)

  -- Tear-down is delegated to the wrapped 'CircularBuffer'.
  closeBuffer (WB ring _) = closeBuffer ring
  removeBuffer (WB ring _) = removeBuffer ring
  unlinkBuffer (WB ring _) = unlinkBuffer ring
-- | Reader handles: the shared memory is opened with @shmReadWrite = False@
-- and mapped with @[ProtRead]@. The element size passed to
-- 'openSharedBuffer' is @sizeOf@ of the element type.
instance Storable a => Shared (ReadBuffer a) where
  -- Semaphore and shared memory are both opened with their create and
  -- exclusive flags set.
  createBuffer = openSharedBuffer makeRB semFlags shmFlags [ProtRead] elemBytes
    where
      semFlags = OpenSemFlags{semCreate = True, semExclusive = True}
      shmFlags =
        ShmOpenFlags
          { shmCreate = True
          , shmReadWrite = False
          , shmExclusive = True
          , shmTrunc = False
          }
      elemBytes = sizeOf (undefined :: a)

  -- Same as 'createBuffer' but with the create and exclusive flags cleared.
  openBuffer = openSharedBuffer makeRB semFlags shmFlags [ProtRead] elemBytes
    where
      semFlags = OpenSemFlags{semCreate = False, semExclusive = False}
      shmFlags =
        ShmOpenFlags
          { shmCreate = False
          , shmReadWrite = False
          , shmExclusive = False
          , shmTrunc = False
          }
      elemBytes = sizeOf (undefined :: a)

  -- Tear-down is delegated to the wrapped 'CircularBuffer'.
  closeBuffer (RB ring _) = closeBuffer ring
  removeBuffer (RB ring _) = removeBuffer ring
  unlinkBuffer (RB ring _) = unlinkBuffer ring
-- | Assemble a 'ReadBuffer' from the pieces 'openSharedBuffer' produces.
-- Readers track their position through the 'ForeignPtr', so the 'MVar'
-- argument is ignored.
makeRB :: CircularBuffer -> MVar Int -> ForeignPtr Int -> ReadBuffer a
makeRB ring _writerSeq readerSeq = RB ring readerSeq
-- | Assemble a 'WriteBuffer' from the pieces 'openSharedBuffer' produces.
-- Writers track their position through the 'MVar', so the 'ForeignPtr'
-- argument is ignored.
makeWB :: CircularBuffer -> MVar Int -> ForeignPtr Int -> WriteBuffer a
makeWB ring writerSeq _readerSeq = WB ring writerSeq
-- | Shared implementation behind 'createBuffer' and 'openBuffer' for both
-- handle types.
--
-- Opens the shared-memory region and the named semaphore (initial value 0),
-- allocates fresh position counters (both starting at 0), and hands
-- everything to the supplied constructor.
--
-- The slot count is the smallest power of two that is @>= reqCbSize + 1@.
-- A power of two lets sequence numbers be mapped to slots with a bit mask,
-- and the extra slot is the one the blocking writer always leaves free.
-- The rounding is done with exact integer doubling: going through
-- @logBase 2@ on 'Double' can land just above an integer for an exact power
-- of two and so double the allocation for no reason.
openSharedBuffer :: (CircularBuffer -> MVar Int -> ForeignPtr Int -> b)
                    -- ^ handle constructor ('makeWB' or 'makeRB')
                 -> OpenSemFlags    -- ^ flags for opening the semaphore
                 -> ShmOpenFlags    -- ^ flags for opening the shared memory
                 -> [Protection]    -- ^ protection for the shared-memory mapping
                 -> Int             -- ^ size of one element, as given by 'sizeOf'
                 -> String          -- ^ shared-memory object name
                 -> String          -- ^ semaphore name
                 -> Int             -- ^ requested capacity, in elements
                 -> FileMode        -- ^ mode passed to both open calls
                 -> IO b
openSharedBuffer maker semFlags shmFlags prot elemBytes shmName cbSemName reqCbSize mode = do
  let -- Smallest power of two >= reqCbSize + 1 (1 when reqCbSize <= 0).
      cbSize = until (>= reqCbSize + 1) (* 2) 1
      bufsz = fromIntegral (elemBytes * cbSize)
  cbBuf <- openSBuffer shmName bufsz shmFlags prot mode
  cbSem <- semOpen cbSemName semFlags mode 0
  seqref <- newMVar 0
  seqptr <- mallocForeignPtr
  withForeignPtr seqptr (`poke` 0)
  return $ maker CircularBuffer{cbBuf, cbSize, cbSem, cbSemName} seqref seqptr
-- | Append one element to the buffer.
--
-- The write position is held in the handle's 'MVar' for the duration of the
-- write, and is advanced by one afterwards. The write itself goes through
-- 'writeSeqBlocking', which waits until the buffer has room.
putBuffer :: Storable a => WriteBuffer a -> a -> IO ()
putBuffer (WB ring posVar) val = modifyMVar_ posVar publish
  where
    publish pos = do
      writeSeqBlocking ring pos val
      return $! pos + 1
-- | Append every element of a list to the buffer, in order.
--
-- The write position is held in the handle's 'MVar' for the whole batch and
-- is advanced by the count that 'writeSeqList' reports as written.
putBufferList :: Storable a => WriteBuffer a -> [a] -> IO ()
putBufferList (WB ring posVar) vals = modifyMVar_ posVar publishAll
  where
    publishAll pos = do
      written <- writeSeqList ring pos vals
      return $! pos + written
-- | How a blocking read waits for the semaphore (see 'readSeqBlocking').
data WaitStrategy =
    -- | Use 'waitAndLock': one non-blocking attempt, then repeated timed waits.
    KBlocking
    -- | @Spin stay start stop@: use 'waitBackoff', called as
    -- @waitBackoff start stay stop@.
  | Spin !Int !Int !Int
    -- | @SpinContinuous n@: use 'waitSpin' with @n@ no-op iterations between
    -- attempts.
  | SpinContinuous !Int
-- | Read the next element, waiting for one according to the given
-- 'WaitStrategy'. The handle's read position is advanced by one after the
-- element has been read.
getBuffer :: Storable a => WaitStrategy -> ReadBuffer a -> IO a
getBuffer ws (RB ring posRef) = withForeignPtr posRef $ \cursor -> do
  pos <- peek cursor
  item <- readSeqBlocking ws ring pos
  poke cursor (pos + 1)
  return item
-- | Non-blocking read: returns @Just@ the next element if 'tryReadSeq'
-- obtained one, otherwise @Nothing@. The read position only moves forward
-- when an element was returned.
tryGetBuffer :: Storable a => ReadBuffer a -> IO (Maybe a)
tryGetBuffer (RB ring posRef) = withForeignPtr posRef $ \cursor -> do
  pos <- peek cursor
  result <- tryReadSeq ring pos
  case result of
    Just _ -> poke cursor (pos + 1)
    Nothing -> return ()
  return result
-- | Read everything 'readSeqReady' reports as currently available (possibly
-- nothing) and advance the read position by the number of elements returned.
getAvailable :: Storable a => ReadBuffer a -> IO [a]
getAvailable (RB ring posRef) = withForeignPtr posRef $ \cursor -> do
  pos <- peek cursor
  items <- readSeqReady ring pos
  poke cursor (pos + length items)
  return items
-- | State shared by reader and writer handles.
data CircularBuffer = CircularBuffer
  { cbSize :: !Int
    -- ^ Number of element slots; 'openSharedBuffer' always sets this to a
    -- power of two.
  , cbBuf :: !SharedBuffer
    -- ^ The shared-memory region the elements are stored in.
  , cbSem :: !Semaphore
    -- ^ Semaphore opened with initial value 0; writers post it once per
    -- element written and readers wait on it before (or lock it after) reading.
  , cbSemName :: String
    -- ^ Name the semaphore was opened under, kept so it can be passed to
    -- 'semUnlink' later.
  }
-- | Tear-down for the underlying buffer. Construction is only available
-- through the 'WriteBuffer' / 'ReadBuffer' instances, so the two constructor
-- methods raise an error.
instance Shared CircularBuffer where
  createBuffer = error "can't create a CircularBuffer directly"
  openBuffer = error "can't open a CircularBuffer directly"

  closeBuffer cb = closeSharedBuffer (cbBuf cb)

  -- Remove the shared buffer, then always attempt to unlink the semaphore;
  -- an 'IOError' from the unlink is discarded.
  removeBuffer cb = removeSharedBuffer (cbBuf cb) `finally` unlinkSemQuietly
    where
      unlinkSemQuietly =
        void (try (semUnlink (cbSemName cb)) :: IO (Either IOError ()))

  -- Unlink the shared buffer and then (always) the semaphore; an 'IOError'
  -- from either step is discarded.
  unlinkBuffer cb = void guarded
    where
      guarded :: IO (Either IOError ())
      guarded =
        try (unlinkSharedBuffer (cbBuf cb) `finally` semUnlink (cbSemName cb))
-- | Wait on the buffer's semaphore using the chosen strategy, then read the
-- element stored for the given sequence number.
--
-- The result is returned as a lambda over the sequence number so that the
-- function can be applied to its first two arguments once and reused.
readSeqBlocking :: Storable a => WaitStrategy -> CircularBuffer -> Int -> IO a
readSeqBlocking strategy cb = \rseq -> do
  acquire (cbSem cb)
  readSeq cb rseq
  where
    acquire = case strategy of
      KBlocking -> waitAndLock
      SpinContinuous n -> waitSpin n
      -- Note the argument order: waitBackoff takes start, stay, stop.
      Spin spinStay spinStart spinStop -> waitBackoff spinStart spinStay spinStop
-- | Make a single non-blocking attempt on the semaphore. On success read and
-- return the element for the given sequence number; otherwise return
-- @Nothing@ without touching the buffer.
tryReadSeq :: Storable a => CircularBuffer -> Int -> IO (Maybe a)
tryReadSeq cb rseq = do
  acquired <- unsafeSemTryWait (cbSem cb)
  if acquired
    then Just <$> readSeq cb rseq
    else return Nothing
-- | Read as many elements as the semaphore's current value says are ready,
-- starting at the given sequence number, and only then lock the semaphore
-- once per element read.
readSeqReady :: Storable a => CircularBuffer -> Int -> IO [a]
readSeqReady cb rseq = do
  let sem = cbSem cb
  ready <- unsafeSemGetValue sem
  items <- readSeqs cb rseq ready
  replicateM_ ready (unsafeSemLock sem)
  return items
-- | Write one element at the given sequence number, then post the semaphore
-- to announce it.
--
-- Before writing, busy-waits (emitting a trace event on every iteration)
-- while the semaphore's value is @>= cbSize - 1@, i.e. while at most one
-- slot is unannounced. One slot is always left free.
writeSeqBlocking :: Storable a => CircularBuffer -> Int -> a -> IO ()
writeSeqBlocking cb@CircularBuffer{..} writePos val = do
  let waitUntilAvailable = do
        curLag <- unsafeSemGetValue cbSem
        when (curLag >= cbSize - 1) $ do
          traceEventIO "writeSeqBlocking: waitUntilAvailable spinning"
          waitUntilAvailable
  waitUntilAvailable
  writeSeq cb writePos val
  unsafeSemPost cbSem
-- | Write a list of elements starting at the given sequence number and
-- return how many were written (the length of the list).
--
-- Works in rounds: look at the semaphore's value, write as many elements as
-- currently fit, post the semaphore once per element written, and repeat
-- with whatever is left. When nothing fits the loop simply retries.
--
-- Like 'writeSeqBlocking', at most @cbSize - 1@ elements are ever
-- outstanding. A reader using 'getBuffer' / 'tryGetBuffer' decrements the
-- semaphore /before/ it reads its slot, so filling all @cbSize@ slots could
-- overwrite an element that has been claimed but not yet read.
writeSeqList :: Storable a => CircularBuffer -> Int -> [a] -> IO Int
writeSeqList cb@CircularBuffer{..} writePos = go 0
  where
    go !len [] = return len
    go !prevWritten xs = do
      currentLag <- unsafeSemGetValue cbSem
      let -- Free slots, keeping one in reserve; splitAt treats a
          -- non-positive count as "take nothing".
          numReady = cbSize - 1 - currentLag
          (toWrite, toWait) = splitAt numReady xs
          offset = writePos + prevWritten
      numWritten <-
        foldM (\(!ix) x -> writeSeq cb (offset + ix) x >> return (ix + 1)) 0 toWrite
      replicateM_ numWritten (unsafeSemPost cbSem)
      go (prevWritten + numWritten) toWait
-- | Read the element stored for a sequence number. No synchronisation is
-- done here; callers deal with the semaphore.
--
-- The slot index is @rseq .&. (cbSize - 1)@, which equals @rseq `mod` cbSize@
-- because 'openSharedBuffer' makes @cbSize@ a power of two.
readSeq :: Storable a => CircularBuffer -> Int -> IO a
readSeq CircularBuffer{..} rseq = do
  let cbPtr = castPtr (sbPtr cbBuf)
      offset = rseq .&. (cbSize - 1)
  peek (cbPtr `advancePtr` offset)
-- | Read @count@ consecutive elements starting at sequence number @rseq@ and
-- return them in sequence order. A non-positive @count@ yields @[]@. No
-- synchronisation is done here.
--
-- Slots are visited from the last one back to the first so the result list
-- can be built by consing, without a final reverse.
readSeqs :: Storable a => CircularBuffer -> Int -> Int -> IO [a]
readSeqs CircularBuffer{..} rseq count = go (count - 1) []
  where
    cbPtr = castPtr $ sbPtr cbBuf
    -- cbSize is a power of two, so this mask wraps sequence numbers to slots.
    mask = cbSize - 1
    go n acc
      | n < 0 = return acc
      | otherwise = do
          let offset = (n + rseq) .&. mask
          x <- peek (cbPtr `advancePtr` offset)
          go (n - 1) (x : acc)
-- | Store an element in the slot for a sequence number. No synchronisation
-- is done here; callers check for room and post the semaphore.
--
-- The slot index is @wseq .&. (cbSize - 1)@, which equals @wseq `mod` cbSize@
-- because 'openSharedBuffer' makes @cbSize@ a power of two.
writeSeq :: Storable a => CircularBuffer -> Int -> a -> IO ()
writeSeq CircularBuffer{..} wseq val = do
  let cbPtr = castPtr (sbPtr cbBuf)
      offset = wseq .&. (cbSize - 1)
  poke (cbPtr `advancePtr` offset) val
-- | Acquire the semaphore, returning only once it has been obtained.
--
-- Tries a non-blocking wait first; if that fails, falls back to
-- @semTimedWait 10 0@ and starts over whenever the timed wait also fails.
-- NOTE(review): the arguments @10 0@ are presumably seconds and nanoseconds;
-- confirm against the semaphore module.
waitAndLock :: Semaphore -> IO ()
waitAndLock sem = do
  acquired <- unsafeSemTryWait sem
  unless acquired $ do
    acquiredLater <- semTimedWait 10 0 sem
    unless acquiredLater (waitAndLock sem)
-- | Acquire the semaphore by polling it with non-blocking waits.
--
-- After each failed attempt @n@ no-op actions are run before the next try.
-- Every 20 failed attempts the thread calls 'yield' and the countdown
-- restarts.
waitSpin :: Int -> Semaphore -> IO ()
waitSpin n sem = go (20 :: Int)
  where
    -- Countdown exhausted: let other threads run, then start a new round.
    go 0 = yield >> go 20
    go k = do
      gotLock <- unsafeSemTryWait sem
      unless gotLock $ do
        replicateM_ n (return ())
        go (k - 1)
-- | Acquire the semaphore by polling with a growing pause between attempts.
--
-- Arguments: @k0@ is the initial pause length, @stay@ is how many attempts
-- are made at each pause length, and @k1@ is the pause length at which the
-- loop stops spinning and sleeps with 'threadDelay' instead.
--
-- The local loop @go l n@ carries the attempts left at the current level
-- (@l@) and the current pause length (@n@).
waitBackoff :: Int -> Int -> Int -> Semaphore -> IO ()
waitBackoff k0 stay k1 sem = go 0 0
  where
    -- Pause length 0: try immediately, then enter the first real level.
    go _ 0 = do
      gotLock <- unsafeSemTryWait sem
      unless gotLock $ go stay k0
    -- Attempts at this level used up: double the pause and refill the count.
    go 0 n = go stay (n * 2)
    go l n
      | n < k1 = do
          -- Spin through n + 1 no-op actions before the next attempt.
          mapM_ (\_ -> return ()) [0 .. n]
          gotLock <- unsafeSemTryWait sem
          unless gotLock $ go (l - 1) n
      | otherwise = do
          -- Pause length has reached k1: sleep between attempts from now on.
          threadDelay 500
          gotLock <- unsafeSemTryWait sem
          unless gotLock $ go 1 n
-- | The result of @sizeOf (0 :: Int)@, evaluated inside a Template Haskell
-- splice so the value is embedded in the module at compile time.
sizeOfInt :: Int
sizeOfInt = $(let sz = sizeOf (0::Int) in [| sz |])