module Database.Alteryx.StreamingYxdb
(
blocksToDecompressedBytes,
blocksToRecords,
sinkRecords,
getMetadata,
recordsToBlocks,
sourceFileBlocks,
sourceFileRecords
)where
import Conduit
import Control.Applicative
import Control.Lens hiding (from, to)
import Control.Monad as M
import Control.Monad.Primitive as M
import Control.Monad.Trans.Resource
import qualified Control.Monad.Trans.State.Lazy as State
import qualified Control.Newtype as NT
import Data.Array.Unboxed as A
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put
import Data.ByteString as BS
import Data.ByteString.Builder as BSB
import Data.ByteString.Builder.Extra as BSB
import Data.ByteString.Lazy as BSL
import Data.Conduit
import Data.Conduit.Binary
import Data.Conduit.Combinators as CC
import Data.Conduit.Serialization.Binary
import Data.Monoid
import Data.Text as T
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Vector as V (toList)
import System.IO
import Database.Alteryx.Serialization
import Database.Alteryx.Types
-- | Read a byte range of a file into one strict 'BS.ByteString'.
--
-- The second argument is the starting byte offset ('Nothing' = start of
-- file). The third argument is the NUMBER of bytes to read, not an end
-- offset ('Nothing' = read to the end of the file). Both are passed
-- straight through to 'sourceFileRange'.
readRange :: (MonadResource m) => FilePath -> Maybe Int -> Maybe Int -> m BS.ByteString
readRange filepath startPos numBytes =
    -- Collect the chunks into a lazy ByteString and flatten them once.
    -- Repeatedly appending strict ByteStrings (as a monoidal 'fold' does)
    -- recopies the accumulated prefix for every chunk.
    sourceFileRange filepath (fromIntegral <$> startPos) (fromIntegral <$> numBytes)
        $$ (BSL.toStrict <$> sinkLbs)
-- | Read the three metadata sections of a YXDB file:
--
--   * the header, taken from the first 'headerPageSize' bytes;
--   * the record info, which follows immediately and is
--     'numMetadataBytesHeader' bytes long;
--   * the block index, which runs from the header's 'recordBlockIndexPos'
--     to the end of the file.
--
-- Each section is read with its own 'readRange' call and decoded with its
-- 'Binary' instance.
getMetadata :: FilePath -> IO YxdbMetadata
getMetadata filepath = runResourceT $ do
    hdr <- decodeRange Nothing (Just headerPageSize)
    info <- decodeRange (Just headerPageSize) (Just $ numMetadataBytesHeader hdr)
    blockIdx <- decodeRange (Just $ fromIntegral $ hdr ^. recordBlockIndexPos) Nothing
    return YxdbMetadata
        { _metadataHeader     = hdr
        , _metadataRecordInfo = info
        , _metadataBlockIndex = blockIdx
        }
  where
    -- Read an (offset, byte count) range and decode it; the result type is
    -- fixed by the record field it ends up in.
    decodeRange :: Binary a => Maybe Int -> Maybe Int -> ResourceT IO a
    decodeRange startPos numBytes =
        decode . BSL.fromStrict <$> readRange filepath startPos numBytes
-- | A block's byte range in the file as @(start, end)@ offsets. As built by
-- 'blockRanges', @end@ is the start of the next block, or the block-index
-- position for the last block.
type BlockRange = (Int, Int)
-- | The byte ranges of all blocks, in block-index order.
type BlockRanges = [BlockRange]
-- | Stream every record stored in a YXDB file.
--
-- The file's metadata is read first to learn where the blocks are and how
-- records are laid out. The blocks are then streamed and parsed into
-- records.
sourceFileRecords :: (MonadResource m, MonadIO m) => FilePath -> Source m Record
sourceFileRecords filename = do
    metadata <- liftIO (getMetadata filename)
    sourceFileBlocks filename metadata
        =$= blocksToRecords (metadata ^. metadataRecordInfo)
-- | Compute the byte range of every block from the file metadata.
--
-- Each block starts at its block-index entry and ends where the next block
-- starts. The last block ends at the header's 'recordBlockIndexPos', which
-- is where the block index itself begins.
blockRanges :: YxdbMetadata -> BlockRanges
blockRanges metadata = Prelude.zip starts ends
  where
    starts = Prelude.map fromIntegral . A.elems . NT.unpack $ metadata ^. metadataBlockIndex
    indexPos = fromIntegral $ (metadata ^. metadataHeader) ^. recordBlockIndexPos
    ends = Prelude.drop 1 starts ++ [indexPos]
