{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE StandaloneDeriving #-}

-- | Shared fixtures and helpers for the Nakadi client test suite: a test
-- application monad, a sample event type (@test.FOO@) with its payload types,
-- event generators, and helpers for (re)creating the event type and
-- publishing events.
module Network.Nakadi.Tests.Common where

import ClassyPrelude
import Control.Exception.Safe (MonadThrow, throwM)
import Control.Lens
import Control.Monad.Logger
import Data.Aeson
import Data.List.Split (chunksOf)
import qualified Data.Text as Text
import Data.UUID (UUID)
import Network.Nakadi
import qualified Network.Nakadi.Lenses as L
import System.Random
import UnliftIO.Concurrent

-- | Exception value with no payload.
--
-- NOTE(review): presumably thrown by tests to abort an ongoing consumption
-- loop; no use site is visible in this module.
data TerminateConsumption = TerminateConsumption
  deriving (Eq, Show, Typeable)

instance Exception TerminateConsumption

-- | Monad stack used by the tests: logging on top of a reader with a unit
-- environment on top of 'IO'.
type App = LoggingT (ReaderT () IO)

-- | Run an 'App' action in 'IO', logging to stdout and supplying @()@ as the
-- reader environment.
runApp :: App a -> IO a
runApp = flip runReaderT () . runStdoutLoggingT

-- | Event payload whose @fortune@ field is text, matching the string type
-- declared in 'myEventTypeSchema'.
data Foo = Foo
  { fortune :: Text
  }
  deriving (Show, Eq, Generic)

deriving instance FromJSON Foo

deriving instance ToJSON Foo

-- | Payload with the same field name as 'Foo' but an 'Int' value.
--
-- NOTE(review): presumably used to provoke schema/deserialization
-- mismatches; confirm against the tests that use it.
data WrongFoo = WrongFoo
  { fortune :: Int
  }
  deriving (Show, Eq, Generic)

deriving instance FromJSON WrongFoo

deriving instance ToJSON WrongFoo

-- | Name of the event type used throughout these helpers.
myEventTypeName :: EventTypeName
myEventTypeName = "test.FOO"

-- | JSON schema for the test event type: an object with a required string
-- property @fortune@.
myEventTypeSchema :: EventTypeSchema
myEventTypeSchema = EventTypeSchema
  { _version    = Just "0.1"
  , _createdAt  = Nothing
  , _schemaType = SchemaTypeJson
  , _schema     =
      "{ \"properties\": {\"fortune\": {\"type\": \"string\"} }, \"required\": [\"fortune\"] }"
  }

-- | Full definition of the test event type: a data-category event type named
-- 'myEventTypeName', hash-partitioned on the @fortune@ field, using
-- 'myEventTypeSchema'.
myEventType :: EventType
myEventType = EventType
  { _name                 = myEventTypeName
  , _owningApplication    = Just "test-suite"
  , _category             = Just EventTypeCategoryData
  , _enrichmentStrategies = Just [EnrichmentStrategyMetadata]
  , _partitionStrategy    = Just "hash"
  , _compatibilityMode    = Just CompatibilityModeForward
  , _partitionKeyFields   = Just ["fortune"]
  , _cleanupPolicy        = Just CleanupPolicyDelete
  , _schema               = myEventTypeSchema
  , _defaultStatistic     = Just EventTypeStatistics
    { _messagesPerMinute = 1000
    , _messageSize       = 200
    , _readParallelism   = 8
    , _writeParallelism  = 8
    }
  , _options              = Nothing
  }

-- | Exception handler that swallows 'EventTypeNotFound' by returning the
-- supplied fallback value and rethrows every other 'NakadiException'.
ignoreExnNotFound :: MonadThrow m => a -> NakadiException -> m a
ignoreExnNotFound a (EventTypeNotFound _) = pure a
ignoreExnNotFound _ exn                   = throwM exn

-- | Build a 'Cursor' from a 'Partition', pairing the partition's identifier
-- with its newest available offset.
extractCursor :: Partition -> Cursor
extractCursor Partition {..} =
  Cursor {_partition = _partition, _offset = _newestAvailableOffset}

-- | Single place where the @test.FOO@ data change event is assembled, so the
-- fixed event and the generated events cannot drift apart. The event is an
-- update ('DataOpUpdate') whose payload carries the given fortune text; all
-- optional metadata fields are left unset.
fooChangeEvent :: Text -> EventId -> UTCTime -> DataChangeEvent Foo
fooChangeEvent fortuneText eid now = DataChangeEvent
  { _payload  = Foo fortuneText
  , _metadata = EventMetadata
    { _eid                    = eid
    , _occurredAt             = Timestamp now
    , _parentEids             = Nothing
    , _partition              = Nothing
    , _partitionCompactionKey = Nothing
    }
  , _dataType = "test.FOO"
  , _dataOp   = DataOpUpdate
  }

-- | Draw a random event id, read the current time (in that order) and build
-- the event via 'fooChangeEvent' with the given fortune text.
genFooChangeEvent :: MonadIO m => Text -> m (DataChangeEvent Foo)
genFooChangeEvent fortuneText = do
  eid <- genRandomUUID
  now <- liftIO getCurrentTime
  pure (fooChangeEvent fortuneText (EventId eid) now)

-- | Data change event with payload @Foo "Hello!"@ for the given event id and
-- occurrence time.
myDataChangeEvent :: EventId -> UTCTime -> DataChangeEvent Foo
myDataChangeEvent = fooChangeEvent "Hello!"

-- | Like 'myDataChangeEvent', but with a freshly generated random event id
-- and the current time.
genMyDataChangeEvent :: MonadIO m => m (DataChangeEvent Foo)
genMyDataChangeEvent = genFooChangeEvent "Hello!"

-- | Like 'genMyDataChangeEvent', but the payload text is @"Hello "@ followed
-- by the shown index, which makes generated events distinguishable.
genMyDataChangeEventIdx :: MonadIO m => Int -> m (DataChangeEvent Foo)
genMyDataChangeEventIdx idx =
  genFooChangeEvent ("Hello " ++ Text.pack (show idx))

-- | Generate a random 'UUID' using the global random generator.
genRandomUUID :: MonadIO m => m UUID
genRandomUUID = liftIO randomIO

-- | Bring the given event type into a freshly created state.
--
-- Steps, in order: list the subscriptions that reference the event type's
-- name and delete each of them; delete the event type itself (an
-- 'EventTypeNotFound' error is ignored, any other 'NakadiException'
-- propagates); create the event type.
recreateEvent :: (MonadUnliftIO m, MonadNakadi b m) => EventType -> m ()
recreateEvent eventType = do
  let eventTypeName = eventType ^. L.name
  subscriptionIds <-
    subscriptionsList Nothing (Just [eventTypeName]) <&> map (view L.id)
  mapM_ subscriptionDelete subscriptionIds
  eventTypeDelete eventTypeName `catch` ignoreExnNotFound ()
  eventTypeCreate eventType

-- | Wait one second, then publish the given events to 'myEventTypeName'.
--
-- The events are published in batches of 100 under the supplied flow id; when
-- no flow id is given, @FlowId "shalom"@ is used.
delayedPublish
  :: (MonadNakadi b m, MonadIO m, ToJSON a) => Maybe FlowId -> [a] -> m ()
delayedPublish maybeFlowId events = do
  liftIO $ threadDelay publishDelayMicros
  let flowId = fromMaybe (FlowId "shalom") maybeFlowId
  config <- nakadiAsk <&> setFlowId flowId
  -- Publish events in batches.
  runNakadiT config $ forM_ (chunksOf 100 events) (eventsPublish myEventTypeName)
 where
  -- 'threadDelay' takes microseconds; this is one second. Spelled out as a
  -- typed constant instead of @10 ^ 6@ so no numeric type is left to
  -- defaulting.
  publishDelayMicros :: Int
  publishDelayMicros = 1000000