{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE UndecidableInstances #-}

{-|
Module      : Mealstrom.PostgresJSONStore
Description : Main backend for FSMs and WALs.
Copyright   : (c) Max Amanshauser, 2016
License     : MIT
Maintainer  : max@lambdalifting.org

This module is the main backend for FSMs. Instances are stored in a table
with the name passed as storeName when creating the PostgresJSONStore. WALs use
the same name with "Wal" appended.
-}

module Mealstrom.PostgresJSONStore(
    PostgresJSONStore,
    mkStore,
    _fsmRead,
    _fsmCreate,
    _fsmUpdate,
    _batchConversion
) where


import           Control.Exception                           (handle,SomeException)
import           Control.Monad                               (void)
import           Database.PostgreSQL.Simple                as PGS
import           Database.PostgreSQL.Simple.FromRow
import           Database.PostgreSQL.Simple.ToField
import           Database.PostgreSQL.Simple.Transaction
import           Database.PostgreSQL.Simple.Types
import           Data.Aeson
import qualified Data.ByteString.Char8                     as DBSC8
import           Data.Int                                    (Int64)
import           Data.Maybe                                  (listToMaybe)
import           Data.Pool
import           Data.Text
import           Data.Time
import           Data.Typeable                        hiding (Proxy)
import           GHC.Generics
import           Database.PostgreSQL.Simple.FromField        (FromField (fromField),
                                                              fromJSONField,
                                                              Conversion)

import           Mealstrom.FSM
import           Mealstrom.FSMStore
import           Mealstrom.WALStore

data PostgresJSONStore = PostgresJSONStore {
    PostgresJSONStore -> Pool Connection
storeConnPool :: Pool Connection,
    PostgresJSONStore -> Text
storeName     :: Text
}

instance (FromJSON s, FromJSON e, FromJSON a,
          ToJSON   s, ToJSON   e, ToJSON   a,
          Typeable s, Typeable e, Typeable a,
          MealyInstance k s e a)              => FSMStore PostgresJSONStore k s e a where
    fsmRead :: PostgresJSONStore -> k -> Proxy k s e a -> IO (Maybe s)
fsmRead PostgresJSONStore
st k
k Proxy k s e a
p = PostgresJSONStore
-> k -> Proxy k s e a -> IO (Maybe (Instance k s e a))
forall s e a k.
(FromJSON s, FromJSON e, FromJSON a, Typeable s, Typeable e,
 Typeable a, MealyInstance k s e a) =>
PostgresJSONStore
-> k -> Proxy k s e a -> IO (Maybe (Instance k s e a))
Mealstrom.PostgresJSONStore._fsmRead PostgresJSONStore
st k
k Proxy k s e a
p IO (Maybe (Instance k s e a))
-> (Maybe (Instance k s e a) -> IO (Maybe s)) -> IO (Maybe s)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Maybe (Instance k s e a)
mi -> Maybe s -> IO (Maybe s)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe s -> IO (Maybe s)) -> Maybe s -> IO (Maybe s)
forall a b. (a -> b) -> a -> b
$ (Instance k s e a -> s) -> Maybe (Instance k s e a) -> Maybe s
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Machine s e a -> s
forall s e a. Machine s e a -> s
currState (Machine s e a -> s)
-> (Instance k s e a -> Machine s e a) -> Instance k s e a -> s
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Instance k s e a -> Machine s e a
forall k s e a. Instance k s e a -> Machine s e a
machine) Maybe (Instance k s e a)
mi
    fsmCreate :: PostgresJSONStore -> Instance k s e a -> IO (Maybe String)
fsmCreate      = PostgresJSONStore -> Instance k s e a -> IO (Maybe String)
forall k s e a.
(ToJSON s, ToJSON e, ToJSON a, Typeable s, Typeable e, Typeable a,
 MealyInstance k s e a) =>
PostgresJSONStore -> Instance k s e a -> IO (Maybe String)
Mealstrom.PostgresJSONStore._fsmCreate
    fsmUpdate :: PostgresJSONStore
-> k -> MachineTransformer s e a -> IO MealyStatus
fsmUpdate      = PostgresJSONStore
-> k -> MachineTransformer s e a -> IO MealyStatus
forall k s e a.
(FromJSON s, FromJSON e, FromJSON a, ToJSON s, ToJSON e, ToJSON a,
 Typeable s, Typeable e, Typeable a, MealyInstance k s e a) =>
