module Haskakafka.Example where
import Haskakafka
import qualified Data.ByteString.Char8 as C8
-- | End-to-end usage example: produce a few messages, consume them back,
-- and print broker metadata.
--
-- Every call targets the broker string @"localhost:9092"@ and the topic
-- @"test_topic"@, so a reachable Kafka broker at that address is presumably
-- required for the example to do anything useful.
--
-- Results of the individual produce calls are deliberately discarded to keep
-- the example short; consume and metadata failures are printed to stdout.
example :: IO ()
example = do
  let
    -- Optional configuration overrides, passed through as key/value strings.
    kafkaConfig = [("socket.timeout.ms", "50000")]
    topicConfig = [("request.timeout.ms", "50000")]

    -- Payloads are plain strict ByteStrings.
    samplePayload = C8.pack "Hello world"

  -- Producer section: the callback receives the Kafka handle and the topic
  -- handle for subsequent calls.
  withKafkaProducer kafkaConfig topicConfig "localhost:9092" "test_topic" $
    \kafka topic -> do
      -- Produce a single unkeyed message to partition 0.
      let message = KafkaProduceMessage samplePayload
      _ <- produceMessage topic (KafkaSpecifiedPartition 0) message

      -- Produce a single keyed message. The keyed value must be passed here,
      -- not the unkeyed 'message' defined above.
      let keyMessage = KafkaProduceKeyedMessage (C8.pack "Key") samplePayload
      _ <- produceKeyedMessage topic keyMessage

      -- Produce a batch containing both messages, with no partition specified.
      _ <- produceMessageBatch topic KafkaUnassignedPartition [message, keyMessage]

      putStrLn "Done producing messages, here was our config: "
      dumpConfFromKafka kafka >>= \d -> putStrLn $ "Kafka config: " ++ show d
      dumpConfFromKafkaTopic topic >>= \d -> putStrLn $ "Topic config: " ++ show d

  -- Consumer section: the same partition value is given to the consumer
  -- setup and to every consume call below.
  let partition = 0
  withKafkaConsumer kafkaConfig topicConfig "localhost:9092" "test_topic"
    partition
    KafkaOffsetBeginning $
    \kafka topic -> do
      let consumeTimeoutMs = 1000

      -- Consume a single message.
      me <- consumeMessage topic partition consumeTimeoutMs
      case me of
        Left err -> putStrLn $ "Uh oh, an error! " ++ show err
        Right m -> putStrLn $ "Woo, payload was " ++ C8.unpack (messagePayload m)

      -- Consume up to 'maxMessages' messages in one call.
      let maxMessages = 10
      mes <- consumeMessageBatch topic partition consumeTimeoutMs maxMessages
      case mes of
        Left err -> putStrLn $ "Something went wrong in batch consume! " ++ show err
        Right ms -> putStrLn $ "Woohoo, we got " ++ show (length ms) ++ " messages"

      -- Set the client's log level to 'KafkaLogCrit'.
      setLogLevel kafka KafkaLogCrit

  -- Metadata section: query the broker directly, with no config overrides.
  let metadataTimeoutMs = 1000
  emd <- fetchBrokerMetadata [] "localhost:9092" metadataTimeoutMs
  case emd of
    Left err -> putStrLn $ "Uh oh, error time: " ++ show err
    Right md -> putStrLn $ "Kafka metadata: " ++ show md