kafka

package
v0.11.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2018 License: Apache-2.0 Imports: 11 Imported by: 0

README

Information for confluent-kafka-go developers

Whenever librdkafka error codes are updated make sure to run generate before building:

  $ (cd go_rdkafka_generr && go install) && go generate
  $ go build

Testing

Some of the tests included in this directory, the benchmark and integration tests in particular, require an existing Kafka cluster and a testconf.json configuration file to provide tests with bootstrap brokers, topic name, etc.

The format of testconf.json is a JSON object:

{
  "Brokers": "<bootstrap-brokers>",
  "Topic": "<test-topic-name>"
}

See testconf-example.json for an example and full set of available options.

To run unit-tests:

$ go test

To run benchmark tests:

$ go test -bench .

For the code coverage:

$ go test -coverprofile=coverage.out -bench=.
$ go tool cover -func=coverage.out

Build tags (static linking)

Different build types are supported through Go build tags (-tags ..), these tags should be specified on the application build command.

  • static - Build with librdkafka linked statically (but librdkafka dependencies linked dynamically).
  • static_all - Build with all libraries linked statically.
  • neither - Build with librdkafka (and its dependencies) linked dynamically.

Generating HTML documentation

To generate one-page HTML documentation run the mk/doc-gen.py script from the top-level directory. This script requires the beautifulsoup4 Python package.

$ source .../your/virtualenv/bin/activate
$ pip install beautifulsoup4
...
$ mk/doc-gen.py > kafka.html

Documentation

Overview

Package kafka provides high-level Apache Kafka producer and consumers using bindings on-top of the librdkafka C library.

High-level Consumer

* Decide if you want to read messages and events from the `.Events()` channel (set `"go.events.channel.enable": true`) or by calling `.Poll()`.

* Create a Consumer with `kafka.NewConsumer()` providing at least the `bootstrap.servers` and `group.id` configuration properties.

* Call `.Subscribe()` or (`.SubscribeTopics()` to subscribe to multiple topics) to join the group with the specified subscription set. Subscriptions are atomic, calling `.Subscribe*()` again will leave the group and rejoin with the new set of topics.

* Start reading events and messages from either the `.Events` channel or by calling `.Poll()`.