PostgresJSONStore
-> k -> MachineTransformer s e a -> IO MealyStatus
Mealstrom.PostgresJSONStore._fsmUpdate

instance (FSMKey k) => WALStore PostgresJSONStore k where
    walUpsertIncrement :: PostgresJSONStore -> k -> IO ()
walUpsertIncrement = PostgresJSONStore -> k -> IO ()
forall k. FSMKey k => PostgresJSONStore -> k -> IO ()
Mealstrom.PostgresJSONStore.walUpsertIncrement
    walDecrement :: PostgresJSONStore -> k -> IO ()
walDecrement       = PostgresJSONStore -> k -> IO ()
forall k. FSMKey k => PostgresJSONStore -> k -> IO ()
Mealstrom.PostgresJSONStore.walDecrement
    walScan :: PostgresJSONStore -> Int -> IO [WALEntry k]
walScan            = PostgresJSONStore -> Int -> IO [WALEntry k]
forall k. FSMKey k => PostgresJSONStore -> Int -> IO [WALEntry k]
Mealstrom.PostgresJSONStore.walScan

-- |We create a database pool (no subpools) of 20 connections that will be closed
-- after 10 seconds of inactivity.
givePool :: IO Connection -> IO (Pool Connection)
givePool :: IO Connection -> IO (Pool Connection)
givePool IO Connection
creator = IO Connection
-> (Connection -> IO ())
-> Int
-> NominalDiffTime
-> Int
-> IO (Pool Connection)
forall a.
IO a
-> (a -> IO ()) -> Int -> NominalDiffTime -> Int -> IO (Pool a)
createPool IO Connection
creator Connection -> IO ()
close Int
1 NominalDiffTime
10 Int
20


-- #########
-- # FSM API
-- #########
_fsmRead :: (FromJSON s, FromJSON e, FromJSON a,
             Typeable s, Typeable e, Typeable a,
             MealyInstance k s e a)              =>
             PostgresJSONStore                   ->
             k                                   ->
             Proxy k s e a                       -> IO (Maybe (Instance k s e a))
_fsmRead :: PostgresJSONStore
-> k -> Proxy k s e a -> IO (Maybe (Instance k s e a))
_fsmRead PostgresJSONStore
st k
k Proxy k s e a
_p =
    Pool Connection
-> (Connection -> IO (Maybe (Instance k s e a)))
-> IO (Maybe (Instance k s e a))
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource (PostgresJSONStore -> Pool Connection
storeConnPool PostgresJSONStore
st) (\Connection
conn ->
        Connection
-> IO (Maybe (Instance k s e a)) -> IO (Maybe (Instance k s e a))
forall a. Connection -> IO a -> IO a
withTransactionSerializable Connection
conn (IO (Maybe (Instance k s e a)) -> IO (Maybe (Instance k s e a)))
-> IO (Maybe (Instance k s e a)) -> IO (Maybe (Instance k s e a))
forall a b. (a -> b) -> a -> b
$ do
            [Instance k s e a]
el <- Connection -> Text -> Text -> IO [Instance k s e a]
forall v. FromRow v => Connection -> Text -> Text -> IO [v]
_getValue Connection
conn (PostgresJSONStore -> Text
storeName PostgresJSONStore
st) (k -> Text
forall k. FSMKey k => k -> Text
toText k
k)
            Maybe (Instance k s e a) -> IO (Maybe (Instance k s e a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (Instance k s e a) -> IO (Maybe (Instance k s e a)))
-> Maybe (Instance k s e a) -> IO (Maybe (Instance k s e a))
forall a b. (a -> b) -> a -> b
$ [Instance k s e a] -> Maybe (Instance k s e a)
forall a. [a] -> Maybe a
listToMaybe [Instance k s e a]
el)


_fsmCreate :: forall k s e a .
              (ToJSON   s, ToJSON   e, ToJSON   a,
               Typeable s, Typeable e, Typeable a,
               MealyInstance k s e a)              =>
               PostgresJSONStore                   ->
               Instance k s e a                    -> IO (Maybe String)
_fsmCreate :: PostgresJSONStore -> Instance k s e a -> IO (Maybe String)
_fsmCreate PostgresJSONStore
st Instance k s e a
i =
    (SomeException -> IO (Maybe String))
-> IO (Maybe String) -> IO (Maybe String)
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
handle (\(SomeException
e::SomeException) -> Maybe String -> IO (Maybe String)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe String -> IO (Maybe String))
-> Maybe String -> IO (Maybe String)
forall a b. (a -> b) -> a -> b
$ String -> Maybe String
forall a. a -> Maybe a
Just (SomeException -> String
forall a. Show a => a -> String
show SomeException
e))
           (Pool Connection
-> (Connection -> IO (Maybe String)) -> IO (Maybe String)
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource (PostgresJSONStore -> Pool Connection
storeConnPool PostgresJSONStore
st) (\Connection
conn ->
               Connection -> IO (Maybe String) -> IO (Maybe String)
forall a. Connection -> IO a -> IO a
withTransactionSerializable Connection
conn (IO (Maybe String) -> IO (Maybe String))
-> IO (Maybe String) -> IO (Maybe String)
forall a b. (a -> b) -> a -> b
$ do
                   IO Int64 -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int64 -> IO ()) -> IO Int64 -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> Text -> Text -> Machine s e a -> IO Int64
forall v. ToField v => Connection -> Text -> Text -> v -> IO Int64
_postValue Connection
conn (PostgresJSONStore -> Text
storeName PostgresJSONStore
st) (k -> Text
forall k. FSMKey k => k -> Text
toText (k -> Text) -> k -> Text
forall a b. (a -> b) -> a -> b
$ Instance k s e a -> k
forall k s e a. Instance k s e a -> k
key Instance k s e a
i) (Instance k s e a -> Machine s e a
forall k s e a. Instance k s e a -> Machine s e a
machine Instance k s e a
i)
                   Maybe String -> IO (Maybe String)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe String
forall a. Maybe a
Nothing))


