{-# LANGUAGE DataKinds                 #-}
{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RecordWildCards           #-}
{-# LANGUAGE ScopedTypeVariables       #-}
-- |
-- Module : Database.EventStore.Internal.Execution.Production
-- Copyright : (C) 2015 Yorick Laupa
-- License : (see the file LICENSE)
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
-- Production execution model. It's striving for robustness. The model consists
-- on 4 threads. The Reader thread that reads 'Package' from the connection, the
-- Runner thread which executes finalizers submitted by the user (typically what
-- to do on operation completion or when a event has arrived for a subscription)
-- , the writer thread that sends 'Package' to the server, and the Manager
-- thread that handles requests coming both from the user and the Reader
-- thread. If the Reader or Runner threads die, it will be restarted by the
-- Manager thread if the connection hasn't been closed by user in the meantime.
module Database.EventStore.Internal.Execution.Production
    ( Production
    , ServerConnectionError(..)
    , newExecutionModel
    , pushOperation
    , shutdownExecutionModel
    , pushConnectStream
    , pushConnectPersist
    , pushCreatePersist
    , pushUpdatePersist
    , pushDeletePersist
    , pushAckPersist
    , pushNakPersist
    , pushUnsubscribe
    , prodWaitTillClosed
    ) where

import Prelude hiding (take)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import Data.IORef
import Data.Int
import Data.Foldable
import Data.Typeable
import Text.Printf

import Data.Serialize.Get hiding (Done)
import Data.Serialize.Put
import Data.Text
import Data.UUID

import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Generator
import Database.EventStore.Internal.Manager.Subscription hiding
    ( submitPackage
    , unsubscribe
    , ackPersist
    , nakPersist
    , abort
import Database.EventStore.Internal.Operation hiding (retry)
import Database.EventStore.Internal.Packages
import Database.EventStore.Internal.Processor
import Database.EventStore.Internal.Types
import Database.EventStore.Logging

data Worker
    = Reader ThreadId
    | Runner ThreadId
    | Writer ThreadId
    deriving Show

-- | Raised when the server responded in an unexpected way.
data ServerConnectionError
    = WrongPackageFraming
      -- ^ TCP package sent by the server had a wrong framing.
    | PackageParsingError String
      -- ^ Server sent a malformed TCP package.
    deriving (Show, Typeable)

instance Exception ServerConnectionError

-- | Used to determine if we hit the end of the queue.
data Slot a = Slot !a | End

-- | A 'TQueue' that can be cycled.
newtype CycleQueue a = CycleQueue (TQueue (Slot a))

-- | Creates an empty 'CycleQueue'.
newCycleQueue :: IO (CycleQueue a)
newCycleQueue = fmap CycleQueue newTQueueIO

-- | Gets an element from the 'CycleQueue'.
readCycleQueue :: CycleQueue a -> STM a
readCycleQueue (CycleQueue q) = do
    Slot a <- readTQueue q
    return a

-- | Writes an element to the 'CycleQueue'.
writeCycleQueue :: CycleQueue a -> a -> STM ()
writeCycleQueue (CycleQueue q) a = writeTQueue q (Slot a)

-- | Empties a 'CycleQueue'.
emptyCycleQueue :: CycleQueue a -> STM ()
emptyCycleQueue (CycleQueue q) = writeTQueue q End >> go
    go = do
        s <- readTQueue q
        case s of
            End -> return ()
            _   -> go

-- | Updates a 'CycleQueue'.
updateCycleQueue :: CycleQueue a -> (a -> STM (Maybe a)) -> STM ()
updateCycleQueue (CycleQueue q) k = writeTQueue q End >> go
    go = do
        s <- readTQueue q
        case s of
            End    -> return ()
            Slot a -> do
                r <- k a
                case r of
                    Nothing -> go
                    Just a' -> writeTQueue q (Slot a') >> go

-- | Indicates if a 'CycleQueue' is empty.
isEmptyCycleQueue :: CycleQueue a -> STM Bool
isEmptyCycleQueue (CycleQueue q) = isEmptyTQueue q

wkUpdState :: Worker -> State -> State
wkUpdState (Reader tid) s = s { _reader = Just tid }
wkUpdState (Runner tid) s = s { _runner = Just tid }
wkUpdState (Writer tid) s = s { _writer = Just tid }

-- | Holds the execution model state.
data Production =
    { _submit :: TVar (Msg -> IO ())
      -- ^ The action to call when pushing new command.
    , _waitClosed :: STM ()
      -- ^ Action that attests the execution model has been closed successfully.
      --   It doesn't mean the execution model hasn't been shutdown because of
      --   some random exception.

-- | Main execution environment used among different transitions.
data Env =
    { _setts :: Settings
      -- ^ Global settings reference.
    , _queue :: CycleQueue Msg
      -- ^ That queue ties the user, the reader thread and the manager thread.
      --   The user and the reader push new messages onto the queue while the
      --   manager dequeue and handles one message at the time.
    , _pkgQueue :: CycleQueue Package
      -- ^ That queue ties the writer thread with the manager thread. The writer
      --   dequeue packages from that queue and sends those to the server. While
      --   the manager pushes new packages on every new submitted operation.
    , _jobQueue :: CycleQueue Job
      -- ^ That queue ties the runner thread with the manager thread. The runner
      --   dequeues IO action from it while the manager pushes new command
      --   finalizers as those arrived.
    , _state :: TVar State
      -- ^ Holds manager thread state.
    , _nextSubmit :: TVar (Msg -> IO ())
      -- ^ Indicates the action to call in order to push new commands.
    , _connRef :: IORef Connection
      -- ^ Connection to the server.
    , _disposed :: TMVar ()
      -- ^ Indicates when the production execution model has been shutdown and
      --   disposed any ongoing operations.

data Msg
    = Stopped Worker SomeException
    | Arrived Package
    | Shutdown
    | forall a.
      NewOperation (Either OperationError a -> IO ()) (Operation a)
    | ConnectStream (SubConnectEvent -> IO ()) Text Bool
    | ConnectPersist (SubConnectEvent -> IO ()) Text Text Int32
    | Unsubscribe Running
    | CreatePersist (Either PersistActionException ConfirmedAction -> IO ())
          Text Text PersistentSubscriptionSettings
    | UpdatePersist (Either PersistActionException ConfirmedAction -> IO ())
          Text Text PersistentSubscriptionSettings
    | DeletePersist (Either PersistActionException ConfirmedAction -> IO ())
          Text Text
    | AckPersist Running [UUID]
    | NakPersist Running NakAction (Maybe Text) [UUID]

pushCmd :: Production -> Msg -> IO ()
pushCmd (Prod _sender _) msg = do
    push <- readTVarIO _sender
    push msg

-- | Asks to shutdown the connection to the server asynchronously.
shutdownExecutionModel :: Production -> IO ()
shutdownExecutionModel prod = pushCmd prod Shutdown

-- | Pushes a new 'Operation' asynchronously.
pushOperation :: Production
             -> (Either OperationError a -> IO ())
             -> Operation a
             -> IO ()
pushOperation prod k op = pushCmd prod (NewOperation k op)

-- | Subscribes to a regular stream.
pushConnectStream :: Production
                  -> (SubConnectEvent -> IO ())
                  -> Text
                  -> Bool
                  -> IO ()
pushConnectStream prod k n tos = pushCmd prod (ConnectStream k n tos)

-- | Subscribes to a persistent subscription.
pushConnectPersist :: Production
                   -> (SubConnectEvent -> IO ())
                   -> Text
                   -> Text
                   -> Int32
                   -> IO ()
pushConnectPersist prod k g n buf = pushCmd prod (ConnectPersist k g n buf)

-- | Creates a persistent subscription.
pushCreatePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> PersistentSubscriptionSettings
                  -> IO ()
pushCreatePersist prod k g n setts = pushCmd prod (CreatePersist k g n setts)

-- | Updates a persistent subscription.
pushUpdatePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> PersistentSubscriptionSettings
                  -> IO ()
pushUpdatePersist prod k g n setts = pushCmd prod (UpdatePersist k g n setts)

-- | Deletes a persistent subscription.
pushDeletePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> IO ()
pushDeletePersist prod k g n = pushCmd prod (DeletePersist k g n)

-- | Acknowledges a set of events has been successfully handled.
pushAckPersist :: Production -> Running -> [UUID] -> IO ()
pushAckPersist prod run evts = pushCmd prod (AckPersist run evts)

-- | Acknowledges a set of events hasn't been handled successfully.
pushNakPersist :: Production
               -> Running
               -> NakAction
               -> Maybe Text
               -> [UUID]
               -> IO ()
pushNakPersist prod run act res evts =
    pushCmd prod (NakPersist run act res evts)

-- | Unsubscribe from a subscription.
pushUnsubscribe :: Production -> Running -> IO ()
pushUnsubscribe prod r = pushCmd prod (Unsubscribe r)

-- | Waits the execution model to close properly.
prodWaitTillClosed :: Production -> IO ()
prodWaitTillClosed (Prod _ disposed) = atomically disposed

newtype Job = Job (IO ())

-- Internal Production state.
data State =
    { _proc   :: !(Processor (IO ()))
    , _reader :: !(Maybe ThreadId)
    , _runner :: !(Maybe ThreadId)
    , _writer :: !(Maybe ThreadId)

emptyState :: Settings -> Generator -> State
emptyState setts gen = State (newProcessor setts gen) Nothing Nothing Nothing

updateProc :: Processor (IO ()) -> State -> State
updateProc p s = s { _proc = p }

-- | Reader thread. Keeps reading 'Package' from the connection.
reader :: Settings -> CycleQueue Msg -> Connection -> IO ()
reader sett queue c = forever $ do
    header_bs <- connRecv c 4
    case runGet getLengthPrefix header_bs of
        Left _              -> throwIO WrongPackageFraming
        Right length_prefix -> connRecv c length_prefix >>= parsePackage
    parsePackage bs =
        case runGet getPackage bs of
            Left e    -> throwIO $ PackageParsingError e
            Right pkg -> do
                atomically $ writeCycleQueue queue (Arrived pkg)
                let cmd  = packageCmd pkg
                    uuid = packageCorrelation pkg
                _settingsLog sett $ Info $ PackageReceived cmd uuid

-- | Writer thread, writes incoming 'Package's
writer :: Settings -> CycleQueue Package -> Connection -> IO ()
writer setts pkg_queue conn = forever $ do
    pkg <- atomically $ readCycleQueue pkg_queue
    connSend conn $ runPut $ putPackage pkg
    let cmd  = packageCmd pkg
        uuid = packageCorrelation pkg
    _settingsLog setts $ Info $ PackageSent cmd uuid

getLengthPrefix :: Get Int
getLengthPrefix = fmap fromIntegral getWord32le

getPackage :: Get Package
getPackage = do
    cmd  <- getWord8
    flg  <- getFlag
    col  <- getUUID
    cred <- getCredentials flg
    rest <- remaining
    dta  <- getBytes rest

    let pkg = Package
              { packageCmd         = cmd
              , packageCorrelation = col
              , packageData        = dta
              , packageCred        = cred

    return pkg

getFlag :: Get Flag
getFlag = do
    wd <- getWord8
    case wd of
        0x00 -> return None
        0x01 -> return Authenticated
        _    -> fail $ printf "TCP: Unhandled flag value 0x%x" wd

getCredEntryLength :: Get Int
getCredEntryLength = fmap fromIntegral getWord8

getCredentials :: Flag -> Get (Maybe Credentials)
getCredentials None = return Nothing
getCredentials _ = do
    loginLen <- getCredEntryLength
    login    <- getBytes loginLen
    passwLen <- getCredEntryLength
    passw    <- getBytes passwLen
    return $ Just $ credentials login passw

getUUID :: Get UUID
getUUID = do
    bs <- getLazyByteString 16
    case fromByteString bs of
        Just uuid -> return uuid
        _         -> fail "TCP: Wrong UUID format"

-- Runner thread. Keeps running job comming from the Manager thread.
runner :: CycleQueue Job -> IO ()
runner job_queue = forever $ do
    Job j <- atomically $ readCycleQueue job_queue

-- | Spawns a new thread worker.
spawn :: Env -> (ThreadId -> Worker) -> IO Worker
spawn Env{..} mk = do
    conn <- readIORef _connRef
    tid  <- mfix $ \tid ->
        let worker = mk tid
            action =
                case worker of
                    Reader _ -> reader _setts _queue conn
                    Runner _ -> runner _jobQueue
                    Writer _ -> writer _setts _pkgQueue conn in
        forkFinally action $ \r ->
            case r of
                Left e ->
                    case asyncExceptionFromException e of
                        Just ThreadKilled -> return ()
                        _ ->  atomically $ writeCycleQueue _queue
                                         $ Stopped worker e
                _      -> return ()
    return $ mk tid

-- | Loops over a 'Processor''s 'Transition' state machine, returning an updated
--   'Processor' model at the end.
runTransition :: Env -> Transition (IO ()) -> STM (Processor (IO ()))
runTransition Env{..} = go
    go (Produce j nxt) = do
        let job = Job j
        writeCycleQueue _jobQueue job
        go nxt
    go (Transmit pkg nxt) = do
        writeCycleQueue _pkgQueue pkg
        go nxt
    go (Await new_proc) = return new_proc

-- | First execution mode. It spawns initial reader, runner and writer threads.
--   Then it switches to 'cruising' mode.
bootstrap :: Env -> IO ()
bootstrap env@Env{..} = do
    rew <- spawn env Reader
    ruw <- spawn env Runner
    wrw <- spawn env Writer
    let _F = wkUpdState rew .
             wkUpdState ruw .
             wkUpdState wrw
    atomically $ modifyTVar' _state _F
    cruising env

-- | Crusing execution mode. Reads and handle message coming from the channel as
--   those are arrived. That mode is used when the connection to the server is
--   still live. We might have deconnection once in a while but at the end, if
--   we managed to reconnect to it, we consider everything is fine.
cruising :: Env -> IO ()
cruising env@Env{..} = do
    msg <- atomically $ readCycleQueue _queue
    s   <- readTVarIO _state
    case msg of
        Stopped _ e -> throwIO e
        Arrived pkg -> do
            let sm = submitPackage pkg $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        Shutdown -> throwIO ClosedConnection
        NewOperation k op -> do
            let sm = newOperation k op $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        ConnectStream k n tos -> do
            let sm = connectRegularStream k n tos $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        ConnectPersist k g n b -> do
            let sm = connectPersistent k g n b $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        Unsubscribe r -> do
            let sm = unsubscribe r $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        CreatePersist k g n psetts -> do
            let sm = createPersistent k g n psetts $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        UpdatePersist k g n psetts -> do
            let sm = updatePersistent k g n psetts $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        DeletePersist k g n -> do
            let sm = deletePersistent k g n $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        AckPersist run evts -> do
            let sm = ackPersist (return ()) run evts $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
        NakPersist run act res evts -> do
            let sm = nakPersist (return ()) run act res evts $ _proc s
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env

-- | That mode is triggered either because the user asks to shutdown the
--   connection or because the connection to server has been dropped and we
--   can't reconnect.
closing :: Env -> IO ()
closing env@Env{..} = do
    State _ retid rutid  wutid <- readTVarIO _state
    -- We kill reader and writer threads to avoid in fly package in the cleaning
    -- phase.
    traverse_ killThread retid
    traverse_ killThread wutid

    -- Discards every 'Package' that was about to be sent.
    atomically $ emptyCycleQueue _pkgQueue

    -- Takes care of 'Package's that have already arrived. Just in case those
    -- are completing or moving forward ongoing operations. Every ongoing
    -- request is kept for later reconnection. Some transient operations like
    -- Ack, Nak or Unsubscribe are just discard.
    atomically $ updateCycleQueue _queue $ \nxt ->
        case nxt of
            Arrived pkg -> do
                s <- readTVar _state
                let sm = submitPackage pkg $ _proc s
                nxt_proc <- runTransition env sm
                modifyTVar' _state $ updateProc nxt_proc
                return Nothing
            Shutdown -> return Nothing
            AckPersist _ _ -> return Nothing
            NakPersist _ _ _ _ -> return Nothing
            Unsubscribe _ -> return Nothing
            Stopped _ _ -> return Nothing
            _ -> return $ Just nxt

    -- If the connection is already closed, it will throw an exception. We just
    -- make sure it doesn't interfere with the cleaning process.
    conn <- readIORef _connRef
    _    <- try $ connClose conn :: (IO (Either ConnectionException ()))
    atomically $ do
        s <- readTVar _state
        _ <- runTransition env $ abort $ _proc s
        return ()

    -- Waits the runner thread to deal with its jobs list.
    atomically $ do
        end <- isEmptyCycleQueue _jobQueue
        unless end retry

    traverse_ killThread rutid

raiseException :: Exception e => e -> Msg -> IO ()
raiseException e _ = throwIO e

-- | Main Production execution model entry point.
newExecutionModel :: Settings -> HostName -> Int -> IO Production
newExecutionModel setts host port = do
    gen       <- newGenerator
    queue     <- newCycleQueue
    pkg_queue <- newCycleQueue
    job_queue <- newCycleQueue
    conn      <- newConnection setts host port
    conn_ref  <- newIORef conn
    var       <- newTVarIO $ emptyState setts gen
    nxt_sub   <- newTVarIO (atomically . writeCycleQueue queue)
    disposed  <- newEmptyTMVarIO
    let env = Env setts queue pkg_queue job_queue var nxt_sub conn_ref disposed
        handler res = do
            closing env
            case res of
                Left e -> do
                    _settingsLog setts (Error $ UnexpectedException e)
                    case fromException e of
                        Just (_ :: ConnectionException) -> atomically $ do
                            writeTVar nxt_sub (raiseException e)
                            putTMVar disposed ()
                        _ -> do new_conn <- newConnection setts host port
                                writeIORef conn_ref new_conn
                                _ <- forkFinally (bootstrap env) handler
                                return ()

                _ -> atomically $ putTMVar disposed ()
    _ <- forkFinally (bootstrap env) handler
    return $ Prod nxt_sub $ do
        closed <- connIsClosed conn
        unless closed retry
        readTMVar disposed