{-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternGuards #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE ViewPatterns #-} -- Copyright 2010, 2011, 2012, 2013 Chris Forno -- Copyright 2014-2018 Dylan Simon -- |The Protocol module allows for direct, low-level communication with a -- PostgreSQL server over TCP/IP. You probably don't want to use this module -- directly. module Database.PostgreSQL.Typed.Protocol ( PGDatabase(..) , defaultPGDatabase , PGConnection , PGError(..) , pgErrorCode , pgConnectionDatabase , pgTypeEnv , pgConnect , pgDisconnect , pgReconnect -- * Query operations , pgDescribe , pgSimpleQuery , pgSimpleQueries_ , pgPreparedQuery , pgPreparedLazyQuery , pgCloseStatement -- * Transactions , pgBegin , pgCommit , pgRollback , pgCommitAll , pgRollbackAll , pgTransaction -- * HDBC support , pgDisconnectOnce , pgRun , PGPreparedStatement , pgPrepare , pgClose , PGColDescription(..) , PGRowDescription , pgBind , pgFetch -- * Notifications , PGNotification(..) , pgGetNotifications , pgGetNotification ) where #if !MIN_VERSION_base(4,8,0) import Control.Applicative ((<$>), (<$)) #endif import Control.Arrow ((&&&), first, second) import Control.Exception (Exception, throwIO, onException, finally) import Control.Monad (void, liftM2, replicateM, when, unless) #ifdef VERSION_cryptonite import qualified Crypto.Hash as Hash import qualified Data.ByteArray.Encoding as BA #endif import qualified Data.Binary.Get as G import qualified Data.ByteString as BS import qualified Data.ByteString.Builder as B import qualified Data.ByteString.Char8 as BSC import Data.ByteString.Internal (w2c) import qualified Data.ByteString.Lazy as BSL import qualified Data.ByteString.Lazy.Char8 as BSLC import Data.ByteString.Lazy.Internal (smallChunkSize) import qualified Data.Foldable as Fold import Data.IORef (IORef, newIORef, writeIORef, readIORef, atomicModifyIORef, atomicModifyIORef', modifyIORef') import Data.Int (Int32, Int16) import qualified Data.Map.Lazy as Map import Data.Maybe (fromMaybe) import Data.Monoid ((<>)) #if !MIN_VERSION_base(4,8,0) import Data.Monoid (mempty) #endif import Data.Tuple (swap) import Data.Typeable (Typeable) #if !MIN_VERSION_base(4,8,0) import Data.Word (Word) #endif import Data.Word (Word32) import Network (HostName, PortID(..), connectTo) import System.IO (Handle, hFlush, hClose, stderr, hPutStrLn, hSetBuffering, BufferMode(BlockBuffering)) import System.IO.Error (mkIOError, eofErrorType, ioError) import System.IO.Unsafe (unsafeInterleaveIO) import Text.Read (readMaybe) import Database.PostgreSQL.Typed.Types import Database.PostgreSQL.Typed.Dynamic data PGState = StateUnsync -- no Sync | StatePending -- expecting ReadyForQuery -- ReadyForQuery received: | StateIdle | StateTransaction | StateTransactionFailed -- Terminate sent or EOF received | StateClosed deriving (Show, Eq) -- |Information for how to connect to a database, to be passed to 'pgConnect'. data PGDatabase = PGDatabase { pgDBHost :: HostName -- ^ The hostname (ignored if 'pgDBPort' is 'UnixSocket') , pgDBPort :: PortID -- ^ The port, likely either @PortNumber 5432@ or @UnixSocket \"\/tmp\/.s.PGSQL.5432\"@ , pgDBName :: BS.ByteString -- ^ The name of the database , pgDBUser, pgDBPass :: BS.ByteString , pgDBParams :: [(BS.ByteString, BS.ByteString)] -- ^ Extra parameters to set for the connection (e.g., ("TimeZone", "UTC")) , pgDBDebug :: Bool -- ^ Log all low-level server messages , pgDBLogMessage :: MessageFields -> IO () -- ^ How to log server notice messages (e.g., @print . PGError@) } instance Eq PGDatabase where PGDatabase h1 s1 n1 u1 p1 l1 _ _ == PGDatabase h2 s2 n2 u2 p2 l2 _ _ = h1 == h2 && s1 == s2 && n1 == n2 && u1 == u2 && p1 == p2 && l1 == l2 newtype PGPreparedStatement = PGPreparedStatement Integer deriving (Eq, Show) preparedStatementName :: PGPreparedStatement -> BS.ByteString preparedStatementName (PGPreparedStatement n) = BSC.pack $ show n -- |An established connection to the PostgreSQL server. -- These objects are not thread-safe and must only be used for a single request at a time. data PGConnection = PGConnection { connHandle :: Handle , connDatabase :: !PGDatabase , connPid :: !Word32 -- unused , connKey :: !Word32 -- unused , connTypeEnv :: PGTypeEnv , connParameters :: IORef (Map.Map BS.ByteString BS.ByteString) , connPreparedStatementCount :: IORef Integer , connPreparedStatementMap :: IORef (Map.Map (BS.ByteString, [OID]) PGPreparedStatement) , connState :: IORef PGState , connInput :: IORef (G.Decoder PGBackendMessage) , connTransaction :: IORef Word , connNotifications :: IORef (Queue PGNotification) } data PGColDescription = PGColDescription { pgColName :: BS.ByteString , pgColTable :: !OID , pgColNumber :: !Int16 , pgColType :: !OID , pgColSize :: !Int16 , pgColModifier :: !Int32 , pgColBinary :: !Bool } deriving (Show) type PGRowDescription = [PGColDescription] type MessageFields = Map.Map Char BS.ByteString data PGNotification = PGNotification { pgNotificationPid :: !Word32 , pgNotificationChannel :: !BS.ByteString , pgNotificationPayload :: BSL.ByteString } deriving (Show) -- |Simple amortized fifo data Queue a = Queue [a] [a] emptyQueue :: Queue a emptyQueue = Queue [] [] enQueue :: a -> Queue a -> Queue a enQueue a (Queue e d) = Queue (a:e) d deQueue :: Queue a -> (Queue a, Maybe a) deQueue (Queue e (x:d)) = (Queue e d, Just x) deQueue (Queue (reverse -> x:d) []) = (Queue [] d, Just x) deQueue q = (q, Nothing) queueToList :: Queue a -> [a] queueToList (Queue e d) = d ++ reverse e -- |PGFrontendMessage represents a PostgreSQL protocol message that we'll send. -- See . data PGFrontendMessage = StartupMessage [(BS.ByteString, BS.ByteString)] -- only sent first | CancelRequest !Word32 !Word32 -- sent first on separate connection | Bind { portalName :: BS.ByteString, statementName :: BS.ByteString, bindParameters :: PGValues, binaryColumns :: [Bool] } | CloseStatement { statementName :: BS.ByteString } | ClosePortal { portalName :: BS.ByteString } -- |Describe a SQL query/statement. The SQL string can contain -- parameters ($1, $2, etc.). | DescribeStatement { statementName :: BS.ByteString } | DescribePortal { portalName :: BS.ByteString } | Execute { portalName :: BS.ByteString, executeRows :: !Word32 } | Flush -- |Parse SQL Destination (prepared statement) | Parse { statementName :: BS.ByteString, queryString :: BSL.ByteString, parseTypes :: [OID] } | PasswordMessage BS.ByteString -- |SimpleQuery takes a simple SQL string. Parameters ($1, $2, -- etc.) aren't allowed. | SimpleQuery { queryString :: BSL.ByteString } | Sync | Terminate deriving (Show) -- |PGBackendMessage represents a PostgreSQL protocol message that we'll receive. -- See . data PGBackendMessage = AuthenticationOk | AuthenticationCleartextPassword | AuthenticationMD5Password BS.ByteString -- AuthenticationSCMCredential | BackendKeyData Word32 Word32 | BindComplete | CloseComplete | CommandComplete BS.ByteString -- |Each DataRow (result of a query) is a list of 'PGValue', which are assumed to be text unless known to be otherwise. | DataRow PGValues | EmptyQueryResponse -- |An ErrorResponse contains the severity, "SQLSTATE", and -- message of an error. See -- . | ErrorResponse { messageFields :: MessageFields } | NoData | NoticeResponse { messageFields :: MessageFields } | NotificationResponse PGNotification -- |A ParameterDescription describes the type of a given SQL -- query/statement parameter ($1, $2, etc.). Unfortunately, -- PostgreSQL does not give us nullability information for the -- parameter. | ParameterDescription [OID] | ParameterStatus BS.ByteString BS.ByteString | ParseComplete | PortalSuspended | ReadyForQuery PGState -- |A RowDescription contains the name, type, table OID, and -- column number of the resulting columns(s) of a query. The -- column number is useful for inferring nullability. | RowDescription PGRowDescription deriving (Show) -- |PGException is thrown upon encountering an 'ErrorResponse' with severity of -- ERROR, FATAL, or PANIC. It holds the message of the error. newtype PGError = PGError { pgErrorFields :: MessageFields } deriving (Typeable) instance Show PGError where show (PGError m) = displayMessage m instance Exception PGError -- |Produce a human-readable string representing the message displayMessage :: MessageFields -> String displayMessage m = "PG" ++ f 'S' ++ (if null fC then ": " else " [" ++ fC ++ "]: ") ++ f 'M' ++ (if null fD then fD else '\n' : fD) where fC = f 'C' fD = f 'D' f c = BSC.unpack $ Map.findWithDefault BS.empty c m makeMessage :: BS.ByteString -> BS.ByteString -> MessageFields makeMessage m d = Map.fromAscList [('D', d), ('M', m)] -- |Message SQLState code. -- See . pgErrorCode :: PGError -> BS.ByteString pgErrorCode (PGError e) = Map.findWithDefault BS.empty 'C' e defaultLogMessage :: MessageFields -> IO () defaultLogMessage = hPutStrLn stderr . displayMessage -- |A database connection with sane defaults: -- localhost:5432:postgres defaultPGDatabase :: PGDatabase defaultPGDatabase = PGDatabase { pgDBHost = "localhost" , pgDBPort = PortNumber 5432 , pgDBName = "postgres" , pgDBUser = "postgres" , pgDBPass = BS.empty , pgDBParams = [] , pgDBDebug = False , pgDBLogMessage = defaultLogMessage } connDebug :: PGConnection -> Bool connDebug = pgDBDebug . connDatabase connLogMessage :: PGConnection -> MessageFields -> IO () connLogMessage = pgDBLogMessage . connDatabase -- |The database information for this connection. pgConnectionDatabase :: PGConnection -> PGDatabase pgConnectionDatabase = connDatabase -- |The type environment for this connection. pgTypeEnv :: PGConnection -> PGTypeEnv pgTypeEnv = connTypeEnv #ifdef VERSION_cryptonite md5 :: BS.ByteString -> BS.ByteString md5 = BA.convertToBase BA.Base16 . (Hash.hash :: BS.ByteString -> Hash.Digest Hash.MD5) #endif nul :: B.Builder nul = B.word8 0 byteStringNul :: BS.ByteString -> B.Builder byteStringNul s = B.byteString s <> nul lazyByteStringNul :: BSL.ByteString -> B.Builder lazyByteStringNul s = B.lazyByteString s <> nul -- |Given a message, determine the (optional) type ID and the body messageBody :: PGFrontendMessage -> (Maybe Char, B.Builder) messageBody (StartupMessage kv) = (Nothing, B.word32BE 0x30000 <> Fold.foldMap (\(k, v) -> byteStringNul k <> byteStringNul v) kv <> nul) messageBody (CancelRequest pid key) = (Nothing, B.word32BE 80877102 <> B.word32BE pid <> B.word32BE key) messageBody Bind{ portalName = d, statementName = n, bindParameters = p, binaryColumns = bc } = (Just 'B', byteStringNul d <> byteStringNul n <> (if any fmt p then B.word16BE (fromIntegral $ length p) <> Fold.foldMap (B.word16BE . fromIntegral . fromEnum . fmt) p else B.word16BE 0) <> B.word16BE (fromIntegral $ length p) <> Fold.foldMap val p <> (if or bc then B.word16BE (fromIntegral $ length bc) <> Fold.foldMap (B.word16BE . fromIntegral . fromEnum) bc else B.word16BE 0)) where fmt (PGBinaryValue _) = True fmt _ = False val PGNullValue = B.int32BE (-1) val (PGTextValue v) = B.word32BE (fromIntegral $ BS.length v) <> B.byteString v val (PGBinaryValue v) = B.word32BE (fromIntegral $ BS.length v) <> B.byteString v messageBody CloseStatement{ statementName = n } = (Just 'C', B.char7 'S' <> byteStringNul n) messageBody ClosePortal{ portalName = n } = (Just 'C', B.char7 'P' <> byteStringNul n) messageBody DescribeStatement{ statementName = n } = (Just 'D', B.char7 'S' <> byteStringNul n) messageBody DescribePortal{ portalName = n } = (Just 'D', B.char7 'P' <> byteStringNul n) messageBody Execute{ portalName = n, executeRows = r } = (Just 'E', byteStringNul n <> B.word32BE r) messageBody Flush = (Just 'H', mempty) messageBody Parse{ statementName = n, queryString = s, parseTypes = t } = (Just 'P', byteStringNul n <> lazyByteStringNul s <> B.word16BE (fromIntegral $ length t) <> Fold.foldMap B.word32BE t) messageBody (PasswordMessage s) = (Just 'p', B.byteString s <> nul) messageBody SimpleQuery{ queryString = s } = (Just 'Q', lazyByteStringNul s) messageBody Sync = (Just 'S', mempty) messageBody Terminate = (Just 'X', mempty) -- |Send a message to PostgreSQL (low-level). pgSend :: PGConnection -> PGFrontendMessage -> IO () pgSend c@PGConnection{ connHandle = h, connState = sr } msg = do modifyIORef' sr $ state msg when (connDebug c) $ putStrLn $ "> " ++ show msg B.hPutBuilder h $ Fold.foldMap B.char7 t <> B.word32BE (fromIntegral $ 4 + BS.length b) BS.hPut h b -- or B.hPutBuilder? But we've already had to convert to BS to get length where (t, b) = second (BSL.toStrict . B.toLazyByteString) $ messageBody msg state _ StateClosed = StateClosed state Sync _ = StatePending state SimpleQuery{} _ = StatePending state Terminate _ = StateClosed state _ _ = StateUnsync pgFlush :: PGConnection -> IO () pgFlush = hFlush . connHandle getByteStringNul :: G.Get BS.ByteString getByteStringNul = fmap BSL.toStrict G.getLazyByteStringNul getMessageFields :: G.Get MessageFields getMessageFields = g . w2c =<< G.getWord8 where g '\0' = return Map.empty g f = liftM2 (Map.insert f) getByteStringNul getMessageFields -- |Parse an incoming message. getMessageBody :: Char -> G.Get PGBackendMessage getMessageBody 'R' = auth =<< G.getWord32be where auth 0 = return AuthenticationOk auth 3 = return AuthenticationCleartextPassword auth 5 = AuthenticationMD5Password <$> G.getByteString 4 auth op = fail $ "pgGetMessage: unsupported authentication type: " ++ show op getMessageBody 't' = do numParams <- G.getWord16be ParameterDescription <$> replicateM (fromIntegral numParams) G.getWord32be getMessageBody 'T' = do numFields <- G.getWord16be RowDescription <$> replicateM (fromIntegral numFields) getField where getField = do name <- getByteStringNul oid <- G.getWord32be -- table OID col <- G.getWord16be -- column number typ' <- G.getWord32be -- type siz <- G.getWord16be -- type size tmod <- G.getWord32be -- type modifier fmt <- G.getWord16be -- format code return $ PGColDescription { pgColName = name , pgColTable = oid , pgColNumber = fromIntegral col , pgColType = typ' , pgColSize = fromIntegral siz , pgColModifier = fromIntegral tmod , pgColBinary = toEnum (fromIntegral fmt) } getMessageBody 'Z' = ReadyForQuery <$> (rs . w2c =<< G.getWord8) where rs 'I' = return StateIdle rs 'T' = return StateTransaction rs 'E' = return StateTransactionFailed rs s = fail $ "pgGetMessage: unknown ready state: " ++ show s getMessageBody '1' = return ParseComplete getMessageBody '2' = return BindComplete getMessageBody '3' = return CloseComplete getMessageBody 'C' = CommandComplete <$> getByteStringNul getMessageBody 'S' = liftM2 ParameterStatus getByteStringNul getByteStringNul getMessageBody 'D' = do numFields <- G.getWord16be DataRow <$> replicateM (fromIntegral numFields) (getField =<< G.getWord32be) where getField 0xFFFFFFFF = return PGNullValue getField len = PGTextValue <$> G.getByteString (fromIntegral len) -- could be binary, too, but we don't know here, so have to choose one getMessageBody 'K' = liftM2 BackendKeyData G.getWord32be G.getWord32be getMessageBody 'E' = ErrorResponse <$> getMessageFields getMessageBody 'I' = return EmptyQueryResponse getMessageBody 'n' = return NoData getMessageBody 's' = return PortalSuspended getMessageBody 'N' = NoticeResponse <$> getMessageFields getMessageBody 'A' = NotificationResponse <$> do PGNotification <$> G.getWord32be <*> getByteStringNul <*> G.getLazyByteStringNul getMessageBody t = fail $ "pgGetMessage: unknown message type: " ++ show t getMessage :: G.Decoder PGBackendMessage getMessage = G.runGetIncremental $ do typ <- G.getWord8 len <- G.getWord32be G.isolate (fromIntegral len - 4) $ getMessageBody (w2c typ) class Show m => RecvMsg m where -- |Read from connection, returning immediate value or non-empty data recvMsgData :: PGConnection -> IO (Either m BS.ByteString) recvMsgData c = do r <- BS.hGetSome (connHandle c) smallChunkSize if BS.null r then do writeIORef (connState c) StateClosed hClose (connHandle c) -- Should this instead be a special PGError? ioError $ mkIOError eofErrorType "PGConnection" (Just (connHandle c)) Nothing else return (Right r) -- |Expected ReadyForQuery message recvMsgSync :: Maybe m recvMsgSync = Nothing -- |NotificationResponse message recvMsgNotif :: PGConnection -> PGNotification -> IO (Maybe m) recvMsgNotif c n = Nothing <$ modifyIORef' (connNotifications c) (enQueue n) -- |ErrorResponse message recvMsgErr :: PGConnection -> MessageFields -> IO (Maybe m) recvMsgErr c m = Nothing <$ connLogMessage c m -- |Any other unhandled message recvMsg :: PGConnection -> PGBackendMessage -> IO (Maybe m) recvMsg c m = Nothing <$ connLogMessage c (makeMessage (BSC.pack $ "Unexpected server message: " ++ show m) "Each statement should only contain a single query") -- |Process all pending messages data RecvNonBlock = RecvNonBlock deriving (Show) instance RecvMsg RecvNonBlock where recvMsgData c = do r <- BS.hGetNonBlocking (connHandle c) smallChunkSize if BS.null r then return (Left RecvNonBlock) else return (Right r) -- |Wait for ReadyForQuery data RecvSync = RecvSync deriving (Show) instance RecvMsg RecvSync where recvMsgSync = Just RecvSync -- |Wait for NotificationResponse instance RecvMsg PGNotification where recvMsgNotif _ = return . Just -- |Return any message (throwing errors) instance RecvMsg PGBackendMessage where recvMsgErr _ = throwIO . PGError recvMsg _ = return . Just -- |Return any message or ReadyForQuery instance RecvMsg (Either PGBackendMessage RecvSync) where recvMsgSync = Just $ Right RecvSync recvMsgErr _ = throwIO . PGError recvMsg _ = return . Just . Left -- |Receive the next message from PostgreSQL (low-level). pgRecv :: RecvMsg m => PGConnection -> IO m pgRecv c@PGConnection{ connInput = dr, connState = sr } = rcv =<< readIORef dr where next = writeIORef dr new = G.pushChunk getMessage -- read and parse rcv (G.Done b _ m) = do when (connDebug c) $ putStrLn $ "< " ++ show m got (new b) m rcv (G.Fail _ _ r) = next (new BS.empty) >> fail r -- not clear how can recover rcv d@(G.Partial r) = recvMsgData c `onException` next d >>= either (<$ next d) (rcv . r . Just) -- process message msg (ParameterStatus k v) = Nothing <$ modifyIORef' (connParameters c) (Map.insert k v) msg (NoticeResponse m) = Nothing <$ connLogMessage c m msg (ErrorResponse m) = recvMsgErr c m msg m@(ReadyForQuery s) = do s' <- atomicModifyIORef' sr (s, ) if s' == StatePending then return recvMsgSync -- expected else recvMsg c m -- unexpected msg (NotificationResponse n) = recvMsgNotif c n msg m@AuthenticationOk = do writeIORef sr StatePending recvMsg c m msg m = recvMsg c m got d m = msg m `onException` next d >>= maybe (rcv d) (<$ next d) -- |Connect to a PostgreSQL server. pgConnect :: PGDatabase -> IO PGConnection pgConnect db = do param <- newIORef Map.empty state <- newIORef StateUnsync prepc <- newIORef 0 prepm <- newIORef Map.empty input <- newIORef getMessage tr <- newIORef 0 notif <- newIORef emptyQueue h <- connectTo (pgDBHost db) (pgDBPort db) hSetBuffering h (BlockBuffering Nothing) let c = PGConnection { connHandle = h , connDatabase = db , connPid = 0 , connKey = 0 , connParameters = param , connPreparedStatementCount = prepc , connPreparedStatementMap = prepm , connState = state , connTypeEnv = unknownPGTypeEnv , connInput = input , connTransaction = tr , connNotifications = notif } pgSend c $ StartupMessage $ [ ("user", pgDBUser db) , ("database", pgDBName db) , ("client_encoding", "UTF8") , ("standard_conforming_strings", "on") , ("bytea_output", "hex") , ("DateStyle", "ISO, YMD") , ("IntervalStyle", "iso_8601") ] ++ pgDBParams db pgFlush c conn c where conn c = pgRecv c >>= msg c msg c (Right RecvSync) = do cp <- readIORef (connParameters c) return c { connTypeEnv = PGTypeEnv { pgIntegerDatetimes = fmap ("on" ==) $ Map.lookup "integer_datetimes" cp , pgServerVersion = Map.lookup "server_version" cp } } msg c (Left (BackendKeyData p k)) = conn c{ connPid = p, connKey = k } msg c (Left AuthenticationOk) = conn c msg c (Left AuthenticationCleartextPassword) = do pgSend c $ PasswordMessage $ pgDBPass db pgFlush c conn c #ifdef VERSION_cryptonite msg c (Left (AuthenticationMD5Password salt)) = do pgSend c $ PasswordMessage $ "md5" `BS.append` md5 (md5 (pgDBPass db <> pgDBUser db) `BS.append` salt) pgFlush c conn c #endif msg _ (Left m) = fail $ "pgConnect: unexpected response: " ++ show m -- |Disconnect cleanly from the PostgreSQL server. pgDisconnect :: PGConnection -- ^ a handle from 'pgConnect' -> IO () pgDisconnect c@PGConnection{ connHandle = h } = pgSend c Terminate `finally` hClose h -- |Disconnect cleanly from the PostgreSQL server, but only if it's still connected. pgDisconnectOnce :: PGConnection -- ^ a handle from 'pgConnect' -> IO () pgDisconnectOnce c@PGConnection{ connState = cs } = do s <- readIORef cs unless (s == StateClosed) $ pgDisconnect c -- |Possibly re-open a connection to a different database, either reusing the connection if the given database is already connected or closing it and opening a new one. -- Regardless, the input connection must not be used afterwards. pgReconnect :: PGConnection -> PGDatabase -> IO PGConnection pgReconnect c@PGConnection{ connDatabase = cd, connState = cs } d = do s <- readIORef cs if cd == d && s /= StateClosed then return c{ connDatabase = d } else do pgDisconnectOnce c pgConnect d pgSync :: PGConnection -> IO () pgSync c@PGConnection{ connState = sr } = do s <- readIORef sr case s of StateClosed -> fail "pgSync: operation on closed connection" StatePending -> wait StateUnsync -> do pgSend c Sync pgFlush c wait _ -> return () where wait = do RecvSync <- pgRecv c return () rowDescription :: PGBackendMessage -> PGRowDescription rowDescription (RowDescription d) = d rowDescription NoData = [] rowDescription m = error $ "describe: unexpected response: " ++ show m -- |Describe a SQL statement/query. A statement description consists of 0 or -- more parameter descriptions (a PostgreSQL type) and zero or more result -- field descriptions (for queries) (consist of the name of the field, the -- type of the field, and a nullability indicator). pgDescribe :: PGConnection -> BSL.ByteString -- ^ SQL string -> [OID] -- ^ Optional type specifications -> Bool -- ^ Guess nullability, otherwise assume everything is -> IO ([OID], [(BS.ByteString, OID, Bool)]) -- ^ a list of parameter types, and a list of result field names, types, and nullability indicators. pgDescribe h sql types nulls = do pgSync h pgSend h Parse{ queryString = sql, statementName = BS.empty, parseTypes = types } pgSend h DescribeStatement{ statementName = BS.empty } pgSend h Sync pgFlush h ParseComplete <- pgRecv h ParameterDescription ps <- pgRecv h (,) ps <$> (mapM desc . rowDescription =<< pgRecv h) where desc (PGColDescription{ pgColName = name, pgColTable = tab, pgColNumber = col, pgColType = typ }) = do n <- nullable tab col return (name, typ, n) -- We don't get nullability indication from PostgreSQL, at least not directly. -- Without any hints, we have to assume that the result can be null and -- leave it up to the developer to figure it out. nullable oid col | nulls && oid /= 0 = do -- In cases where the resulting field is tracable to the column of a -- table, we can check there. (_, r) <- pgPreparedQuery h "SELECT attnotnull FROM pg_catalog.pg_attribute WHERE attrelid = $1 AND attnum = $2" [26, 21] [pgEncodeRep (oid :: OID), pgEncodeRep (col :: Int16)] [] case r of [[s]] -> return $ not $ pgDecodeRep s [] -> return True _ -> fail $ "Failed to determine nullability of column #" ++ show col | otherwise = return True rowsAffected :: (Integral i, Read i) => BS.ByteString -> i rowsAffected = ra . BSC.words where ra [] = -1 ra l = fromMaybe (-1) $ readMaybe $ BSC.unpack $ last l -- Do we need to use the PGColDescription here always, or are the request formats okay? fixBinary :: [Bool] -> PGValues -> PGValues fixBinary (False:b) (PGBinaryValue x:r) = PGTextValue x : fixBinary b r fixBinary (True :b) (PGTextValue x:r) = PGBinaryValue x : fixBinary b r fixBinary (_:b) (x:r) = x : fixBinary b r fixBinary _ l = l -- |A simple query is one which requires sending only a single 'SimpleQuery' -- message to the PostgreSQL server. The query is sent as a single string; you -- cannot bind parameters. Note that queries can return 0 results (an empty -- list). pgSimpleQuery :: PGConnection -> BSL.ByteString -- ^ SQL string -> IO (Int, [PGValues]) -- ^ The number of rows affected and a list of result rows pgSimpleQuery h sql = do pgSync h pgSend h $ SimpleQuery sql pgFlush h go start where go = (pgRecv h >>=) start (RowDescription rd) = go $ row (map pgColBinary rd) id start (CommandComplete c) = got c [] start EmptyQueryResponse = return (0, []) start m = fail $ "pgSimpleQuery: unexpected response: " ++ show m row bc r (DataRow fs) = go $ row bc (r . (fixBinary bc fs :)) row _ r (CommandComplete c) = got c (r []) row _ _ m = fail $ "pgSimpleQuery: unexpected row: " ++ show m got c r = return (rowsAffected c, r) -- |A simple query which may contain multiple queries (separated by semi-colons) whose results are all ignored. -- This function can also be used for \"SET\" parameter queries if necessary, but it's safer better to use 'pgDBParams'. pgSimpleQueries_ :: PGConnection -> BSL.ByteString -- ^ SQL string -> IO () pgSimpleQueries_ h sql = do pgSync h pgSend h $ SimpleQuery sql pgFlush h go where go = pgRecv h >>= res res (Left (RowDescription _)) = go res (Left (CommandComplete _)) = go res (Left EmptyQueryResponse) = go res (Left (DataRow _)) = go res (Right RecvSync) = return () res m = fail $ "pgSimpleQueries_: unexpected response: " ++ show m pgPreparedBind :: PGConnection -> BS.ByteString -> [OID] -> PGValues -> [Bool] -> IO (IO ()) pgPreparedBind c sql types bind bc = do pgSync c m <- readIORef (connPreparedStatementMap c) (p, n) <- maybe (atomicModifyIORef' (connPreparedStatementCount c) (succ &&& (,) False . PGPreparedStatement)) (return . (,) True) $ Map.lookup key m unless p $ pgSend c Parse{ queryString = BSL.fromStrict sql, statementName = preparedStatementName n, parseTypes = types } pgSend c Bind{ portalName = BS.empty, statementName = preparedStatementName n, bindParameters = bind, binaryColumns = bc } let go = pgRecv c >>= start start ParseComplete = do modifyIORef' (connPreparedStatementMap c) $ Map.insert key n go start BindComplete = return () start r = fail $ "pgPrepared: unexpected response: " ++ show r return go where key = (sql, types) -- |Prepare a statement, bind it, and execute it. -- If the given statement has already been prepared (and not yet closed) on this connection, it will be re-used. pgPreparedQuery :: PGConnection -> BS.ByteString -- ^ SQL statement with placeholders -> [OID] -- ^ Optional type specifications (only used for first call) -> PGValues -- ^ Paremeters to bind to placeholders -> [Bool] -- ^ Requested binary format for result columns -> IO (Int, [PGValues]) pgPreparedQuery c sql types bind bc = do start <- pgPreparedBind c sql types bind bc pgSend c Execute{ portalName = BS.empty, executeRows = 0 } pgSend c Sync pgFlush c start go id where go r = pgRecv c >>= row r row r (DataRow fs) = go (r . (fixBinary bc fs :)) row r (CommandComplete d) = return (rowsAffected d, r []) row r EmptyQueryResponse = return (0, r []) row _ m = fail $ "pgPreparedQuery: unexpected row: " ++ show m -- |Like 'pgPreparedQuery' but requests results lazily in chunks of the given size. -- Does not use a named portal, so other requests may not intervene. pgPreparedLazyQuery :: PGConnection -> BS.ByteString -> [OID] -> PGValues -> [Bool] -> Word32 -- ^ Chunk size (1 is common, 0 is all-at-once) -> IO [PGValues] pgPreparedLazyQuery c sql types bind bc count = do start <- pgPreparedBind c sql types bind bc unsafeInterleaveIO $ do execute start go id where execute = do pgSend c Execute{ portalName = BS.empty, executeRows = count } pgSend c Flush pgFlush c go r = pgRecv c >>= row r row r (DataRow fs) = go (r . (fixBinary bc fs :)) row r PortalSuspended = r <$> unsafeInterleaveIO (execute >> go id) row r (CommandComplete _) = return (r []) row r EmptyQueryResponse = return (r []) row _ m = fail $ "pgPreparedLazyQuery: unexpected row: " ++ show m -- |Close a previously prepared query (if necessary). pgCloseStatement :: PGConnection -> BS.ByteString -> [OID] -> IO () pgCloseStatement c sql types = do mn <- atomicModifyIORef (connPreparedStatementMap c) $ swap . Map.updateLookupWithKey (\_ _ -> Nothing) (sql, types) Fold.mapM_ (pgClose c) mn -- |Begin a new transaction. If there is already a transaction in progress (created with 'pgBegin' or 'pgTransaction') instead creates a savepoint. pgBegin :: PGConnection -> IO () pgBegin c@PGConnection{ connTransaction = tr } = do t <- atomicModifyIORef' tr (succ &&& id) void $ pgSimpleQuery c $ BSLC.pack $ if t == 0 then "BEGIN" else "SAVEPOINT pgt" ++ show t predTransaction :: Word -> (Word, Word) predTransaction 0 = (0, error "pgTransaction: no transactions") predTransaction x = (x', x') where x' = pred x -- |Rollback to the most recent 'pgBegin'. pgRollback :: PGConnection -> IO () pgRollback c@PGConnection{ connTransaction = tr } = do t <- atomicModifyIORef' tr predTransaction void $ pgSimpleQuery c $ BSLC.pack $ if t == 0 then "ROLLBACK" else "ROLLBACK TO SAVEPOINT pgt" ++ show t -- |Commit the most recent 'pgBegin'. pgCommit :: PGConnection -> IO () pgCommit c@PGConnection{ connTransaction = tr } = do t <- atomicModifyIORef' tr predTransaction void $ pgSimpleQuery c $ BSLC.pack $ if t == 0 then "COMMIT" else "RELEASE SAVEPOINT pgt" ++ show t -- |Rollback all active 'pgBegin's. pgRollbackAll :: PGConnection -> IO () pgRollbackAll c@PGConnection{ connTransaction = tr } = do writeIORef tr 0 void $ pgSimpleQuery c $ BSLC.pack "ROLLBACK" -- |Commit all active 'pgBegin's. pgCommitAll :: PGConnection -> IO () pgCommitAll c@PGConnection{ connTransaction = tr } = do writeIORef tr 0 void $ pgSimpleQuery c $ BSLC.pack "COMMIT" -- |Wrap a computation in a 'pgBegin', 'pgCommit' block, or 'pgRollback' on exception. pgTransaction :: PGConnection -> IO a -> IO a pgTransaction c f = do pgBegin c onException (do r <- f pgCommit c return r) (pgRollback c) -- |Prepare, bind, execute, and close a single (unnamed) query, and return the number of rows affected, or Nothing if there are (ignored) result rows. pgRun :: PGConnection -> BSL.ByteString -> [OID] -> PGValues -> IO (Maybe Integer) pgRun c sql types bind = do pgSync c pgSend c Parse{ queryString = sql, statementName = BS.empty, parseTypes = types } pgSend c Bind{ portalName = BS.empty, statementName = BS.empty, bindParameters = bind, binaryColumns = [] } pgSend c Execute{ portalName = BS.empty, executeRows = 1 } -- 0 does not mean none pgSend c Sync pgFlush c go where go = pgRecv c >>= res res ParseComplete = go res BindComplete = go res (DataRow _) = go res PortalSuspended = return Nothing res (CommandComplete d) = return (Just $ rowsAffected d) res EmptyQueryResponse = return (Just 0) res m = fail $ "pgRun: unexpected response: " ++ show m -- |Prepare a single query and return its handle. pgPrepare :: PGConnection -> BSL.ByteString -> [OID] -> IO PGPreparedStatement pgPrepare c sql types = do n <- atomicModifyIORef' (connPreparedStatementCount c) (succ &&& PGPreparedStatement) pgSync c pgSend c Parse{ queryString = sql, statementName = preparedStatementName n, parseTypes = types } pgSend c Sync pgFlush c ParseComplete <- pgRecv c return n -- |Close a previously prepared query. pgClose :: PGConnection -> PGPreparedStatement -> IO () pgClose c n = do pgSync c pgSend c ClosePortal{ portalName = preparedStatementName n } pgSend c CloseStatement{ statementName = preparedStatementName n } pgSend c Sync pgFlush c CloseComplete <- pgRecv c CloseComplete <- pgRecv c return () -- |Bind a prepared statement, and return the row description. -- After 'pgBind', you must either call 'pgFetch' until it completes (returns @(_, 'Just' _)@) or 'pgFinish' before calling 'pgBind' again on the same prepared statement. pgBind :: PGConnection -> PGPreparedStatement -> PGValues -> IO PGRowDescription pgBind c n bind = do pgSync c pgSend c ClosePortal{ portalName = sn } pgSend c Bind{ portalName = sn, statementName = sn, bindParameters = bind, binaryColumns = [] } pgSend c DescribePortal{ portalName = sn } pgSend c Sync pgFlush c CloseComplete <- pgRecv c BindComplete <- pgRecv c rowDescription <$> pgRecv c where sn = preparedStatementName n -- |Fetch some rows from an executed prepared statement, returning the next N result rows (if any) and number of affected rows when complete. pgFetch :: PGConnection -> PGPreparedStatement -> Word32 -- ^Maximum number of rows to return, or 0 for all -> IO ([PGValues], Maybe Integer) pgFetch c n count = do pgSync c pgSend c Execute{ portalName = preparedStatementName n, executeRows = count } pgSend c Sync pgFlush c go where go = pgRecv c >>= res res (DataRow v) = first (v :) <$> go res PortalSuspended = return ([], Nothing) res (CommandComplete d) = do pgSync c pgSend c ClosePortal{ portalName = preparedStatementName n } pgSend c Sync pgFlush c CloseComplete <- pgRecv c return ([], Just $ rowsAffected d) res EmptyQueryResponse = return ([], Just 0) res m = fail $ "pgFetch: unexpected response: " ++ show m -- |Retrieve any pending notifications. Non-blocking. pgGetNotifications :: PGConnection -> IO [PGNotification] pgGetNotifications c = do RecvNonBlock <- pgRecv c queueToList <$> atomicModifyIORef' (connNotifications c) (emptyQueue, ) -- |Retrieve a notifications, blocking if necessary. pgGetNotification :: PGConnection -> IO PGNotification pgGetNotification c = maybe (pgRecv c) return =<< atomicModifyIORef' (connNotifications c) deQueue