-- | Read the bytes @[from, to)@ of the file, decode them as a single
-- 'Block', and yield it.
--
-- If decoding fails, 'runGet' fails with a message labelled with the
-- offset and length that were read.
sourceBlock :: (MonadResource m) => FilePath -> BlockRange -> Source m Block
sourceBlock filepath (from, to) = do
    -- 'readRange' takes a byte count, so convert the end offset to a length.
    -- (The previous text `to from` applied an Int as a function.)
    let numBytes = to - from
    rawBlock <- readRange filepath (Just from) (Just numBytes)
    let block = runGet (label ("yieldBlock: " ++ show (from, numBytes)) get) $
                    BSL.fromStrict rawBlock :: Block
    yield block
-- | Yield the blocks for the given byte ranges, one after another, in list
-- order.
sourceBlocks :: (MonadResource m) => FilePath -> BlockRanges -> Source m Block
sourceBlocks filepath = Prelude.mapM_ (sourceBlock filepath)
-- | Stream every block of the file, using the ranges derived from its
-- block index.
sourceFileBlocks :: (MonadResource m) => FilePath -> YxdbMetadata -> Source m Block
sourceFileBlocks filepath metadata = sourceBlocks filepath (blockRanges metadata)
-- | Parse a stream of blocks into records described by the given
-- 'RecordInfo'.
--
-- The block contents are fed to 'conduitGet' as one continuous byte
-- stream, so record parsing is not tied to block boundaries.
blocksToRecords :: (MonadThrow m) => RecordInfo -> Conduit Block m Record
blocksToRecords info =
    blocksToDecompressedBytes =$= conduitGet recordParser
  where
    recordParser = getRecord info
-- | Unwrap each incoming 'Block' into its lazy byte contents and emit those
-- contents chunk by chunk as strict ByteStrings.
--
-- NOTE(review): whether the bytes are decompressed at this point depends
-- on how 'Block' is decoded elsewhere; nothing here decompresses.
blocksToDecompressedBytes :: (MonadThrow m) => Conduit Block m BS.ByteString
blocksToDecompressedBytes = awaitForever $ \block ->
    Prelude.mapM_ yield (BSL.toChunks (NT.unpack block))
-- | A conduit from @a@ to @b@ whose base monad carries a (lazy) 'State.StateT'
-- layer of 'StreamingCSVStatistics'. The writer pipeline uses it to record
-- counts and byte lengths needed later for the header and block index.
type StatefulConduit a m b = Conduit a (State.StateT StreamingCSVStatistics m) b
-- | Group incoming records into blocks of at most 'recordsPerBlock'
-- records, serialize each group with 'putRecord', and yield it as one
-- 'Block'.
--
-- The number of records in each block is added to 'statisticsNumRecords'.
recordsToBlocks :: (MonadThrow m) => RecordInfo -> StatefulConduit Record m Block
recordsToBlocks recordInfo = do
    -- Consume at most one block's worth of input. 'CC.take' stops awaiting
    -- once it has passed on 'recordsPerBlock' records, so the rest of the
    -- stream stays upstream for the recursive call below. Draining the
    -- whole stream with 'CC.sinkList' instead kept only the first block,
    -- silently dropped every later record, and buffered the full input.
    records <- CC.take recordsPerBlock =$= CC.sinkList
    unless (Prelude.null records) $ do
        let numRecords = Prelude.length records
        lift $ State.modify (& statisticsNumRecords %~ (+numRecords))
        let buildOneRecord :: Record -> BSB.Builder
            buildOneRecord record =
                let recordBSL = runPut $ putRecord recordInfo record
                in lazyByteStringThreshold miniblockThreshold recordBSL
        yield $ Block $ toLazyByteString $ mconcat $ Prelude.map buildOneRecord records
        recordsToBlocks recordInfo
-- | Serialize a stream of blocks into the byte layout of a YXDB file:
--
--   1. a zero-filled placeholder of 'headerPageSize' bytes for the header;
--   2. the serialized 'RecordInfo' metadata;
--   3. each serialized block, in arrival order.
--
-- The metadata byte length is stored in 'statisticsMetadataLength'. Each
-- block's byte length is prepended to 'statisticsBlockLengths', so that
-- list ends up in reverse file order.
blocksToYxdbBytes :: (MonadThrow m) => RecordInfo -> StatefulConduit Block m BS.ByteString
blocksToYxdbBytes recordInfo = do
    yield (BS.replicate headerPageSize 0)
    let metadataBS = toBS recordInfo
    lift $ State.modify (statisticsMetadataLength .~ BS.length metadataBS)
    yield metadataBS
    awaitForever $ \block -> do
        let blockBS = toBS block
        lift $ State.modify (statisticsBlockLengths %~ (BS.length blockBS :))
        yield blockBS
