{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

-- | This module allows for lazy decoding of hadoop sequence files from a lazy
-- 'L.ByteString'. In the future an incremental API using strict
-- 'Data.ByteString.ByteString' will be provided, but for now if you need that
-- level of control you need to use the attoparsec parsers in
-- "Data.Hadoop.SequenceFile.Parser" directly.
--
-- __Basic Examples__
--
-- > import           Control.Applicative ((<$>))
-- > import qualified Data.ByteString.Lazy as L
-- > import qualified Data.Foldable as F
-- > import           Data.Int (Int32)
-- > import           Data.Text (Text)
-- > import qualified Data.Text.IO as T
-- >
-- > import           Data.Hadoop.SequenceFile
-- >
-- > -- | Print all the keys in a sequence file.
-- > printKeys :: FilePath -> IO ()
-- > printKeys path = do
-- >     bs <- L.readFile path
-- >     let records = decode bs :: Stream (RecordBlock Text Int32)
-- >     F.for_ records $ \rb -> do
-- >         F.mapM_ T.putStrLn (rbKeys rb)
-- >
-- > -- | Count the number of records in a sequence file.
-- > recordCount :: FilePath -> IO ()
-- > recordCount path = do
-- >     bs <- L.readFile path
-- >     let records = decode bs :: Stream (RecordBlock Text Int32)
-- >     print $ F.sum $ rbCount <$> records
--
-- __Integration with Conduit__
--
-- > sourceRecords :: MonadIO m => FilePath -> Source m (RecordBlock Text ByteString)
-- > sourceRecords path = do
-- >     bs <- liftIO (L.readFile path)
-- >     F.traverse_ yield (decode bs)

module Data.Hadoop.SequenceFile
    ( Stream(..)
    , Writable(..)
    , RecordBlock(..)
    , decode
    ) where

import qualified Data.Attoparsec.ByteString.Lazy as A
import qualified Data.ByteString.Lazy as L
import           Data.Foldable (Foldable(..))
import           Data.Monoid ((<>), mempty)

import           Data.Hadoop.SequenceFile.Parser
import           Data.Hadoop.SequenceFile.Types
import           Data.Hadoop.Writable

------------------------------------------------------------------------

-- | A lazy stream of values.
data Stream a = Error !String
              | Value !a (Stream a)
              | Done
    deriving (Eq, Ord, Show)

instance Functor Stream where
    fmap f (Value x s) = Value (f x) (fmap f s)
    fmap _ (Error err) = Error err
    fmap _ Done        = Done

instance Foldable Stream where
    foldMap f (Value x s) = f x <> foldMap f s
    foldMap _ _           = mempty

------------------------------------------------------------------------

-- | Decode a lazy 'L.ByteString' in to a stream of record blocks.
decode :: (Writable ck k, Writable cv v) => L.ByteString -> Stream (RecordBlock k v)
decode bs = case A.parse header bs of
    A.Fail _ ctx err -> mkError ctx err
    A.Done bs' hdr   -> untilEnd (recordBlock hdr) bs'

untilEnd :: A.Parser a -> L.ByteString -> Stream a
untilEnd p bs = case A.parse p bs of
    A.Fail _ ctx err -> mkError ctx err
    A.Done bs' x     -> Value x (untilEnd p bs')

mkError :: [String] -> String -> Stream a
mkError [] err  = Error err
mkError ctx err = Error (err <> "\ncontext:\n" <> concatMap indentLn ctx)
  where
    indentLn str    = "    " <> str <> "\n"