-- Copyright (c) 2013-2015 PivotCloud, Inc.
--
-- Aws.Kinesis.Client.Producer
--
-- Please feel free to contact us at licensing@pivotmail.com with any
-- contributions, additions, or other feedback; we would love to hear from
-- you.
--
-- Licensed under the Apache License, Version 2.0 (the "License"); you may
-- not use this file except in compliance with the License. You may obtain a
-- copy of the License at http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-- License for the specific language governing permissions and limitations
-- under the License.

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UnicodeSyntax #-}

-- |
-- Module: Aws.Kinesis.Client.Producer
-- Copyright: Copyright © 2013-2015 PivotCloud, Inc.
-- License: Apache-2.0
-- Maintainer: Jon Sterling
-- Stability: experimental
--
module Aws.Kinesis.Client.Producer
( -- * The Producer
  KinesisProducer
, withKinesisProducer
, managedKinesisProducer

  -- * Commands
, writeProducer
, Message

, module Aws.Kinesis.Client.Producer.Kit

  -- * Exceptions
, WriteProducerException(..)
, ProducerCleanupTimedOut(..)
, ProducerWorkerDied(..)
) where

import qualified Aws.Kinesis as Kin
import Aws.Kinesis.Client.Common
import Aws.Kinesis.Client.Producer.Kit
import Aws.Kinesis.Client.Producer.Internal
import Aws.Kinesis.Client.Internal.Queue
import Aws.Kinesis.Client.Internal.Queue.Chunk

import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Exception.Enclosed
import Control.Exception.Lifted
import Control.Lens
import Control.Monad
import Control.Monad.Codensity
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Trans.Except
import Data.Conduit
import Data.Maybe
import Data.Monoid.Unicode
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Traversable
import Data.Typeable
import Numeric.Natural
import Prelude.Unicode
import qualified System.Random as R
import System.IO

-- | The (abstract) Kinesis producer client.
--
-- The concrete queue type @q@ is hidden existentially; only its
-- 'BoundedCloseableQueue' interface is available to users of the value.
--
data KinesisProducer
  = ∀ q. BoundedCloseableQueue q MessageQueueItem
  ⇒ KinesisProducer
  { _kpMessageQueue ∷ !q
    -- ^ The queue that 'writeProducer' enqueues messages into.
  , _kpRetryPolicy ∷ !RetryPolicy
    -- ^ The retry policy from which 'writeProducer' derives the initial
    -- number of remaining attempts of each enqueued message.
  }

-- | The ways in which 'writeProducer' can fail to enqueue a message.
--
data WriteProducerException
  = ProducerQueueClosed
    -- ^ Thrown when a message could not be enqueued since the queue was closed.
  | ProducerQueueFull
    -- ^ Thrown when a message could not be enqueued since the queue was full.
  | MessageTooLarge
    -- ^ Thrown when the message was larger than the maximum message size
    -- ('MaxMessageSize')
  deriving (Typeable, Show, Eq)

instance Exception WriteProducerException

-- | Thrown when the producer's cleanup routine takes longer than the
-- configured timeout.
data ProducerCleanupTimedOut
  = ProducerCleanupTimedOut
  deriving (Typeable, Show, Eq)

instance Exception ProducerCleanupTimedOut

-- | Thrown when the producer's worker dies unexpectedly (this is fatal, and
-- should never happen). The payload is the exception that ended the worker,
-- if there was one.
data ProducerWorkerDied
  = ProducerWorkerDied (Maybe SomeException)
  deriving (Typeable, Show)

instance Exception ProducerWorkerDied

-- | Generates a valid 'Kin.PartitionKey'.
--
-- The key is built from 25 characters drawn from @\'a\'..\'z\'@ with the
-- supplied generator. If 'Kin.partitionKey' rejects the generated text, this
-- calls 'error' with the rejection message.
--
generatePartitionKey
  ∷ R.RandomGen g
  ⇒ g
  → Kin.PartitionKey
generatePartitionKey gen =
  let name = take 25 $ R.randomRs ('a','z') gen
  in Kin.partitionKey (T.pack name)
      & either (error ∘ T.unpack) id

-- | Splits a list into consecutive chunks of at most @n@ elements; only the
-- last chunk may be shorter. An empty list yields no chunks.
--
-- A chunk size of @0@ is treated as @1@: 'splitAt' with a count of zero
-- consumes nothing, so recursing on it would produce an endless stream of
-- empty chunks instead of terminating.
--
splitEvery
  ∷ Natural
  → [α]
  → [[α]]
splitEvery n = go
  where
    chunkSize ∷ Int
    chunkSize = fromIntegral (max 1 n)

    go [] = []
    go list = chunk : go rest
      where
        (chunk, rest) = splitAt chunkSize list

