module Aws.Kinesis.Client.Producer
(
KinesisProducer
, withKinesisProducer
, managedKinesisProducer
, writeProducer
, Message
, module Aws.Kinesis.Client.Producer.Kit
, WriteProducerException(..)
, ProducerCleanupTimedOut(..)
, ProducerWorkerDied(..)
) where
import qualified Aws.Kinesis as Kin
import Aws.Kinesis.Client.Common
import Aws.Kinesis.Client.Producer.Kit
import Aws.Kinesis.Client.Producer.Internal
import Aws.Kinesis.Client.Internal.Queue
import Aws.Kinesis.Client.Internal.Queue.Chunk
import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Exception.Enclosed
import Control.Exception.Lifted
import Control.Lens
import Control.Monad
import Control.Monad.Codensity
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Trans.Except
import Data.Conduit
import Data.Maybe
import Data.Monoid.Unicode
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Traversable
import Data.Typeable
import Numeric.Natural
import Prelude.Unicode
import qualified System.Random as R
import System.IO
data KinesisProducer
= ∀ q. BoundedCloseableQueue q MessageQueueItem
⇒ KinesisProducer
{ _kpMessageQueue ∷ !q
, _kpRetryPolicy ∷ !RetryPolicy
}
data WriteProducerException
= ProducerQueueClosed
| ProducerQueueFull
| MessageTooLarge
deriving (Typeable, Show, Eq)
instance Exception WriteProducerException
data ProducerCleanupTimedOut
= ProducerCleanupTimedOut
deriving (Typeable, Show, Eq)
instance Exception ProducerCleanupTimedOut
data ProducerWorkerDied
= ProducerWorkerDied (Maybe SomeException)
deriving (Typeable, Show)
instance Exception ProducerWorkerDied
generatePartitionKey
∷ R.RandomGen g
⇒ g
→ Kin.PartitionKey
generatePartitionKey gen =
let name = take 25 $ R.randomRs ('a','z') gen in
Kin.partitionKey (T.pack name)
& either (error ∘ T.unpack) id
splitEvery
∷ Natural
→ [α]
→ [[α]]
splitEvery _ [] = []
splitEvery n list = first : splitEvery n rest
where
(first,rest) = splitAt (fromIntegral n) list
putRecordsSink
∷ ProducerKit
→ Sink [MessageQueueItem] IO ()
putRecordsSink ProducerKit{..} = do
let batchSize = _pkBatchPolicy ^. bpBatchSize
awaitForever $ \messages → do
let batches = splitEvery batchSize messages
leftovers ← lift ∘ flip (mapConcurrentlyN _pkMaxConcurrency 100) batches $ \items → do
case filter messageQueueItemIsEligible items of
[] → return []
eligibleItems → do
handleAny (\(SomeException e) → eligibleItems <$ debugPrint stderr (show e)) $ do
requestEntries ← for eligibleItems $ \m → do
let partitionKey = m ^. mqiPartitionKey
return Kin.PutRecordsRequestEntry
{ Kin.putRecordsRequestEntryData = m ^. mqiMessage ∘ to T.encodeUtf8
, Kin.putRecordsRequestEntryExplicitHashKey = Nothing
, Kin.putRecordsRequestEntryPartitionKey = partitionKey
}
#ifdef DEBUG
debugPrint stdout $ "will put " ⊕ show (length requestEntries) ⊕ " records"
#else
return ()
#endif
Kin.PutRecordsResponse{..} ← runKinesis _pkKinesisKit Kin.PutRecords
{ Kin.putRecordsRecords = requestEntries
, Kin.putRecordsStreamName = _pkStreamName
}
let
processResult m m'
| isJust (Kin.putRecordsResponseRecordErrorCode m') = Just m
| otherwise = Nothing
return ∘ catMaybes $ zipWith processResult eligibleItems putRecordsResponseRecords
forM_ leftovers $ \items →
unless (null items) $
leftover $ items
<&> mqiRemainingAttempts -~ 1
& filter messageQueueItemIsEligible
writeProducer
∷ MonadIO m
⇒ KinesisProducer
→ Message
→ m (Either WriteProducerException ())
writeProducer KinesisProducer{..} !msg =
runExceptT $ do
when (T.length msg > MaxMessageSize) $
throwE MessageTooLarge
gen ← liftIO R.newStdGen
result ← liftIO $
tryWriteQueue _kpMessageQueue MessageQueueItem
{ _mqiMessage = msg
, _mqiPartitionKey = generatePartitionKey gen
, _mqiRemainingAttempts = _kpRetryPolicy ^. rpRetryCount ∘ to succ
}
case result of
Just written → unless written $ throwE ProducerQueueFull
Nothing → throwE $ ProducerQueueClosed
managedKinesisProducer
∷ ( MonadIO m
, MonadBaseControl IO m
)
⇒ ProducerKit
→ Codensity m KinesisProducer
managedKinesisProducer kit@ProducerKit{_pkQueueImplementation = QueueImplementation (_ ∷ proxy q)} = do
messageQueue ← liftIO ∘ newQueue ∘ fromIntegral $ kit ^. pkMessageQueueBounds
let
producer = KinesisProducer
{ _kpMessageQueue = (messageQueue ∷ q)
, _kpRetryPolicy = kit ^. pkRetryPolicy
}
chunkingPolicy = ChunkingPolicy
{ _cpMaxChunkSize = (kit ^. pkBatchPolicy ∘ bpBatchSize) * (kit ^. pkMaxConcurrency)
, _cpMinChunkingInterval = 5000000
}
processQueue =
chunkedSourceFromQueue chunkingPolicy messageQueue
$$ putRecordsSink kit
workerLoop ∷ IO () = do
result ← tryAny processQueue
case result of
Left exn → do
debugPrint stderr $ "Respawning worker loop after exception: " ⊕ show exn
workerLoop
Right () → return ()
cleanupWorker _ = do
closeQueue messageQueue
#ifdef DEBUG
debugPrint stdout "Closing queues, will clean up"
#else
return ()
#endif
withAsync processQueue $ \cleanupHandle → do
case _pkCleanupTimeout kit of
Just timeout →
withAsync (threadDelay $ fromIntegral timeout) $ \timeoutHandle → do
result ← waitEitherCatchCancel timeoutHandle cleanupHandle
case result of
Left _timeoutResult →
throwIO ProducerCleanupTimedOut
Right workerResult →
throwIO ∘ ProducerWorkerDied $ workerResult ^? _Left
Nothing →
wait cleanupHandle
workerHandle ← Codensity $ bracket (async (liftIO workerLoop)) (liftIO ∘ cleanupWorker)
Codensity $ \inner → do
withAsync (inner producer) $ \innerHandle → do
result ← waitEitherCatchCancel innerHandle workerHandle
case result of
Left innerResult →
either throwIO return innerResult
Right (workerResult ∷ Either SomeException ()) →
throwIO ∘ ProducerWorkerDied $ workerResult ^? _Left
withKinesisProducer
∷ ( MonadIO m
, MonadBaseControl IO m
)
⇒ ProducerKit
→ (KinesisProducer → m α)
→ m α
withKinesisProducer =
runCodensity ∘ managedKinesisProducer
mapConcurrentlyN
∷ Traversable t
⇒ Natural
→ Natural
→ (a → IO b)
→ t a
→ IO (t b)
mapConcurrentlyN n delay f t = do
sem ← liftIO ∘ newQSem $ fromIntegral n
mapConcurrently (run sem) t_
where
(_, t_) = mapAccumL (\i v → (succ i, (i,v))) 0 t
run sem (i,a) =
liftBaseOp_ (bracket_ (waitQSem sem) (signalQSem sem)) $ do
liftIO ∘ threadDelay ∘ fromIntegral $ 1000 * delay * i
f a
debugPrint
∷ MonadIO m
⇒ Handle
→ String
→ m ()
debugPrint h =
liftIO
∘ hPutStrLn h
∘ ("[Kinesis Producer] " ⊕)