{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}

-- | Functions for running Postgres queries.
module Postgres
  ( -- Connection
    Connection.Connection,
    Connection.connection,
    -- Settings
    Settings.Settings,
    Settings.decoder,
    -- Querying
    Query.Query,
    Query.Error (..),
    Query.sql,
    doQuery,
    -- Handling transactions
    transaction,
    inTestTransaction,
    -- Reexposing useful postgresql-typed types
    PGTypes.PGColumn (pgDecode),
    PGTypes.PGParameter (pgEncode),
  )
where

import qualified Control.Exception.Safe as Exception
import qualified Data.Pool
import Database.PostgreSQL.Typed (PGConnection)
import qualified Database.PostgreSQL.Typed.Array as PGArray
import Database.PostgreSQL.Typed.Protocol
  ( PGError,
    pgBegin,
    pgCommit,
    pgErrorCode,
    pgRollback,
    pgRollbackAll,
  )
import qualified Database.PostgreSQL.Typed.Types as PGTypes
import GHC.Stack (HasCallStack, withFrozenCallStack)
import qualified List
import qualified Log
import qualified Log.SqlQuery as SqlQuery
import qualified Platform
import Postgres.Connection (Connection)
import qualified Postgres.Connection as Connection
import qualified Postgres.Query as Query
import qualified Postgres.Settings as Settings
import qualified Postgres.Time as Time
import qualified Task
import qualified Tuple
import qualified Prelude

-- |
-- Perform a database transaction.
transaction :: Connection -> (Connection -> Task e a) -> Task e a
transaction :: Connection -> (Connection -> Task e a) -> Task e a
transaction Connection
conn Connection -> Task e a
func =
  let start :: PGConnection -> Task x PGConnection
      start :: PGConnection -> Task x PGConnection
start PGConnection
c =
        Connection -> IO PGConnection -> Task x PGConnection
forall a x. Connection -> IO a -> Task x a
doIO Connection
conn (IO PGConnection -> Task x PGConnection)
-> IO PGConnection -> Task x PGConnection
forall a b. (a -> b) -> a -> b
<| do
          PGConnection -> IO ()
pgBegin PGConnection
c
          PGConnection -> IO PGConnection
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure PGConnection
c
      --
      end :: Platform.Succeeded -> PGConnection -> Task x ()
      end :: Succeeded -> PGConnection -> Task x ()
end Succeeded
succeeded PGConnection
c =
        Connection -> IO () -> Task x ()
forall a x. Connection -> IO a -> Task x a
doIO Connection
conn
          (IO () -> Task x ()) -> IO () -> Task x ()
forall a b. (a -> b) -> a -> b
<| case Succeeded
succeeded of
            Succeeded
Platform.Succeeded -> PGConnection -> IO ()
pgCommit PGConnection
c
            Succeeded
Platform.Failed -> PGConnection -> IO ()
pgRollback PGConnection
c
            Platform.FailedWith SomeException
_ -> PGConnection -> IO ()
pgRollback PGConnection
c
      --
      setSingle :: PGConnection -> Connection
      setSingle :: PGConnection -> Connection
setSingle PGConnection
c =
        -- All queries in a transactions must run on the same thread.
        Connection
conn {singleOrPool :: SingleOrPool PGConnection
Connection.singleOrPool = PGConnection -> SingleOrPool PGConnection
forall c. c -> SingleOrPool c
Connection.Single PGConnection
c}
   in Connection -> (PGConnection -> Task e a) -> Task e a
forall e a. Connection -> (PGConnection -> Task e a) -> Task e a
withConnection Connection
conn ((PGConnection -> Task e a) -> Task e a)
-> (PGConnection -> Task e a) -> Task e a
forall a b. (a -> b) -> a -> b
<| \PGConnection
c ->
        Task e PGConnection
