module Data.Repa.Flow.Generic.IO.Base
        ( -- * Buckets
          module Data.Repa.Flow.IO.Bucket

          -- * Sourcing
        , sourceBytes
        , sourceChars
        , sourceChunks
        , sourceRecords

          -- * Sinking
        , sinkBytes
        , sinkChars
        , sinkLines

          -- * Sieving
        , sieve_o)
where
import Data.Repa.Flow.IO.Bucket
import Data.Repa.Flow.Generic.IO.Sieve          as F
import Data.Repa.Flow.Generic.Map               as F
import Data.Repa.Flow.Generic.Base              as F
import Data.Repa.Array.Material                 as A
import Data.Repa.Array.Meta.Delayed             as A
import Data.Repa.Array.Meta.Window              as A
import Data.Repa.Array.Generic                  as A
import Data.Repa.Array.Generic.Index            as A
import Data.Char
import System.IO
import Data.Word
import Prelude                                  as P
#include "repa-flow.h"


-- Sourcing ---------------------------------------------------------------------------------------
-- | Read complete records of data form a bucket, into chunks of the given length.
--   We read as many complete records as will fit into each chunk.
--
--   The records are separated by a special terminating character, which the 
--   given predicate detects. After reading a chunk of data we seek the bucket to 
--   just after the last complete record that was read, so we can continue to
--   read more complete records next time. 
--
--   If we cannot fit at least one complete record in the chunk then perform
--   the given failure action. Limiting the chunk length guards against the
--   case where a large input file is malformed, as we won't try to read the
--   whole file into memory.
-- 
--   * Data is read into foreign memory without copying it through the GHC heap.
--   * The provided file handle must support seeking, else you'll get an exception.
--
sourceRecords 
        :: BulkI l Bucket
        => Integer              -- ^ Chunk length in bytes.
        -> (Word8 -> Bool)      -- ^ Detect the end of a record.        
        -> IO ()                -- ^ Action to perform if we can't get a whole record.
        -> Array l Bucket       -- ^ Source buckets.
        -> IO (Sources Int IO (Array N (Array F Word8)))

sourceRecords len pSep aFail hs
 =   smap_i (\_ !c -> A.segmentOn pSep c)
 =<< sourceChunks len pSep aFail hs
{-# INLINE sourceRecords #-}


-- | Like `sourceRecords`, but produce all records in a single vector.
sourceChunks
        :: BulkI l Bucket
        => Integer              -- ^ Chunk length in bytes.
        -> (Word8 -> Bool)      -- ^ Detect the end of a record.        
        -> IO ()                -- ^ Action to perform if we can't get a whole record.
        -> Array l Bucket       -- ^ Source buckets.
        -> IO (Sources (Index l) IO (Array F Word8))

sourceChunks len pSep aFail bs
 = return $ Sources (A.extent $ A.layout bs) pull_sourceChunks
 where  
        pull_sourceChunks i eat eject
         = let b = bs `index` i
           in  bAtEnd b >>= \eof ->
            if eof
                -- We're at the end of the file.
                then eject

            else do
                -- Read a new chunk from the file.
                arr      <- bGetArray b len

                -- Find the end of the last record in the file.
                let !mLenSlack  = findIndex pSep (A.reverse arr)

                case mLenSlack of
                 -- If we couldn't find the end of record then apply the failure action.
                 Nothing        -> aFail

                 -- Work out how long the record is.
                 Just lenSlack
                  -> do let !lenArr     = A.length arr
                        let !ixSplit    = lenArr - lenSlack

                        -- Seek the file to just after the last complete record.
                        bSeek b RelativeSeek (fromIntegral $ negate lenSlack)

                        -- Eat complete records at the start of the chunk.
                        eat $ window 0 ixSplit arr
        {-# INLINE pull_sourceChunks #-}
{-# INLINE_FLOW sourceChunks #-}


-- | Read 8-byte ASCII characters from some files, using the given chunk length.
--
--   * Data is read into foreign memory without copying it through the GHC heap.
--   * All chunks have the same size, except possibly the last one.
----
sourceChars 
        :: Bulk l Bucket
        => Integer              -- ^ Chunk length in bytes.
        -> Array l Bucket       -- ^ Buckets.
        -> IO (Sources (Index l) IO (Array F Char))
sourceChars len bs
 =   smap_i (\_ !c -> A.computeS F $ A.map (chr . fromIntegral) c)
 =<< sourceBytes len bs
{-# INLINE sourceChars #-}


-- | Read data from some files, using the given chunk length.
--
--   * Data is read into foreign memory without copying it through the GHC heap.
--   * All chunks have the same size, except possibly the last one.
--
sourceBytes 
        :: Bulk  l Bucket
        => Integer              -- ^ Chunk length in bytes.
        -> Array l Bucket       -- ^ Buckets.
        -> IO (Sources (Index l) IO (Array F Word8))
sourceBytes len bs
 = return $ Sources (A.extent $ A.layout bs) pull_sourceBytes
 where
        pull_sourceBytes i eat eject
         = do let b  = A.index bs i
              op  <- bIsOpen b
              if not op 
                then eject
                else do
                  eof <- bAtEnd b
                  if eof
                   then eject
                   else do
                        !chunk  <- bGetArray b len
                        eat chunk
        {-# INLINE pull_sourceBytes #-}
{-# INLINE_FLOW sourceBytes #-}


-- Sinking ----------------------------------------------------------------------------------------
-- | Write vectors of text lines to the given files handles.
-- 
--   * Data is copied into a new buffer to insert newlines before being
--     written out.
--
sinkLines 
        :: ( Bulk  l Bucket
           , BulkI l1 (Array l2 Char)
           , BulkI l2 Char)
        => Name  l1             -- ^ Layout of chunks of lines.
        -> Name  l2             -- ^ Layout of lines.
        -> Array l Bucket       -- ^ Buckets.
        -> IO (Sinks (Index l) IO (Array l1 (Array l2 Char)))
sinkLines _ _ !bs
 =   smap_o (\_ !c -> computeS F $ A.map (fromIntegral . ord) $ concatWith F fl c)
 =<< sinkBytes bs
 where  !fl     = A.fromList F ['\n']
{-# INLINE sinkLines #-}


-- | Write chunks of 8-byte ASCII characters to the given file handles.
-- 
--   * Data is copied into a foreign buffer to truncate the characters
--     to 8-bits each before being written out.
--
sinkChars 
        :: (Bulk  l Bucket, BulkI r Char)
        =>  Array l Bucket      -- ^ Buckets.
        -> IO (Sinks (Index l) IO (Array r Char))
sinkChars !bs
 =   smap_o (\_ !c -> computeS F $ A.map (fromIntegral . ord) c)
 =<< sinkBytes bs
{-# INLINE sinkChars #-}


-- | Write chunks of bytes to the given file handles.
--
--   * Data is written out directly from the provided buffer.
--
sinkBytes 
        :: Bulk  l Bucket
        => Array l Bucket       -- ^ Buckets.
        -> IO (Sinks (Index l) IO (Array F Word8))
sinkBytes !bs
 = do   let push_sinkBytes i !chunk
             = bPutArray (bs `index` i) chunk
            {-# NOINLINE push_sinkBytes #-}

        let eject_sinkBytes i
             = bClose    (bs `index` i)
            {-# INLINE eject_sinkBytes #-}

        return  $ Sinks (A.extent $ A.layout bs) push_sinkBytes eject_sinkBytes
{-# INLINE_FLOW sinkBytes #-}