module Database.Perdure.LocalStoreFile (
RawStoreFile(..),
storeFileWriteWords,
storeFileReadWords,
LocalStoreFile,
withFileStoreFile,
withRawDeviceStoreFile,
withRawDeviceStoreFiles,
module Database.Perdure.StoreFile,
narrowBufsLen,
storeFileWrite1,
storeFileRead1
) where
import Prelude ()
import Cgm.Prelude
import Data.Typeable
import qualified Data.ByteString as B
import Control.Concurrent
import Data.Word
import qualified Data.Map as Map
import Cgm.Data.Super
import Cgm.Data.Len
import Cgm.Data.Monoid
import Cgm.Data.NEList
import Cgm.Data.Either
import Cgm.System.Endian
import Cgm.Control.Concurrent.TThread
import Cgm.Control.Concurrent.Await
import Cgm.System.Mem.Alloc
import Database.Perdure.Validator
import System.IO
import System.Posix.Files
import System.Posix.IO
import System.Posix.Types
import Data.Bits
import Control.Monad.Error hiding (sequence_)
import Database.Perdure.StoreFile(SyncableStoreFile(..))
class SyncableStoreFile f => RawStoreFile f where
storeFileWriteRaw :: f -> Len Word8 Word64 -> [ArrayRange (PrimArray Pinned Word8)] -> Async Bool ()
storeFileReadRaw :: f -> Len Word8 Word64 -> ArrayRange (STPrimArray RealWorld Pinned Word8) -> Async Bool ()
storeFileWriteWords :: (Endian w, RawStoreFile f) => f -> Len Word8 Word64 -> Endianness -> [PrimArray Pinned w] -> Async Bool ()
storeFileWriteWords f addr e bufs = storeFileWriteRaw f addr $ fmap fullArrayRange $ fromWords bufs where
fromWords = bool (fmap $ unsafePrimArrayCast . mapImmArray swapBytes) (fmap unsafePrimArrayCast) (e == platformWordEndianness)
storeFileReadWords :: (LgMultiple w Word8, Endian w, RawStoreFile f) =>
f -> Len Word8 Word64 -> Endianness -> Len w Word32 -> Async (Maybe (PrimArray Pinned w)) ()
storeFileReadWords f addr e l k = do
buf <- stToIO $ mkArray $ refineLen $ apply super <$> l
($ k) $ mapAsync (bool (return Nothing) $ Just <$> processBuf buf) $ storeFileReadRaw f addr (fullArrayRange buf) where
processBuf buf = stToIO (toWords <$> unsafeFreezeSTPrimArray buf)
toWords = bool (mapImmArray unswapBytes . unsafePrimArrayCast) unsafePrimArrayCast (e == platformWordEndianness)
newtype LocalStoreFile = LocalStoreFile (MVar DevOp)
type ByteAddr = Len Word8 Word64
data PosOp = PosOp ByteAddr RWOp deriving Show
data NonPosOp = Sync (IO ()) | FullBarrier
instance Show NonPosOp where
show (Sync _) = "Sync"
show FullBarrier = "FullBarrier"
type DevOp = Either PosOp NonPosOp
data RWOp =
WriteOp [ArrayRange (PrimArray Pinned Word8)] (Bool -> IO ()) |
ReadOp (ArrayRange (STPrimArray RealWorld Pinned Word8)) (Bool -> IO ())
instance Show RWOp where
show (WriteOp as _) = "WriteOp " ◊ show (sum $ fmap arrayLen as)
show (ReadOp a _) = "ReadOp " ◊ show (arrayLen a)
forceDevOp :: DevOp -> ()
forceDevOp = either
(\(PosOp b r) -> (b `seq`) $
case r of
WriteOp as _ -> sum (fmap arrayLen as) `seq` ()
ReadOp a _ -> arrayLen a `seq` ())
(\n -> case n of
Sync _ -> ()
FullBarrier -> ())
instance RawStoreFile LocalStoreFile where
storeFileWriteRaw f seek bufs k = queue f $ Left $ PosOp seek $ WriteOp bufs k
storeFileReadRaw f seek buf k = queue f $ Left $ PosOp seek $ ReadOp buf k
instance SyncableStoreFile LocalStoreFile where
storeFileSync f k = queue f $ Right $ Sync k
storeFileFullBarrier f = queue f $ Right FullBarrier
queue :: LocalStoreFile -> DevOp -> IO ()
queue (LocalStoreFile c) op = evaluate (forceDevOp op) >> putMVar c op
narrowBufsLen :: Endian w => [PrimArray Pinned w] -> Len w Word32
narrowBufsLen = (fromMaybe (error "Array length cannot be expressed as a Word32") . unapply super <$>) . sum . fmap arrayLen
storeFileWrite1 :: (RawStoreFile f, Endian w) => f -> Len Word64 Word64 -> Endianness -> [PrimArray Pinned w] -> IO ()
storeFileWrite1 f addr e bufs =
storeFileWriteWords f (refineLen addr) e bufs $ bool (error "storeFileWrite failed") $ return ()
storeFileRead1 :: (RawStoreFile f, Validator v, ValidatedElem v ~ w, Endian w, LgMultiple w Word8) =>
f -> Len Word64 Word64 -> Len w Word32 -> Endianness -> v -> Async (Maybe (ArrayRange (PrimArray Pinned w))) ()
storeFileRead1 f addr size e v k =
($ k) $ mapAsync (return . (>>= validate v . fullArrayRange)) $ storeFileReadWords f (refineLen addr) e size
class Show a => Schedule a where
emptySchedule :: a
insertOp :: PosOp -> a -> a
nextOp :: a -> Maybe (PosOp, a)
minViewWithKeyNE :: Ord k => Map.Map k (NEList v) -> Maybe ((k, v), Map.Map k (NEList v))
minViewWithKeyNE = (getSingle <$>) . Map.minViewWithKey where
getSingle ((k, ne), m) = flip onNEList ne $ \single ne' -> ((k, single), maybe id (Map.insert k) ne' m)
data CLook = CLook ByteAddr (Map.Map ByteAddr (NEList PosOp)) (Map.Map ByteAddr (NEList PosOp)) deriving Show
updateLow :: (Map.Map ByteAddr (NEList PosOp) -> Map.Map ByteAddr (NEList PosOp)) -> CLook -> CLook
updateLow u (CLook c low high) = CLook c (u low) high
updateHigh :: (Map.Map ByteAddr (NEList PosOp) -> Map.Map ByteAddr (NEList PosOp)) -> CLook -> CLook
updateHigh u (CLook c low high) = CLook c low (u high)
instance Schedule CLook where
emptySchedule = CLook 0 Map.empty Map.empty
insertOp op@(PosOp seek _) s@(CLook c _ _) = bool updateLow updateHigh (seek >= c) (Map.insertWith neAppend seek (neSingleton op)) s
nextOp (CLook _ low high) = useHighMin `firstJust` useLowMin where
useLowMin = (\((pos, op), low') -> (op, CLook pos low' high)) <$> minViewWithKeyNE low
useHighMin = (\((pos, op), high') -> (op, CLook pos low high')) <$> minViewWithKeyNE high
class Show f => RawFile f where
fileWriteRaw :: f -> Len Word8 Word64 -> [ArrayRange (PrimArray Pinned Word8)] -> ErrorT String IO ()
fileReadRaw :: f -> Len Word8 Word64 -> ArrayRange (STPrimArray RealWorld Pinned Word8) -> ErrorT String IO ()
fileFlush :: f -> IO ()
instance RawFile Handle where
fileWriteRaw h seek bufs = lift $ hSeekX h seek >> sequence_ (withArrayPtr (hPutBufLen h) <$> bufs)
fileReadRaw h seek buf = ErrorT $ fmap (boolEither "" () . (== arrayLen buf)) $ hSeekX h seek >> withArrayPtr (hGetBufLen h) buf
fileFlush = hFlush
performAll :: (Schedule s, RawFile f) => f -> s -> IO ()
performAll f s = maybe (return ()) (\(op, s') -> perform f op >> performAll f s') $ nextOp s
perform :: RawFile f => f -> PosOp -> IO ()
perform f (PosOp seek rw) = case rw of
WriteOp bufs k -> runErrorT (fileWriteRaw f seek bufs) >>= (log' >>) . either (const $ k False) (const $ k True) where
log' = return ()
ReadOp buf k -> runErrorT (fileReadRaw f seek buf) >>= (log' >>) . either (const $ k False) (const $ k True) where
log' = return ()
process :: (Schedule s, RawFile f) => MVar DevOp -> f -> s -> IO ()
process c f s = maybe
(takeMVar c >>= incoming)
(\(op, s') -> tryTakeMVar c >>= maybe (perform f op >> process c f s') incoming) $
nextOp s where
incoming :: DevOp -> IO ()
incoming = (>>= process c f) . either
(return . flip insertOp s)
(\op -> emptySchedule <$
(performAll f s >> case op of
Sync k -> fileFlush f >> k
FullBarrier -> fileFlush f
)
)
hSeekX :: Handle -> ByteAddr -> IO ()
hSeekX h seek = do
size <- fromIntegral <$> hFileSize h
when (size <= seek) $ do
hSeek h AbsoluteSeek (fromIntegral size)
hWriteZeros h $ seek size
hSeek h AbsoluteSeek (fromIntegral seek)
hWriteZeros :: Handle -> ByteAddr -> IO ()
hWriteZeros h numZeros = when (numZeros > 0) $ do
let batch = min numZeros (16*1024*1024)
B.hPut h (B.replicate (fromIntegral batch) 0)
hWriteZeros h (numZeros batch)
newtype ChildException = ChildException SomeException deriving (Typeable, Show)
instance Exception ChildException
withFileStoreFile :: FilePath -> (LocalStoreFile -> IO a) -> ErrorT String IO a
withFileStoreFile path user = lift $ withBinaryFile path ReadWriteMode $ \h -> hSetBuffering h NoBuffering >> withRawFile h user
withRawDeviceStoreFile :: FilePath -> (LocalStoreFile -> IO a) -> ErrorT String IO a
withRawDeviceStoreFile path user =
ErrorT $ bracket (openFd path ReadWrite Nothing $ defaultFileFlags {exclusive = True, append = True}) closeFd $
\fd -> runErrorT $
do fs <- lift $ getFdStatus fd
bool (throwError "Not a raw device") (lift $ withRawFile (RawDevice fd fs 9) user) $ isCharacterDevice fs
withRawDeviceStoreFiles :: [FilePath] -> ([LocalStoreFile] -> IO a) -> ErrorT String IO a
withRawDeviceStoreFiles ps user = foldr (\p u fs -> (>>= ErrorT . pure) $ withRawDeviceStoreFile p $ \f -> runErrorT $ u $ fs ◊ [f]) (lift . user) ps []
toFileOffset :: Integral n => Len Word8 n -> FileOffset
toFileOffset = fromIntegral . getLen
toByteCount :: Integral n => Len Word8 n -> ByteCount
toByteCount = fromIntegral . getLen
fdSeekLen :: Fd -> ByteAddr -> IO ()
fdSeekLen fd seek = () <$ fdSeek fd AbsoluteSeek (toFileOffset seek)
fdReadArray :: Fd -> ByteAddr -> ArrayRange (STPrimArray RealWorld Pinned Word8) -> ErrorT String IO ()
fdReadArray fd start a = ErrorT $ fmap (boolEither "" () . (==) (toByteCount $ arrayLen a)) $
fdSeekLen fd start >> withArrayPtr (\ptr len -> fdReadBuf fd ptr $ toByteCount len) a
fdWriteArray :: Fd -> ByteAddr -> ArrayRange (STPrimArray RealWorld Pinned Word8) -> ErrorT String IO ()
fdWriteArray fd start a = ErrorT $ fmap (boolEither "" () . (==) (toByteCount $ arrayLen a)) $
fdSeekLen fd start >> withArrayPtr (\ptr len -> fdWriteBuf fd ptr $ toByteCount len) a
data RawDevice = RawDevice Fd FileStatus Int
rawDeviceBlockBytes :: RawDevice -> Len Word8 Word
rawDeviceBlockBytes (RawDevice _ _ lg) = unsafeLen $ 1 `shiftL` lg
instance Show RawDevice where show (RawDevice _ fs _) = show $ specialDeviceID fs
instance RawFile RawDevice where
fileWriteRaw r@(RawDevice fd _ _) start bufs =
let len = up $ sum $ arrayLen <$> bufs in
withBlockArray r start len $ ((. fullArrayRange) .) $ \tStart t ->
do
let bb = rawDeviceBlockBytes r
let tLen = arrayLen t
let tEnd = tStart + up tLen
when (tStart < start) $ fdReadArray fd tStart $ headArrayRange bb t
when (start + len < tEnd) $ fdReadArray fd (tEnd up bb) $ skipArrayRange (tLen bb) t
let dest = skipArrayRange (fromJust $ unapply super $ start tStart) t
_ <- lift $ stToIO $ foldlM (\d b -> skipArrayRange (arrayLen b) d <$ mapMArrayCopyImm return b d) dest bufs
fdWriteArray fd tStart t
fileReadRaw r@(RawDevice fd _ _) start buf =
withBlockArray r start (up $ arrayLen buf) $ ((. fullArrayRange) .) $ \tStart t ->
do
fdReadArray fd tStart t
let rangeToCopy = skipArrayRange (fromJust $ unapply super $ start tStart) t
lift $ stToIO (mapMArrayCopy return rangeToCopy buf)
fileFlush _ = return ()
withBlockArray :: MonadIO m => RawDevice -> ByteAddr -> ByteAddr -> (ByteAddr -> STPrimArray RealWorld Pinned Word8 -> m a) -> m a
withBlockArray r@(RawDevice _ _ lgBlockBytes) seek len f =
let blockBytes = rawDeviceBlockBytes r
seek' = getLen seek
len' = getLen len
start = (seek' `shiftR` lgBlockBytes) `shiftL` lgBlockBytes
end = ((seek' + len' + up (getLen blockBytes) 1) `shiftR` lgBlockBytes) `shiftL` lgBlockBytes
in liftIO (stToIO $ newAlignedPinnedWord8Array blockBytes $ unsafeLen $ fromJust $ unapply super $ end start) >>=
f (unsafeLen start)
withRawFile :: (RawFile f, Show f) => f -> (LocalStoreFile -> IO a) -> IO a
withRawFile f user = do
chan <- newEmptyMVar
runWithDeamon
("User of " ++ show f, user $ LocalStoreFile chan)
("Server for " ++ show f, bracket_ (return ()) (return ()) $ process chan f (emptySchedule :: CLook))