{-# LANGUAGE NamedFieldPuns #-}
-- |
-- Module      :  Kafka
-- Copyright   :  Abhinav Gupta 2015
-- License     :  MIT
--
-- Maintainer  :  mail@abhinavg.net
-- Stability   :  experimental
-- Portability :  GHC
--
-- A library to interact with Apache Kafka 0.7.
--
module Kafka
    (
    -- * Main interface
    --
    -- | Requests to Kafka can be made using 'produce', 'fetch', and
    -- 'offsets'.  For 'produce' and 'fetch', the functions automatically
    -- decide whether the request needs to be a single @Produce@/@Fetch@
    -- request or a @Multi*@ request.
    --
    -- The request operations send requests and receive responses using any
    -- type that is an instance of 'Transport'. 'withConnection' produces one
    -- such object.
    --
      withConnection
    , produce
    , fetch
    , offsets

    -- * Types

    , Produce(..)

    , Fetch(..)
    , FetchResponse(..)

    , Offsets(..)
    , OffsetsTime(..)

    , Topic(..)
    , Offset(..)
    , Partition(..)
    , Size(..)
    , Count(..)

    -- * Other

    , Error(..)
    , Response

    -- * Transport

    , Socket
    , Transport(..)
    ) where

import Control.Applicative
import Data.Monoid

import qualified Data.Serialize as C

import Kafka.Internal

-- | Receives the next response from the connection using the given
-- deserializer.
--
-- Returns either an 'Error' for Kafka errors or the deseriazlied result.
recvResponse
    :: Transport t
    => t        -- ^ Transport from which the response will be read.
    -> C.Get a  -- ^ Deserializer
    -> IO (Response a)
recvResponse transport getBody = do
    -- TODO this probably belongs in an internal module somewhere
    -- TODO use protocol error or something for parse errors instead of this
    -- TODO maybe also catch IO exceptions
    -- TODO alternatively, throw protocol and IO exceptions instead of failing
    -- like this.
    respLen <- C.runGet getLength <$> recv transport 4 >>= either fail return
    response <- recvExactly transport respLen
    either fail return $ C.runGet getResponse response
  where
    getLength = fromIntegral <$> C.getWord32be
    getResponse = do
        err <- C.get
        body <- getBody
        return $ maybe (Right body) Left err

-- | Sends the given 'Produce' requests to Kafka.
--
-- If multiple requests are supplied, a @MultiProduce@ request is made.
--
-- @
-- 'withConnection' \"localhost\" 9092 $ \\conn ->
--   produce conn [
--      'Produce' ('Topic' \"my-topic\") ('Partition' 0) [\"foo\"]
--    , Produce \"another-topic\" 0
--                [\"multiple\", \"messages\"]
--    ]
-- @
--
-- Note that string literals may be used in place of 'Topic' (with the
-- @OverloadedStrings@ GHC extension), and integer literals may be used in
-- place of 'Partition'.
produce :: Transport t => t -> [Produce] -> IO ()
produce transport reqs = case reqs of
  []  -> return ()
  [x] -> send transport (C.runPut $ putProduceRequest x)
  xs  -> send transport (C.runPut $ putMultiProduceRequest xs)

-- | 'Fetch'es messages from Kafka.
--
-- If multiple Fetch requests are supplied, a @MultiFetch@ request is made.
--
-- @
-- 'withConnection' \"localhost\" 9092 $ \\conn -> do
--   Right ['FetchResponse' messages newOffset] <- fetch conn [
--       'Fetch' ('Topic' \"test-topic\") ('Partition' 0) (Offset 42) 1024
--     ]
--   {- Consume the messages here -}
--   response <- fetch conn ['Fetch' \"test-topic\" 0 newOffset 1024]
--   {- ... -}
-- @
--
-- Returns a list of 'FetchResponse's in the same order as the 'Fetch'
-- requests. Each response contains the messages returned for the
-- corresponding request and the new offset at which the next request should
-- be made for that request to get the messages that follow.
--
-- If a response for a request contains no messages, the specified
-- topic-partition pair has been exhausted.
--
-- Note that string literals may be used in place of 'Topic' (with the
-- @OverloadedStrings@ GHC extension), and integer literals may be used in
-- place of 'Offset'.
--
fetch :: Transport t => t -> [Fetch] -> IO (Response [FetchResponse])
fetch transport reqs = case reqs of
  [] -> return (Right mempty)
  [x] -> do
    send transport (C.runPut $ putFetchRequest x)
    fmap (:[]) <$> recvResponse transport (getFetchResponse x)
  xs -> do
    send transport (C.runPut $ putMultiFetchRequest xs)
    recvResponse transport (getMultiFetchResponse xs)

-- | Retrieve message offsets from Kafka.
--
-- @
-- 'withConnection' \"localhost\" 9092 $ \\conn -> do
--   Right [os] <- offsets conn (Offsets \"topic\" 0 OffsetsEarliest 1)
--   fetch conn ['Fetch' \"topic\" (Partition 0) os 10] >>= doSomething
-- @
--
-- Note that string literals may be used in place of 'Topic' (with the
-- @OverloadedStrings@ GHC extension), and integer literals may be used in
-- place of 'Count'.
--
offsets :: Transport t => t -> Offsets -> IO (Response [Offset])
offsets transport req = do
    send transport (C.runPut $ putOffsetsRequest req)
    recvResponse transport getOffsetsResponse