-- |Postgresql-simple exceptions will be caught by `patch` in FSMApi.hs
-- In principle all transaction isolation levels offered by Postgres are safe
-- here, because we do explicit locking in _getValueForUpdate.
-- However things become more interesting when considering that you can do
-- arbitrary queries in effects, either using the functions in this
-- module or otherwise.

-- We use Serializable here, because it involves no extra cost in our case, and
-- it provides safety when used in arbitrary ways in effects.
-- Hence,
-- * Serializable is recommended and safe.
-- * Repeatable Read, or in PostgreSQL's case Snapshot Isolation, does *not* protect
--   against write skew, which means that if two Actions perform reads and based
--   on the result update data, one of the two updates may be lost.
-- * Read Committed means the usual caveats apply (Nonrepeatable reads, Phantom reads, Write skew…).
--
--   If you are not careful you may end up with wrong data or attempts to insert data
--   with a duplicate ID…
--   Hence, when in doubt, do not lower the isolation level.
_fsmUpdate :: forall k s e a .
              (FromJSON s, FromJSON e, FromJSON a,
               ToJSON   s, ToJSON   e, ToJSON   a,
               Typeable s, Typeable e, Typeable a,
               MealyInstance k s e a)              =>
               PostgresJSONStore                   ->
               k                                   ->
               MachineTransformer s e a            -> IO MealyStatus
_fsmUpdate :: PostgresJSONStore
-> k -> MachineTransformer s e a -> IO MealyStatus
_fsmUpdate PostgresJSONStore
st k
k MachineTransformer s e a
t =
    Pool Connection -> (Connection -> IO MealyStatus) -> IO MealyStatus
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource (PostgresJSONStore -> Pool Connection
storeConnPool PostgresJSONStore
st) (\Connection
conn ->
        Connection -> IO MealyStatus -> IO MealyStatus
forall a. Connection -> IO a -> IO a
withTransactionSerializable Connection
conn (IO MealyStatus -> IO MealyStatus)
-> IO MealyStatus -> IO MealyStatus
forall a b. (a -> b) -> a -> b
$ do
            [Instance k s e a]
el <- Connection -> Text -> Text -> IO [Instance k s e a]
forall v. FromRow v => Connection -> Text -> Text -> IO [v]
_getValueForUpdate Connection
conn (PostgresJSONStore -> Text
storeName PostgresJSONStore
st) (k -> Text
forall k. FSMKey k => k -> Text
toText k
k) :: IO [Instance k s e a]
            let entry :: Maybe (Instance k s e a)
entry = [Instance k s e a] -> Maybe (Instance k s e a)
forall a. [a] -> Maybe a
listToMaybe [Instance k s e a]
el

            IO MealyStatus
-> (Instance k s e a -> IO MealyStatus)
-> Maybe (Instance k s e a)
-> IO MealyStatus
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
                (MealyStatus -> IO MealyStatus
forall (m :: * -> *) a. Monad m => a -> m a
return MealyStatus
MealyError)
                (\Instance k s e a
e -> do
                    Machine s e a
newMachine <- MachineTransformer s e a
t (Instance k s e a -> Machine s e a
forall k s e a. Instance k s e a -> Machine s e a
machine Instance k s e a
e)
                    IO Int64 -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Connection -> Text -> Text -> Machine s e a -> IO Int64
forall v. ToField v => Connection -> Text -> Text -> v -> IO Int64
_postOrUpdateValue Connection
conn (PostgresJSONStore -> Text
storeName PostgresJSONStore
st) (k -> Text
forall k. FSMKey k => k -> Text
toText k
k) Machine s e a
newMachine)
                    MealyStatus -> IO MealyStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (MealyStatus -> IO MealyStatus) -> MealyStatus -> IO MealyStatus
forall a b. (a -> b) -> a -> b
$ if [Msg a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
Prelude.null (Machine s e a -> [Msg a]
forall s e a. Machine s e a -> [Msg a]
outbox Machine s e a
newMachine) then MealyStatus
Done else MealyStatus
Pending)
                Maybe (Instance k s e a)
entry)


