streaming-postgresql-simple-0.2.0.3: Stream postgresql-query results using the streaming library

Safe Haskell: None
Language: Haskell2010

Database.PostgreSQL.Simple.Streaming

Queries that return results

query :: (ToRow q, FromRow r, MonadResource m) => Connection -> Query -> q -> Stream (Of r) m ()

Perform a SELECT or other SQL query that is expected to return results. Uses PostgreSQL's single row mode to stream results directly from the socket to Haskell.

It is an error to perform another query using the same connection from within a stream. This applies to both streaming-postgresql-simple and postgresql-simple itself. If you do need to perform subsequent queries from within a stream, you should use stream, which uses cursors and allows interleaving of queries.

To demonstrate the problems of interleaving queries, if we run the following:

let doQuery c =
      queryWith_ (Pg.fromRow :: RowParser (Pg.Only Int))
                 c
                 "VALUES (1), (2)"
in S.print (S.mapM_ (\_ -> doQuery c) (doQuery c))

We will encounter the exception:

Exception: QueryError {qeMessage = "another command is already in progress\n", qeQuery = "VALUES (1), (2)"}

Exceptions that may be thrown:

  • FormatError: the query string could not be formatted correctly.
  • QueryError: the result contains no columns (i.e. you should be using execute instead of query).
  • ResultError: result conversion failed.
  • SqlError: the PostgreSQL backend returned an error, e.g. a syntax or type error, or an incorrect table or column name.
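
As a minimal sketch of how query might be consumed, the example below streams a series of integers and sums them without building an intermediate list. The connection string "dbname=example" and the helper names numbers and sumRows are assumptions made for illustration; they are not part of this package.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (bracket)
import Control.Monad.Trans.Resource (ResourceT)
import Database.PostgreSQL.Simple (Connection, Only (..), close, connectPostgreSQL)
import Database.PostgreSQL.Simple.Streaming (query, runResourceT)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Stream the integers 1..n from the server using single row mode.
numbers :: Connection -> Int -> Stream (Of (Only Int)) (ResourceT IO) ()
numbers conn n = query conn "SELECT generate_series(1, ?)" (Only n)

-- Sum single-column rows as they arrive, one element at a time.
sumRows :: Monad m => Stream (Of (Only Int)) m () -> m Int
sumRows = S.sum_ . S.map fromOnly

main :: IO ()
main =
  -- Hypothetical connection string; adjust it for your environment.
  bracket (connectPostgreSQL "dbname=example") close $ \conn -> do
    total <- runResourceT (sumRows (numbers conn 100))
    print total
```

The consumer here never touches the connection itself, which keeps it within the restriction on interleaved queries described above.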

query_ :: (FromRow r, MonadResource m) => Connection -> Query -> Stream (Of r) m ()

A version of query that does not perform query substitution.

Queries taking a parser as an argument

queryWith :: (ToRow q, MonadResource m) => RowParser r -> Connection -> Query -> q -> Stream (Of r) m ()

A version of query taking a parser as an argument.
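
The following sketch shows one way an explicit RowParser could be passed to queryWith instead of relying on a FromRow instance. The Person record, the people table, and the connection string are hypothetical and exist only for this illustration; field and RowParser come from postgresql-simple's Database.PostgreSQL.Simple.FromRow.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (bracket)
import Control.Monad.Trans.Resource (ResourceT)
import Data.Text (Text)
import Database.PostgreSQL.Simple (Connection, Only (..), close, connectPostgreSQL)
import Database.PostgreSQL.Simple.FromRow (RowParser, field)
import Database.PostgreSQL.Simple.Streaming (queryWith, runResourceT)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical record for rows of a hypothetical "people" table.
data Person = Person
  { personId   :: Int
  , personName :: Text
  } deriving (Show, Eq)

-- Explicit parser: read the two selected columns in order.
personParser :: RowParser Person
personParser = Person <$> field <*> field

-- Stream every person whose id is at least minId.
peopleFrom :: Connection -> Int -> Stream (Of Person) (ResourceT IO) ()
peopleFrom conn minId =
  queryWith personParser conn "SELECT id, name FROM people WHERE id >= ?" (Only minId)

main :: IO ()
main =
  -- Hypothetical connection string; adjust it for your environment.
  bracket (connectPostgreSQL "dbname=example") close $ \conn ->
    runResourceT (S.print (peopleFrom conn 1))
```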

queryWith_ :: MonadResource m => RowParser r -> Connection -> Query -> Stream (Of r) m ()

A version of query_ taking a parser as an argument.

Queries that stream results

stream :: (FromRow row, ToRow params, MonadMask m, MonadResource m) => Connection -> Query -> params -> Stream (Of row) m ()

Perform a SELECT or other SQL query that is expected to return results. Results are streamed incrementally from the server.

When dealing with small results that don't require further access to the database it may be simpler (and perhaps faster) to use query instead.

This is implemented using a database cursor. As such, this requires a transaction. This function will detect whether or not there is a transaction in progress, and will create a ReadCommitted ReadOnly transaction if needed. The cursor is given a unique temporary name, so the consumer may itself call stream.

Due to the dependency on transactions, you must ensure that commit or rollback aren't called on the connection used to form a stream. Doing so causes the stream cursor to be released, making it impossible to stream more results. If you do perform a commit or rollback, stream will raise an exception indicating that a transaction was aborted.

If you are performing transactional writes in a stream, consider using savepoints instead, which nest correctly with stream.

Exceptions that may be thrown:

  • FormatError: the query string could not be formatted correctly.
  • QueryError: the result contains no columns (i.e. you should be using execute instead of query).
  • ResultError: result conversion failed.
  • SqlError: the PostgreSQL backend returned an error, e.g. a syntax or type error, or an incorrect table or column name.
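
To illustrate the interleaving that cursors permit, the sketch below starts a second cursor-backed stream on the same connection for every row of an outer one. It reuses the VALUES query from the failing query example; the helper names twoRows and pairs and the connection string are assumptions for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (bracket)
import Control.Monad.Trans.Resource (ResourceT)
import Database.PostgreSQL.Simple (Connection, Only (..), close, connectPostgreSQL)
import Database.PostgreSQL.Simple.Streaming (runResourceT, stream_)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- A cursor-backed stream over two literal rows.
twoRows :: Connection -> Stream (Of (Only Int)) (ResourceT IO) ()
twoRows conn = stream_ conn "VALUES (1), (2)"

-- For every row of the outer stream, run the inner stream to completion
-- and pair the two values. The inner stream starts while the outer one
-- is still open.
pairs
  :: Monad m
  => Stream (Of (Only Int)) m ()
  -> Stream (Of (Only Int)) m ()
  -> Stream (Of (Int, Int)) m ()
pairs outer inner =
  S.for outer $ \(Only o) -> S.map (\(Only i) -> (o, i)) inner

main :: IO ()
main =
  -- Hypothetical connection string; adjust it for your environment.
  bracket (connectPostgreSQL "dbname=example") close $ \conn ->
    runResourceT (S.print (pairs (twoRows conn) (twoRows conn)))
```

Because pairs is written against any Monad, the same combinator can be exercised on in-memory streams built with S.each before it is pointed at a database.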

streamWithOptions :: (FromRow row, ToRow params, MonadResource m, MonadMask m) => FoldOptions -> Connection -> Query -> params -> Stream (Of row) m ()
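
A possible use of streamWithOptions is sketched below. It assumes that the FoldOptions value is interpreted as it is by postgresql-simple's foldWithOptions, so that fetchQuantity controls how many rows each cursor fetch requests; this page does not state that explicitly. The events table, the helper names, and the connection string are hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (bracket)
import Control.Monad.Trans.Resource (ResourceT)
import Data.Text (Text)
import Database.PostgreSQL.Simple
  ( Connection, FetchQuantity (..), FoldOptions (..), Only (..)
  , close, connectPostgreSQL, defaultFoldOptions )
import Database.PostgreSQL.Simple.Streaming (runResourceT, streamWithOptions)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Request 500 rows per fetch instead of the default quantity
-- (assumption: same meaning as in foldWithOptions).
bigBatches :: FoldOptions
bigBatches = defaultFoldOptions { fetchQuantity = Fixed 500 }

-- Stream rows of a hypothetical "events" table starting at minId.
eventsFrom :: Connection -> Int -> Stream (Of (Int, Text)) (ResourceT IO) ()
eventsFrom conn minId =
  streamWithOptions bigBatches conn
    "SELECT id, name FROM events WHERE id >= ?" (Only minId)

main :: IO ()
main =
  -- Hypothetical connection string; adjust it for your environment.
  bracket (connectPostgreSQL "dbname=example") close $ \conn ->
    runResourceT (S.print (eventsFrom conn 1))
```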

stream_ :: (FromRow row, MonadMask m, MonadResource m) => Connection -> Query -> Stream (Of row) m ()

A version of stream that does not perform query substitution.

Queries that stream results taking a parser as an argument

streamWith :: (ToRow params, MonadMask m, MonadResource m) => RowParser row -> Connection -> Query -> params -> Stream (Of row) m ()

A version of stream taking a parser as an argument.

streamWithOptionsAndParser :: (ToRow params, MonadMask m, MonadResource m) => FoldOptions -> RowParser row -> Connection -> Query -> params -> Stream (Of row) m ()

A version of streamWithOptions taking a parser as an argument.

streamWith_ :: (MonadMask m, MonadResource m) => RowParser row -> Connection -> Query -> Stream (Of row) m ()

A version of stream_ taking a parser as an argument.

streamWithOptionsAndParser_ :: (MonadMask m, MonadResource m) => FoldOptions -> RowParser row -> Connection -> Query -> Stream (Of row) m ()

A version of streamWithOptions_ taking a parser as an argument.

Streaming data in and out of PostgreSQL with COPY

copyIn :: (ToRow params, MonadIO m) => Connection -> Query -> params -> Stream (Of ByteString) m (Maybe ByteString) -> m (Maybe Int64)

Issue a COPY FROM STDIN query and stream the results in.

Note that the data in the stream need not represent a single row, or even an integral number of rows.

The stream's return value indicates whether or not the copy was successful. If the stream terminates with Nothing, the copy is successful. If the stream terminates with Just error, the error message will be logged.

If copying was successful, the number of rows processed is returned.
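
A minimal sketch of feeding copyIn from an in-memory list follows. It assumes that ByteString here is the strict Data.ByteString type, and it uses a hypothetical people table, connection string, and helper csvChunks. The stream returns Nothing to signal success, as described above.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (bracket)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8
import Database.PostgreSQL.Simple (close, connectPostgreSQL)
import Database.PostgreSQL.Simple.Streaming (copyIn)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Render each row as one CSV line and finish with Nothing (success).
-- No quoting is done, so names must not contain commas, quotes, or newlines.
csvChunks :: Monad m => [(Int, String)] -> Stream (Of ByteString) m (Maybe ByteString)
csvChunks rows = do
  S.each [B8.pack (show i ++ "," ++ name ++ "\n") | (i, name) <- rows]
  return Nothing

main :: IO ()
main =
  -- Hypothetical connection string and table; adjust for your environment.
  bracket (connectPostgreSQL "dbname=example") close $ \conn -> do
    copied <- copyIn conn
                "COPY people (id, name) FROM STDIN WITH (FORMAT csv)"
                ()
                (csvChunks [(1, "alice"), (2, "bob")])
    print copied
```

Each chunk happens to be a whole row here, but that is a choice of this sketch and not a requirement of copyIn.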

copyOut :: (MonadIO m, ToRow params) => Connection -> Query -> params -> Stream (Of ByteString) m Int64

Issue a COPY TO STDOUT query and stream the results. When the stream is drained it returns the total number of rows returned. Each element in the stream is either exactly one row of the result, or header or footer data, depending on the format.
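
The sketch below drains copyOut into a file and then prints the row count that the stream returns. It assumes a strict ByteString element type; the people table, the output path people.csv, the connection string, and the helper writeChunks are hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (bracket)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Database.PostgreSQL.Simple (close, connectPostgreSQL)
import Database.PostgreSQL.Simple.Streaming (copyOut)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S
import System.IO (Handle, IOMode (WriteMode), withFile)

-- Write every chunk to the handle and pass the stream's result through.
writeChunks :: Handle -> Stream (Of ByteString) IO r -> IO r
writeChunks h = S.mapM_ (B.hPut h)

main :: IO ()
main =
  -- Hypothetical connection string and table; adjust for your environment.
  bracket (connectPostgreSQL "dbname=example") close $ \conn -> do
    rowCount <- withFile "people.csv" WriteMode $ \h ->
      writeChunks h (copyOut conn "COPY people TO STDOUT WITH (FORMAT csv)" ())
    print rowCount
```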

Re-exported symbols

runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a

Unwrap a ResourceT transformer, and call all registered release actions.

Note that there is some reference counting involved due to resourceForkIO. If multiple threads are sharing the same collection of resources, only the last call to runResourceT will deallocate the resources.

Since 0.3.0