-> (Succeeded -> PGConnection -> Task e ())
-> (PGConnection -> Task e a)
-> Task e a
forall e a c b.
Task e a
-> (Succeeded -> a -> Task e c) -> (a -> Task e b) -> Task e b
Platform.bracketWithError (PGConnection -> Task e PGConnection
forall x. PGConnection -> Task x PGConnection
start PGConnection
c) Succeeded -> PGConnection -> Task e ()
forall x. Succeeded -> PGConnection -> Task x ()
end (PGConnection -> Connection
setSingle (PGConnection -> Connection)
-> (Connection -> Task e a) -> PGConnection -> Task e a
forall a b c. (a -> b) -> (b -> c) -> a -> c
>> Connection -> Task e a
func)

-- | Run code in a transaction, then roll that transaction back.
--   Useful in tests that shouldn't leave anything behind in the DB.
inTestTransaction :: Connection -> (Connection -> Task x a) -> Task x a
inTestTransaction :: Connection -> (Connection -> Task x a) -> Task x a
inTestTransaction Connection
conn Connection -> Task x a
func =
  let start :: PGConnection -> Task x PGConnection
      start :: PGConnection -> Task x PGConnection
start PGConnection
c = do
        Connection -> PGConnection -> Task x ()
forall x. Connection -> PGConnection -> Task x ()
rollbackAllSafe Connection
conn PGConnection
c
        Connection -> IO () -> Task x ()
forall a x. Connection -> IO a -> Task x a
doIO Connection
conn (IO () -> Task x ()) -> IO () -> Task x ()
forall a b. (a -> b) -> a -> b
<| PGConnection -> IO ()
pgBegin PGConnection
c
        PGConnection -> Task x PGConnection
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure PGConnection
c
      --
      end :: Platform.Succeeded -> PGConnection -> Task x ()
      end :: Succeeded -> PGConnection -> Task x ()
end Succeeded
_ PGConnection
c =
        Connection -> PGConnection -> Task x ()
forall x. Connection -> PGConnection -> Task x ()
rollbackAllSafe Connection
conn PGConnection
c
      --
      setSingle :: PGConnection -> Connection
      setSingle :: PGConnection -> Connection
setSingle PGConnection
c =
        -- All queries in a transactions must run on the same thread.
        Connection
conn {singleOrPool :: SingleOrPool PGConnection
Connection.singleOrPool = PGConnection -> SingleOrPool PGConnection
forall c. c -> SingleOrPool c
Connection.Single PGConnection
c}
   in --
      Connection -> (PGConnection -> Task x a) -> Task x a
forall e a. Connection -> (PGConnection -> Task e a) -> Task e a
withConnection Connection
conn ((PGConnection -> Task x a) -> Task x a)
-> (PGConnection -> Task x a) -> Task x a
forall a b. (a -> b) -> a -> b
<| \PGConnection
c ->
        Task x PGConnection
-> (Succeeded -> PGConnection -> Task x ())
-> (PGConnection -> Task x a)
-> Task x a
forall e a c b.
Task e a
-> (Succeeded -> a -> Task e c) -> (a -> Task e b) -> Task e b
Platform.bracketWithError (PGConnection -> Task x PGConnection
forall x. PGConnection -> Task x PGConnection
start PGConnection
c) Succeeded -> PGConnection -> Task x ()
forall x. Succeeded -> PGConnection -> Task x ()
end (PGConnection -> Connection
setSingle (PGConnection -> Connection)
-> (Connection -> Task x a) -> PGConnection -> Task x a
forall a b c. (a -> b) -> (b -> c) -> a -> c
>> Connection -> Task x a
func)

rollbackAllSafe :: Connection -> PGConnection -> Task x ()
rollbackAllSafe :: Connection -> PGConnection -> Task x ()
rollbackAllSafe Connection
conn PGConnection
c =
  Connection -> IO () -> Task x ()
forall a x. Connection -> IO a -> Task x a
doIO Connection
conn (IO () -> Task x ()) -> IO () -> Task x ()
forall a b. (a -> b) -> a -> b
<| do
    -- Because calling `rollbackAllTransactions` when no transactions are
    -- running will result in a warning message in the log (even if tests
    -- pass), let's start by beginning a transaction, so that we alwas have
    -- at least one to kill.
    PGConnection -> IO ()
pgBegin PGConnection
c
    PGConnection -> IO ()