-- | A conduit for sending records to Kinesis using the @PutRecords@ endpoint.
-- This is a conduit in order to restore failed messages as leftovers.
--
-- Each incoming list of messages is split into batches of the configured
-- batch size; the batches are submitted through 'mapConcurrentlyN' with at
-- most '_pkMaxConcurrency' requests in flight. Messages that have to be
-- retried are pushed back as leftovers with one fewer remaining attempt.
--
putRecordsSink
  ∷ ProducerKit
  → Sink [MessageQueueItem] IO ()
putRecordsSink ProducerKit{..} = do
  let batchSize = _pkBatchPolicy ^. bpBatchSize

  awaitForever $ \messages → do
    let batches = splitEvery batchSize messages
    -- The literal 100 is the startup-delay argument of 'mapConcurrentlyN'.
    leftovers ← lift ∘ flip (mapConcurrentlyN _pkMaxConcurrency 100) batches $ \items → do
      case filter messageQueueItemIsEligible items of
        [] → return []
        eligibleItems → do
          -- If the request as a whole fails, log the exception and hand back
          -- the entire batch so that every item in it is retried.
          handleAny (\(SomeException e) → eligibleItems <$ debugPrint stderr (show e)) $ do
            requestEntries ← for eligibleItems $ \m → do
              let partitionKey = m ^. mqiPartitionKey
              return Kin.PutRecordsRequestEntry
                { Kin.putRecordsRequestEntryData = m ^. mqiMessage ∘ to T.encodeUtf8
                , Kin.putRecordsRequestEntryExplicitHashKey = Nothing
                , Kin.putRecordsRequestEntryPartitionKey = partitionKey
                }

#ifdef DEBUG
            debugPrint stdout $ "will put " ⊕ show (length requestEntries) ⊕ " records"
#else
            return ()
