{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE OverloadedStrings #-} module SuperEvent.Store.HasqlSpec (spec) where import SuperEvent.Store.Hasql import SuperEvent.Store.Types import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad import Control.Monad.Trans import Data.Aeson import Data.Conduit import Test.Hspec import qualified Data.Text as T import qualified Data.UUID.V4 as UUID import qualified Data.Vector as V spec :: Spec spec = around withTempStore $ do it "should work for simple reads" simpleWriteRead it "should work for simple stream reads" simpleWriteReadStream it "should work for simple global reads" simpleWriteReadGlobal it "should work for simple subscriptions" simpleWriteSubStream simpleWriteRead :: (EventStoreReader IO es, EventStoreWriter IO es) => es -> IO () simpleWriteRead store = do let payload :: T.Text payload = "Hello" entry guid = EventData guid (EventType "foo") (toJSON payload) (toJSON ()) stream = StreamId "text" guid <- UUID.nextRandom writeRes <- writeToStream store stream EvAny (V.singleton $ entry guid) writeRes `shouldBe` WrSuccess evt <- readEvent store stream (nextEventNumber firstEventNumber) case evt of ErrFailed -> expectationFailure "Read failed" ErrValue re -> re_guid re `shouldBe` guid simpleWriteReadStream :: (EventStoreReader IO es, EventStoreWriter IO es) => es -> IO () simpleWriteReadStream store = do let payload :: T.Text payload = "Hello" entry guid = EventData guid (EventType "foo") (toJSON payload) (toJSON ()) stream = StreamId "text" events <- V.forM (V.fromList [0..99]) $ \(_ :: Int) -> do guid <- UUID.nextRandom pure (entry guid) writeRes <- writeToStream store stream EvAny events writeRes `shouldBe` WrSuccess let writtenGuids = V.map ed_guid events readHelper evtNum lim = V.map re_guid <$> readStreamEvents store stream evtNum lim RdForward evts <- readHelper firstEventNumber 10 evts `shouldBe` V.take 10 writtenGuids evts2 <- readHelper (incrementTimes 11 firstEventNumber) 100 evts2 `shouldBe` V.take 90 (V.drop 10 writtenGuids) simpleWriteReadGlobal :: (EventStoreReader IO es, EventStoreWriter IO es) => es -> IO () simpleWriteReadGlobal store = do let payload :: T.Text payload = "Hello" entry guid = EventData guid (EventType "foo") (toJSON payload) (toJSON ()) stream = StreamId "text" events <- V.forM (V.fromList [0..99]) $ \(_ :: Int) -> do guid <- UUID.nextRandom pure (entry guid) writeRes <- writeToStream store stream EvAny events writeRes `shouldBe` WrSuccess let writtenGuids = V.map ed_guid events readHelper pos lim = V.map (re_guid . snd) <$> readAllEvents store pos lim RdForward evts <- readHelper (GlobalPosition 0) 10 evts `shouldBe` V.take 10 writtenGuids evts2 <- readHelper (GlobalPosition 11) 100 evts2 `shouldBe` V.take 90 (V.drop 10 writtenGuids) simpleWriteSubStream :: (EventStoreSubscriber IO es, EventStoreReader IO es, EventStoreWriter IO es) => es -> IO () simpleWriteSubStream store = do let payload :: T.Text payload = "Hello" entry guid = EventData guid (EventType "foo") (toJSON payload) (toJSON ()) stream = StreamId "text" outVar <- newTVarIO [] poller <- async $ do let consumer = do val <- await case val of Nothing -> do liftIO $ putStrLn "Consumer was terminated" pure () Just v -> do liftIO $ atomically $ modifyTVar' outVar ((:) v) consumer subscribeTo store (SubscriptionConfig SspBeginning stream) $$ consumer events <- V.forM (V.fromList [0..9999]) $ \(_ :: Int) -> do guid <- UUID.nextRandom pure (entry guid) writeRes <- writeToStream store stream EvAny events writeRes `shouldBe` WrSuccess let writtenGuids = V.map ed_guid events finalResult <- atomically $ do vals <- readTVar outVar when (length vals < 9999) retry pure (V.map re_guid $ V.reverse $ V.fromList vals) uninterruptibleCancel poller finalResult `shouldBe` writtenGuids