{-# LANGUAGE OverloadedStrings #-}
module Core
(
  Core (..), -- TODO: Expose only put for clients.
  EnqueueResult (..),
  Command (..),
  ServerState,
  Updated (..),
  enqueueCommand,
  tryEnqueueCommand,
  getCurrentValue,
  withCoreMetrics,
  lookup,
  newCore,
  postQuit,
  runCommandLoop,
  runSyncTimer
)
where

import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, putMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue, isFullTBQueue)
import Control.Concurrent.STM.TVar (TVar, newTVarIO)
import Control.Monad (forever, unless)
import Control.Monad.IO.Class
import Data.Aeson (Value (..))
import Data.Foldable (forM_)
import Data.Traversable (for)
import Data.UUID (UUID)
import Prelude hiding (log, writeFile)

import qualified Network.WebSockets as WS

import Config (Config (..), periodicSyncingEnabled)
import Logger (Logger)
import Store (Path, Modification (..))
import Subscription (SubscriptionTree, empty)
import Persistence (PersistentValue, PersistenceConfig (..))

import qualified Store
import qualified Persistence
import qualified Metrics

-- | Defines the kinds of commands that are handled by the event loop of the Core.
data Command
  = Sync
    -- ^ The @Sync@ command causes the core to write the JSON value to disk.
  | Modify Modification (Maybe (MVar ()))
    -- ^ The @Modify@ command applies a modification (writing or deleting) to the JSON value.
    -- The optional MVar is used to signal that the command has been processed by the core.
  | Stop
    -- ^ The @Stop@ command causes the event loop of the Core to exit.
  deriving (Command -> Command -> Bool
(Command -> Command -> Bool)
-> (Command -> Command -> Bool) -> Eq Command
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Command -> Command -> Bool
$c/= :: Command -> Command -> Bool
== :: Command -> Command -> Bool
$c== :: Command -> Command -> Bool
Eq)

-- The main value has been updated at the given path. The payload contains the
-- entire new value. (So not only the inner value at the updated path.)
data Updated = Updated Path Value deriving (Updated -> Updated -> Bool
(Updated -> Updated -> Bool)
-> (Updated -> Updated -> Bool) -> Eq Updated
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Updated -> Updated -> Bool
$c/= :: Updated -> Updated -> Bool
== :: Updated -> Updated -> Bool
$c== :: Updated -> Updated -> Bool
Eq, Int -> Updated -> ShowS
[Updated] -> ShowS
Updated -> String
(Int -> Updated -> ShowS)
-> (Updated -> String) -> ([Updated] -> ShowS) -> Show Updated
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Updated] -> ShowS
$cshowList :: [Updated] -> ShowS
show :: Updated -> String
$cshow :: Updated -> String
showsPrec :: Int -> Updated -> ShowS
$cshowsPrec :: Int -> Updated -> ShowS
Show)

data EnqueueResult = Enqueued | Dropped
  deriving (Int -> EnqueueResult -> ShowS
[EnqueueResult] -> ShowS
EnqueueResult -> String
(Int -> EnqueueResult -> ShowS)
-> (EnqueueResult -> String)
-> ([EnqueueResult] -> ShowS)
-> Show EnqueueResult
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [EnqueueResult] -> ShowS
$cshowList :: [EnqueueResult] -> ShowS
show :: EnqueueResult -> String
$cshow :: EnqueueResult -> String
showsPrec :: Int -> EnqueueResult -> ShowS
$cshowsPrec :: Int -> EnqueueResult -> ShowS
Show, EnqueueResult -> EnqueueResult -> Bool
(EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> Bool) -> Eq EnqueueResult
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EnqueueResult -> EnqueueResult -> Bool
$c/= :: EnqueueResult -> EnqueueResult -> Bool
== :: EnqueueResult -> EnqueueResult -> Bool
$c== :: EnqueueResult -> EnqueueResult -> Bool
Eq, Eq EnqueueResult
Eq EnqueueResult
-> (EnqueueResult -> EnqueueResult -> Ordering)
-> (EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> EnqueueResult)
-> (EnqueueResult -> EnqueueResult -> EnqueueResult)
-> Ord EnqueueResult
EnqueueResult -> EnqueueResult -> Bool
EnqueueResult -> EnqueueResult -> Ordering
EnqueueResult -> EnqueueResult -> EnqueueResult
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: EnqueueResult -> EnqueueResult -> EnqueueResult
$cmin :: EnqueueResult -> EnqueueResult -> EnqueueResult
max :: EnqueueResult -> EnqueueResult -> EnqueueResult
$cmax :: EnqueueResult -> EnqueueResult -> EnqueueResult
>= :: EnqueueResult -> EnqueueResult -> Bool
$c>= :: EnqueueResult -> EnqueueResult -> Bool
> :: EnqueueResult -> EnqueueResult -> Bool
$c> :: EnqueueResult -> EnqueueResult -> Bool
<= :: EnqueueResult -> EnqueueResult -> Bool
$c<= :: EnqueueResult -> EnqueueResult -> Bool
< :: EnqueueResult -> EnqueueResult -> Bool
$c< :: EnqueueResult -> EnqueueResult -> Bool
compare :: EnqueueResult -> EnqueueResult -> Ordering
$ccompare :: EnqueueResult -> EnqueueResult -> Ordering
$cp1Ord :: Eq EnqueueResult
Ord, Int -> EnqueueResult
EnqueueResult -> Int
EnqueueResult -> [EnqueueResult]
EnqueueResult -> EnqueueResult
EnqueueResult -> EnqueueResult -> [EnqueueResult]
EnqueueResult -> EnqueueResult -> EnqueueResult -> [EnqueueResult]
(EnqueueResult -> EnqueueResult)
-> (EnqueueResult -> EnqueueResult)
-> (Int -> EnqueueResult)
-> (EnqueueResult -> Int)
-> (EnqueueResult -> [EnqueueResult])
-> (EnqueueResult -> EnqueueResult -> [EnqueueResult])
-> (EnqueueResult -> EnqueueResult -> [EnqueueResult])
-> (EnqueueResult
    -> EnqueueResult -> EnqueueResult -> [EnqueueResult])
-> Enum EnqueueResult
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: EnqueueResult -> EnqueueResult -> EnqueueResult -> [EnqueueResult]
$cenumFromThenTo :: EnqueueResult -> EnqueueResult -> EnqueueResult -> [EnqueueResult]
enumFromTo :: EnqueueResult -> EnqueueResult -> [EnqueueResult]
$cenumFromTo :: EnqueueResult -> EnqueueResult -> [EnqueueResult]
enumFromThen :: EnqueueResult -> EnqueueResult -> [EnqueueResult]
$cenumFromThen :: EnqueueResult -> EnqueueResult -> [EnqueueResult]
enumFrom :: EnqueueResult -> [EnqueueResult]
$cenumFrom :: EnqueueResult -> [EnqueueResult]
fromEnum :: EnqueueResult -> Int
$cfromEnum :: EnqueueResult -> Int
toEnum :: Int -> EnqueueResult
$ctoEnum :: Int -> EnqueueResult
pred :: EnqueueResult -> EnqueueResult
$cpred :: EnqueueResult -> EnqueueResult
succ :: EnqueueResult -> EnqueueResult
$csucc :: EnqueueResult -> EnqueueResult
Enum, EnqueueResult
EnqueueResult -> EnqueueResult -> Bounded EnqueueResult
forall a. a -> a -> Bounded a
maxBound :: EnqueueResult
$cmaxBound :: EnqueueResult
minBound :: EnqueueResult
$cminBound :: EnqueueResult
Bounded)

data Core = Core
  { Core -> PersistentValue
coreCurrentValue :: PersistentValue
  -- the "dirty" flag is set to True whenever the core value has been modified
  -- and is reset to False when it is persisted.
  , Core -> TVar Bool
coreValueIsDirty :: TVar Bool
  , Core -> TBQueue Command
coreQueue :: TBQueue Command
  , Core -> TBQueue (Maybe Updated)
coreUpdates :: TBQueue (Maybe Updated)
  , Core -> MVar ServerState
coreClients :: MVar ServerState
  , Core -> Logger
coreLogger  :: Logger
  , Core -> Config
coreConfig  :: Config
  , Core -> Maybe IcepeakMetrics
coreMetrics :: Maybe Metrics.IcepeakMetrics
  }

type ServerState = SubscriptionTree UUID WS.Connection

newServerState :: ServerState
newServerState :: ServerState
newServerState = ServerState
forall id conn. SubscriptionTree id conn
empty

-- | Try to initialize the core. This loads the database and sets up the internal data structures.
newCore :: Config -> Logger -> Maybe Metrics.IcepeakMetrics -> IO (Either String Core)
newCore :: Config -> Logger -> Maybe IcepeakMetrics -> IO (Either String Core)
newCore Config
config Logger
logger Maybe IcepeakMetrics
metrics = do
  let queueCapacity :: Natural
queueCapacity = Word -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word -> Natural) -> (Config -> Word) -> Config -> Natural
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Config -> Word
configQueueCapacity (Config -> Natural) -> Config -> Natural
forall a b. (a -> b) -> a -> b
$ Config
config
  -- load the persistent data from disk
  let filePath :: String
filePath = StorageBackend -> Maybe String -> String
Persistence.getDataFile (Config -> StorageBackend
configStorageBackend Config
config) (Config -> Maybe String
configDataFile Config
config)
      journalFile :: Maybe String
journalFile
        | Config -> Bool
configEnableJournaling Config
config
          Bool -> Bool -> Bool
&& Config -> Bool
periodicSyncingEnabled Config
config = String -> Maybe String
forall a. a -> Maybe a
Just (String -> Maybe String) -> String -> Maybe String
forall a b. (a -> b) -> a -> b
$ String
filePath String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
".journal"
        | Bool
otherwise = Maybe String
forall a. Maybe a
Nothing
  Either String PersistentValue
eitherValue <- StorageBackend
-> PersistenceConfig -> IO (Either String PersistentValue)
Persistence.loadFromBackend (Config -> StorageBackend
configStorageBackend Config
config) PersistenceConfig :: String
-> Maybe String
-> Logger
-> Maybe IcepeakMetrics
-> PersistenceConfig
PersistenceConfig
    { pcDataFile :: String
pcDataFile = String
filePath
    , pcJournalFile :: Maybe String
pcJournalFile = Maybe String
journalFile
    , pcLogger :: Logger
pcLogger = Logger
logger
    , pcMetrics :: Maybe IcepeakMetrics
pcMetrics = Maybe IcepeakMetrics
metrics
    }
  Either String PersistentValue
-> (PersistentValue -> IO Core) -> IO (Either String Core)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for Either String PersistentValue
eitherValue ((PersistentValue -> IO Core) -> IO (Either String Core))
-> (PersistentValue -> IO Core) -> IO (Either String Core)
forall a b. (a -> b) -> a -> b
$ \PersistentValue
value -> do
    -- create synchronization channels
    TVar Bool
tdirty <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
    TBQueue Command
tqueue <- Natural -> IO (TBQueue Command)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
queueCapacity
    TBQueue (Maybe Updated)
tupdates <- Natural -> IO (TBQueue (Maybe Updated))
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
queueCapacity
    MVar ServerState
tclients <- ServerState -> IO (MVar ServerState)
forall a. a -> IO (MVar a)
newMVar ServerState
newServerState
    Core -> IO Core
forall (f :: * -> *) a. Applicative f => a -> f a
pure (PersistentValue
-> TVar Bool
-> TBQueue Command
-> TBQueue (Maybe Updated)
-> MVar ServerState
-> Logger
-> Config
-> Maybe IcepeakMetrics
-> Core
Core PersistentValue
value TVar Bool
tdirty TBQueue Command
tqueue TBQueue (Maybe Updated)
tupdates MVar ServerState
tclients Logger
logger Config
config Maybe IcepeakMetrics
metrics)

-- Tell the put handler loop and the update handler to quit.
postQuit :: Core -> IO ()
postQuit :: Core -> IO ()
postQuit Core
core = do
  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    TBQueue Command -> Command -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue Command
coreQueue Core
core) Command
Stop
    TBQueue (Maybe Updated) -> Maybe Updated -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue (Maybe Updated)
coreUpdates Core
core) Maybe Updated
forall a. Maybe a
Nothing

-- | Try to enqueue a command. It succeeds if the queue is not full, otherwise,
-- nothing is changed. This should be used for non-critical commands that can
-- also be retried later.
tryEnqueueCommand :: Command -> Core -> IO EnqueueResult
tryEnqueueCommand :: Command -> Core -> IO EnqueueResult
tryEnqueueCommand Command
cmd Core
core = STM EnqueueResult -> IO EnqueueResult
forall a. STM a -> IO a
atomically (STM EnqueueResult -> IO EnqueueResult)
-> STM EnqueueResult -> IO EnqueueResult
forall a b. (a -> b) -> a -> b
$ do
  Bool
isFull <- TBQueue Command -> STM Bool
forall a. TBQueue a -> STM Bool
isFullTBQueue (Core -> TBQueue Command
coreQueue Core
core)
  Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
isFull (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ TBQueue Command -> Command -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue Command
coreQueue Core
core) Command
cmd
  EnqueueResult -> STM EnqueueResult
forall (f :: * -> *) a. Applicative f => a -> f a
pure (EnqueueResult -> STM EnqueueResult)
-> EnqueueResult -> STM EnqueueResult
forall a b. (a -> b) -> a -> b
$ if Bool
isFull then EnqueueResult
Dropped else EnqueueResult
Enqueued

-- | Enqueue a command. Blocks if the queue is full. This is used by the sync
-- timer to make sure the sync commands are actually enqueued. In general,
-- whenever it is critical that a command is executed eventually (when reaching
-- the front of the queue), this function should be used.
enqueueCommand :: Command -> Core -> IO ()
enqueueCommand :: Command -> Core -> IO ()
enqueueCommand Command
cmd Core
core = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue Command -> Command -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue Command
coreQueue Core
core) Command
cmd

