-- Copyright (c) 2013-2014 PivotCloud, Inc.
--
-- Aws.Kinesis.Client.Producer
--
-- Please feel free to contact us at licensing@pivotmail.com with any
-- contributions, additions, or other feedback; we would love to hear from
-- you.
--
-- Licensed under the Apache License, Version 2.0 (the "License"); you may
-- not use this file except in compliance with the License. You may obtain a
-- copy of the License at http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-- License for the specific language governing permissions and limitations
-- under the License.

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE UnicodeSyntax #-}

-- |
-- Module: Aws.Kinesis.Client.Producer
-- Copyright: Copyright © 2013-2014 PivotCloud, Inc.
-- License: Apache-2.0
-- Maintainer: Jon Sterling <jsterling@alephcloud.com>
-- Stability: experimental
--
module Aws.Kinesis.Client.Producer
( -- * The Producer
  KinesisProducer
, withKinesisProducer
, managedKinesisProducer

  -- * Commands
, writeProducer
, Message

  -- * Producer Environment
, MonadProducer
, ProducerKit(..)
, pkKinesisKit
, pkStreamName
, pkBatchPolicy
, pkRetryPolicy
, pkMessageQueueBounds
, pkMaxConcurrency

, ProducerError(..)
, _KinesisError
, _MessageNotEnqueued
, _InvalidConcurrentConsumerCount
, _MessageTooLarge

, pattern MaxMessageSize

, BatchPolicy
, defaultBatchPolicy
, bpBatchSize
, bpEndpoint

, RetryPolicy
, defaultRetryPolicy
, rpRetryCount

, RecordEndpoint(..)
) where

import qualified Aws.Kinesis as Kin
import qualified Aws.Kinesis.Commands.PutRecords as Kin
import Aws.Kinesis.Client.Common

import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Exception.Lifted
import Control.Lens
import Control.Monad
import Control.Monad.Codensity
import Control.Monad.Error.Class
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Trans.Either
import qualified Data.ByteString as B
import Data.Conduit
import Data.Conduit.TQueue
import qualified Data.Conduit.List as CL
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Traversable
import Data.Typeable
import Prelude.Unicode
import System.IO
import qualified System.Random as R

-- | There are two endpoints which may be used to send records to Kinesis.
--
data RecordEndpoint
  = PutRecordEndpoint
  -- ^ Use the @PutRecord@ endpoint, which sends records one at a time.

  | PutRecordsEndpoint
  -- ^ Use the @PutRecords@ endpoint, which sends records in batches
  -- (the batch size is governed by 'BatchPolicy').

  deriving (Eq, Show)

-- | The maximum size in bytes of a single message payload.
--
pattern MaxMessageSize = 51000

