{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} module Network.Nakadi.Examples.Echo.Test (testEcho) where import ClassyPrelude import Conduit import Control.Concurrent.Async.Lifted (wait) import Control.Lens import qualified Data.Vector as Vector import qualified Network.Nakadi as Nakadi import Network.Nakadi.Examples.Echo.Echo import qualified Network.Nakadi.Lenses as L import Network.Nakadi.Tests.Common import Test.Tasty.HUnit -- Example program which consumes the events for the event type -- "test-event" and republishes them unchanged under the event type -- "test-event-copy". genEvent :: MonadIO m => m (Nakadi.DataChangeEvent Foo) genEvent = do now <- liftIO getCurrentTime eid <- Nakadi.EventId <$> genRandomUUID let event = Nakadi.DataChangeEvent { Nakadi._payload = Foo "Hello!" , Nakadi._metadata = Nakadi.EventMetadata { Nakadi._eid = eid , Nakadi._occurredAt = Nakadi.Timestamp now , Nakadi._parentEids = Nothing , Nakadi._partition = Nothing } , Nakadi._dataType = "test.FOO" , Nakadi._dataOp = Nakadi.DataOpUpdate } pure event genEvents :: MonadIO m => m (Vector (Nakadi.DataChangeEvent Foo)) genEvents = Vector.fromList <$> sequence (replicate 10 genEvent) publishEvents :: Nakadi.MonadNakadi IO m => Vector (Nakadi.DataChangeEvent Foo) -> Nakadi.EventTypeName -> m () publishEvents events eventName = do Nakadi.eventsPublish eventName (Vector.toList events) consumerMain :: ( Nakadi.MonadNakadi b m , MonadIO m , MonadBaseControl IO m , MonadMask m) => Nakadi.EventTypeName -> Int -> m (Vector Foo) consumerMain eventName maxSize = runResourceT $ do let consumeParameters = Nakadi.defaultConsumeParameters & L.batchFlushTimeout .~ Just 1 & L.streamLimit .~ Just (fromIntegral maxSize) Nakadi.eventsProcessConduit (Just consumeParameters) eventName Nothing $ concatMapC (view L.events) .| concatC .| mapC (id :: Nakadi.DataChangeEventEnriched Foo -> Nakadi.DataChangeEventEnriched Foo) .| mapC (view L.payload) .| sinkVector testEcho :: Nakadi.Config IO -> Assertion testEcho config = Nakadi.runNakadiT config $ do recreateEvent myEventTypeName myEventType recreateEvent myEventTypeNameCopy myEventTypeCopy events <- genEvents let eventsPayloads = map (view L.payload) events withAsync (runEcho myEventTypeName myEventTypeNameCopy) $ \ _echoHandle -> withAsync (consumerMain myEventTypeNameCopy (length events)) $ \ consumerHandle -> do threadDelay (10^6) -- Give it some time to connect publishEvents events myEventTypeName eventsConsumed <- wait consumerHandle liftIO $ eventsPayloads @=? eventsConsumed where myEventTypeNameCopy = Nakadi.EventTypeName "test.FOO-copy" myEventTypeCopy = myEventType & L.name .~ myEventTypeNameCopy