{-|
  This module has functions to send commands LISTEN and NOTIFY to the database server.
  It also has a function to wait for and handle notifications on a database connection.

  For more information check the [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-notify.html).

-}
module Hasql.Notifications
  ( notifyPool
  , notify
  , listen
  , unlisten
  , waitForNotifications
  , PgIdentifier
  , toPgIdentifier
  , fromPgIdentifier
  ) where

import Hasql.Pool (Pool, UsageError, use)
import Hasql.Session (sql, run, statement)
import qualified Hasql.Session as S
import qualified Hasql.Statement as HST
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Hasql.Decoders as HD
import qualified Hasql.Encoders as HE
import qualified Database.PostgreSQL.LibPQ as PQ
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.ByteString.Char8 (ByteString)
import Data.Functor.Contravariant (contramap)
import Control.Monad (void, forever)
import Control.Concurrent (threadWaitRead)
import Control.Exception (Exception, throw)

-- | A wrapped bytestring that represents a properly escaped and quoted PostgreSQL identifier
newtype PgIdentifier = PgIdentifier ByteString deriving (Show)

-- | Uncatchable exceptions thrown and never caught.
newtype FatalError = FatalError { fatalErrorMessage :: String }
  deriving (Show)

instance Exception FatalError

-- | Given a PgIdentifier returns the wrapped bytestring
fromPgIdentifier :: PgIdentifier -> ByteString
fromPgIdentifier (PgIdentifier bs) = bs

-- | Given a bytestring returns a properly quoted and escaped PgIdentifier
toPgIdentifier :: Text -> PgIdentifier
toPgIdentifier x =
  PgIdentifier $ "\"" <> T.encodeUtf8 (strictlyReplaceQuotes x) <> "\""
  where
    strictlyReplaceQuotes :: Text -> Text
    strictlyReplaceQuotes = T.replace "\"" ("\"\"" :: Text)

-- | Given a Hasql Pool, a channel and a message sends a notify command to the database
notifyPool :: Pool -- ^ Pool from which the connection will be used to issue a NOTIFY command.
           -> Text -- ^ Channel where to send the notification
           -> Text -- ^ Payload to be sent with the notification
           -> IO (Either UsageError ())
notifyPool pool channel mesg =
   use pool (statement (channel, mesg) callStatement)
   where
     callStatement = HST.Statement ("SELECT pg_notify" <> "($1, $2)") encoder HD.noResult False
     encoder = contramap fst (HE.param $ HE.nonNullable HE.text) <> contramap snd (HE.param $ HE.nonNullable HE.text)

-- | Given a Hasql Connection, a channel and a message sends a notify command to the database
notify :: Connection -- ^ Connection to be used to send the NOTIFY command
       -> PgIdentifier -- ^ Channel where to send the notification
       -> Text -- ^ Payload to be sent with the notification
       -> IO (Either S.QueryError ())
notify con channel mesg =
   run (sql ("NOTIFY " <> fromPgIdentifier channel <> ", '" <> T.encodeUtf8 mesg <> "'")) con

{-| 
  Given a Hasql Connection and a channel sends a listen command to the database.
  Once the connection sends the LISTEN command the server register its interest in the channel.
  Hence it's important to keep track of which connection was used to open the listen command.

  Example of listening and waiting for a notification:

  @
  import System.Exit (die)

  import Hasql.Connection
  import Hasql.Notifications

  main :: IO ()
  main = do
    dbOrError <- acquire "postgres://localhost/db_name"
    case dbOrError of
        Right db -> do
            let channelToListen = toPgIdentifier "sample-channel"
            listen db channelToListen
            waitForNotifications (\channel _ -> print $ "Just got notification on channel " <> channel) db
        _ -> die "Could not open database connection"
  @
-}
listen :: Connection -- ^ Connection to be used to send the LISTEN command
       -> PgIdentifier -- ^ Channel this connection will be registered to listen to
       -> IO ()
listen con channel =
  void $ withLibPQConnection con execListen
  where
    execListen pqCon = void $ PQ.exec pqCon $ "LISTEN " <> fromPgIdentifier channel

-- | Given a Hasql Connection and a channel sends a unlisten command to the database
unlisten :: Connection -- ^ Connection currently registerd by a previous 'listen' call
         -> PgIdentifier -- ^ Channel this connection will be deregistered from
         -> IO ()
unlisten con channel =
  void $ withLibPQConnection con execListen
  where
    execListen pqCon = void $ PQ.exec pqCon $ "UNLISTEN " <> fromPgIdentifier channel


{-| 
  Given a function that handles notifications and a Hasql connection it will listen 
  on the database connection and call the handler everytime a message arrives.

  The message handler passed as first argument needs two parameters channel and payload.
  See an example of handling notification on a separate thread:

  @
  import Control.Concurrent.Async (async)
  import Control.Monad (void)
  import System.Exit (die)

  import Hasql.Connection
  import Hasql.Notifications

  notificationHandler :: ByteString -> ByteString -> IO()
  notificationHandler channel payload = 
    void $ async do
      print $ "Handle payload " <> payload <> " in its own thread"

  main :: IO ()
  main = do
    dbOrError <- acquire "postgres://localhost/db_name"
    case dbOrError of
        Right db -> do
            let channelToListen = toPgIdentifier "sample-channel"
            listen db channelToListen
            waitForNotifications notificationHandler db
        _ -> die "Could not open database connection"
  @
-}

waitForNotifications :: (ByteString -> ByteString -> IO()) -- ^ Callback function to handle incoming notifications
                     -> Connection -- ^ Connection where we will listen to
                     -> IO ()
waitForNotifications sendNotification con =
  withLibPQConnection con $ void . forever . pqFetch
  where
    pqFetch pqCon = do
      mNotification <- PQ.notifies pqCon
      case mNotification of
        Nothing -> do
          mfd <- PQ.socket pqCon
          case mfd of
            Nothing  -> panic "Error checking for PostgreSQL notifications"
            Just fd -> do
              void $ threadWaitRead fd
              void $ PQ.consumeInput pqCon
        Just notification ->
           sendNotification (PQ.notifyRelname notification) (PQ.notifyExtra notification)
    panic :: String -> a
    panic a = throw (FatalError a)