{-# LANGUAGE GADTs               #-}
{-# LANGUAGE FlexibleInstances   #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal.Subscription.Catchup where
import Control.Monad.Fix
import Safe (fromJustNote)
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Operation.Volatile
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | State of a catchup subscription, as stored in its phase 'TVar'.
data Phase
  = CatchingUp
    -- ^ State a new subscription is created in; no 'Confirmed' action has
    --   been recorded yet.
  | Running SubDetails
    -- ^ A 'Confirmed' action was received; carries the details it reported.
  | Closed (Either SomeException SubDropReason)
    -- ^ The subscription ended: 'Left' holds the exception handed to the
    --   operation callback, 'Right' the reason carried by a 'Dropped' action.
-- | Tells whether an event sits strictly before the given checkpoint.
--
--   For a named stream the event's original event number is compared with
--   the checkpoint; for the @$all@ stream the event's position is compared
--   instead. An @$all@ event without a position aborts through
--   'fromJustNote'.
receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool
receivedAlready streamId checkpoint event =
    case streamId of
        StreamName{} ->
            EventNumber (resolvedEventOriginalEventNumber event) < checkpoint
        All ->
            -- Matching on 'All' fixes the checkpoint type to a position.
            fromJustNote
                "Position is always defined when reading events from $all stream"
                (resolvedEventPosition event)
                < checkpoint
-- | Computes the checkpoint value to record once the given event has been
--   accepted.
--
--   For a named stream this is the event's original event number; for the
--   @$all@ stream it is the event's position, and a missing position aborts
--   through 'fromJustNote'.
nextTarget :: StreamId t -> ResolvedEvent -> t
nextTarget streamId event =
    case streamId of
        StreamName{} ->
            EventNumber (resolvedEventOriginalEventNumber event)
        All ->
            fromJustNote
                "Position is always defined when reading events from $all stream"
                (resolvedEventPosition event)
-- | Handle on a catchup subscription. The type parameter @t@ is the
--   checkpoint type tied to the 'StreamId' being followed.
data CatchupSubscription t =
  CatchupSubscription { _catchupExec   :: Exec
                        -- ^ 'Exec' the subscription publishes its operations
                        --   through; also handed to 'subUnsubscribe'.
                      , _catchupStream :: StreamId t
                        -- ^ Stream this subscription follows.
                      , _catchupPhase  :: TVar Phase
                        -- ^ Current 'Phase', updated by the operation
                        --   callback.
                      , _catchupTrack  :: TVar t
                        -- ^ Checkpoint of the last event queued; read back
                        --   when a 'ConnectionReset' restarts the catchup.
                      , _catchupNext   :: STM (Maybe ResolvedEvent)
                        -- ^ Yields the next queued event, 'Nothing' when the
                        --   queue is empty, and throws when the queue is
                        --   empty and the phase is 'Closed'.
                      }
-- | A catchup subscription serves events from its internal queue and only
--   reports subscription details once it reached the 'Running' phase.
instance Subscription (CatchupSubscription t) where
  nextEventMaybeSTM sub = _catchupNext sub
  -- Blocks (retries) while still catching up, fails once closed.
  getSubscriptionDetailsSTM sub =
    readTVar (_catchupPhase sub) >>= \phase ->
      case phase of
        Running details -> return details
        Closed reason   -> throwClosed reason
        CatchingUp      -> retrySTM
  unsubscribe sub = subUnsubscribe (_catchupExec sub) sub
-- | Exposes the stream a catchup subscription was created for.
instance SubscriptionStream (CatchupSubscription t) t where
    subscriptionStream sub = _catchupStream sub
-- | Creates a catchup subscription and submits its initial 'catchup'
--   operation through the given 'Exec'.
--
--   Arguments, in order:
--
--   * the 'Exec' used to publish every operation;
--   * a flag forwarded unchanged to the 'catchup' and 'volatile' operations
--     (presumably "resolve link tos" -- confirm against those operations);
--   * an optional batch size forwarded to 'catchup';
--   * optional credentials forwarded to both operations;
--   * the stream to follow;
--   * the checkpoint the first 'catchup' operation starts from.
--
--   The subscription starts in the 'CatchingUp' phase. The operation callback
--   then drives it:
--
--   * a 'StreamNotFound' error re-submits a 'volatile' operation with the
--     same callback; any other error closes the subscription with it;
--   * 'Confirmed' moves the phase to 'Running', 'Dropped' to 'Closed';
--   * 'Submit' queues the event and advances the checkpoint, unless
--     'receivedAlready' says the event precedes the current checkpoint;
--   * 'ConnectionReset' submits a fresh 'catchup' operation starting from the
--     current checkpoint, with a fresh callback.
--
--   NOTE(review): 'ConnectionReset' does not touch the phase, so a
--   subscription that was 'Running' keeps reporting that phase while the new
--   catchup operation runs -- confirm this is intended.
newCatchupSubscription :: Exec
                       -> Bool
                       -> Maybe Int32
                       -> Maybe Credentials
                       -> StreamId t
                       -> t
                       -> IO (CatchupSubscription t)
newCatchupSubscription exec tos batch cred streamId seed = do
  phaseVar <- newTVarIO CatchingUp
  queue    <- newTQueueIO
  track    <- newTVarIO seed
  let setPhase phase = atomically $ writeTVar phaseVar phase

      -- Queued events are always served first; the closed state is only
      -- surfaced once the queue has been drained.
      nextEvent = do
        p       <- readTVar phaseVar
        isEmpty <- isEmptyTQueue queue
        if isEmpty
          then
            case p of
              Closed r -> throwClosed r
              _        -> return Nothing
          else Just <$> readTQueue queue

      sub = CatchupSubscription exec streamId phaseVar track nextEvent

      callback cb (Left e) =
        case fromException e of
          -- Fall back to a volatile subscription, keeping the same callback.
          Just StreamNotFound{} ->
            publishWith exec (SubmitOperation cb (volatile streamId tos cred))
          -- Any other operation error, or a foreign exception, is final.
          _ -> setPhase (Closed $ Left e)
      callback _ (Right action) =
        case action of
          Confirmed details -> setPhase (Running details)
          Dropped r         -> setPhase (Closed $ Right r)
          Submit e -> atomically $ do
            tracker <- readTVar track
            unless (receivedAlready streamId tracker e) $ do
              writeTVar track (nextTarget streamId e)
              writeTQueue queue e
          ConnectionReset -> do
            -- Resume from the last recorded checkpoint, not from the seed.
            chk <- readTVarIO track
            let newOp = catchup (execSettings exec) streamId chk tos batch cred
            newCb <- mfix $ \self -> newCallback (callback self)
            publishWith exec (SubmitOperation newCb newOp)

  -- The callback needs a reference to itself to be able to re-submit.
  cb <- mfix $ \self -> newCallback (callback self)
  let op = catchup (execSettings exec) streamId seed tos batch cred
  publishWith exec (SubmitOperation cb op)
  return sub
-- | Aborts the transaction with the cause a subscription was closed for: the
--   stored exception is rethrown as is, while a drop reason is wrapped in
--   'SubscriptionClosed'.
throwClosed :: Either SomeException SubDropReason -> STM a
throwClosed = either throwSTM (throwSTM . SubscriptionClosed . Just)
-- | Runs 'hasCaughtUpSTM' as its own transaction.
hasCaughtUp :: CatchupSubscription t -> IO Bool
hasCaughtUp = atomically . hasCaughtUpSTM
-- | Blocks until 'hasCaughtUpSTM' reports 'True'. Exceptions thrown by
--   'hasCaughtUpSTM' propagate to the caller.
waitTillCatchup :: CatchupSubscription t -> IO ()
waitTillCatchup sub = atomically $ do
  caughtUp <- hasCaughtUpSTM sub
  unless caughtUp retrySTM
-- | STM variant of 'hasCaughtUp'.
--
--   Returns 'False' while the phase is 'CatchingUp' and 'True' once it is
--   'Running'. When the phase is 'Closed', the closing cause is thrown via
--   'throwClosed': the stored exception as is, or the drop reason wrapped in
--   'SubscriptionClosed'.
hasCaughtUpSTM :: CatchupSubscription t -> STM Bool
hasCaughtUpSTM sub = do
  phase <- readTVar (_catchupPhase sub)
  case phase of
    CatchingUp    -> return False
    Running{}     -> return True
    Closed reason -> throwClosed reason