module Database.TxtSushi.ExternalSort (
externalSort,
externalSortBy,
externalSortByConstrained,
defaultByteQuota,
defaultMaxOpenFiles) where
import Control.Exception (finally)
import Control.Monad
import Data.Binary
import Data.Binary.Get
import qualified Data.ByteString.Lazy as BS
import Data.Int
import Data.List
import System.Directory
import System.IO
import System.IO.Unsafe
-- | Sort a list using temporary files as scratch space.
--
-- Elements are ordered with 'compare' from their 'Ord' instance; the sort
-- itself is delegated to 'externalSortBy', which supplies the default
-- resource limits.
externalSort :: (Binary b, Ord b) => [b] -> [b]
externalSort xs = externalSortBy compare xs
-- | Sort a list with a caller-supplied comparison, using temporary files as
-- scratch space.
--
-- This is 'externalSortByConstrained' applied to 'defaultByteQuota' and
-- 'defaultMaxOpenFiles'.
externalSortBy :: (Binary b) => (b -> b -> Ordering) -> [b] -> [b]
externalSortBy cmp xs =
  externalSortByConstrained defaultByteQuota defaultMaxOpenFiles cmp xs
-- | Byte quota that 'externalSortBy' passes to 'externalSortByConstrained':
-- 16 MiB (16 * 2^20 = 16777216 bytes).
defaultByteQuota :: Int
defaultByteQuota = 16 * 2 ^ (20 :: Int)
-- | Open-file limit that 'externalSortBy' passes to
-- 'externalSortByConstrained'.
defaultMaxOpenFiles :: Int
-- NOTE(review): presumably 16 merge inputs plus the one file being written;
-- confirm against the merge pass.
defaultMaxOpenFiles = 16 + 1
-- | External sort with explicit resource limits.
--
-- The first argument is the byte quota handed to 'bufferPartialSortsBy',
-- which cuts the input into runs, sorts each run with @cmp@ and writes it to
-- a temporary file. The second argument is the open-file limit handed to
-- 'externalMergeAllBy', which merges those files into the final result.
-- Both limits are converted with 'fromIntegral' before use.
--
-- All file I\/O is hidden behind 'unsafePerformIO' so that the sort can be
-- used as an ordinary list function.
externalSortByConstrained :: (Binary b, Integral i) => i -> i -> (b -> b -> Ordering) -> [b] -> [b]
externalSortByConstrained byteQuota maxOpenFiles cmp xs =
  unsafePerformIO $
    bufferPartialSortsBy (fromIntegral byteQuota) cmp xs
      >>= externalMergeAllBy (fromIntegral maxOpenFiles) cmp
-- | Merge any number of lists into one using the given comparison.
--
-- Zero lists give the empty list, one list is returned as is and two lists
-- are merged directly with 'mergeBy'. Three or more lists are first merged
-- in groups of two by 'partitionAndMerge', and the resulting lists are
-- merged again in the same way.
mergeAllBy :: (a -> a -> Ordering) -> [[a]] -> [a]
mergeAllBy cmp lists =
  case lists of
    [] -> []
    [onlyList] -> onlyList
    [leftList, rightList] -> mergeBy cmp leftList rightList
    _ -> mergeAllBy cmp (partitionAndMerge 2 cmp lists)
-- | Split a list of lists into consecutive groups of @partitionSize@ lists
-- (using 'regularPartitions') and merge each group into a single list with
-- 'mergeAllBy'. An empty input gives an empty result.
partitionAndMerge :: Int -> (a -> a -> Ordering) -> [[a]] -> [[a]]
partitionAndMerge partitionSize cmp listList
  | null listList = []
  | otherwise =
      [ mergeAllBy cmp group
      | group <- regularPartitions partitionSize listList
      ]
-- | Cut a list into consecutive chunks of @partitionSize@ elements; the last
-- chunk holds whatever is left over and may be shorter. The empty list has
-- no chunks.
regularPartitions :: Int -> [a] -> [[a]]
regularPartitions partitionSize = unfoldr nextChunk
  where
    -- Stop once nothing is left; otherwise emit the next chunk and continue
    -- with the remainder. The pair is bound lazily so each chunk is produced
    -- without first forcing the split.
    nextChunk [] = Nothing
    nextChunk remaining =
      let (chunk, rest) = splitAt partitionSize remaining
       in Just (chunk, rest)
-- | Merge two lists into one using the given comparison.
--
-- At each step the heads are compared: the head of the second list is taken
-- only when the first head compares 'GT' to it, so on ties the element from
-- the first list comes first. If both inputs are sorted with respect to the
-- comparison, so is the result.
mergeBy :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
mergeBy comparisonFunction = go
  where
    go [] ys = ys
    go xs [] = xs
    go xs@(x : xsRest) ys@(y : ysRest)
      | comparisonFunction x y == GT = y : go xs ysRest
      | otherwise = x : go xsRest ys
-- | Run one merge pass over a list of sorted temporary files.
--
-- The files are processed in batches. Each batch is read with
-- 'readThenDelBinFiles', decoded with 'decodeAll', merged with 'mergeAllBy'
-- and written to a new temporary file by 'bufferToTempFile'. The result is
-- the list of new file names, one per batch, in batch order. An empty file
-- list gives an empty result.
--
-- One of the @maxOpenFiles@ slots is kept for the file being written, so a
-- batch holds @maxOpenFiles - 1@ input files. The batch size is never
-- allowed to fall below 2: a batch of fewer than two files would not reduce
-- the number of files, and 'externalMergeAllBy' would then repeat passes
-- forever. A limit below 3 is therefore treated as 3.
externalMergePass :: Binary b => Int -> (b -> b -> Ordering) -> [String] -> IO [String]
externalMergePass _ _ [] = return []
externalMergePass maxOpenFiles cmp files = do
  let batchSize = max 2 (maxOpenFiles - 1)
      (mergeNowFiles, mergeLaterFiles) = splitAt batchSize files
  mergeNowBinStrs <- readThenDelBinFiles mergeNowFiles
  let mergeNowBinaries = map decodeAll mergeNowBinStrs
  mergedNowFile <- bufferToTempFile $ mergeAllBy cmp mergeNowBinaries
  mergedLaterFiles <- externalMergePass maxOpenFiles cmp mergeLaterFiles
  return $ mergedNowFile : mergedLaterFiles
