module Data.Repa.Flow.IO.Bucket
        ( Bucket (..)
        , openBucket
        , hBucket

          -- * Reading
        , fromFiles,            fromFiles'
        , fromDir
        , fromSplitFile
        , fromSplitFileAt

          -- * Writing
        , toFiles,              toFiles'
        , toDir
        , toDirs

          -- * Bucket IO
        , bClose
        , bIsOpen
        , bAtEnd
        , bSeek
        , bGetArray
        , bPutArray)
where
import Data.Repa.Array.Material                 as A
import Data.Repa.Array.Generic                  as A
import Data.Repa.Array.Meta.Dense               as A
import Data.Repa.Array.Meta.RowWise             as A
import Data.Repa.Array.Auto.IO                  as A
import qualified Foreign.Storable               as Foreign
import qualified Foreign.Marshal.Alloc          as Foreign
import Control.Monad
import Data.Word
import System.IO
import System.FilePath
import System.Directory
import Prelude                                  as P


-- | A bucket represents a portion of a whole data-set on disk,
--   and contains a file handle that points to the next piece of 
--   data to be read or written.
--  
--   The bucket could be created from a portion of a single flat file,
--   or be one file of a pre-split data set. The main advantage over a
--   plain `Handle` is that a `Bucket` can represent a small portion
--   of a single large file.
--
--   Positions within a bucket are measured relative to `bucketStartPos`,
--   while the contained handle always works with positions in the
--   underlying file.
--
data Bucket
        = Bucket
        { -- | Physical location of the file, if known.
          --
          --   This is `Nothing` when the bucket was made by wrapping an
          --   existing handle with `hBucket`.
          bucketFilePath        :: Maybe FilePath 

          -- | Starting position of the bucket in the file, in bytes.
        , bucketStartPos        :: Integer

          -- | Maximum length of the bucket, in bytes.
          --
          --   If `Nothing` then the length is indeterminate, which is used
          --   when writing to files.
        , bucketLength          :: Maybe Integer

          -- | File handle for the bucket.
          -- 
          --   If several buckets have been created from a single file,
          --   then all buckets will have handles bound to that file,
          --   but they will be at different positions.
        , bucketHandle          :: Handle }