-- | The producer batches records according to a user-specified policy.
--
data BatchPolicy
  = BatchPolicy
  { _bpBatchSize :: {-# UNPACK #-} !Int
  -- ^ Records per request (see 'bpBatchSize').
  , _bpEndpoint :: !RecordEndpoint
  -- ^ Which Kinesis endpoint to use (see 'bpEndpoint').
  } deriving (Eq, Show)

-- | The number of records to send in a single request. This is only used
-- when the endpoint is set to 'PutRecordsEndpoint'.
--
bpBatchSize :: Lens' BatchPolicy Int
bpBatchSize = lens _bpBatchSize $ \bp bs -> bp { _bpBatchSize = bs }

-- | The endpoint to use when sending records to Kinesis.
--
bpEndpoint :: Lens' BatchPolicy RecordEndpoint
bpEndpoint = lens _bpEndpoint $ \bp ep -> bp { _bpEndpoint = ep }

-- | The default batching policy sends @200@ records per 'PutRecordsEndpoint'
-- request.
--
defaultBatchPolicy :: BatchPolicy
defaultBatchPolicy = BatchPolicy
  { _bpBatchSize = 200
  , _bpEndpoint = PutRecordsEndpoint
  }

-- | The producer will attempt to re-send records which failed according to a
-- user-specified policy. This policy applies to failures which occur in the
-- process of sending a message to Kinesis, not those which occur in the course
-- of enqueuing a message.
data RetryPolicy
  = RetryPolicy
  { _rpRetryCount :: {-# UNPACK #-} !Int
  -- ^ Retries after the first failure (see 'rpRetryCount').
  } deriving (Eq, Show)

-- | The number of times to retry sending a message after it has first failed.
--
rpRetryCount :: Lens' RetryPolicy Int
rpRetryCount = lens _rpRetryCount $ \rp n -> rp { _rpRetryCount = n }

-- | The default retry policy will attempt @5@ retries for a message.
--
defaultRetryPolicy :: RetryPolicy
defaultRetryPolicy = RetryPolicy
  { _rpRetryCount = 5
  }

type Message = T.Text

-- | An enqueued message together with its routing and retry bookkeeping.
data MessageQueueItem
  = MessageQueueItem
  { _mqiMessage :: !Message
  -- ^ The contents of the message

  , _mqiPartitionKey :: !Kin.PartitionKey
  -- ^ The partition key the message is destined for

  , _mqiRemainingAttempts :: !Int
  -- ^ The number of times remaining to try and publish this message
  } deriving (Eq, Show)

-- | A lens for '_mqiMessage'.
mqiMessage :: Lens' MessageQueueItem Message
mqiMessage = lens _mqiMessage $ \i m -> i { _mqiMessage = m }

-- | A lens for '_mqiPartitionKey'.
mqiPartitionKey :: Lens' MessageQueueItem Kin.PartitionKey
mqiPartitionKey = lens _mqiPartitionKey $ \i s -> i { _mqiPartitionKey = s }

-- | A lens for '_mqiRemainingAttempts'.
mqiRemainingAttempts :: Lens' MessageQueueItem Int
mqiRemainingAttempts = lens _mqiRemainingAttempts $ \i n -> i { _mqiRemainingAttempts = n }

-- | An item is eligible for (re-)sending while it has at least one
-- remaining attempt.
messageQueueItemIsEligible
  :: MessageQueueItem
  -> Bool
messageQueueItemIsEligible =
  (>= 1) . _mqiRemainingAttempts

-- | The basic input required to construct a Kinesis producer.
--
data ProducerKit
  = ProducerKit
  { _pkKinesisKit :: !KinesisKit
  -- ^ The basic information required to send requests to AWS Kinesis.

  , _pkStreamName :: !Kin.StreamName
  -- ^ The name of the stream to send records to.

  , _pkBatchPolicy :: !BatchPolicy
  -- ^ The record batching policy for the producer.

  , _pkRetryPolicy :: !RetryPolicy
  -- ^ The retry policy for the producer.

  , _pkMessageQueueBounds :: {-# UNPACK #-} !Int
  -- ^ The maximum number of records that may be enqueued at one time.

  , _pkMaxConcurrency :: {-# UNPACK #-} !Int
  -- ^ The number of requests to run concurrently (minimum: 1).
  }

-- | A lens for '_pkKinesisKit'.
--
pkKinesisKit :: Lens' ProducerKit KinesisKit
pkKinesisKit = lens _pkKinesisKit $ \pk kk -> pk { _pkKinesisKit = kk }

-- | A lens for '_pkStreamName'.
--
pkStreamName :: Lens' ProducerKit Kin.StreamName
pkStreamName = lens _pkStreamName $ \pk sn -> pk { _pkStreamName = sn }

-- | A lens for '_pkBatchPolicy'.
--
pkBatchPolicy :: Lens' ProducerKit BatchPolicy
pkBatchPolicy = lens _pkBatchPolicy $ \pk bp -> pk { _pkBatchPolicy = bp }

-- | A lens for '_pkRetryPolicy'.
--
pkRetryPolicy :: Lens' ProducerKit RetryPolicy
pkRetryPolicy = lens _pkRetryPolicy $ \pk rp -> pk { _pkRetryPolicy = rp }

-- | A lens for '_pkMessageQueueBounds'.
--
pkMessageQueueBounds :: Lens' ProducerKit Int
pkMessageQueueBounds = lens _pkMessageQueueBounds $ \pk qb -> pk { _pkMessageQueueBounds = qb }

-- | A lens for '_pkMaxConcurrency'.
--
pkMaxConcurrency :: Lens' ProducerKit Int
pkMaxConcurrency = lens _pkMaxConcurrency $ \pk n -> pk { _pkMaxConcurrency = n }

-- | The (abstract) Kinesis producer client.
--
data KinesisProducer
  = KinesisProducer
  { _kpMessageQueue :: !(TBMQueue MessageQueueItem)
  -- ^ The bounded queue that 'writeProducer' feeds and the consumer drains.
  , _kpRetryPolicy :: !RetryPolicy
  -- ^ Used to initialize the attempt budget of each enqueued item.
  }

-- | A getter for '_kpMessageQueue'.
kpMessageQueue :: Getter KinesisProducer (TBMQueue MessageQueueItem)
kpMessageQueue = to _kpMessageQueue

-- | A getter for '_kpRetryPolicy'.
kpRetryPolicy :: Getter KinesisProducer RetryPolicy
kpRetryPolicy = to _kpRetryPolicy

-- | Errors that the producer may raise.
data ProducerError
  = KinesisError !SomeException
  -- ^ Represents an error which occurred as a result of a request to Kinesis.

  | MessageNotEnqueued Message
  -- ^ Thrown when a message could not be enqueued since the queue was full.
  -- This error must be handled at the call-site.

  | MessageTooLarge
  -- ^ Thrown when the message was larger than the maximum message size ('MaxMessageSize').

  | InvalidConcurrentConsumerCount
  -- ^ Thrown when 'pkMaxConcurrency' is set with an invalid value.

  deriving (Typeable, Show)

instance Exception ProducerError

-- | A prism for 'KinesisError'.
--
_KinesisError :: Prism' ProducerError SomeException
_KinesisError =
  prism KinesisError $ \case
    KinesisError e -> Right e
    e -> Left e

-- | A prism for 'MessageNotEnqueued'.
--
_MessageNotEnqueued :: Prism' ProducerError Message
_MessageNotEnqueued =
  prism MessageNotEnqueued $ \case
    MessageNotEnqueued m -> Right m
    e -> Left e

-- | A prism for 'MessageTooLarge'.
--
_MessageTooLarge :: Prism' ProducerError ()
_MessageTooLarge =
  prism (const MessageTooLarge) $ \case
    MessageTooLarge -> Right ()
    e -> Left e

-- | A prism for 'InvalidConcurrentConsumerCount'.
--
_InvalidConcurrentConsumerCount :: Prism' ProducerError ()
_InvalidConcurrentConsumerCount =
  prism (const InvalidConcurrentConsumerCount) $ \case
    InvalidConcurrentConsumerCount -> Right ()
    e -> Left e

-- | The basic effect modality required to use the Kinesis producer: IO
-- lifting, control over the base monad (needed for the lifted concurrency
-- primitives), and 'ProducerError' reporting.
--
type MonadProducer m
  = ( MonadIO m
    , MonadBaseControl IO m
    , MonadError ProducerError m
    )

-- | The internal effect modality: 'MonadProducer' plus read access to the
-- 'ProducerKit' configuration.
type MonadProducerInternal m
  = ( MonadProducer m
    , MonadReader ProducerKit m
    )

-- | Lifts something in 'MonadKinesis' to 'MonadProducer': focuses the
-- environment on the 'KinesisKit' and wraps failures in 'KinesisError'.
--
liftKinesis
  :: MonadProducerInternal m
  => EitherT SomeException (ReaderT KinesisKit m) α
  -> m α
liftKinesis =
  mapEnvironment pkKinesisKit
    . mapError KinesisError

-- | Generates a valid 'Kin.PartitionKey' of 25 random lowercase letters.
--
-- The 'error' case is unreachable in practice since @[a-z]@ strings of this
-- length are valid partition keys.
generatePartitionKey
  :: R.RandomGen g
  => g
  -> Kin.PartitionKey
generatePartitionKey gen =
  let name = take 25 $ R.randomRs ('a','z') gen in
  Kin.partitionKey (T.pack name)
    & either (error . T.unpack) id

-- | This will take up to @n@ items from a 'TBMQueue'. It never retries: it
-- returns early (possibly with @[]@) when the queue is empty or closed.
--
takeTBMQueue
  :: Int
  -> TBMQueue α
  -> STM [α]
takeTBMQueue n q
  | n <= 0 = return []
  | otherwise = do
      res <- tryReadTBMQueue q
      case res of
        Just (Just x) -> (x:) <$> takeTBMQueue (n - 1) q
        -- Queue empty ('Just Nothing') or closed ('Nothing'): stop here.
        _ -> return []

-- | A policy for chunking the contents of the message queue.
--
data ChunkingPolicy
  = ChunkingPolicy
  { _cpMaxChunkSize :: Int
  -- ^ The largest chunk size that is permitted.

  , _cpThrottlingDelay :: Int
  -- ^ The time in microseconds (it is passed to 'threadDelay') after which a
  -- chunk should be committed, even if the maximum chunk size has not yet
  -- been reached.
  }

-- | A 'Source' that reads chunks off a bounded STM queue according some
-- 'ChunkingPolicy'. The source terminates once the queue is closed.
--
chunkedSourceTBMQueue
  :: ( MonadIO m
     , MonadBaseControl IO m
     )
  => ChunkingPolicy
  -> TBMQueue α
  -> Source m [α]
chunkedSourceTBMQueue bp@ChunkingPolicy{..} q = do
  terminateNow <- liftIO . atomically $ isClosedTBMQueue q
  unless terminateNow $ do
    items <- liftIO . atomically $ takeTBMQueue _cpMaxChunkSize q
    unless (null items) $ do
      yield items

    -- An undersized chunk means the queue ran dry; back off before polling
    -- again instead of busy-looping.
    when (length items < _cpMaxChunkSize) $
      threadDelay _cpThrottlingDelay

    chunkedSourceTBMQueue bp q

-- | Transform a 'Source' into a chunked 'Source' according to some
-- 'ChunkingPolicy'.
--
chunkSource
  :: ( MonadIO m
     , MonadBaseControl IO m
     )
  => ChunkingPolicy
  -> Source m α
  -> Source m [α]
chunkSource cp src = do
  queue <- liftIO $ newTBMQueueIO $ _cpMaxChunkSize cp
  -- Run the upstream source into the queue on a background thread; the
  -- 'True' flag presumably closes the queue when upstream finishes — see
  -- the stm-conduit 'sinkTBMQueue' documentation.
  worker <- lift . async $ src $$+ sinkTBMQueue queue True
  -- Cancel the feeder when downstream is done; 'link' propagates any of its
  -- failures back to this thread.
  addCleanup (\_ -> cancel worker) $ do
    lift $ link worker
    chunkedSourceTBMQueue cp queue

-- | A conduit for concurrently sending multiple records to Kinesis using the
-- @PutRecord@ endpoint, at most 'pkMaxConcurrency' requests at a time.
--
concurrentPutRecordSink
  :: MonadProducerInternal m
  => Sink [MessageQueueItem] m ()
concurrentPutRecordSink = do
  maxWorkerCount <- view pkMaxConcurrency
  awaitForever $ \messages -> do
    lift . flip (mapConcurrentlyN maxWorkerCount 100) messages $ \m -> do
      CL.sourceList [m] $$ putRecordSink


-- | A conduit for sending a record to Kinesis using the @PutRecord@ endpoint;
-- this is a conduit in order to restore failed messages as leftovers.
--
putRecordSink
  :: MonadProducerInternal m
  => Sink MessageQueueItem m ()
putRecordSink = do
  streamName <- view pkStreamName

  awaitForever $ \item -> do
    -- On failure: log, back off 5s, and push the item back upstream with one
    -- fewer attempt remaining.
    let handler e = do
          liftIO $ do
            hPutStrLn stderr $ "Kinesis producer client error (will wait 5s): " ++ show e
            threadDelay 5000000
          leftover $ item & mqiRemainingAttempts -~ 1

    when (messageQueueItemIsEligible item) $
      handleError handler $ do
        let partitionKey = item ^. mqiPartitionKey
        void . lift . liftKinesis $ runKinesis Kin.PutRecord
          { Kin.putRecordData = item ^. mqiMessage . to T.encodeUtf8
          , Kin.putRecordExplicitHashKey = Nothing
          , Kin.putRecordPartitionKey = partitionKey
          , Kin.putRecordSequenceNumberForOrdering = Nothing
          , Kin.putRecordStreamName = streamName
          }

-- | Split a list into chunks of at most @n@ elements; the final chunk may be
-- shorter. The input order is preserved.
splitEvery
  :: Int
  -> [α]
  -> [[α]]
splitEvery _ [] = []
splitEvery n list = first : splitEvery n rest
  where
    (first,rest) = splitAt n list

-- | A conduit for sending records to Kinesis using the @PutRecords@ endpoint.
-- This is a conduit in order to restore failed messages as leftovers.
--
putRecordsSink
  :: MonadProducerInternal m
  => Sink [MessageQueueItem] m ()
putRecordsSink = do
  streamName <- view pkStreamName
  batchSize <- view $ pkBatchPolicy . bpBatchSize
  maxWorkerCount <- view pkMaxConcurrency
  awaitForever $ \messages -> do
    let batches = splitEvery batchSize messages
    leftovers <- lift . flip (mapConcurrentlyN maxWorkerCount 100) batches $ \items -> do
      case filter messageQueueItemIsEligible items of
        [] -> return []
        eligibleItems -> do
          -- On a request-level failure every eligible item is kept so that
          -- it can be retried as a leftover.
          handleError (\e -> eligibleItems <$ liftIO (hPutStrLn stderr $ show e)) $ do
            requestEntries <- for eligibleItems $ \m -> do
              let partitionKey = m ^. mqiPartitionKey
              return Kin.PutRecordsRequestEntry
                { Kin.putRecordsRequestEntryData = m ^. mqiMessage . to T.encodeUtf8
                , Kin.putRecordsRequestEntryExplicitHashKey = Nothing
                , Kin.putRecordsRequestEntryPartitionKey = partitionKey
                }

            Kin.PutRecordsResponse{..} <- liftKinesis $ runKinesis Kin.PutRecords
              { Kin.putRecordsRecords = requestEntries
              , Kin.putRecordsStreamName = streamName
              }
            let
              -- A response entry carrying an error code marks the paired
              -- request item as failed (the response preserves order).
              processResult m m'
                | isJust (Kin.putRecordsResponseRecordErrorCode m') = Just m
                | otherwise = Nothing
            return . catMaybes $ zipWith processResult eligibleItems putRecordsResponseRecords

    forM_ leftovers $ \items ->
      unless (null items) $
        leftover $ items
          <&> mqiRemainingAttempts -~ 1
           & filter messageQueueItemIsEligible

-- | Dispatch to the sink matching the configured 'RecordEndpoint'.
sendMessagesSink
  :: MonadProducerInternal m
  => Sink [MessageQueueItem] m ()
sendMessagesSink = do
  batchPolicy <- view pkBatchPolicy
  case batchPolicy ^. bpEndpoint of
    PutRecordsEndpoint -> putRecordsSink
    PutRecordEndpoint -> concurrentPutRecordSink

-- | Enqueues a message to Kinesis on the next shard. If a message cannot be
-- enqueued (because the client has exceeded its queue size), the
-- 'MessageNotEnqueued' exception will be thrown.
--
-- Throws 'MessageTooLarge' when the UTF-8 encoded payload exceeds
-- 'MaxMessageSize' bytes.
--
writeProducer
  :: MonadProducer m
  => KinesisProducer
  -> Message
  -> m ()
writeProducer producer !msg = do
  -- 'MaxMessageSize' is a byte limit and the payload is sent UTF-8 encoded,
  -- so measure the encoded size rather than the character count; otherwise
  -- multi-byte characters could push a "valid" message past the limit.
  when (B.length (T.encodeUtf8 msg) > MaxMessageSize) $
    throwError MessageTooLarge

  gen <- liftIO R.newStdGen
  result <- liftIO . atomically $ do
    tryWriteTBMQueue (producer ^. kpMessageQueue) MessageQueueItem
      { _mqiMessage = msg
      , _mqiPartitionKey = generatePartitionKey gen
      -- Attempt budget = retries + 1 initial try.
      , _mqiRemainingAttempts = producer ^. kpRetryPolicy . rpRetryCount . to succ
      }
  case result of
    Just True -> return ()
    -- Queue full ('Just False') or closed ('Nothing').
    _ -> throwError $ MessageNotEnqueued msg

-- | This is a 'Source' that returns all the items presently in a queue: it
-- terminates when the queue is empty (or closed).
--
exhaustTBMQueue
  :: TBMQueue α
  -> Source STM α
exhaustTBMQueue q = do
  mx <- lift $ tryReadTBMQueue q
  case mx of
    Just (Just x) -> do
      yield x
      exhaustTBMQueue q
    _ -> return ()

-- | This constructs a 'KinesisProducer' and closes it when you have done with
-- it. This is equivalent to 'withKinesisProducer', but replaces the
-- continuation with a return in 'Codensity'.
--
managedKinesisProducer
  :: forall m
   . ( MonadIO m
     , MonadBaseControl IO m
     , MonadError ProducerError m
     )
  => ProducerKit
  -> Codensity m KinesisProducer
managedKinesisProducer kit = do
  when (kit ^. pkMaxConcurrency < 1) . lift $
    throwError InvalidConcurrentConsumerCount

  messageQueue <- liftIO . newTBMQueueIO $ kit ^. pkMessageQueueBounds

  let chunkingPolicy = ChunkingPolicy ((kit ^. pkBatchPolicy . bpBatchSize) * (kit ^. pkMaxConcurrency)) 5000000
      -- TODO: this 'forever' is only here to restart if we get killed.
      -- Replace with proper error handling.
      consumerLoop :: m () = flip runReaderT kit . forever $
        chunkSource chunkingPolicy (sourceTBMQueue messageQueue)
          $$ sendMessagesSink

  -- Close the queue, flush whatever is still enqueued, then stop the worker.
  let cleanupConsumer consumerHandle = do
        liftIO . atomically $ closeTBMQueue messageQueue
        flip runReaderT kit $ do
          leftovers <- liftIO . atomically $
            exhaustTBMQueue messageQueue
              $$ CL.consume
          chunkSource chunkingPolicy (CL.sourceList leftovers)
            $$ sendMessagesSink
        cancel consumerHandle

  consumerHandle <- managedBracket (async consumerLoop) cleanupConsumer

  Codensity $ \inner -> do
    link consumerHandle
    res <- inner KinesisProducer
      { _kpMessageQueue = messageQueue
      , _kpRetryPolicy = kit ^. pkRetryPolicy
      }
    -- NOTE(review): 'consumerLoop' runs under 'forever', and the queue is
    -- only closed in 'cleanupConsumer' (which runs after this continuation
    -- returns) — confirm that this 'wait' can actually complete.
    () <- wait consumerHandle
    return res


-- | This constructs a 'KinesisProducer' and closes it when you have done with
-- it.
--
withKinesisProducer
  :: ( MonadIO m
     , MonadBaseControl IO m
     , MonadError ProducerError m
     )
  => ProducerKit
  -> (KinesisProducer -> m α)
  -> m α
withKinesisProducer =
  runCodensity . managedKinesisProducer

-- | 'bracket' expressed in 'Codensity': the cleanup runs after the
-- continuation (and on exceptions).
managedBracket
  :: MonadBaseControl IO m
  => m α
  -> (α -> m β)
  -> Codensity m α
managedBracket action cleanup =
  Codensity $ bracket action cleanup

-- | Map at most @n@ actions concurrently, staggering each worker's start by
-- its index times the given delay. Concurrency is bounded with a 'QSem'.
--
mapConcurrentlyN
  :: ( MonadIO m
     , MonadBaseControl IO m
     , Traversable t
     )
  => Int -- ^ number of concurrent actions
  -> Int -- ^ startup delay between actions in milliseconds
  -> (a -> m b)
  -> t a
  -> m (t b)
mapConcurrentlyN n delay f t = do
  sem <- liftIO $ newQSem n
  mapConcurrently (run sem) t_
    where
      -- Pair each element with its index so each worker can stagger its start.
      (_, t_) = mapAccumL (\i v -> (succ i, (i,v))) 0 t
      run sem (i,a) =
        liftBaseOp_ (bracket_ (waitQSem sem) (signalQSem sem)) $ do
          -- delay is in milliseconds; threadDelay takes microseconds.
          liftIO . threadDelay $ 1000 * delay * i
          f a