module Streaming.Sort
  ( -- * In-memory sorting
    -- | These read the entire stream into memory before emitting anything.
    sort
  , sortBy
  , sortOn
    -- * File-based sorting
    -- | These spill sorted chunks to temporary files and merge them.
  , withFileSort
  , withFileSortBy
    -- ** Exceptions
  , SortException (..)
    -- ** Configuration
  , Config
  , defaultConfig
    -- *** Lenses for adjusting a 'Config'
  , setConfig
  , chunkSize
  , maxFiles
  , useDirectory
  )  where
import           Streaming         (Of(..), Stream)
import qualified Streaming         as S
import           Streaming.Binary  (decoded)
import qualified Streaming.Prelude as S
import           Streaming.With
import           Data.Binary               (Binary, encode)
import qualified Data.ByteString.Lazy      as BL
import qualified Data.ByteString.Streaming as BS
import           Control.Exception         (Exception(..), IOException,
                                            mapException)
import           Control.Monad             (join, void)
import           Control.Monad.Catch       (MonadMask, MonadThrow, finally,
                                            throwM)
import           Control.Monad.IO.Class    (MonadIO, liftIO)
import           Control.Monad.Trans.Class (lift)
import           Data.Bool                 (bool)
import           Data.Coerce               (Coercible, coerce)
import           Data.Function             (on)
import           Data.Functor.Identity     (Identity(Identity), runIdentity)
import           Data.Int                  (Int64)
import qualified Data.List                 as L
import           Data.Maybe                (catMaybes)
import           System.Directory          (doesDirectoryExist, getPermissions,
                                            removeFile, writable)
import           System.IO                 (hClose, openBinaryTempFile)
-- | Sort the values of a stream using their 'Ord' instance.
--
--   This is 'sortBy' specialised to 'compare'; as with 'sortBy', the
--   whole stream is collected into a list before any value is emitted.
sort :: (Monad m, Ord a) => Stream (Of a) m r -> Stream (Of a) m r
sort str = sortBy compare str
-- | Sort the values of a stream using the supplied comparison function.
--
--   The input stream is fully consumed into a list (so all of its values
--   are held in memory at once), the list is sorted with 'L.sortBy', and
--   the sorted values are then re-emitted.  The original stream's return
--   value is preserved.
sortBy :: (Monad m) => (a -> a -> Ordering) -> Stream (Of a) m r -> Stream (Of a) m r
sortBy cmp s = do
  collected <- lift (S.toList s)
  case collected of
    values :> result -> do
      S.each (L.sortBy cmp values)
      return result
-- | Sort the values of a stream by comparing the results of a key
--   function.
--
--   Each value is paired with its key up front so that the key function
--   is applied only once per element; the pairs are sorted on the key via
--   'sortBy' and the keys are dropped again afterwards.
sortOn :: (Ord b, Monad m) => (a -> b) -> Stream (Of a) m r -> Stream (Of a) m r
sortOn f str = S.map fst (sortBy byKey (S.map decorate str))
  where
    -- Attach the sort key to the value it was computed from.
    decorate x = (x, f x)

    -- Compare two decorated values on their keys only.
    byKey p q = compare (snd p) (snd q)
-- | Settings used by the file-based sorting functions ('withFileSort'
--   and 'withFileSortBy').
--
--   The constructor and fields are not exported; start from
--   'defaultConfig' and adjust it with 'setConfig' and the lenses
--   'chunkSize', 'maxFiles' and 'useDirectory'.
data Config = Config
  { _chunkSize    :: !Int
    -- ^ Number of values that are sorted together in memory and written
    --   out as one temporary file during the initial pass.
  , _maxFiles     :: !Int
    -- ^ Number of temporary files that are grouped together and merged
    --   in a single merge step.
  , _useDirectory :: !(Maybe FilePath)
    -- ^ Directory in which the temporary working directory should be
    --   created.  'Nothing' means the system temporary location is used.
  } deriving (Show)
-- | The baseline configuration: chunks of 1000 values, 100 files per
--   merge step, and no explicitly chosen directory for temporary files.
defaultConfig :: Config
defaultConfig = Config
  { _chunkSize    = 1000
  , _maxFiles     = 100
  , _useDirectory = Nothing
  }
-- | Replace the part of a 'Config' targeted by a lens with a new value.
--
--   The lens is run in the 'Identity' functor with a function that
--   discards the old value, e.g. @setConfig chunkSize 500 defaultConfig@.
setConfig :: (forall f. (Functor f) => (a -> f a) -> Config -> f Config)
             -> a -> Config -> Config
