module Database.Perdure.State(
PState(stateLocation, stateSpace, stateRoot),
--initStateN,
Root(..),
RootValues(..),
rootAllocSize,
initState,
readState,
writeState,
--asyncWriteState,
updateState,
updateStateRead,
--asyncCollectState,
collectState,
collectStateM,
module Database.Perdure.Space,
module Database.Perdure.Persistent,
CachedFile(..),
RootLocation(..),
RootAddress(..),
stateValue
) where
import Prelude()
import Cgm.Prelude
import Control.Concurrent
import qualified Cgm.Control.Monad.State as M
import Control.Monad.State hiding (sequence)
import Control.Monad.Reader hiding (sequence)
import Cgm.Data.Word
import Cgm.Data.Len
import Cgm.Data.List
import Database.Perdure.Persistent
import Database.Perdure.CSerializer
import Database.Perdure.Decrementer
import Database.Perdure.Incrementer
import Database.Perdure.RootValidator
import Database.Perdure.Space
import Database.Perdure.SpaceTree
import Database.Perdure.Data.MapMultiset
import Cgm.System.Endian
import Cgm.Data.Multiset as MS
import Database.Perdure.Ref
import Database.Perdure.Deref
import Database.Perdure.SpaceBook
import Cgm.Data.Typeable
import Cgm.Data.Maybe
import Database.Perdure.WriteBits
import Database.Perdure.AllocCopy
import Cgm.Data.Super
import Database.Perdure.Allocator
import Database.Perdure.Rev
data CachedFile = CachedFile ReplicatedFile (MVar Cache)
data RootLocation = RootLocation CachedFile [RootAddress]
newtype StateId = StateId Word64 deriving (Ord, Eq, Show, Enum, Bounded, Persistent, Num, Real, Integral)
newtype RootAddress = RootAddress {getRootAddress :: Address} deriving (Eq, Show)
rootRef :: LgMultiple Word64 w => RootAddress -> BasicRef w
rootRef (RootAddress a) = BasicRef a $ refineLen rootAllocSize
data PState a = PState {
stateLocation :: RootLocation,
stateSpace :: SpaceTree,
stateRoot :: RootVersions a
}
stateValue :: PState a -> a
stateValue = deref . rootValue . rootScan . toOnlyRev . stateRoot
data Root a = Root {
rootId :: StateId,
rootDecr :: Maybe (RootValues a),
rootScan :: RootValues a
}
type RootVersions a = Root a :> NoRev
instance (Persistent a, Typeable a) => Persistent (Root a) where persister = structureMap persister
data RootValues a = RootValues {
rootCS :: CDRef SpaceBook,
rootValue :: CDRef a
}
instance (Persistent a, Typeable a) => Persistent (RootValues a) where
persister = structureMap persister
initState :: (Persistent a, Typeable a) => RootLocation -> SpaceTree -> a -> IO (PState a)
initState = initStateN minBound
initStateN :: (Persistent a, Typeable a) => StateId -> RootLocation -> SpaceTree -> a -> IO (PState a)
initStateN i l s a =
let sb = SpaceBook MS.emptySet s
s' = bookSpace $ incr persister a sb
in await0 $ writeRootSimple l s' $ Current $ Root i Nothing $ RootValues (ref sb) (ref a)
readState :: (Persistent a, Typeable a) => RootLocation -> IO (Maybe (PState a))
readState l@(RootLocation f rootAddrs) =
fmap (\r -> let rs = rootScan $ toOnlyRev r in PState l (bookSpace $ incr persister rs $ deref $ rootCS rs) r) .
maybeMaximumBy (comparing $ rootId . toOnlyRev) . catMaybes <$> sequence (readRoot f <$> rootAddrs)
rootAllocSize :: Len Word64 Word32
rootAllocSize = coarsenLen (unsafeLen (1024 * 1024) :: Len Word8 Word32)
asyncWriteState :: (Persistent a, Typeable a) => a -> PState a -> IO () -> IO (PState a)
asyncWriteState a rs =
(if rootId (toOnlyRev $ stateRoot rs) `mod` collectFrequency == 0 then \w k -> collectState rs >>= ($ k) . w else ($ rs)) $
\(PState l s rvers) -> case toOnlyRev rvers of (Root i d (RootValues rcs _)) ->
writeRootSimple l s $ Current $ Root (succ i) d $ RootValues rcs $ ref a
collectFrequency :: StateId
collectFrequency = 1000
writeRootSimple :: forall a. (Persistent a, Typeable a) => RootLocation -> SpaceTree -> RootVersions a -> IO () -> IO (PState a)
writeRootSimple l@(RootLocation f _) s r done =
fmap (\(r', _ :: MapMultiset Address, s') -> PState l s' r') $
writeRoot (rootId $ toOnlyRev r) l done $ write f s r
writeState :: (Persistent a, Typeable a) => a -> PState a -> IO (PState a)
writeState a r = await0 $ asyncWriteState a r
updateState :: (Persistent a, Typeable a, MonadIO m) => M.StateT a m b -> M.StateT (PState a) m b
updateState = updateStateRead . M.mapStateT lift
updateStateRead :: (Persistent a, Typeable a, MonadIO m) => M.StateT a (ReaderT (PState a) m) b -> M.StateT (PState a) m b
updateStateRead (M.StateT u) =
M.StateT $ \t -> runReaderT (u (deref $ rootValue $ rootScan $ toOnlyRev $ stateRoot t)) t
>>= \(b, ma') -> liftM (b,) $ maybe (return Nothing) (liftM Just . liftIO . flip writeState t) ma'
asyncCollectState :: (Persistent a, Typeable a) => PState a -> IO () -> IO (PState a)
asyncCollectState (PState l _ rvers) done =
let rcs'@(SpaceBook _ s') = incr persister d' $ deref rcs
(Root i md d'@(RootValues rcs ra)) = toOnlyRev rvers
in writeRootSimple l s' (Current $ Root (succ i) (Just d') $ RootValues (ref $ decr persister md rcs') ra) done
collectState :: (Persistent a, Typeable a) => PState a -> IO (PState a)
collectState r = await0 $ asyncCollectState r
collectStateM :: (Persistent a, Typeable a) => M.StateT (PState a) IO ()
collectStateM = get >>= lift . collectState >>= put
writeRoot :: StateId -> RootLocation -> IO () -> StateT (ABitSeq RealWorld) IO a -> IO a
writeRoot i (RootLocation (CachedFile f _) roots) done writer = do
start <- stToIO mkABitSeq
(result, end) <- runStateT writer start
storeFileFullBarrier f
ar <- allocCopyBits start end
onWordConv (writeRoot' (apply wordConv1 ar :: PrimArray Pinned Word32)) (writeRoot' (apply wordConv1 ar :: PrimArray Pinned Word64))
storeFileSync f done
return result where
writeRoot' :: forall w. (LgMultiple Word64 w, ValidationDigestWord w) => PrimArray Pinned w -> IO ()
writeRoot' a = do
let (RootValidator, bufs) = mkValidationInput a
when (sum (arrayLen <$> bufs) > apply super (refineLen rootAllocSize :: Len w Word32)) $ error "Root too large."
() <$ storeFileWrite f (getRootAddress $ genericIndex roots $ mod i $ fromIntegral $ length roots) platformWordEndianness bufs
write :: (Multiset c, Persistent a', Space ls) =>
CachedFile -> ls -> a' -> StateT (ABitSeq RealWorld) IO (a', c Address, ls)
write (CachedFile f c) ls a' = StateT $ \s -> do
cd <- newMVar emptySet
lv <- newMVar ls
cSer persister (c, StateAllocator f lv, Just cd) (\a'' s' -> readMVar lv >>= \ls' -> readMVar cd >>= \u -> return ((a'', u, ls'), s')) a' s
readRoot :: forall a. (Persistent a, Typeable a) => CachedFile -> RootAddress -> IO (Maybe (RootVersions a))
readRoot (CachedFile f c) rootAddr =
fmap (deserializeFromFullArray $ apply cDeser persister $ DeserializerContext f c) <$>
foldM (\result next -> maybe next (return . Just) result) Nothing readRootDataWE where
readRootData :: forall w. ValidationDigestWord w => Endianness -> Tagged w (IO (Maybe (ArrayRange (PrimArray Free Word))))
readRootData e = tag $ fmap (fmap deserInput) $ await1 $
storeFileRead f (rootRef rootAddr) e (RootValidator :: RootValidator w)
readRootDataW e = onWordConv id reverse [(at :: At Word32) $ readRootData e, (at :: At Word64) $ readRootData e]
readRootDataWE = concat $ transpose $ readRootDataW <$> [platformWordEndianness, reverseEndianness platformWordEndianness]
data StateAllocator a = StateAllocator ReplicatedFile (MVar a)
instance Space a => Allocator (StateAllocator a) where
allocatorStoreFile (StateAllocator f _) = f
alloc (StateAllocator _ var) len = modifyMVar var $ \a -> do
let spn = requireSpan a
let a' = removeSpan spn a
return (a', onSortedPair (\start _ -> unsafeLen start) spn) where
requireSpan = onSortedPair (\start _ -> unsafeSortedPair start (start + getLen len)) .
fromMaybe (error "Out of storage space") . listHead .
findSpan (getLen len)
deriveStructured ''Root
deriveStructured ''RootValues