sarama

v1.4.1
Published: May 25, 2015 License: MIT Imports: 22 Imported by: 0

README

sarama

Sarama is an MIT-licensed Go client library for Apache Kafka 0.8 (and later).

Compatibility and API stability

Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest releases of Kafka and Go, and we provide a two-month grace period for older releases. This means we currently officially support Go 1.3 and 1.4, and Kafka 0.8.1 and 0.8.2.

Sarama follows semantic versioning and provides API stability via the gopkg.in service. You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1. A changelog is available here.

Documentation

Overview

Package sarama provides client libraries for the Kafka 0.8 protocol. The AsyncProducer object is the high-level API for producing messages asynchronously; the SyncProducer provides a blocking API for the same purpose. The Consumer object is the high-level API for consuming messages. The Client object provides metadata management functionality that is shared between the higher-level objects.

For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire.

The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Constants

const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be assigned to the next message
	// that will be produced to the partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a partition. You can send this
	// to a client's GetOffset method to get this offset, or when calling ConsumePartition to start consuming
	// from the oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)
const ReceiveTime int64 = -1

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
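
As a minimal sketch of how the OffsetNewest and OffsetOldest sentinels differ from a literal offset, the program below classifies a starting offset. The constants are local copies of the documented values so that the sketch runs without importing sarama, and describeOffset is a hypothetical helper, not part of the package. Treating other negative values as invalid is an assumption made for illustration.

```go
package main

import "fmt"

// Local copies of the documented sentinel values (assumption: kept in sync
// with the package constants by hand, for illustration only).
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// describeOffset is a hypothetical helper that explains how a starting
// offset would be interpreted when passed to ConsumePartition.
func describeOffset(offset int64) string {
	switch {
	case offset == offsetNewest:
		return "newest: start with the next message produced"
	case offset == offsetOldest:
		return "oldest: start with the oldest message still on the broker"
	case offset >= 0:
		return fmt.Sprintf("literal offset %d", offset)
	default:
		// Assumption: any other negative value is not meaningful.
		return fmt.Sprintf("invalid offset %d", offset)
	}
}

func main() {
	for _, o := range []int64{offsetNewest, offsetOldest, 42, -7} {
		fmt.Println(describeOffset(o))
	}
}
```

Note that ReceiveTime shares the numeric value -1 with OffsetNewest but applies to a different field (the timestamp of an Offset Commit Request), so the two should not be mixed up.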

Variables

var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

ErrClosedClient is the error returned when a method is called on a client that has been closed.

var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

ErrInsufficientData is returned when a packet being decoded is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
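
The range check described above can be sketched as follows. This is an illustration under stated assumptions rather than Sarama's own code: errInvalidPartition is a local stand-in for the documented variable, and checkPartition is a hypothetical helper name.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidPartition is a local stand-in for the documented
// ErrInvalidPartition, so the sketch runs without importing sarama.
var errInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

// checkPartition is a hypothetical helper: it accepts only indices in the
// range [0, numPartitions-1] and rejects everything else.
func checkPartition(index, numPartitions int32) error {
	if index < 0 || index >= numPartitions {
		return errInvalidPartition
	}
	return nil
}

func main() {
	// With three partitions, the valid indices are 0, 1 and 2.
	for _, idx := range []int32{0, 2, 3, -1} {
		fmt.Printf("index %d: %v\n", idx, checkPartition(idx, 3))
	}
}
```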

var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max.

var ErrNotConnected = errors.New("kafka: broker not connected")

ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

var ErrOutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")

ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

ErrShuttingDown is returned when a producer receives a message during shutdown.

var MaxRequestSize int32 = 100 * 1024 * 1024

MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.

var MaxResponseSize int32 = 100 * 1024 * 1024

MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.
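
For a sense of scale, the 100 MiB default works out to 100 * 1024 * 1024 = 104857600 bytes, which fits comfortably in an int32. The sketch below works through that arithmetic. The variable is a local copy of the documented default, and fitsInRequest is a hypothetical helper that only illustrates the comparison; it is not how Sarama enforces the limit internally.

```go
package main

import "fmt"

// maxRequestSize is a local copy of the documented default for
// MaxRequestSize (100 MiB), so the sketch runs without importing sarama.
var maxRequestSize int32 = 100 * 1024 * 1024

// fitsInRequest is a hypothetical helper that reports whether an encoded
// request of n bytes stays within the configured limit.
func fitsInRequest(n int) bool {
	return int64(n) <= int64(maxRequestSize)
}

func main() {
	fmt.Println("limit in bytes:", maxRequestSize)
	fmt.Println("1 MB request fits:", fitsInRequest(1000000))
	fmt.Println("200 MiB request fits:", fitsInRequest(200*1024*1024))
}
```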

var PanicHandler func(interface{})

PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
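
The following sketch shows one way a library can route panics from its internal goroutines to a user-supplied handler. It is an illustration of the general pattern, not a copy of Sarama's implementation: panicHandler is a local stand-in for the package-level variable, and withRecover is a hypothetical wrapper name.

```go
package main

import (
	"fmt"
	"sync"
)

// panicHandler is a local stand-in for the package-level PanicHandler.
// A nil value means panics are not recovered.
var panicHandler func(interface{})

// withRecover is a hypothetical wrapper that runs fn and, only when a
// handler is installed, recovers a panic and forwards its value.
func withRecover(fn func()) {
	defer func() {
		if panicHandler == nil {
			return // no handler installed: the panic keeps propagating
		}
		if v := recover(); v != nil {
			panicHandler(v)
		}
	}()
	fn()
}

func main() {
	var recovered interface{}
	panicHandler = func(v interface{}) { recovered = v }

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The caller's goroutine could not recover this panic itself.
		withRecover(func() { panic("boom") })
	}()
	wg.Wait()

	fmt.Println("recovered:", recovered)
}
```

Because the handler runs on the library's goroutine, it should be safe to call concurrently; typical uses are logging the value or reporting it to an error tracker.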

Types

type AsyncProducer

type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
	// buffered. The shutdown has completed when both the Errors and Successes channels
	// have been closed. When calling AsyncClose, you *must* continue to read from those
	// channels in order to drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and flushes any messages it may have buffered.
	// You must call this function before a producer object passes out of scope, as
	// it may otherwise leak memory. You must call this before calling Close on the
	// underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is enabled.
	// If Return.Successes is true, you MUST read from this channel or the Producer will deadlock.
	// It is suggested that you send and read messages together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this channel
	// or the Producer will deadlock when the channel is full. Alternatively, you can set
	// Producer.Return.Errors in your config to false, which prevents errors from being returned.
	Errors() <-chan *ProducerError
}

AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope.

Example (Goroutines)

This example shows how to use the producer with separate goroutines reading from the Successes and Errors channels. Note that in order for the Successes channel to be populated, you have to set config.Producer.Return.Successes to true.

config := NewConfig()
config.Producer.Return.Successes = true
producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	panic(err)
}

// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var (
	wg                          sync.WaitGroup
	enqueued, successes, errors int
)

wg.Add(1)
go func() {
	defer wg.Done()
	for _ = range producer.Successes() {
		successes++
	}
}()

wg.Add(1)
go func() {
	defer wg.Done()
	for err := range producer.Errors() {
		log.Println(err)
		errors++
	}
}()

ProducerLoop:
for {
	message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
	select {
	case producer.Input() <- message:
		enqueued++

	case <-signals:
		producer.AsyncClose() // Trigger a shutdown of the producer.
		break ProducerLoop
	}
}

wg.Wait()

log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)

Example (Select)

This example shows how to use the producer while simultaneously reading the Errors channel to know about any failures.

producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
	panic(err)
}

defer func() {
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var enqueued, errors int
ProducerLoop:
for {
	select {
	case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
		enqueued++
	case err := <-producer.Errors():
		log.Println("Failed to produce message", err)
		errors++
	case <-signals:
		break ProducerLoop
	}
}

log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)

func NewAsyncProducer

func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error)

NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.

func NewAsyncProducerFromClient

func NewAsyncProducerFromClient(client Client) (AsyncProducer, error)

NewAsyncProducerFromClient creates a new Producer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.

Example
broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
	return err
}

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata(&request)
if err != nil {
	_ = broker.Close()
	return err
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

return broker.Close()

func NewBroker

func NewBroker(addr string) *Broker

NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect; you have to call Open() for that.

func (*Broker) Addr

func (b *Broker) Addr() string

Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

func (*Broker) Close

func (b *Broker) Close() error

func (*Broker) CommitOffset

func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error)

func (*Broker) Connected

func (b *Broker) Connected() (bool, error)

Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.

func (*Broker) Fetch

func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error)

func (*Broker) FetchOffset

func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error)

func (*Broker) GetAvailableOffsets

func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error)

func (*Broker) GetConsumerMetadata

func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)

func (*Broker) GetMetadata

func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error)

func (*Broker) ID

func (b *Broker) ID() int32

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

func (*Broker) Open

func (b *Broker) Open(conf *Config) error

Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or ErrAlreadyConnected. If conf is nil, the result of NewConfig() is used.

func (*Broker) Produce

func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error)

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

func (ByteEncoder) Length

func (b ByteEncoder) Length() int

type Client

type Client interface {
	// Config returns the Config struct of the client. This struct should not be altered after it
	// has been created.
	Config() *Config

	// Topics returns the set of available topics as retrieved from the cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for the given topic,
	// where "writable" means "having a valid leader accepting writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the current topic/partition, as
	// determined by querying the cluster metadata.
	Leader(topic string, partitionID int32) (*Broker, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
	// available metadata for those topics. If no topics are provided, it will refresh metadata
	// for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the given
	// time on the topic/partition combination. Time should be OffsetOldest for the earliest available
	// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
	// value if it's available. You can call RefreshCoordinator to update the cached value.
	// This function only works on Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache.
	// This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// Close shuts down all broker connections managed by this client. It is required to call this function before
	// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
	// using a client before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it
	Closed() bool
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks: it will not be garbage-collected automatically when it passes out of scope. A single client can be safely shared by multiple concurrent Producers and Consumers.

func NewClient

func NewClient(addrs []string, conf *Config) (Client, error)

NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
)

type Config

type Config struct {
	// Net is the namespace for network-level properties used by the Broker, and shared by the Client/Producer/Consumer.
	Net struct {
		MaxOpenRequests int // How many outstanding requests a connection is allowed to have before sending on it blocks (default 5).

		// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
		DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
		ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
		WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).

		// KeepAlive specifies the keep-alive period for an active network connection.
		// If zero, keep-alives are disabled. (default is 0: disabled).
		KeepAlive time.Duration
	}

	// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
	Metadata struct {
		Retry struct {
			Max     int           // The total number of times to retry a metadata request when the cluster is in the middle of a leader election (default 3).
			Backoff time.Duration // How long to wait for leader election to occur before retrying (default 250ms). Similar to the JVM's `retry.backoff.ms`.
		}
		// How frequently to refresh the cluster metadata in the background. Defaults to 10 minutes.
		// Set to 0 to disable. Similar to `topic.metadata.refresh.interval.ms` in the JVM version.
		RefreshFrequency time.Duration
	}

	// Producer is the namespace for configuration related to producing messages, used by the Producer.
	Producer struct {
		// The maximum permitted size of a message (defaults to 1000000). Should be set equal to or smaller than the broker's `message.max.bytes`.
		MaxMessageBytes int
		// The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
		// Equivalent to the `request.required.acks` setting of the JVM producer.
		RequiredAcks RequiredAcks
		// The maximum duration the broker will wait for the receipt of the number of RequiredAcks (defaults to 10 seconds).
		// This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution,
		// nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.
		Timeout time.Duration
		// The type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.
		Compression CompressionCodec
		// Generates partitioners for choosing the partition to send messages to (defaults to hashing the message key).
		// Similar to the `partitioner.class` setting for the JVM producer.
		Partitioner PartitionerConstructor

		// Return specifies what channels will be populated. If they are set to true, you must read from
		// the respective channels to prevent deadlock.
		Return struct {
			// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
			Successes bool

			// If enabled, messages that failed to deliver will be returned on the Errors channel, including error (default enabled).
			Errors bool
		}

		// The following config options control how often messages are batched up and sent to the broker. By default,
		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
		// into the subsequent batch.
		Flush struct {
			Bytes     int           // The best-effort number of bytes needed to trigger a flush. Use the global sarama.MaxRequestSize to set a hard upper limit.
			Messages  int           // The best-effort number of messages needed to trigger a flush. Use `MaxMessages` to set a hard upper limit.
			Frequency time.Duration // The best-effort frequency of flushes. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
			// The maximum number of messages the producer will send in a single broker request.
			// Defaults to 0 for unlimited. Similar to `queue.buffering.max.messages` in the JVM producer.
			MaxMessages int
		}

		Retry struct {
			// The total number of times to retry sending a message (default 3).
			// Similar to the `message.send.max.retries` setting of the JVM producer.
			Max int
			// How long to wait for the cluster to settle between retries (default 100ms).
			// Similar to the `retry.backoff.ms` setting of the JVM producer.
			Backoff time.Duration
		}
	}

	// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
	Consumer struct {
		Retry struct {
			// How long to wait after failing to read from a partition before trying again (default 2s).
			Backoff time.Duration
		}

		// Fetch is the namespace for controlling how many bytes are retrieved by any given request.
		Fetch struct {
			// The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
			// The default is 1, as 0 causes the consumer to spin when no messages are available. Equivalent to the JVM's `fetch.min.bytes`.
			Min int32
			// The default number of message bytes to fetch from the broker in each request (default 32768). This should be larger than the
			// majority of your messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar
			// to the JVM's `fetch.message.max.bytes`.
			Default int32
			// The maximum number of message bytes to fetch from the broker in a single request. Messages larger than this will return
			// ErrMessageTooLarge and will not be consumable, so you must be sure this is at least as large as your largest message.
			// Defaults to 0 (no limit). Similar to the JVM's `fetch.message.max.bytes`. The global `sarama.MaxResponseSize` still applies.
			Max int32
		}
		// The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
		// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
		// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
		// Equivalent to the JVM's `fetch.wait.max.ms`.
		MaxWaitTime time.Duration

		// Return specifies what channels will be populated. If they are set to true, you must read from
		// them to prevent deadlock.
		Return struct {
			// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
			Errors bool
		}
	}

	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
	// Defaults to "sarama", but you should probably set it to something specific to your application.
	ClientID string
	// The number of events to buffer in internal and external channels. This permits the producer and consumer to
	// continue processing some messages in the background while user code is working, greatly improving throughput.
	// Defaults to 256.
	ChannelBufferSize int
}

Config is used to pass multiple configuration options to Sarama's constructors.

func NewConfig

func NewConfig() *Config

NewConfig returns a new configuration instance with sane defaults.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks a Config instance. It will return a ConfigurationError if the specified values don't make sense.

type ConfigurationError

type ConfigurationError string

ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.

func (ConfigurationError) Error

func (err ConfigurationError) Error() string

type Consumer

type Consumer interface {

	// Topics returns the set of available topics as retrieved from the cluster metadata.
	// This method is the same as Client.Topics(), and is provided for convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
	// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
	// literal offset, or OffsetNewest or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed.
	Close() error
}

Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() on a consumer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope.

Sarama's Consumer type does not currently support automatic consumer group rebalancing and offset tracking. However, the https://github.com/wvanbergen/kafka library builds on Sarama to add this support. We plan to properly integrate this functionality at a later date.

Example (For_loop)

This example shows the simplest use case of the consumer. It simply iterates over the messages channel using a for/range loop. Because a consumer never stops unless requested, a signal handler is registered so we can trigger a clean shutdown of the consumer.

master, err := NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
	log.Fatalln(err)
}
defer func() {
	if err := master.Close(); err != nil {
		log.Fatalln(err)
	}
}()

consumer, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
	log.Fatalln(err)
}

go func() {
	// By default, the consumer will always keep going, unless we tell it to stop.
	// In this case, we capture the SIGINT signal so we can tell the consumer to stop
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
	consumer.AsyncClose()
}()

msgCount := 0
for message := range consumer.Messages() {
	log.Println(string(message.Value))
	msgCount++
}
log.Println("Processed", msgCount, "messages.")

Example (Goroutines)

This example shows how to use a consumer with different goroutines to read from the Messages and Errors channels.

config := NewConfig()
config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.

master, err := NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatalln(err)
}
defer func() {
	if err := master.Close(); err != nil {
		panic(err)
	}
}()

consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
if err != nil {
	log.Fatalln(err)
}

var (
	wg       sync.WaitGroup
	msgCount int
)

wg.Add(1)
go func() {
	defer wg.Done()
	for message := range consumer.Messages() {
		log.Printf("Consumed message with offset %d", message.Offset)
		msgCount++
	}
}()

wg.Add(1)
go func() {
	defer wg.Done()
	for err := range consumer.Errors() {
		log.Println(err)
	}
}()

// Wait for an interrupt signal to trigger the shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
<-signals
consumer.AsyncClose()

// Wait for the Messages and Errors channel to be fully drained.
wg.Wait()
log.Println("Processed", msgCount, "messages.")

Example (Select)

This example shows how to use a consumer with a select statement dealing with the different channels.

config := NewConfig()
config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.

master, err := NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatalln(err)
}
defer func() {
	if err := master.Close(); err != nil {
		log.Fatalln(err)
	}
}()

consumer, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
	log.Fatalln(err)
}
defer func() {
	if err := consumer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

msgCount := 0

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumerLoop:
for {
	select {
	case err := <-consumer.Errors():
		log.Println(err)
	case <-consumer.Messages():
		msgCount++
	case <-signals:
		log.Println("Received interrupt")
		break consumerLoop
	}
}
log.Println("Processed", msgCount, "messages.")

func NewConsumer

func NewConsumer(addrs []string, config *Config) (Consumer, error)

NewConsumer creates a new consumer using the given broker addresses and configuration.

func NewConsumerFromClient

func NewConsumerFromClient(client Client) (Consumer, error)

NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.

type ConsumerError

type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.

func (ConsumerError) Error

func (ce ConsumerError) Error() string

type ConsumerErrors

type ConsumerErrors []*ConsumerError

ConsumerErrors is a type that wraps a batch of errors and implements the error interface. It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors when stopping.
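
The sketch below illustrates how a caller might unpack such a batched error with a type assertion. The types are local mirrors of the documented ConsumerError and ConsumerErrors shapes so that the program runs without importing sarama. The message formats and the closeConsumer function are assumptions made up for illustration, not Sarama's actual output or API.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerError mirrors the documented ConsumerError fields.
type consumerError struct {
	Topic     string
	Partition int32
	Err       error
}

// Error uses an illustrative format; the real wording may differ.
func (ce consumerError) Error() string {
	return fmt.Sprintf("error on %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// consumerErrors mirrors the documented ConsumerErrors slice type.
type consumerErrors []*consumerError

// Error summarizes the batch, again in an illustrative format.
func (ce consumerErrors) Error() string {
	return fmt.Sprintf("%d errors while consuming", len(ce))
}

// closeConsumer is a hypothetical stand-in for a Close call that returns
// the errors still pending at shutdown as a single error value.
func closeConsumer() error {
	return consumerErrors{
		{Topic: "my_topic", Partition: 0, Err: errors.New("connection reset")},
		{Topic: "my_topic", Partition: 1, Err: errors.New("timeout")},
	}
}

func main() {
	err := closeConsumer()
	// A type assertion recovers the individual errors from the batch.
	if errs, ok := err.(consumerErrors); ok {
		for _, e := range errs {
			fmt.Println(e)
		}
		return
	}
	fmt.Println("close failed:", err)
}
```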

func (ConsumerErrors) Error

func (ce ConsumerErrors) Error() string

type ConsumerMessage

type ConsumerMessage struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

ConsumerMessage encapsulates a Kafka message returned by the consumer.

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	ConsumerGroup string
}

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Err             KError
	Coordinator     *Broker
	CoordinatorID   int32  // deprecated: use Coordinator.ID()
	CoordinatorHost string // deprecated: use Coordinator.Addr()
	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
}

type Encoder

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
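
As a sketch of a custom implementation, the type below encodes a small struct as JSON and caches the result so that Length() always agrees with len() of what Encode() returns. The encoder interface is a local copy of the documented one so that the program runs without importing sarama; jsonEvent and its fields are hypothetical names chosen for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encoder is a local copy of the documented Encoder interface.
type encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// jsonEvent is a hypothetical payload type. The unexported fields cache the
// encoded form and are ignored by encoding/json.
type jsonEvent struct {
	User   string `json:"user"`
	Action string `json:"action"`

	encoded []byte
	err     error
}

// ensureEncoded marshals the event once and remembers the outcome.
func (e *jsonEvent) ensureEncoded() {
	if e.encoded == nil && e.err == nil {
		e.encoded, e.err = json.Marshal(e)
	}
}

// Encode returns the cached JSON bytes.
func (e *jsonEvent) Encode() ([]byte, error) {
	e.ensureEncoded()
	return e.encoded, e.err
}

// Length returns the size of the cached JSON bytes, so it cannot disagree
// with Encode.
func (e *jsonEvent) Length() int {
	e.ensureEncoded()
	return len(e.encoded)
}

func main() {
	var enc encoder = &jsonEvent{User: "alice", Action: "login"}
	b, err := enc.Encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	fmt.Println("length matches:", enc.Length() == len(b))
}
```

Caching trades a little memory for the guarantee that both methods describe the same bytes; mutating the event after the first call would not be reflected, which is acceptable for a value that is encoded once and sent.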

type FetchRequest

type FetchRequest struct {
	MaxWaitTime int32
	MinBytes    int32
	// contains filtered or unexported fields
}

func (*FetchRequest) AddBlock

func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32)

type FetchResponse

type FetchResponse struct {
	Blocks map[string]map[int32]*FetchResponseBlock
}

func (*FetchResponse) AddError

func (fr *FetchResponse) AddError(topic string, partition int32, err KError)

func (*FetchResponse) AddMessage

func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) GetBlock

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock

type FetchResponseBlock

type FetchResponseBlock struct {
	Err                 KError
	HighWaterMarkOffset int64
	MsgSet              MessageSet
}

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	ErrNoError                         KError = 0
	ErrUnknown                         KError = -1
	ErrOffsetOutOfRange                KError = 1
	ErrInvalidMessage                  KError = 2
	ErrUnknownTopicOrPartition         KError = 3
	ErrInvalidMessageSize              KError = 4
	ErrLeaderNotAvailable              KError = 5
	ErrNotLeaderForPartition           KError = 6
	ErrRequestTimedOut                 KError = 7
	ErrBrokerNotAvailable              KError = 8
	ErrReplicaNotAvailable             KError = 9
	ErrMessageSizeTooLarge             KError = 10
	ErrStaleControllerEpochCode        KError = 11
	ErrOffsetMetadataTooLarge          KError = 12
	ErrOffsetsLoadInProgress           KError = 14
	ErrConsumerCoordinatorNotAvailable KError = 15
	ErrNotCoordinatorForConsumer       KError = 16
	ErrInvalidTopic                    KError = 17
	ErrMessageSetSizeTooLarge          KError = 18
	ErrNotEnoughReplicas               KError = 19
	ErrNotEnoughReplicasAfterAppend    KError = 20
)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string
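Because KError implements the error interface, callers can recover the numeric code with a type assertion and branch on it. The sketch below redeclares a subset of the constants locally so that it runs on its own; isTransient is a hypothetical helper, and the choice of which codes count as transient is an assumption made for illustration, not something the package defines.

```go
package main

import "fmt"

// KError and the constants below mirror a subset of the values listed above.
type KError int16

const (
	ErrNoError               KError = 0
	ErrOffsetOutOfRange      KError = 1
	ErrLeaderNotAvailable    KError = 5
	ErrNotLeaderForPartition KError = 6
	ErrRequestTimedOut       KError = 7
)

// Error makes KError satisfy the error interface. The message text is
// made up for this sketch.
func (e KError) Error() string { return fmt.Sprintf("kafka error code %d", int16(e)) }

// isTransient is a hypothetical helper. It reports whether err is a KError
// that this sketch assumes is worth retrying.
func isTransient(err error) bool {
	kerr, ok := err.(KError)
	if !ok {
		return false
	}
	switch kerr {
	case ErrLeaderNotAvailable, ErrNotLeaderForPartition, ErrRequestTimedOut:
		return true
	default:
		return false
	}
}

func main() {
	errs := []error{ErrLeaderNotAvailable, ErrOffsetOutOfRange, fmt.Errorf("unrelated failure")}
	for _, err := range errs {
		fmt.Printf("%v transient=%t\n", err, isTransient(err))
	}
}
```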

type Message

type Message struct {
	Codec CompressionCodec // codec used to compress the message contents
	Key   []byte           // the message key, may be nil
	Value []byte           // the message contents
	Set   *MessageSet      // the message set a message might wrap
	// contains filtered or unexported fields
}

type MessageBlock

type MessageBlock struct {
	Offset int64
	Msg    *Message
}

func (*MessageBlock) Messages

func (msb *MessageBlock) Messages() []*MessageBlock

Messages is a convenience helper that returns all the messages wrapped in this block.

type MessageSet

type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	Messages               []*MessageBlock
}

type MetadataRequest

type MetadataRequest struct {
	Topics []string
}

type MetadataResponse

type MetadataResponse struct {
	Brokers []*Broker
	Topics  []*TopicMetadata
}

func (*MetadataResponse) AddBroker

func (m *MetadataResponse) AddBroker(addr string, id int32)

func (*MetadataResponse) AddTopic added in v1.1.0

func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata

func (*MetadataResponse) AddTopicPartition

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError)

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup           string
	ConsumerGroupGeneration int32  // v1 or later
	ConsumerID              string // v1 or later
	RetentionTime           int64  // v2 or later

	// Version can be:
	// - 0 (kafka 0.8.1 and later)
	// - 1 (kafka 0.8.2 and later)
	// - 2 (kafka 0.8.3 and later)
	Version int16
	// contains filtered or unexported fields
}

func (*OffsetCommitRequest) AddBlock

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Errors map[string]map[int32]KError
}

type OffsetFetchRequest

type OffsetFetchRequest struct {
	ConsumerGroup string
	Version       int16
	// contains filtered or unexported fields
}

func (*OffsetFetchRequest) AddPartition

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Blocks map[string]map[int32]*OffsetFetchResponseBlock
}

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock struct {
	Offset   int64
	Metadata string
	Err      KError
}

type OffsetRequest

type OffsetRequest struct {
	// contains filtered or unexported fields
}

func (*OffsetRequest) AddBlock

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32)

type OffsetResponse

type OffsetResponse struct {
	Blocks map[string]map[int32]*OffsetResponseBlock
}

func (*OffsetResponse) AddTopicPartition

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

func (*OffsetResponse) GetBlock

func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock

type OffsetResponseBlock

type OffsetResponseBlock struct {
	Err     KError
	Offsets []int64
}

type PacketDecodingError

type PacketDecodingError struct {
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error

func (err PacketDecodingError) Error() string

type PacketEncodingError

type PacketEncodingError struct {
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

func (PacketEncodingError) Error

func (err PacketEncodingError) Error() string
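The string-length limit comes from the signed 16-bit length prefix that the protocol puts in front of strings. As a rough sketch, a caller could check lengths before handing data to the library; checkStringLength is a hypothetical helper, and the error type and its message text are local stand-ins rather than the package's own.

```go
package main

import (
	"fmt"
	"math"
	"strings"
)

// PacketEncodingError mirrors the type documented above.
type PacketEncodingError struct {
	Info string
}

// Error formats the error. The wording is an assumption of this sketch.
func (err PacketEncodingError) Error() string {
	return "error encoding packet: " + err.Info
}

// checkStringLength is a hypothetical pre-flight check: a string longer
// than math.MaxInt16 bytes cannot be described by a 16-bit signed length.
func checkStringLength(s string) error {
	if len(s) > math.MaxInt16 {
		return PacketEncodingError{Info: fmt.Sprintf("string too long (%d bytes)", len(s))}
	}
	return nil
}

func main() {
	fmt.Println(checkStringLength("my_topic"))
	fmt.Println(checkStringLength(strings.Repeat("x", math.MaxInt16+1)))
}
```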

type PartitionConsumer

type PartitionConsumer interface {

	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
	// after which you should wait until the 'messages' and 'errors' channels are drained.
	// It is required to call this function, or Close, before a consumer object passes out of scope,
	// as it will otherwise leak memory. You must call this before calling Close on the underlying
	// client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It is required to call this function
	// (or AsyncClose) before a consumer object passes out of scope, as it will otherwise leak memory. You must
	// call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if enabled. By default,
	// errors are logged and not returned over this channel. If you want to implement any custom error
	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
	// be used for the next message that will be produced. You can use this to determine how far behind
	// the processing is.
	HighWaterMarkOffset() int64
}

PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close() or AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.

The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
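The select-based approach described above can be sketched without a broker by feeding two plain channels. ConsumerMessage and ConsumerError are declared locally as stand-ins for the package's types, and drain is a hypothetical helper; in a real program the channels would come from the Messages() and Errors() methods.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsumerMessage and ConsumerError are local stand-ins for the library
// types so that this sketch runs without a broker.
type ConsumerMessage struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

// drain is a hypothetical helper that reads both channels with a select
// statement until both are closed, and returns how many items it saw.
func drain(messages <-chan *ConsumerMessage, errs <-chan *ConsumerError) (consumed, failed int) {
	for messages != nil || errs != nil {
		select {
		case msg, ok := <-messages:
			if !ok {
				messages = nil // a nil channel is never selected again
				continue
			}
			fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
			consumed++
		case cerr, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			fmt.Printf("partition %d: %v\n", cerr.Partition, cerr.Err)
			failed++
		}
	}
	return consumed, failed
}

func main() {
	messages := make(chan *ConsumerMessage, 2)
	errs := make(chan *ConsumerError, 1)
	messages <- &ConsumerMessage{Topic: "my_topic", Offset: 0, Value: []byte("a")}
	messages <- &ConsumerMessage{Topic: "my_topic", Offset: 1, Value: []byte("b")}
	errs <- &ConsumerError{Topic: "my_topic", Err: errors.New("simulated fetch error")}
	close(messages)
	close(errs)

	consumed, failed := drain(messages, errs)
	fmt.Println(consumed, failed)
}
```

Setting a closed channel to nil keeps the loop from spinning on it while the other channel is still open.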

type PartitionMetadata

type PartitionMetadata struct {
	Err      KError
	ID       int32
	Leader   int32
	Replicas []int32
	Isr      []int32
}

type Partitioner

type Partitioner interface {
	Partition(message *ProducerMessage, numPartitions int32) (int32, error) // Partition takes a message and partition count and chooses a partition

	// RequiresConsistency indicates to the user of the partitioner whether the mapping of key->partition is consistent or not.
	// Specifically, if a partitioner requires consistency then it must be allowed to choose from all partitions (even ones known to
	// be unavailable), and its choice must be respected by the caller. The obvious example is the HashPartitioner.
	RequiresConsistency() bool
}

Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.

Example (Manual)

This example shows how to assign partitions to your messages manually.

config := NewConfig()

// First, we tell the producer that we are going to partition ourselves.
config.Producer.Partitioner = NewManualPartitioner

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatal(err)
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Println("Failed to close producer:", err)
	}
}()

// Now, we set the Partition field of the ProducerMessage struct.
msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Fatalln("Failed to produce message to kafka cluster:", err)
}

if partition != 6 {
	log.Fatal("Message should have been produced to partition 6!")
}

log.Printf("Produced message to partition %d with offset %d", partition, offset)

Example (Per_topic)

This example shows how to set a different partitioner depending on the topic.

config := NewConfig()
config.Producer.Partitioner = func(topic string) Partitioner {
	switch topic {
	case "access_log", "error_log":
		return NewRandomPartitioner(topic)

	default:
		return NewHashPartitioner(topic)
	}
}

// ...

Example (Random)

By default, Sarama uses the message's key to consistently assign a partition to a message using hashing. If no key is set, a random partition will be chosen. This example shows how you can partition messages randomly, even when a key is set, by overriding Config.Producer.Partitioner.

config := NewConfig()
config.Producer.Partitioner = NewRandomPartitioner

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatal(err)
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Println("Failed to close producer:", err)
	}
}()

msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Fatalln("Failed to produce message to kafka cluster:", err)
}

log.Printf("Produced message to partition %d with offset %d", partition, offset)

func NewHashPartitioner

func NewHashPartitioner(topic string) Partitioner

NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil, or fails to encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, modulo the number of partitions. This ensures that messages with the same key always end up on the same partition.
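The hashing step can be illustrated with the standard library's FNV-1a implementation. This is a sketch of the idea only: hashPartition is a hypothetical function, and it reduces the unsigned 32-bit hash directly, which is an assumption of this sketch, so the partition numbers it produces are not guaranteed to match those chosen by the library.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition is a hypothetical sketch of key-based partitioning: it
// computes the 32-bit FNV-1a hash of the key and reduces it modulo the
// partition count. numPartitions is assumed to be greater than zero.
func hashPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, hashPartition([]byte(key), 8))
	}
}
```

Equal keys always map to the same partition as long as the partition count does not change.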

func NewManualPartitioner

func NewManualPartitioner(topic string) Partitioner

NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided ProducerMessage's Partition field as the partition to produce to.

func NewRandomPartitioner

func NewRandomPartitioner(topic string) Partitioner

NewRandomPartitioner returns a Partitioner which chooses a random partition each time.

func NewRoundRobinPartitioner

func NewRoundRobinPartitioner(topic string) Partitioner

NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.

type PartitionerConstructor

type PartitionerConstructor func(topic string) Partitioner

PartitionerConstructor is the type for a function capable of constructing new Partitioners.
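Any function with this signature can be assigned to Config.Producer.Partitioner, so a custom strategy only needs the two Partitioner methods plus a constructor. The sketch below uses local stand-ins for the package's types; prefixPartitioner and newPrefixPartitioner are hypothetical, and routing on the first key byte is chosen purely for illustration.

```go
package main

import "fmt"

// The declarations below are local stand-ins for the library types.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

type StringEncoder string

func (s StringEncoder) Encode() ([]byte, error) { return []byte(s), nil }
func (s StringEncoder) Length() int             { return len(s) }

type ProducerMessage struct {
	Topic string
	Key   Encoder
}

type Partitioner interface {
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
	RequiresConsistency() bool
}

type PartitionerConstructor func(topic string) Partitioner

// prefixPartitioner is a hypothetical Partitioner that routes on the first
// byte of the encoded key and sends keyless messages to partition 0.
type prefixPartitioner struct{}

// newPrefixPartitioner has the shape of a PartitionerConstructor.
func newPrefixPartitioner(topic string) Partitioner { return prefixPartitioner{} }

func (prefixPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
	if msg.Key == nil {
		return 0, nil
	}
	key, err := msg.Key.Encode()
	if err != nil {
		return -1, err
	}
	if len(key) == 0 {
		return 0, nil
	}
	return int32(key[0]) % numPartitions, nil
}

// RequiresConsistency is true because the key-to-partition mapping is fixed.
func (prefixPartitioner) RequiresConsistency() bool { return true }

func main() {
	var construct PartitionerConstructor = newPrefixPartitioner
	p := construct("my_topic")
	for _, key := range []string{"alpha", "beta"} {
		part, err := p.Partition(&ProducerMessage{Topic: "my_topic", Key: StringEncoder(key)}, 4)
		fmt.Println(key, part, err)
	}
}
```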

type ProduceRequest

type ProduceRequest struct {
	RequiredAcks RequiredAcks
	Timeout      int32
	// contains filtered or unexported fields
}

func (*ProduceRequest) AddMessage

func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

func (*ProduceRequest) AddSet

func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)

type ProduceResponse

type ProduceResponse struct {
	Blocks map[string]map[int32]*ProduceResponseBlock
}

func (*ProduceResponse) AddTopicPartition

func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

func (*ProduceResponse) GetBlock

func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

type ProduceResponseBlock

type ProduceResponseBlock struct {
	Err    KError
	Offset int64
}

type ProducerError

type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

func (ProducerError) Error

func (pe ProducerError) Error() string

type ProducerErrors

type ProducerErrors []*ProducerError

ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.

func (ProducerErrors) Error

func (pe ProducerErrors) Error() string
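When Close returns a ProducerErrors value, a type assertion recovers the individual failures. The following sketch simulates that situation with local stand-in types; failedTopics is a hypothetical helper and the error text is made up for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the library types, reduced to what the sketch needs.
type ProducerMessage struct {
	Topic string
}

type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

type ProducerErrors []*ProducerError

// Error summarises the batch. The wording is an assumption of this sketch.
func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("failed to deliver %d messages", len(pe))
}

// failedTopics is a hypothetical helper that unpacks an error returned from
// a producer's Close method and lists the topics of undelivered messages.
func failedTopics(err error) []string {
	perrs, ok := err.(ProducerErrors)
	if !ok {
		return nil
	}
	topics := make([]string, 0, len(perrs))
	for _, pe := range perrs {
		topics = append(topics, pe.Msg.Topic)
	}
	return topics
}

func main() {
	// Simulated result of closing a producer with two undelivered messages.
	var closeErr error = ProducerErrors{
		{Msg: &ProducerMessage{Topic: "access_log"}, Err: errors.New("simulated timeout")},
		{Msg: &ProducerMessage{Topic: "error_log"}, Err: errors.New("simulated timeout")},
	}
	fmt.Println(closeErr, failedTopics(closeErr))
}
```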

type ProducerMessage

type ProducerMessage struct {
	Topic string  // The Kafka topic for this message.
	Key   Encoder // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
	Value Encoder // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.

	// These are filled in by the producer as the message is processed
	Offset    int64 // Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.
	Partition int32 // Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.

	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels. Sarama completely ignores this field; it is only to be used for pass-through data.
	// contains filtered or unexported fields
}

ProducerMessage is the collection of elements passed to the Producer in order to send a message.

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0, any other positive int16 is also valid (the broker will wait for that many acknowledgements), but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value in the broker's configuration).

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all replicas to commit before responding.
	WaitForAll RequiredAcks = -1
)
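Since brokers from 0.8.2.0 onwards accept only these three values, an application that builds RequiredAcks from user input may want to validate it first. This is a minimal sketch with the type and constants redeclared locally; validateAcks is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// RequiredAcks and its constants mirror the declarations above.
type RequiredAcks int16

const (
	NoResponse   RequiredAcks = 0
	WaitForLocal RequiredAcks = 1
	WaitForAll   RequiredAcks = -1
)

// validateAcks is a hypothetical check that accepts only the three named
// constants, which are the values valid on brokers 0.8.2.0 and later.
func validateAcks(acks RequiredAcks) error {
	switch acks {
	case NoResponse, WaitForLocal, WaitForAll:
		return nil
	default:
		return fmt.Errorf("RequiredAcks %d is not accepted by brokers 0.8.2.0 and later", acks)
	}
}

func main() {
	for _, acks := range []RequiredAcks{WaitForAll, WaitForLocal, 3} {
		fmt.Println(acks, validateAcks(acks))
	}
}
```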

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.

var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

Logger is the instance of a StdLogger interface that Sarama writes connection management events to. By default it is set to discard all log messages via ioutil.Discard, but you can set it to redirect wherever you want.
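Any value with the three Print methods satisfies StdLogger, including a *log.Logger from the standard library. The sketch below redeclares the interface and the Logger variable locally so it can run on its own; captureLog is a hypothetical helper that temporarily redirects output into a buffer.

```go
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"log"
)

// StdLogger and Logger mirror the declarations above.
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

// captureLog is a hypothetical helper: it points Logger at an in-memory
// buffer while fn runs, restores the previous logger, and returns whatever
// was written. Flags are set to 0 so no timestamp is emitted.
func captureLog(fn func()) string {
	var buf bytes.Buffer
	previous := Logger
	Logger = log.New(&buf, "[Sarama] ", 0)
	defer func() { Logger = previous }()
	fn()
	return buf.String()
}

func main() {
	Logger.Println("this line is discarded by the default logger")
	out := captureLog(func() {
		Logger.Printf("connected to broker %s", "localhost:9092")
	})
	fmt.Print(out)
}
```

In an application that imports the package, the equivalent step is a single assignment such as sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags).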

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int

type SyncProducer

type SyncProducer interface {

	// SendMessage produces a given message, and returns only when it either has succeeded or failed to produce.
	// It will return the partition and the offset of the produced message, or an error if the message
	// failed to produce.
	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

	// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
	// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
	// on the underlying client.
	Close() error
}

SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.

Example

This example shows the basic usage pattern of the SyncProducer.

producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
	log.Fatalln(err)
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Printf("FAILED to send message: %s\n", err)
} else {
	log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}

func NewSyncProducer

func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error)

NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.

func NewSyncProducerFromClient

func NewSyncProducerFromClient(client Client) (SyncProducer, error)

NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.

type TopicMetadata

type TopicMetadata struct {
	Err        KError
	Name       string
	Partitions []*PartitionMetadata
}

Directories

Path Synopsis
examples
interceptors Module
mocks Package mocks provides mocks that can be used for testing applications that use Sarama.
tools