-- #####
-- # WAL
-- #####
_createWalTable :: Connection -> Text -> IO Int64
_createWalTable :: Connection -> Text -> IO Int64
_createWalTable Connection
conn Text
name =
    Connection -> Query -> Only Identifier -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PGS.execute Connection
conn Query
"CREATE TABLE IF NOT EXISTS ? ( id TEXT PRIMARY KEY, date timestamptz NOT NULL, count int NOT NULL )" (Identifier -> Only Identifier
forall a. a -> Only a
Only (Text -> Identifier
Identifier Text
name))

-- |Updates a WALEntry if it exists, inserts a new WALEntry if is is missing.
walUpsertIncrement :: (FSMKey k) => PostgresJSONStore -> k -> IO ()
walUpsertIncrement :: PostgresJSONStore -> k -> IO ()
walUpsertIncrement PostgresJSONStore
st k
i =
    PostgresJSONStore -> k -> Query -> IO ()
forall k. FSMKey k => PostgresJSONStore -> k -> Query -> IO ()
_walExecute PostgresJSONStore
st k
i Query
_walIncrement

walDecrement :: (FSMKey k) => PostgresJSONStore -> k -> IO ()
walDecrement :: PostgresJSONStore -> k -> IO ()
walDecrement PostgresJSONStore
st k
i =
    PostgresJSONStore -> k -> Query -> IO ()
forall k. FSMKey k => PostgresJSONStore -> k -> Query -> IO ()
_walExecute PostgresJSONStore
st k
i Query
_walDecrement

