module ConcurrentBuffer
(
Buffer,
new,
push,
pushBytes,
pushStorable,
pull,
pullBytes,
pullStorable,
getSpace,
getBytes,
)
where
import ConcurrentBuffer.Prelude hiding (State, Buffer)
import qualified ConcurrentBuffer.PtrIO as A
import qualified Data.ByteString.Internal as C
-- | A growable byte buffer whose entire state lives in 'TVar's.
--
-- The constructor is positional. Live (pushed but not yet pulled) data
-- occupies the byte range @[start, end)@ of the backing block.
data Buffer
  = Buffer
      -- Backing memory block; replaced when the buffer grows.
      !(TVar (ForeignPtr Word8))
      -- Start offset: index of the first byte not yet pulled.
      !(TVar Int)
      -- End offset: index just past the last pushed byte.
      !(TVar Int)
      -- Capacity of the backing block in bytes.
      !(TVar Int)
      -- "Not pulling" flag: 'False' while a pull is in progress.
      !(TVar Bool)
      -- "Not pushing" flag: 'False' while a push is in progress.
      !(TVar Bool)
      -- "Not aligning" flag: 'False' while a push is moving the live data to
      -- the front of the block.
      !(TVar Bool)
-- | Allocate an empty buffer backed by a block of @capacity@ bytes.
--
-- Both offsets start at zero, the recorded capacity is @capacity@, and the
-- three "not busy" flags all start out 'True'.
new :: Int -> IO Buffer
new capacity = do
  storage <- mallocForeignPtrBytes capacity
  atomically $
    Buffer
      <$> newTVar storage
      <*> newTVar 0
      <*> newTVar 0
      <*> newTVar capacity
      <*> newTVar True
      <*> newTVar True
      <*> newTVar True
-- | Append data to the end of the buffer.
--
-- @push buffer space ptrIO@ makes sure at least @space@ bytes are writable
-- past the current end of the data and then runs @ptrIO@ on a pointer to that
-- position. @ptrIO@ returns the number of bytes it actually wrote (the end
-- offset is advanced by that much) paired with a result that is returned to
-- the caller.
--
-- The initial transaction waits (via 'guard') until no other push is in
-- progress. Room is then obtained in one of three ways:
--
-- 1. The free tail already fits @space@ bytes: write in place.
-- 2. The shortfall is larger than the already-consumed prefix: allocate a new
--    block of exactly @occupied + space@ bytes and copy the live data into it.
-- 3. Otherwise: slide the live data to the front of the existing block. This
--    additionally waits until no pull is in progress and keeps new pulls out
--    while the data is being moved.
--
-- NOTE(review): nothing here resets the "not pushing" flag if @ptrIO@ throws,
-- so later pushes would wait forever — confirm callers never throw from it.
push :: Buffer -> Int -> (Ptr Word8 -> IO (Int, result)) -> IO result
push (Buffer fptrVar startVar endVar capVar notPullingVar notPushingVar notAligningVar) space ptrIO =
  join $ atomically $ do
    -- Claim the push slot.
    notPushing <- readTVar notPushingVar
    guard notPushing
    writeTVar notPushingVar False
    fptr <- readTVar fptrVar
    start <- readTVar startVar
    end <- readTVar endVar
    capacity <- readTVar capVar
    let
      -- Free bytes after the live data.
      !remainingSpace = capacity - end
      -- How many bytes are missing to satisfy the request (<= 0: it fits).
      !capacityDelta = space - remainingSpace
      -- Size of the live data.
      !occupiedSpace = end - start
    if capacityDelta <= 0
      then
        -- Case 1: enough room at the tail, write in place.
        return $ do
          (!pushedSpace, output) <- withForeignPtr fptr $ \ptr -> ptrIO (plusPtr ptr end)
          atomically $ do
            writeTVar endVar $! end + pushedSpace
            writeTVar notPushingVar True
          return output
      else
        if capacityDelta > start
          then
            -- Case 2: reclaiming the consumed prefix would not be enough, so
            -- move to a new, larger block.
            return $ do
              let !newCapacity = occupiedSpace + space
              newFPtr <- mallocForeignPtrBytes newCapacity
              (!pushedSpace, output) <- withForeignPtr newFPtr $ \newPtr -> do
                withForeignPtr fptr $ \ptr ->
                  A.memcpy newPtr (plusPtr ptr start) (fromIntegral occupiedSpace)
                ptrIO (plusPtr newPtr occupiedSpace)
              let !newEnd = occupiedSpace + pushedSpace
              atomically $ do
                -- The start offset may have moved since it was sampled; the
                -- new block begins at the old @start@, so only the difference
                -- is carried over.
                divergedStart <- readTVar startVar
                let !newStart = divergedStart - start
                writeTVar fptrVar newFPtr
                writeTVar startVar newStart
                writeTVar endVar newEnd
                writeTVar capVar newCapacity
                writeTVar notPushingVar True
              return output
          else do
            -- Case 3: slide the live data to the front of the current block.
            -- Requires that no pull is running, and blocks new pulls until
            -- the offsets have been rewritten.
            notPulling <- readTVar notPullingVar
            guard notPulling
            writeTVar notAligningVar False
            return $ do
              (!pushedSpace, output) <- withForeignPtr fptr $ \ptr -> do
                A.memmove ptr (plusPtr ptr start) (fromIntegral occupiedSpace)
                atomically $ do
                  writeTVar startVar 0
                  writeTVar endVar occupiedSpace
                  writeTVar notAligningVar True
                ptrIO (plusPtr ptr occupiedSpace)
              atomically $ do
                writeTVar endVar $! occupiedSpace + pushedSpace
                writeTVar notPushingVar True
              return output