setConfig lens a = runIdentity #. lens overwrite
  where
    -- Ignore whatever was stored previously and substitute the new value.
    overwrite _ = Identity a
-- | Post-compose a function with something that is representationally a
--   no-op.
--
--   The first argument is never inspected (it is matched with @_@);
--   instead the second argument is 'coerce'd directly from @a -> b@ to
--   @a -> c@, which the @Coercible c b@ constraint permits.
--
--   NOTE(review): presumably the first argument is always expected to be
--   a newtype (un)wrapper such as 'runIdentity' in 'setConfig'; passing
--   any other function would silently drop its behaviour.
(#.) :: (Coercible c b) => (b -> c) -> (a -> b) -> (a -> c)
-- The inner @b@ refers to the type variable bound by the trailing
-- annotation, so this relies on scoped type variables being enabled.
(#.) _ = coerce (\x -> x :: b) :: forall a b. Coercible b a => a -> b
-- | Lens onto the number of values sorted in memory and stored in each
--   initial temporary file.
chunkSize :: (Functor f) => (Int -> f Int) -> Config -> f Config
chunkSize inj cfg = fmap store (inj (_chunkSize cfg))
  where
    -- Write the updated value back into the original configuration.
    store n = cfg { _chunkSize = n }
-- | Lens onto the number of temporary files merged together in one
--   merge step.
maxFiles :: (Functor f) => (Int -> f Int) -> Config -> f Config
maxFiles inj cfg = fmap store (inj (_maxFiles cfg))
  where
    -- Write the updated value back into the original configuration.
    store n = cfg { _maxFiles = n }
-- | Lens onto the directory in which temporary files should be placed
--   ('Nothing' selects the system temporary location).
useDirectory :: (Functor f) => (Maybe FilePath -> f (Maybe FilePath)) -> Config -> f Config
useDirectory inj cfg = fmap store (inj (_useDirectory cfg))
  where
    -- Write the updated value back into the original configuration.
    store mdir = cfg { _useDirectory = mdir }
-- | Sort a stream using temporary files, ordering values by their 'Ord'
--   instance.
--
--   This is 'withFileSortBy' specialised to 'compare'.  The sorted
--   stream is only available inside the supplied continuation.
withFileSort :: (Ord a, Binary a, MonadMask m, MonadIO m, MonadThrow n, MonadIO n)
                => Config -> Stream (Of a) m v
                -> (Stream (Of a) n () -> m r) -> m r
withFileSort cfg str k = withFileSortBy cfg compare str k
-- | Sort a stream using temporary files and the supplied comparison
--   function.
--
--   The input is split into chunks of '_chunkSize' values; each chunk is
--   sorted in memory and written to its own file inside a temporary
--   directory.  The files are then merged (at most '_maxFiles' per step)
--   and the fully sorted stream is handed to the continuation.  The
--   sorted stream must not be used outside the continuation, as it reads
--   from the temporary files.
--
--   If '_useDirectory' names a directory that exists and is writable,
--   the temporary directory is created inside it; in every other case
--   (no directory configured, directory missing, or not writable) the
--   system temporary location is used instead.
--
--   Values that fail to decode when read back are reported by throwing
--   'SortDecode' in the monad @n@ of the resulting stream.
--
--   NOTE(review): 'mapException' only rewrites exceptions raised while
--   /evaluating/ its argument; it looks like 'IOException's thrown while
--   the resulting action actually runs are not re-tagged as 'SortIO' —
--   confirm before relying on 'SortIO' being thrown.
withFileSortBy :: (Binary a, MonadMask m, MonadIO m, MonadThrow n, MonadIO n)
                  => Config -> (a -> a -> Ordering) -> Stream (Of a) m v
                  -> (Stream (Of a) n () -> m r) -> m r
withFileSortBy cfg cmp str k = mapException SortIO . createDir $ \dir ->
  mergeAllFiles (_maxFiles cfg) dir cmp (initStream dir) k
  where
    -- Validate the configured directory (if any) and then create the
    -- temporary working directory.  'traverse' yields a nested 'Maybe'
    -- (configured? / usable?) which 'join' collapses.
    createDir k' = liftIO (traverse checkDir (_useDirectory cfg))
                   >>= (`getTmpDir` k') . join

    -- Only accept a directory that exists and is writable.  Permissions
    -- are queried solely for existing directories: 'getPermissions'
    -- throws for a missing path, which would otherwise defeat the
    -- fallback to the system temporary location.
    checkDir dir = do
      exists <- doesDirectoryExist dir
      if exists
        then bool Nothing (Just dir) . writable <$> getPermissions dir
        else return Nothing

    -- With a usable parent directory create the working directory inside
    -- it; otherwise use the system temporary location.
    getTmpDir mdir = maybe withSystemTempDirectory withTempDirectory mdir "streaming-sort"

    -- The stream of initial, individually sorted chunk files.
    initStream dir = initialSort (_chunkSize cfg) dir cmp str
-- | First pass of the file-based sort.
--
--   The input is cut into chunks of @chnkSize@ values; every chunk is
--   sorted in memory with 'sortBy' and written to a fresh file in @dir@
--   by 'writeSortedData'.  The result is the stream of those file paths.
initialSort :: (Binary a, MonadMask m, MonadIO m)
               => Int -> FilePath -> (a -> a -> Ordering)
               -> Stream (Of a) m r
               -> Stream (Of FilePath) m r
initialSort chnkSize dir cmp str =
  S.mapped (writeSortedData dir) sortedChunks
  where
    -- Each chunk is an independently sorted sub-stream.
    sortedChunks = S.maps (sortBy cmp) (S.chunksOf chnkSize str)
-- | Repeatedly merge sorted files until the remaining ones can be merged
--   in a single step, then pass the merged stream to the continuation.
--
--   * @numFiles@ is the number of files merged per step.  Values below 2
--     are treated as 2, because merging one file at a time never reduces
--     the number of files and the loop would not terminate.
--
--   * @tmpDir@ is where the intermediate merged files are written.
--
--   * @files@ is the stream of already-sorted files to merge.
--
--   If there are no files at all the continuation receives an empty
--   stream.
mergeAllFiles :: (Binary a, MonadMask m, MonadIO m, MonadThrow n, MonadIO n)
                 => Int -> FilePath -> (a -> a -> Ordering)
                 -> Stream (Of FilePath) m v
                 -> (Stream (Of a) n () -> m r) -> m r
mergeAllFiles numFiles tmpDir cmp files k = go files
  where
    -- A merge step must combine at least two files to make progress.
    filesPerMerge = max 2 numFiles

    -- Group the files into lists of at most 'filesPerMerge' paths.
    go = checkEmpty . S.mapped S.toList . S.chunksOf filesPerMerge

    -- No files at all: run the continuation on an empty stream.
    checkEmpty chunks = S.uncons chunks >>= maybe (k (return ()))
                                                  (uncurry checkSingleChunk)

    -- Exactly one group left: merge it straight into the continuation.
    -- Otherwise another round of merging is required.
    checkSingleChunk ch chunks =
      S.uncons chunks >>= maybe (withFilesSort cmp ch k)
                                (uncurry (withMultipleChunks ch))

    -- Merge every group into a new file and start over with those files.
    withMultipleChunks ch1 ch2 chunks = go (S.mapM sortAndWrite allChunks)
      where
        -- Put back the two groups that were peeled off while inspecting.
        allChunks = S.yield ch1 >> S.yield ch2 >> chunks

        -- Merge one group of files and write the result to a new file,
        -- keeping only the path of that file.
        sortAndWrite fls = withFilesSort cmp fls (fmap S.fst' . writeSortedData tmpDir)
-- | Encode a stream of values and write it to a freshly created file in
--   @tmpDir@, returning the path of that file together with the stream's
--   return value.
writeSortedData :: (Binary a, MonadMask m, MonadIO m)
                   => FilePath -> Stream (Of a) m r -> m (Of FilePath r)
writeSortedData tmpDir str = do
  path   <- liftIO reserveTmpFile
  result <- writeBinaryFile path (encodeStream str)
  return (path :> result)
  where
    -- Only the unique file name is needed here: the handle is closed
    -- immediately and the file is reopened for writing afterwards.
    reserveTmpFile = do
      (path, hdl) <- openBinaryTempFile tmpDir "streaming-sort-chunk"
      hClose hdl
      return path
-- | Merge the (already sorted) contents of the given files into a single
--   sorted stream and pass it to the continuation.
--
--   Every file is opened with 'readThenDelete', so each one is removed
--   once the continuation has finished.
withFilesSort :: (Binary a, MonadMask m, MonadIO m, MonadThrow n, MonadIO n)
                 => (a -> a -> Ordering) -> [FilePath]
                 -> (Stream (Of a) n () -> m r) -> m r
withFilesSort cmp files k =
  mergeContinuations readThenDelete files $ \contents ->
    -- Decode each file's bytes and merge the resulting sorted streams.
    k (interleave cmp (map decodeStream contents))
-- | Run a continuation on the contents of a binary file and remove the
--   file afterwards, whether or not the continuation succeeded.
readThenDelete :: (MonadMask m, MonadIO m, MonadIO n) => FilePath
                  -> (BS.ByteString n () -> m r) -> m r
readThenDelete fl k = finally (withBinaryFileContents fl k) cleanup
  where
    -- Executed by 'finally' on both normal and exceptional exit.
    cleanup = liftIO (removeFile fl)
-- | Nest a continuation-passing function over every element of a list
--   and run a final continuation on all of the values produced.
--
--   The continuations are entered in list order, but the collected
--   values are accumulated by consing, so the final continuation
--   receives them in /reverse/ order relative to the input list.
mergeContinuations :: (Monad m) => (forall res. a -> (b -> m res) -> m res) -> [a] -> ([b] -> m r) -> m r
mergeContinuations toCont xs cont = foldr nest cont xs []
  where
    -- Enter the continuation for one element, then carry on with the
    -- remaining elements, remembering the value that was obtained.
    nest x rest acc = toCont x (\y -> rest (y : acc))
-- | Merge several streams into one, always emitting the smallest
--   available head element (according to @cmp@) next.
--
--   The output is only sorted if every input stream is itself sorted.
--   The return values of the input streams are discarded.
interleave :: (Monad m) => (a -> a -> Ordering) -> [Stream (Of a) m r] -> Stream (Of a) m ()
interleave cmp streams = do
  heads <- lift (mapM S.uncons streams)
  -- Empty streams are dropped; the rest are ordered by their first value.
  loop (L.sortBy byHead (catMaybes heads))
  where
    -- Order (head, remainder) pairs by the head value alone.
    byHead p q = cmp (fst p) (fst q)

    -- Invariant: the list is sorted by head value.
    loop pending = case pending of
      [] -> return ()
      -- Only one stream left: emit the rest of it without re-inspecting.
      [(x, rest)] -> S.yield x >> void rest
      ((x, rest) : others) -> do
        S.yield x
        next <- lift (S.uncons rest)
        -- Re-insert the stream at its sorted position unless exhausted.
        loop (maybe others (\entry -> L.insertBy byHead entry others) next)
-- | Serialise every value of a stream with 'encode' and concatenate the
--   results into a single streaming 'BS.ByteString'.
encodeStream :: (Binary a, Monad m) => Stream (Of a) m r -> BS.ByteString m r
encodeStream str = fromChunksLazy (S.map encode str)
-- | Decode a streaming 'BS.ByteString' into a stream of values.
--
--   If decoding stops with an error, a 'SortDecode' exception carrying
--   the reported offset and message is thrown in the underlying monad;
--   otherwise the return value of the byte stream is returned.
decodeStream :: (Binary a, MonadThrow m) => BS.ByteString m r -> Stream (Of a) m r
decodeStream bs = do
  -- The leftover bytes (first component) are not needed.
  (_, offset, outcome) <- decoded bs
  case outcome of
    Left err     -> lift (throwM (SortDecode offset err))
    Right result -> return result
-- | Turn a stream of lazy 'BL.ByteString's into one streaming
--   'BS.ByteString' by emitting the strict chunks of each in order.
fromChunksLazy :: (Monad m) => Stream (Of BL.ByteString) m r -> BS.ByteString m r
fromChunksLazy str = BS.fromChunks (S.concat (S.map BL.toChunks str))
-- | Exceptions specific to the file-based sort.
data SortException = SortIO IOException
                     -- ^ Wraps an 'IOException'; 'withFileSortBy' uses
                     --   this constructor to re-tag I\/O failures.
                   | SortDecode Int64 String
                     -- ^ A value could not be decoded when reading a
                     --   temporary file back in.  Carries the offset and
                     --   the error message reported by the decoder
                     --   (presumably the number of bytes consumed —
                     --   TODO confirm against @Streaming.Binary.decoded@).
  deriving (Show)

-- Uses the default method implementations of 'Exception'.
instance Exception SortException