-- | Merge a list of sorted temporary files into a single list.
--
-- With no files the result is empty. With exactly one file its contents are
-- read (via 'readThenDelBinFile') and decoded with 'decodeAll'. Otherwise
-- 'externalMergePass' is run and the files it produces are merged in the
-- same way.
externalMergeAllBy :: Binary b => Int -> (b -> b -> Ordering) -> [String] -> IO [b]
externalMergeAllBy maxOpenFiles cmp files =
  case files of
    [] -> return []
    [lastFile] -> fmap decodeAll (readThenDelBinFile lastFile)
    _ ->
      externalMergePass maxOpenFiles cmp files
        >>= externalMergeAllBy maxOpenFiles cmp
-- | Cut the input into runs, sort each run and write it to a temporary file.
--
-- 'splitAfterQuota' takes the next run off the front of the list according
-- to @byteQuota@; the run is sorted with 'sortBy' and written with
-- 'bufferToTempFile'. The remainder is handled the same way. The result
-- lists the temporary file names in the order the runs were taken; an empty
-- input gives no files.
bufferPartialSortsBy :: (Binary b) => Int64 -> (b -> b -> Ordering) -> [b] -> IO [String]
bufferPartialSortsBy byteQuota cmp xs
  | null xs = return []
  | otherwise = do
      let (currentRun, laterRows) = splitAfterQuota byteQuota xs
      runFile <- bufferToTempFile (sortBy cmp currentRun)
      laterFiles <- bufferPartialSortsBy byteQuota cmp laterRows
      return (runFile : laterFiles)
-- | Split a list at the point where the encoded size of its elements uses up
-- the given quota.
--
-- Elements are taken from the front, each one reducing the remaining quota
-- by the length of its 'encode'd form. The element that brings the quota to
-- zero or below is still included in the first list; everything after it
-- forms the second list. A non-empty input therefore always yields a
-- non-empty first list, even when the quota is smaller than the first
-- element.
splitAfterQuota :: (Binary b) => Int64 -> [b] -> ([b], [b])
splitAfterQuota _ [] = ([], [])
splitAfterQuota quotaInBytes (binaryHead : binaryTail)
  | quotaRemaining <= 0 = ([binaryHead], binaryTail)
  | otherwise = (binaryHead : fstBinsTail, sndBins)
  where
    -- Quota left once the head element has been accounted for.
    quotaRemaining = quotaInBytes - BS.length (encode binaryHead)
    -- Only demanded when quota remains, so the recursion stops at the cut.
    (fstBinsTail, sndBins) = splitAfterQuota quotaRemaining binaryTail
-- | Apply 'readThenDelBinFile' to every file name in order and collect the
-- resulting byte strings.
readThenDelBinFiles :: [String] -> IO [BS.ByteString]
readThenDelBinFiles fileNames = mapM readThenDelBinFile fileNames
-- | Read a file as a lazy byte string and arrange for the file to be removed
-- later.
--
-- The returned string is the file contents with an empty string appended.
-- That empty string is produced under 'unsafeInterleaveIO', so the
-- 'removeFile' action attached to it does not run here but only when the
-- appended part is demanded.
readThenDelBinFile :: String -> IO BS.ByteString
readThenDelBinFile fileName = do
  contents <- BS.readFile fileName
  -- NOTE(review): presumably demanded only after all of `contents` has been
  -- consumed, since it is appended at the end; confirm.
  deleteTrigger <- unsafeInterleaveIO removeThenEmpty
  return (BS.append contents deleteTrigger)
  where
    removeThenEmpty = removeFile fileName >> return BS.empty
-- | Write the encoded form of a list to a new temporary file and return the
-- file's path.
--
-- The file is created in the directory given by 'getTemporaryDirectory'
-- using the name template @sort.txt@, and its contents are 'encodeAll' of
-- the list. For an empty list no file is created and the empty string is
-- returned.
--
-- The handle is closed even if writing throws, so a failed write does not
-- leak an open file handle; the exception is still propagated.
bufferToTempFile :: (Binary b) => [b] -> IO String
bufferToTempFile [] = return []
bufferToTempFile xs = do
  tempDir <- getTemporaryDirectory
  (tempFilePath, tempFileHandle) <- openBinaryTempFile tempDir "sort.txt"
  BS.hPut tempFileHandle (encodeAll xs) `finally` hClose tempFileHandle
  return tempFilePath
-- | Encode every element with 'encode' and concatenate the results, in list
-- order, into one lazy byte string.
encodeAll :: (Binary b) => [b] -> BS.ByteString
encodeAll xs = BS.concat [encode x | x <- xs]
-- | Decode a byte string holding zero or more back-to-back encoded values
-- into the list of those values.
--
-- One value is decoded from the front with 'get' (via 'runGetState'), and
-- the bytes left over are decoded the same way until the input is empty.
-- The list is produced lazily, one element per decoding step.
decodeAll :: (Binary b) => BS.ByteString -> [b]
decodeAll bs =
  if BS.null bs
    then []
    else
      let (value, leftover, _) = runGetState get bs 0
       in value : decodeAll leftover