-- Copyright (c) 2013-2015 PivotCloud, Inc.
--
-- Aws.Kinesis.Client.Producer
--
-- Please feel free to contact us at licensing@pivotmail.com with any
-- contributions, additions, or other feedback; we would love to hear from
-- you.
--
-- Licensed under the Apache License, Version 2.0 (the "License"); you may
-- not use this file except in compliance with the License. You may obtain a
-- copy of the License at http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-- License for the specific language governing permissions and limitations
-- under the License.

{-# LANGUAGE BangPatterns #-}
-- CPP is needed for the @#ifdef DEBUG@ sections further down.
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
-- 'KinesisProducer' hides its queue type behind an existential.
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UnicodeSyntax #-}

-- |
-- Module: Aws.Kinesis.Client.Producer
-- Copyright: Copyright © 2013-2015 PivotCloud, Inc.
-- License: Apache-2.0
-- Maintainer: Jon Sterling <jsterling@alephcloud.com>
-- Stability: experimental
module Aws.Kinesis.Client.Producer
( -- * The Producer
  -- The type is exported without its constructor so that it stays abstract.
  KinesisProducer
, withKinesisProducer
, managedKinesisProducer

  -- * Commands
, writeProducer
, Message

, module Aws.Kinesis.Client.Producer.Kit

  -- * Exceptions
, WriteProducerException(..)
, ProducerCleanupTimedOut(..)
, ProducerWorkerDied(..)
) where

import qualified Aws.Kinesis as Kin
import Aws.Kinesis.Client.Common
import Aws.Kinesis.Client.Producer.Kit
import Aws.Kinesis.Client.Producer.Internal
import Aws.Kinesis.Client.Internal.Queue
import Aws.Kinesis.Client.Internal.Queue.Chunk

import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Exception.Enclosed
import Control.Exception.Lifted
import Control.Lens
import Control.Monad
import Control.Monad.Codensity
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Trans.Except
import Data.Conduit
import Data.Maybe
import Data.Monoid.Unicode
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Traversable
import Data.Typeable
import Numeric.Natural
import Prelude.Unicode
import qualified System.Random as R
import System.IO

-- | The (abstract) Kinesis producer client.
--
-- The concrete queue type is existentially quantified: all that is known
-- about it is that it is a 'BoundedCloseableQueue' of 'MessageQueueItem's.
data KinesisProducer
  = ∀ q. BoundedCloseableQueue q MessageQueueItem
  ⇒ KinesisProducer
  { _kpMessageQueue ∷ !q
    -- ^ The queue onto which 'writeProducer' enqueues messages.
  , _kpRetryPolicy ∷ !RetryPolicy
    -- ^ The retry policy; its retry count determines how many attempts each
    -- enqueued message is given.
  }

-- | The reasons for which enqueuing a message on the producer can fail.
data WriteProducerException
  = ProducerQueueClosed
    -- ^ The message was not enqueued because the queue had already been
    -- closed.
  | ProducerQueueFull
    -- ^ The message was not enqueued because the queue had no room left.
  | MessageTooLarge
    -- ^ The message exceeded the maximum message size ('MaxMessageSize').
  deriving (Eq, Show, Typeable)

instance Exception WriteProducerException

-- | Raised if cleaning up the producer does not finish within the
-- configured timeout.
data ProducerCleanupTimedOut = ProducerCleanupTimedOut
  deriving (Eq, Show, Typeable)

instance Exception ProducerCleanupTimedOut

-- | Raised if the producer's worker terminates unexpectedly. This is fatal
-- and should never happen. The payload carries the exception that killed the
-- worker, when there was one.
data ProducerWorkerDied = ProducerWorkerDied (Maybe SomeException)
  deriving (Show, Typeable)

instance Exception ProducerWorkerDied

-- | Generates a valid 'Kin.PartitionKey'.
--
-- The key is made of 25 characters in the range @\'a\'..\'z\'@ drawn from
-- the supplied generator. The text is validated by 'Kin.partitionKey'; if
-- validation rejects it, the rejection message is raised with 'error'.
generatePartitionKey
  ∷ R.RandomGen g
  ⇒ g
  → Kin.PartitionKey
generatePartitionKey gen =
  let name = take 25 $ R.randomRs ('a','z') gen in
  Kin.partitionKey (T.pack name)
    & either (error ∘ T.unpack) id

-- | Splits a list into consecutive chunks of at most @n@ elements; only the
-- last chunk may be shorter. An empty list yields no chunks.
--
-- A chunk size of @0@ is treated as @1@. Otherwise every chunk would be
-- empty, the input would never be consumed, and the result would be an
-- infinite list of empty chunks.
splitEvery
  ∷ Natural -- ^ maximum chunk size
  → [α]
  → [[α]]
splitEvery _ [] = []
splitEvery n list = chunk : splitEvery n rest
  where
    (chunk, rest) = splitAt (fromIntegral (max 1 n)) list

-- | A conduit for sending records to Kinesis using the @PutRecords@ endpoint.
-- This is a conduit in order to restore failed messages as leftovers.
--
-- Each incoming chunk of messages is split into batches of the configured
-- batch size, and the batches are submitted concurrently (bounded by
-- '_pkMaxConcurrency', with a staggered start). Within a batch, only
-- eligible items are sent. An item comes back as a failure when its response
-- record carries an error code; if the request itself throws a synchronous
-- exception, the whole eligible batch comes back. Failed items are pushed
-- back as leftovers with one fewer remaining attempt, and items that are no
-- longer eligible are dropped.
putRecordsSink
  ∷ ProducerKit
  → Sink [MessageQueueItem] IO ()
putRecordsSink ProducerKit{..} = do
  let batchSize = _pkBatchPolicy ^. bpBatchSize

  awaitForever $ \messages → do
    let batches = splitEvery batchSize messages
    leftovers ← lift ∘ flip (mapConcurrentlyN _pkMaxConcurrency 100) batches $ \items → do
      case filter messageQueueItemIsEligible items of
        [] → return []
        eligibleItems → do
          -- On any synchronous exception the entire batch is retried.
          handleAny (\(SomeException e) → eligibleItems <$ debugPrint stderr (show e)) $ do
            requestEntries ← for eligibleItems $ \m → do
              let partitionKey = m ^. mqiPartitionKey
              return Kin.PutRecordsRequestEntry
                { Kin.putRecordsRequestEntryData = m ^. mqiMessage ∘ to T.encodeUtf8
                , Kin.putRecordsRequestEntryExplicitHashKey = Nothing
                , Kin.putRecordsRequestEntryPartitionKey = partitionKey
                }

#ifdef DEBUG
            debugPrint stdout $ "will put " ⊕ show (length requestEntries) ⊕ " records"
#else
            return ()
#endif

            Kin.PutRecordsResponse{..} ← runKinesis _pkKinesisKit Kin.PutRecords
              { Kin.putRecordsRecords = requestEntries
              , Kin.putRecordsStreamName = _pkStreamName
              }
            let
              -- Response records are matched to request items by position;
              -- an item is kept for retry iff its record has an error code.
              processResult m m'
                | isJust (Kin.putRecordsResponseRecordErrorCode m') = Just m
                | otherwise = Nothing
            return ∘ catMaybes $ zipWith processResult eligibleItems putRecordsResponseRecords

    forM_ leftovers $ \items →
      unless (null items) $
        leftover $ items
          <&> mqiRemainingAttempts -~ 1
           & filter messageQueueItemIsEligible

-- | Enqueues a message to Kinesis on the next shard. If a message cannot be
-- enqueued, an error of type 'WriteProducerException' will be returned.
--
-- The message is checked against 'MaxMessageSize' first. It is then paired
-- with a freshly generated random partition key and an attempt budget of one
-- more than the retry count of the producer's retry policy, and a
-- non-blocking write to the producer's queue is attempted.
--
-- NOTE(review): the size check counts characters ('T.length'), while the
-- message is UTF-8 encoded before being sent — confirm which unit
-- 'MaxMessageSize' is meant to be expressed in.
writeProducer
  ∷ MonadIO m
  ⇒ KinesisProducer
  → Message
  → m (Either WriteProducerException ())
writeProducer KinesisProducer{..} !msg =
  runExceptT $ do
    when (T.length msg > MaxMessageSize) $
      throwE MessageTooLarge

    gen ← liftIO R.newStdGen
    result ← liftIO $
      tryWriteQueue _kpMessageQueue MessageQueueItem
        { _mqiMessage = msg
        , _mqiPartitionKey = generatePartitionKey gen
        , _mqiRemainingAttempts = _kpRetryPolicy ^. rpRetryCount ∘ to succ
        }
    -- 'Nothing' means the queue was closed; @Just False@ means it was full.
    case result of
      Just written → unless written $ throwE ProducerQueueFull
      Nothing → throwE $ ProducerQueueClosed

-- | This constructs a 'KinesisProducer' and closes it when you have done with
-- it. This is equivalent to 'withKinesisProducer', but replaces the
-- continuation with a return in 'Codensity'.
--
-- A background worker drains the message queue into Kinesis for as long as
-- the continuation runs, and is respawned whenever it fails with a
-- synchronous exception. When the continuation finishes, the queue is closed
-- and whatever is still queued is flushed, bounded by '_pkCleanupTimeout'
-- when one is configured.
--
-- Throws 'ProducerWorkerDied' if the worker terminates while the
-- continuation is still running, or if the final flush fails while a cleanup
-- timeout is configured. Throws 'ProducerCleanupTimedOut' if the final flush
-- does not finish within the configured timeout.
managedKinesisProducer
  ∷ ∀ m
  . ( MonadIO m
    , MonadBaseControl IO m
    )
  ⇒ ProducerKit
  → Codensity m KinesisProducer
managedKinesisProducer kit@ProducerKit{_pkQueueImplementation = QueueImplementation (_ ∷ proxy q)} = do
  messageQueue ← liftIO ∘ newQueue ∘ fromIntegral $ kit ^. pkMessageQueueBounds

  let
    producer = KinesisProducer
      { _kpMessageQueue = (messageQueue ∷ q)
      , _kpRetryPolicy = kit ^. pkRetryPolicy
      }

    -- One chunk holds enough messages to keep every concurrent batch busy.
    -- NOTE(review): the interval is presumably in microseconds (5 s) — confirm
    -- against 'ChunkingPolicy'.
    chunkingPolicy = ChunkingPolicy
      { _cpMaxChunkSize = (kit ^. pkBatchPolicy ∘ bpBatchSize) * (kit ^. pkMaxConcurrency)
      , _cpMinChunkingInterval = 5000000
      }

    processQueue =
      chunkedSourceFromQueue chunkingPolicy messageQueue
        $$ putRecordsSink kit

    -- TODO: figure out better error handling here (such as a limit to respawns)
    workerLoop ∷ IO () = do
      result ← tryAny processQueue
      case result of
        Left exn → do
          debugPrint stderr $ "Respawning worker loop after exception: " ⊕ show exn
          workerLoop
        Right () → return ()

    cleanupWorker _ = do
      closeQueue messageQueue
#ifdef DEBUG
      debugPrint stdout "Closing queues, will clean up"
#else
      return ()
#endif

      -- Drain whatever is left in the (now closed) queue.
      withAsync processQueue $ \cleanupHandle → do
        case _pkCleanupTimeout kit of
          Just timeout →
            withAsync (threadDelay $ fromIntegral timeout) $ \timeoutHandle → do
              result ← waitEitherCatchCancel timeoutHandle cleanupHandle
              case result of
                Left _timeoutResult →
                  throwIO ProducerCleanupTimedOut
                Right workerResult →
                  -- A drain that finishes cleanly is the expected outcome and
                  -- must not raise; only a failed drain is reported.
                  either (throwIO ∘ ProducerWorkerDied ∘ Just) return workerResult
          Nothing →
            wait cleanupHandle

  workerHandle ← Codensity $ bracket (async (liftIO workerLoop)) (liftIO ∘ cleanupWorker)

  Codensity $ \inner → do
    withAsync (inner producer) $ \innerHandle → do
      result ← waitEitherCatchCancel innerHandle workerHandle
      case result of
        Left innerResult →
          either throwIO return innerResult
        -- The worker is only supposed to stop during cleanup, so any
        -- termination here (even a clean one) is fatal.
        Right (workerResult ∷ Either SomeException ()) →
          throwIO ∘ ProducerWorkerDied $ workerResult ^? _Left

-- | This constructs a 'KinesisProducer' and closes it when you have done with
-- it.
--
-- This is 'managedKinesisProducer' run with the given continuation: the
-- producer is only valid inside the continuation.
withKinesisProducer
  ∷ ( MonadIO m
    , MonadBaseControl IO m
    )
  ⇒ ProducerKit
  → (KinesisProducer → m α)
  → m α
withKinesisProducer =
  runCodensity ∘ managedKinesisProducer

-- | Map over a structure concurrently, running at most @n@ actions at the
-- same time.
--
-- Every element is paired with its 0-based position. Before running, an
-- action first acquires one of the @n@ semaphore slots and then sleeps for
-- @position * delay@ milliseconds. The slot is released when the action
-- finishes or throws.
mapConcurrentlyN
  ∷ Traversable t
  ⇒ Natural -- ^ number of concurrent actions
  → Natural -- ^ startup delay between actions in milliseconds
  → (a → IO b)
  → t a
  → IO (t b)
mapConcurrentlyN n delay f t = do
  sem ← liftIO ∘ newQSem $ fromIntegral n
  mapConcurrently (run sem) t_
    where
      -- Number the elements so that each one knows its startup offset.
      (_, t_) = mapAccumL (\i v → (succ i, (i,v))) 0 t
      run sem (i,a) =
        liftBaseOp_ (bracket_ (waitQSem sem) (signalQSem sem)) $ do
          -- 'threadDelay' takes microseconds, hence the factor of 1000.
          liftIO ∘ threadDelay ∘ fromIntegral $ 1000 * delay * i
          f a

   MonadIO m
   m ()
debugPrint h =
     hPutStrLn h
     ("[Kinesis Producer] " )