pgRollbackAll PGConnection
c

-- | Run a query against MySql. This will return a list of rows, where the @row@
-- type is a tuple containing the queried columns.
--
-- > doQuery
-- >   connection
-- >   [sql| SELECT name, breed FROM doggos |]
-- >   (\result ->
-- >     case result of
-- >       Ok rows -> Task.succeed rows
-- >       Err err -> Task.fail err
-- >   )
doQuery ::
  HasCallStack =>
  Connection ->
  Query.Query row ->
  (Result Query.Error [row] -> Task e a) ->
  Task e a
doQuery :: Connection
-> Query row -> (Result Error [row] -> Task e a) -> Task e a
doQuery Connection
conn Query row
query Result Error [row] -> Task e a
handleResponse =
  Connection -> Query row -> Task Error [row]
forall row. Connection -> Query row -> Task Error [row]
runQuery Connection
conn Query row
query
    -- Handle the response before wrapping the operation in a context. This way,
    -- if the response handling logic creates errors, those errors can inherit
    -- context values like the query string.
    Task Error [row]
-> (Task Error [row] -> Task Error [row]) -> Task Error [row]
forall a b. a -> (a -> b) -> b
|> ( \Task Error [row]
task ->
           (HasCallStack => Text -> Task Error [row] -> Task Error [row])
-> Text -> Task Error [row] -> Task Error [row]
forall a. HasCallStack => (HasCallStack => a) -> a
withFrozenCallStack HasCallStack => Text -> Task Error [row] -> Task Error [row]
forall e a. HasCallStack => Text -> Task e a -> Task e a
Platform.tracingSpan Text
"Postgresql Query" (Task Error [row] -> Task Error [row])
-> Task Error [row] -> Task Error [row]
forall a b. (a -> b) -> a -> b
<| do
             [row]
res <-
               Task Error [row] -> Task Error () -> Task Error [row]
forall e a b. Task e a -> Task e b -> Task e a
Platform.finally
                 Task Error [row]
task
                 ( do
                     Details -> Task Error ()
forall d e. TracingSpanDetails d => d -> Task e ()
Platform.setTracingSpanDetails Details
queryInfo
                     Text -> Task Error ()
forall e. Text -> Task e ()
Platform.setTracingSpanSummary
                       ( (Details -> Maybe Text
SqlQuery.sqlOperation Details
queryInfo Maybe Text -> (Maybe Text -> Text) -> Text
forall a b. a -> (a -> b) -> b
|> Text -> Maybe Text -> Text
forall a. a -> Maybe a -> a
Maybe.withDefault Text
"?")
                           Text -> Text -> Text
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ Text
" "
                           Text -> Text -> Text
forall appendable.
Semigroup appendable =>
appendable -> appendable -> appendable
++ (Details -> Maybe Text
SqlQuery.queriedRelation Details
queryInfo Maybe Text -> (Maybe Text -> Text) -> Text
forall a b. a -> (a -> b) -> b
|> Text -> Maybe Text -> Text
forall a. a -> Maybe a -> a
Maybe.withDefault Text
"?")
                       )
                 )
             -- If we end up here it means the query succeeded. Overwrite the tracing
             -- details to contain the amount of selected rows. This information can be
             -- useful when debugging slow queries.
             Details -> Task Error ()
forall d e. TracingSpanDetails d => d -> Task e ()
Platform.setTracingSpanDetails
               Details
queryInfo {rowsReturned :: Maybe Int
SqlQuery.rowsReturned = Int -> Maybe Int
forall a. a -> Maybe a
Just ([row] -> Int
forall a. List a -> Int
List.length [row]
res)}
             [row] -> Task Error [row]
forall (f :: * -> *) a. Applicative f => a -> f a
Prelude.pure [row]
res
       )
    Task Error [row]
-> (Task Error [row] -> Task Error (Result Error [row]))
-> Task Error (Result Error [row])
forall a b. a -> (a -> b) -> b
|> ([row] -> Result Error [row])
-> Task Error [row] -> Task Error (Result Error [row])
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map [row] -> Result Error [row]
forall error value. value -> Result error value
Ok
    Task Error (Result Error [row])
