{-# LANGUAGE DataKinds                 #-}
{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RecordWildCards           #-}
{-# LANGUAGE ScopedTypeVariables       #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Execution.Production
-- Copyright : (C) 2015 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
-- Production execution model. It's striving for robustness. The model consists
-- of 4 threads: the Reader thread that reads 'Package's from the connection,
-- the Runner thread which executes finalizers submitted by the user (typically
-- what to do on operation completion or when an event has arrived for a
-- subscription), the Writer thread that sends 'Package's to the server, and the
-- Manager thread that handles requests coming both from the user and the Reader
-- thread. If the Reader or Runner thread dies, it will be restarted by the
-- Manager thread if the connection hasn't been closed by the user in the
-- meantime.
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Execution.Production
    ( Production
    , ServerConnectionError(..)
    , newExecutionModel
    , pushOperation
    , shutdownExecutionModel
    , pushConnectStream
    , pushConnectPersist
    , pushCreatePersist
    , pushUpdatePersist
    , pushDeletePersist
    , pushAckPersist
    , pushNakPersist
    , pushUnsubscribe
    , prodWaitTillClosed
    ) where

--------------------------------------------------------------------------------
import Prelude hiding (take)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import Data.IORef
import Data.Int
import Data.Foldable
import Data.Typeable
import Text.Printf

--------------------------------------------------------------------------------
import Data.Serialize.Get hiding (Done)
import Data.Serialize.Put
import Data.Text
import Data.UUID

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Generator
import Database.EventStore.Internal.Manager.Subscription hiding
    ( submitPackage
    , unsubscribe
    , ackPersist
    , nakPersist
    , abort
    )
import Database.EventStore.Internal.Operation hiding (retry)
import Database.EventStore.Internal.Packages
import Database.EventStore.Internal.Processor
import Database.EventStore.Internal.Types
import Database.EventStore.Logging

--------------------------------------------------------------------------------
-- | Identifies one of the worker threads spawned by the manager thread, tagged
--   with the 'ThreadId' it runs on.
data Worker
    = Reader ThreadId
      -- ^ Thread running 'reader'.
    | Runner ThreadId
      -- ^ Thread running 'runner'.
    | Writer ThreadId
      -- ^ Thread running 'writer'.
    deriving Show

--------------------------------------------------------------------------------
-- | Raised when the server responded in an unexpected way.
data ServerConnectionError
    = WrongPackageFraming
      -- ^ TCP package sent by the server had a wrong framing. Thrown by the
      --   reader thread when the length prefix can't be decoded.
    | PackageParsingError String
      -- ^ Server sent a malformed TCP package. Carries the error message
      --   reported by the package parser.
    deriving (Show, Typeable)

--------------------------------------------------------------------------------
-- Lets 'ServerConnectionError' be thrown and caught as an exception.
instance Exception ServerConnectionError

--------------------------------------------------------------------------------
-- | Element stored in a 'CycleQueue'. 'Slot' carries an actual value while
--   'End' is a marker used to determine if we hit the end of the queue when
--   cycling through it.
data Slot a = Slot !a | End

--------------------------------------------------------------------------------
-- | A 'TQueue' that can be cycled: its content can be walked through once by
--   pushing an 'End' marker then reading elements until that marker shows up
--   (see 'emptyCycleQueue' and 'updateCycleQueue').
newtype CycleQueue a = CycleQueue (TQueue (Slot a))

--------------------------------------------------------------------------------
-- | Allocates a fresh 'CycleQueue' holding no element.
newCycleQueue :: IO (CycleQueue a)
newCycleQueue = do
    inner <- newTQueueIO
    return (CycleQueue inner)

--------------------------------------------------------------------------------
-- | Gets an element from the 'CycleQueue'. The transaction retries while the
--   queue is empty.
--
--   The 'Slot' is inspected with a total @case@ instead of a failable @do@
--   pattern, so no 'MonadFail' instance is demanded from 'STM'. Both
--   'emptyCycleQueue' and 'updateCycleQueue' consume the 'End' marker in the
--   very transaction that pushed it, so it shouldn't be observable here. If
--   one is met anyway, it's skipped and the next element is read.
readCycleQueue :: CycleQueue a -> STM a
readCycleQueue cq@(CycleQueue q) = do
    s <- readTQueue q
    case s of
        Slot a -> return a
        End    -> readCycleQueue cq

--------------------------------------------------------------------------------
-- | Appends an element at the back of the 'CycleQueue'.
writeCycleQueue :: CycleQueue a -> a -> STM ()
writeCycleQueue (CycleQueue inner) = writeTQueue inner . Slot

--------------------------------------------------------------------------------
-- | Removes every element of a 'CycleQueue'. An 'End' marker is pushed first,
--   then elements are dropped until that marker is read back.
emptyCycleQueue :: CycleQueue a -> STM ()
emptyCycleQueue (CycleQueue inner) = do
    writeTQueue inner End
    drain
  where
    drain = readTQueue inner >>= \slot ->
        case slot of
            Slot _ -> drain
            End    -> return ()

--------------------------------------------------------------------------------
-- | Walks through a 'CycleQueue' once, applying the given action to each
--   element. When the action returns 'Nothing' the element is dropped,
--   otherwise the returned value is pushed back onto the queue. An 'End' marker
--   pushed beforehand tells when the walk is over.
updateCycleQueue :: CycleQueue a -> (a -> STM (Maybe a)) -> STM ()
updateCycleQueue (CycleQueue inner) k = do
    writeTQueue inner End
    cycleOnce
  where
    cycleOnce = do
        slot <- readTQueue inner
        case slot of
            End       -> return ()
            Slot item -> do
                kept <- k item
                traverse_ (writeTQueue inner . Slot) kept
                cycleOnce

--------------------------------------------------------------------------------
-- | Tells whether a 'CycleQueue' currently holds no element.
isEmptyCycleQueue :: CycleQueue a -> STM Bool
isEmptyCycleQueue queue =
    case queue of
        CycleQueue inner -> isEmptyTQueue inner

--------------------------------------------------------------------------------
-- | Registers a 'Worker''s 'ThreadId' in the matching field of the 'State'.
wkUpdState :: Worker -> State -> State
wkUpdState worker st =
    case worker of
        Reader tid -> st { _reader = Just tid }
        Runner tid -> st { _runner = Just tid }
        Writer tid -> st { _writer = Just tid }

--------------------------------------------------------------------------------
-- | Holds the execution model state. That's the handle given back by
--   'newExecutionModel' and expected by every @push*@ function.
data Production =
    Prod
    { _submit :: TVar (Msg -> IO ())
      -- ^ The action to call when pushing a new command. It's kept in a 'TVar'
      --   because the action is replaced once the model is disposed.
    , _waitClosed :: STM ()
      -- ^ Action that completes once the execution model has been closed and
      --   disposed. It doesn't tell why: the model might as well have been
      --   shut down because of some random exception.
    }

--------------------------------------------------------------------------------
-- | Main execution environment shared among the different execution modes
--   ('bootstrap', 'cruising' and 'closing').
data Env =
    Env
    { _setts :: Settings
      -- ^ Global settings reference.
    , _queue :: CycleQueue Msg
      -- ^ That queue ties the user, the reader thread and the manager thread.
      --   The user and the reader push new messages onto the queue while the
      --   manager dequeues and handles one message at a time.
    , _pkgQueue :: CycleQueue Package
      -- ^ That queue ties the writer thread with the manager thread. The writer
      --   dequeues packages from that queue and sends those to the server,
      --   while the manager pushes the packages produced when handling a
      --   message.
    , _jobQueue :: CycleQueue Job
      -- ^ That queue ties the runner thread with the manager thread. The runner
      --   dequeues IO actions from it while the manager pushes new command
      --   finalizers as those arrive.
    , _state :: TVar State
      -- ^ Holds manager thread state.
    , _nextSubmit :: TVar (Msg -> IO ())
      -- ^ Indicates the action to call in order to push new commands.
    , _connRef :: IORef Connection
      -- ^ Connection to the server. Replaced when the manager reconnects.
    , _disposed :: TMVar ()
      -- ^ Filled once the production execution model has been shut down and
      --   has disposed of any ongoing operations.
    }

--------------------------------------------------------------------------------
-- | Messages handled by the manager thread. Except for 'Stopped' and 'Arrived',
--   each constructor is built by the @push*@ / shutdown function of the
--   matching name.
data Msg
    = Stopped Worker SomeException
      -- ^ A worker thread ended with an exception other than 'ThreadKilled'.
    | Arrived Package
      -- ^ The reader thread received a 'Package' from the server.
    | Shutdown
      -- ^ The user asked to shut the execution model down.
    | forall a.
      NewOperation (Either OperationError a -> IO ()) (Operation a)
      -- ^ New 'Operation' along with the callback receiving its outcome.
    | ConnectStream (SubConnectEvent -> IO ()) Text Bool
      -- ^ Subscription to a regular stream.
    | ConnectPersist (SubConnectEvent -> IO ()) Text Text Int32
      -- ^ Connection to a persistent subscription.
    | Unsubscribe Running
      -- ^ Unsubscribes from a running subscription.
    | CreatePersist (Either PersistActionException ConfirmedAction -> IO ())
          Text Text PersistentSubscriptionSettings
      -- ^ Persistent subscription creation.
    | UpdatePersist (Either PersistActionException ConfirmedAction -> IO ())
          Text Text PersistentSubscriptionSettings
      -- ^ Persistent subscription update.
    | DeletePersist (Either PersistActionException ConfirmedAction -> IO ())
          Text Text
      -- ^ Persistent subscription deletion.
    | AckPersist Running [UUID]
      -- ^ Acknowledges a set of events of a persistent subscription.
    | NakPersist Running NakAction (Maybe Text) [UUID]
      -- ^ Negatively acknowledges a set of events of a persistent subscription.

--------------------------------------------------------------------------------
-- | Submits a 'Msg' using whatever submit action is currently installed in the
--   'Production' handle.
pushCmd :: Production -> Msg -> IO ()
pushCmd prod msg = readTVarIO (_submit prod) >>= \submit -> submit msg

--------------------------------------------------------------------------------
-- | Asynchronously requests the connection to the server to be shut down.
shutdownExecutionModel :: Production -> IO ()
shutdownExecutionModel = flip pushCmd Shutdown

--------------------------------------------------------------------------------
-- | Asynchronously submits a new 'Operation'. The callback is handed over to
--   the execution model together with the operation.
pushOperation :: Production
             -> (Either OperationError a -> IO ())
             -> Operation a
             -> IO ()
pushOperation prod callback operation =
    pushCmd prod $ NewOperation callback operation

--------------------------------------------------------------------------------
-- | Asks for a subscription to a regular stream.
pushConnectStream :: Production
                  -> (SubConnectEvent -> IO ())
                  -> Text
                  -> Bool
                  -> IO ()
pushConnectStream prod callback name tos = pushCmd prod msg
  where
    msg = ConnectStream callback name tos

--------------------------------------------------------------------------------
-- | Asks for a connection to a persistent subscription.
pushConnectPersist :: Production
                   -> (SubConnectEvent -> IO ())
                   -> Text
                   -> Text
                   -> Int32
                   -> IO ()
pushConnectPersist prod callback grp name bufSize = pushCmd prod msg
  where
    msg = ConnectPersist callback grp name bufSize

--------------------------------------------------------------------------------
-- | Asks for the creation of a persistent subscription.
pushCreatePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> PersistentSubscriptionSettings
                  -> IO ()
pushCreatePersist prod callback grp name psetts =
    pushCmd prod $ CreatePersist callback grp name psetts

--------------------------------------------------------------------------------
-- | Asks for the update of a persistent subscription.
pushUpdatePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> PersistentSubscriptionSettings
                  -> IO ()
pushUpdatePersist prod callback grp name psetts =
    pushCmd prod $ UpdatePersist callback grp name psetts

--------------------------------------------------------------------------------
-- | Asks for the deletion of a persistent subscription.
pushDeletePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> IO ()
pushDeletePersist prod callback grp name =
    pushCmd prod $ DeletePersist callback grp name

--------------------------------------------------------------------------------
-- | Tells a set of events has been handled successfully.
pushAckPersist :: Production -> Running -> [UUID] -> IO ()
pushAckPersist prod sub ids = pushCmd prod $ AckPersist sub ids

--------------------------------------------------------------------------------
-- | Tells a set of events hasn't been handled successfully.
pushNakPersist :: Production
               -> Running
               -> NakAction
               -> Maybe Text
               -> [UUID]
               -> IO ()
pushNakPersist prod sub action reason ids = pushCmd prod msg
  where
    msg = NakPersist sub action reason ids

--------------------------------------------------------------------------------
-- | Asks to unsubscribe from a running subscription.
pushUnsubscribe :: Production -> Running -> IO ()
pushUnsubscribe prod = pushCmd prod . Unsubscribe

--------------------------------------------------------------------------------
-- | Blocks until the execution model has been closed properly.
prodWaitTillClosed :: Production -> IO ()
prodWaitTillClosed = atomically . _waitClosed

--------------------------------------------------------------------------------
-- | An 'IO' action queued by the manager thread and executed by the runner
--   thread.
newtype Job = Job (IO ())

--------------------------------------------------------------------------------
-- Internal Production state.
--------------------------------------------------------------------------------
-- | State owned by the manager thread.
data State =
    State
    { _proc   :: !(Processor (IO ()))
      -- ^ Current 'Processor', replaced after each handled message.
    , _reader :: !(Maybe ThreadId)
      -- ^ Reader thread, 'Nothing' until 'bootstrap' has spawned it.
    , _runner :: !(Maybe ThreadId)
      -- ^ Runner thread, 'Nothing' until 'bootstrap' has spawned it.
    , _writer :: !(Maybe ThreadId)
      -- ^ Writer thread, 'Nothing' until 'bootstrap' has spawned it.
    }

--------------------------------------------------------------------------------
-- | Initial 'State': a brand new 'Processor' and no worker thread registered.
emptyState :: Settings -> Generator -> State
emptyState setts gen =
    State
    { _proc   = newProcessor setts gen
    , _reader = Nothing
    , _runner = Nothing
    , _writer = Nothing
    }

--------------------------------------------------------------------------------
-- | Replaces the 'Processor' held by a 'State'.
updateProc :: Processor (IO ()) -> State -> State
updateProc new_proc st = st { _proc = new_proc }

--------------------------------------------------------------------------------
-- | Reader thread. Loops forever: reads a 4-byte length prefix from the
--   connection, reads that many bytes, parses them as a 'Package' and hands
--   the package to the manager thread. Throws 'WrongPackageFraming' when the
--   prefix can't be decoded and 'PackageParsingError' when the payload isn't a
--   valid 'Package'.
reader :: Settings -> CycleQueue Msg -> Connection -> IO ()
reader sett queue c = forever $ do
    prefix <- connRecv c 4
    either (const $ throwIO WrongPackageFraming) readPayload
           (runGet getLengthPrefix prefix)
  where
    readPayload len = do
        payload <- connRecv c len
        either (throwIO . PackageParsingError) deliver
               (runGet getPackage payload)

    -- The package is queued before its reception gets logged.
    deliver pkg = do
        atomically $ writeCycleQueue queue (Arrived pkg)
        _settingsLog sett $ Info $
            PackageReceived (packageCmd pkg) (packageCorrelation pkg)

--------------------------------------------------------------------------------
-- | Writer thread. Loops forever: takes the next 'Package' out of the queue,
--   serializes and sends it over the connection, then logs it.
writer :: Settings -> CycleQueue Package -> Connection -> IO ()
writer setts pkg_queue conn = forever (nextPackage >>= transmit)
  where
    nextPackage = atomically $ readCycleQueue pkg_queue

    transmit pkg = do
        connSend conn $ runPut $ putPackage pkg
        _settingsLog setts $ Info $
            PackageSent (packageCmd pkg) (packageCorrelation pkg)

--------------------------------------------------------------------------------
-- | Parses a package length prefix: a little-endian 32-bit word.
getLengthPrefix :: Get Int
getLengthPrefix = do
    prefix <- getWord32le
    return (fromIntegral prefix)

--------------------------------------------------------------------------------
-- | Parses a 'Package': command byte, flag, correlation id, optional
--   credentials, then every remaining byte as the payload.
getPackage :: Get Package
getPackage = do
    command     <- getWord8
    flag        <- getFlag
    correlation <- getUUID
    creds       <- getCredentials flag
    payload     <- remaining >>= getBytes
    return $ Package
        { packageCmd         = command
        , packageCorrelation = correlation
        , packageData        = payload
        , packageCred        = creds
        }

--------------------------------------------------------------------------------
-- | Parses a package 'Flag' out of a single byte. The parser fails on any
--   value other than @0x00@ and @0x01@.
getFlag :: Get Flag
getFlag = getWord8 >>= toFlag
  where
    toFlag 0x00 = return None
    toFlag 0x01 = return Authenticated
    toFlag wd   = fail $ printf "TCP: Unhandled flag value 0x%x" wd

--------------------------------------------------------------------------------
-- | Parses the length of a credentials entry, encoded on a single byte.
getCredEntryLength :: Get Int
getCredEntryLength = getWord8 >>= return . fromIntegral

--------------------------------------------------------------------------------
-- | Parses the credentials of a package. Nothing is consumed when the flag is
--   'None'; otherwise a login then a password are read, each one prefixed by
--   its length.
getCredentials :: Flag -> Get (Maybe Credentials)
getCredentials flag =
    case flag of
        None -> return Nothing
        _    -> do
            login <- entry
            passw <- entry
            return $ Just $ credentials login passw
  where
    -- A length-prefixed chunk of bytes.
    entry = getCredEntryLength >>= getBytes

--------------------------------------------------------------------------------
-- | Parses a 'UUID' out of the next 16 bytes. The parser fails when those
--   bytes can't be turned into a 'UUID'.
getUUID :: Get UUID
getUUID = getLazyByteString 16 >>= maybe invalid return . fromByteString
  where
    invalid = fail "TCP: Wrong UUID format"

--------------------------------------------------------------------------------
-- | Runner thread. Keeps running jobs coming from the manager thread, one at a
--   time, in the order they were queued.
runner :: CycleQueue Job -> IO ()
runner job_queue = forever $ do
    job <- atomically $ readCycleQueue job_queue
    case job of
        Job action -> action

--------------------------------------------------------------------------------
-- | Spawns a new thread worker. The given constructor tells which kind of
--   worker is started. Reader and writer workers use the connection currently
--   stored in '_connRef'.
--
--   When the worker ends with an exception other than 'ThreadKilled', a
--   'Stopped' message is pushed to the manager queue. Nothing is reported when
--   the worker is killed or returns normally.
spawn :: Env -> (ThreadId -> Worker) -> IO Worker
spawn Env{..} mk = do
    conn <- readIORef _connRef
    -- 'mfix' ties the knot: the worker value captured by the finalizer holds
    -- the 'ThreadId' that 'forkFinally' is about to return. That 'ThreadId'
    -- must stay lazy in here; only the 'Worker' constructor is inspected.
    tid  <- mfix $ \tid ->
        let worker = mk tid
            action =
                case worker of
                    Reader _ -> reader _setts _queue conn
                    Runner _ -> runner _jobQueue
                    Writer _ -> writer _setts _pkgQueue conn in
        forkFinally action $ \r ->
            case r of
                Left e ->
                    case asyncExceptionFromException e of
                        -- Killed on purpose, nothing to report.
                        Just ThreadKilled -> return ()
                        _ ->  atomically $ writeCycleQueue _queue
                                         $ Stopped worker e
                _      -> return ()
    return $ mk tid

--------------------------------------------------------------------------------
-- | Interprets a 'Processor''s 'Transition' state machine until it awaits new
--   input. Produced jobs go to the job queue, transmitted packages go to the
--   package queue. The updated 'Processor' model is returned at the end.
runTransition :: Env -> Transition (IO ()) -> STM (Processor (IO ()))
runTransition env = loop
  where
    jobs = _jobQueue env
    pkgs = _pkgQueue env

    loop step =
        case step of
            Produce action nxt -> writeCycleQueue jobs (Job action) >> loop nxt
            Transmit pkg nxt   -> writeCycleQueue pkgs pkg >> loop nxt
            Await final        -> return final

--------------------------------------------------------------------------------
-- | First execution mode. Spawns the reader, runner and writer threads (in
--   that order), registers them in the manager state, then switches to
--   'cruising' mode.
bootstrap :: Env -> IO ()
bootstrap env = do
    readerW <- spawn env Reader
    runnerW <- spawn env Runner
    writerW <- spawn env Writer
    atomically $ modifyTVar' (_state env) $ \st ->
        wkUpdState readerW $ wkUpdState runnerW $ wkUpdState writerW st
    cruising env

--------------------------------------------------------------------------------
-- | Cruising execution mode. Reads and handles messages coming from the queue
--   as those arrive. That mode is used when the connection to the server is
--   still live. We might get disconnected once in a while but in the end, if
--   we managed to reconnect, we consider everything is fine.
--
--   Every message but 'Stopped' and 'Shutdown' is turned into a 'Processor'
--   transition, which is run before waiting for the next message. 'Stopped'
--   rethrows the exception that ended the worker and 'Shutdown' throws
--   'ClosedConnection'; both leave the loop.
cruising :: Env -> IO ()
cruising env@Env{..} = do
    msg <- atomically $ readCycleQueue _queue
    s   <- readTVarIO _state
    let cur = _proc s
        -- Runs a transition to completion, saves the resulting 'Processor'
        -- then goes back waiting for the next message.
        step sm = do
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
    case msg of
        Stopped _ e -> throwIO e
        Shutdown    -> throwIO ClosedConnection
        Arrived pkg ->
            step $ submitPackage pkg cur
        NewOperation k op ->
            step $ newOperation k op cur
        ConnectStream k n tos ->
            step $ connectRegularStream k n tos cur
        ConnectPersist k g n b ->
            step $ connectPersistent k g n b cur
        Unsubscribe r ->
            step $ unsubscribe r cur
        CreatePersist k g n psetts ->
            step $ createPersistent k g n psetts cur
        UpdatePersist k g n psetts ->
            step $ updatePersistent k g n psetts cur
        DeletePersist k g n ->
            step $ deletePersistent k g n cur
        AckPersist run evts ->
            step $ ackPersist (return ()) run evts cur
        NakPersist run act res evts ->
            step $ nakPersist (return ()) run act res evts cur

--------------------------------------------------------------------------------
-- | That mode is triggered either because the user asks to shut down the
--   connection or because the connection to the server has been dropped and we
--   can't reconnect. It stops the reader and writer threads, cleans the
--   queues, closes the connection, aborts the 'Processor', lets the runner
--   thread finish its pending jobs and finally stops it.
--
--   NOTE(review): the last step waits for '_jobQueue' to become empty, and
--   only the runner thread dequeues from it. If the runner thread is already
--   dead when we get here (for instance because a job threw), it looks like
--   that wait can't complete as soon as a job is pending -- to be confirmed.
closing :: Env -> IO ()
closing env@Env{..} = do
    State _ retid rutid  wutid <- readTVarIO _state
    -- We kill reader and writer threads to avoid in-flight packages during the
    -- cleaning phase.
    traverse_ killThread retid
    traverse_ killThread wutid

    -- Discards every 'Package' that was about to be sent.
    atomically $ emptyCycleQueue _pkgQueue

    -- Takes care of 'Package's that have already arrived, just in case those
    -- are completing or moving forward ongoing operations. Every other pending
    -- request is kept in the queue for later reconnection, except transient
    -- messages like Ack, Nak, Unsubscribe, Shutdown or Stopped which are just
    -- discarded.
    atomically $ updateCycleQueue _queue $ \nxt ->
        case nxt of
            Arrived pkg -> do
                s <- readTVar _state
                let sm = submitPackage pkg $ _proc s
                nxt_proc <- runTransition env sm
                modifyTVar' _state $ updateProc nxt_proc
                return Nothing
            Shutdown -> return Nothing
            AckPersist _ _ -> return Nothing
            NakPersist _ _ _ _ -> return Nothing
            Unsubscribe _ -> return Nothing
            Stopped _ _ -> return Nothing
            _ -> return $ Just nxt

    -- If the connection is already closed, it will throw an exception. We just
    -- make sure it doesn't interfere with the cleaning process.
    conn <- readIORef _connRef
    _    <- try $ connClose conn :: (IO (Either ConnectionException ()))
    -- Runs the abort transition for its effects only (jobs and packages it
    -- emits): the resulting 'Processor' is not written back to the state.
    atomically $ do
        s <- readTVar _state
        _ <- runTransition env $ abort $ _proc s
        return ()

    -- Waits for the runner thread to deal with its jobs list.
    atomically $ do
        end <- isEmptyCycleQueue _jobQueue
        unless end retry

    traverse_ killThread rutid

--------------------------------------------------------------------------------
-- | Submit action that ignores the message it's given and throws the provided
--   exception instead.
raiseException :: Exception e => e -> Msg -> IO ()
raiseException e = const (throwIO e)

--------------------------------------------------------------------------------
-- | Main Production execution model entry point. Connects to the server, forks
--   the manager thread and returns the 'Production' handle used to talk to it.
--
--   When the manager thread stops, 'closing' is run, then:
--
--   * if it stopped on a 'ConnectionException', the model is disposed: any
--     later submission rethrows that exception and 'prodWaitTillClosed'
--     returns;
--
--   * if it stopped on any other exception, a new connection is opened and the
--     manager thread is restarted. If opening that connection fails, the
--     model is disposed with that failure. Otherwise the manager thread would
--     die silently, leaving 'prodWaitTillClosed' blocked and submissions
--     piling up in a queue nobody reads.
newExecutionModel :: Settings -> HostName -> Int -> IO Production
newExecutionModel setts host port = do
    gen       <- newGenerator
    queue     <- newCycleQueue
    pkg_queue <- newCycleQueue
    job_queue <- newCycleQueue
    conn      <- newConnection setts host port
    conn_ref  <- newIORef conn
    var       <- newTVarIO $ emptyState setts gen
    nxt_sub   <- newTVarIO (atomically . writeCycleQueue queue)
    disposed  <- newEmptyTMVarIO
    let env = Env setts queue pkg_queue job_queue var nxt_sub conn_ref disposed
        -- Terminal state: submissions now raise the given exception and
        -- waiters on the disposed flag are released.
        dispose e = atomically $ do
            writeTVar nxt_sub (raiseException e)
            putTMVar disposed ()
        handler res = do
            closing env
            case res of
                Left e -> do
                    _settingsLog setts (Error $ UnexpectedException e)
                    case fromException e of
                        Just (_ :: ConnectionException) -> dispose e
                        _ -> do
                            -- Reconnection may throw too; that must not kill
                            -- the manager thread without disposing the model.
                            outcome <- try $ newConnection setts host port
                            case outcome of
                                Left (err :: SomeException) -> do
                                    _settingsLog setts
                                        (Error $ UnexpectedException err)
                                    dispose err
                                Right new_conn -> do
                                    writeIORef conn_ref new_conn
                                    _ <- forkFinally (bootstrap env) handler
                                    return ()

                _ -> atomically $ putTMVar disposed ()
    _ <- forkFinally (bootstrap env) handler
    return $ Prod nxt_sub $ do
        closed <- connIsClosed conn
        unless closed retry
        readTMVar disposed