module PostgRESTWS.Database
( notifyPool
, notify
, listen
, unlisten
, waitForNotifications
) where
import Protolude
import qualified Data.ByteString.Char8 as BC
import Hasql.Pool (Pool, UsageError, use)
import Hasql.Session (sql, run)
import qualified Hasql.Session as S
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Database.PostgreSQL.LibPQ as PQ
import Data.Either.Combinators
-- | Failure reported by 'notifyPool' and 'notify'.
--
-- 'NotifyError' carries the textual rendering (via 'show') of the error
-- returned by the underlying hasql call. The type is not part of this
-- module's export list, so the derived instances are the only way callers can
-- display or compare a returned error.
newtype Error = NotifyError Text
  deriving (Show, Eq)
-- | Send a notification on @channel@ with payload @mesg@ using a connection
-- taken from the pool.
--
-- Returns @Right ()@ on success. Any 'UsageError' reported by the pool is
-- rendered with 'show' and wrapped in 'NotifyError'.
notifyPool :: Pool -> ByteString -> ByteString -> IO (Either Error ())
notifyPool pool channel mesg =
  mapError <$> use pool (sql (notifyStatement channel mesg))
  where
    mapError :: Either UsageError () -> Either Error ()
    mapError = mapLeft (NotifyError . show)

-- | Send a notification on @channel@ with payload @mesg@ over the given
-- connection.
--
-- Returns @Right ()@ on success. Any session error is rendered with 'show'
-- and wrapped in 'NotifyError'.
notify :: Connection -> ByteString -> ByteString -> IO (Either Error ())
notify con channel mesg =
  mapError <$> run (sql (notifyStatement channel mesg)) con
  where
    mapError :: Either S.Error () -> Either Error ()
    mapError = mapLeft (NotifyError . show)

-- | Build the text of a @NOTIFY@ statement shared by 'notifyPool' and
-- 'notify'.
--
-- The payload is emitted as an escape-string literal (@E'...'@) with every
-- single quote and backslash doubled, so a payload containing @'@ can no
-- longer terminate the literal and inject SQL. The escape-string form is used
-- so the backslash handling does not depend on the server's
-- @standard_conforming_strings@ setting.
--
-- NOTE(review): the channel is still spliced in as a bare identifier.
-- Quoting it would change how PostgreSQL case-folds the name and could stop
-- matching channels subscribed through an unquoted @LISTEN@, so callers must
-- keep passing trusted channel names only.
notifyStatement :: ByteString -> ByteString -> ByteString
notifyStatement channel mesg =
  "NOTIFY " <> channel <> ", E'" <> BC.concatMap escapeChar mesg <> "'"
  where
    escapeChar :: Char -> ByteString
    escapeChar '\'' = "''"
    escapeChar '\\' = "\\\\"
    escapeChar c    = BC.singleton c
-- | Subscribe the connection to @channel@ by executing @LISTEN channel@
-- directly on the underlying libpq connection.
--
-- The channel name is appended to the statement as-is, and the result of
-- 'PQ.exec' is discarded, so a failed @LISTEN@ is not reported to the caller.
listen :: Connection -> ByteString -> IO ()
listen con channel =
  withLibPQConnection con $ \pqCon ->
    void (PQ.exec pqCon ("LISTEN " <> channel))
-- | Cancel the connection's subscription to @channel@ by executing
-- @UNLISTEN channel@ directly on the underlying libpq connection.
--
-- The channel name is appended to the statement as-is, and the result of
-- 'PQ.exec' is discarded, so a failed @UNLISTEN@ is not reported to the
-- caller.
unlisten :: Connection -> ByteString -> IO ()
unlisten con channel = withLibPQConnection con dropSubscription
  where
    dropSubscription pqCon = void (PQ.exec pqCon ("UNLISTEN " <> channel))
-- | Fork a thread that forwards notifications arriving on the connection to
-- @sendNotification@, then return immediately.
--
-- For every notification the callback is invoked with the channel name
-- ('PQ.notifyRelname') and the payload ('PQ.notifyExtra'). The forked loop
-- never terminates on its own; it only stops if an exception is raised inside
-- it (for example the 'panic' below when libpq reports no socket).
--
-- NOTE(review): the thread is forked from inside the 'withLibPQConnection'
-- callback and keeps using the raw libpq handle after that callback has
-- returned -- presumably outside whatever protection the wrapper provides;
-- confirm this is intended.
waitForNotifications :: (ByteString -> ByteString -> IO()) -> Connection -> IO ()
waitForNotifications sendNotification con =
  withLibPQConnection con $ \pqCon ->
    void (forkIO (forever (pollOnce pqCon)))
  where
    -- One loop iteration: deliver a queued notification if there is one,
    -- otherwise block until the socket has data and read it in.
    pollOnce pqCon =
      PQ.notifies pqCon >>= maybe (awaitInput pqCon) deliver

    deliver notification =
      sendNotification (PQ.notifyRelname notification) (PQ.notifyExtra notification)

    awaitInput pqCon =
      PQ.socket pqCon
        >>= maybe
              (panic "Error checking for PostgreSQL notifications")
              (readWhenReady pqCon)

    -- The result of 'PQ.consumeInput' is discarded, so a read failure is not
    -- detected here; the loop simply goes round again.
    readWhenReady pqCon fd = do
      threadWaitRead fd
      void (PQ.consumeInput pqCon)