module Data.Repa.Flow.Generic.IO.Lines
        ( sourceLinesFormat
        , sourceLinesFormatFromLazyByteString)
where
import Data.Repa.Flow.Generic.IO.Base           as F
import Data.Repa.Flow.Generic.Map               as F
import Data.Repa.Flow.Generic.Base              as F

import Data.Repa.Array.Generic                  as A
import Data.Repa.Array.Material                 as A
import qualified Data.Repa.Array.Auto.Format    as A

import Data.Repa.Convert.Format                 as C

import qualified Data.ByteString.Lazy           as BSL
import Data.Char
import Data.Word
#include "repa-flow.h"


-- | Read lines from a named text file,
--   in a chunk-wise manner,
--   converting each line to values with the given format.
sourceLinesFormat
        :: forall format
        .  (Unpackable format, Target A (Value format))
        => Integer                      -- ^ Chunk length.
        -> IO ()                        -- ^ Action if we find a line longer than the chunk length.
        -> IO (Array A Word8 -> IO ())  -- ^ Action if we can't convert a row.
        -> format                       -- ^ Format of each line.
        -> Array B Bucket
        -> IO (Sources Int IO (Array A (Value format)))

sourceLinesFormat nChunk aFailLong _aFailConvert format bs
 = do
        -- Rows are separated by new lines.
        let !nl  = fromIntegral $ ord '\n'
        let !nr  = fromIntegral $ ord '\r'

        -- Stream chunks of data from the input file, 
        -- where the chunks end cleanly and line boundaries.
        -- Filter out any stray CR characters allong the way.
        sChunk  <- sourceChunks nChunk (== nl) aFailLong bs

        sRows8  :: Sources Int IO (Array N (Array F Word8))
                <- map_i ( A.trimEnds  (== nl)
                         . A.segmentOn (== nl) 
                         . A.filter F  (/= nr)) 
                         sChunk

        -- Convert each value using the given format.
        let unpackRow :: Array A Word8 -> Value format
            unpackRow arr
             = case A.unpackFormat format arr of
                 Nothing -> error ("no convert " ++ show arr)
                        -- TODO: impl proper pull function
                        -- so we can call aFailConvert if needed.
                        -- We shouldn't be throwing errors this deep in the library.
                 Just v  -> v
            {-# INLINE unpackRow #-}

        F.map_i (A.mapS A (unpackRow . A.convert A)) sRows8
{-# INLINE sourceLinesFormat #-}


-- | Read lines from a lazy byte string,
--   in a chunk-wise manner,
--   converting each line to values with the given format.
sourceLinesFormatFromLazyByteString
        :: (Unpackable format, Target A (Value format))
        => Int                          -- ^ Number of streams in the result bundle.
        -> IO (Array A Word -> IO ())   -- ^ Action if we can't convert a row.
        -> format                       -- ^ Format of each line.
        -> BSL.ByteString               -- ^ Lazy byte string.
        -> Int                          -- ^ Skip this many header lines at the start.
        -> IO (Sources Int IO (Array A (Value format)))

sourceLinesFormatFromLazyByteString n _aFailConvert format bs0 nSkip
 = do
        -- Rows are separated by new lines.
        let !nl  = fromIntegral $ ord '\n'
        let !nr  = fromIntegral $ ord '\r'

        -- Give a copy of the bytestring to each stream.
        refsBS          <- newRefs n bs0
        refsSkip        <- newRefs n nSkip

        let unpackRow arr
             = case A.unpackFormat format arr of
                 Nothing -> error ("no convert " 
                                        ++ (show $ map (chr . fromIntegral) 
                                                 $ A.toList arr))
                        -- TODO: imlp proper pull function
                        -- so that we can call aFailConvert if needed.
                        -- We shouldn't be throwing errors this deep in the library.
                 Just v  -> v
            {-# INLINE unpackRow #-}

        let pull_fromString i eat eject
             = do bs   <- readRefs refsBS   i
                  skip <- readRefs refsSkip i
                  if BSL.null bs
                   then eject
                   else do let (bsLine, bsRest) 
                                = BSL.break (== nl) bs

                           writeRefs refsBS i 
                            $ BSL.dropWhile (== nl) bsRest

                           if (skip >= 0)
                             then do writeRefs refsSkip i (skip - 1)
                                     pull_fromString i eat eject

                             else eat $ A.singleton A.A
                                      $ unpackRow
                                      $ A.convert A.A
                                      $ A.fromByteString 
                                      $ BSL.toStrict 
                                      $ BSL.filter (/= nr) bsLine
            {-# INLINE pull_fromString #-}

        return $ Sources n pull_fromString
{-# INLINE_FLOW sourceLinesFormatFromLazyByteString #-}