-- Copyright (c) 2013-2014 PivotCloud, Inc.
--
-- Aws.Kinesis.Client.Consumer
--
-- Please feel free to contact us at licensing@pivotmail.com with any
-- contributions, additions, or other feedback; we would love to hear from
-- you.
--
-- Licensed under the Apache License, Version 2.0 (the "License"); you may
-- not use this file except in compliance with the License. You may obtain a
-- copy of the License at http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-- License for the specific language governing permissions and limitations
-- under the License.

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UnicodeSyntax #-}

-- |
-- Module: Aws.Kinesis.Client.Consumer
-- Copyright: Copyright © 2013-2014 PivotCloud, Inc.
-- License: Apache-2.0
-- Maintainer: Jon Sterling <jsterling@alephcloud.com>
-- Stability: experimental
--
module Aws.Kinesis.Client.Consumer
( -- * The Consumer
  KinesisConsumer
, managedKinesisConsumer
, withKinesisConsumer

  -- * Commands
, consumerSource
, readConsumer
, tryReadConsumer
, consumerStreamState

  -- * Consumer Environment
, ConsumerKit(..)
, ckKinesisKit
, ckStreamName
, ckBatchSize
, ConsumerError(..)
, MonadConsumer
, SavedStreamState
) where

import qualified Aws.Kinesis as Kin
import Aws.Kinesis.Client.Common

import Control.Applicative
import Control.Applicative.Unicode
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Concurrent.STM
import Control.Concurrent.STM.Queue
import Control.Exception
import Control.Lens
import Control.Lens.Action
import Control.Monad.Codensity
import Control.Monad.Error.Class
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Unicode
import qualified Data.Aeson as Æ
import qualified Data.Carousel as CR
import qualified Data.Map as M
import qualified Data.HashMap.Strict as HM
import Data.Traversable (for)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Prelude.Unicode

-- | The internal representation for shards used by the consumer.
--
data ShardState
  = ShardState
  { _ssIterator  !(TVar (Maybe Kin.ShardIterator))
  , _ssShardId  !Kin.ShardId
  , _ssLastSequenceNumber  !(TVar (Maybe Kin.SequenceNumber))
  }

-- | A getter for '_ssIterator'.
--
ssIterator  Getter ShardState (TVar (Maybe Kin.ShardIterator))
ssIterator = to _ssIterator

-- | A lens for '_ssShardId'.
--
ssShardId  Lens' ShardState Kin.ShardId
ssShardId = lens _ssShardId $ \ss sid  ss { _ssShardId = sid }

-- | A getter for '_ssLastSequenceNumber'.
--
ssLastSequenceNumber  Getter ShardState (TVar (Maybe Kin.SequenceNumber))
ssLastSequenceNumber = to _ssLastSequenceNumber

-- | 'ShardState' is quotiented by shard ID.
--
instance Eq ShardState where
  ss == ss' = ss ^. ssShardId  ss' ^. ssShardId

data ConsumerError
  = NoShards
  -- ^ A stream must always have at least one open shard.

  | KinesisError !SomeException
  -- ^ An error which derives from a request made to Kinesis.

  deriving Show

