module Data.ByteString.FastBuilder.Internal
(
Builder(..)
, BuilderArg(..)
, DataSink(..)
, DynamicSink(..)
, Queue(..)
, Request(..)
, Response(..)
, SuspendBuilderException(..)
, ChunkOverflowException(..)
, Builder_
, BuildM(..)
, toBuilder_
, fromBuilder_
, mkBuilder
, useBuilder
, getSink
, getCur
, getEnd
, setCur
, setEnd
, runBuilder
, toLazyByteString
, toLazyByteStringWith
, toStrictByteString
, hPutBuilder
, primBounded
, primFixed
, primMapListBounded
, primMapListFixed
, byteString
, byteStringThreshold
, byteStringCopy
, byteStringCopyNoCheck
, byteStringInsert
, unsafeCString
, ensureBytes
, getBytes
, rebuild
) where
import Control.Concurrent (forkIOWithUnmask, myThreadId)
import Control.Concurrent.MVar
import qualified Control.Exception as E
import Control.Monad
import qualified Data.ByteString as S
import qualified Data.ByteString.Internal as S
import qualified Data.ByteString.Unsafe as S
import qualified Data.ByteString.Lazy as L
import Data.IORef
import Data.Monoid
import qualified Data.Semigroup as Sem
import Data.String
import Data.Word
import Foreign.C.String
import Foreign.C.Types
import Foreign.ForeignPtr
import Foreign.ForeignPtr.Unsafe
import Foreign.Marshal.Utils
import Foreign.Ptr
import qualified System.IO as IO
import System.IO.Unsafe
import GHC.Exts (Addr#, State#, RealWorld, Ptr(..), Int(..), Int#)
import GHC.Magic (oneShot)
import GHC.IO (IO(..))
import GHC.CString (unpackCString#)
import qualified Data.ByteString.Builder.Prim as P
import qualified Data.ByteString.Builder.Prim.Internal as PI
import qualified Data.ByteString.Builder.Extra as X
-- | A builder that can be run into a strict or lazy 'S.ByteString' or
-- written to a handle.
--
-- Internally a 'Builder' is a state transformer over a raw output buffer.
-- It receives the sink plus the current write pointer and the end of the
-- writable region. It writes its bytes and returns the updated write
-- pointer and end pointer (as unboxed 'Addr#'s) together with the new
-- state token.
newtype Builder = Builder
  { unBuilder
      :: BuilderArg -> State# RealWorld -> (# Addr#, Addr#, State# RealWorld #)
  }

-- | Input to a 'Builder': the destination sink, the current write position,
-- and the end of the currently writable region.
data BuilderArg = BuilderArg
  DataSink
  !(Ptr Word8)   -- current write position
  !(Ptr Word8)   -- end of the writable region
-- | Sequential composition: the left builder runs first, and the right
-- builder continues from the write and end pointers it returns.
--
-- A 'Sem.Semigroup' instance is mandatory for 'Monoid' on base >= 4.11
-- (GHC 8.4+), and '<>' is used on 'Builder' elsewhere in this module.
instance Sem.Semigroup Builder where
  Builder a <> Builder b = Builder $ \(BuilderArg dex (Ptr cur) (Ptr end)) s ->
    case a (BuilderArg dex (Ptr cur) (Ptr end)) s of
      (# cur', end', s' #) -> b (BuilderArg dex (Ptr cur') (Ptr end')) s'

-- | 'mempty' writes nothing and hands back the pointers unchanged.
instance Monoid Builder where
  mempty = Builder $ \(BuilderArg _ (Ptr cur) (Ptr end)) s -> (# cur, end, s #)
  -- Canonical definition; identical to the Semigroup operation.
  mappend = (Sem.<>)
  mconcat xs = foldr mappend mempty xs
-- | String literals are encoded as UTF-8 (via 'builderFromString').
instance IsString Builder where
  fromString str = builderFromString str
-- | Where a running 'Builder' sends its output.
data DataSink
  = DynamicSink !(IORef DynamicSink)
    -- ^ used by 'toLazyByteStringWith'; the referenced mode can be
    -- replaced while the builder runs (see 'chunkOverflow')
  | GrowingBuffer !(IORef (ForeignPtr Word8))
    -- ^ used by 'toStrictByteString'; holds the current backing buffer,
    -- which is replaced when it grows
  | HandleSink !IO.Handle !(IORef Queue)
    -- ^ used by 'hPutBuilder'; buffered bytes are flushed to the handle

-- | The two modes of a 'DynamicSink'.
data DynamicSink
  = ThreadedSink !(MVar Request) !(MVar Response)
    -- ^ the builder runs in a separate thread and exchanges buffers with
    -- the consumer through these 'MVar's
  | BoundedGrowingBuffer !(ForeignPtr Word8) !Int
    -- ^ a single buffer that may be grown up to the given size bound

-- | The buffer of a 'HandleSink', plus the offset up to which its contents
-- have already been written to the handle.
data Queue = Queue !(ForeignPtr Word8) !Int

-- | Sent to the builder thread: the free region (start, end) of a buffer
-- it may write into.
data Request
  = Request !(Ptr Word8) !(Ptr Word8)

-- | Sent from the builder thread back to the consumer.
data Response
  = Error E.SomeException
    -- ^ evaluating the builder threw this exception
  | Done !(Ptr Word8)
    -- ^ the builder finished; the pointer is the final write position
  | MoreBuffer !(Ptr Word8) !Int
    -- ^ the buffer is filled up to the pointer; a new buffer of at least
    -- this many bytes is needed
  | InsertByteString !(Ptr Word8) !S.ByteString
    -- ^ the buffer is filled up to the pointer; emit this 'S.ByteString'
    -- as a separate chunk
  deriving (Show)
-- | Raised (in the builder's own thread) when a 'BoundedGrowingBuffer'
-- would have to grow past its bound. It carries four things: the bytes
-- produced so far, the 'MVar's for continuing the build in a separate
-- thread, and the number of bytes that were requested.
data ChunkOverflowException
  = ChunkOverflowException
      !S.ByteString !(MVar Request) !(MVar Response) !Int

instance Show ChunkOverflowException where
  show (ChunkOverflowException chunk _ _ size) =
    concat ["ChunkOverflowException ", show chunk, " _ _ ", show size]

instance E.Exception ChunkOverflowException
-- | Thrown to the builder thread to pause it. The thread blocks on the
-- contained 'MVar' and resumes once it is filled.
data SuspendBuilderException = SuspendBuilderException !(MVar ())

instance Show SuspendBuilderException where
  show = const "SuspendBuilderException"

instance E.Exception SuspendBuilderException
-- | Unboxed-argument form of a 'Builder'. The sink, the write address and
-- the end address are passed as separate arguments instead of a
-- 'BuilderArg'.
type Builder_
  = DataSink -> Addr# -> Addr# -> State# RealWorld
  -> (# Addr#, Addr#, State# RealWorld #)

-- | Convert a 'Builder' into its unboxed-argument form.
toBuilder_ :: Builder -> Builder_
toBuilder_ b sink cur end s = unBuilder b (BuilderArg sink (Ptr cur) (Ptr end)) s

-- | Convert an unboxed-argument function back into a 'Builder'.
fromBuilder_ :: Builder_ -> Builder
fromBuilder_ g = Builder $ \arg s -> case arg of
  BuilderArg sink (Ptr cur) (Ptr end) -> g sink cur end s
-- | Continuation-passing monad for writing builders step by step. A
-- computation receives the rest of the build as its continuation.
newtype BuildM a = BuildM { runBuildM :: (a -> Builder) -> Builder }
  deriving (Functor)

-- 'pure' is the primitive and 'return' uses its default (@return = pure@).
-- This is the canonical form; defining 'pure' via 'return' triggers
-- GHC's -Wnoncanonical-monad-instances.
instance Applicative BuildM where
  pure x = BuildM $ \k -> k x
  (<*>) = ap

instance Monad BuildM where
  BuildM b >>= f = BuildM $ \k -> b $ \r -> runBuildM (f r) k
-- | Turn a 'BuildM' computation into a 'Builder'. The final continuation
-- writes nothing.
mkBuilder :: BuildM () -> Builder
mkBuilder m = runBuildM m (const mempty)

-- | Run a 'Builder' as one step of a 'BuildM' computation.
useBuilder :: Builder -> BuildM ()
useBuilder b = BuildM (\k -> mappend b (k ()))
-- | Read the sink the builder is writing to.
getSink :: BuildM DataSink
getSink = BuildM $ \k -> Builder $ \arg@(BuilderArg sink _ _) s ->
  unBuilder (k sink) arg s

-- | Read the current write position.
getCur :: BuildM (Ptr Word8)
getCur = BuildM $ \k -> Builder $ \arg@(BuilderArg _ cur _) s ->
  unBuilder (k cur) arg s

-- | Read the end of the writable region.
getEnd :: BuildM (Ptr Word8)
getEnd = BuildM $ \k -> Builder $ \arg@(BuilderArg _ _ end) s ->
  unBuilder (k end) arg s

-- | Replace the current write position.
setCur :: Ptr Word8 -> BuildM ()
setCur p = BuildM $ \k -> Builder $ \(BuilderArg sink _ end) s ->
  unBuilder (k ()) (BuilderArg sink p end) s

-- | Replace the end of the writable region.
setEnd :: Ptr Word8 -> BuildM ()
setEnd p = BuildM $ \k -> Builder $ \(BuilderArg sink cur _) s ->
  unBuilder (k ()) (BuilderArg sink cur p) s

-- | Run an 'IO' action as a step of a 'BuildM' computation, threading the
-- state token through.
io :: IO a -> BuildM a
io (IO act) = BuildM $ \k -> Builder $ \arg s0 ->
  case act s0 of
    (# s1, x #) -> unBuilder (k x) arg s1
-- | Run a 'Builder' against a sink and a writable region [@cur@, @end@).
-- Returns the final write position.
runBuilder :: Builder -> DataSink -> Ptr Word8 -> Ptr Word8 -> IO (Ptr Word8)
runBuilder b sink !cur !end = IO $ \s0 ->
  case unBuilder b (BuilderArg sink cur end) s0 of
    (# finalCur, _, s1 #) -> (# s1, Ptr finalCur #)
-- | Run a 'Builder' into a lazy 'L.ByteString'. It starts with a 100-byte
-- buffer and passes 32768 as the @maxSize@ of 'toLazyByteStringWith'.
toLazyByteString :: Builder -> L.ByteString
toLazyByteString b = toLazyByteStringWith initialSize sizeBound b
  where
    initialSize = 100
    sizeBound = 32768
-- | Run a 'Builder' into a lazy 'L.ByteString'.
--
-- @initialSize@ is the size of the first buffer. @maxSize@ is the bound
-- given to the 'BoundedGrowingBuffer' sink.
--
-- The builder first runs in the calling thread into a single growable
-- buffer. If it finishes within the bound, the result is one chunk.
-- Otherwise the sink throws a 'ChunkOverflowException'; the bytes produced
-- so far become the first chunk, and the rest is produced lazily by
-- 'continueBuilderThreaded'.
toLazyByteStringWith :: Int -> Int -> Builder -> L.ByteString
toLazyByteStringWith !initialSize !maxSize builder = unsafePerformIO $ do
  fptr <- mallocForeignPtrBytes initialSize
  sink <- newIORef $ BoundedGrowingBuffer fptr maxSize
  let !base = unsafeForeignPtrToPtr fptr
  let
    -- Thunk that runs the whole builder when forced. Its evaluation may be
    -- interrupted by exceptions and forced again later (see below).
    finalPtr = unsafeDupablePerformIO $
      runBuilder builder (DynamicSink sink) base (base `plusPtr` initialSize)
    loop thunk = do
      r <- E.try $ E.evaluate thunk
      case r of
        Right p -> do
          -- Finished without overflow: the sink holds the final (possibly
          -- reallocated) buffer.
          BoundedGrowingBuffer finalFptr _ <- readIORef sink
          let !finalBase = unsafeForeignPtrToPtr finalFptr
          return $! L.fromStrict $
            S.fromForeignPtr finalFptr 0 (p `minusPtr` finalBase)
        Left ex
          | Just (ChunkOverflowException chunk reqV respV minSize)
              <- E.fromException ex
          -> do
            -- Bound exceeded: emit the bytes so far, then hand the
            -- interrupted thunk over to be finished by a builder thread.
            let rest = continueBuilderThreaded reqV respV minSize maxSize thunk
            return $ L.fromChunks $
              if S.null chunk then rest else chunk : rest
          | otherwise -> do
            -- Re-throw to ourselves asynchronously. If this computation is
            -- forced again, execution continues after throwTo and evaluation
            -- is retried.
            -- NOTE(review): relies on GHC suspending (rather than poisoning)
            -- thunks interrupted by asynchronous exceptions.
            myTid <- myThreadId
            E.throwTo myTid ex
            loop thunk
  loop finalPtr
-- | Produce the remaining chunks of a build that overflowed its first
-- buffer, by driving the interrupted builder thunk from a builder thread.
-- The first buffer holds at least @initialSize@ and at least @maxSize@
-- bytes.
continueBuilderThreaded
  :: MVar Request -> MVar Response -> Int -> Int -> Ptr Word8
  -> [S.ByteString]
continueBuilderThreaded !reqV !respV !initialSize !maxSize thunk =
  let writer = toBufferWriter reqV respV thunk
      firstSize = max maxSize initialSize
  in makeChunks firstSize maxSize writer
-- | A 'X.BufferWriter' that fills buffers by exchanging messages with a
-- builder thread that evaluates @thunk@.
--
-- Each call works in two steps:
--
-- * It offers the buffer to the builder thread through @reqV@.
-- * It waits for a 'Response' on @respV@ and turns it into the matching
--   'X.Next' value.
--
-- The builder thread is forked on the first call and reused afterwards.
-- The whole writer runs under 'E.mask_'; the builder thread is started
-- with 'forkIOWithUnmask'.
toBufferWriter :: MVar Request -> MVar Response -> Ptr Word8 -> X.BufferWriter
toBufferWriter !reqV !respV thunk buf0 sz0 = E.mask_ $ do
  writer Nothing buf0 sz0
  where
    writer !maybeBuilderTid !buf !sz = do
      -- Offer the free region [buf, buf + sz) to the builder.
      putMVar reqV $ Request buf (buf `plusPtr` sz)
      builderTid <- case maybeBuilderTid of
        Just t -> return t
        Nothing -> forkIOWithUnmask $ \u ->
          builderThreadWithUnmask u respV thunk
      resp <- wait builderTid
      -- Report how many bytes of this buffer were written.
      let go cur next = return(written, next)
            where !written = cur `minusPtr` buf
      case resp of
        Error ex -> E.throwIO ex
        Done cur -> go cur X.Done
        MoreBuffer cur k -> go cur $ X.More k $ writer (Just builderTid)
        InsertByteString cur str -> go cur $ X.Chunk str $ writer (Just builderTid)
    wait !builderTid = do
      r <- E.try $ takeMVar respV
      case r of
        Right resp -> return resp
        Left exn -> do
          -- Interrupted while waiting for the builder. takeMVar is
          -- interruptible, so this can happen even under mask_;
          -- presumably exn is asynchronous.
          -- Pause the builder thread, then re-throw to ourselves.
          resumeVar <- newEmptyMVar
          E.throwTo builderTid $ SuspendBuilderException resumeVar
          thisTid <- myThreadId
          E.throwTo thisTid (exn :: E.SomeException)
          -- Reached only if this computation is resumed after the
          -- exception: let the builder continue and wait again.
          putMVar resumeVar ()
          wait builderTid
-- | Body of the builder thread. It forces @thunk@ with asynchronous
-- exceptions unmasked and reports the outcome on @respV@.
--
-- A 'SuspendBuilderException' makes the thread block until its 'MVar' is
-- filled; it then retries forcing @thunk@. Any other exception is reported
-- as an 'Error' response.
builderThreadWithUnmask
  :: (forall a. IO a -> IO a) -> MVar Response -> Ptr Word8
  -> IO ()
builderThreadWithUnmask unmask !respV thunk = loop
  where
    loop = do
      r <- E.try $ unmask $ E.evaluate thunk
      case r of
        Right p -> putMVar respV $ Done p
        Left ex
          | Just (SuspendBuilderException lock) <- E.fromException ex
          -> do takeMVar lock; loop
          | otherwise -> putMVar respV $ Error ex
-- | Lazily produce chunks by repeatedly running a 'X.BufferWriter'.
--
-- Buffer sizes follow these rules:
--
-- * The first buffer has @initialBufSize@ bytes.
-- * After 'X.More', the next buffer has @max reqSize maxBufSize@ bytes.
-- * After 'X.Chunk', the inserted chunk is emitted and the next buffer has
--   @maxBufSize@ bytes.
--
-- Buffers into which nothing was written are omitted from the output.
makeChunks :: Int -> Int -> X.BufferWriter -> [S.ByteString]
makeChunks !initialBufSize maxBufSize = go initialBufSize
  where
    -- Each step runs under unsafePerformIO when its list cell is demanded.
    go !bufSize w = unsafePerformIO $ do
      fptr <- S.mallocByteString bufSize
      (written, next) <- withForeignPtr fptr $ \buf -> w buf bufSize
      -- 'rest' is not forced here, so later buffers are only filled when
      -- the consumer reaches them.
      let rest = case next of
            X.Done -> []
            X.More reqSize w' -> go (max reqSize maxBufSize) w'
            X.Chunk chunk w' -> chunk : go maxBufSize w'
      return $ if written == 0
        then rest
        else S.fromForeignPtr fptr 0 written : rest
-- | Run a 'Builder' into a strict 'S.ByteString'. It starts with a 100-byte
-- 'GrowingBuffer' that is reallocated as needed.
toStrictByteString :: Builder -> S.ByteString
toStrictByteString builder = unsafePerformIO $ do
  let initialCap = 100
  firstFptr <- mallocForeignPtrBytes initialCap
  ref <- newIORef firstFptr
  let !firstBase = unsafeForeignPtrToPtr firstFptr
      !firstEnd = firstBase `plusPtr` initialCap
  finalCur <- runBuilder builder (GrowingBuffer ref) firstBase firstEnd
  -- The sink may have swapped in a larger buffer; use whichever is current.
  lastFptr <- readIORef ref
  let !len = finalCur `minusPtr` unsafeForeignPtrToPtr lastFptr
  return $ S.fromForeignPtr lastFptr 0 len
-- | Write a 'Builder' to a 'IO.Handle'. It starts with a 100-byte buffer,
-- which is flushed to the handle whenever the builder needs more space, and
-- flushes whatever remains at the end.
hPutBuilder :: IO.Handle -> Builder -> IO ()
hPutBuilder !h builder = do
  let initialCap = 100
  fptr <- mallocForeignPtrBytes initialCap
  queue <- newIORef (Queue fptr 0)
  let !start = unsafeForeignPtrToPtr fptr
  lastCur <- runBuilder builder (HandleSink h queue) start (plusPtr start initialCap)
  flushQueue h queue lastCur
-- | Encode every 'Char' of a 'String' as UTF-8.
builderFromString :: String -> Builder
builderFromString str = primMapListBounded P.charUtf8 str
-- | Run a bounded primitive. It first makes sure the primitive's maximum
-- size fits, then writes @x@ at the current position and advances past
-- the bytes written.
primBounded :: PI.BoundedPrim a -> a -> Builder
primBounded prim x = rebuild (x `seq` mkBuilder body)
  where
    body = do
      useBuilder (ensureBytes (PI.sizeBound prim))
      start <- getCur
      next <- io (PI.runB prim x start)
      setCur next
-- | Run a fixed-size primitive by treating it as a bounded one.
primFixed :: PI.FixedPrim a -> a -> Builder
primFixed prim x = primBounded bounded x
  where
    bounded = PI.toB prim

-- | Apply a bounded primitive to every element of a list, in order.
primMapListBounded :: PI.BoundedPrim a -> [a] -> Builder
primMapListBounded prim = \xs -> mconcat [primBounded prim x | x <- xs]

-- | Apply a fixed-size primitive to every element of a list, in order.
primMapListFixed :: PI.FixedPrim a -> [a] -> Builder
primMapListFixed prim = \xs -> primMapListBounded bounded xs
  where
    bounded = PI.toB prim
-- | Emit a 'S.ByteString'. Strings shorter than 'maximalCopySize' are
-- copied; longer ones are inserted (see 'byteStringThreshold').
byteString :: S.ByteString -> Builder
byteString bstr = byteStringThreshold maximalCopySize bstr

-- | Length at which 'byteString' switches from copying to inserting: twice
-- 'X.smallChunkSize'.
maximalCopySize :: Int
maximalCopySize = X.smallChunkSize * 2

-- | Copy @bstr@ if it is shorter than @th@ bytes; otherwise insert it with
-- 'byteStringInsert'.
byteStringThreshold :: Int -> S.ByteString -> Builder
byteStringThreshold th bstr = rebuild chosen
  where
    chosen
      | S.length bstr < th = byteStringCopy bstr
      | otherwise = byteStringInsert bstr
-- | Copy a 'S.ByteString' into the output buffer, making room for it first.
byteStringCopy :: S.ByteString -> Builder
byteStringCopy !bstr =
  mappend (ensureBytes (S.length bstr)) (byteStringCopyNoCheck bstr)

-- | Copy a 'S.ByteString' at the current position and advance past it.
-- Does not check for room; the caller must have ensured enough space.
byteStringCopyNoCheck :: S.ByteString -> Builder
byteStringCopyNoCheck !bstr = mkBuilder $ do
  dst <- getCur
  io $ S.unsafeUseAsCString bstr $ \src -> copyBytes dst (castPtr src) n
  setCur (dst `plusPtr` n)
  where
    !n = S.length bstr
-- | Emit a 'S.ByteString' without necessarily copying it into the current
-- buffer; how it is handled depends on the sink (see 'byteStringInsert_').
byteStringInsert :: S.ByteString -> Builder
byteStringInsert !bstr = fromBuilder_ $ byteStringInsert_ bstr

-- | Implementation of 'byteStringInsert', dispatching on the current sink.
byteStringInsert_ :: S.ByteString -> Builder_
byteStringInsert_ bstr = toBuilder_ $ mkBuilder $ do
  sink <- getSink
  case sink of
    DynamicSink dRef -> do
      dyn <- io $ readIORef dRef
      case dyn of
        ThreadedSink reqV respV -> do
          -- Ask the consumer to emit the bytes written so far and then
          -- 'bstr' as its own chunk; block until a new buffer arrives.
          cur <- getCur
          io $ putMVar respV $ InsertByteString cur bstr
          handleRequest reqV
        BoundedGrowingBuffer fptr bound -> do
          -- Single-buffer mode: grow if needed, then copy.
          r <- remainingBytes
          when (r < S.length bstr) $
            growBufferBounded dRef fptr bound (S.length bstr)
          useBuilder $ byteStringCopyNoCheck bstr
    GrowingBuffer bufRef -> do
      -- Strict output: grow if needed, then copy.
      r <- remainingBytes
      when (r < S.length bstr) $
        growBuffer bufRef (S.length bstr)
      useBuilder $ byteStringCopyNoCheck bstr
    HandleSink h queueRef -> do
      -- Flush the pending bytes, then write 'bstr' directly to the handle.
      cur <- getCur
      io $ flushQueue h queueRef cur
      io $ S.hPut h bstr
-- | Copy a NUL-terminated C string (without the terminator) into the
-- output. The length is taken with C @strlen@ and the bytes are copied when
-- the builder runs, so @cstr@ must remain valid and unchanged until then.
unsafeCString :: CString -> Builder
unsafeCString cstr = rebuild $
  let !n = fromIntegral (c_pure_strlen cstr)
      copyIt = mkBuilder $ do
        dst <- getCur
        io $ copyBytes dst (castPtr cstr) n
        setCur (dst `plusPtr` n)
  in ensureBytes n `mappend` copyIt

-- | C @strlen@, imported as a pure function.
foreign import ccall unsafe "strlen" c_pure_strlen :: CString -> CSize
-- | Make sure at least @n@ bytes are free after the current position. If
-- not, more space is obtained from the sink via 'getBytes'.
ensureBytes :: Int -> Builder
ensureBytes !n = mkBuilder $ do
  free <- remainingBytes
  unless (free >= n) $ useBuilder (getBytes n)
-- | Obtain room for at least @n@ more bytes from the sink. Unlike
-- 'ensureBytes', this does not check how much space is currently left.
getBytes :: Int -> Builder
getBytes (I# n) = fromBuilder_ (getBytes_ n)

-- | Implementation of 'getBytes' with an unboxed size, dispatching on the
-- current sink.
getBytes_ :: Int# -> Builder_
getBytes_ n = toBuilder_ $ mkBuilder $ do
  sink <- getSink
  case sink of
    DynamicSink dRef -> do
      dyn <- io $ readIORef dRef
      case dyn of
        ThreadedSink reqV respV -> do
          -- Report the filled region and the size needed, then block
          -- until the consumer supplies a new buffer.
          cur <- getCur
          io $ putMVar respV $ MoreBuffer cur $ I# n
          handleRequest reqV
        BoundedGrowingBuffer fptr bound ->
          growBufferBounded dRef fptr bound (I# n)
    GrowingBuffer bufRef -> growBuffer bufRef (I# n)
    HandleSink h queueRef -> do
      -- Write out the pending bytes, then continue in a buffer of at
      -- least max 4096 n bytes.
      cur <- getCur
      io $ flushQueue h queueRef cur
      switchQueue queueRef $ max 4096 (I# n)
-- | Number of free bytes between the current position and the end of the
-- writable region.
remainingBytes :: BuildM Int
remainingBytes = do
  end <- getEnd
  cur <- getCur
  return (end `minusPtr` cur)

-- | Re-wrap a 'Builder' so that its function is marked 'oneShot' and is
-- strict in its 'BuilderArg'.
rebuild :: Builder -> Builder
rebuild b = Builder (oneShot (\ !arg s -> unBuilder b arg s))
-- | Block until the consumer sends the next 'Request', then continue
-- writing into the region it describes.
handleRequest :: MVar Request -> BuildM ()
handleRequest reqV = do
  req <- io (takeMVar reqV)
  case req of
    Request start stop -> setCur start >> setEnd stop
-- | Replace the 'GrowingBuffer' backing store with a larger one, so that at
-- least @req@ more bytes fit after the bytes already written.
--
-- The new capacity is @cap + max cap req@: at least double the old
-- capacity, and always enough for @req@ more bytes. The written bytes are
-- copied over and the new buffer is stored in @bufRef@.
growBuffer :: IORef (ForeignPtr Word8) -> Int -> BuildM ()
growBuffer !bufRef !req = do
  cur <- getCur
  -- Capacity runs up to the end of the writable region, not the write
  -- position; using getCur here would base growth on the used size.
  end <- getEnd
  fptr <- io $ readIORef bufRef
  let !base = unsafeForeignPtrToPtr fptr
  let !size = cur `minusPtr` base
  let !cap = end `minusPtr` base
  let !newCap = cap + max cap req
  newFptr <- io $ mallocForeignPtrBytes newCap
  let !newBase = unsafeForeignPtrToPtr newFptr
  setCur $ newBase `plusPtr` size
  setEnd $ newBase `plusPtr` newCap
  io $ do
    copyBytes newBase base size
    -- Keep both buffers alive until the raw-pointer copy has finished.
    touchForeignPtr fptr
    touchForeignPtr newFptr
    writeIORef bufRef newFptr
-- | Write the bytes between the queue's flushed offset and @cur@ to the
-- handle, then record @cur@'s offset as flushed. Does nothing if there
-- are no new bytes.
flushQueue :: IO.Handle -> IORef Queue -> Ptr Word8 -> IO ()
flushQueue !h !qRef !cur = do
  Queue fptr start <- readIORef qRef
  let !end = cur `minusPtr` unsafeForeignPtrToPtr fptr
  when (end > start) $ do
    -- fromForeignPtr takes an offset and a length, so the length here is
    -- the number of unflushed bytes.
    S.hPut h $ S.fromForeignPtr fptr start (end - start)
    writeIORef qRef $ Queue fptr end
-- | Give the 'HandleSink' an empty buffer of at least @minSize@ bytes. The
-- current buffer is reused when its capacity is large enough; otherwise a
-- new one is allocated.
--
-- Any unflushed bytes are discarded, so callers flush first (as
-- 'getBytes_' does).
switchQueue :: IORef Queue -> Int -> BuildM ()
switchQueue !qRef !minSize = do
  -- The buffer's capacity runs up to the end of the writable region.
  -- Measuring it with getCur would underestimate it and force a fresh
  -- allocation on nearly every flush.
  end <- getEnd
  Queue fptr _ <- io $ readIORef qRef
  let !base = unsafeForeignPtrToPtr fptr
  let !cap = end `minusPtr` base
  newFptr <- if minSize <= cap
    then return fptr
    else io $ mallocForeignPtrBytes minSize
  let !newBase = unsafeForeignPtrToPtr newFptr
  io $ writeIORef qRef $ Queue newFptr 0
  setCur newBase
  setEnd $ newBase `plusPtr` max minSize cap
-- | Grow the 'BoundedGrowingBuffer' so that at least @req@ more bytes fit.
--
-- The new capacity is @cap + max cap req@ (at least double the old one).
-- If that would exceed @bound@, the sink instead switches to threaded mode
-- via 'chunkOverflow'. The bytes written so far are handed over as a chunk
-- that shares the old buffer.
growBufferBounded
  :: IORef DynamicSink -> ForeignPtr Word8 -> Int -> Int -> BuildM ()
growBufferBounded !dRef !fptr !bound !req = do
  cur <- getCur
  -- Capacity runs up to the end of the writable region; using getCur here
  -- would base both growth and the bound check on the used size.
  end <- getEnd
  let !base = unsafeForeignPtrToPtr fptr
  let !size = cur `minusPtr` base
  let !cap = end `minusPtr` base
  let !newCap = cap + max cap req
  if bound < newCap
    then chunkOverflow dRef req $ S.fromForeignPtr fptr 0 size
    else do
      newFptr <- io $ mallocForeignPtrBytes newCap
      let !newBase = unsafeForeignPtrToPtr newFptr
      setCur $ newBase `plusPtr` size
      setEnd $ newBase `plusPtr` newCap
      io $ do
        copyBytes newBase base size
        -- Keep both buffers alive until the raw-pointer copy has finished.
        touchForeignPtr fptr
        touchForeignPtr newFptr
        writeIORef dRef $ BoundedGrowingBuffer newFptr bound
-- | Switch a 'DynamicSink' from single-buffer mode to threaded mode.
--
-- Throws a 'ChunkOverflowException' to the current thread. The exception
-- carries @chunk@, two fresh 'MVar's and @minSize@, and is caught in
-- 'toLazyByteStringWith'. The statements after the throw run only if the
-- interrupted evaluation is forced again later (by the builder thread that
-- 'toBufferWriter' starts). They install a 'ThreadedSink' and wait for the
-- first buffer.
-- NOTE(review): this relies on GHC resuming a thunk that was interrupted
-- by an asynchronous exception when it is forced again.
chunkOverflow :: IORef DynamicSink -> Int -> S.ByteString -> BuildM ()
chunkOverflow !dRef !minSize !chunk = do
  myTid <- io $ myThreadId
  reqV <- io $ newEmptyMVar
  respV <- io $ newEmptyMVar
  io $ E.throwTo myTid $ ChunkOverflowException chunk reqV respV minSize
  io $ writeIORef dRef $ ThreadedSink reqV respV
  handleRequest reqV