License | BSD3 |
---|---|
Stability | stable |
Safe Haskell | None |
Language | Haskell2010 |
It is possible to subscribe Lambda functions to Kafka topics. You can subscribe to topics from Amazon Managed Streaming for Apache Kafka (MSK) as well as from self-managed Kafka clusters.
Lambda considers Amazon MSK to be a different event source type from a self-managed Apache Kafka cluster, but their payloads are very similar. The types in this module are derived from inspecting invocation payloads and from reading the following links:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
- https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/
- https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/
Synopsis
- data KafkaEvent = KafkaEvent {
- eventSource :: !EventSource
- eventSourceArn :: !(Maybe Text)
- bootstrapServers :: !(NonEmpty Text)
- records :: !(Map Text [Record])
- data EventSource
- type Record = Record' ByteString
- data Record' a = Record {}
- data Header = Header !Text !ByteString
- data Timestamp
- parseTimestamp :: Object -> Parser Timestamp
- unparseTimestamp :: KeyValue kv => Timestamp -> [kv]
- int64ToUTCTime :: Int64 -> UTCTime
- utcTimeToInt64 :: UTCTime -> Int64
Documentation
data KafkaEvent Source #
Represents an event from either Amazon MSK or a self-managed Apache Kafka cluster, as the payloads are very similar.
The ToJSON and FromJSON instances on Record perform base64 conversion for you.
See the AWS documentation for a sample payload.
KafkaEvent

- eventSource :: !EventSource
- eventSourceArn :: !(Maybe Text)
- bootstrapServers :: !(NonEmpty Text)
- records :: !(Map Text [Record])
Instances
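A minimal sketch of working with an event, using only the fields listed in the synopsis (recordCounts is an illustrative helper, not part of the module):

```haskell
import qualified Data.Map as Map
import Data.Map (Map)
import Data.Text (Text)
import AWS.Lambda.Events.Kafka (KafkaEvent (..), Record)

-- Count how many records arrived under each key of the
-- event's records map.
recordCounts :: KafkaEvent -> Map Text Int
recordCounts = Map.map length . records
```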
data EventSource Source #
AwsKafka | "aws:kafka" |
SelfManagedKafka | "SelfManagedKafka" |
Instances
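A sketch of rendering an EventSource back to the string that Lambda puts in the event's eventSource field, matching the constructor table above (sourceName is an illustrative helper, not part of the module):

```haskell
import Data.Text (Text)
import AWS.Lambda.Events.Kafka (EventSource (..))

-- Map each constructor to its wire string.
sourceName :: EventSource -> Text
sourceName AwsKafka         = "aws:kafka"
sourceName SelfManagedKafka = "SelfManagedKafka"
```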
type Record = Record' ByteString Source #
Convenience alias: most of the time you will parse the records straight into some app-specific structure.
data Record' a Source #

Records from a Kafka event. This is Traversable, which means you can do things like parse a JSON-encoded payload:

traverse decodeStrict :: FromJSON a => Record -> Maybe (Record' a)
Instances
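Expanding on the traverse example above, payloads are usually parsed into an application-specific type; OrderPlaced here is a hypothetical payload type, not part of the module:

```haskell
{-# LANGUAGE DeriveGeneric #-}

import Data.Aeson (FromJSON, decodeStrict)
import GHC.Generics (Generic)
import AWS.Lambda.Events.Kafka (Record, Record')

-- A hypothetical application payload type.
data OrderPlaced = OrderPlaced
  { orderId :: Int
  , amount  :: Double
  } deriving (Generic, Show)

instance FromJSON OrderPlaced

-- Traverse the record's ByteString payload through the JSON
-- decoder; yields Nothing if the payload fails to parse.
parseOrder :: Record -> Maybe (Record' OrderPlaced)
parseOrder = traverse decodeStrict
```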
data Header Source #

AWS serialises record headers to JSON as an array of objects. From their docs:

"headers":[{"headerKey":[104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]

Note:

>>> map chr [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
"headerValue"
Instances
Eq Header Source # | |
Show Header Source # | |
Generic Header Source # | |
ToJSON Header Source # | |
Defined in AWS.Lambda.Events.Kafka | |
FromJSON Header Source # | |
type Rep Header Source # | |
Defined in AWS.Lambda.Events.Kafka type Rep Header = D1 ('MetaData "Header" "AWS.Lambda.Events.Kafka" "hal-0.4.9-8b71OWlxF37ByGgcLKsX7v" 'False) (C1 ('MetaCons "Header" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Text) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 ByteString))) |
data Timestamp Source #

Kafka timestamp types, derived from the Java client's enum at: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
Instances
Eq Timestamp Source # | |
Show Timestamp Source # | |
Generic Timestamp Source # | |
type Rep Timestamp Source # | |
Defined in AWS.Lambda.Events.Kafka type Rep Timestamp = D1 ('MetaData "Timestamp" "AWS.Lambda.Events.Kafka" "hal-0.4.9-8b71OWlxF37ByGgcLKsX7v" 'False) (C1 ('MetaCons "NoTimestampType" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "CreateTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 UTCTime)) :+: C1 ('MetaCons "LogAppendTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 UTCTime)))) |
Internal
parseTimestamp :: Object -> Parser Timestamp Source #

unparseTimestamp :: KeyValue kv => Timestamp -> [kv] Source #
int64ToUTCTime :: Int64 -> UTCTime Source #
utcTimeToInt64 :: UTCTime -> Int64 Source #
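Assuming these helpers convert between milliseconds since the Unix epoch (Kafka's timestamp convention) and UTCTime, a round-trip sketch (roundTrips is an illustrative helper, not part of the module):

```haskell
import Data.Int (Int64)
import AWS.Lambda.Events.Kafka (int64ToUTCTime, utcTimeToInt64)

-- Converting a millisecond timestamp to UTCTime and back
-- should preserve the value.
roundTrips :: Int64 -> Bool
roundTrips ms = utcTimeToInt64 (int64ToUTCTime ms) == ms
```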