module Control.Quiver.Sort (
spsort
, spsortBy
, spsortOn
, spfilesort
, spfilesortBy
, SPFileConfig
, defaultConfig
, setChunkSize
, setTempDir
, setMaxFiles
) where
import Control.Quiver.Binary
import Control.Quiver.ByteString
import Control.Quiver.Group
import Control.Quiver.Instances ()
import Control.Quiver.Interleave
import Control.Quiver.SP
import Control.Applicative (liftA2)
import Control.Exception (IOException)
import Control.Monad (join)
import Control.Monad.Catch (MonadCatch(..), MonadMask,
finally)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Resource (MonadResource, allocate)
import Data.Bool (bool)
import Data.Coerce (coerce)
import Data.Foldable (toList)
import Data.Function (on)
import Data.List (sortBy)
import Data.Monoid (First(..), (<>))
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as S
import System.Directory (doesDirectoryExist,
getPermissions,
getTemporaryDirectory,
removeDirectoryRecursive,
removeFile, writable)
import System.IO (hClose, openTempFile)
import System.IO.Temp (createTempDirectory)
spsort :: (Ord a, Monad m) => SP a a m ()
spsort = spsortBy compare
spsortBy :: (Monad m) => (a -> a -> Ordering) -> SP a a m ()
spsortBy f = (sortBy f <$> spToList) >>= spevery
spsortOn :: (Ord b, Monad m) => (a -> b) -> SP a a m ()
spsortOn f = sppure ((,) <*> f)
>->> spsortBy (compare `on` snd)
>->> sppure fst >&> snd
spToList :: SQ a x f [a]
spToList = spfoldr (:) []
data SPFileConfig = FC { _chunkSize :: !Int
, _withTmpDir :: !(Maybe FilePath)
, _maxFiles :: !Int
}
defaultConfig :: SPFileConfig
defaultConfig = FC { _chunkSize = 1000
, _withTmpDir = Nothing
, _maxFiles = 100
}
setChunkSize :: Int -> SPFileConfig -> SPFileConfig
setChunkSize cs cfg = cfg { _chunkSize = cs }
setTempDir :: FilePath -> SPFileConfig -> SPFileConfig
setTempDir dir cfg = cfg { _withTmpDir = Just dir }
setMaxFiles :: Int -> SPFileConfig -> SPFileConfig
setMaxFiles c cfg = cfg { _maxFiles = c }
spfilesort :: (Binary a, Ord a, MonadResource m, MonadMask m) => SPFileConfig
-> P () a a () m (SPResult IOException)
spfilesort = spfilesortBy compare
spfilesortBy :: (Binary a, MonadResource m, MonadMask m) => (a -> a -> Ordering) -> SPFileConfig
-> P () a a () m (SPResult IOException)
spfilesortBy cmp cfg = do mdir' <- join <$> liftIO (traverse checkDir (_withTmpDir cfg))
getTmpDir mdir' "quiver-sort" pipeline
where
checkDir dir = do ex <- liftA2 (liftA2 (&&)) doesDirectoryExist (fmap writable . getPermissions) dir
return (bool Nothing (Just dir) ex)
getTmpDir = maybe withSystemTempDirectory withTempDirectory
pipeline tmpDir = toFiles tmpDir >>= either spfailed (sortFromFiles maxFiles cmp tmpDir)
toFiles tmpDir = sortToFiles chunkSize cmp tmpDir >->> spToSeq >&> uncurry (flip checkFailed)
chunkSize = _chunkSize cfg
maxFiles = _maxFiles cfg
sortToFiles :: (Binary a, MonadIO m, MonadMask m) => Int -> (a -> a -> Ordering) -> FilePath
-> SP a FilePath m IOException
sortToFiles chunkSize cmp tmpDir = spchunks chunkSize
>->> spTraverseUntil sortChunk
>&> snd
where
sortChunk as = writeOut tmpDir (spevery (sortBy cmp as))
writeOut :: (Binary a, MonadIO m, MonadMask m) => FilePath -> P () x a () m (SPResult IOException)
-> m (Either IOException FilePath)
writeOut tmpDir p = do (fl,h) <- liftIO (openTempFile tmpDir "quiver-sort-chunk")
finally (checkFailed fl <$> sprun (pipeline h) <* liftIO (hClose h))
(liftIO (hClose h))
where
pipeline h = p >->> spencode >&> fst >->> qhoist liftIO (qPut h) >&> getFirstError
sortFromFiles :: (Binary a, MonadIO m, MonadMask m) => Int -> (a -> a -> Ordering) -> FilePath
-> Seq FilePath -> Producer a () m (SPResult IOException)
sortFromFiles mf cmp tmpDir = nextBatch
where
nextBatch Empty = spcomplete
nextBatch fls = case S.splitAt mf fls of
(b,Empty) -> batch b
(b,fls') -> do br <- qlift (writeBatch b)
liftIO $ mapM_ removeFile b
either spfailed (nextBatch . (fls' |>)) br
writeBatch = writeOut tmpDir . batch
batch = spinterleave cmp . map readFl . toList
readFl fl = qhoist liftIO (qReadFile fl readSize) >->> spdecode >&> fst
readSize = 4096
pattern Empty :: Seq a
pattern Empty <- (S.viewl -> S.EmptyL)
spTraverseUntil :: (Monad m) => (a -> m (Either e b)) -> SP a b m e
spTraverseUntil k = loop
where
loop = spconsume loop' spcomplete
loop' a = qlift (k a) >>= either spfailed (>:> loop)
checkFailed :: r -> SPResult e -> Either e r
checkFailed _ (Just (Just e)) = Left e
checkFailed r _ = Right r
spToSeq :: SQ a x f (Seq a)
spToSeq = spfoldl' (|>) mempty
withSystemTempDirectory :: (MonadResource m) =>
String
-> (FilePath -> m a)
-> m a
withSystemTempDirectory template action = liftIO getTemporaryDirectory >>= \tmpDir -> withTempDirectory tmpDir template action
withTempDirectory :: (MonadResource m) =>
FilePath
-> String
-> (FilePath -> m a)
-> m a
withTempDirectory targetDir template withTmp = do
(_release, tmpDir) <- allocate (createTempDirectory targetDir template)
(ignoringIOErrors . removeDirectoryRecursive)
withTmp tmpDir
ignoringIOErrors :: (MonadCatch m) => m () -> m ()
ignoringIOErrors ioe = ioe `catch` (\(_ :: IOError) -> return ())
getFirstError :: (SPResult a, SPResult a) -> SPResult a
getFirstError (r1,r2) = coerce (toFirst r1 <> toFirst r2)
where
toFirst :: SPResult a -> Maybe (First a)
toFirst = coerce