-> (Task Error (Result Error [row]) -> Task e (Result Error [row]))
-> Task e (Result Error [row])
forall a b. a -> (a -> b) -> b
|> (Error -> Task e (Result Error [row]))
-> Task Error (Result Error [row]) -> Task e (Result Error [row])
forall x y a. (x -> Task y a) -> Task x a -> Task y a
Task.onError (Result Error [row] -> Task e (Result Error [row])
forall a x. a -> Task x a
Task.succeed (Result Error [row] -> Task e (Result Error [row]))
-> (Error -> Result Error [row])
-> Error
-> Task e (Result Error [row])
forall b c a. (b -> c) -> (a -> b) -> a -> c
<< Error -> Result Error [row]
forall error value. error -> Result error value
Err)
    Task e (Result Error [row])
-> (Task e (Result Error [row]) -> Task e a) -> Task e a
forall a b. a -> (a -> b) -> b
|> (Result Error [row] -> Task e a)
-> Task e (Result Error [row]) -> Task e a
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
andThen Result Error [row] -> Task e a
handleResponse
  where
    queryInfo :: Details
queryInfo = Query row -> Details -> Details
forall row. Query row -> Details -> Details
Query.details Query row
query (Connection -> Details
Connection.connDetails Connection
conn)

fromPGError :: Connection -> PGError -> Query.Error
fromPGError :: Connection -> PGError -> Error
fromPGError Connection
c PGError
pgError =
  -- There's a lot of errors Postgres might throw. For a couple we have custom
  -- `Error` constructors defined, because we've seen a couple of them and would
  -- like to handle them in special ways or define custom error messages for
  -- them. If a Postgres error starts showing up in our log, please feel free
  -- to add a special case for it to this list!
  case PGError -> ByteString
pgErrorCode PGError
pgError of
    ByteString
"23505" ->
      PGError -> String
forall e. Exception e => e -> String
Exception.displayException PGError
pgError
        String -> (String -> Text) -> Text
forall a b. a -> (a -> b) -> b
|> String -> Text
Text.fromList
        Text -> (Text -> Error) -> Error
forall a b. a -> (a -> b) -> b
|> Text -> Error
Query.UniqueViolation
    ByteString
"57014" ->
      Float -> Error
Query.Timeout (Interval -> Float
Time.milliseconds (Connection -> Interval
Connection.timeout Connection
c))
    ByteString
_ ->
      PGError -> String
forall e. Exception e => e -> String
Exception.displayException PGError
pgError
        String -> (String -> Text) -> Text
forall a b. a -> (a -> b) -> b
|> String -> Text
Text.fromList
        -- We add the full error in the context array rather than the
        -- message string, to help errors being grouped correctly in a
        -- bug tracker. Errors might contain unique bits of data like
        -- generated id's or timestamps which when included in the main
        -- error message would result in each error being grouped by
        -- itself.
        Text -> (Text -> Error) -> Error
forall a b. a -> (a -> b) -> b
|> (\Text
err -> Text -> [Context] -> Error
Query.Other Text
"Postgres query failed with unexpected error" [Text -> Text -> Context
forall a. (Show a, ToJSON a) => Text -> a -> Context
Log.context Text
"error" Text
err])

--
-- CONNECTION HELPERS
--

runQuery :: Connection -> Query.Query row -> Task Query.Error [row]
runQuery :: Connection -> Query row -> Task Error [row]
runQuery Connection
conn Query row
query =
  Connection
-> (PGConnection -> Task Error [row]) -> Task Error [row]
forall e a. Connection -> (PGConnection -> Task e a) -> Task e a
withConnection Connection
conn ((PGConnection -> Task Error [row]) -> Task Error [row])
-> (PGConnection -> Task Error [row]) -> Task Error [row]
forall a b. (a -> b) -> a -> b
<| \PGConnection
c ->
    Query row -> PGConnection -> IO [row]
forall row. Query row -> PGConnection -> IO [row]
Query.runQuery Query row
query PGConnection
c
      IO [row]