-- | The 'ConsumerKit' contains what is needed to initialize a 'KinesisConsumer'.
data ConsumerKit
  = ConsumerKit
  { _ckKinesisKit  !KinesisKit
  -- ^ The credentials and configuration for making requests to AWS Kinesis.

  , _ckStreamName  !Kin.StreamName
  -- ^ The name of the stream to consume from.

  , _ckBatchSize  {-# UNPACK #-} !Int
  -- ^ The number of records to fetch at once from the stream.

  , _ckIteratorType  !Kin.ShardIteratorType
  -- ^ The type of iterator to consume.

  , _ckSavedStreamState  !(Maybe SavedStreamState)
  -- ^ Optionally, an initial stream state. The iterator type in
  -- '_ckIteratorType' will be used for any shards not present in the saved
  -- stream state; otherwise, 'Kin.AfterSequenceNumber' will be used.
  }

-- | A lens for '_ckKinesisKit'.
--
ckKinesisKit  Lens' ConsumerKit KinesisKit
ckKinesisKit = lens _ckKinesisKit $ \ck kk  ck { _ckKinesisKit = kk }

-- | A lens for '_ckStreamName'.
--
ckStreamName  Lens' ConsumerKit Kin.StreamName
ckStreamName = lens _ckStreamName $ \ck sn  ck { _ckStreamName = sn }

-- | A lens for '_ckBatchSize'.
--
ckBatchSize  Lens' ConsumerKit Int
ckBatchSize = lens _ckBatchSize $ \ck bs  ck { _ckBatchSize = bs }

-- | A lens for '_ckIteratorType'.
--
ckIteratorType  Lens' ConsumerKit Kin.ShardIteratorType
ckIteratorType = lens _ckIteratorType $ \ck it  ck { _ckIteratorType = it }

-- | A lens for '_ckSavedStreamState'.
--
ckSavedStreamState  Lens' ConsumerKit (Maybe SavedStreamState)
ckSavedStreamState = lens _ckSavedStreamState $ \ck ss  ck { _ckSavedStreamState = ss }


type MessageQueueItem = (ShardState, Kin.Record)
type StreamState = CR.Carousel ShardState

newtype SavedStreamState
  = SavedStreamState
  { _savedStreamState  M.Map Kin.ShardId Kin.SequenceNumber
  }

-- | An iso for 'SavedStreamState'.
--
_SavedStreamState  Iso' SavedStreamState (M.Map Kin.ShardId Kin.SequenceNumber)
_SavedStreamState = iso _savedStreamState SavedStreamState

instance Æ.ToJSON SavedStreamState where
  toJSON (SavedStreamState m) =
    Æ.Object  HM.fromList  flip fmap (M.toList m) $ \(sid, sn) 
      let Æ.String sid' = Æ.toJSON sid
      in sid' Æ..= sn

instance Æ.FromJSON SavedStreamState where
  parseJSON =
    Æ.withObject "SavedStreamState" $ \xs  do
      fmap (SavedStreamState  M.fromList)  for (HM.toList xs) $ \(sid, sn)  do
        pure (,)
           Æ.parseJSON (Æ.String sid)
           Æ.parseJSON sn


-- | The 'KinesisConsumer' maintains state about which shards to pull from.
--
data KinesisConsumer
  = KinesisConsumer
  { _kcMessageQueue  !(TBQueue MessageQueueItem)
  , _kcStreamState  !(TVar StreamState)
  }

-- | A getter for '_kcMessageQueue'.
--
kcMessageQueue  Getter KinesisConsumer (TBQueue MessageQueueItem)
kcMessageQueue = to _kcMessageQueue

-- | A getter for '_kcStreamState'.
--
kcStreamState  Getter KinesisConsumer (TVar StreamState)
kcStreamState = to _kcStreamState

-- | The basic effect modality required for operating the consumer.
--
type MonadConsumer m
  = ( MonadIO m
    , MonadBaseControl IO m
    , MonadError ConsumerError m
    )

type MonadConsumerInternal m
  = ( MonadConsumer m
    , MonadReader ConsumerKit m
    )

-- | This constructs a 'KinesisConsumer' and closes it when you have done with
-- it; this is equivalent to 'withKinesisConsumer', except that the
-- continuation is replaced with returning the consumer in 'Codensity'.
--
managedKinesisConsumer
   MonadConsumer m
   ConsumerKit
   Codensity m KinesisConsumer
managedKinesisConsumer kit =
  Codensity $ withKinesisConsumer kit

-- | This constructs a 'KinesisConsumer' and closes it when you have done with
-- it.
--
withKinesisConsumer
   MonadConsumer m
   ConsumerKit
   (KinesisConsumer  m α)
   m α
withKinesisConsumer kit inner =
  flip runReaderT kit $ do
    batchSize  view ckBatchSize
    messageQueue  liftIO  newTBQueueIO $ batchSize * 10

    state  updateStreamState CR.empty = liftIO  newTVarIO
    let reshardingLoop = forever $
          handleError (\_  liftIO $ threadDelay 3000000) $ do
            liftIO (readTVarIO state)
              = updateStreamState
              = liftIO  atomically  writeTVar state
            liftIO $ threadDelay 10000000

        producerLoop = forever $
          handleError (\_  liftIO $ threadDelay 2000000) $ do
            recordsCount  replenishMessages messageQueue state
            liftIO  threadDelay $
              case recordsCount of
                0  5000000
                _  70000


    withAsync reshardingLoop $ \reshardingHandle  do
      link reshardingHandle
      withAsync producerLoop $ \producerHandle  do
        link producerHandle
        res  lift  inner $ KinesisConsumer messageQueue state
        return res

-- | This requests new information from Kinesis and reconciles that with an
-- existing carousel of shard states.
--
updateStreamState
   MonadConsumerInternal m
   StreamState
   m StreamState
updateStreamState state = do
  streamName  view ckStreamName
  defaultIteratorType  view ckIteratorType
  savedState  view ckSavedStreamState

  mapError KinesisError  mapEnvironment ckKinesisKit $ do
    let existingShardIds = state ^. CR.clList <&> view ssShardId
        shardSource = flip mapOutputMaybe (streamOpenShardSource streamName) $ \sh@Kin.Shard{..} 
          if shardShardId `elem` existingShardIds
            then Nothing
            else Just sh

    newShards  shardSource $$ CL.consume
    shardStates  forM newShards $ \Kin.Shard{..}  do
      let startingSequenceNumber =
            savedState ^? _Just  _SavedStreamState  ix shardShardId
          iteratorType =
            maybe
              defaultIteratorType
              (const Kin.AfterSequenceNumber)
              startingSequenceNumber

      Kin.GetShardIteratorResponse it  runKinesis Kin.GetShardIterator
        { Kin.getShardIteratorShardId = shardShardId
        , Kin.getShardIteratorShardIteratorType = iteratorType
        , Kin.getShardIteratorStartingSequenceNumber = startingSequenceNumber
        , Kin.getShardIteratorStreamName = streamName
        }
      liftIO $ do
        iteratorVar  newTVarIO $ Just it
        sequenceNumberVar  newTVarIO Nothing
        return ShardState
          { _ssIterator = iteratorVar
          , _ssShardId = shardShardId
          , _ssLastSequenceNumber = sequenceNumberVar
          }
    return  CR.nub $ CR.append shardStates state

-- | Waits for a message queue to be emptied and fills it up again.
--
replenishMessages
   MonadConsumerInternal m
   TBQueue MessageQueueItem
   TVar StreamState
   m Int
replenishMessages messageQueue shardsVar = do
  bufferSize  view ckBatchSize
  liftIO  atomically  awaitQueueEmpty $ messageQueue
  (shard, iterator)  liftIO  atomically $ do
    mshard  shardsVar ^!? act readTVar  CR.cursor
    shard  maybe retry return mshard
    miterator  shard ^! ssIterator  act readTVar
    iterator  maybe retry return miterator
    return (shard, iterator)

  Kin.GetRecordsResponse{..}  mapError KinesisError  mapEnvironment ckKinesisKit $ runKinesis Kin.GetRecords
    { getRecordsLimit = Just bufferSize
    , getRecordsShardIterator = iterator
    }

  liftIO  atomically $ do
    writeTVar (shard ^. ssIterator) getRecordsResNextShardIterator
    forM_ getRecordsResRecords $ writeTBQueue messageQueue  (shard ,)
    modifyTVar shardsVar CR.moveRight

  return $ length getRecordsResRecords

-- | Await and read a single record from the consumer.
--
readConsumer
   MonadConsumer m
   KinesisConsumer
   m Kin.Record
readConsumer consumer =
  liftIO  atomically $ do
    (ss, rec)  consumer ^! kcMessageQueue  act readTBQueue
    writeTVar (ss ^. ssLastSequenceNumber)  Just $ Kin.recordSequenceNumber rec
    return rec

-- | Try to read a single record from the consumer; if there is non queued up,
-- then 'Nothing' will be returned.
--
tryReadConsumer
   MonadConsumer m
   KinesisConsumer
   m (Maybe Kin.Record)
tryReadConsumer consumer =
  liftIO  atomically $ do
    mitem  consumer ^! kcMessageQueue  act tryReadTBQueue
    for mitem $ \(ss, rec)  do
      writeTVar (ss ^. ssLastSequenceNumber)  Just $ Kin.recordSequenceNumber rec
      return rec

-- | A conduit for getting records.
--
consumerSource
   MonadConsumer m
   KinesisConsumer
   Source m Kin.Record
consumerSource consumer =
  forever $
    lift (readConsumer consumer)
      = yield

-- | Get the last read sequence number at each shard.
--
consumerStreamState
   MonadConsumer m
   KinesisConsumer
   m SavedStreamState
consumerStreamState consumer =
  liftIO  atomically $ do
    shards  consumer ^! kcStreamState  act readTVar  CR.clList
    pairs  for shards $ \ss 
      (ss ^. ssShardId,) <$>
        ss ^! ssLastSequenceNumber  act readTVar
    return  SavedStreamState  M.fromList $ pairs = \(sid, msn) 
      msn ^.. _Just  to (sid,)