{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE UndecidableInstances #-} {-| Module : Mealstrom.PostgresJSONStore Description : Main backend for FSMs and WALs. Copyright : (c) Max Amanshauser, 2016 License : MIT Maintainer : max@lambdalifting.org This module is the main backend for FSMs. Instances are stored in a table with the name passed as storeName when creating the PostgresJSONStore. WALs use the same name with "Wal" appended. -} module Mealstrom.PostgresJSONStore( PostgresJSONStore, mkStore, _fsmRead, _fsmCreate, _fsmUpdate, _batchConversion ) where import Control.Exception (handle,SomeException) import Control.Monad (void) import Database.PostgreSQL.Simple as PGS import Database.PostgreSQL.Simple.FromRow import Database.PostgreSQL.Simple.ToField import Database.PostgreSQL.Simple.Transaction import Database.PostgreSQL.Simple.Types import Data.Aeson import qualified Data.ByteString.Char8 as DBSC8 import Data.Int (Int64) import Data.Maybe (listToMaybe) import Data.Pool import Data.Text import Data.Time import Data.Typeable hiding (Proxy) import GHC.Generics import Database.PostgreSQL.Simple.FromField (FromField (fromField), fromJSONField, Conversion) import Mealstrom.FSM import Mealstrom.FSMStore import Mealstrom.WALStore data PostgresJSONStore = PostgresJSONStore { storeConnPool :: Pool Connection, storeName :: Text } instance (FromJSON s, FromJSON e, FromJSON a, ToJSON s, ToJSON e, ToJSON a, Typeable s, Typeable e, Typeable a, MealyInstance k s e a) => FSMStore PostgresJSONStore k s e a where fsmRead st k p = Mealstrom.PostgresJSONStore._fsmRead st k p >>= \mi -> return $ fmap (currState . machine) mi fsmCreate = Mealstrom.PostgresJSONStore._fsmCreate fsmUpdate = Mealstrom.PostgresJSONStore._fsmUpdate instance (FSMKey k) => WALStore PostgresJSONStore k where walUpsertIncrement = Mealstrom.PostgresJSONStore.walUpsertIncrement walDecrement = Mealstrom.PostgresJSONStore.walDecrement walScan = Mealstrom.PostgresJSONStore.walScan -- |We create a database pool (no subpools) of 20 connections that will be closed -- after 10 seconds of inactivity. givePool :: IO Connection -> IO (Pool Connection) givePool creator = createPool creator close 1 10 20 -- ######### -- # FSM API -- ######### _fsmRead :: (FromJSON s, FromJSON e, FromJSON a, Typeable s, Typeable e, Typeable a, MealyInstance k s e a) => PostgresJSONStore -> k -> Proxy k s e a -> IO (Maybe (Instance k s e a)) _fsmRead st k _p = withResource (storeConnPool st) (\conn -> withTransactionSerializable conn $ do el <- _getValue conn (storeName st) (toText k) return $ listToMaybe el) _fsmCreate :: forall k s e a . (ToJSON s, ToJSON e, ToJSON a, Typeable s, Typeable e, Typeable a, MealyInstance k s e a) => PostgresJSONStore -> Instance k s e a -> IO (Maybe String) _fsmCreate st i = handle (\(e::SomeException) -> return $ Just (show e)) (withResource (storeConnPool st) (\conn -> withTransactionSerializable conn $ do void $ _postValue conn (storeName st) (toText $ key i) (machine i) return Nothing)) -- |Postgresql-simple exceptions will be caught by `patch` in FSMApi.hs -- In principle all transaction isolation levels offered by Postgres are safe -- here, because we do explicit locking in _getValueForUpdate. -- However things become more interesting when considering that you can do -- arbitrary queries in effects, either using the functions in this -- module or otherwise. -- We use Serializable here, because it involves no extra cost in our case, and -- it provides safety when used in arbitrary ways in effects. -- Hence, -- * Serializable is recommended and safe. -- * Repeatable Read, or in PostgreSQL's case Snapshot Isolation, does *not* protect -- against write skew, which means that if two Actions perform reads and based -- on the result update data, one of the two updates may be lost. -- * Read Committed means the usual caveats apply (Nonrepeatable reads, Phantom reads, Write skew…). -- -- If you are not careful you may end up with wrong data or attempts to insert data -- with a duplicate ID… -- Hence, when in doubt, do not lower the isolation level. _fsmUpdate :: forall k s e a . (FromJSON s, FromJSON e, FromJSON a, ToJSON s, ToJSON e, ToJSON a, Typeable s, Typeable e, Typeable a, MealyInstance k s e a) => PostgresJSONStore -> k -> MachineTransformer s e a -> IO MealyStatus _fsmUpdate st k t = withResource (storeConnPool st) (\conn -> withTransactionSerializable conn $ do el <- _getValueForUpdate conn (storeName st) (toText k) :: IO [Instance k s e a] let entry = listToMaybe el maybe (return MealyError) (\e -> do newMachine <- t (machine e) void (_postOrUpdateValue conn (storeName st) (toText k) newMachine) return $ if Prelude.null (outbox newMachine) then Done else Pending) entry) -- ##### -- # WAL -- ##### _createWalTable :: Connection -> Text -> IO Int64 _createWalTable conn name = PGS.execute conn "CREATE TABLE IF NOT EXISTS ? ( id TEXT PRIMARY KEY, date timestamptz NOT NULL, count int NOT NULL )" (Only (Identifier name)) -- |Updates a WALEntry if it exists, inserts a new WALEntry if is is missing. walUpsertIncrement :: (FSMKey k) => PostgresJSONStore -> k -> IO () walUpsertIncrement st i = _walExecute st i _walIncrement walDecrement :: (FSMKey k) => PostgresJSONStore -> k -> IO () walDecrement st i = _walExecute st i _walDecrement _walExecute :: (FSMKey k) => PostgresJSONStore -> k -> Query -> IO () _walExecute st k q = let tbl = append (storeName st) "Wal" in withResource (storeConnPool st) (\conn -> withTransactionSerializable conn $ do now <- getCurrentTime void $ PGS.execute conn q (Identifier tbl, toText k, now, Identifier tbl)) _walIncrement :: Query _walIncrement = "INSERT INTO ? VALUES (?,?,1) ON CONFLICT (id) DO UPDATE SET count = ?.count + 1, date = EXCLUDED.date" _walDecrement :: Query _walDecrement = "INSERT INTO ? VALUES (?,?,0) ON CONFLICT (id) DO UPDATE SET count = ?.count - 1" -- |Returns a list of all transactions that were not successfully terminated -- and are older than `cutoff`. walScan :: (FSMKey k) => PostgresJSONStore -> Int -> IO [WALEntry k] walScan st cutoff = do t <- getCurrentTime let xx = addUTCTime (negate (fromInteger (toInteger cutoff) :: NominalDiffTime)) t withResource (storeConnPool st) (\c -> withTransactionSerializable c $ PGS.query c "SELECT * FROM ? WHERE date < ? AND count > 0" (Identifier $ append (storeName st) "Wal", xx)) -- |Creates a postgresql store mkStore :: String -> Text -> IO PostgresJSONStore mkStore connStr name = let connBS = DBSC8.pack connStr in do pool <- givePool (PGS.connectPostgreSQL connBS) _ <- withResource pool $ flip _createFsmTable name _ <- withResource pool $ flip _createWalTable (append name "Wal") return $ PostgresJSONStore pool name _createFsmTable :: Connection -> Text -> IO Int64 _createFsmTable conn name = PGS.execute conn "CREATE TABLE IF NOT EXISTS ? ( id text PRIMARY KEY, data jsonb NOT NULL)" (Only (Identifier name)) -- SELECT .. FOR UPDATE locks the rows matching the query. Concurrent -- (repeatable read and serializable) transactions will block and -- abort once the new value has been inserted. Since we run effects -- between SELECT and INSERT, this is what we want. -- Concurrent SELECTS (without FOR UPDATE) will be unaffected. _getValue :: (FromRow v) => Connection -> Text -> Text -> IO [v] _getValue c tbl k = PGS.query c "SELECT * FROM ? WHERE id = ?" (Identifier tbl, k) _getValueForUpdate :: (FromRow v) => Connection -> Text -> Text -> IO [v] _getValueForUpdate c tbl k = PGS.query c "SELECT * FROM ? WHERE id = ? FOR UPDATE" (Identifier tbl, k) _postOrUpdateValue :: (ToField v) => Connection -> Text -> Text -> v -> IO Int64 _postOrUpdateValue c tbl k v = PGS.execute c "INSERT INTO ? VALUES (?,?) ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data" (Identifier tbl, k, v) _postValue :: (ToField v) => Connection -> Text -> Text -> v -> IO Int64 _postValue c tbl k v = PGS.execute c "INSERT INTO ? VALUES (?,?)" (Identifier tbl, k, v) _deleteValue :: (ToField k) => Connection -> Text -> k -> IO Int64 _deleteValue c tbl k = PGS.execute c "DELETE FROM ? WHERE id = ?" (Identifier tbl, k) _queryValue :: (FromRow v) => Connection -> Text -> Text -> IO [v] _queryValue c tbl q = PGS.query c "SELECT * FROM ? WHERE data @> ?" (Identifier tbl, q) _getKeys :: forall k . (FSMKey k) => PostgresJSONStore -> Text -> IO [k] _getKeys st tbl = withResource (storeConnPool st) (\conn -> do keys <- PGS.query conn "SELECT id FROM ?" (Only (Identifier tbl)) :: IO [Only Text] return (fmap (\(Only t) -> fromText t) keys :: [k])) -- | You can call this function when you changed the representation of your -- MealyMachine. It will read all instances through FromJSON and write them -- back using ToJSON. _batchConversion :: forall k s e a . (FromJSON s, FromJSON e, FromJSON a, ToJSON s, ToJSON e, ToJSON a, Typeable s, Typeable e, Typeable a, MealyInstance k s e a) => PostgresJSONStore -> Text -> Proxy k s e a -> IO () _batchConversion st tbl _p = do keys <- _getKeys st tbl :: IO [k] mapM_ (\k -> _fsmUpdate st k (return :: MachineTransformer s e a)) keys -- |Instance to convert one DB row to an instance of Instance ;) -- users of this module must provide instances for ToJSON, FromJSON for `s`, `e` and `a`. instance (ToJSON s, ToJSON e, ToJSON a) => ToJSON (Machine s e a) instance (FromJSON s, FromJSON e, FromJSON a) => FromJSON (Machine s e a) instance (ToJSON e) => ToJSON (Msg e) instance (FromJSON e) => FromJSON (Msg e) instance (Typeable s, Typeable e, Typeable a, FromJSON s, FromJSON e, FromJSON a, FSMKey k) => FromRow (Instance k s e a) where fromRow = Instance <$> field <*> field instance (Typeable s, Typeable e, Typeable a, FromJSON s, FromJSON e, FromJSON a) => FromField (Machine s e a) where fromField = fromJSONField instance (Typeable s, Typeable e, Typeable a, ToJSON s, ToJSON e, ToJSON a) => ToField (Machine s e a) where toField = toJSONField instance {-# OVERLAPS #-} (FSMKey k) => ToField k where toField k = toField (toText k) instance {-# OVERLAPS #-} (FSMKey k) => FromField k where fromField f mdata = fmap fromText (fromField f mdata :: Conversion Text) instance (FSMKey k) => FromRow (WALEntry k) where fromRow = WALEntry <$> field <*> field <*> field deriving instance (FSMKey k) => Generic (WALEntry k) deriving instance (FSMKey k) => Typeable (WALEntry k)