-- | Open a file as a single bucket.
--
--   The bucket starts at offset 0 of the file, and the handle is left
--   positioned there.
--
--   * For `ReadMode` and `ReadWriteMode` the bucket length is the size
--     of the file at the time it was opened.
--
--   * For `WriteMode` and `AppendMode` the bucket length is `Nothing`
--     (indeterminate), because the file grows as we write to it.
--     Measuring a file opened in `WriteMode` would always yield zero,
--     and a handle opened in `AppendMode` is not seekable in GHC, so
--     trying to measure it would throw.
--
--   This operation may fail with the same exceptions as `openBinaryFile`.
--
openBucket :: FilePath -> IOMode -> IO Bucket
openBucket path mode
 = do   h       <- openBinaryFile path mode
        mLen    <- measure h

        return  $ Bucket
                { bucketFilePath        = Just path
                , bucketStartPos        = 0
                , bucketLength          = mLen
                , bucketHandle          = h }
 where
        -- Length of the existing file contents, for the modes where that
        -- length bounds what can be read from the bucket.
        measure h
         = case mode of
                WriteMode       -> return Nothing
                AppendMode      -> return Nothing
                _               -> do
                        -- Seek to the end to find the length, then go
                        -- back to the start so the bucket is ready to use.
                        hSeek h SeekFromEnd  0
                        lenTotal <- hTell h
                        hSeek h AbsoluteSeek 0
                        return (Just lenTotal)
{-# NOINLINE openBucket #-}


-- | Wrap an existing file handle as a bucket.
--
--   The handle itself is not inspected: the file path is recorded as
--   unknown, the length as indeterminate, and the starting position 
--   of the bucket is set to 0.
hBucket :: Handle -> IO Bucket
hBucket h
 = return bucket
 where  
        bucket
         = Bucket
         { bucketHandle          = h
         , bucketFilePath        = Nothing
         , bucketStartPos        = 0
         , bucketLength          = Nothing }
{-# NOINLINE hBucket #-}


-- From Files -----------------------------------------------------------------
-- | Open the files in the given list for reading, one bucket per file,
--   and pass the resulting array of buckets to the consumer.
fromFiles
        :: [FilePath]                   -- ^ Files to open.
        -> (Array B Bucket -> IO b)     -- ^ Consumer.
        -> IO b
fromFiles files use 
 = let  paths   = A.fromList B files
   in   fromFiles' paths use
{-# INLINE fromFiles #-}


-- | Like `fromFiles`, but take an array of file paths.
--
--   Each file is opened with `openBucket` in `ReadMode`, and the buckets
--   are packed into an array with the same layout as the array of paths.
--   The buckets are not closed by this function.
fromFiles' 
        ::  (Bulk l FilePath, Target l Bucket)
        =>  Array l FilePath                    -- ^ Files to open.
        -> (Array l Bucket -> IO b)             -- ^ Consumer.
        -> IO b

fromFiles' paths use
 = do   
        -- Open all the files, ending up with a list of buckets.
        bs      <- mapM (flip openBucket ReadMode) $ A.toList paths

        -- Pack buckets back into an array with the same layout as
        -- the original. There is one bucket per path, so the packing is
        -- expected to succeed. We still match on both outcomes so that
        -- a violation is reported with a useful message, instead of as
        -- an irrefutable pattern failure somewhere inside the consumer.
        case A.fromListInto (A.layout paths) bs of
         Just bsArr 
          -> use bsArr

         Nothing
          -> error $ P.unlines
                [ "repa-flow.fromFiles': cannot pack buckets into the layout of the paths."
                , "  number of buckets = " ++ show (P.length bs) ]
{-# NOINLINE fromFiles' #-}


-- From Dirs ------------------------------------------------------------------
-- | Open all the files in a directory as separate buckets.
--
--   Only the entries of the directory that are files are opened.
--   Sub-directories (including the special @.@ and @..@ entries) are
--   skipped, as they cannot be opened as buckets. The buckets are in the
--   order that `getDirectoryContents` lists the entries.
--
--   This operation may fail with the same exceptions as
--   `getDirectoryContents`, and with those raised when opening the
--   individual files.
--
fromDir :: FilePath
        -> (Array B Bucket -> IO b)
        -> IO b

fromDir dir use
 = do   names   <- getDirectoryContents dir

        -- Paths to the entries, including the directory name itself.
        let paths
                =  P.map (dir </>) 
                $  P.filter (\f -> f /= "." && f /= "..") names

        -- Trying to open a sub-directory as a file throws an exception,
        -- which would make the whole operation fail, so only keep the
        -- entries that really are files.
        files   <- filterM doesFileExist paths

        fromFiles files use
{-# INLINE fromDir #-}


-- | Open a file containing atomic records and split it into the given number
--   of evenly sized buckets. 
--
--   The records are separated by a special terminating character, which the
--   given predicate detects. The file is split cleanly on record boundaries, 
--   so we get a whole number of records in each bucket. As the records can be
--   of varying size the buckets are not guaranteed to have exactly the same
--   length, in either records or bytes, though we try to give them
--   approximately the same number of bytes.
--
--   This is `fromSplitFileAt` with a starting offset of zero.
--
fromSplitFile
        :: Int                          -- ^ Number of buckets.
        -> (Word8 -> Bool)              -- ^ Detect the end of a record.
        -> FilePath                     -- ^ File to open.
        -> (Array B Bucket -> IO b)     -- ^ Consumer.
        -> IO b

fromSplitFile n pEnd path use
 = fromSplitFileAt n pEnd path offsetStart use
 where  
        -- Split the whole file, starting from its first byte.
        offsetStart = 0
{-# INLINE fromSplitFile #-}


-- | Like `fromSplitFile` but start at the given offset.
--
--   The first bucket starts exactly at the given offset, and the last
--   bucket ends at the end of the file. The starting position of each 
--   of the other buckets is found by seeking an even distance into the
--   part of the file that has not been assigned to a bucket yet, 
--   then reading forward until just after the next end-of-record byte.
--
--   If the requested number of buckets is zero or negative then the file
--   is not opened and the consumer is passed an empty array.
--
--   The starting offset is expected to lie within the file; 
--   this is not checked.
--
--   The handles of the buckets are not closed by this function.
--
fromSplitFileAt
        :: Int                          -- ^ Number of buckets.
        -> (Word8 -> Bool)              -- ^ Detect the end of a record.
        -> FilePath                     -- ^ File to open.
        -> Integer                      -- ^ Starting offset.
        -> (Array B Bucket -> IO b)     -- ^ Consumer.
        -> IO b

fromSplitFileAt n pEnd path offsetStart use
 | n <= 0
 = use (A.fromList B [])

 | otherwise
 = do
        -- Open the file first to check its length.
        h0       <- openBinaryFile path ReadMode
        hSeek h0 SeekFromEnd 0
        lenTotal <- hTell h0
        hClose h0

        -- Open a file handle for each of the buckets.
        -- The handles start at the beginning of the file and still need
        -- to be advanced.
        hh       <- mapM (flip openBinaryFile ReadMode) (replicate n path)

        -- The first bucket starts right at the requested offset.
        -- We have n >= 1 here, so the list of handles is not empty.
        case hh of
         h1_ : _ -> hSeek h1_ AbsoluteSeek offsetStart
         []      -> return ()

        -- Advance all the handles to the start of their part of the file.
        --
        -- The first argument is the number of buckets that still need to
        -- share the rest of the file, and the second is the position
        -- where the first of those buckets starts. The number of bytes 
        -- that remain is always computed from that position, so it also
        -- accounts for a non-zero starting offset.
        let loop_advances _     _    []
             = return []

            loop_advances _     pos1 [_h1]
             = return [pos1]

            loop_advances count pos1 (_h1 : h2 : hs)
             = do 
                  -- Push the next handle an even distance into the
                  -- remaining part of the file.
                  let lenWanted = (lenTotal - pos1) `div` count
                  hSeek h2 AbsoluteSeek (pos1 + lenWanted)

                  -- Now advance it until we get to the end of a record.
                  pos2          <- advance h2 pEnd 

                  poss          <- loop_advances (count - 1) pos2 (h2 : hs)
                  return $ pos1 : poss

        starts    <- loop_advances (fromIntegral n) offsetStart hh

        -- Each bucket ends where the next one starts,
        -- and the last one ends at the end of the file.
        let ends  = P.drop 1 starts ++ [lenTotal]

        let mkBucket start end h
             = Bucket
             { bucketFilePath = Just path
             , bucketStartPos = start
             , bucketLength   = Just (end - start)
             , bucketHandle   = h }

        use  $ A.fromList B (P.zipWith3 mkBucket starts ends hh)
{-# NOINLINE fromSplitFileAt #-}
--  NOINLINE to avoid polluting the core code of the consumer.
--  This prevents it from being specialised for the pEnd predicate, 
--  but we're expecting pEnd to be applied a small number of times,
--  so it shouldn't matter.


-- | Advance a file handle until we have read a byte that matches the
--   given predicate, or have reached the end of the file, then return
--   the final file position.
--
--   The matching byte is consumed as well, so the resulting position is
--   the one just after it.
advance :: Handle -> (Word8 -> Bool) -> IO Integer
advance h pEnd
 = do   
        -- Read via a scoped one byte buffer, so that the buffer is
        -- released even when reading from the handle throws an exception.
        Foreign.allocaBytes 1 loop_advance
        hTell h
 where
        loop_advance buf
         = do   c <- hGetBuf h buf 1
                if c == 0
                 -- Nothing more to read, we're at the end of the file.
                 then return ()
                 else do
                        x <- Foreign.peek buf
                        if pEnd x
                         then return ()
                         else loop_advance buf
{-# NOINLINE advance #-}


-- Writing --------------------------------------------------------------------
-- | Open the files in the given list for writing, one bucket per file,
--   and pass the resulting array of buckets to the given consumer.
toFiles :: [FilePath]                   -- ^ File paths.
        -> (Array B Bucket -> IO b)     -- ^ Worker writes data to buckets.
        -> IO b

toFiles paths use
 = let  arrPaths = A.fromList B paths
   in   toFiles' arrPaths use
{-# INLINE toFiles #-}


-- | Like `toFiles`, but take an array of `FilePath`s.
--
--   Each file is opened with `openBucket` in `WriteMode`, and the buckets
--   are packed into an array with the same layout as the array of paths.
--   The buckets are not closed by this function.
toFiles' :: (Bulk l FilePath, Target l Bucket)
         =>  Array l FilePath           -- ^ File paths.
         -> (Array l Bucket -> IO b)    -- ^ Worker writes data to buckets.
         -> IO b

toFiles' paths use
 = do   -- Open all the files, ending up with a list of buckets.
        bs      <- mapM (flip openBucket WriteMode) $ A.toList paths

        -- Pack buckets back into an array with the same layout as
        -- the original. There is one bucket per path, so the packing is
        -- expected to succeed. We still match on both outcomes so that
        -- a violation is reported with a useful message, instead of as
        -- an irrefutable pattern failure somewhere inside the worker.
        case A.fromListInto (A.layout paths) bs of
         Just bsArr 
          -> use bsArr

         Nothing
          -> error $ P.unlines
                [ "repa-flow.toFiles': cannot pack buckets into the layout of the paths."
                , "  number of buckets = " ++ show (P.length bs) ]
{-# NOINLINE toFiles' #-}
---
--   TODO: Attach finalizers to the sinks so that the file associated with
--   each stream is closed when that stream is ejected.


-- | Create a new directory of the given name, containing the given number
--   of buckets. 
--
--   If the directory is named @somedir@ then the files are named
--   @somedir/000000@, @somedir/000001@, @somedir/000002@ and so on.
--
--   If the number of buckets is zero or negative then no directory is
--   created, and the consumer is passed an empty array.
--
--   This operation may fail with the same exceptions as `createDirectory`
--   and `openBinaryFile`.
--
toDir   :: Int                          -- ^ Number of buckets to create.
        -> FilePath                     -- ^ Path to directory.
        -> (Array B Bucket -> IO b)     -- ^ Consumer.
        -> IO b

toDir nBuckets path use
 | nBuckets > 0
 = do   createDirectory path
        bs      <- mapM newBucket [0 .. nBuckets - 1]
        use (A.fromList B bs)

 | otherwise
 = use (A.fromList B [])

 where  
        -- Name of the file for the bucket with the given index,
        -- which is the index padded with zeros to six digits.
        makeName i
         = let  digits  = show i
                padding = replicate (6 - P.length digits) '0'
           in   path </> (padding ++ digits)

        -- Open the file for the bucket with the given index.
        -- We don't know how much data will be written to it, 
        -- so the length of the bucket is indeterminate.
        newBucket i
         = do   let file = makeName i
                h       <- openBinaryFile file WriteMode
                return  $ Bucket
                        { bucketFilePath        = Just file
                        , bucketStartPos        = 0
                        , bucketLength          = Nothing
                        , bucketHandle          = h }
{-# NOINLINE toDir #-}


-- | Given a list of directories, create those directories and open 
--   the given number of output files per directory.
--
--   In the resulting array of buckets, the outer dimension indexes
--   each directory, and the inner one indexes each file in its
--   directory.
--
--   For each directory @somedir@ the files are named
--   @somedir/000000@, @somedir/000001@, @somedir/000002@ and so on.
--
--   If the number of buckets per directory is zero or negative then
--   no directories are created, and the consumer is passed an empty
--   matrix.
--
--   This operation may fail with the same exceptions as `createDirectory`
--   and `openBinaryFile`.
--
toDirs  :: Int                  -- ^ Number of buckets to create per directory.
        -> [FilePath]           -- ^ Paths to directories.
        -> (Array (E B DIM2) Bucket -> IO b)     
                                -- ^ Consumer.
        -> IO b

toDirs nBucketsPerDir paths use
 | nBucketsPerDir <= 0
 = pack 0 0 []

 | otherwise
 = do   -- Make all the buckets, directory by directory, 
        -- then pack them into a matrix.
        bss     <- P.mapM newDir paths
        pack (P.length paths) nBucketsPerDir (P.concat bss)

 where
        -- Pack the buckets into a matrix of the given shape and hand it
        -- to the consumer. The callers above pass as many buckets as the
        -- matrix has elements, so the packing is expected to succeed.
        -- We still match on both outcomes so that a violation is reported
        -- with a useful message rather than an irrefutable pattern failure.
        pack nRows nCols bs
         = case A.fromListInto (A.matrix B nRows nCols) bs of
                Just bsArr 
                 -> use bsArr

                Nothing
                 -> error $ P.unlines
                        [ "repa-flow.toDirs: cannot pack buckets into a matrix."
                        , "  number of buckets = " ++ show (P.length bs) ]

        -- Name of the file with the given index in a directory,
        -- which is the index padded with zeros to six digits.
        makeName path i 
                = path </> ((replicate (6 - (P.length $ show i)) '0') ++ show i)

        -- Open a file for writing as a bucket of indeterminate length.
        newBucket file
         = do   h       <- openBinaryFile file WriteMode
                return  $ Bucket
                        { bucketFilePath        = Just file
                        , bucketStartPos        = 0
                        , bucketLength          = Nothing
                        , bucketHandle          = h }

        -- Create a directory, along with all the buckets in it.
        newDir path
         = do   createDirectory path
                mapM newBucket 
                        [makeName path i | i <- [0 .. nBucketsPerDir - 1]]
{-# NOINLINE toDirs #-}



-- Bucket IO ------------------------------------------------------------------
-- | Close a bucket, by closing the file handle that it contains.
bClose :: Bucket -> IO ()
bClose bucket
 = hClose h
 where  h = bucketHandle bucket
{-# NOINLINE bClose #-}


-- | Check if the bucket is currently open, 
--   meaning that the file handle it contains is open.
bIsOpen :: Bucket -> IO Bool
bIsOpen bucket
 = hIsOpen h
 where  h = bucketHandle bucket
{-# NOINLINE bIsOpen #-}


-- | Check if the contained file handle is at the end of the bucket.
--
--   This is the case when the handle is at the end of the underlying
--   file, or when the bucket has a known length and the position of the
--   handle relative to the start of the bucket has reached that length.
--
--   Calls `error` if the handle is positioned before the start of
--   the bucket.
bAtEnd :: Bucket -> IO Bool
bAtEnd bucket
 = do   let h           = bucketHandle   bucket
        let posStart    = bucketStartPos bucket

        -- End-of-file flag and position in the underlying file.
        eof             <- hIsEOF h
        posFile         <- hTell  h

        -- A position before the start of the bucket is bogus, and 
        -- means that something else has been moving our handle.
        -- Check this before using the position relative to the start.
        when (posFile < posStart)
         $ error $ P.unlines
         [ "repa-flow.bAtEnd: handle position is outside bucket."
         , "  bucket file path = " ++ show (bucketFilePath bucket)
         , "  bucket start pos = " ++ show posStart
         , "  pos in file      = " ++ show posFile ]

        -- A bucket of indeterminate length only ends with the file.
        let pastEnd
             | Just len <- bucketLength bucket  = posFile - posStart >= len
             | otherwise                        = False

        return (eof || pastEnd)
{-# NOINLINE bAtEnd #-}


-- | Seek to a position within a bucket.
--
--   The wanted position in the underlying file depends on the seek mode:
--
--   * `AbsoluteSeek`: the offset is relative to the start of the bucket,
--     where negative offsets are treated as zero.
--
--   * `RelativeSeek`: the offset is relative to the current position
--     of the handle.
--
--   * `SeekFromEnd`: the offset is the distance to move back from the
--     end of the bucket, where negative offsets are treated as zero.
--     If the bucket has no known length then the handle is moved to the
--     end of the underlying file instead, and the offset is ignored.
--
--   The wanted position is clipped so that it does not lie before the
--   start of the bucket, or after its end when the length is known.
bSeek :: Bucket -> SeekMode -> Integer -> IO ()
bSeek bucket mode offset
 = do   
        -- The current position in the underlying file.
        -- This is queried for all seek modes, though only the
        -- relative mode makes use of it.
        posFile <- hTell h

        case mode of
         AbsoluteSeek
          -> seekClipped (posStart + max 0 offset)

         RelativeSeek
          -> seekClipped (posFile + offset)

         SeekFromEnd
          | Just len <- mLen
          -> seekClipped (posStart + len - max 0 offset)

          | otherwise
          -> hSeek h SeekFromEnd 0

 where
        h               = bucketHandle   bucket
        posStart        = bucketStartPos bucket
        mLen            = bucketLength   bucket

        -- Seek to the given position in the underlying file,
        -- after clipping it so that it's inside the bucket.
        seekClipped wanted
         = hSeek h AbsoluteSeek (clip wanted)

        -- The start of the bucket is checked first, then the end.
        clip wanted
         | wanted < posStart
         = posStart

         | Just len <- mLen
         , wanted > posStart + len
         = posStart + len

         | otherwise
         = wanted
{-# NOINLINE bSeek #-}


-- | Get some data from a bucket.
--
--   At most the given number of bytes are requested from the handle,
--   starting at its current position. If the bucket has a known length
--   then the request is also limited to the number of bytes between that
--   position and the end of the bucket.
--
--   If the handle is already past the end of the bucket, or the wanted
--   length is negative, then zero bytes are requested.
bGetArray :: Bucket -> Integer -> IO (Array F Word8)
bGetArray bucket lenWanted
 = do   
        -- Current position in the file, and in the bucket.
        posFile         <- hTell $ bucketHandle bucket
        let posBucket   =  posFile - bucketStartPos bucket

        -- How much we can read before hitting the end of the bucket.
        let lenAvail    = case bucketLength bucket of
                           Nothing      -> lenWanted
                           Just lenMax  -> min lenWanted (lenMax - posBucket)

        -- The available length is negative when the handle is past the
        -- end of the bucket, or the caller asked for a negative length.
        -- Never pass a negative length on to the underlying read.
        let len         = max 0 lenAvail

        liftM (convert F)
         $ hGetArray (bucketHandle bucket) 
         $ fromIntegral len
{-# NOINLINE bGetArray #-}


-- | Put some data in a bucket, by writing the array to the file handle
--   that it contains. The starting position and length of the bucket
--   are not consulted.
bPutArray :: Bucket -> Array F Word8 -> IO ()
bPutArray bucket arr
 = hPutArray h (convert A arr)
 where  h = bucketHandle bucket
{-# NOINLINE bPutArray #-}