* When the group has rebalanced each client member is assigned a (sub-)set of topic+partitions. By default the consumer will start fetching messages for its assigned partitions at this point, but your application may enable rebalance events to get an insight into what the assigned partitions where as well as set the initial offsets. To do this you need to pass `"go.application.rebalance.enable": true` to the `NewConsumer()` call mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event with the assigned partition set. You can optionally modify the initial offsets (they'll default to stored offsets and if there are no previously stored offsets it will fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}` which defaults to the `latest` message) and then call `.Assign(partitions)` to start consuming. If you don't need to modify the initial offsets you will not need to call `.Assign()`, the client will do so automatically for you if you dont.

* As messages are fetched they will be made available on either the `.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.

* Handle messages, events and errors to your liking.

* When you are done consuming call `.Close()` to commit final offsets and leave the consumer group.

Producer

* Create a Producer with `kafka.NewProducer()` providing at least the `bootstrap.servers` configuration properties.

* Messages may now be produced either by sending a `*kafka.Message` on the `.ProduceChannel` or by calling `.Produce()`.

* Producing is an asynchronous operation so the client notifies the application of per-message produce success or failure through something called delivery reports. Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message` and you should check `msg.TopicPartition.Error` for `nil` to find out if the message was succesfully delivered or not. It is also possible to direct delivery reports to alternate channels by providing a non-nil `chan Event` channel to `.Produce()`. If no delivery reports are wanted they can be completely disabled by setting configuration property `"go.delivery.reports": false`.

* When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or tranmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function `.Flush()` that will block until all message deliveries are done or the provided timeout elapses.

* Finally call `.Close()` to decommission the producer.

Events

Apart from emitting messages and delivery reports the client also communicates with the application through a number of different event types. An application may choose to handle or ignore these events.

Consumer events

* `*kafka.Message` - a fetched message.

* `AssignedPartitions` - The assigned partition set for this client following a rebalance. Requires `go.application.rebalance.enable`

* `RevokedPartitions` - The counter part to `AssignedPartitions` following a rebalance. `AssignedPartitions` and `RevokedPartitions` are symetrical. Requires `go.application.rebalance.enable`

* `PartitionEOF` - Consumer has reached the end of a partition. NOTE: The consumer will keep trying to fetch new messages for the partition.

* `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).

Producer events

* `*kafka.Message` - delivery report for produced message. Check `.TopicPartition.Error` for delivery result.

Generic events for both Consumer and Producer

* `KafkaError` - client (error codes are prefixed with _) or broker error. These errors are normally just informational since the client will try its best to automatically recover (eventually).

Hint: If your application registers a signal notification (signal.Notify) makes sure the signals channel is buffered to avoid possible complications with blocking Poll() calls.

Note: The Confluent Kafka Go client is safe for concurrent use.

Index

Constants

View Source
const (
	// TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
	TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
	// TimestampCreateTime indicates timestamp set by producer (source time)
	TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
	// TimestampLogAppendTime indicates timestamp set set by broker (store time)
	TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
)
View Source
const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)

OffsetBeginning represents the earliest offset (logical)

OffsetEnd represents the latest offset (logical)

View Source
const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)

OffsetInvalid represents an invalid/unspecified offset

View Source
const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)

OffsetStored represents a stored offset

View Source
const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)

PartitionAny represents any partition (for partitioning), or unspecified value (for all other cases)

Variables

This section is empty.

Functions

func LibraryVersion

func LibraryVersion() (int, string)

LibraryVersion returns the underlying librdkafka library version as a (version_int, version_str) tuple.

Types

type AssignedPartitions

type AssignedPartitions struct {
	Partitions []TopicPartition
}

AssignedPartitions consumer group rebalance event: assigned partition set

func (AssignedPartitions) String

func (e AssignedPartitions) String() string

type BrokerMetadata

type BrokerMetadata struct {
	ID   int32
	Host string
	Port int
}

BrokerMetadata contains per-broker metadata

type ConfigMap

type ConfigMap map[string]ConfigValue

ConfigMap is a map contaning standard librdkafka configuration properties as documented in: https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md

The special property "default.topic.config" (optional) is a ConfigMap containing default topic configuration properties.

func (ConfigMap) Get added in v0.11.4

func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error)

Get finds the given key in the ConfigMap and returns its value. If the key is not found `defval` is returned. If the key is found but the type does not match that of `defval` (unless nil) an ErrInvalidArg error is returned.

func (ConfigMap) Set

func (m ConfigMap) Set(kv string) error

Set implements flag.Set (command line argument parser) as a convenience for `-X key=value` config.

func (ConfigMap) SetKey

func (m ConfigMap) SetKey(key string, value ConfigValue) error

SetKey sets configuration property key to value. For user convenience a key prefixed with {topic}. will be set on the "default.topic.config" sub-map.

type ConfigValue

type ConfigValue interface{}

ConfigValue supports the following types:

bool, int, string, any type with the standard String() interface

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer implements a High-level Apache Kafka Consumer instance

func NewConsumer

func NewConsumer(conf *ConfigMap) (*Consumer, error)

NewConsumer creates a new high-level Consumer instance.

Supported special configuration properties:

go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
                                     If set to true the app must handle the AssignedPartitions and
                                     RevokedPartitions events and call Assign() and Unassign()
                                     respectively.
go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
go.events.channel.size (int, 1000) - Events() channel size

WARNING: Due to the buffering nature of channels (and queues in general) the use of the events channel risks receiving outdated events and messages. Minimizing go.events.channel.size reduces the risk and number of outdated events and messages but does not eliminate the factor completely. With a channel size of 1 at most one event or message may be outdated.

func (*Consumer) Assign

func (c *Consumer) Assign(partitions []TopicPartition) (err error)

Assign an atomic set of partitions to consume. This replaces the current assignment.

func (*Consumer) Assignment added in v0.9.4

func (c *Consumer) Assignment() (partitions []TopicPartition, err error)

Assignment returns the current partition assignments

func (*Consumer) Close

func (c *Consumer) Close() (err error)

Close Consumer instance. The object is no longer usable after this call.

func (*Consumer) Commit

func (c *Consumer) Commit() ([]TopicPartition, error)

Commit offsets for currently assigned partitions This is a blocking call. Returns the committed offsets on success.

func (*Consumer) CommitMessage

func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)

CommitMessage commits offset based on the provided message. This is a blocking call. Returns the committed offsets on success.

func (*Consumer) CommitOffsets

func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)

CommitOffsets commits the provided list of offsets This is a blocking call. Returns the committed offsets on success.

func (*Consumer) Committed added in v0.11.4

func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)

Committed retrieves committed offsets for the given set of partitions

func (*Consumer) Events

func (c *Consumer) Events() chan Event

Events returns the Events channel (if enabled)

func (*Consumer) GetMetadata

func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned.

func (*Consumer) OffsetsForTimes added in v0.11.4

func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)

OffsetsForTimes looks up offsets by timestamp for the given partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list.

The function will block for at most timeoutMs milliseconds.

Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field.

func (*Consumer) Pause added in v0.11.4

func (c *Consumer) Pause(partitions []TopicPartition) (err error)

Pause consumption for the provided list of partitions

Note that messages already enqueued on the consumer's Event channel (if `go.events.channel.enable` has been set) will NOT be purged by this call, set `go.events.channel.size` accordingly.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMs int) (event Event)

Poll the consumer for messages or events.

Will block for at most timeoutMs milliseconds

The following callbacks may be triggered:

Subscribe()'s rebalanceCb

Returns nil on timeout, else an Event

func (*Consumer) QueryWatermarkOffsets

func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.

func (*Consumer) ReadMessage added in v0.11.4

func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)

ReadMessage polls the consumer for a message.

This is a conveniance API that wraps Poll() and only returns messages or errors. All other event types are discarded.

The call will block for at most `timeout` waiting for a new message or error. `timeout` may be set to -1 for indefinite wait.

Timeout is returned as (nil, err) where err is `kafka.(Error).Code == Kafka.ErrTimedOut`.

Messages are returned as (msg, nil), while general errors are returned as (nil, err), and partition-specific errors are returned as (msg, err) where msg.TopicPartition provides partition-specific information (such as topic, partition and offset).

All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.

func (*Consumer) Resume added in v0.11.4

func (c *Consumer) Resume(partitions []TopicPartition) (err error)

Resume consumption for the provided list of partitions

func (*Consumer) Seek added in v0.9.4

func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error

Seek seeks the given topic partitions using the offset from the TopicPartition.

If timeoutMs is not 0 the call will wait this long for the seek to be performed. If the timeout is reached the internal state will be unknown and this function returns ErrTimedOut. If timeoutMs is 0 it will initiate the seek but return immediately without any error reporting (e.g., async).

Seek() may only be used for partitions already being consumed (through Assign() or implicitly through a self-rebalanced Subscribe()). To set the starting offset it is preferred to use Assign() and provide a starting offset for each partition.

Returns an error on failure or nil otherwise.

func (*Consumer) StoreOffsets added in v0.11.4

func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)

StoreOffsets stores the provided list of offsets that will be committed to the offset store according to `auto.commit.interval.ms` or manual offset-less Commit().

Returns the stored offsets on success. If at least one offset couldn't be stored, an error and a list of offsets is returned. Each offset can be checked for specific errors via its `.Error` member.

func (*Consumer) String

func (c *Consumer) String() string

Strings returns a human readable name for a Consumer instance

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

Subscribe to a single topic This replaces the current subscription

func (*Consumer) SubscribeTopics

func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)

SubscribeTopics subscribes to the provided list of topics. This replaces the current subscription.

func (*Consumer) Subscription added in v0.9.4

func (c *Consumer) Subscription() (topics []string, err error)

Subscription returns the current subscription as set by Subscribe()

func (*Consumer) Unassign

func (c *Consumer) Unassign() (err error)

Unassign the current set of partitions to consume.

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe() (err error)

Unsubscribe from the current subscription, if any.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error provides a Kafka-specific error container

func (Error) Code

func (e Error) Code() ErrorCode

Code returns the ErrorCode of an Error

func (Error) Error

func (e Error) Error() string

Error returns a human readable representation of an Error Same as Error.String()

func (Error) String

func (e Error) String() string

String returns a human readable representation of an Error

type ErrorCode

type ErrorCode int

ErrorCode is the integer representation of local and broker error codes

const (
	// ErrBadMsg Local: Bad message format
	ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG)
	// ErrBadCompression Local: Invalid compressed data
	ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION)
	// ErrDestroy Local: Broker handle destroyed
	ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY)
	// ErrFail Local: Communication failure with broker
	ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL)
	// ErrTransport Local: Broker transport failure
	ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT)
	// ErrCritSysResource Local: Critical system resource failure
	ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE)
	// ErrResolve Local: Host resolution failure
	ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE)
	// ErrMsgTimedOut Local: Message timed out
	ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT)
	// ErrPartitionEOF Broker: No more messages
	ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF)
	// ErrUnknownPartition Local: Unknown partition
	ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
	// ErrFs Local: File or filesystem error
	ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS)
	// ErrUnknownTopic Local: Unknown topic
	ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
	// ErrAllBrokersDown Local: All broker connections are down
	ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)
	// ErrInvalidArg Local: Invalid argument or configuration
	ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG)
	// ErrTimedOut Local: Timed out
	ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
	// ErrQueueFull Local: Queue full
	ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL)
	// ErrIsrInsuff Local: ISR count insufficient
	ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF)
	// ErrNodeUpdate Local: Broker node update
	ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE)
	// ErrSsl Local: SSL error
	ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL)
	// ErrWaitCoord Local: Waiting for coordinator
	ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD)
	// ErrUnknownGroup Local: Unknown group
	ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP)
	// ErrInProgress Local: Operation in progress
	ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS)
	// ErrPrevInProgress Local: Previous operation in progress
	ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS)
	// ErrExistingSubscription Local: Existing subscription
	ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION)
	// ErrAssignPartitions Local: Assign partitions
	ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
	// ErrRevokePartitions Local: Revoke partitions
	ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
	// ErrConflict Local: Conflicting use
	ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT)
	// ErrState Local: Erroneous state
	ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE)
	// ErrUnknownProtocol Local: Unknown protocol
	ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL)
	// ErrNotImplemented Local: Not implemented
	ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED)
	// ErrAuthentication Local: Authentication failure
	ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION)
	// ErrNoOffset Local: No offset stored
	ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET)
	// ErrOutdated Local: Outdated
	ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED)
	// ErrTimedOutQueue Local: Timed out in queue
	ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
	// ErrUnsupportedFeature Local: Required feature not supported by broker
	ErrUnsupportedFeature ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE)
	// ErrWaitCache Local: Awaiting cache update
	ErrWaitCache ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_CACHE)
	// ErrIntr Local: Operation interrupted
	ErrIntr ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INTR)
	// ErrKeySerialization Local: Key serialization error
	ErrKeySerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_SERIALIZATION)
	// ErrValueSerialization Local: Value serialization error
	ErrValueSerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION)
	// ErrKeyDeserialization Local: Key deserialization error
	ErrKeyDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION)
	// ErrValueDeserialization Local: Value deserialization error
	ErrValueDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION)
	// ErrPartial Local: Partial response
	ErrPartial ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTIAL)
	// ErrReadOnly Local: Read-only object
	ErrReadOnly ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__READ_ONLY)
	// ErrNoent Local: No such entry
	ErrNoent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOENT)
	// ErrUnderflow Local: Read underflow
	ErrUnderflow ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNDERFLOW)
	// ErrUnknown Unknown broker error
	ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN)
	// ErrNoError Success
	ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR)
	// ErrOffsetOutOfRange Broker: Offset out of range
	ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE)
	// ErrInvalidMsg Broker: Invalid message
	ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG)
	// ErrUnknownTopicOrPart Broker: Unknown topic or partition
	ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
	// ErrInvalidMsgSize Broker: Invalid message size
	ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE)
	// ErrLeaderNotAvailable Broker: Leader not available
	ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
	// ErrNotLeaderForPartition Broker: Not leader for partition
	ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION)
	// ErrRequestTimedOut Broker: Request timed out
	ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT)
	// ErrBrokerNotAvailable Broker: Broker not available
	ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE)
	// ErrReplicaNotAvailable Broker: Replica not available
	ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE)
	// ErrMsgSizeTooLarge Broker: Message size too large
	ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
	// ErrStaleCtrlEpoch Broker: StaleControllerEpochCode
	ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH)
	// ErrOffsetMetadataTooLarge Broker: Offset metadata string too large
	ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE)
	// ErrNetworkException Broker: Broker disconnected before response received
	ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION)
	// ErrGroupLoadInProgress Broker: Group coordinator load in progress
	ErrGroupLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS)
	// ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available
	ErrGroupCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
	// ErrNotCoordinatorForGroup Broker: Not coordinator for group
	ErrNotCoordinatorForGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP)
	// ErrTopicException Broker: Invalid topic
	ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION)
	// ErrRecordListTooLarge Broker: Message batch larger than configured server segment size
	ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE)
	// ErrNotEnoughReplicas Broker: Not enough in-sync replicas
	ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS)
	// ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas
	ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND)
	// ErrInvalidRequiredAcks Broker: Invalid required acks value
	ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS)
	// ErrIllegalGeneration Broker: Specified group generation id is not valid
	ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
	// ErrInconsistentGroupProtocol Broker: Inconsistent group protocol
	ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL)
	// ErrInvalidGroupID Broker: Invalid group.id
	ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID)
	// ErrUnknownMemberID Broker: Unknown member
	ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
	// ErrInvalidSessionTimeout Broker: Invalid session timeout
	ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT)
	// ErrRebalanceInProgress Broker: Group rebalance in progress
	ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS)
	// ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid
	ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE)
	// ErrTopicAuthorizationFailed Broker: Topic authorization failed
	ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
	// ErrGroupAuthorizationFailed Broker: Group authorization failed
	ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED)
	// ErrClusterAuthorizationFailed Broker: Cluster authorization failed
	ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED)
	// ErrInvalidTimestamp Broker: Invalid timestamp
	ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP)
	// ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism
	ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM)
	// ErrIllegalSaslState Broker: Request not valid in current SASL state
	ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE)
	// ErrUnsupportedVersion Broker: API version not supported
	ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)
	// ErrTopicAlreadyExists Broker: Topic already exists
	ErrTopicAlreadyExists ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS)
	// ErrInvalidPartitions Broker: Invalid number of partitions
	ErrInvalidPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS)
	// ErrInvalidReplicationFactor Broker: Invalid replication factor
	ErrInvalidReplicationFactor ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR)
	// ErrInvalidReplicaAssignment Broker: Invalid replica assignment
	ErrInvalidReplicaAssignment ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT)
	// ErrInvalidConfig Broker: Configuration is invalid
	ErrInvalidConfig ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_CONFIG)
	// ErrNotController Broker: Not controller for cluster
	ErrNotController ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER)
	// ErrInvalidRequest Broker: Invalid request
	ErrInvalidRequest ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUEST)
	// ErrUnsupportedForMessageFormat Broker: Message format on broker does not support request
	ErrUnsupportedForMessageFormat ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT)
	// ErrPolicyViolation Broker: Isolation policy volation
	ErrPolicyViolation ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_POLICY_VIOLATION)
	// ErrOutOfOrderSequenceNumber Broker: Broker received an out of order sequence number
	ErrOutOfOrderSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER)
	// ErrDuplicateSequenceNumber Broker: Broker received a duplicate sequence number
	ErrDuplicateSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER)
	// ErrInvalidProducerEpoch Broker: Producer attempted an operation with an old epoch
	ErrInvalidProducerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH)
	// ErrInvalidTxnState Broker: Producer attempted a transactional operation in an invalid state
	ErrInvalidTxnState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TXN_STATE)
	// ErrInvalidProducerIDMapping Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id
	ErrInvalidProducerIDMapping ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING)
	// ErrInvalidTransactionTimeout Broker: Transaction timeout is larger than the maximum value allowed by the broker's max.transaction.timeout.ms
	ErrInvalidTransactionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT)
	// ErrConcurrentTransactions Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
	ErrConcurrentTransactions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS)
	// ErrTransactionCoordinatorFenced Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
	ErrTransactionCoordinatorFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED)
	// ErrTransactionalIDAuthorizationFailed Broker: Transactional Id authorization failed
	ErrTransactionalIDAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
	// ErrSecurityDisabled Broker: Security features are disabled
	ErrSecurityDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SECURITY_DISABLED)
	// ErrOperationNotAttempted Broker: Operation not attempted
	ErrOperationNotAttempted ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED)
)

func (ErrorCode) String

func (c ErrorCode) String() string

String returns a human readable representation of an error code

type Event

type Event interface {
	// String returns a human-readable representation of the event
	String() string
}

Event generic interface

type Handle

type Handle interface {
	// contains filtered or unexported methods
}

Handle represents a generic client handle containing common parts for both Producer and Consumer.

type Header struct {
	Key   string // Header name (utf-8 string)
	Value []byte // Header value (nil, empty, or binary)
}

Header represents a single Kafka message header.

Message headers are made up of a list of Header elements, retaining their original insert order and allowing for duplicate Keys.

Key is a human readable string identifying the header. Value is the key's binary value, Kafka does not put any restrictions on the format of of the Value but it should be made relatively compact. The value may be a byte array, empty, or nil.

NOTE: Message headers are not available on producer delivery report messages.

func (Header) String added in v0.11.4

func (h Header) String() string

String returns the Header Key and data in a human representable possibly truncated form suitable for displaying to the user.

type Message

type Message struct {
	TopicPartition TopicPartition
	Value          []byte
	Key            []byte
	Timestamp      time.Time
	TimestampType  TimestampType
	Opaque         interface{}
	Headers        []Header
}

Message represents a Kafka message

func (*Message) String

func (m *Message) String() string

String returns a human readable representation of a Message. Key and payload are not represented.

type Metadata

type Metadata struct {
	Brokers []BrokerMetadata
	Topics  map[string]TopicMetadata

	OriginatingBroker BrokerMetadata
}

Metadata contains broker and topic metadata for all (matching) topics

type Offset

type Offset int64

Offset type (int64) with support for canonical names

func NewOffset

func NewOffset(offset interface{}) (Offset, error)

NewOffset creates a new Offset using the provided logical string, or an absolute int64 offset value. Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"

func OffsetTail

func OffsetTail(relativeOffset Offset) Offset

OffsetTail returns the logical offset relativeOffset from current end of partition

func (*Offset) Set

func (o *Offset) Set(offset interface{}) error

Set offset value, see NewOffset()

func (Offset) String

func (o Offset) String() string

type OffsetsCommitted

type OffsetsCommitted struct {
	Error   error
	Offsets []TopicPartition
}

OffsetsCommitted reports committed offsets

func (OffsetsCommitted) String

func (o OffsetsCommitted) String() string

type PartitionEOF

type PartitionEOF TopicPartition

PartitionEOF consumer reached end of partition

func (PartitionEOF) String

func (p PartitionEOF) String() string

type PartitionMetadata

type PartitionMetadata struct {
	ID       int32
	Error    Error
	Leader   int32
	Replicas []int32
	Isrs     []int32
}

PartitionMetadata contains per-partition metadata

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer implements a High-level Apache Kafka Producer instance

func NewProducer

func NewProducer(conf *ConfigMap) (*Producer, error)

NewProducer creates a new high-level Producer instance.

conf is a *ConfigMap with standard librdkafka configuration properties, see here:

Supported special configuration properties:

go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
                                  These batches do not relate to Kafka message batches in any way.
                                  Note: timestamps and headers are not supported with this interface.
go.delivery.reports (bool, true) - Forward per-message delivery reports to the
                                   Events() channel.
go.events.channel.size (int, 1000000) - Events() channel size
go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)

func (*Producer) Close

func (p *Producer) Close()

Close a Producer instance. The Producer object or its channels are no longer usable after this call.

func (*Producer) Events

func (p *Producer) Events() chan Event

Events returns the Events channel (read)

func (*Producer) Flush

func (p *Producer) Flush(timeoutMs int) int

Flush and wait for outstanding messages and requests to complete delivery. Includes messages on ProduceChannel. Runs until value reaches zero or on timeoutMs. Returns the number of outstanding events still un-flushed.

func (*Producer) GetMetadata

func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned.

func (*Producer) Len

func (p *Producer) Len() int

Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application. Includes messages on ProduceChannel.

func (*Producer) OffsetsForTimes added in v0.11.4

func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)

OffsetsForTimes looks up offsets by timestamp for the given partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list.

The function will block for at most timeoutMs milliseconds.

Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field.

func (*Producer) Produce

func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error

Produce single message. This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately. The delivery report will be sent on the provided deliveryChan if specified, or on the Producer object's Events() channel if not. msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented), api.version.request=true, and broker >= 0.10.0.0. msg.Headers requires librdkafka >= 0.11.4 (else returns ErrNotImplemented), api.version.request=true, and broker >= 0.11.0.0. Returns an error if message could not be enqueued.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *Message

ProduceChannel returns the produce *Message channel (write)

func (*Producer) QueryWatermarkOffsets

func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.

func (*Producer) String

func (p *Producer) String() string

String returns a human readable name for a Producer instance

type RebalanceCb

type RebalanceCb func(*Consumer, Event) error

RebalanceCb provides a per-Subscribe*() rebalance event callback. The passed Event will be either AssignedPartitions or RevokedPartitions

type RevokedPartitions

type RevokedPartitions struct {
	Partitions []TopicPartition
}

RevokedPartitions consumer group rebalance event: revoked partition set

func (RevokedPartitions) String

func (e RevokedPartitions) String() string

type Stats added in v0.11.0

type Stats struct {
	// contains filtered or unexported fields
}

Stats statistics event

func (Stats) String added in v0.11.0

func (e Stats) String() string

type TimestampType

type TimestampType int

TimestampType is a the Message timestamp type or source

func (TimestampType) String

func (t TimestampType) String() string

type TopicMetadata

type TopicMetadata struct {
	Topic      string
	Partitions []PartitionMetadata
	Error      Error
}

TopicMetadata contains per-topic metadata

type TopicPartition

type TopicPartition struct {
	Topic     *string
	Partition int32
	Offset    Offset
	Error     error
}

TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.

func (TopicPartition) String

func (p TopicPartition) String() string

type TopicPartitions added in v0.9.4

type TopicPartitions []TopicPartition

TopicPartitions is a slice of TopicPartitions that also implements the sort interface

func (TopicPartitions) Len added in v0.9.4

func (tps TopicPartitions) Len() int

func (TopicPartitions) Less added in v0.9.4

func (tps TopicPartitions) Less(i, j int) bool

func (TopicPartitions) Swap added in v0.9.4

func (tps TopicPartitions) Swap(i, j int)

Directories

Path Synopsis
confluent-kafka-go internal tool to generate error constants from librdkafka
confluent-kafka-go internal tool to generate error constants from librdkafka

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL