{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module RON.Data (
Reducible (..),
Replicated (..),
ReplicatedAsObject (..),
ReplicatedAsPayload (..),
fromRon,
getObjectStateChunk,
newRon,
objectEncoding,
payloadEncoding,
rconcat,
reduceObject,
reduceStateFrame,
reduceWireFrame,
stateFromWireChunk,
stateToWireChunk,
ObjectStateT,
MonadObjectState,
evalObjectState,
evalObjectState_,
execObjectState,
execObjectState_,
newObjectFrame,
newObjectFrameWith,
runObjectState,
runObjectState_,
) where
import RON.Prelude
import qualified Data.List.NonEmpty as NonEmpty
import Data.Map.Strict ((!?))
import qualified Data.Map.Strict as Map
import RON.Data.Internal
import RON.Data.LWW (LwwRep)
import RON.Data.ORSet (ORSetRep)
import RON.Data.RGA (RgaRep)
import RON.Data.VersionVector (VersionVector)
import RON.Error (MonadE, throwErrorString)
import RON.Types (ClosedOp (..),
ObjectFrame (ObjectFrame, frame, uuid),
ObjectRef (ObjectRef), Op (..), StateChunk (..),
StateFrame, UUID, WireChunk (Closed, Query, Value),
WireFrame, WireReducedChunk (..),
WireStateChunk (WireStateChunk, stateBody, stateType))
import RON.UUID (pattern Zero)
reducers :: Map UUID Reducer
reducers = Map.fromList
[ mkReducer @LwwRep
, mkReducer @RgaRep
, mkReducer @ORSetRep
, mkReducer @VersionVector
]
reduceWireFrame :: WireFrame -> WireFrame
reduceWireFrame chunks = values' ++ queries where
chunkTypeAndObject = opTypeAndObject . \case
Closed op -> op
Value WireReducedChunk{wrcHeader = op} -> op
Query WireReducedChunk{wrcHeader = op} -> op
opTypeAndObject ClosedOp{..} = (reducerId, objectId)
(queries, values) = partition isQuery chunks
values' =
fold $
Map.mapWithKey reduceWireFrameByType $
NonEmpty.fromList <$>
Map.fromListWith (++)
[(chunkTypeAndObject value, [value]) | value <- values]
reduceWireFrameByType :: (UUID, UUID) -> NonEmpty WireChunk -> [WireChunk]
reduceWireFrameByType (typ, obj) = case reducers !? typ of
Nothing ->
toList
Just Reducer{wireReducer} -> wireReducer obj
isQuery :: WireChunk -> Bool
isQuery = \case
Query _ -> True
_ -> False
mkReducer :: forall a . Reducible a => (UUID, Reducer)
mkReducer =
( reducibleOpType @a
, Reducer
{wireReducer = mkWireReducer @a, stateReducer = unsafeReduceState @a}
)
where
unsafeReduceState
:: forall rep
. Reducible rep => WireStateChunk -> WireStateChunk -> WireStateChunk
unsafeReduceState
WireStateChunk{stateBody = c1} WireStateChunk{stateBody = c2} =
wireStateChunk @rep $ reduceState (StateChunk c1) (StateChunk c2)
mkWireReducer :: forall a . Reducible a => WireReducer
mkWireReducer obj chunks = chunks' <> leftovers where
chunks'
= maybeToList stateChunk'
++ map (Value . wrapRChunk) unappliedPatches
++ map (Closed . wrapOp) unappliedOps
mStates = nonEmpty states
(stateChunk', (unappliedPatches, unappliedOps)) = case mStates of
Nothing -> (Nothing, reduceUnappliedPatches @a (patches, closedOps))
Just nStates -> let
nState = sconcat $ fmap snd nStates
(reducedState, unapplied') =
applyPatches nState (patches, closedOps)
reducedStateBody = stateToChunk @a reducedState
rc = ReducedChunk{rcRef = Zero, rcBody = reducedStateBody}
in
(Just $ Value $ wrapRChunk rc, reduceUnappliedPatches @a unapplied')
typ = reducibleOpType @a
wrapOp = ClosedOp typ obj
(states, patches, closedOps, leftovers) = foldMap load chunks
load chunk = fromMaybe ([], [], [], [chunk]) $ load' chunk
load' chunk = case chunk of
Closed closedOp@ClosedOp{op} -> do
guardSameObject closedOp
pure ([], [], [op], [])
Value WireReducedChunk{wrcHeader, wrcBody} -> do
guardSameObject wrcHeader
let ref = refId $ op wrcHeader
case ref of
Zero ->
pure
( [(opId $ op wrcHeader, stateFromChunk wrcBody)]
, []
, []
, []
)
_ ->
pure
( []
, [ReducedChunk{rcRef = ref, rcBody = wrcBody}]
, []
, []
)
_ -> Nothing
guardSameObject ClosedOp{reducerId, objectId} =
guard $ reducerId == typ && objectId == obj
wrapRChunk ReducedChunk{rcRef, rcBody} = WireReducedChunk
{ wrcHeader = wrapOp Op{opId = Zero, refId = rcRef, payload = []}
, wrcBody = rcBody
}
reduceStateFrame :: MonadE m => StateFrame -> StateFrame -> m StateFrame
reduceStateFrame s1 s2 =
(`execStateT` s1) . (`Map.traverseWithKey` s2) $ \oid chunk -> let
WireStateChunk{stateType} = chunk
in
case reducers !? stateType of
Just Reducer{stateReducer} ->
modify' $ Map.insertWith stateReducer oid chunk
Nothing ->
throwErrorString $
"Cannot reduce StateFrame of unknown type " ++ show stateType
unsafeReduceObject
:: MonadE m => ObjectFrame a -> StateFrame -> m (ObjectFrame a)
unsafeReduceObject obj@ObjectFrame{frame = s1} s2 = do
frame' <- reduceStateFrame s1 s2
pure obj{frame = frame'}
reduceObject :: MonadE m => ObjectFrame a -> ObjectFrame a -> m (ObjectFrame a)
reduceObject o1@ObjectFrame{uuid = id1} ObjectFrame{uuid = id2, frame = frame2}
| id1 == id2 = unsafeReduceObject o1 frame2
| otherwise = throwErrorString $ "Object ids differ: " ++ show (id1, id2)
newtype MaxOnFst a b = MaxOnFst (a, b)
instance Ord a => Semigroup (MaxOnFst a b) where
mof1@(MaxOnFst (a1, _)) <> mof2@(MaxOnFst (a2, _))
| a1 < a2 = mof2
| otherwise = mof1
execObjectState
:: Monad m => ObjectFrame b -> ObjectStateT b m a -> m (ObjectFrame b)
execObjectState state@ObjectFrame{uuid, frame} action = do
frame' <- execStateT (runReaderT action $ ObjectRef uuid) frame
pure state{frame = frame'}
execObjectState_ :: Monad m => StateT StateFrame m a -> m StateFrame
execObjectState_ action = execStateT action mempty
runObjectState
:: Functor m
=> ObjectFrame b
-> ObjectStateT b m a
-> m (a, ObjectFrame b)
runObjectState state@ObjectFrame{uuid, frame} action =
runStateT (runReaderT action $ ObjectRef uuid) frame
<&> \(a, frame') -> (a, state{frame = frame'})
runObjectState_ :: StateT StateFrame m a -> m (a, StateFrame)
runObjectState_ action = runStateT action mempty
newObjectFrameWith
:: Functor m => StateT StateFrame m (ObjectRef a) -> m (ObjectFrame a)
newObjectFrameWith action =
runObjectState_ action <&> \(ObjectRef uuid, frame) -> ObjectFrame{uuid, frame}