| License | BSD3 |
|---|---|
| Stability | stable |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
AWS.Lambda.Events.Kafka
Description
Lambda functions can subscribe to Kafka topics, both on Amazon Managed Streaming for Kafka (MSK) and on self-managed Kafka clusters.
Lambda considers Amazon Managed Streaming for Kafka (MSK) to be a different event source type from a self-managed Apache Kafka cluster, but their payloads are very similar. The types in this module are derived from inspecting invocation payloads, and from reading the following links:
- https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
- https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
- https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/
- https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/
Synopsis
- data KafkaEvent = KafkaEvent {
    - eventSource :: !EventSource
    - eventSourceArn :: !(Maybe Text)
    - bootstrapServers :: !(NonEmpty Text)
    - records :: !(Map Text [Record])
  }
- data EventSource
- type Record = Record' ByteString
- data Record' a = Record {}
- data Header = Header !Text !ByteString
- data Timestamp
- parseTimestamp :: Object -> Parser Timestamp
- unparseTimestamp :: KeyValue kv => Timestamp -> [kv]
- int64ToUTCTime :: Int64 -> UTCTime
- utcTimeToInt64 :: UTCTime -> Int64
Documentation
data KafkaEvent Source #
Represents an event from either Amazon MSK or a self-managed Apache Kafka cluster, as the payloads are very similar.
The ToJSON and FromJSON instances on Record perform base64 conversion for you.
See the AWS documentation for a sample payload.
Constructors
| KafkaEvent | |
Fields
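| eventSource :: !EventSource | |
| eventSourceArn :: !(Maybe Text) | |
| bootstrapServers :: !(NonEmpty Text) | |
| records :: !(Map Text [Record]) | |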
Instances
data EventSource Source #
Constructors
| AwsKafka | "aws:kafka" |
| SelfManagedKafka | "SelfManagedKafka" |
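The constructor table above pairs each constructor with the string found in the payload's event source field. A minimal sketch of that mapping follows, using only base. The type `Source` and the functions `parseSource` and `renderSource` are hypothetical stand-ins for illustration, not this module's API; the module's own FromJSON and ToJSON instances should be preferred in real code.

```haskell
-- Hypothetical stand-in for EventSource, reusing the constructor names
-- from the table above.
data Source = AwsKafka | SelfManagedKafka
  deriving (Show, Eq)

-- Map the payload string to a constructor; unknown strings are rejected.
parseSource :: String -> Maybe Source
parseSource "aws:kafka"        = Just AwsKafka
parseSource "SelfManagedKafka" = Just SelfManagedKafka
parseSource _                  = Nothing

-- Inverse of parseSource for the two known constructors.
renderSource :: Source -> String
renderSource AwsKafka         = "aws:kafka"
renderSource SelfManagedKafka = "SelfManagedKafka"
```

Returning Maybe keeps an unrecognised event source an explicit failure instead of silently defaulting to one of the two constructors.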
Instances
type Record = Record' ByteString Source #
Convenience alias: most of the time you will parse the records straight into some app-specific structure.
data Record' a Source #
Records from a Kafka event. This is Traversable, which means you can do things like parse a JSON-encoded payload:
traverse decodeStrict :: FromJSON a => Record -> Maybe (Record' a)
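To illustrate why a Traversable record type is convenient, here is a small self-contained sketch that uses only base. The type `Rec` and its fields `recTopic` and `recValue` are hypothetical simplifications (the real Record' fields are not shown here), and `readMaybe` stands in for a JSON decoder such as `decodeStrict`.

```haskell
{-# LANGUAGE DeriveTraversable #-}
import Text.Read (readMaybe)

-- Hypothetical, simplified stand-in for Record': metadata plus a payload
-- whose type is the parameter.
data Rec a = Rec
  { recTopic :: String  -- hypothetical metadata field
  , recValue :: a       -- the payload
  } deriving (Show, Eq, Functor, Foldable, Traversable)

-- Parse the payload in place. The metadata is carried over unchanged and
-- the whole result is Nothing if the payload fails to parse.
parseRec :: Rec String -> Maybe (Rec Int)
parseRec = traverse readMaybe
```

In GHCi, `parseRec (Rec "orders" "42")` evaluates to `Just (Rec {recTopic = "orders", recValue = 42})`, while a payload such as `"oops"` gives `Nothing`.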
Constructors
| Record | |
Instances
data Header Source #
AWS serialises record headers to JSON as an array of objects, each mapping a header key to the bytes of its value. From their docs:
"headers":[{"headerKey":[104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
Note:
>>> map chr [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
"headerValue"
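The note above can be wrapped in a small helper. This is only a sketch: `bytesToString` is a hypothetical name, and it assumes every element is an ASCII byte value, which holds for the sample but not for arbitrary header values.

```haskell
import Data.Char (chr)

-- Hypothetical helper: interpret a JSON array of byte values as text.
-- Assumes each number is an ASCII code point; binary or multi-byte
-- encoded header values need a proper decoder instead.
bytesToString :: [Int] -> String
bytesToString = map chr

-- The header value from the sample payload above.
sampleHeaderValue :: String
sampleHeaderValue =
  bytesToString [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
```

Here `sampleHeaderValue` evaluates to `"headerValue"`, matching the GHCi session in the note.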
Constructors
| Header !Text !ByteString |
Instances
| FromJSON Header Source # | |
| ToJSON Header Source # | |
Defined in AWS.Lambda.Events.Kafka | |
| Generic Header Source # | |
| Show Header Source # | |
| Eq Header Source # | |
| type Rep Header Source # | |
Defined in AWS.Lambda.Events.Kafka type Rep Header = D1 ('MetaData "Header" "AWS.Lambda.Events.Kafka" "hal-1.1-IwuGgwGQvpsJOoRlidLUAk" 'False) (C1 ('MetaCons "Header" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Text) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 ByteString))) | |
data Timestamp Source #
Kafka timestamp types, derived from the Java client's enum at: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
Constructors
| NoTimestampType | |
| CreateTime !UTCTime | |
| LogAppendTime !UTCTime | |
Instances
| Generic Timestamp Source # | |
| Show Timestamp Source # | |
| Eq Timestamp Source # | |
| type Rep Timestamp Source # | |
Defined in AWS.Lambda.Events.Kafka type Rep Timestamp = D1 ('MetaData "Timestamp" "AWS.Lambda.Events.Kafka" "hal-1.1-IwuGgwGQvpsJOoRlidLUAk" 'False) (C1 ('MetaCons "NoTimestampType" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "CreateTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 UTCTime)) :+: C1 ('MetaCons "LogAppendTime" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 UTCTime)))) | |
Internal
parseTimestamp :: Object -> Parser Timestamp Source #
unparseTimestamp :: KeyValue kv => Timestamp -> [kv] Source #
int64ToUTCTime :: Int64 -> UTCTime Source #
utcTimeToInt64 :: UTCTime -> Int64 Source #
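The two conversion helpers above carry no description. As a sketch of what such a pair could look like, assuming the Int64 is a count of milliseconds since the Unix epoch (the unit Kafka uses for record timestamps), the following uses only the time package. The names `millisToUTCTime` and `utcTimeToMillis` are hypothetical and this is not necessarily how the module implements its own functions.

```haskell
import Data.Int (Int64)
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.POSIX (posixSecondsToUTCTime, utcTimeToPOSIXSeconds)

-- Assumption: the input is milliseconds since 1970-01-01T00:00:00Z.
millisToUTCTime :: Int64 -> UTCTime
millisToUTCTime ms = posixSecondsToUTCTime (fromIntegral ms / 1000)

-- Inverse direction; sub-millisecond precision is rounded away.
utcTimeToMillis :: UTCTime -> Int64
utcTimeToMillis t = round (utcTimeToPOSIXSeconds t * 1000)
```

Because POSIX time in the time package has picosecond resolution, converting a millisecond count to UTCTime and back returns the original value.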