_walExecute :: (FSMKey k) => PostgresJSONStore -> k -> Query -> IO ()
_walExecute :: PostgresJSONStore -> k -> Query -> IO ()
_walExecute PostgresJSONStore
st k
k Query
q = let tbl :: Text
tbl = Text -> Text -> Text
append (PostgresJSONStore -> Text
storeName PostgresJSONStore
st) Text
"Wal" in
    Pool Connection -> (Connection -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource (PostgresJSONStore -> Pool Connection
storeConnPool PostgresJSONStore
st) (\Connection
conn ->
        Connection -> IO () -> IO ()
forall a. Connection -> IO a -> IO a
withTransactionSerializable Connection
conn (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            UTCTime
now   <- IO UTCTime
getCurrentTime
            IO Int64 -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int64 -> IO ()) -> IO Int64 -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> Query -> (Identifier, Text, UTCTime, Identifier) -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PGS.execute Connection
conn Query
q (Text -> Identifier
Identifier Text
tbl, k -> Text
forall k. FSMKey k => k -> Text
toText k
k, UTCTime
now, Text -> Identifier
Identifier Text
tbl))

_walIncrement :: Query
_walIncrement :: Query
_walIncrement = Query
"INSERT INTO ? VALUES (?,?,1) ON CONFLICT (id) DO UPDATE SET count = ?.count + 1, date = EXCLUDED.date"

_walDecrement :: Query
_walDecrement :: Query
_walDecrement = Query
"INSERT INTO ? VALUES (?,?,0) ON CONFLICT (id) DO UPDATE SET count = ?.count - 1"


-- |Returns a list of all transactions that were not successfully terminated
-- and are older than `cutoff`.
walScan :: (FSMKey k) => PostgresJSONStore -> Int -> IO [WALEntry k]
walScan :: PostgresJSONStore -> Int -> IO [WALEntry k]
walScan PostgresJSONStore
st Int
cutoff = do
    UTCTime
t <- IO UTCTime
getCurrentTime
    let xx :: UTCTime
xx = NominalDiffTime -> UTCTime -> UTCTime
addUTCTime (NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a
negate (Integer -> NominalDiffTime
forall a. Num a => Integer -> a
fromInteger (Int -> Integer
forall a. Integral a => a -> Integer
toInteger Int
cutoff) :: NominalDiffTime)) UTCTime
t

    Pool Connection
-> (Connection -> IO [WALEntry k]) -> IO [WALEntry k]
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource (PostgresJSONStore -> Pool Connection
storeConnPool PostgresJSONStore
st) (\Connection
c ->
        Connection -> IO [WALEntry k] -> IO [WALEntry k]
forall a. Connection -> IO a -> IO a
withTransactionSerializable Connection
c (IO [WALEntry k] -> IO [WALEntry k])
-> IO [WALEntry k] -> IO [WALEntry k]
forall a b. (a -> b) -> a -> b
$
            Connection -> Query -> (Identifier, UTCTime) -> IO [WALEntry k]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
PGS.query Connection
c Query
"SELECT * FROM ? WHERE date < ? AND count > 0" (Text -> Identifier
Identifier (Text -> Identifier) -> Text -> Identifier
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
append (PostgresJSONStore -> Text
storeName PostgresJSONStore
st) Text
"Wal", UTCTime
xx))

-- |Creates a postgresql store
mkStore :: String -> Text -> IO PostgresJSONStore
mkStore :: String -> Text -> IO PostgresJSONStore
mkStore String
connStr Text
name =
    let
        connBS :: ByteString
connBS = String -> ByteString
DBSC8.pack String
connStr
    in do
        Pool Connection
pool <- IO Connection -> IO (Pool Connection)
givePool (ByteString -> IO Connection
PGS.connectPostgreSQL ByteString
connBS)
        Int64
_    <- Pool Connection -> (Connection -> IO Int64) -> IO Int64
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool Connection
pool ((Connection -> IO Int64) -> IO Int64)
-> (Connection -> IO Int64) -> IO Int64
forall a b. (a -> b) -> a -> b
$ (Connection -> Text -> IO Int64) -> Text -> Connection -> IO Int64
forall a b c. (a -> b -> c) -> b -> a -> c
flip Connection -> Text -> IO Int64
_createFsmTable Text
name
        Int64
_    <- Pool Connection -> (Connection -> IO Int64) -> IO Int64
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool Connection
pool ((Connection -> IO Int64) -> IO Int64)
-> (Connection -> IO Int64) -> IO Int64
forall a b. (a -> b) -> a -> b
$ (Connection -> Text -> IO Int64) -> Text -> Connection -> IO Int64
forall a b c. (a -> b -> c) -> b -> a -> c
flip Connection -> Text -> IO Int64
_createWalTable (Text -> Text -> Text
append Text
name Text
"Wal")
        PostgresJSONStore -> IO PostgresJSONStore
forall (m :: * -> *) a. Monad m => a -> m a
return (PostgresJSONStore -> IO PostgresJSONStore)
-> PostgresJSONStore -> IO PostgresJSONStore
forall a b. (a -> b) -> a -> b
$ Pool Connection -> Text -> PostgresJSONStore
PostgresJSONStore Pool Connection
pool Text
name

_createFsmTable :: Connection -> Text -> IO Int64
_createFsmTable :: Connection -> Text -> IO Int64
_createFsmTable Connection
conn Text
name =
    Connection -> Query -> Only Identifier -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PGS.execute Connection
conn Query
"CREATE TABLE IF NOT EXISTS ? ( id text PRIMARY KEY, data jsonb NOT NULL)" (Identifier -> Only Identifier
forall a. a -> Only a
Only (Text -> Identifier
Identifier Text
name))

-- SELECT .. FOR UPDATE locks the rows matching the query. Concurrent
-- (repeatable read and serializable) transactions will block and
-- abort once the new value has been inserted. Since we run effects
-- between SELECT and INSERT, this is what we want.
-- Concurrent SELECTS (without FOR UPDATE) will be unaffected.
_getValue :: (FromRow v) => Connection -> Text -> Text -> IO [v]
_getValue :: Connection -> Text -> Text -> IO [v]
_getValue Connection
c Text
tbl Text
k =
    Connection -> Query -> (Identifier, Text) -> IO [v]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
PGS.query Connection
c Query
"SELECT * FROM ? WHERE id = ?" (Text -> Identifier
Identifier Text
tbl, Text
k)

_getValueForUpdate :: (FromRow v) => Connection -> Text -> Text -> IO [v]
_getValueForUpdate :: Connection -> Text -> Text -> IO [v]
_getValueForUpdate Connection
c Text
tbl Text
k =
    Connection -> Query -> (Identifier, Text) -> IO [v]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
PGS.query Connection
c Query
"SELECT * FROM ? WHERE id = ? FOR UPDATE" (Text -> Identifier
Identifier Text
tbl, Text
k)

_postOrUpdateValue :: (ToField v) => Connection -> Text -> Text -> v -> IO Int64
_postOrUpdateValue :: Connection -> Text -> Text -> v -> IO Int64
_postOrUpdateValue Connection
c Text
tbl Text
k v
v =
    Connection -> Query -> (Identifier, Text, v) -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PGS.execute Connection
c Query
"INSERT INTO ? VALUES (?,?) ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data" (Text -> Identifier
Identifier Text
tbl, Text
k, v
v)

_postValue :: (ToField v) => Connection -> Text -> Text -> v -> IO Int64
_postValue :: Connection -> Text -> Text -> v -> IO Int64
_postValue Connection
c Text
tbl Text
k v
v =
    Connection -> Query -> (Identifier, Text, v) -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PGS.execute Connection
c Query
"INSERT INTO ? VALUES (?,?)" (Text -> Identifier
Identifier Text
tbl, Text
k, v
v)

_deleteValue :: (ToField k) => Connection -> Text -> k -> IO Int64
_deleteValue :: Connection -> Text -> k -> IO Int64
_deleteValue Connection
c Text
tbl k
k =
    Connection -> Query -> (Identifier, k) -> IO Int64
forall q. ToRow q => Connection -> Query -> q -> IO Int64
PGS.execute Connection
c Query
"DELETE FROM ? WHERE id = ?" (Text -> Identifier
Identifier Text
tbl, k
k)

_queryValue :: (FromRow v) => Connection -> Text -> Text -> IO [v]
_queryValue :: Connection -> Text -> Text -> IO [v]
_queryValue Connection
c Text
tbl Text
q =
    Connection -> Query -> (Identifier, Text) -> IO [v]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
PGS.query Connection
c Query
"SELECT * FROM ? WHERE data @> ?" (Text -> Identifier
Identifier Text
tbl, Text
q)

_getKeys :: forall k . (FSMKey k) => PostgresJSONStore -> Text -> IO [k]
_getKeys :: PostgresJSONStore -> Text -> IO [k]
_getKeys PostgresJSONStore
st Text
tbl =
    Pool Connection -> (Connection -> IO [k]) -> IO [k]
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource (PostgresJSONStore -> Pool Connection
storeConnPool PostgresJSONStore
st) (\Connection
conn -> do
       [Only Text]
keys <- Connection -> Query -> Only Identifier -> IO [Only Text]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
PGS.query Connection
conn Query
"SELECT id FROM ?" (Identifier -> Only Identifier
forall a. a -> Only a
Only (Text -> Identifier
Identifier Text
tbl)) :: IO [Only Text]

       [k] -> IO [k]
forall (m :: * -> *) a. Monad m => a -> m a
return  ((Only Text -> k) -> [Only Text] -> [k]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\(Only Text
t) -> Text -> k
forall k. FSMKey k => Text -> k
fromText Text
t) [Only Text]
keys :: [k]))

