{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE QuasiQuotes, OverloadedStrings #-}
module Hasql.Queue.Migrate where
import qualified Hasql.Queue.Internal as I
import Data.String
import Data.String.Here.Interpolated
import Hasql.Connection
import Hasql.Session
migrationQueryString :: String
-> String
migrationQueryString valueType = [i|
CREATE OR REPLACE FUNCTION notify_on(channel text) RETURNs VOID AS $$
BEGIN
EXECUTE (format(E'NOTIFY %I', channel));
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION listen_on(channel text) RETURNS VOID AS $$
BEGIN
EXECUTE (format(E'LISTEN %I', channel));
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION unlisten_on(channel text) RETURNS VOID AS $$
BEGIN
EXECUTE (format(E'UNLISTEN %I', channel));
END;
$$ LANGUAGE plpgsql;
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'state_t') THEN
CREATE TYPE state_t AS ENUM ('enqueued', 'failed');
END IF;
END$$;
CREATE SEQUENCE IF NOT EXISTS modified_index START 1;
CREATE TABLE IF NOT EXISTS payloads
( id BIGSERIAL PRIMARY KEY
, attempts int NOT NULL DEFAULT 0
, state state_t NOT NULL DEFAULT 'enqueued'
, modified_at int8 NOT NULL DEFAULT nextval('modified_index')
, value ${valueType} NOT NULL
);
CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at)
WHERE (state = 'enqueued');
|]
migrate :: Connection
-> String
-> IO ()
migrate conn valueType =
I.runThrow (sql $ fromString $ migrationQueryString valueType) conn
teardown :: Connection -> IO ()
teardown conn = do
let theQuery = [i|
DROP TABLE IF EXISTS payloads;
DROP TYPE IF EXISTS state_t;
DROP SEQUENCE IF EXISTS modified_index;
DROP FUNCTION IF EXISTS notify_on;
DROP FUNCTION IF EXISTS listen_on;
DROP FUNCTION IF EXISTS unlisten_on;
|]
I.runThrow (sql theQuery) conn