-> (IO [row] -> IO (Either PGError [row]))
-> IO (Either PGError [row])
forall a b. a -> (a -> b) -> b
|> IO [row] -> IO (Either PGError [row])
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
Exception.try
      IO (Either PGError [row])
-> (IO (Either PGError [row]) -> IO (Result Error [row]))
-> IO (Result Error [row])
forall a b. a -> (a -> b) -> b
|> (Either PGError [row] -> Result Error [row])
-> IO (Either PGError [row]) -> IO (Result Error [row])
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map
        ( \Either PGError [row]
res -> case Either PGError [row]
res of
            Prelude.Right [row]
x -> [row] -> Result Error [row]
forall error value. value -> Result error value
Ok [row]
x
            Prelude.Left PGError
err -> Error -> Result Error [row]
forall error value. error -> Result error value
Err (Connection -> PGError -> Error
fromPGError Connection
conn PGError
err)
        )
      IO (Result Error [row])
-> (IO (Result Error [row]) -> Task Error [row])
-> Task Error [row]
forall a b. a -> (a -> b) -> b
|> Handler -> IO (Result Error [row]) -> Task Error [row]
forall e a. Handler -> IO (Result e a) -> Task e a
Platform.doAnything (Connection -> Handler
Connection.doAnything Connection
conn)
      Task Error [row]
-> (Task Error [row] -> Task Error [row]) -> Task Error [row]
forall a b. a -> (a -> b) -> b
|> Connection -> Task Error [row] -> Task Error [row]
forall a. Connection -> Task Error a -> Task Error a
withTimeout Connection
conn

withTimeout :: Connection -> Task Query.Error a -> Task Query.Error a
withTimeout :: Connection -> Task Error a -> Task Error a
withTimeout Connection
conn Task Error a
task =
  if Interval -> Int
Time.microseconds (Connection -> Interval
Connection.timeout Connection
conn) Int -> Int -> Bool
forall comparable.
Ord comparable =>
comparable -> comparable -> Bool
> Int
0
    then
      Float -> Error -> Task Error a -> Task Error a
forall err a. Float -> err -> Task err a -> Task err a
Task.timeout
        (Interval -> Float
Time.milliseconds (Connection -> Interval
Connection.timeout Connection
conn))
        (Float -> Error
Query.Timeout (Interval -> Float
Time.milliseconds (Connection -> Interval
Connection.timeout Connection
conn)))
        Task Error a
task
    else Task Error a
task

-- | by default, queries pull a connection from the connection pool.
--   For SQL transactions, we want all queries within the transaction to run
--   on the same connection. withConnection lets transaction bundle
--   queries on the same connection.
withConnection :: Connection -> (PGConnection -> Task e a) -> Task e a
withConnection :: Connection -> (PGConnection -> Task e a) -> Task e a
withConnection Connection
conn PGConnection -> Task e a
func =
  let acquire :: Data.Pool.Pool conn -> Task x (conn, Data.Pool.LocalPool conn)
      acquire :: Pool conn -> Task x (conn, LocalPool conn)
acquire Pool conn
pool =
        Text
-> [Context]
-> Task x (conn, LocalPool conn)
-> Task x (conn, LocalPool conn)
forall e b.
HasCallStack =>
Text -> [Context] -> Task e b -> Task e b
Log.withContext Text
"acquiring Postgres connection from pool" []
          (Task x (conn, LocalPool conn) -> Task x (conn, LocalPool conn))
-> Task x (conn, LocalPool conn) -> Task x (conn, LocalPool conn)
forall a b. (a -> b) -> a -> b
<| Connection
-> IO (conn, LocalPool conn) -> Task x (conn, LocalPool conn)
forall a x. Connection -> IO a -> Task x a
doIO Connection
conn
          (IO (conn, LocalPool conn) -> Task x (conn, LocalPool conn))