-- | Build the final file header from the statistics gathered while
-- writing. The creation date is the current time.
--
-- '_metaInfoLength' is half the metadata's byte length.
-- NOTE(review): presumably that converts bytes to 2-byte units; confirm
-- against the format spec.
computeHeaderFromStatistics :: StreamingCSVStatistics -> IO Header
computeHeaderFromStatistics statistics = headerAt <$> getCurrentTime
  where
    headerAt now = Header
        { _description = "Alteryx database file generated by Michael Burge's 'yxdb-utils'"
        , _fileId = dbFileId WrigleyDb
        , _creationDate = now
        , _flags1 = 0
        , _flags2 = 0
        , _metaInfoLength = fromIntegral $ (statistics ^. statisticsMetadataLength) `div` 2
        , _mystery = 0
        , _spatialIndexPos = 0
        , _recordBlockIndexPos = fromIntegral $ computeBlockIndexPositionFromStatistics statistics
        , _numRecords = fromIntegral $ statistics ^. statisticsNumRecords
        , _compressionVersion = 1
        , _reservedSpace = BS.empty
        }
-- | Byte offset of the first block: it comes directly after the header page
-- and the record-info metadata.
computeStartOfBlocksFromStatistics :: StreamingCSVStatistics -> Int
computeStartOfBlocksFromStatistics = (headerPageSize +) . view statisticsMetadataLength
-- | Byte offset at which the block index is written: the start of the
-- blocks plus the combined length of all blocks.
computeBlockIndexPositionFromStatistics :: StreamingCSVStatistics -> Integer
computeBlockIndexPositionFromStatistics statistics =
    toInteger (blocksStart + blocksLength)
  where
    blocksStart = computeStartOfBlocksFromStatistics statistics
    blocksLength = Prelude.sum (statistics ^. statisticsBlockLengths)
-- | Build the block index: a zero-based array holding the starting byte
-- offset of every block, in file order.
--
-- With no blocks, the array bounds are @(0, -1)@, i.e. an empty array.
computeBlockIndexFromStatistics :: StreamingCSVStatistics -> BlockIndex
computeBlockIndexFromStatistics statistics =
    let startOfBlocks = computeStartOfBlocksFromStatistics statistics
        -- Block lengths are prepended as blocks are written, so reverse the
        -- list to recover file order.
        orderedBlockLengths = Prelude.reverse $ statistics ^. statisticsBlockLengths
        numBlocks = Prelude.length orderedBlockLengths
        -- The running sum from startOfBlocks has numBlocks + 1 entries; the
        -- final one is the end of the last block, so keep only the first
        -- numBlocks (total replacement for 'init').
        blockStarts = Prelude.take numBlocks $ Prelude.scanl (+) startOfBlocks orderedBlockLengths
    in BlockIndex $
        -- Upper bound is numBlocks - 1 (the previous `numBlocks1` was an
        -- unbound name).
        listArray (0, numBlocks - 1) $
        Prelude.map fromIntegral blockStarts
-- | Serialize a value with its 'Binary' instance into a strict ByteString.
toBS :: Binary a => a -> BS.ByteString
toBS = BSL.toStrict . encode
-- | Write the incoming bytes to the handle, then patch the file using the
-- accumulated statistics:
--
--   * seek to offset 0 and write the real header over the start of the
--     file;
--   * seek to the end and append the block index.
--
-- The handle must be seekable, because 'hSeek' is used.
sinkYxdbBytes :: (MonadThrow m, MonadIO m) => Handle -> Sink BS.ByteString (State.StateT StreamingCSVStatistics m) ()
sinkYxdbBytes handle = do
    CC.sinkHandle handle
    stats <- lift State.get
    liftIO $ do
        header <- computeHeaderFromStatistics stats
        -- The stream is expected to start with a placeholder header page
        -- (blocksToYxdbBytes emits zeros there); overwrite it in place.
        hSeek handle AbsoluteSeek 0
        BS.hPut handle (toBS header)
        -- The block index goes after everything already written.
        hSeek handle SeekFromEnd 0
        BS.hPut handle (toBS (computeBlockIndexFromStatistics stats))
-- | Write a stream of records to the handle as a complete YXDB file.
--
-- Records are grouped into blocks and serialized after a placeholder header
-- and the metadata. The header and block index are then filled in from
-- statistics collected along the way. Those statistics live in a state
-- layer that starts from 'defaultStatistics' and is discarded afterwards.
-- The handle must be seekable.
sinkRecords :: (MonadThrow m, MonadIO m) => Handle -> RecordInfo -> Sink Record m ()
sinkRecords handle recordInfo =
    evalStateLC defaultStatistics
        (recordsToBlocks recordInfo =$= (blocksToYxdbBytes recordInfo =$= sinkYxdbBytes handle))