{-# LANGUAGE ExplicitForAll #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE UnicodeSyntax #-}

-- | Command-line test client: connects to a Kafka broker given as
-- @HOST PORT@ and prints what it fetches from the @open_channel@ topic.
module Main where

import qualified Data.ByteString.Char8 as C
import Control.Lens
import Control.Monad (forM)
import Control.Monad.Except (catchError)
import Network.Kafka
import Network.Kafka.Protocol
import System.Environment (getArgs, getProgName)
import System.Exit (exitFailure)
import System.IO (hPutStrLn, stderr)
import Text.Read (readMaybe)
-- import Text.Show.Pretty (ppShow)

-- | Entry point.
--
-- Expects at least two command-line arguments: the broker host name and the
-- broker port. Any further arguments are ignored. When the arguments are
-- missing, or the port is not a number, a usage message is written to stderr
-- and the process exits with a failure status.
--
-- For every partition/leader pair reported by 'brokerPartitionInfo' for the
-- topic, the leader's broker is looked up, an offset is requested with
-- 'EarliestTime', and a fetch is issued from that offset. A Kafka error
-- raised while talking to one partition is captured as a 'Left' for that
-- partition instead of aborting the remaining partitions. The overall result
-- is printed with 'print'.
main ∷ IO ()
main = do
  args ← getArgs
  -- Total argument handling: both a missing argument and an unparsable port
  -- end up in the usage branch rather than in a runtime pattern/parse error.
  (hostName, port) ← case args of
    h : portString : _
      | Just portNumber ← readMaybe portString → return (h, Port portNumber)
    _ → do
      progName ← getProgName
      hPutStrLn stderr ("usage: " ++ progName ++ " HOST PORT")
      exitFailure
  let host = Host (KString (C.pack hostName))
      topic = "open_channel"
      state = mkKafkaState "command-line-test-client" (host, port)
  result ← runKafka state $ do
    -- md ← metadata (MetadataReq [topic])
    -- putStrLnM (ppShow md)
    topicPartitionList ← brokerPartitionInfo topic
    forM topicPartitionList $ \(PartitionAndLeader { _palLeader, _palPartition }) → do
      -- Lens onto the client state's broker entry for this partition's leader.
      let brokerLens = stateBrokers . at _palLeader
      broker ← findMetadataOrElse [topic] brokerLens (KafkaInvalidBroker _palLeader)
      -- Keep going on failure: the error becomes this partition's result.
      flip catchError (return . Left) $ do
        fetched ← withBrokerHandle broker $ \handle → do
          startOffset ← getLastOffset' handle EarliestTime _palPartition topic
          fetchRequest startOffset _palPartition topic >>= fetch' handle
        return (Right fetched)
  print result

-- tmd :: TopicMetadata
-- tmd = TopicMetadata (NoError, "omfg", [PartitionMetadata (NoError, 0, Leader (Just 1), Replicas [], Isr []), PartitionMetadata (NoError, 1, Leader (Just 1), Replicas [], Isr [])])

-- newtype TopicMetadata = TopicMetadata { _topicMetadataFields :: (KafkaError, TopicName, [PartitionMetadata]) } deriving (Show, Eq, Deserializable)
-- newtype PartitionMetadata = PartitionMetadata { _partitionMetadataFields :: (KafkaError, Partition, Leader, Replicas, Isr) } deriving (Show, Eq, Deserializable)