{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RecordWildCards #-}
-- | Consumer and producer loops that exchange 'Event's over a Kafka topic.
--
-- Each loop constructor returns a pair: an 'ExitAction' that requests the
-- loop to stop, and a 'LoopAction' that actually runs the loop.
module Network.UI.Kafka (
-- * Types
TopicConnection(..)
, Sensor
, LoopAction
, ExitAction
-- * Consumption
, ConsumerCallback
, consumerLoop
, rawConsumerLoop
-- * Production
, ProducerCallback
, producerLoop
, rawProducerLoop
) where
import Control.Arrow ((***))
import Control.Concurrent (MVar, newEmptyMVar, isEmptyMVar, threadDelay, tryPutMVar)
import Control.Monad (void, when)
import Control.Monad.Except (liftIO)
import Data.Aeson.Types (FromJSON, ToJSON)
import Data.Binary (decode, decodeOrFail, encode)
import Data.ByteString.Char8 (pack, unpack)
import Data.ByteString.Lazy (fromStrict, toStrict)
import Data.String (IsString(fromString))
import GHC.Generics (Generic)
import Network.UI.Kafka.Types (Event)
import Network.Kafka (KafkaClientError, KafkaTime(..), TopicAndMessage(..), getLastOffset, mkKafkaState, runKafka, withAddressHandle)
import Network.Kafka.Consumer (fetch', fetchMessages, fetchRequest)
import Network.Kafka.Producer (makeKeyedMessage, produceMessages)
import Network.Kafka.Protocol (FetchResponse(..), KafkaBytes(..), Key(..), Message(..), Value(..))
-- | Everything needed to reach one Kafka topic.
data TopicConnection = TopicConnection
  { client  :: String
    -- ^ Client name handed to 'mkKafkaState' when a loop is run.
  , address :: (String, Int)
    -- ^ Host name and port of the Kafka broker.
  , topic   :: String
    -- ^ Name of the topic to consume from or produce to.
  }
  deriving (Eq, Generic, Read, Show)

-- | JSON decoding using the default 'Generic'-based implementation.
instance FromJSON TopicConnection

-- | JSON encoding using the default 'Generic'-based implementation.
instance ToJSON TopicConnection
-- | Name of a sensor.  It is used as the key of the Kafka messages carrying
-- that sensor's events.
type Sensor = String
-- | Action that runs a consumer or producer loop to completion, yielding
-- 'Left' with the 'KafkaClientError' if the Kafka client reports one.
type LoopAction = IO (Either KafkaClientError ())
-- | Action that asks the associated loop to stop; the loop checks for the
-- request at the end of each iteration.
type ExitAction = IO ()
-- | Handler invoked for every consumed event, together with the name of the
-- sensor that the message was keyed with.
type ConsumerCallback = Sensor -> Event -> IO ()
-- | Consume 'Event's from a Kafka topic, passing each one to a callback.
--
-- The message key is interpreted as the 'Sensor' name and the message value
-- as a "Data.Binary"-encoded 'Event'.  Messages that lack a key or a value,
-- or whose value cannot be decoded, are skipped rather than aborting the
-- whole loop with a pattern-match or decoding exception.
--
-- Returns the action that requests termination and the action that runs the
-- loop (see 'rawConsumerLoop').
consumerLoop :: TopicConnection  -- ^ The Kafka topic to read from.
             -> ConsumerCallback -- ^ Handler for each decoded event.
             -> IO (ExitAction, LoopAction)
consumerLoop topicConnection callback =
  rawConsumerLoop topicConnection fromMessage deliver
    where
      -- Total decoder: 'Nothing' marks a message that cannot be interpreted.
      fromMessage :: Message -> Maybe (Sensor, Event)
      fromMessage message =
        case _messageFields message of
          (_, _, _, Key (Just (KBytes k)), Value (Just (KBytes v))) ->
            -- 'decodeOrFail' reports malformed input instead of throwing.
            case decodeOrFail (fromStrict v) of
              Right (_, _, event) -> Just (unpack k, event)
              Left _              -> Nothing
          _ -> Nothing
      -- Undecodable messages are dropped silently.
      deliver :: Maybe (Sensor, Event) -> IO ()
      deliver = maybe (return ()) (uncurry callback)
-- | Consume raw messages from partition 0 of a Kafka topic.
--
-- The returned 'LoopAction' starts at the topic's latest offset, repeatedly
-- fetches messages, converts each with the supplied function, and hands the
-- results to the consumer.  After each batch it pauses briefly and then
-- continues unless the returned 'ExitAction' has been invoked.
--
-- The offset for the next fetch is taken from the first partition entry of
-- the fetch response.  If the response carries no such entry, the current
-- offset is retried instead of failing on an incomplete pattern match.
rawConsumerLoop :: TopicConnection -- ^ The Kafka topic to read from.
                -> (Message -> a)  -- ^ Conversion applied to each message.
                -> (a -> IO ())    -- ^ Handler for each converted message.
                -> IO (ExitAction, LoopAction)
rawConsumerLoop TopicConnection{..} fromMessage consumer =
  do
    -- The flag is empty while the loop should keep running.
    exitFlag <- newEmptyMVar :: IO (MVar ())
    let
      topic' = fromString topic
      address' = fromString *** fromIntegral $ address
      loop offset =
        do
          result <- withAddressHandle address' $ \handle -> fetch' handle =<< fetchRequest offset 0 topic'
          let
            -- Total match: fall back to the current offset when the
            -- response does not have the expected shape.
            offset' =
              case _fetchResponseFields result of
                (_, (_, _, next, _) : _) : _ -> next
                _                            -> offset
            messages = fromMessage . _tamMessage <$> fetchMessages result
          liftIO
            $ do
              mapM_ consumer messages
              threadDelay 100 -- microseconds
          running <- liftIO $ isEmptyMVar exitFlag
          when running
            $ loop offset'
    return
      (
        -- 'tryPutMVar' makes repeated exit requests harmless.
        void $ tryPutMVar exitFlag ()
      , fmap void $ runKafka (mkKafkaState (fromString client) address')
          $ do
            offset <- getLastOffset LatestTime 0 topic'
            loop offset
      )
-- | Action that supplies the next batch of events to send.  Returning an
-- empty list ends the producer loop.
type ProducerCallback = IO [Event]
-- | Produce 'Event's for one sensor to a Kafka topic.
--
-- Every event is serialised with "Data.Binary" and sent as a message keyed
-- by the sensor name.  Returns the action that requests termination and the
-- action that runs the loop (see 'rawProducerLoop').
producerLoop :: TopicConnection  -- ^ The Kafka topic to write to.
             -> Sensor           -- ^ Sensor name used as the message key.
             -> ProducerCallback -- ^ Source of the events to send.
             -> IO (ExitAction, LoopAction)
producerLoop topicConnection sensor =
  rawProducerLoop topicConnection toMessage
    where
      key = pack sensor
      toMessage :: Event -> Message
      toMessage event = makeKeyedMessage key (toStrict (encode event))
-- | Produce raw messages to a Kafka topic.
--
-- The returned 'LoopAction' repeatedly runs the producer, converts every
-- item it yields into a message and sends the batch.  It continues until
-- the producer yields an empty list or the returned 'ExitAction' has been
-- invoked; both conditions are checked after each batch has been sent.
rawProducerLoop :: TopicConnection -- ^ The Kafka topic to write to.
                -> (a -> Message)  -- ^ Conversion of each item to a message.
                -> IO [a]          -- ^ Source of the items to send.
                -> IO (ExitAction, LoopAction)
rawProducerLoop TopicConnection{..} toMessage producer =
  do
    -- The flag is empty while the loop should keep running.
    exitFlag <- newEmptyMVar :: IO (MVar ())
    let
      kafkaState = mkKafkaState (fromString client) (fromString *** fromIntegral $ address)
      wrap = TopicAndMessage (fromString topic) . toMessage
      loop =
        do
          events <- liftIO producer
          void . produceMessages $ wrap <$> events
          running <- liftIO $ isEmptyMVar exitFlag
          when (running && not (null events))
            loop
    return
      (
        void $ tryPutMVar exitFlag ()
      , fmap void $ runKafka kafkaState loop
      )