{-# LANGUAGE LambdaCase #-}
module Database.PostgreSQL.Simple.LargeObjects.Stream where
import Control.Exception.Lifted (AssertionFailed (..), bracket, throwIO)
import Control.Monad.Loops (whileJust_)
import Control.Monad.State as State
import qualified Data.ByteString as BS
import Data.ByteString.Builder (byteString)
import qualified Data.ByteString.Builder as BS
import qualified Data.ByteString.Lazy as LBS
import Data.IORef (modifyIORef, newIORef, readIORef)
import Data.Semigroup ((<>))
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.LargeObjects (LoFd, Oid (..))
import qualified Database.PostgreSQL.Simple.LargeObjects as Sql
import System.IO (IOMode (ReadMode, WriteMode))
import System.IO.Streams (makeOutputStream)
import qualified System.IO.Streams as Streams
newLargeObjectBS
:: Connection
-> BS.ByteString
-> IO Oid
newLargeObjectBS conn contents = do
oid <- Sql.loCreat conn
n <- withLargeObject conn oid WriteMode $ \lofd -> Sql.loWrite conn lofd contents
let l = BS.length contents
when (n /= l) . throwIO . AssertionFailed $
"newLargeObjectBS: loWrite reported writing " <> show n <> " bytes, expected " <> show l <> "."
return oid
newLargeObjectLBS
:: Connection
-> LBS.ByteString
-> IO (Oid, Int)
newLargeObjectLBS conn = newLargeObjectStream conn <=< Streams.fromLazyByteString
newLargeObjectStream
:: Connection
-> Streams.InputStream BS.ByteString
-> IO (Oid, Int)
newLargeObjectStream conn s = do
oid <- Sql.loCreat conn
t <- withLargeObject conn oid WriteMode $ \lofd -> do
whileJust_ (Streams.read s) $ \chunk -> do
n <- Sql.loWrite conn lofd chunk
let l = BS.length chunk
when (n /= l) . throwIO . AssertionFailed $
"newLargeObjectStream: loWrite reported writing " <> show n <> " bytes, expected " <> show l <> "."
Sql.loTell conn lofd
return (oid, t)
withLargeObject
:: Connection
-> Oid
-> IOMode
-> (LoFd -> IO a)
-> IO a
withLargeObject conn oid mode f =
bracket (Sql.loOpen conn oid mode)
(\lofd -> Sql.loClose conn lofd)
f
streamLargeObject
:: Connection
-> Oid
-> Streams.OutputStream BS.Builder
-> IO ()
streamLargeObject conn oid os =
withLargeObject conn oid ReadMode $ \lofd ->
fix $ \again -> do
chunk <- Sql.loRead conn lofd 8192
case BS.length chunk of
0 -> return ()
_ -> do
Streams.write (Just $ byteString chunk) os
again
streamLargeObjectRange
:: Connection
-> Oid
-> Int
-> Int
-> Streams.OutputStream BS.Builder
-> IO ()
streamLargeObjectRange conn oid start end os =
withLargeObject conn oid ReadMode $ \lofd -> do
_ <- Sql.loSeek conn lofd Sql.AbsoluteSeek start
let again n = do
let nextChunkSize = min 8192 (end - n)
chunk <- Sql.loRead conn lofd nextChunkSize
case BS.length chunk of
0 -> return ()
k -> do
Streams.write (Just $ byteString chunk) os
again (n + k)
again start
withLargeObjectLBS :: Connection -> Oid -> (LBS.ByteString -> IO ()) -> IO ()
withLargeObjectLBS conn oid f = do
lo <- newIORef mempty
cb <- makeOutputStream $ \case
Just chunk -> modifyIORef lo $ \chunks -> chunks <> chunk
Nothing -> do
payload <- readIORef lo
f $ BS.toLazyByteString payload
streamLargeObject conn oid cb
Streams.write Nothing cb