#endif

            Kin.PutRecordsResponse{..} ← runKinesis _pkKinesisKit Kin.PutRecords
              { Kin.putRecordsRecords = requestEntries
              , Kin.putRecordsStreamName = _pkStreamName
              }

            -- Response records are paired positionally with the request
            -- items; an item is kept for retry when its response record
            -- carries an error code.
            let
              processResult m m'
                | isJust (Kin.putRecordsResponseRecordErrorCode m') = Just m
                | otherwise = Nothing

            return ∘ catMaybes $ zipWith processResult eligibleItems putRecordsResponseRecords

    -- Push failed items back upstream, charging each one attempt and dropping
    -- those that are no longer eligible.
    forM_ leftovers $ \items →
      unless (null items) $
        leftover $ items
          <&> mqiRemainingAttempts -~ 1
          & filter messageQueueItemIsEligible

-- | Enqueues a message to Kinesis on the next shard.
-- If a message cannot be enqueued, an error of type 'WriteProducerException'
-- will be returned.
--
-- The message is paired with a freshly generated random partition key and is
-- given one more attempt than the retry count of the producer's retry
-- policy.
--
writeProducer
  ∷ MonadIO m
  ⇒ KinesisProducer
  → Message
  → m (Either WriteProducerException ())
writeProducer KinesisProducer{..} !msg =
  runExceptT $ do
    -- NOTE(review): 'T.length' counts characters, while the payload is sent
    -- UTF-8 encoded; if 'MaxMessageSize' is meant as a byte limit, non-ASCII
    -- messages can pass this check and still exceed it — confirm.
    when (T.length msg > MaxMessageSize) $
      throwE MessageTooLarge

    gen ← liftIO R.newStdGen

    result ← liftIO $ tryWriteQueue _kpMessageQueue MessageQueueItem
      { _mqiMessage = msg
      , _mqiPartitionKey = generatePartitionKey gen
      , _mqiRemainingAttempts = _kpRetryPolicy ^. rpRetryCount ∘ to succ
      }

    case result of
      Just written →
        unless written $
          throwE ProducerQueueFull
      Nothing →
        throwE $ ProducerQueueClosed

-- | This constructs a 'KinesisProducer' and closes it when you have done with
-- it. This is equivalent to 'withKinesisProducer', but replaces the
-- continuation with a return in 'Codensity'.
--
-- A background worker drains the message queue into 'putRecordsSink' while
-- the continuation runs. On exit the queue is closed and drained once more,
-- bounded by the kit's cleanup timeout when one is configured.
--
-- Throws 'ProducerWorkerDied' if the worker finishes before the continuation
-- does, or if the cleanup drain fails under a timeout, and
-- 'ProducerCleanupTimedOut' if the cleanup drain exceeds the timeout.
--
managedKinesisProducer
  ∷ ( MonadIO m
    , MonadBaseControl IO m
    )
  ⇒ ProducerKit
  → Codensity m KinesisProducer
managedKinesisProducer kit@ProducerKit{_pkQueueImplementation = QueueImplementation (_ ∷ proxy q)} = do
  messageQueue ← liftIO ∘ newQueue ∘ fromIntegral $ kit ^. pkMessageQueueBounds

  let
    producer = KinesisProducer
      { _kpMessageQueue = (messageQueue ∷ q)
      , _kpRetryPolicy = kit ^. pkRetryPolicy
      }

    -- One chunk holds enough messages to give every concurrent request a
    -- full batch.
    chunkingPolicy = ChunkingPolicy
      { _cpMaxChunkSize = (kit ^. pkBatchPolicy ∘ bpBatchSize) * (kit ^. pkMaxConcurrency)
      , _cpMinChunkingInterval = 5000000
      }

    processQueue =
      chunkedSourceFromQueue chunkingPolicy messageQueue
        $$ putRecordsSink kit

    -- TODO: figure out better error handling here (such as a limit to respawns)
    workerLoop ∷ IO () = do
      result ← tryAny processQueue
      case result of
        Left exn → do
          debugPrint stderr $ "Respawning worker loop after exception: " ⊕ show exn
          workerLoop
        Right () → return ()

    cleanupWorker _ = do
      closeQueue messageQueue
#ifdef DEBUG
      debugPrint stdout "Closing queues, will clean up"
#else
      return ()
#endif
      -- Drain whatever is still queued now that the queue is closed.
      withAsync processQueue $ \cleanupHandle → do
        case _pkCleanupTimeout kit of
          Just timeout →
            withAsync (threadDelay $ fromIntegral timeout) $ \timeoutHandle → do
              result ← waitEitherCatchCancel timeoutHandle cleanupHandle
              case result of
                Left _timeoutResult →
                  throwIO ProducerCleanupTimedOut
                Right (Left exn) →
                  throwIO ∘ ProducerWorkerDied $ Just exn
                -- A drain that completes normally is the expected outcome of
                -- cleanup and must not be reported as a dead worker.
                Right (Right ()) →
                  return ()
          Nothing →
            wait cleanupHandle

  workerHandle ← Codensity $ bracket (async (liftIO workerLoop)) (liftIO ∘ cleanupWorker)

  Codensity $ \inner → do
    withAsync (inner producer) $ \innerHandle → do
      result ← waitEitherCatchCancel innerHandle workerHandle
      case result of
        Left innerResult →
          either throwIO return innerResult
        -- The worker is only supposed to stop during cleanup, so finishing
        -- first (with or without an exception) is treated as fatal.
        Right (workerResult ∷ Either SomeException ()) →
          throwIO ∘ ProducerWorkerDied $ workerResult ^? _Left

-- | This constructs a 'KinesisProducer' and closes it when you have done with
-- it.
--
-- This is 'managedKinesisProducer' run with the supplied continuation.
--
withKinesisProducer
  ∷ ( MonadIO m
    , MonadBaseControl IO m
    )
  ⇒ ProducerKit
  → (KinesisProducer → m α)
  → m α
withKinesisProducer =
  runCodensity ∘ managedKinesisProducer

-- | map at most n actions concurrently
--
-- Every element is tagged with its position; its action runs while holding
-- one unit of a semaphore and first sleeps for @delay * position@
-- milliseconds.
--
-- A concurrency of @0@ is treated as @1@: a semaphore created with zero
-- units would block every action in 'waitQSem' forever.
--
mapConcurrentlyN
  ∷ Traversable t
  ⇒ Natural -- ^ number of concurrent actions
  → Natural -- ^ startup delay between actions in milliseconds
  → (a → IO b)
  → t a
  → IO (t b)
mapConcurrentlyN n delay f t = do
  sem ← liftIO ∘ newQSem ∘ fromIntegral $ max 1 n
  mapConcurrently (run sem) t_
  where
    -- Pair each element with its zero-based position in traversal order.
    (_, t_) = mapAccumL (\i v → (succ i, (i,v))) 0 t

    run sem (i,a) =
      liftBaseOp_ (bracket_ (waitQSem sem) (signalQSem sem)) $ do
        -- 'threadDelay' takes microseconds, hence the factor of 1000.
        liftIO ∘ threadDelay ∘ fromIntegral $ 1000 * delay * i
        f a

-- | Writes a line to the given handle, prefixed with
-- @"[Kinesis Producer] "@.
--
debugPrint
  ∷ MonadIO m
  ⇒ Handle
  → String
  → m ()
debugPrint h =
  liftIO ∘ hPutStrLn h ∘ ("[Kinesis Producer] " ⊕)