-> IO (conn, LocalPool conn) -> Task x (conn, LocalPool conn)
forall a b. (a -> b) -> a -> b
<| Pool conn -> IO (conn, LocalPool conn)
forall a. Pool a -> IO (a, LocalPool a)
Data.Pool.takeResource Pool conn
pool
      --
      release :: Data.Pool.Pool conn -> Platform.Succeeded -> (conn, Data.Pool.LocalPool conn) -> Task y ()
      release :: Pool conn -> Succeeded -> (conn, LocalPool conn) -> Task y ()
release Pool conn
pool Succeeded
succeeded (conn
c, LocalPool conn
localPool) =
        Connection -> IO () -> Task y ()
forall a x. Connection -> IO a -> Task x a
doIO Connection
conn
          (IO () -> Task y ()) -> IO () -> Task y ()
forall a b. (a -> b) -> a -> b
<| case Succeeded
succeeded of
            Succeeded
Platform.Succeeded ->
              LocalPool conn -> conn -> IO ()
forall a. LocalPool a -> a -> IO ()
Data.Pool.putResource LocalPool conn
localPool conn
c
            Succeeded
Platform.Failed ->
              Pool conn -> LocalPool conn -> conn -> IO ()
forall a. Pool a -> LocalPool a -> a -> IO ()
Data.Pool.destroyResource Pool conn
pool LocalPool conn
localPool conn
c
            Platform.FailedWith SomeException
_ ->
              Pool conn -> LocalPool conn -> conn -> IO ()
forall a. Pool a -> LocalPool a -> a -> IO ()
Data.Pool.destroyResource Pool conn
pool LocalPool conn
localPool conn
c
   in --
      case Connection -> SingleOrPool PGConnection
Connection.singleOrPool Connection
conn of
        (Connection.Single PGConnection
c) ->
          PGConnection -> Task e a
func PGConnection
c
        --
        (Connection.Pool Pool PGConnection
pool) ->
          Task e (PGConnection, LocalPool PGConnection)
-> (Succeeded
    -> (PGConnection, LocalPool PGConnection) -> Task e ())
-> ((PGConnection, LocalPool PGConnection) -> Task e a)
-> Task e a
forall e a c b.
Task e a
-> (Succeeded -> a -> Task e c) -> (a -> Task e b) -> Task e b
Platform.bracketWithError (Pool PGConnection -> Task e (PGConnection, LocalPool PGConnection)
forall conn x. Pool conn -> Task x (conn, LocalPool conn)
acquire Pool PGConnection
pool) (Pool PGConnection
-> Succeeded -> (PGConnection, LocalPool PGConnection) -> Task e ()
forall conn y.
Pool conn -> Succeeded -> (conn, LocalPool conn) -> Task y ()
release Pool PGConnection
pool) ((PGConnection, LocalPool PGConnection) -> PGConnection
forall a b. (a, b) -> a
Tuple.first ((PGConnection, LocalPool PGConnection) -> PGConnection)
-> (PGConnection -> Task e a)
-> (PGConnection, LocalPool PGConnection)
-> Task e a
forall a b c. (a -> b) -> (b -> c) -> a -> c
>> PGConnection -> Task e a
func)

doIO :: Connection -> Prelude.IO a -> Task x a
doIO :: Connection -> IO a -> Task x a
doIO Connection
conn IO a
io =
  Handler -> IO (Result x a) -> Task x a
forall e a. Handler -> IO (Result e a) -> Task e a
Platform.doAnything (Connection -> Handler
Connection.doAnything Connection
conn) (IO a
io IO a -> (IO a -> IO (Result x a)) -> IO (Result x a)
forall a b. a -> (a -> b) -> b
|> (a -> Result x a) -> IO a -> IO (Result x a)
forall (m :: * -> *) a value.
Functor m =>
(a -> value) -> m a -> m value
map a -> Result x a
forall error value. value -> Result error value
Ok)

-- useful typeclass instances
instance PGTypes.PGType "jsonb" => PGTypes.PGType "jsonb[]" where
  type PGVal "jsonb[]" = PGArray.PGArray (PGTypes.PGVal "jsonb")

instance PGTypes.PGType "jsonb" => PGArray.PGArrayType "jsonb[]" where
  type PGElemType "jsonb[]" = "jsonb"