module Hans.Buffer.Stream (
Buffer(),
newBuffer,
closeBuffer,
bytesAvailable,
putBytes,
takeBytes,
tryTakeBytes,
) where
import Hans.Buffer.Signal
import Control.Monad (when,unless)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import Data.IORef (IORef,newIORef,atomicModifyIORef',readIORef)
import qualified Data.Sequence as Seq
data Buffer = Buffer { bufState :: !(IORef State)
, bufSignal :: !Signal
}
newBuffer :: IO Buffer
newBuffer =
do bufState <- newIORef emptyState
bufSignal <- newSignal
return Buffer { .. }
closeBuffer :: Buffer -> IO ()
closeBuffer Buffer { .. } =
do atomicModifyIORef' bufState sClose
signal bufSignal
bytesAvailable :: Buffer -> IO Bool
bytesAvailable Buffer { .. } =
do st <- readIORef bufState
return (not (sNull st))
putBytes :: S.ByteString -> Buffer -> IO ()
putBytes bytes Buffer { .. } =
do unless (S.null bytes) (atomicModifyIORef' bufState (sPut bytes))
signal bufSignal
takeBytes :: Int -> Buffer -> IO L.ByteString
takeBytes n Buffer { .. }
| n <= 0 = return L.empty
| otherwise = loop
where
loop =
do (bytes,more,closed) <- atomicModifyIORef' bufState (sTake n)
if L.null bytes && not closed
then do waitSignal bufSignal
loop
else do when (more || closed) (signal bufSignal)
return bytes
tryTakeBytes :: Int -> Buffer -> IO (Maybe L.ByteString)
tryTakeBytes n Buffer { .. }
| n <= 0 = return Nothing
| otherwise =
do (bytes,more,closed) <- atomicModifyIORef' bufState (sTake n)
when (more || closed) (signal bufSignal)
if L.null bytes
then return Nothing
else return (Just bytes)
data State = State { stBuf :: !(Seq.Seq S.ByteString)
, stClosed :: !Bool
}
emptyState :: State
emptyState = State { stBuf = Seq.empty, stClosed = False }
sNull :: State -> Bool
sNull State { .. } = Seq.null stBuf
sClose :: State -> (State, ())
sClose State { .. } = (State { stClosed = True, .. }, ())
sTake :: Int -> State -> (State, (L.ByteString,Bool,Bool))
sTake n0 State { .. } = go [] n0 stBuf
where
go acc n mem
| n > 0 =
case Seq.viewl mem of
buf Seq.:< mem'
| S.length buf > n ->
let (as,bs) = S.splitAt n buf
in finalize (L.fromStrict as:acc) (bs Seq.<| mem')
| otherwise ->
go (L.fromStrict buf:acc) (n S.length buf) mem'
Seq.EmptyL ->
finalize acc Seq.empty
| otherwise =
finalize acc mem
finalize acc mem =
( State { stBuf = mem, .. }
, (L.concat (reverse acc), not (Seq.null mem), stClosed)
)
sPut :: S.ByteString -> State -> (State, ())
sPut bytes State { .. } = (State { stBuf = stBuf Seq.|> bytes, .. }, ())