-- | You can call this function when you changed the representation of your
-- MealyMachine. It will read all instances through FromJSON and write them
-- back using ToJSON.
_batchConversion :: forall k s e a .
                   (FromJSON s, FromJSON e, FromJSON a,
                    ToJSON   s, ToJSON   e, ToJSON   a,
                    Typeable s, Typeable e, Typeable a, MealyInstance k s e a)
                 => PostgresJSONStore
                 -> Text
                 -> Proxy k s e a
                 -> IO ()
_batchConversion :: PostgresJSONStore -> Text -> Proxy k s e a -> IO ()
_batchConversion PostgresJSONStore
st Text
tbl Proxy k s e a
_p = do
    [k]
keys <- PostgresJSONStore -> Text -> IO [k]
forall k. FSMKey k => PostgresJSONStore -> Text -> IO [k]
_getKeys PostgresJSONStore
st Text
tbl :: IO [k]
    (k -> IO MealyStatus) -> [k] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\k
k -> PostgresJSONStore
-> k -> MachineTransformer s e a -> IO MealyStatus
forall k s e a.
(FromJSON s, FromJSON e, FromJSON a, ToJSON s, ToJSON e, ToJSON a,
 Typeable s, Typeable e, Typeable a, MealyInstance k s e a) =>
PostgresJSONStore
-> k -> MachineTransformer s e a -> IO MealyStatus
_fsmUpdate PostgresJSONStore
st k
k (MachineTransformer s e a
forall (m :: * -> *) a. Monad m => a -> m a
return :: MachineTransformer s e a)) [k]
keys


