Documentation ¶
Overview ¶
Package kafkapubsub provides an implementation of pubsub for Kafka. It requires a minimum Kafka version of 0.11.x for Header support. Some functionality may work with earlier versions of Kafka.
See https://kafka.apache.org/documentation.html#semantics for a discussion of message semantics in Kafka. sarama.Config exposes many knobs that can affect performance and semantics, so review and set them carefully.
kafkapubsub does not support Message.Nack; Message.Nackable will return false, and Message.Nack will panic if called.
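Because Message.Nack panics with this driver, code that is shared across pubsub drivers may want to check Message.Nackable first. The following is a minimal sketch, not part of kafkapubsub: the `acker` interface, the `fakeMessage` type, and the `release` helper are hypothetical names introduced here so the example runs without a broker. A *pubsub.Message is expected to satisfy `acker`, since it has the same three methods.

```go
package main

import "fmt"

// acker is a hypothetical interface listing the message methods used here,
// so the sketch compiles without importing gocloud.dev/pubsub.
type acker interface {
	Ack()
	Nack()
	Nackable() bool
}

// fakeMessage stands in for a message received through kafkapubsub:
// Nackable reports false and Nack panics.
type fakeMessage struct{ acked bool }

func (m *fakeMessage) Ack()           { m.acked = true }
func (m *fakeMessage) Nack()          { panic("Nack is not supported") }
func (m *fakeMessage) Nackable() bool { return false }

// release nacks m only when the driver supports it and reports whether it
// did so. When it returns false, the caller decides what to do instead.
func release(m acker) bool {
	if m.Nackable() {
		m.Nack()
		return true
	}
	return false
}

func main() {
	m := &fakeMessage{}
	if !release(m) {
		// Hypothetical fallback: acknowledge and handle the failure elsewhere.
		m.Ack()
	}
	fmt.Println("acked:", m.acked)
}
```

What to do when a message cannot be nacked is an application decision; acknowledging it, as above, is only one option.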
URLs ¶
For pubsub.OpenTopic and pubsub.OpenSubscription, kafkapubsub registers for the scheme "kafka". The default URL opener will connect to a default set of Kafka brokers based on the environment variable "KAFKA_BROKERS", expected to be a comma-delimited set of server addresses. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.
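As an illustration of the comma-delimited format, the sketch below turns a KAFKA_BROKERS value into a slice of addresses. The `splitBrokers` helper and the example addresses are hypothetical; trimming whitespace and dropping empty entries are choices made for this sketch and are not confirmed behavior of the default URL opener.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// splitBrokers turns a comma-delimited list of server addresses into a
// slice, trimming whitespace and dropping empty entries.
func splitBrokers(s string) []string {
	var brokers []string
	for _, b := range strings.Split(s, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	// Example value only; normally the variable is set in the environment.
	os.Setenv("KAFKA_BROKERS", "10.0.0.1:9092,10.0.0.2:9092")
	fmt.Println(splitBrokers(os.Getenv("KAFKA_BROKERS")))
}
```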
Escaping ¶
Go CDK supports all UTF-8 strings. No escaping is required for Kafka. Message metadata is supported through Kafka Headers, which allow arbitrary []byte for both key and value. These are converted to string for use in Message.Metadata.
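The header-to-metadata conversion described above can be pictured roughly as follows. This is a sketch under assumptions, not the package's code: `header` is a hypothetical stand-in for a Kafka record header with []byte key and value, and `toMetadata` is a hypothetical helper.

```go
package main

import "fmt"

// header is a hypothetical stand-in for a Kafka record header, whose key
// and value are both arbitrary byte slices.
type header struct {
	Key, Value []byte
}

// toMetadata builds a string map by converting each header key and value
// to string. If a key repeats, the later header wins in this sketch.
func toMetadata(hs []header) map[string]string {
	md := make(map[string]string, len(hs))
	for _, h := range hs {
		md[string(h.Key)] = string(h.Value)
	}
	return md
}

func main() {
	md := toMetadata([]header{
		{Key: []byte("trace-id"), Value: []byte("abc123")},
	})
	fmt.Println(md["trace-id"])
}
```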
As ¶
kafkapubsub exposes the following types for As:
- Topic: sarama.SyncProducer
- Subscription: sarama.ConsumerGroup, sarama.ConsumerGroupSession (may be nil during session renegotiation, and session may go stale at any time)
- Message: *sarama.ConsumerMessage
- Message.BeforeSend: *sarama.ProducerMessage
- Error: sarama.ConsumerError, sarama.ConsumerErrors, sarama.ProducerError, sarama.ProducerErrors, sarama.ConfigurationError, sarama.PacketDecodingError, sarama.PacketEncodingError, sarama.KError
Example (OpenSubscriptionFromURL) ¶
    package main

    import (
        "context"
        "log"

        "gocloud.dev/pubsub"
        _ "gocloud.dev/pubsub/kafkapubsub" // registers the "kafka" scheme
    )

    func main() {
        // This example is used in https://gocloud.dev/howto/pubsub/subscribe/#kafka

        // Variables set up elsewhere:
        ctx := context.Background()

        // pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
        // The host + path are used as the consumer group name.
        // The "topic" query parameter sets one or more topics to subscribe to.
        // The set of brokers must be in an environment variable KAFKA_BROKERS.
        subscription, err := pubsub.OpenSubscription(ctx,
            "kafka://my-group?topic=my-topic")
        if err != nil {
            log.Fatal(err)
        }
        defer subscription.Shutdown(ctx)
    }
Example (OpenTopicFromURL) ¶
    package main

    import (
        "context"
        "log"

        "gocloud.dev/pubsub"
        _ "gocloud.dev/pubsub/kafkapubsub" // registers the "kafka" scheme
    )

    func main() {
        // This example is used in https://gocloud.dev/howto/pubsub/publish/#kafka

        // Variables set up elsewhere:
        ctx := context.Background()

        // pubsub.OpenTopic creates a *pubsub.Topic from a URL.
        // The host + path are the topic name to send to.
        // The set of brokers must be in an environment variable KAFKA_BROKERS.
        topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
        if err != nil {
            log.Fatal(err)
        }
        defer topic.Shutdown(ctx)
    }
Index ¶
- Constants
- func MinimalConfig() *sarama.Config
- func OpenSubscription(brokers []string, config *sarama.Config, group string, topics []string, ...) (*pubsub.Subscription, error)
- func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *TopicOptions) (*pubsub.Topic, error)
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Constants ¶
const Scheme = "kafka"
Scheme is the URL scheme that kafkapubsub registers its URLOpeners under on pubsub.DefaultMux.
Functions ¶
func MinimalConfig ¶
MinimalConfig returns a minimal sarama.Config.
func OpenSubscription ¶
func OpenSubscription(brokers []string, config *sarama.Config, group string, topics []string, opts *SubscriptionOptions) (*pubsub.Subscription, error)
OpenSubscription creates a pubsub.Subscription that joins group, receiving messages from topics.
It uses a sarama.ConsumerGroup to receive messages. Consumer options can be configured in the Consumer section of the sarama.Config: https://godoc.org/github.com/Shopify/sarama#Config.
Example ¶
    package main

    import (
        "context"
        "log"

        "gocloud.dev/pubsub/kafkapubsub"
    )

    func main() {
        // This example is used in https://gocloud.dev/howto/pubsub/subscribe/#kafka-ctor

        // Variables set up elsewhere:
        ctx := context.Background()

        // The set of brokers in the Kafka cluster.
        addrs := []string{"1.2.3.4:9092"}
        // The Kafka client configuration to use.
        config := kafkapubsub.MinimalConfig()

        // Construct a *pubsub.Subscription, joining the consumer group "my-group"
        // and receiving messages from "my-topic".
        subscription, err := kafkapubsub.OpenSubscription(
            addrs, config, "my-group", []string{"my-topic"}, nil)
        if err != nil {
            log.Fatal(err)
        }
        defer subscription.Shutdown(ctx)
    }
func OpenTopic ¶
func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *TopicOptions) (*pubsub.Topic, error)
OpenTopic creates a pubsub.Topic that sends to a Kafka topic.
It uses a sarama.SyncProducer to send messages. Producer options can be configured in the Producer section of the sarama.Config: https://godoc.org/github.com/Shopify/sarama#Config.
Config.Producer.Return.Successes must be set to true.
Example ¶
    package main

    import (
        "context"
        "log"

        "gocloud.dev/pubsub/kafkapubsub"
    )

    func main() {
        // This example is used in https://gocloud.dev/howto/pubsub/publish/#kafka-ctor

        // Variables set up elsewhere:
        ctx := context.Background()

        // The set of brokers in the Kafka cluster.
        addrs := []string{"1.2.3.4:9092"}
        // The Kafka client configuration to use.
        config := kafkapubsub.MinimalConfig()

        // Construct a *pubsub.Topic.
        topic, err := kafkapubsub.OpenTopic(addrs, config, "my-topic", nil)
        if err != nil {
            log.Fatal(err)
        }
        defer topic.Shutdown(ctx)
    }
Types ¶
type SubscriptionOptions ¶
    type SubscriptionOptions struct {
        // KeyName optionally sets the Message.Metadata key in which to store the
        // Kafka message key. If set, and if the Kafka message key is non-empty,
        // the key value will be stored in Message.Metadata under KeyName.
        KeyName string

        // WaitForJoin causes OpenSubscription to wait for up to WaitForJoin
        // to allow the client to join the consumer group.
        // Messages sent to the topic before the client joins the group
        // may not be received by this subscription.
        // OpenSubscription will succeed even if WaitForJoin elapses and
        // the subscription still hasn't been joined successfully.
        WaitForJoin time.Duration
    }
SubscriptionOptions contains configuration for subscriptions.
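The KeyName rule for subscriptions can be sketched as below. This is an illustration of the documented behavior only, not the package's code; `withKey` and the metadata key "kafka-key" are hypothetical names chosen for the example.

```go
package main

import "fmt"

// withKey returns a copy of md with the Kafka message key stored under
// keyName. Nothing is added when keyName is unset or the key is empty.
func withKey(md map[string]string, keyName string, key []byte) map[string]string {
	out := make(map[string]string, len(md)+1)
	for k, v := range md {
		out[k] = v
	}
	if keyName != "" && len(key) > 0 {
		out[keyName] = string(key)
	}
	return out
}

func main() {
	md := withKey(map[string]string{"trace-id": "abc123"}, "kafka-key", []byte("user-42"))
	fmt.Println(md["kafka-key"])
}
```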
type TopicOptions ¶
    type TopicOptions struct {
        // KeyName optionally sets the Message.Metadata key to use as the optional
        // Kafka message key. If set, and if a matching Message.Metadata key is found,
        // the value for that key will be used as the message key when sending to
        // Kafka, instead of being added to the message headers.
        KeyName string
    }
TopicOptions contains configuration options for topics.
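On the sending side, the KeyName rule amounts to pulling one metadata entry out as the message key and leaving the rest for the headers. The sketch below illustrates that split under assumptions; `splitKey` and the metadata key "kafka-key" are hypothetical and not part of kafkapubsub.

```go
package main

import "fmt"

// splitKey separates the message key from the metadata destined for
// headers. The entry named keyName, if present, becomes the key and is
// left out of the returned headers.
func splitKey(md map[string]string, keyName string) (key string, headers map[string]string) {
	headers = make(map[string]string, len(md))
	for k, v := range md {
		if keyName != "" && k == keyName {
			key = v
			continue
		}
		headers[k] = v
	}
	return key, headers
}

func main() {
	md := map[string]string{"kafka-key": "user-42", "trace-id": "abc123"}
	key, headers := splitKey(md, "kafka-key")
	fmt.Println(key, len(headers))
}
```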
type URLOpener ¶
    type URLOpener struct {
        // Brokers is the slice of brokers in the Kafka cluster.
        Brokers []string

        // Config is the Sarama Config.
        // Config.Producer.Return.Successes must be set to true.
        Config *sarama.Config

        // TopicOptions specifies the options to pass to OpenTopic.
        TopicOptions TopicOptions

        // SubscriptionOptions specifies the options to pass to OpenSubscription.
        SubscriptionOptions SubscriptionOptions
    }
URLOpener opens Kafka URLs like "kafka://mytopic" for topics and "kafka://group?topic=mytopic" for subscriptions.
For topics, the URL's host+path is used as the topic name.
For subscriptions, the URL's host+path is used as the group name, and the "topic" query parameter(s) are used as the set of topics to subscribe to.
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.