getCurrentValue :: Core -> Path -> IO (Maybe Value)
getCurrentValue :: Core -> Path -> IO (Maybe Value)
getCurrentValue Core
core Path
path =
  (Value -> Maybe Value) -> IO Value -> IO (Maybe Value)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Path -> Value -> Maybe Value
Store.lookup Path
path) (IO Value -> IO (Maybe Value)) -> IO Value -> IO (Maybe Value)
forall a b. (a -> b) -> a -> b
$ STM Value -> IO Value
forall a. STM a -> IO a
atomically (STM Value -> IO Value) -> STM Value -> IO Value
forall a b. (a -> b) -> a -> b
$ PersistentValue -> STM Value
Persistence.getValue (PersistentValue -> STM Value) -> PersistentValue -> STM Value
forall a b. (a -> b) -> a -> b
$ Core -> PersistentValue
coreCurrentValue Core
core

withCoreMetrics :: MonadIO m => Core -> (Metrics.IcepeakMetrics -> IO ()) -> m ()
withCoreMetrics :: Core -> (IcepeakMetrics -> IO ()) -> m ()
withCoreMetrics Core
core IcepeakMetrics -> IO ()
act = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe IcepeakMetrics -> (IcepeakMetrics -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Core -> Maybe IcepeakMetrics
coreMetrics Core
core) IcepeakMetrics -> IO ()
act

-- | Drain the command queue and execute them. Changes are published to all
-- subscribers. This function returns when executing the 'Stop' command from the
-- queue.
runCommandLoop :: Core -> IO ()
runCommandLoop :: Core -> IO ()
runCommandLoop Core
core = IO ()
go
  where
    config :: Config
config = Core -> Config
coreConfig Core
core
    currentValue :: PersistentValue
currentValue = Core -> PersistentValue
coreCurrentValue Core
core
    storageBackend :: StorageBackend
storageBackend = Config -> StorageBackend
configStorageBackend Config
config
    go :: IO ()
go = do
      Command
command <- STM Command -> IO Command
forall a. STM a -> IO a
atomically (STM Command -> IO Command) -> STM Command -> IO Command
forall a b. (a -> b) -> a -> b
$ TBQueue Command -> STM Command
forall a. TBQueue a -> STM a
readTBQueue (Core -> TBQueue Command
coreQueue Core
core)
      case Command
command of
        Modify Modification
op Maybe (MVar ())
maybeNotifyVar -> do
          Modification -> PersistentValue -> IO ()
Persistence.apply Modification
op PersistentValue
currentValue
          Path -> Core -> IO ()
postUpdate (Modification -> Path
Store.modificationPath Modification
op) Core
core
          -- when periodic syncing is disabled, data is persisted after every modification
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Config -> Bool
periodicSyncingEnabled (Config -> Bool) -> Config -> Bool
forall a b. (a -> b) -> a -> b
$ Core -> Config
coreConfig Core
core) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
            StorageBackend -> PersistentValue -> IO ()
Persistence.syncToBackend StorageBackend
storageBackend PersistentValue
currentValue
          (MVar () -> IO ()) -> Maybe (MVar ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
`putMVar` ()) Maybe (MVar ())
maybeNotifyVar
          IO ()
go
        Command
Sync -> do
          StorageBackend -> PersistentValue -> IO ()
Persistence.syncToBackend StorageBackend
storageBackend PersistentValue
currentValue
          IO ()
go
        Command
Stop -> StorageBackend -> PersistentValue -> IO ()
Persistence.syncToBackend StorageBackend
storageBackend PersistentValue
currentValue

-- | Post an update to the core's update queue (read by the websocket subscribers)
postUpdate :: Path -> Core -> IO ()
postUpdate :: Path -> Core -> IO ()
postUpdate Path
path Core
core = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  Value
value <- PersistentValue -> STM Value
Persistence.getValue (Core -> PersistentValue
coreCurrentValue Core
core)
  TBQueue (Maybe Updated) -> Maybe Updated -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue (Maybe Updated)
coreUpdates Core
core) (Updated -> Maybe Updated
forall a. a -> Maybe a
Just (Updated -> Maybe Updated) -> Updated -> Maybe Updated
forall a b. (a -> b) -> a -> b
$ Path -> Value -> Updated
Updated Path
path Value
value)

-- | Periodically send a 'Sync' command to the 'Core' if enabled in the core
-- configuration.
runSyncTimer :: Core -> IO ()
runSyncTimer :: Core -> IO ()
runSyncTimer Core
core = (Int -> IO Any) -> Maybe Int -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Int -> IO Any
forall b. Int -> IO b
go (Config -> Maybe Int
configSyncIntervalMicroSeconds (Config -> Maybe Int) -> Config -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Core -> Config
coreConfig Core
core)
  where
    go :: Int -> IO b
go Int
interval = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ do
      Command -> Core -> IO ()
enqueueCommand Command
Sync Core
core
      Int -> IO ()
threadDelay Int
interval