-- |Instance to convert one DB row to an instance of Instance ;)
-- users of this module must provide instances for ToJSON, FromJSON for `s`, `e` and `a`.
instance (ToJSON s,   ToJSON e,   ToJSON a)   => ToJSON (Machine s e a)
instance (FromJSON s, FromJSON e, FromJSON a) => FromJSON (Machine s e a)

instance (ToJSON e)   => ToJSON (Msg e)
instance (FromJSON e) => FromJSON (Msg e)

instance (Typeable s, Typeable e, Typeable a,
          FromJSON s, FromJSON e, FromJSON a, FSMKey k) => FromRow (Instance k s e a) where
    fromRow :: RowParser (Instance k s e a)
fromRow = k -> Machine s e a -> Instance k s e a
forall k s e a. k -> Machine s e a -> Instance k s e a
Instance (k -> Machine s e a -> Instance k s e a)
-> RowParser k -> RowParser (Machine s e a -> Instance k s e a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RowParser k
forall a. FromField a => RowParser a
field RowParser (Machine s e a -> Instance k s e a)
-> RowParser (Machine s e a) -> RowParser (Instance k s e a)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RowParser (Machine s e a)
forall a. FromField a => RowParser a
field

instance (Typeable s, Typeable e, Typeable a,
          FromJSON s, FromJSON e, FromJSON a) => FromField (Machine s e a) where
    fromField :: FieldParser (Machine s e a)
fromField = FieldParser (Machine s e a)
forall a. (FromJSON a, Typeable a) => FieldParser a
fromJSONField

instance (Typeable s, Typeable e, Typeable a,
          ToJSON s, ToJSON e, ToJSON a)       => ToField (Machine s e a) where
    toField :: Machine s e a -> Action
toField = Machine s e a -> Action
forall a. ToJSON a => a -> Action
toJSONField

instance {-# OVERLAPS #-} (FSMKey k) => ToField k where
    toField :: k -> Action
toField k
k = Text -> Action
forall a. ToField a => a -> Action
toField (k -> Text
forall k. FSMKey k => k -> Text
toText k
k)

instance {-# OVERLAPS #-} (FSMKey k) => FromField k where
    fromField :: FieldParser k
fromField Field
f Maybe ByteString
mdata = (Text -> k) -> Conversion Text -> Conversion k
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> k
forall k. FSMKey k => Text -> k
fromText (FieldParser Text
forall a. FromField a => FieldParser a
fromField Field
f Maybe ByteString
mdata :: Conversion Text)

instance (FSMKey k) => FromRow (WALEntry k) where
    fromRow :: RowParser (WALEntry k)
fromRow = k -> UTCTime -> Int -> WALEntry k
forall k. k -> UTCTime -> Int -> WALEntry k
WALEntry (k -> UTCTime -> Int -> WALEntry k)
-> RowParser k -> RowParser (UTCTime -> Int -> WALEntry k)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RowParser k
forall a. FromField a => RowParser a
field RowParser (UTCTime -> Int -> WALEntry k)
-> RowParser UTCTime -> RowParser (Int -> WALEntry k)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RowParser UTCTime
forall a. FromField a => RowParser a
field RowParser (Int -> WALEntry k)
-> RowParser Int -> RowParser (WALEntry k)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RowParser Int
forall a. FromField a => RowParser a
field

deriving instance (FSMKey k) => Generic  (WALEntry k)
deriving instance (FSMKey k) => Typeable (WALEntry k)