-- | Consume @amount@ bytes from the front of the buffer.
--
-- The initial transaction waits (via 'guard') until no other pull is in
-- progress, at least @amount@ bytes are buffered, and no push is currently
-- moving the data to the front of the block. @ptrIO@ is then run on a pointer
-- to the first unread byte, after which the start offset is advanced by
-- @amount@ and the result of @ptrIO@ is returned.
pull :: Buffer -> Int -> (Ptr Word8 -> IO result) -> IO result
pull (Buffer fptrVar startVar endVar _ notPullingVar _ notAligningVar) amount ptrIO =
  join $ atomically $ do
    notPulling <- readTVar notPullingVar
    guard notPulling
    fptr <- readTVar fptrVar
    start <- readTVar startVar
    end <- readTVar endVar
    -- Wait until enough data has been pushed.
    guard (amount <= end - start)
    notAligning <- readTVar notAligningVar
    guard notAligning
    writeTVar notPullingVar False
    return $ do
      pulled <- withForeignPtr fptr $ \ptr -> ptrIO (plusPtr ptr start)
      atomically $ do
        -- Re-read the start offset: a push that grew the buffer in the
        -- meantime rewrites it.
        currentStart <- readTVar startVar
        writeTVar startVar $! currentStart + amount
        writeTVar notPullingVar True
      return pulled
-- | Push the contents of a strict 'ByteString' onto the end of the buffer.
--
-- Requests exactly as many bytes as the string holds and reports the same
-- number as written.
pushBytes :: Buffer -> ByteString -> IO ()
pushBytes buffer (C.PS sourceFPtr sourceOffset size) =
  push buffer size writeChunk
  where
    -- Copy the string's payload to the position handed out by 'push'.
    writeChunk target =
      withForeignPtr sourceFPtr $ \source -> do
        C.memcpy target (plusPtr source sourceOffset) size
        return (size, ())
-- | Pull @amount@ bytes from the front of the buffer as a freshly created
-- strict 'ByteString'.
pullBytes :: Buffer -> Int -> IO ByteString
pullBytes buffer amount =
  pull buffer amount copyOut
  where
    -- Copy @amount@ bytes from the buffer position into a new string.
    copyOut source =
      C.create amount $ \target -> C.memcpy target source amount
-- | Push a single 'Storable' value by poking it at the end of the buffer.
--
-- The space requested and reported as written is @sizeOf@ of the value.
pushStorable :: (Storable storable) => Buffer -> storable -> IO ()
pushStorable buffer storable =
  push buffer size $ \target -> do
    poke (castPtr target) storable
    return (size, ())
  where
    size = sizeOf storable
-- | Pull a single 'Storable' value by peeking it from the front of the buffer.
--
-- The number of bytes consumed is @sizeOf@ of the result type.
pullStorable :: (Storable storable) => Buffer -> IO storable
pullStorable buffer =
  action
  where
    -- 'sizeOf' only needs the type of its argument, so the action's own
    -- (never evaluated) result is used as a type witness.
    action =
      pull buffer (sizeOf (resultOf action)) (peek . castPtr)
    resultOf :: IO a -> a
    resultOf = undefined
-- | Number of bytes currently held in the buffer, i.e. the distance between
-- the end and start offsets, read in a single transaction.
getSpace :: Buffer -> IO Int
getSpace (Buffer _ startVar endVar _ _ _ _) =
  atomically $ do
    end <- readTVar endVar
    start <- readTVar startVar
    return $! end - start
-- | Copy the bytes currently held in the buffer into a fresh strict
-- 'ByteString' without consuming them.
--
-- The block pointer and both offsets are sampled in one transaction; the copy
-- itself happens afterwards, outside of it.
--
-- NOTE(review): the copy does not take the pull/push/align flags, so a push
-- that moves or replaces the data concurrently may be observed mid-way —
-- confirm callers only use this when no push is running.
getBytes :: Buffer -> IO ByteString
getBytes (Buffer fptrVar startVar endVar _ _ _ _) =
  join $ atomically $ do
    fptr <- readTVar fptrVar
    end <- readTVar endVar
    start <- readTVar startVar
    let size = end - start
    -- The live data begins at @start@, not at the base of the block.
    return $ withForeignPtr fptr $ \ptr ->
      C.create size $ \destPtr -> C.memcpy destPtr (plusPtr ptr start) size