{-# LANGUAGE LambdaCase        #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeFamilies      #-}
{-# LANGUAGE QuasiQuotes       #-}

-- | SQLLite co-ordinator for Funflow.
--
--   This co-ordinator effectively uses the shared filesystem as a tool for task
--   distribution and sequencing. This means that it can control a distributed
--   funflow task without needing any additional processes running.
module Control.Funflow.External.Coordinator.SQLite
  ( SQLite (..)
  ) where

import           Control.Concurrent                   (threadDelay)
import           Control.Exception.Safe
import           Control.Funflow.ContentHashable
import           Control.Funflow.External
import           Control.Funflow.External.Coordinator
import           Control.Funflow.Lock
import           Control.Lens
import           Control.Monad.IO.Class
import qualified Data.Aeson                           as Json
import qualified Data.ByteString.Char8                as C8
import qualified Data.Text                            as T
import           Data.Typeable                        (Typeable)
import qualified Database.SQLite.Simple               as SQL
import qualified Database.SQLite.Simple.FromField     as SQL
import qualified Database.SQLite.Simple.Ok            as SQL
import qualified Database.SQLite.Simple.ToField       as SQL
import           Path
import           Path.IO
import           System.Clock

-- | SQLite coordinator tag.
data SQLite = SQLite

-- | SQLite coordinator hook.
data SQLiteHook = SQLiteHook
  { _sqlConn :: SQL.Connection
  , _sqlLock :: Lock
  }
makeLenses ''SQLiteHook

-- | Take the lock and run the given action on the SQLite connection.
withSQLite :: SQLiteHook -> (SQL.Connection -> IO a) -> IO a
withSQLite hook action = withLock (hook^.sqlLock) $ action (hook^.sqlConn)

-- | Enumeration of possible 'TaskStatus' cases for SQLite status column.
data SqlTaskStatus
  = SqlPending
  | SqlRunning
  | SqlCompleted
  | SqlFailed
  deriving Enum
instance SQL.FromField SqlTaskStatus where
  fromField field = do
    n <- SQL.fromField field
    pure $! toEnum n
instance SQL.ToField SqlTaskStatus where
  toField = SQL.toField . fromEnum

-- | Wrapper around 'Executor' for SQLite serialization.
newtype SqlExecutor = SqlExecutor { getSqlExecutor :: Executor }
instance SQL.FromField SqlExecutor where
  fromField field = SqlExecutor . Executor <$> SQL.fromField field
instance SQL.ToField SqlExecutor where
  toField (SqlExecutor (Executor host)) = SQL.toField host

-- | SQLite task info query result.
data SqlTaskInfo = SqlTaskInfo
  { _stiStatus   :: SqlTaskStatus
  , _stiExecutor :: Maybe SqlExecutor
  , _stiElapsed  :: Maybe Integer
  , _stiExitCode :: Maybe Int
  }
makeLenses '' SqlTaskInfo
instance SQL.FromRow SqlTaskInfo where
  fromRow = SqlTaskInfo
    <$> SQL.field
    <*> SQL.field
    <*> SQL.field
    <*> SQL.field

-- | Wrapper around 'ExternalTask' for SQLite serialization.
newtype SqlExternal = SqlExternal { getSqlExternal :: ExternalTask }
instance SQL.FromField SqlExternal where
  fromField field = do
    bs <- SQL.fromField field
    case Json.eitherDecode bs of
      Left err -> SQL.Errors [toException $ DecodingError "task" err]
      Right x  -> pure $! SqlExternal x
instance SQL.ToField SqlExternal where
  toField = SQL.toField . Json.encode . getSqlExternal

-- | Wrapper around 'TaskDescription' for SQLite serialization.
newtype SqlTask = SqlTask TaskDescription
instance SQL.FromRow SqlTask where
  fromRow = do
    output <- SQL.field
    SqlExternal task <- SQL.field
    pure $! SqlTask $! TaskDescription
      { _tdOutput = output
      , _tdTask = task
      }

-- | Errors that can occur when interacting with the SQLite coordinator.
data SQLiteCoordinatorError
    -- | @MissingDBTaskEntry output field@
    --   The task database entry is missing a field.
  = MissingDBTaskEntry ContentHash T.Text
    -- | @DecodingError field error@
    --   Failed to decode the field.
  | DecodingError T.Text String
    -- | @NonRunningTask output@
    --   The task is not running.
  | NonRunningTask ContentHash
    -- | @IllegalStatusUpdate output status@
    --   Cannot update the status of the task.
  | IllegalStatusUpdate ContentHash TaskStatus
  deriving (Show, Typeable)
instance Exception SQLiteCoordinatorError where
  displayException (MissingDBTaskEntry output field) =
    "Missing field in SQLite task entry '"
    ++ T.unpack field
    ++ "' for output "
    ++ C8.unpack (encodeHash output)
  displayException (DecodingError field err) =
    "Failed to decode field '"
    ++ T.unpack field
    ++ "': "
    ++ err
  displayException (NonRunningTask output) =
    "Task was not running when expected: "
    ++ C8.unpack (encodeHash output)
  displayException (IllegalStatusUpdate output status) =
    "Illegal status update for "
    ++ C8.unpack (encodeHash output)
    ++ ": "
    ++ show status

-- | Helper for @NULL@ valued data-base fields.
--
-- Throws 'MissingDBTaskEntry' on 'Nothing', otherwise returns the value.
fromMaybeField :: MonadIO m => ContentHash -> T.Text -> Maybe a -> m a
fromMaybeField output f = \case
  Nothing -> liftIO $ throwIO $ MissingDBTaskEntry output f
  Just x -> pure x

-- | Unlifted version of 'taskInfo'.
taskInfo' :: SQLiteHook -> ContentHash -> IO TaskInfo
taskInfo' hook output = do
  r <- withSQLite hook $ \conn -> SQL.queryNamed conn
    "SELECT status, executor, elapsed, exit_code FROM tasks\
    \ WHERE\
    \  output = :output"
    [ ":output" SQL.:= output ]
  case r of
    [] -> pure UnknownTask
    (ti:_) -> case ti^.stiStatus of
      SqlPending -> pure $! KnownTask Pending
      SqlRunning -> do
        executor <- fromMaybeField output "executor" (ti^.stiExecutor)
        pure $! KnownTask $! Running ExecutionInfo
          { _eiExecutor = getSqlExecutor executor
          , _eiElapsed = fromNanoSecs 0
          }
      SqlCompleted -> do
        executor <- fromMaybeField output "executor" (ti^.stiExecutor)
        elapsed <- fromMaybeField output "elapsed" (ti^.stiElapsed)
        pure $! KnownTask $! Completed ExecutionInfo
          { _eiExecutor = getSqlExecutor executor
          , _eiElapsed = fromNanoSecs elapsed
          }
      SqlFailed -> do
        executor <- fromMaybeField output "executor" (ti^.stiExecutor)
        elapsed <- fromMaybeField output "elapsed" (ti^.stiElapsed)
        exitCode <- fromMaybeField output "exit_code" (ti^.stiExitCode)
        pure $! KnownTask $! Failed
          ExecutionInfo
            { _eiExecutor = getSqlExecutor executor
            , _eiElapsed = fromNanoSecs elapsed
            }
          exitCode

instance Coordinator SQLite where
  type Config SQLite = Path Abs Dir
  type Hook SQLite = SQLiteHook

  initialise dir = liftIO $ do
    createDirIfMissing True dir
    lock <- openLock (dir </> [reldir|lock|])
    withLock lock $ do
      conn <- SQL.open $ fromAbsFile (dir </> [relfile|db.sqlite|])
      SQL.execute_ conn
        "CREATE TABLE IF NOT EXISTS\
        \  tasks\
        \  ( output TEXT PRIMARY KEY\
        \  , status INT NOT NULL\
        \  , executor TEXT\
        \  , elapsed INT\
        \  , exit_code INT\
        \  , task TEXT NOT NULL\
        \  )"
      return SQLiteHook
        { _sqlConn = conn
        , _sqlLock = lock
        }

  submitTask hook td = liftIO $
    withSQLite hook $ \conn -> SQL.executeNamed conn
      "INSERT OR REPLACE INTO\
      \  tasks (output, status, task)\
      \ VALUES\
      \  (:output, :status, :task)"
      [ ":output" SQL.:= (td^.tdOutput)
      , ":status" SQL.:= SqlPending
      , ":task" SQL.:= SqlExternal (td^.tdTask)
      ]

  queueSize hook = liftIO $ do
    [[n]] <- withSQLite hook $ \conn -> SQL.queryNamed conn
      "SELECT COUNT(*) FROM tasks WHERE status = :pending"
      [ ":pending" SQL.:= SqlPending ]
    return n

  taskInfo hook output = liftIO $
    taskInfo' hook output

  popTask hook executor = liftIO $
    withSQLite hook $ \conn -> SQL.withTransaction conn $ do
      r <- SQL.queryNamed conn
        "SELECT output, task FROM tasks\
        \ WHERE\
        \  status = :pending\
        \ LIMIT 1"
        [ ":pending" SQL.:= SqlPending ]
      case r of
        [] -> pure Nothing
        (SqlTask td:_) -> do
          SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \  status = :status,\
            \  executor = :executor\
            \ WHERE\
            \  output = :output"
            [ ":status" SQL.:= SqlRunning
            , ":executor" SQL.:= SqlExecutor executor
            , ":output" SQL.:= td^.tdOutput
            ]
          pure $! Just td

  awaitTask hook output = liftIO $ loop
    where
      -- XXX: SQLite has callback mechanisms built-in (e.g. @sqlite3_commit_hook@).
      --   Unfortunately, @direct-sqlite@, which @sqlite-simple@ builds on top of,
      --   doesn't expose this functionality at the moment.
      loop = taskInfo' hook output >>= \case
        KnownTask Pending -> sleep >> loop
        KnownTask (Running _) -> sleep >> loop
        ti -> pure ti
      sleep = liftIO $ threadDelay oneSeconds
      oneSeconds = 1000000

  updateTaskStatus hook output ts = liftIO $
    withSQLite hook $ \conn -> SQL.withTransaction conn $ do
      r <- SQL.queryNamed conn
        "SELECT status FROM tasks\
        \ WHERE\
        \  output = :output"
        [ ":output" SQL.:= output ]
      case r of
        [SqlRunning]:_ -> case ts of
          Completed ei -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \  status = :completed,\
            \  elapsed = :elapsed\
            \ WHERE\
            \  output = :output"
            [ ":completed" SQL.:= SqlCompleted
            , ":elapsed" SQL.:= toNanoSecs (ei^.eiElapsed)
            , ":output" SQL.:= output
            ]
          Failed ei exitCode -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \  status = :failed,\
            \  elapsed = :elapsed,\
            \  exit_code = :exit_code\
            \ WHERE\
            \  output = :output"
            [ ":failed" SQL.:= SqlFailed
            , ":elapsed" SQL.:= toNanoSecs (ei^.eiElapsed)
            , ":exit_code" SQL.:= exitCode
            , ":output" SQL.:= output
            ]
          Pending -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \  status = :pending\
            \ WHERE\
            \  output = :output"
            [ ":pending" SQL.:= SqlPending
            , ":output" SQL.:= output
            ]
          Running _ -> throwIO $ IllegalStatusUpdate output ts
        _ -> throwIO $ NonRunningTask output

  dropTasks hook = liftIO $
    withSQLite hook $ \conn ->
      SQL.executeNamed conn
        "DELETE FROM tasks\
        \ WHERE\
        \  status = :pending"
        [ ":pending" SQL.:= SqlPending ]