|
Module : Database.PostgreSQL.Enumerator
Copyright : (c) 2004 Oleg Kiselyov, Alistair Bayley
License : BSD-style
Maintainer : oleg@pobox.com, alistair@abayley.org
Stability : experimental
Portability : non-portable
PostgreSQL implementation of Database.Enumerator.
>
>
>
> module Database.PostgreSQL.Enumerator
> ( Session, connect, ConnectAttr(..)
> , prepareStmt, preparePrefetch
> , prepareQuery, prepareLargeQuery, prepareCommand
> , sql, sqlbind, prefetch, cmdbind
> , bindType
> , module Database.Enumerator
> )
> where
> import Database.Enumerator
> import Database.InternalEnumerator
> import Foreign.C
> import Control.Monad
> import Control.Exception (catchDyn, throwDyn, throwIO)
> import qualified Database.PostgreSQL.PGFunctions as DBAPI
> import Data.Char
> import Data.Dynamic
> import Data.IORef
> import Data.Int
> import Data.List
> import Data.Time
> import System.Time
>
>
--------------------------------------------------------------------
-- ** API Wrappers
--------------------------------------------------------------------
|These wrappers ensure that only DBExceptions are thrown,
and never PGExceptions.
We don't need wrappers for the colValXxx functions
because they never throw exceptions.
> convertAndRethrow :: DBAPI.PGException -> IO a
> convertAndRethrow (DBAPI.PGException e m) = do
> let
> sqlstate@(ssc,sssc) = errorSqlState m
> errorConstructor = case ssc of
> "XX" -> DBFatal
> "58" -> DBFatal
> "57" -> DBFatal
> "54" -> DBFatal
> "53" -> DBFatal
> "08" -> DBFatal
> _ -> DBError
> throwDB (errorConstructor sqlstate e m)
Postgres error messages (in Verbose mode) start with
ERROR: 42P01: relation "blahblahblah" does not exist.
> errorSqlState :: String -> (String, String)
> errorSqlState msg = let s = dropPrefix "ERROR: " msg in (take 2 s, take 3 (drop 2 s))
> dropPrefix p s = if isPrefixOf p s then drop (length p) s else s
|Common case: wrap an action with a convertAndRethrow.
> convertEx :: IO a -> IO a
> convertEx action = DBAPI.catchPG action convertAndRethrow
--------------------------------------------------------------------
-- ** Sessions
--------------------------------------------------------------------
We don't need much in an PostgreSQL Session record.
Session objects are created by 'connect'.
Deriving Typeable allows us to store Session objects in a
HashMap (say) of Dynamic objects e.g. for a connection pool.
> newtype Session = Session { dbHandle :: DBAPI.DBHandle }
> deriving Typeable
| Specify connection options to 'connect'.
You only need to use whatever subset is relevant for your connection.
> data ConnectAttr =
> CAhost String
> | CAhostaddr String
> | CAport String
> | CAdbname String
> | CAuser String
> | CApassword String
> | CAconnect_timeout Int
> | CAoptions String
> | CAsslmode String
> | CAservice String
>
> connect :: [ConnectAttr] -> ConnectA Session
> connect attrs = ConnectA $ convertEx $ do
> db <- DBAPI.openDb (unwords $ map encode attrs)
> DBAPI.disableNoticeReporting db
>
>
> DBAPI.setErrorVerbosity db DBAPI.ePQERRORS_VERBOSE
>
>
> return (Session db)
> where
>
> encode (CAhost s) = "host=" ++ enc s
> encode (CAhostaddr s) = "hostaddr=" ++ enc s
> encode (CAport s) = "port=" ++ enc s
> encode (CAdbname s) = "dbname=" ++ enc s
> encode (CAuser s) = "user=" ++ enc s
> encode (CApassword s) = "password=" ++ enc s
> encode (CAconnect_timeout i) = "connect_timeout=" ++ show i
> encode (CAoptions s) = "options=" ++ enc s
> encode (CAsslmode s) = "sslmode=" ++ enc s
> encode (CAservice s) = "service=" ++ enc s
> enc s = "'" ++ qu s ++ "'"
> qu s = case break ( \c -> c == '\'' || c == '"' ) s of
> (s,"") -> s
> (s,(c:t)) -> s ++ ('\\' : c : qu t)
> isolationLevelText ReadUncommitted = "read uncommitted"
> isolationLevelText ReadCommitted = "read committed"
> isolationLevelText RepeatableRead = "repeatable read"
> isolationLevelText Serialisable = "serializable"
> isolationLevelText Serializable = "serializable"
> instance ISession Session where
> disconnect sess = convertEx $ DBAPI.closeDb (dbHandle sess)
> beginTransaction sess isolation = do
> executeCommand sess (sql ("begin isolation level " ++ isolationLevelText isolation)) >> return ()
> commit sess = executeCommand sess (sql "commit") >> return ()
> rollback sess = executeCommand sess (sql "rollback") >> return ()
--------------------------------------------------------------------
-- Statements and Commands
--------------------------------------------------------------------
> newtype QueryString = QueryString String
|The simplest kind of a statement: no tuning parameters,
all default, little overhead.
> sql :: String -> QueryString
> sql str = QueryString str
> instance Command QueryString Session where
> executeCommand s (QueryString str) = executeCommand s str
> instance Command String Session where
> executeCommand s str = do
> (_,ntuple_str,_) <- convertEx $ DBAPI.nqExec (dbHandle s) str
> return $ if ntuple_str == "" then 0 else read ntuple_str
> instance Command BoundStmt Session where
> executeCommand s bs = return (boundCount bs)
> data CommandBind = CommandBind String [BindA Session PreparedStmtObj BindObj]
> cmdbind :: String -> [BindA Session PreparedStmtObj BindObj] -> CommandBind
> cmdbind sql parms = CommandBind sql parms
> instance Command CommandBind Session where
> executeCommand sess (CommandBind sqltext bas) = do
>
>
>
>
>
>
>
> let params = map (\(BindA ba) -> ba sess undefined) bas
> let bindtypes = DBAPI.bindTypes params
> let (PreparationA pa) = prepareCommand "" (QueryString sqltext) bindtypes
> pstmt <- pa sess
> writeIORef (stmtCursors pstmt) []
> (_, countstr, _) <- convertEx $ DBAPI.execPreparedCommand (dbHandle sess) (stmtName pstmt) params
> return (read countstr)
|At present the only resource tuning we support is the number of rows
prefetched by the FFI library.
We use a record to (hopefully) make it easy to add other
tuning parameters later.
> data QueryResourceUsage = QueryResourceUsage { prefetchRowCount :: Int }
> defaultResourceUsage :: QueryResourceUsage
> defaultResourceUsage = QueryResourceUsage 0
> data StmtType = SelectType | CommandType
Simple prepared statement: the analogue of QueryString. It is useful
for DDL and DML statements, and for simple queries (that is, queries
that do not need cursors and result in a small enough dataset -- because
it will be fetched entirely in one shot).
The data constructor is not exported.
> data PreparedStmtObj = PreparedStmtObj
> { stmtName :: String
> , stmtType :: StmtType
> , stmtPrefetch :: Int
> , stmtCursors :: IORef [RefCursor String]
> }
> beginsWithSelect "" = False
> beginsWithSelect text = isPrefixOf "select" . map toLower $ text
> inferStmtType text = if beginsWithSelect text then SelectType else CommandType
> prepareStmt ::
> String -> QueryString -> [DBAPI.Oid] -> PreparationA Session PreparedStmtObj
> prepareStmt [] _ _ = error "Prepared statement name must be non-empty"
> prepareStmt name (QueryString str) types =
> PreparationA (\sess -> do
> psname <- convertEx $ DBAPI.stmtPrepare (dbHandle sess) name (DBAPI.substituteBindPlaceHolders str) types
> c <- newIORef []
> return (PreparedStmtObj psname (inferStmtType str) 0 c)
> )
> prepareQuery ::
> String -> QueryString -> [DBAPI.Oid] -> PreparationA Session PreparedStmtObj
> prepareQuery name (QueryString str) types =
> PreparationA (\sess -> do
> psname <- convertEx $ DBAPI.stmtPrepare (dbHandle sess) name (DBAPI.substituteBindPlaceHolders str) types
> c <- newIORef []
> return (PreparedStmtObj psname SelectType 0 c)
> )
Here we use the same name for both the cursor name and the prepared statement name.
This isn't necessary, but it saves the user having to provide two names...
> preparePrefetch ::
> Int -> String -> QueryString -> [DBAPI.Oid] -> PreparationA Session PreparedStmtObj
> preparePrefetch count name (QueryString sqltext) types =
> PreparationA (\sess -> do
> let q = "DECLARE \"" ++ name ++ "\" NO SCROLL CURSOR FOR " ++ sqltext
> psname <- convertEx $ DBAPI.stmtPrepare (dbHandle sess) name (DBAPI.substituteBindPlaceHolders q) types
> c <- newIORef []
> return (PreparedStmtObj psname CommandType count c)
> )
> prepareLargeQuery ::
> Int -> String -> QueryString -> [DBAPI.Oid] -> PreparationA Session PreparedStmtObj
> prepareLargeQuery = preparePrefetch
> prepareCommand ::
> String -> QueryString -> [DBAPI.Oid] -> PreparationA Session PreparedStmtObj
> prepareCommand name (QueryString sqltext) types =
> PreparationA (\sess -> do
> psname <- convertEx $ DBAPI.stmtPrepare (dbHandle sess) name (DBAPI.substituteBindPlaceHolders sqltext) types
> c <- newIORef []
> return (PreparedStmtObj psname CommandType 0 c)
> )
|bindType is useful when constructing the list of Oids for stmtPrepare.
You don't need to pass the actual bind values, just dummy values
of the same type (the value isn't used, so 'Prelude.undefined' is OK here).
> bindType :: DBAPI.PGType a => a -> DBAPI.Oid
> bindType v = DBAPI.pgTypeOid v
We store the parent prepared-statement in BoundStmt.
This is used by the BoundStmt instance of Statement to set up
the parent PreparedStmtObj object in the Query object.
Note two different types of BoundStmt (prefetch and non-prefetch).
The prefetch version requires different behaviour both when
binding-and-running (it doesn't return a result-set, so we have to
execute it as a command), and when processing the result-set
(the non-prefetch version has all of the data available,
whereas the prefetch version must use the advance+cleanup actions).
> data BoundStmt =
> BoundStmtQuery
> { boundHandle :: DBAPI.ResultSetHandle
> , boundCount :: Int
> , boundParentStmt :: PreparedStmtObj
> }
> | BoundStmtCommand
> { boundParentStmt :: PreparedStmtObj
> , boundCount :: Int
> }
The bindRun method returns a BoundStmt,
which contains just the result-set (and row count).
> type BindObj = DBAPI.PGBindVal
> instance IPrepared PreparedStmtObj Session BoundStmt BindObj where
> bindRun sess stmt bas action = do
> let params = map (\(BindA ba) -> ba sess stmt) bas
> writeIORef (stmtCursors stmt) []
> case stmtType stmt of
> CommandType -> do
> (_, countstr, _) <- convertEx $ DBAPI.execPreparedCommand (dbHandle sess) (stmtName stmt) params
> action (BoundStmtCommand stmt (read countstr))
> SelectType -> do
> (rs, count) <- convertEx $ DBAPI.stmtExec (dbHandle sess) (stmtName stmt) params
> action (BoundStmtQuery rs count stmt)
> destroyStmt sess stmt = deallocateStmt sess (stmtName stmt)
> deallocateStmt sess name =
> when (not (null name)) $ do
> executeCommand sess ("deallocate \"" ++ name ++ "\"")
> return ()
-- Serialization (binding)
> makeBindAction x = BindA (\_ _ -> DBAPI.newBindVal x)
> instance DBBind (Maybe String) Session PreparedStmtObj BindObj where
> bindP (Just s) = makeBindAction (Just s)
> bindP Nothing = makeBindAction (Nothing `asTypeOf` Just "")
> instance DBBind (Maybe UTCTime) Session PreparedStmtObj BindObj where
> bindP = makeBindAction
> instance DBBind (Maybe Int) Session PreparedStmtObj BindObj where
> bindP = makeBindAction
> instance DBBind (Maybe Int64) Session PreparedStmtObj BindObj where
> bindP = makeBindAction
> instance DBBind (Maybe Float) Session PreparedStmtObj BindObj where
> bindP = makeBindAction
> instance DBBind (Maybe Double) Session PreparedStmtObj BindObj where
> bindP = makeBindAction
> instance DBBind (Maybe a) Session PreparedStmtObj BindObj
> => DBBind a Session PreparedStmtObj BindObj where
> bindP x = bindP (Just x)
The default instance, uses generic Show
> instance (Show a) => DBBind (Maybe a) Session PreparedStmtObj BindObj where
> bindP (Just x) = bindP (Just (show x))
> bindP Nothing = bindP (Nothing `asTypeOf` Just "")
--------------------------------------------------------------------
-- ** Queries
--------------------------------------------------------------------
> data SubQuery = SubQuery
> { stmtHandle :: DBAPI.ResultSetHandle
> , ntuples :: Int
> , curr'row :: Int
> }
> data Query = Query
> { subquery :: IORef SubQuery
> , advance'action :: Maybe (IO (DBAPI.ResultSetHandle, Int))
> , cleanup'action :: Maybe (IO ())
> , querySession :: Session
> , queryStmt :: Maybe PreparedStmtObj
> }
The following function creates the Query record. It has a few
decisions to make:
-- should we prepare a statement or do execImm?
-- should we use a cursor (better for large queries) or obtain
all data in one shot?
The use of cursor means we must be in a transaction.
-- what is the name of the prepared statement?
Potentially, we should be able to execute a previously prepared
statement...
Currently, if prefetchRowCount is 0 or the query is
not tuned, we use execImm. Otherwise,
we open a cursor and advance it by prefetchRowCount step.
We use anonymous prepared statement name.
The function commence'query also fetches some data (even if it turns
out 0 rows) so that later on, we could determine the type of data in columns
and prepare the buffers accordingly. We don't do that at the moment.
> instance Statement String Session Query where
> makeQuery sess sqltext = makeQuery sess (QueryString sqltext)
> instance Statement QueryString Session Query where
> makeQuery sess (QueryString sqltext) = commence'query'simple sess sqltext []
Simple prepared statements.
Query has been executed and result-set is in handle.
> instance Statement BoundStmt Session Query where
> makeQuery sess bs@(BoundStmtQuery _ _ _) = do
> sqr <- newIORef $ SubQuery (boundHandle bs) (boundCount bs) 0
> return (Query sqr Nothing Nothing sess (Just (boundParentStmt bs)))
> makeQuery sess bs@(BoundStmtCommand _ _) = do
> let pstmt = boundParentStmt bs
> let ru = QueryResourceUsage (stmtPrefetch pstmt)
>
>
> commencePreparedQuery sess ru (Just pstmt) (stmtName pstmt) ("ff_" ++ (stmtName pstmt))
Here we need to pop the next cursor name from the head of the list,
and use it to open
> instance Statement (NextResultSet mark PreparedStmtObj) Session Query where
> makeQuery sess (NextResultSet (PreparedStmt pstmt)) = do
> cursors <- readIORef (stmtCursors pstmt)
> if null cursors then throwDB (DBError ("02", "000") (1) "No more result sets to process.") else return ()
> let (RefCursor cursor) = head cursors
> writeIORef (stmtCursors pstmt) (tail cursors)
> let ru = QueryResourceUsage (stmtPrefetch pstmt)
> commencePreparedQuery sess ru (Just pstmt) cursor cursor
> instance Statement (RefCursor String) Session Query where
> makeQuery sess (RefCursor cursor) = do
> let ru = QueryResourceUsage 0
> commencePreparedQuery sess ru Nothing cursor cursor
Statements with resource usage
> data QueryStringTuned = QueryStringTuned QueryResourceUsage String [BindA Session PreparedStmtObj BindObj]
> sqlbind :: String -> [BindA Session PreparedStmtObj BindObj] -> QueryStringTuned
> sqlbind sql parms = QueryStringTuned defaultResourceUsage sql parms
> prefetch :: Int -> String -> [BindA Session PreparedStmtObj BindObj] -> QueryStringTuned
> prefetch count sql parms = QueryStringTuned (QueryResourceUsage count) sql parms
> instance Statement QueryStringTuned Session Query where
> makeQuery sess (QueryStringTuned resource_usage sqltext bas) = do
> let params = map (\(BindA ba) -> ba sess undefined) bas
> commence'query sess resource_usage sqltext params
> commence'query'simple sess sqltext params = do
> subq <- create'subq sess $
> convertEx $ DBAPI.stmtExecImm (dbHandle sess) (DBAPI.substituteBindPlaceHolders sqltext) params
> sqr <- newIORef subq
> return (Query sqr Nothing Nothing sess Nothing)
Now, prepare and open the cursor
> commence'query sess resourceUsage sqltext params
> | QueryResourceUsage{prefetchRowCount = 0} <- resourceUsage
> = commence'query'simple sess sqltext params
> commence'query sess resourceUsage sqltext params = do
> let prepared'statement'name = ""
> let default'cursor'name = "takusenp"
> let q = "DECLARE \"" ++ default'cursor'name ++ "\" NO SCROLL CURSOR FOR " ++ sqltext
> psname <- convertEx $ DBAPI.stmtPrepare (dbHandle sess) prepared'statement'name
> (DBAPI.substituteBindPlaceHolders q) (DBAPI.bindTypes params)
> convertEx $ DBAPI.execPreparedCommand (dbHandle sess) psname params
> commencePreparedQuery sess resourceUsage Nothing default'cursor'name default'cursor'name
commencePreparedQueryn assumes that the DECLARE CURSOR statement
has been prepared and executed.
Note there are two statement names (and therefore two statements) in scope here.
The first is the name we give to the DECLARE CURSOR statement;
currently this is just "" (passed in by commence'query).
The second is the name given to the FETCH FORWARD statement, so that it can
be re-executed quickly and easily.
This is also passed in by commence'query, but if you look at commence'query,
you'll see that it is the same as the cursor name (currently "takusenp").
> commencePreparedQuery sess resourceUsage parentStmt cursorName prefetchStmtName = do
> let countStr = if (prefetchRowCount resourceUsage) <= 0 then "ALL" else (show $ prefetchRowCount resourceUsage)
> let fetchq = "FETCH FORWARD " ++ countStr ++ " FROM \"" ++ cursorName ++ "\""
> sn <- convertEx $ DBAPI.stmtPrepare (dbHandle sess) prefetchStmtName fetchq []
> let
> advanceA = convertEx (DBAPI.stmtExec0t (dbHandle sess) sn)
> cleanupA = do
> executeCommand sess ("CLOSE \"" ++ cursorName ++ "\"")
> deallocateStmt sess prefetchStmtName
> subq <- create'subq sess advanceA
> sqr <- newIORef subq
> return (Query sqr (Just advanceA) (Just cleanupA) sess parentStmt)
> create'subq sess action = do
> (stmt,ntuples) <- action
> return $ SubQuery stmt ntuples 0
> destroy'subq sess subq = convertEx $ DBAPI.stmtFinalise (stmtHandle subq)
> appendRefCursor query cname = do
> case queryStmt query of
>
>
> Nothing -> return ()
> Just pstmt -> modifyIORef (stmtCursors pstmt) (++ [cname])
> instance IQuery Query Session ColumnBuffer where
>
> destroyQuery query = do
> subq <- readIORef (subquery query)
> destroy'subq (querySession query) subq
> maybe (return ()) id (cleanup'action query)
>
> fetchOneRow query = do
> let sess = querySession query
> subq' <- readIORef (subquery query)
> let subq = subq' { curr'row = succ (curr'row subq') }
> if ntuples subq == 0
> then return False
> else if curr'row subq > ntuples subq
> then maybe
> (return False)
> (\action -> destroy'subq sess subq >>
> create'subq sess action >>=
> writeIORef (subquery query) >>
> fetchOneRow query
> )
> (advance'action query)
> else writeIORef (subquery query) subq >> return True
>
>
> currentRowNum q = readIORef (subquery q) >>= return . curr'row
>
> freeBuffer q buffer = return ()
--------------------------------------------------------------------
-- result-set data buffers implementation
--------------------------------------------------------------------
|There aren't really Buffers to speak of with PostgreSQL,
so we just record the position of each column.
> data ColumnBuffer = ColumnBuffer { colPos :: Int }
> buffer_pos q buffer = do
> row <- currentRowNum q
> return (row, colPos buffer)
An auxiliary function: buffer allocation
> allocBuffer q colpos = return $ ColumnBuffer { colPos = colpos }
> bufferToAny fn query buffer = do
> subq <- readIORef (subquery query)
> ind <- DBAPI.colValNull (stmtHandle subq) (curr'row subq) (colPos buffer)
> if ind then return Nothing
> else
> fn (stmtHandle subq) (curr'row subq) (colPos buffer)
> >>= return . Just
> instance DBType (Maybe String) Query ColumnBuffer where
> allocBufferFor _ q n = allocBuffer q n
> fetchCol = bufferToAny DBAPI.colValString
> instance DBType (RefCursor String) Query ColumnBuffer where
> allocBufferFor _ q n = allocBuffer q n
> fetchCol query buffer = do
> subq <- readIORef (subquery query)
> (Just v) <- bufferToAny DBAPI.colValString query buffer
> appendRefCursor query (RefCursor v)
> return (RefCursor v)
> instance DBType (Maybe UTCTime) Query ColumnBuffer where
> allocBufferFor _ q n = allocBuffer q n
> fetchCol = bufferToAny DBAPI.colValUTCTime
> instance DBType (Maybe CalendarTime) Query ColumnBuffer where
> allocBufferFor _ q n = allocBuffer q n
> fetchCol = bufferToAny DBAPI.colValCalTime
> instance DBType (Maybe Int) Query ColumnBuffer where
> allocBufferFor _ q n = allocBuffer q n
> fetchCol = bufferToAny DBAPI.colValInt
> instance DBType (Maybe Double) Query ColumnBuffer where
> allocBufferFor _ q n = allocBuffer q n
> fetchCol = bufferToAny DBAPI.colValDouble
> instance DBType (Maybe Float) Query ColumnBuffer where
> allocBufferFor _ q n = allocBuffer q n
> fetchCol = bufferToAny DBAPI.colValFloat
> instance DBType (Maybe Int64) Query ColumnBuffer where
> allocBufferFor _ q n = allocBuffer q n
> fetchCol = bufferToAny DBAPI.colValInt64
|This single polymorphic instance replaces all of the type-specific non-Maybe instances
e.g. String, Int, Double, etc.
> instance DBType (Maybe a) Query ColumnBuffer
> => DBType a Query ColumnBuffer where
> allocBufferFor _ = allocBufferFor (undefined::Maybe a)
> fetchCol q buffer = throwIfDBNull (buffer_pos q buffer) $ fetchCol q buffer
|This polymorphic instance assumes that the value is in a String column,
and uses Read to convert the String to a Haskell data value.
> instance (Show a, Read a) => DBType (Maybe a) Query ColumnBuffer where
> allocBufferFor _ = allocBufferFor (undefined::String)
> fetchCol q buffer = do
> v <- bufferToAny DBAPI.colValString q buffer
> case v of
> Just s -> if s == "" then return Nothing else return (Just (read s))
> Nothing -> return Nothing