Documentation ¶
Overview ¶
Package kafka provides high-level Apache Kafka producer and consumer bindings on top of the librdkafka C library.
High-level Consumer ¶
* Decide whether you want to read messages and events by calling `.Poll()` or via the deprecated `.Events()` channel. (If you want to use the `.Events()` channel, set `"go.events.channel.enable": true`.)
* Create a Consumer with `kafka.NewConsumer()` providing at least the `bootstrap.servers` and `group.id` configuration properties.
* Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics) to join the group with the specified subscription set. Subscriptions are atomic: calling `.Subscribe*()` again will leave the group and rejoin with the new set of topics.
* Start reading events and messages from either the `.Events()` channel or by calling `.Poll()`.
* When the group has rebalanced, each client member is assigned a (sub-)set of topic+partitions. By default the consumer will start fetching messages for its assigned partitions at this point, but your application may enable rebalance events to get insight into what the assigned partitions are, as well as to set the initial offsets. To do this you need to pass `"go.application.rebalance.enable": true` to the `NewConsumer()` call mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event with the assigned partition set. You can optionally modify the initial offsets (they default to stored offsets; if there are no previously stored offsets the consumer falls back to `"auto.offset.reset"`, which defaults to the `latest` message) and then call `.Assign(partitions)` to start consuming. If you don't need to modify the initial offsets you do not need to call `.Assign()`; the client will do so automatically for you if you don't, unless you are using the channel-based consumer, in which case you MUST call `.Assign()` when receiving the `AssignedPartitions` and `RevokedPartitions` events.
* As messages are fetched they will be made available on either the `.Events()` channel or by calling `.Poll()`; look for event type `*kafka.Message`.
* Handle messages, events and errors to your liking.
* When you are done consuming call `.Close()` to commit final offsets and leave the consumer group.
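Putting these steps together, a minimal poll-based consumer might look like the following sketch (the broker address, group id, and topic name are placeholders):

    package main

    import (
        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092", // placeholder broker
            "group.id":          "myGroup",        // placeholder group id
            "auto.offset.reset": "earliest",
        })
        if err != nil {
            panic(err)
        }
        defer c.Close() // commits final offsets and leaves the group

        if err := c.SubscribeTopics([]string{"myTopic"}, nil); err != nil {
            panic(err)
        }

        for { // replace with your own shutdown condition
            switch e := c.Poll(100).(type) {
            case *kafka.Message:
                fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value))
            case kafka.Error:
                // Normally informational; the client recovers automatically.
                fmt.Printf("Error: %v\n", e)
            case nil:
                // Poll timed out, nothing to do.
            }
        }
    }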
Producer ¶
* Create a Producer with `kafka.NewProducer()` providing at least the `bootstrap.servers` configuration properties.
* Messages may now be produced either by sending a `*kafka.Message` on the `.ProduceChannel()` or by calling `.Produce()`.
* Producing is an asynchronous operation, so the client notifies the application of per-message produce success or failure through something called delivery reports. Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`, and you should check `msg.TopicPartition.Error` for `nil` to find out whether the message was successfully delivered. It is also possible to direct delivery reports to alternate channels by providing a non-nil `chan Event` channel to `.Produce()`. If no delivery reports are wanted they can be completely disabled by setting configuration property `"go.delivery.reports": false`.
* When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed). Remember that this is an asynchronous client, so some of your messages may be lingering in internal channels or transmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function `.Flush()` that will block until all message deliveries are done or the provided timeout elapses.
* Finally call `.Close()` to decommission the producer.
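Following these steps, a minimal producer might look like this sketch (broker address and topic are placeholders); delivery reports are read from the `.Events()` channel:

    package main

    import (
        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        p, err := kafka.NewProducer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092", // placeholder broker
        })
        if err != nil {
            panic(err)
        }
        defer p.Close()

        // Handle delivery reports emitted on the Events() channel.
        go func() {
            for e := range p.Events() {
                if m, ok := e.(*kafka.Message); ok {
                    if m.TopicPartition.Error != nil {
                        fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
                    } else {
                        fmt.Printf("Delivered to %v\n", m.TopicPartition)
                    }
                }
            }
        }()

        topic := "myTopic" // placeholder topic
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte("hello"),
        }, nil)
        if err != nil {
            panic(err)
        }

        // Wait for outstanding deliveries before shutting down.
        p.Flush(15 * 1000)
    }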
Transactional producer API ¶
The transactional producer operates on top of the idempotent producer, and provides full exactly-once semantics (EOS) for Apache Kafka when used with the transaction-aware consumer (`isolation.level=read_committed`).
A producer instance is configured for transactions by setting the `transactional.id` to an identifier unique for the application. This id will be used to fence stale transactions from previous instances of the application, typically following an outage or crash.
After creating the transactional producer instance using `NewProducer()` the transactional state must be initialized by calling `InitTransactions()`. This is a blocking call that will acquire a runtime producer id from the transaction coordinator broker as well as abort any stale transactions and fence any still running producer instances with the same `transactional.id`.
Once transactions are initialized the application may begin a new transaction by calling `BeginTransaction()`. A producer instance may only have one single on-going transaction.
Any messages produced after the transaction has been started will belong to the ongoing transaction and will be committed or aborted atomically. It is not permitted to produce messages outside a transaction boundary, e.g., before `BeginTransaction()` or after `CommitTransaction()`, `AbortTransaction()` or if the current transaction has failed.
If consumed messages are used as input to the transaction, the consumer instance must be configured with `enable.auto.commit` set to `false`. To commit the consumed offsets along with the transaction pass the list of consumed partitions and the last offset processed + 1 to `SendOffsetsToTransaction()` prior to committing the transaction. This allows an aborted transaction to be restarted using the previously committed offsets.
To commit the produced messages, and any consumed offsets, to the current transaction, call `CommitTransaction()`. This call will block until the transaction has been fully committed or failed (typically due to fencing by a newer producer instance).
Alternatively, if processing fails, or an abortable transaction error is raised, the transaction needs to be aborted by calling `AbortTransaction()` which marks any produced messages and offset commits as aborted.
After the current transaction has been committed or aborted a new transaction may be started by calling `BeginTransaction()` again.
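As an illustrative sketch (not the definitive pattern), one transactional consume-transform-produce cycle over a batch of already-polled messages might look like this. It assumes `p` is a transactional producer on which `InitTransactions()` has succeeded, `c` is a consumer with `enable.auto.commit` set to `false`, and `outTopic` is a placeholder; error classification (see the error classes below) is simplified to aborting on any failure:

    import (
        "context"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func produceBatchInTransaction(ctx context.Context, p *kafka.Producer,
        c *kafka.Consumer, msgs []*kafka.Message, outTopic string) error {
        if err := p.BeginTransaction(); err != nil {
            return err
        }
        abort := func(cause error) error {
            if err := p.AbortTransaction(ctx); err != nil {
                return err
            }
            return cause
        }
        for _, m := range msgs {
            out := &kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &outTopic, Partition: kafka.PartitionAny},
                Value:          m.Value, // transform as needed
            }
            // Delivery reports still go to p.Events(); a real application must drain them.
            if err := p.Produce(out, nil); err != nil {
                return abort(err)
            }
        }
        // Send the consumed offsets (position = last processed offset + 1)
        // to the transaction instead of committing them on the consumer.
        parts, err := c.Assignment()
        if err != nil {
            return abort(err)
        }
        offsets, err := c.Position(parts)
        if err != nil {
            return abort(err)
        }
        meta, err := c.GetConsumerGroupMetadata()
        if err != nil {
            return abort(err)
        }
        if err := p.SendOffsetsToTransaction(ctx, offsets, meta); err != nil {
            return abort(err)
        }
        return p.CommitTransaction(ctx)
    }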
Retriable errors: Some error cases allow the attempted operation to be retried; this is indicated by the error object having the retriable flag set, which can be detected by calling `err.(kafka.Error).IsRetriable()`. When this flag is set the application may retry the operation immediately, or preferably after a short grace period (to avoid busy-looping). Retriable errors include timeouts, broker transport failures, etc.
Abortable errors: An ongoing transaction may fail permanently due to various errors, such as the transaction coordinator becoming unavailable, write failures to the Apache Kafka log, under-replicated partitions, etc. At this point the producer application must abort the current transaction using `AbortTransaction()` and optionally start a new transaction by calling `BeginTransaction()`. Whether an error is abortable is detected by calling `err.(kafka.Error).TxnRequiresAbort()` on the returned error object.
Fatal errors: While the underlying idempotent producer will typically only raise fatal errors for unrecoverable cluster errors where the idempotency guarantees can't be maintained, most of these are treated as abortable by the transactional producer since transactions may be aborted and retried in their entirety. The transactional producer, on the other hand, introduces a set of additional fatal errors which the application needs to handle by shutting down the producer and terminating. There is no way for a producer instance to recover from fatal errors. Whether an error is fatal is detected by calling `err.(kafka.Error).IsFatal()` on the returned error object or by checking the global `GetFatalError()`.
Handling of other errors: For errors that have neither the retriable, abortable, nor fatal flag set it is not always obvious how to handle them. While some of these errors may be indicative of bugs in the application code, such as when an invalid parameter is passed to a method, other errors might originate from the broker and be passed through as-is to the application. The general recommendation is to treat these errors as fatal.
Error handling example:
    retry:
        err := producer.CommitTransaction(...)
        if err == nil {
            return nil
        } else if err.(kafka.Error).TxnRequiresAbort() {
            do_abort_transaction_and_reset_inputs()
        } else if err.(kafka.Error).IsRetriable() {
            goto retry
        } else {
            // treat all other errors as fatal errors
            panic(err)
        }
Events ¶
Apart from emitting messages and delivery reports the client also communicates with the application through a number of different event types. An application may choose to handle or ignore these events.
Consumer events ¶
* `*kafka.Message` - a fetched message.
* `AssignedPartitions` - The assigned partition set for this client following a rebalance. Requires `go.application.rebalance.enable`
* `RevokedPartitions` - The counter part to `AssignedPartitions` following a rebalance. `AssignedPartitions` and `RevokedPartitions` are symmetrical. Requires `go.application.rebalance.enable`
* `PartitionEOF` - Consumer has reached the end of a partition. NOTE: The consumer will keep trying to fetch new messages for the partition.
* `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).
Producer events ¶
* `*kafka.Message` - delivery report for produced message. Check `.TopicPartition.Error` for delivery result.
Generic events for both Consumer and Producer ¶
* `KafkaError` - client-side error (error codes are prefixed with `_`) or broker error. These errors are normally just informational since the client will try its best to automatically recover (eventually).
* `OAuthBearerTokenRefresh` - retrieval of a new SASL/OAUTHBEARER token is required. This event only occurs with `sasl.mechanism=OAUTHBEARER`. Be sure to invoke `SetOAuthBearerToken()` on the Producer/Consumer/AdminClient instance when a successful token retrieval is completed; otherwise invoke `SetOAuthBearerTokenFailure()` to indicate that retrieval failed (or that setting the token failed, which could happen if an extension doesn't meet the required regular expression). Invoking `SetOAuthBearerTokenFailure()` will schedule a new event for 10 seconds later so another retrieval can be attempted. A sketch of handling this event follows below.
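The following sketch handles the refresh event in a Producer event loop; `fetchToken` is a hypothetical application-supplied helper that obtains a token (for example from your identity provider) and returns it as a `kafka.OAuthBearerToken`. The same pattern applies to Consumer and AdminClient instances:

    for e := range p.Events() {
        refresh, ok := e.(kafka.OAuthBearerTokenRefresh)
        if !ok {
            continue // handle messages and other events elsewhere
        }
        _ = refresh.Config // value of sasl.oauthbearer.config, if needed

        token, err := fetchToken() // hypothetical helper returning kafka.OAuthBearerToken
        if err != nil {
            // Schedules a new refresh event ~10 seconds later so retrieval can be retried.
            p.SetOAuthBearerTokenFailure(err.Error())
            continue
        }
        if err := p.SetOAuthBearerToken(token); err != nil {
            p.SetOAuthBearerTokenFailure(err.Error())
        }
    }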
Hint: If your application registers a signal notification (signal.Notify), make sure the signals channel is buffered to avoid possible complications with blocking Poll() calls.
Note: The Confluent Kafka Go client is safe for concurrent use.
Index ¶
- Constants
- func LibraryVersion() (int, string)
- func WriteErrorCodes(f *os.File)
- type ACLBinding
- type ACLBindingFilter
- type ACLBindingFilters
- type ACLBindings
- type ACLOperation
- type ACLPermissionType
- type AdminClient
- func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, ...) (result []ConfigResourceResult, err error)
- func (a *AdminClient) Close()
- func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)
- func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
- func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error)
- func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, ...) (result []TopicResult, err error)
- func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, ...) (result []TopicResult, err error)
- func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, ...) (result []DeleteACLsResult, err error)
- func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error)
- func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, ...) (result *DescribeACLsResult, err error)
- func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, ...) (result []ConfigResourceResult, err error)
- func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
- func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
- func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error
- func (a *AdminClient) String() string
- type AdminOption
- type AdminOptionOperationTimeout
- type AdminOptionRequestTimeout
- type AdminOptionValidateOnly
- type AlterConfigsAdminOption
- type AlterOperation
- type AssignedPartitions
- type BrokerMetadata
- type ConfigEntry
- type ConfigEntryResult
- type ConfigMap
- type ConfigResource
- type ConfigResourceResult
- type ConfigSource
- type ConfigValue
- type Consumer
- func (c *Consumer) Assign(partitions []TopicPartition) (err error)
- func (c *Consumer) Assignment() (partitions []TopicPartition, err error)
- func (c *Consumer) AssignmentLost() bool
- func (c *Consumer) Close() (err error)
- func (c *Consumer) Commit() ([]TopicPartition, error)
- func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
- func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
- func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
- func (c *Consumer) Events() chan Event
- func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error)
- func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
- func (c *Consumer) GetRebalanceProtocol() string
- func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
- func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error)
- func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error)
- func (c *Consumer) Logs() chan LogEvent
- func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
- func (c *Consumer) Pause(partitions []TopicPartition) (err error)
- func (c *Consumer) Poll(timeoutMs int) (event Event)
- func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error)
- func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)
- func (c *Consumer) Resume(partitions []TopicPartition) (err error)
- func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error
- func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
- func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error
- func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error)
- func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
- func (c *Consumer) String() string
- func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
- func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
- func (c *Consumer) Subscription() (topics []string, err error)
- func (c *Consumer) Unassign() (err error)
- func (c *Consumer) Unsubscribe() (err error)
- type ConsumerGroupMetadata
- type CreateACLResult
- type CreateACLsAdminOption
- type CreatePartitionsAdminOption
- type CreateTopicsAdminOption
- type DeleteACLsAdminOption
- type DeleteACLsResult
- type DeleteTopicsAdminOption
- type DescribeACLsAdminOption
- type DescribeACLsResult
- type DescribeConfigsAdminOption
- type Error
- type ErrorCode
- type Event
- type Handle
- type Header
- type LogEvent
- type Message
- type Metadata
- type MockCluster
- type OAuthBearerToken
- type OAuthBearerTokenRefresh
- type Offset
- type OffsetsCommitted
- type PartitionEOF
- type PartitionMetadata
- type PartitionsSpecification
- type Producer
- func (p *Producer) AbortTransaction(ctx context.Context) error
- func (p *Producer) BeginTransaction() error
- func (p *Producer) Close()
- func (p *Producer) CommitTransaction(ctx context.Context) error
- func (p *Producer) Events() chan Event
- func (p *Producer) Flush(timeoutMs int) int
- func (p *Producer) GetFatalError() error
- func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
- func (p *Producer) InitTransactions(ctx context.Context) error
- func (p *Producer) Len() int
- func (p *Producer) Logs() chan LogEvent
- func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
- func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
- func (p *Producer) ProduceChannel() chan *Message
- func (p *Producer) Purge(flags int) error
- func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, ...) error
- func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
- func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error
- func (p *Producer) String() string
- func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode
- type RebalanceCb
- type ResourcePatternType
- type ResourceType
- type RevokedPartitions
- type Stats
- type TimestampType
- type TopicMetadata
- type TopicPartition
- type TopicPartitions
- type TopicResult
- type TopicSpecification
Constants ¶
    const (
        // ResourceUnknown - Unknown
        ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
        // ResourceAny - match any resource type (DescribeConfigs)
        ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
        // ResourceTopic - Topic
        ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
        // ResourceGroup - Group
        ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
        // ResourceBroker - Broker
        ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
    )
    const (
        // ConfigSourceUnknown is the default value
        ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
        // ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
        ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
        // ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
        ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
        // ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
        ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
        // ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
        ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
        // ConfigSourceDefault is built-in default configuration for configs that have a default value
        ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
    )
    const (
        // ResourcePatternTypeUnknown is a resource pattern type not known or not set.
        ResourcePatternTypeUnknown = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_UNKNOWN)
        // ResourcePatternTypeAny matches any resource, used for lookups.
        ResourcePatternTypeAny = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_ANY)
        // ResourcePatternTypeMatch will perform pattern matching
        ResourcePatternTypeMatch = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_MATCH)
        // ResourcePatternTypeLiteral matches a literal resource name
        ResourcePatternTypeLiteral = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_LITERAL)
        // ResourcePatternTypePrefixed matches a prefixed resource name
        ResourcePatternTypePrefixed = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_PREFIXED)
    )
    const (
        // ACLOperationUnknown represents an unknown or unset operation
        ACLOperationUnknown = ACLOperation(C.RD_KAFKA_ACL_OPERATION_UNKNOWN)
        // ACLOperationAny in a filter, matches any ACLOperation
        ACLOperationAny = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ANY)
        // ACLOperationAll represents all the operations
        ACLOperationAll = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALL)
        // ACLOperationRead a read operation
        ACLOperationRead = ACLOperation(C.RD_KAFKA_ACL_OPERATION_READ)
        // ACLOperationWrite represents a write operation
        ACLOperationWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_WRITE)
        // ACLOperationCreate represents a create operation
        ACLOperationCreate = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CREATE)
        // ACLOperationDelete represents a delete operation
        ACLOperationDelete = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DELETE)
        // ACLOperationAlter represents an alter operation
        ACLOperationAlter = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER)
        // ACLOperationDescribe represents a describe operation
        ACLOperationDescribe = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE)
        // ACLOperationClusterAction represents a cluster action operation
        ACLOperationClusterAction = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION)
        // ACLOperationDescribeConfigs represents a describe configs operation
        ACLOperationDescribeConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS)
        // ACLOperationAlterConfigs represents an alter configs operation
        ACLOperationAlterConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS)
        // ACLOperationIdempotentWrite represents an idempotent write operation
        ACLOperationIdempotentWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE)
    )
    const (
        // ACLPermissionTypeUnknown represents an unknown ACLPermissionType
        ACLPermissionTypeUnknown = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN)
        // ACLPermissionTypeAny in a filter, matches any ACLPermissionType
        ACLPermissionTypeAny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ANY)
        // ACLPermissionTypeDeny disallows access
        ACLPermissionTypeDeny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_DENY)
        // ACLPermissionTypeAllow grants access
        ACLPermissionTypeAllow = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW)
    )
    const (
        // TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
        TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
        // TimestampCreateTime indicates timestamp set by producer (source time)
        TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
        // TimestampLogAppendTime indicates timestamp set by broker (store time)
        TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
    )
    const (
        // PurgeInFlight purges messages in-flight to or from the broker.
        // Purging these messages will void any future acknowledgements from the
        // broker, making it impossible for the application to know if these
        // messages were successfully delivered or not.
        // Retrying these messages may lead to duplicates.
        PurgeInFlight = int(C.RD_KAFKA_PURGE_F_INFLIGHT)
        // PurgeQueue Purge messages in internal queues.
        PurgeQueue = int(C.RD_KAFKA_PURGE_F_QUEUE)
        // PurgeNonBlocking Don't wait for background thread queue purging to finish.
        PurgeNonBlocking = int(C.RD_KAFKA_PURGE_F_NON_BLOCKING)
    )
    const (
        // AlterOperationSet sets/overwrites the configuration setting.
        AlterOperationSet = iota
    )
const LibrdkafkaLinkInfo = "static glibc_linux from librdkafka-static-bundle-v1.9.2.tgz"
LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client
const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
OffsetBeginning represents the earliest offset (logical)
const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)
OffsetEnd represents the latest offset (logical)
const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)
OffsetInvalid represents an invalid/unspecified offset
const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
OffsetStored represents a stored offset
const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
PartitionAny represents any partition (for partitioning), or unspecified value (for all other cases)
Variables ¶
This section is empty.
Functions ¶
func LibraryVersion ¶
LibraryVersion returns the underlying librdkafka library version as a (version_int, version_str) tuple.
func WriteErrorCodes ¶ added in v1.0.7
WriteErrorCodes writes Go error code constants to file from the librdkafka error codes. This function is not intended for public use.
Types ¶
type ACLBinding ¶ added in v1.9.0
    type ACLBinding struct {
        Type ResourceType // The resource type.
        // The resource name, which depends on the resource type.
        // For ResourceBroker the resource name is the broker id.
        Name                string
        ResourcePatternType ResourcePatternType // The resource pattern, relative to the name.
        Principal           string              // The principal this ACLBinding refers to.
        Host                string              // The host that the call is allowed to come from.
        Operation           ACLOperation        // The operation/s specified by this binding.
        PermissionType      ACLPermissionType   // The permission type for the specified operation.
    }
ACLBinding specifies the operation and permission type for a specific principal over one or more resources of the same type. Used by `AdminClient.CreateACLs`, returned by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`.
type ACLBindingFilter ¶ added in v1.9.0
type ACLBindingFilter = ACLBinding
ACLBindingFilter specifies a filter used to return a list of ACL bindings matching some or all of its attributes. Used by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`.
type ACLBindingFilters ¶ added in v1.9.0
type ACLBindingFilters []ACLBindingFilter
ACLBindingFilters is a slice of ACLBindingFilter that also implements the sort interface
type ACLBindings ¶ added in v1.9.0
type ACLBindings []ACLBinding
ACLBindings is a slice of ACLBinding that also implements the sort interface
func (ACLBindings) Len ¶ added in v1.9.0
func (a ACLBindings) Len() int
func (ACLBindings) Less ¶ added in v1.9.0
func (a ACLBindings) Less(i, j int) bool
func (ACLBindings) Swap ¶ added in v1.9.0
func (a ACLBindings) Swap(i, j int)
type ACLOperation ¶ added in v1.9.0
type ACLOperation int
ACLOperation enumerates the different types of ACL operation.
func ACLOperationFromString ¶ added in v1.9.0
func ACLOperationFromString(aclOperationString string) (ACLOperation, error)
ACLOperationFromString translates an ACL operation name to an ACLOperation value.
func (ACLOperation) String ¶ added in v1.9.0
func (o ACLOperation) String() string
String returns the human-readable representation of an ACLOperation
type ACLPermissionType ¶ added in v1.9.0
type ACLPermissionType int
ACLPermissionType enumerates the different types of ACL permission types.
func ACLPermissionTypeFromString ¶ added in v1.9.0
func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error)
ACLPermissionTypeFromString translates an ACL permission type name to an ACLPermissionType value.
func (ACLPermissionType) String ¶ added in v1.9.0
func (o ACLPermissionType) String() string
String returns the human-readable representation of an ACLPermissionType
type AdminClient ¶ added in v0.11.6
type AdminClient struct {
// contains filtered or unexported fields
}
AdminClient is derived from an existing Producer or Consumer
func NewAdminClient ¶ added in v0.11.6
func NewAdminClient(conf *ConfigMap) (*AdminClient, error)
NewAdminClient creates a new AdminClient instance with a new underlying client instance
func NewAdminClientFromConsumer ¶ added in v0.11.6
func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error)
NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance. The AdminClient will use the same configuration and connections as the parent instance.
func NewAdminClientFromProducer ¶ added in v0.11.6
func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error)
NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance. The AdminClient will use the same configuration and connections as the parent instance.
func (*AdminClient) AlterConfigs ¶ added in v0.11.6
func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)
AlterConfigs alters/updates cluster resource configuration.
Updates are not transactional so they may succeed for a subset of the provided resources while others fail. The configuration for a particular resource is updated atomically, replacing values using the provided ConfigEntrys and reverting unspecified ConfigEntrys to their default values.
Requires broker version >=0.11.0.0
AlterConfigs will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration to their default values.
Multiple resources and resource types may be set, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.
func (*AdminClient) Close ¶ added in v0.11.6
func (a *AdminClient) Close()
Close an AdminClient instance.
func (*AdminClient) ClusterID ¶ added in v1.0.7
func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)
ClusterID returns the cluster ID as reported in broker metadata.
Note on cancellation: Although the underlying C function respects the timeout, it currently cannot be manually cancelled. That means manually cancelling the context will block until the C function call returns.
Requires broker version >= 0.10.0.
func (*AdminClient) ControllerID ¶ added in v1.0.7
func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
ControllerID returns the broker ID of the current controller as reported in broker metadata.
Note on cancellation: Although the underlying C function respects the timeout, it currently cannot be manually cancelled. That means manually cancelling the context will block until the C function call returns.
Requires broker version >= 0.10.0.
func (*AdminClient) CreateACLs ¶ added in v1.9.0
func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error)
CreateACLs creates one or more ACL bindings.
Parameters:
- `ctx` - context with the maximum amount of time to block, or nil for indefinite.
- `aclBindings` - A slice of ACL binding specifications to create.
- `options` - Create ACLs options
Returns a slice of CreateACLResult with an ErrNoError ErrorCode when the operation was successful, plus an error that is not nil for client-level errors
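For illustration, a sketch creating a single allow-read binding (it assumes `a` is an *AdminClient and `ctx` a context; the topic name and principal are placeholders):

    results, err := a.CreateACLs(ctx, kafka.ACLBindings{{
        Type:                kafka.ResourceTopic,
        Name:                "myTopic", // placeholder topic
        ResourcePatternType: kafka.ResourcePatternTypeLiteral,
        Principal:           "User:alice", // placeholder principal
        Host:                "*",
        Operation:           kafka.ACLOperationRead,
        PermissionType:      kafka.ACLPermissionTypeAllow,
    }})
    if err != nil {
        panic(err) // client-level error
    }
    for _, r := range results {
        if r.Error.Code() != kafka.ErrNoError {
            fmt.Printf("ACL creation failed: %v\n", r.Error)
        }
    }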
func (*AdminClient) CreatePartitions ¶ added in v0.11.6
func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error)
CreatePartitions creates additional partitions for topics.
func (*AdminClient) CreateTopics ¶ added in v0.11.6
func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)
CreateTopics creates topics in cluster.
The list of TopicSpecification objects define the per-topic partition count, replicas, etc.
Topic creation is non-atomic and may succeed for some topics but fail for others, make sure to check the result for topic-specific errors.
Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
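For illustration, a sketch of creating a topic (it assumes `a` is an *AdminClient; the topic name, counts, and timeouts are placeholders):

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    results, err := a.CreateTopics(ctx,
        []kafka.TopicSpecification{{
            Topic:             "myTopic", // placeholder
            NumPartitions:     3,
            ReplicationFactor: 1,
        }},
        // Wait up to 60s for the creation to propagate in the cluster.
        kafka.SetAdminOperationTimeout(60*time.Second))
    if err != nil {
        panic(err)
    }
    for _, result := range results {
        // Creation is non-atomic: check each topic's result.
        fmt.Printf("%s: %v\n", result.Topic, result.Error)
    }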
func (*AdminClient) DeleteACLs ¶ added in v1.9.0
func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, options ...DeleteACLsAdminOption) (result []DeleteACLsResult, err error)
DeleteACLs deletes ACL bindings matching one or more ACL binding filters.
Parameters:
- `ctx` - context with the maximum amount of time to block, or nil for indefinite.
- `aclBindingFilters` - a slice of ACL binding filters to match ACLs to delete. string attributes match exact values or any string if set to empty string. Enum attributes match exact values or any value if ending with `Any`. If `ResourcePatternType` is set to `ResourcePatternTypeMatch` deletes ACL bindings with:
- `ResourcePatternTypeLiteral` pattern type with resource name equal to the given resource name
- `ResourcePatternTypeLiteral` pattern type with wildcard resource name that matches the given resource name
- `ResourcePatternTypePrefixed` pattern type with resource name that is a prefix of the given resource name
- `options` - Delete ACLs options
Returns a result for each filter, containing the deleted ACLBindings when the operation was successful, plus an error that is not `nil` for client-level errors
func (*AdminClient) DeleteTopics ¶ added in v0.11.6
func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error)
DeleteTopics deletes a batch of topics.
This operation is not transactional and may succeed for a subset of topics while failing others. It may take several seconds after the DeleteTopics result returns success for all the brokers to become aware that the topics are gone. During this time, topic metadata and configuration may continue to return information about deleted topics.
Requires broker version >= 0.10.1.0
func (*AdminClient) DescribeACLs ¶ added in v1.9.0
func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, options ...DescribeACLsAdminOption) (result *DescribeACLsResult, err error)
DescribeACLs matches ACL bindings by filter.
Parameters:
- `ctx` - context with the maximum amount of time to block, or nil for indefinite.
- `aclBindingFilter` - A filter with attributes that must match. string attributes match exact values or any string if set to empty string. Enum attributes match exact values or any value if ending with `Any`. If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns ACL bindings with:
- `ResourcePatternTypeLiteral` pattern type with resource name equal to the given resource name
- `ResourcePatternTypeLiteral` pattern type with wildcard resource name that matches the given resource name
- `ResourcePatternTypePrefixed` pattern type with resource name that is a prefix of the given resource name
- `options` - Describe ACLs options
Returns a DescribeACLsResult containing the matching ACLBindings when the operation was successful, plus an error that is not `nil` for client-level errors
func (*AdminClient) DescribeConfigs ¶ added in v0.11.6
func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error)
DescribeConfigs retrieves configuration for cluster resources.
The returned configuration includes default values, use ConfigEntryResult.IsDefault or ConfigEntryResult.Source to distinguish default values from manually configured settings.
The value of config entries where .IsSensitive is true will always be nil to avoid disclosing sensitive information, such as security settings.
Configuration entries where .IsReadOnly is true can't be modified (with AlterConfigs).
Synonym configuration entries are returned if the broker supports it (broker version >= 1.1.0). See .Synonyms.
Requires broker version >=0.11.0.0
Multiple resources and resource types may be requested, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.
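A sketch of reading a topic's effective configuration (it assumes `a` is an *AdminClient and `ctx` a context; the topic name is a placeholder):

    results, err := a.DescribeConfigs(ctx, []kafka.ConfigResource{
        {Type: kafka.ResourceTopic, Name: "myTopic"}, // placeholder topic
    })
    if err != nil {
        panic(err)
    }
    for _, res := range results {
        for _, entry := range res.Config {
            // Source distinguishes defaults from explicit settings;
            // sensitive values are returned unset.
            fmt.Printf("%s = %s (source: %v)\n", entry.Name, entry.Value, entry.Source)
        }
    }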
func (*AdminClient) GetMetadata ¶ added in v0.11.6
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (*AdminClient) SetOAuthBearerToken ¶ added in v1.0.7
func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc7628#section-3.1); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (*AdminClient) SetOAuthBearerTokenFailure ¶ added in v1.0.7
func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (*AdminClient) String ¶ added in v0.11.6
func (a *AdminClient) String() string
String returns a human readable name for an AdminClient instance
type AdminOption ¶ added in v0.11.6
type AdminOption interface {
// contains filtered or unexported methods
}
AdminOption is a generic type not to be used directly.
See CreateTopicsAdminOption et al.
type AdminOptionOperationTimeout ¶ added in v0.11.6
type AdminOptionOperationTimeout struct {
// contains filtered or unexported fields
}
AdminOptionOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller before returning a result to the application.
CreateTopics, DeleteTopics, CreatePartitions: a value of 0 will return immediately after triggering topic creation, while > 0 will wait this long for topic creation to propagate in the cluster.
Default: 0 (return immediately).
Valid for CreateTopics, DeleteTopics, CreatePartitions.
func SetAdminOperationTimeout ¶ added in v0.11.6
func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout)
SetAdminOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller before returning a result to the application.
CreateTopics, DeleteTopics, CreatePartitions: a value of 0 will return immediately after triggering topic creation, while > 0 will wait this long for topic creation to propagate in the cluster.
Default: 0 (return immediately).
Valid for CreateTopics, DeleteTopics, CreatePartitions.
type AdminOptionRequestTimeout ¶ added in v0.11.6
type AdminOptionRequestTimeout struct {
// contains filtered or unexported fields
}
AdminOptionRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response.
Default: `socket.timeout.ms`.
Valid for all Admin API methods.
func SetAdminRequestTimeout ¶ added in v0.11.6
func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout)
SetAdminRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response.
Default: `socket.timeout.ms`.
Valid for all Admin API methods.
type AdminOptionValidateOnly ¶ added in v0.11.6
type AdminOptionValidateOnly struct {
// contains filtered or unexported fields
}
AdminOptionValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc).
Default: false.
Valid for CreateTopics, CreatePartitions, AlterConfigs
func SetAdminValidateOnly ¶ added in v0.11.6
func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly)
SetAdminValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc).
Default: false.
Valid for CreateTopics, DeleteTopics, CreatePartitions, AlterConfigs
type AlterConfigsAdminOption ¶ added in v0.11.6
type AlterConfigsAdminOption interface {
// contains filtered or unexported methods
}
AlterConfigsAdminOption - see setters.
See SetAdminRequestTimeout, SetAdminValidateOnly, SetAdminIncremental.
type AlterOperation ¶ added in v0.11.6
type AlterOperation int
AlterOperation specifies the operation to perform on the ConfigEntry. Currently only AlterOperationSet.
func (AlterOperation) String ¶ added in v0.11.6
func (o AlterOperation) String() string
String returns the human-readable representation of an AlterOperation
type AssignedPartitions ¶
type AssignedPartitions struct {
Partitions []TopicPartition
}
AssignedPartitions consumer group rebalance event: assigned partition set
func (AssignedPartitions) String ¶
func (e AssignedPartitions) String() string
type BrokerMetadata ¶
BrokerMetadata contains per-broker metadata
type ConfigEntry ¶ added in v0.11.6
    type ConfigEntry struct {
        // Name of configuration entry, e.g., topic configuration property name.
        Name string
        // Value of configuration entry.
        Value string
        // Operation to perform on the entry.
        Operation AlterOperation
    }
ConfigEntry holds parameters for altering a resource's configuration.
func StringMapToConfigEntries ¶ added in v0.11.6
func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry
StringMapToConfigEntries creates a new slice of ConfigEntry objects from the provided string map. The AlterOperation is set on each created entry.
func (ConfigEntry) String ¶ added in v0.11.6
func (c ConfigEntry) String() string
String returns a human-readable representation of a ConfigEntry.
type ConfigEntryResult ¶ added in v0.11.6
    type ConfigEntryResult struct {
        // Name of configuration entry, e.g., topic configuration property name.
        Name string
        // Value of configuration entry.
        Value string
        // Source indicates the configuration source.
        Source ConfigSource
        // IsReadOnly indicates whether the configuration entry can be altered.
        IsReadOnly bool
        // IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset.
        IsSensitive bool
        // IsSynonym indicates whether the configuration entry is a synonym for another configuration property.
        IsSynonym bool
        // Synonyms contains a map of configuration entries that are synonyms to this configuration entry.
        Synonyms map[string]ConfigEntryResult
    }
ConfigEntryResult contains the result of a single configuration entry from a DescribeConfigs request.
func (ConfigEntryResult) String ¶ added in v0.11.6
func (c ConfigEntryResult) String() string
String returns a human-readable representation of a ConfigEntryResult.
type ConfigMap ¶
type ConfigMap map[string]ConfigValue
ConfigMap is a map containing standard librdkafka configuration properties as documented in: https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
The special property "default.topic.config" (optional) is a ConfigMap containing default topic configuration properties.
The use of "default.topic.config" is deprecated, topic configuration properties shall be specified in the standard ConfigMap. For backwards compatibility, "default.topic.config" (if supplied) takes precedence.
func (ConfigMap) Get ¶ added in v0.11.4
func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error)
Get finds the given key in the ConfigMap and returns its value. If the key is not found `defval` is returned. If the key is found but the type does not match that of `defval` (unless nil) an ErrInvalidArg error is returned.
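For example, a small sketch (the keys and fallback value are illustrative):

    cm := kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}

    // Key not present: the provided default is returned.
    v, err := cm.Get("group.id", "fallbackGroup")
    if err != nil {
        panic(err) // ErrInvalidArg if the stored type mismatches defval
    }
    fmt.Println(v) // "fallbackGroup"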
type ConfigResource ¶ added in v0.11.6
    type ConfigResource struct {
        // Type of resource to set.
        Type ResourceType
        // Name of resource to set.
        Name string
        // Config entries to set.
        // Configuration updates are atomic, any configuration property not provided
        // here will be reverted (by the broker) to its default value.
        // Use DescribeConfigs to retrieve the list of current configuration entry values.
        Config []ConfigEntry
    }
ConfigResource holds parameters for altering an Apache Kafka configuration resource
func (ConfigResource) String ¶ added in v0.11.6
func (c ConfigResource) String() string
String returns a human-readable representation of a ConfigResource
type ConfigResourceResult ¶ added in v0.11.6
    type ConfigResourceResult struct {
        // Type of returned result resource.
        Type ResourceType
        // Name of returned result resource.
        Name string
        // Error, if any, of returned result resource.
        Error Error
        // Config entries, if any, of returned result resource.
        Config map[string]ConfigEntryResult
    }
ConfigResourceResult provides the result for a resource from a AlterConfigs or DescribeConfigs request.
func (ConfigResourceResult) String ¶ added in v0.11.6
func (c ConfigResourceResult) String() string
String returns a human-readable representation of a ConfigResourceResult.
type ConfigSource ¶ added in v0.11.6
type ConfigSource int
ConfigSource represents an Apache Kafka config source
func (ConfigSource) String ¶ added in v0.11.6
func (t ConfigSource) String() string
String returns the human-readable representation of a ConfigSource type
type ConfigValue ¶
type ConfigValue interface{}
ConfigValue supports the following types:
bool, int, string, any type with the standard String() interface
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer implements a High-level Apache Kafka Consumer instance
func NewConsumer ¶
NewConsumer creates a new high-level Consumer instance.
conf is a *ConfigMap with standard librdkafka configuration properties.
Supported special configuration properties:
    go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel. If set to true the app must handle the AssignedPartitions and RevokedPartitions events and call Assign() and Unassign() respectively.
    go.events.channel.enable (bool, false) - [deprecated] Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled.
    go.events.channel.size (int, 1000) - Events() channel size.
    go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
    go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.
WARNING: Due to the buffering nature of channels (and queues in general) the use of the events channel risks receiving outdated events and messages. Minimizing go.events.channel.size reduces the risk and number of outdated events and messages but does not eliminate the factor completely. With a channel size of 1 at most one event or message may be outdated.
func (*Consumer) Assign ¶
func (c *Consumer) Assign(partitions []TopicPartition) (err error)
Assign an atomic set of partitions to consume.
The .Offset field of each TopicPartition must either be set to an absolute starting offset (>= 0), or one of the logical offsets (`kafka.OffsetEnd` etc), but should typically be set to `kafka.OffsetStored` to have the consumer use the committed offset as a start position, with a fallback to `auto.offset.reset` if there is no committed offset.
This replaces the current assignment.
func (*Consumer) Assignment ¶ added in v0.9.4
func (c *Consumer) Assignment() (partitions []TopicPartition, err error)
Assignment returns the current partition assignments
func (*Consumer) AssignmentLost ¶ added in v1.6.1
AssignmentLost returns true if current partition assignment has been lost. This method is only applicable for use with a subscribing consumer when handling a rebalance event or callback. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
func (*Consumer) Commit ¶
func (c *Consumer) Commit() ([]TopicPartition, error)
Commit offsets for currently assigned partitions. This is a blocking call. Returns the committed offsets on success.
func (*Consumer) CommitMessage ¶
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
CommitMessage commits offset based on the provided message. This is a blocking call. Returns the committed offsets on success.
func (*Consumer) CommitOffsets ¶
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
CommitOffsets commits the provided list of offsets. This is a blocking call. Returns the committed offsets on success.
func (*Consumer) Committed ¶ added in v0.11.4
func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
Committed retrieves committed offsets for the given set of partitions
func (*Consumer) GetConsumerGroupMetadata ¶ added in v1.0.7
func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error)
GetConsumerGroupMetadata returns the consumer's current group metadata. This object should be passed to the transactional producer's SendOffsetsToTransaction() API.
func (*Consumer) GetMetadata ¶
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (*Consumer) GetRebalanceProtocol ¶ added in v1.6.1
GetRebalanceProtocol returns the current consumer group rebalance protocol, which is either "EAGER" or "COOPERATIVE". If the rebalance protocol is not known in the current state an empty string is returned. Should typically only be called during rebalancing.
func (*Consumer) GetWatermarkOffsets ¶ added in v1.0.7
GetWatermarkOffsets returns the cached low and high offsets for the given topic and partition. The high offset is populated on every fetch response or via calling QueryWatermarkOffsets. The low offset is populated every statistics.interval.ms if that value is set. OffsetInvalid will be returned if there is no cached offset for either value.
func (*Consumer) IncrementalAssign ¶ added in v1.6.1
func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error)
IncrementalAssign adds the specified partitions to the current set of partitions to consume.
The .Offset field of each TopicPartition must either be set to an absolute starting offset (>= 0), or one of the logical offsets (`kafka.OffsetEnd` etc), but should typically be set to `kafka.OffsetStored` to have the consumer use the committed offset as a start position, with a fallback to `auto.offset.reset` if there is no committed offset.
The new partitions must not be part of the current assignment.
func (*Consumer) IncrementalUnassign ¶ added in v1.6.1
func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error)
IncrementalUnassign removes the specified partitions from the current set of partitions to consume.
The .Offset field of the TopicPartition is ignored.
The removed partitions must be part of the current assignment.
func (*Consumer) OffsetsForTimes ¶ added in v0.11.4
func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
OffsetsForTimes looks up offsets by timestamp for the given partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list.
The function will block for at most timeoutMs milliseconds.
Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field.
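For illustration, a sketch that finds the first offsets at or after a point in time one hour ago (the topic and partition are placeholders; `c` is a *Consumer):

    topic := "myTopic" // placeholder
    ts := time.Now().Add(-time.Hour).UnixMilli()

    // The timestamp to look up is passed in the Offset field.
    times := []kafka.TopicPartition{
        {Topic: &topic, Partition: 0, Offset: kafka.Offset(ts)},
    }
    offsets, err := c.OffsetsForTimes(times, 10*1000)
    if err != nil {
        panic(err)
    }
    for _, tp := range offsets {
        fmt.Printf("%s[%d] starts at %v\n", *tp.Topic, tp.Partition, tp.Offset)
    }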
func (*Consumer) Pause ¶ added in v0.11.4
func (c *Consumer) Pause(partitions []TopicPartition) (err error)
Pause consumption for the provided list of partitions
Note that messages already enqueued on the consumer's Event channel (if `go.events.channel.enable` has been set) will NOT be purged by this call, set `go.events.channel.size` accordingly.
func (*Consumer) Poll ¶
Poll the consumer for messages or events.
Will block for at most timeoutMs milliseconds.
The following callbacks may be triggered:
Subscribe()'s rebalanceCb
Returns nil on timeout, else an Event
func (*Consumer) Position ¶ added in v1.0.7
func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error)
Position returns the current consume position for the given partitions. Typical use is to call Assignment() to get the partition list and then pass it to Position() to get the current consume position for each of the assigned partitions. The consume position is the next message to read from the partition. i.e., the offset of the last message seen by the application + 1.
func (*Consumer) QueryWatermarkOffsets ¶
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets queries the broker for the low and high offsets for the given topic and partition.
func (*Consumer) ReadMessage ¶ added in v0.11.4
ReadMessage polls the consumer for a message.
This is a convenience API that wraps Poll() and only returns messages or errors. All other event types are discarded.
The call will block for at most `timeout` waiting for a new message or error. `timeout` may be set to -1 for indefinite wait.
Timeout is returned as (nil, err) where `err.(kafka.Error).Code() == kafka.ErrTimedOut`.
Messages are returned as (msg, nil), while general errors are returned as (nil, err), and partition-specific errors are returned as (msg, err) where msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
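A typical ReadMessage loop might look like this sketch:

    for {
        msg, err := c.ReadMessage(100 * time.Millisecond)
        if err != nil {
            if err.(kafka.Error).Code() == kafka.ErrTimedOut {
                continue // no message within the timeout
            }
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
            continue
        }
        fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    }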
func (*Consumer) Resume ¶ added in v0.11.4
func (c *Consumer) Resume(partitions []TopicPartition) (err error)
Resume consumption for the provided list of partitions
func (*Consumer) Seek ¶ added in v0.9.4
func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error
Seek seeks the given topic partitions using the offset from the TopicPartition.
If timeoutMs is not 0 the call will wait this long for the seek to be performed. If the timeout is reached the internal state will be unknown and this function returns ErrTimedOut. If timeoutMs is 0 it will initiate the seek but return immediately without any error reporting (e.g., async).
Seek() may only be used for partitions already being consumed (through Assign() or implicitly through a self-rebalanced Subscribe()). To set the starting offset it is preferred to use Assign() and provide a starting offset for each partition.
Returns an error on failure or nil otherwise.
func (*Consumer) SetOAuthBearerToken ¶ added in v1.0.7
func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc7628#section-3.1); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (*Consumer) SetOAuthBearerTokenFailure ¶ added in v1.0.7
SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (*Consumer) StoreMessage ¶ added in v1.8.2
func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error)
StoreMessage stores offset based on the provided message. This is a convenience method that uses StoreOffsets to do the actual work.
func (*Consumer) StoreOffsets ¶ added in v0.11.4
func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
StoreOffsets stores the provided list of offsets that will be committed to the offset store according to `auto.commit.interval.ms` or manual offset-less Commit().
Returns the stored offsets on success. If at least one offset couldn't be stored, an error and a list of offsets is returned. Each offset can be checked for specific errors via its `.Error` member.
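Manual offset storage is typically combined with `"enable.auto.offset.store": false`, so an offset is only stored (and later auto-committed) after the message has been fully processed. As a sketch, where `process` is a hypothetical application handler:

    // Consumer created with:
    //   "enable.auto.commit":       true,  // periodically commit stored offsets
    //   "enable.auto.offset.store": false, // store offsets manually below
    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            continue
        }
        process(msg) // hypothetical handler; must succeed before storing
        if _, err := c.StoreMessage(msg); err != nil {
            fmt.Printf("offset store failed: %v\n", err)
        }
    }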
func (*Consumer) Subscribe ¶
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
Subscribe to a single topic. This replaces the current subscription.
func (*Consumer) SubscribeTopics ¶
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
SubscribeTopics subscribes to the provided list of topics. This replaces the current subscription.
func (*Consumer) Subscription ¶ added in v0.9.4
func (c *Consumer) Subscription() (topics []string, err error)
Subscription returns the current subscription as set by Subscribe()
func (*Consumer) Unsubscribe ¶
func (c *Consumer) Unsubscribe() (err error)
Unsubscribe from the current subscription, if any.
type ConsumerGroupMetadata ¶ added in v1.0.7
type ConsumerGroupMetadata struct {
// contains filtered or unexported fields
}
ConsumerGroupMetadata reflects the current consumer group member metadata.
func NewTestConsumerGroupMetadata ¶ added in v1.0.7
func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)
NewTestConsumerGroupMetadata creates a new consumer group metadata instance mainly for testing use. Use GetConsumerGroupMetadata() to retrieve the real metadata.
type CreateACLResult ¶ added in v1.9.0
type CreateACLResult struct {
    // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
    Error Error
}
CreateACLResult provides create ACL error information.
type CreateACLsAdminOption ¶ added in v1.9.0
type CreateACLsAdminOption interface {
// contains filtered or unexported methods
}
CreateACLsAdminOption - see setter.
See SetAdminRequestTimeout.
type CreatePartitionsAdminOption ¶ added in v0.11.6
type CreatePartitionsAdminOption interface {
// contains filtered or unexported methods
}
CreatePartitionsAdminOption - see setters.
See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
type CreateTopicsAdminOption ¶ added in v0.11.6
type CreateTopicsAdminOption interface {
// contains filtered or unexported methods
}
CreateTopicsAdminOption - see setters.
See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
type DeleteACLsAdminOption ¶ added in v1.9.0
type DeleteACLsAdminOption interface {
// contains filtered or unexported methods
}
DeleteACLsAdminOption - see setter.
See SetAdminRequestTimeout.
type DeleteACLsResult ¶ added in v1.9.0
type DeleteACLsResult = DescribeACLsResult
DeleteACLsResult provides delete ACLs result or error information.
type DeleteTopicsAdminOption ¶ added in v0.11.6
type DeleteTopicsAdminOption interface {
// contains filtered or unexported methods
}
DeleteTopicsAdminOption - see setters.
See SetAdminRequestTimeout, SetAdminOperationTimeout.
type DescribeACLsAdminOption ¶ added in v1.9.0
type DescribeACLsAdminOption interface {
// contains filtered or unexported methods
}
DescribeACLsAdminOption - see setter.
See SetAdminRequestTimeout.
type DescribeACLsResult ¶ added in v1.9.0
type DescribeACLsResult struct {
    // Slice of ACL bindings matching the provided filter
    ACLBindings ACLBindings
    // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
    Error Error
}
DescribeACLsResult provides describe ACLs result or error information.
type DescribeConfigsAdminOption ¶ added in v0.11.6
type DescribeConfigsAdminOption interface {
// contains filtered or unexported methods
}
DescribeConfigsAdminOption - see setters.
See SetAdminRequestTimeout.
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error provides a Kafka-specific error container
func (Error) Error ¶
func (e Error) Error() string
Error returns a human readable representation of an Error. Same as Error.String().
func (Error) IsFatal ¶ added in v1.0.0
func (e Error) IsFatal() bool
IsFatal returns true if the error is a fatal error. A fatal error indicates the client instance is no longer operable and should be terminated. Typical causes include non-recoverable idempotent producer errors.
func (Error) IsRetriable ¶ added in v1.0.7
func (e Error) IsRetriable() bool
IsRetriable returns true if the operation that caused this error may be retried. This flag is currently only set by the Transactional producer API.
func (Error) TxnRequiresAbort ¶ added in v1.0.7
func (e Error) TxnRequiresAbort() bool
TxnRequiresAbort returns true if the error is an abortable transaction error that requires the application to abort the current transaction with AbortTransaction() and start a new transaction with BeginTransaction() if it wishes to proceed with transactional operations. This flag is only set by the Transactional producer API.
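Taken together, these predicates suggest a triage pattern for errors returned by producer and transactional calls; a sketch:

package example

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// classify decides how to react to an error returned by a producer
// or transactional call.
func classify(err error) {
    kErr, ok := err.(kafka.Error)
    if !ok {
        log.Printf("non-kafka error: %v", err)
        return
    }
    switch {
    case kErr.IsFatal():
        // Client instance is no longer operable; recreate it.
        log.Printf("fatal, recreate the client: %v", kErr)
    case kErr.TxnRequiresAbort():
        // Abort the transaction, then start a new one.
        log.Printf("abort the transaction and retry: %v", kErr)
    case kErr.IsRetriable():
        // Safe to retry the same operation.
        log.Printf("retry the same call: %v", kErr)
    default:
        log.Printf("other error (code %v): %v", kErr.Code(), kErr)
    }
}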
type ErrorCode ¶
type ErrorCode int
ErrorCode is the integer representation of local and broker error codes
const (
    // ErrBadMsg Local: Bad message format
    ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG)
    // ErrBadCompression Local: Invalid compressed data
    ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION)
    // ErrDestroy Local: Broker handle destroyed
    ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY)
    // ErrFail Local: Communication failure with broker
    ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL)
    // ErrTransport Local: Broker transport failure
    ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT)
    // ErrCritSysResource Local: Critical system resource failure
    ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE)
    // ErrResolve Local: Host resolution failure
    ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE)
    // ErrMsgTimedOut Local: Message timed out
    ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT)
    // ErrPartitionEOF Broker: No more messages
    ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF)
    // ErrUnknownPartition Local: Unknown partition
    ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
    // ErrFs Local: File or filesystem error
    ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS)
    // ErrUnknownTopic Local: Unknown topic
    ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
    // ErrAllBrokersDown Local: All broker connections are down
    ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)
    // ErrInvalidArg Local: Invalid argument or configuration
    ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG)
    // ErrTimedOut Local: Timed out
    ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
    // ErrQueueFull Local: Queue full
    ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL)
    // ErrIsrInsuff Local: ISR count insufficient
    ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF)
    // ErrNodeUpdate Local: Broker node update
    ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE)
    // ErrSsl Local: SSL error
    ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL)
    // ErrWaitCoord Local: Waiting for coordinator
    ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD)
    // ErrUnknownGroup Local: Unknown group
    ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP)
    // ErrInProgress Local: Operation in progress
    ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS)
    // ErrPrevInProgress Local: Previous operation in progress
    ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS)
    // ErrExistingSubscription Local: Existing subscription
    ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION)
    // ErrAssignPartitions Local: Assign partitions
    ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
    // ErrRevokePartitions Local: Revoke partitions
    ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
    // ErrConflict Local: Conflicting use
    ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT)
    // ErrState Local: Erroneous state
    ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE)
    // ErrUnknownProtocol Local: Unknown protocol
    ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL)
    // ErrNotImplemented Local: Not implemented
    ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED)
    // ErrAuthentication Local: Authentication failure
    ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION)
    // ErrNoOffset Local: No offset stored
    ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET)
    // ErrOutdated Local: Outdated
    ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED)
    // ErrTimedOutQueue Local: Timed out in queue
    ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
    // ErrUnsupportedFeature Local: Required feature not supported by broker
    ErrUnsupportedFeature ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE)
    // ErrWaitCache Local: Awaiting cache update
    ErrWaitCache ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_CACHE)
    // ErrIntr Local: Operation interrupted
    ErrIntr ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INTR)
    // ErrKeySerialization Local: Key serialization error
    ErrKeySerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_SERIALIZATION)
    // ErrValueSerialization Local: Value serialization error
    ErrValueSerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION)
    // ErrKeyDeserialization Local: Key deserialization error
    ErrKeyDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION)
    // ErrValueDeserialization Local: Value deserialization error
    ErrValueDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION)
    // ErrPartial Local: Partial response
    ErrPartial ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTIAL)
    // ErrReadOnly Local: Read-only object
    ErrReadOnly ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__READ_ONLY)
    // ErrNoent Local: No such entry
    ErrNoent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOENT)
    // ErrUnderflow Local: Read underflow
    ErrUnderflow ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNDERFLOW)
    // ErrInvalidType Local: Invalid type
    ErrInvalidType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_TYPE)
    // ErrRetry Local: Retry operation
    ErrRetry ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RETRY)
    // ErrPurgeQueue Local: Purged in queue
    ErrPurgeQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PURGE_QUEUE)
    // ErrPurgeInflight Local: Purged in flight
    ErrPurgeInflight ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PURGE_INFLIGHT)
    // ErrFatal Local: Fatal error
    ErrFatal ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FATAL)
    // ErrInconsistent Local: Inconsistent state
    ErrInconsistent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INCONSISTENT)
    // ErrGaplessGuarantee Local: Gap-less ordering would not be guaranteed if proceeding
    ErrGaplessGuarantee ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE)
    // ErrMaxPollExceeded Local: Maximum application poll interval (max.poll.interval.ms) exceeded
    ErrMaxPollExceeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED)
    // ErrUnknownBroker Local: Unknown broker
    ErrUnknownBroker ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_BROKER)
    // ErrNotConfigured Local: Functionality not configured
    ErrNotConfigured ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_CONFIGURED)
    // ErrFenced Local: This instance has been fenced by a newer instance
    ErrFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FENCED)
    // ErrApplication Local: Application generated error
    ErrApplication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__APPLICATION)
    // ErrAssignmentLost Local: Group partition assignment lost
    ErrAssignmentLost ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST)
    // ErrNoop Local: No operation performed
    ErrNoop ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOOP)
    // ErrAutoOffsetReset Local: No offset to automatically reset to
    ErrAutoOffsetReset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET)
    // ErrUnknown Unknown broker error
    ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN)
    // ErrNoError Success
    ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR)
    // ErrOffsetOutOfRange Broker: Offset out of range
    ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE)
    // ErrInvalidMsg Broker: Invalid message
    ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG)
    // ErrUnknownTopicOrPart Broker: Unknown topic or partition
    ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
    // ErrInvalidMsgSize Broker: Invalid message size
    ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE)
    // ErrLeaderNotAvailable Broker: Leader not available
    ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
    // ErrNotLeaderForPartition Broker: Not leader for partition
    ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION)
    // ErrRequestTimedOut Broker: Request timed out
    ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT)
    // ErrBrokerNotAvailable Broker: Broker not available
    ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE)
    // ErrReplicaNotAvailable Broker: Replica not available
    ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE)
    // ErrMsgSizeTooLarge Broker: Message size too large
    ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
    // ErrStaleCtrlEpoch Broker: StaleControllerEpochCode
    ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH)
    // ErrOffsetMetadataTooLarge Broker: Offset metadata string too large
    ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE)
    // ErrNetworkException Broker: Broker disconnected before response received
    ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION)
    // ErrCoordinatorLoadInProgress Broker: Coordinator load in progress
    ErrCoordinatorLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS)
    // ErrCoordinatorNotAvailable Broker: Coordinator not available
    ErrCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE)
    // ErrNotCoordinator Broker: Not coordinator
    ErrNotCoordinator ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR)
    // ErrTopicException Broker: Invalid topic
    ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION)
    // ErrRecordListTooLarge Broker: Message batch larger than configured server segment size
    ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE)
    // ErrNotEnoughReplicas Broker: Not enough in-sync replicas
    ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS)
    // ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas
    ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND)
    // ErrInvalidRequiredAcks Broker: Invalid required acks value
    ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS)
    // ErrIllegalGeneration Broker: Specified group generation id is not valid
    ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
    // ErrInconsistentGroupProtocol Broker: Inconsistent group protocol
    ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL)
    // ErrInvalidGroupID Broker: Invalid group.id
    ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID)
    // ErrUnknownMemberID Broker: Unknown member
    ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
    // ErrInvalidSessionTimeout Broker: Invalid session timeout
    ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT)
    // ErrRebalanceInProgress Broker: Group rebalance in progress
    ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS)
    // ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid
    ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE)
    // ErrTopicAuthorizationFailed Broker: Topic authorization failed
    ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
    // ErrGroupAuthorizationFailed Broker: Group authorization failed
    ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED)
    // ErrClusterAuthorizationFailed Broker: Cluster authorization failed
    ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED)
    // ErrInvalidTimestamp Broker: Invalid timestamp
    ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP)
    // ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism
    ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM)
    // ErrIllegalSaslState Broker: Request not valid in current SASL state
    ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE)
    // ErrUnsupportedVersion Broker: API version not supported
    ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)
    // ErrTopicAlreadyExists Broker: Topic already exists
    ErrTopicAlreadyExists ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS)
    // ErrInvalidPartitions Broker: Invalid number of partitions
    ErrInvalidPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS)
    // ErrInvalidReplicationFactor Broker: Invalid replication factor
    ErrInvalidReplicationFactor ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR)
    // ErrInvalidReplicaAssignment Broker: Invalid replica assignment
    ErrInvalidReplicaAssignment ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT)
    // ErrInvalidConfig Broker: Configuration is invalid
    ErrInvalidConfig ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_CONFIG)
    // ErrNotController Broker: Not controller for cluster
    ErrNotController ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER)
    // ErrInvalidRequest Broker: Invalid request
    ErrInvalidRequest ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUEST)
    // ErrUnsupportedForMessageFormat Broker: Message format on broker does not support request
    ErrUnsupportedForMessageFormat ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT)
    // ErrPolicyViolation Broker: Policy violation
    ErrPolicyViolation ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_POLICY_VIOLATION)
    // ErrOutOfOrderSequenceNumber Broker: Broker received an out of order sequence number
    ErrOutOfOrderSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER)
    // ErrDuplicateSequenceNumber Broker: Broker received a duplicate sequence number
    ErrDuplicateSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER)
    // ErrInvalidProducerEpoch Broker: Producer attempted an operation with an old epoch
    ErrInvalidProducerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH)
    // ErrInvalidTxnState Broker: Producer attempted a transactional operation in an invalid state
    ErrInvalidTxnState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TXN_STATE)
    // ErrInvalidProducerIDMapping Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id
    ErrInvalidProducerIDMapping ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING)
    // ErrInvalidTransactionTimeout Broker: Transaction timeout is larger than the maximum value allowed by the broker's max.transaction.timeout.ms
    ErrInvalidTransactionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT)
    // ErrConcurrentTransactions Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
    ErrConcurrentTransactions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS)
    // ErrTransactionCoordinatorFenced Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
    ErrTransactionCoordinatorFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED)
    // ErrTransactionalIDAuthorizationFailed Broker: Transactional Id authorization failed
    ErrTransactionalIDAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
    // ErrSecurityDisabled Broker: Security features are disabled
    ErrSecurityDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SECURITY_DISABLED)
    // ErrOperationNotAttempted Broker: Operation not attempted
    ErrOperationNotAttempted ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED)
    // ErrKafkaStorageError Broker: Disk error when trying to access log file on disk
    ErrKafkaStorageError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR)
    // ErrLogDirNotFound Broker: The user-specified log directory is not found in the broker config
    ErrLogDirNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND)
    // ErrSaslAuthenticationFailed Broker: SASL Authentication failed
    ErrSaslAuthenticationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED)
    // ErrUnknownProducerID Broker: Unknown Producer Id
    ErrUnknownProducerID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID)
    // ErrReassignmentInProgress Broker: Partition reassignment is in progress
    ErrReassignmentInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS)
    // ErrDelegationTokenAuthDisabled Broker: Delegation Token feature is not enabled
    ErrDelegationTokenAuthDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED)
    // ErrDelegationTokenNotFound Broker: Delegation Token is not found on server
    ErrDelegationTokenNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND)
    // ErrDelegationTokenOwnerMismatch Broker: Specified Principal is not valid Owner/Renewer
    ErrDelegationTokenOwnerMismatch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH)
    // ErrDelegationTokenRequestNotAllowed Broker: Delegation Token requests are not allowed on this connection
    ErrDelegationTokenRequestNotAllowed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED)
    // ErrDelegationTokenAuthorizationFailed Broker: Delegation Token authorization failed
    ErrDelegationTokenAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED)
    // ErrDelegationTokenExpired Broker: Delegation Token is expired
    ErrDelegationTokenExpired ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED)
    // ErrInvalidPrincipalType Broker: Supplied principalType is not supported
    ErrInvalidPrincipalType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE)
    // ErrNonEmptyGroup Broker: The group is not empty
    ErrNonEmptyGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP)
    // ErrGroupIDNotFound Broker: The group id does not exist
    ErrGroupIDNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND)
    // ErrFetchSessionIDNotFound Broker: The fetch session ID was not found
    ErrFetchSessionIDNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND)
    // ErrInvalidFetchSessionEpoch Broker: The fetch session epoch is invalid
    ErrInvalidFetchSessionEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH)
    // ErrListenerNotFound Broker: No matching listener
    ErrListenerNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND)
    // ErrTopicDeletionDisabled Broker: Topic deletion is disabled
    ErrTopicDeletionDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED)
    // ErrFencedLeaderEpoch Broker: Leader epoch is older than broker epoch
    ErrFencedLeaderEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH)
    // ErrUnknownLeaderEpoch Broker: Leader epoch is newer than broker epoch
    ErrUnknownLeaderEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH)
    // ErrUnsupportedCompressionType Broker: Unsupported compression type
    ErrUnsupportedCompressionType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE)
    // ErrStaleBrokerEpoch Broker: Broker epoch has changed
    ErrStaleBrokerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH)
    // ErrOffsetNotAvailable Broker: Leader high watermark is not caught up
    ErrOffsetNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE)
    // ErrMemberIDRequired Broker: Group member needs a valid member ID
    ErrMemberIDRequired ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED)
    // ErrPreferredLeaderNotAvailable Broker: Preferred leader was not available
    ErrPreferredLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE)
    // ErrGroupMaxSizeReached Broker: Consumer group has reached maximum size
    ErrGroupMaxSizeReached ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED)
    // ErrFencedInstanceID Broker: Static consumer fenced by other consumer with same group.instance.id
    ErrFencedInstanceID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID)
    // ErrEligibleLeadersNotAvailable Broker: Eligible partition leaders are not available
    ErrEligibleLeadersNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE)
    // ErrElectionNotNeeded Broker: Leader election not needed for topic partition
    ErrElectionNotNeeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED)
    // ErrNoReassignmentInProgress Broker: No partition reassignment is in progress
    ErrNoReassignmentInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS)
    // ErrGroupSubscribedToTopic Broker: Deleting offsets of a topic while the consumer group is subscribed to it
    ErrGroupSubscribedToTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC)
    // ErrInvalidRecord Broker: Broker failed to validate record
    ErrInvalidRecord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_RECORD)
    // ErrUnstableOffsetCommit Broker: There are unstable offsets that need to be cleared
    ErrUnstableOffsetCommit ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
    // ErrThrottlingQuotaExceeded Broker: Throttling quota has been exceeded
    ErrThrottlingQuotaExceeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED)
    // ErrProducerFenced Broker: There is a newer producer with the same transactionalId which fences the current one
    ErrProducerFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PRODUCER_FENCED)
    // ErrResourceNotFound Broker: Request illegally referred to resource that does not exist
    ErrResourceNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND)
    // ErrDuplicateResource Broker: Request illegally referred to the same resource twice
    ErrDuplicateResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE)
    // ErrUnacceptableCredential Broker: Requested credential would not meet criteria for acceptability
    ErrUnacceptableCredential ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL)
    // ErrInconsistentVoterSet Broker: Indicates that either the sender or recipient of a voter-only request is not one of the expected voters
    ErrInconsistentVoterSet ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET)
    // ErrInvalidUpdateVersion Broker: Invalid update version
    ErrInvalidUpdateVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION)
    // ErrFeatureUpdateFailed Broker: Unable to update finalized features due to server error
    ErrFeatureUpdateFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED)
    // ErrPrincipalDeserializationFailure Broker: Request principal deserialization failed during forwarding
    ErrPrincipalDeserializationFailure ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE)
)
type Event ¶
type Event interface {
    // String returns a human-readable representation of the event
    String() string
}
Event generic interface
type Handle ¶
type Handle interface {
    // SetOAuthBearerToken sets the data to be transmitted
    // to a broker during SASL/OAUTHBEARER authentication. It will return nil
    // on success, otherwise an error if:
    // 1) the token data is invalid (meaning an expiration time in the past
    // or either a token value or an extension key or value that does not meet
    // the regular expression requirements as per
    // https://tools.ietf.org/html/rfc7628#section-3.1);
    // 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
    // 3) SASL/OAUTHBEARER is supported but is not configured as the client's
    // authentication mechanism.
    SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error

    // SetOAuthBearerTokenFailure sets the error message describing why token
    // retrieval/setting failed; it also schedules a new token refresh event for 10
    // seconds later so the attempt may be retried. It will return nil on
    // success, otherwise an error if:
    // 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
    // 2) SASL/OAUTHBEARER is supported but is not configured as the client's
    // authentication mechanism.
    SetOAuthBearerTokenFailure(errstr string) error

    // contains filtered or unexported methods
}
Handle represents a generic client handle containing common parts for both Producer and Consumer.
type Header ¶ added in v0.11.4
type Header struct {
    Key   string // Header name (utf-8 string)
    Value []byte // Header value (nil, empty, or binary)
}
Header represents a single Kafka message header.
Message headers are made up of a list of Header elements, retaining their original insert order and allowing for duplicate Keys.
Key is a human readable string identifying the header. Value is the key's binary value; Kafka does not put any restrictions on the format of the Value, but it should be kept relatively compact. The value may be a byte array, empty, or nil.
NOTE: Message headers are not available on producer delivery report messages.
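For example, a message with headers could be produced as follows (topic and header values are placeholders; `p` is an existing Producer):

package example

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func produceWithHeaders(p *kafka.Producer, topic string) error {
    return p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("payload"),
        Headers: []kafka.Header{
            {Key: "trace-id", Value: []byte("abc-123")}, // placeholder values
            {Key: "trace-id", Value: []byte("def-456")}, // duplicate keys are allowed
            {Key: "flag", Value: nil},                   // nil values are allowed too
        },
    }, nil)
}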
type LogEvent ¶ added in v1.0.7
type LogEvent struct {
    Name      string    // Name of client instance
    Tag       string    // Log tag that provides context to the log Message (e.g., "METADATA" or "GRPCOORD")
    Message   string    // Log message
    Level     int       // Log syslog level, lower is more critical.
    Timestamp time.Time // Log timestamp
}
LogEvent represents the log from librdkafka internal log queue
type Message ¶
type Message struct {
    TopicPartition TopicPartition
    Value          []byte
    Key            []byte
    Timestamp      time.Time
    TimestampType  TimestampType
    Opaque         interface{}
    Headers        []Header
}
Message represents a Kafka message
type Metadata ¶
type Metadata struct {
    Brokers []BrokerMetadata
    Topics  map[string]TopicMetadata

    OriginatingBroker BrokerMetadata
}
Metadata contains broker and topic metadata for all (matching) topics
type MockCluster ¶ added in v1.9.0
type MockCluster struct {
// contains filtered or unexported fields
}
MockCluster represents a Kafka mock cluster instance which can be used for testing.
func NewMockCluster ¶ added in v1.9.0
func NewMockCluster(brokerCount int) (*MockCluster, error)
NewMockCluster provides a mock Kafka cluster with a configurable number of brokers that support a reasonable subset of Kafka protocol operations, error injection, etc.
Mock clusters provide localhost listeners that can be used as the bootstrap servers by multiple Kafka client instances.
Currently supported functionality:
- Producer
- Idempotent Producer
- Transactional Producer
- Low-level consumer
- High-level balanced consumer groups with offset commits
- Topic Metadata and auto creation
Warning: THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
func (*MockCluster) BootstrapServers ¶ added in v1.9.0
func (mc *MockCluster) BootstrapServers() string
BootstrapServers returns the bootstrap.servers property for this MockCluster
func (*MockCluster) Close ¶ added in v1.9.0
func (mc *MockCluster) Close()
Close and destroy the MockCluster
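A typical test wires a producer and consumer to the mock cluster via BootstrapServers(); a sketch (topic and group names are placeholders):

package example

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func mockRoundTrip() error {
    mc, err := kafka.NewMockCluster(1)
    if err != nil {
        return err
    }
    defer mc.Close()

    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": mc.BootstrapServers(),
    })
    if err != nil {
        return err
    }
    defer p.Close()

    topic := "test-topic" // auto-created by the mock cluster
    if err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("hello"),
    }, nil); err != nil {
        return err
    }
    p.Flush(5000)

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": mc.BootstrapServers(),
        "group.id":          "test-group",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        return err
    }
    defer c.Close()

    if err := c.Subscribe(topic, nil); err != nil {
        return err
    }
    msg, err := c.ReadMessage(10 * time.Second)
    if err != nil {
        return err
    }
    fmt.Printf("got: %s\n", msg.Value)
    return nil
}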
type OAuthBearerToken ¶ added in v1.0.7
type OAuthBearerToken struct {
    // Token value, often (but not necessarily) a JWS compact serialization
    // as per https://tools.ietf.org/html/rfc7515#section-3.1; it must meet
    // the regular expression for a SASL/OAUTHBEARER value defined at
    // https://tools.ietf.org/html/rfc7628#section-3.1
    TokenValue string
    // Metadata about the token indicating when it expires (local time);
    // it must represent a time in the future
    Expiration time.Time
    // Metadata about the token indicating the Kafka principal name
    // to which it applies (for example, "admin")
    Principal string
    // SASL extensions, if any, to be communicated to the broker during
    // authentication (all keys and values of which must meet the regular
    // expressions defined at https://tools.ietf.org/html/rfc7628#section-3.1,
    // and it must not contain the reserved "auth" key)
    Extensions map[string]string
}
OAuthBearerToken represents the data to be transmitted to a broker during SASL/OAUTHBEARER authentication.
type OAuthBearerTokenRefresh ¶ added in v1.0.7
type OAuthBearerTokenRefresh struct {
    // Config is the value of the sasl.oauthbearer.config property
    Config string
}
OAuthBearerTokenRefresh indicates token refresh is required
func (OAuthBearerTokenRefresh) String ¶ added in v1.0.7
func (o OAuthBearerTokenRefresh) String() string
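In practice the application watches for OAuthBearerTokenRefresh on the Events() channel (or from Poll()) and responds with SetOAuthBearerToken or SetOAuthBearerTokenFailure. A sketch, where fetchToken is a hypothetical application-supplied function that talks to the identity provider:

package example

import (
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// serveTokenRefreshes reacts to token refresh events on a producer's
// Events() channel. fetchToken is assumed to return a token value and
// its expiry, e.g. by interpreting the sasl.oauthbearer.config string.
func serveTokenRefreshes(p *kafka.Producer, fetchToken func(config string) (string, time.Time, error)) {
    for ev := range p.Events() {
        refresh, ok := ev.(kafka.OAuthBearerTokenRefresh)
        if !ok {
            continue // other event types handled elsewhere
        }
        value, expiry, err := fetchToken(refresh.Config)
        if err != nil {
            // Schedules a retry roughly 10 seconds later (see above).
            p.SetOAuthBearerTokenFailure(err.Error())
            continue
        }
        p.SetOAuthBearerToken(kafka.OAuthBearerToken{
            TokenValue: value,
            Expiration: expiry,
            Principal:  "example-principal", // placeholder
        })
    }
}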
type Offset ¶
type Offset int64
Offset type (int64) with support for canonical names
func NewOffset ¶
func NewOffset(offset interface{}) (Offset, error)
NewOffset creates a new Offset using the provided logical string, or an absolute int64 offset value. Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
func OffsetTail ¶
func OffsetTail(relativeOffset Offset) Offset
OffsetTail returns the logical offset relativeOffset from current end of partition
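For instance (values illustrative):

package example

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func offsetExamples() ([]kafka.Offset, error) {
    begin, err := kafka.NewOffset("beginning") // logical name
    if err != nil {
        return nil, err
    }
    abs, err := kafka.NewOffset(int64(42)) // absolute offset
    if err != nil {
        return nil, err
    }
    tail := kafka.OffsetTail(100) // 100 messages before the partition end
    return []kafka.Offset{begin, abs, tail}, nil
}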
type OffsetsCommitted ¶
type OffsetsCommitted struct {
    Error   error
    Offsets []TopicPartition
}
OffsetsCommitted reports committed offsets
func (OffsetsCommitted) String ¶
func (o OffsetsCommitted) String() string
type PartitionEOF ¶
type PartitionEOF TopicPartition
PartitionEOF consumer reached end of partition. Needs to be explicitly enabled by setting the `enable.partition.eof` configuration property to true.
func (PartitionEOF) String ¶
func (p PartitionEOF) String() string
type PartitionMetadata ¶
type PartitionMetadata struct {
    ID       int32
    Error    Error
    Leader   int32
    Replicas []int32
    Isrs     []int32
}
PartitionMetadata contains per-partition metadata
type PartitionsSpecification ¶ added in v0.11.6
type PartitionsSpecification struct {
    // Topic to create more partitions for.
    Topic string
    // New partition count for topic, must be higher than current partition count.
    IncreaseTo int
    // (Optional) Explicit replica assignment. The outer array is
    // indexed by the new partition index (i.e., 0 for the first added
    // partition), while the inner per-partition array
    // contains the replica broker ids. The first broker in each
    // broker id list will be the preferred replica.
    ReplicaAssignment [][]int32
}
PartitionsSpecification holds parameters for creating additional partitions for a topic. PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer implements a High-level Apache Kafka Producer instance
func NewProducer ¶
func NewProducer(conf *ConfigMap) (*Producer, error)
NewProducer creates a new high-level Producer instance.
conf is a *ConfigMap with standard librdkafka configuration properties.
Supported special configuration properties (type, default):
go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance). These batches do not relate to Kafka message batches in any way. Note: timestamps and headers are not supported with this interface.
go.delivery.reports (bool, true) - Forward per-message delivery reports to the Events() channel.
go.delivery.report.fields (string, "key,value") - Comma separated list of fields to enable for delivery reports. Allowed values: all, none (or empty string), key, value, headers. Warning: There is a performance penalty to include headers in the delivery report.
go.events.channel.size (int, 1000000) - Events() channel size (in number of events).
go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages).
go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.
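For example, a producer that trims delivery reports down to the key and routes librdkafka logs to an application channel might be set up like this sketch (broker address and property values are illustrative):

package example

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func newLoggingProducer() (*kafka.Producer, error) {
    logs := make(chan kafka.LogEvent, 100)
    go func() {
        for l := range logs {
            log.Printf("[%s] %s: %s", l.Tag, l.Name, l.Message)
        }
    }()
    return kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":         "localhost:9092", // placeholder
        "go.delivery.report.fields": "key",            // smaller delivery reports
        "go.logs.channel.enable":    true,
        "go.logs.channel":           logs,
    })
}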
func (*Producer) AbortTransaction ¶ added in v1.0.7
func (p *Producer) AbortTransaction(ctx context.Context) error
AbortTransaction aborts the ongoing transaction.
This function should also be used to recover from non-fatal abortable transaction errors.
Any outstanding messages will be purged and fail with `ErrPurgeInflight` or `ErrPurgeQueue`.
Parameters:
- `ctx` - The maximum amount of time to block, or nil for indefinite.
Note: This function will block until all outstanding messages are purged and the transaction abort request has been successfully handled by the transaction coordinator, or until the `ctx` expires, whichever comes first. On timeout the application may call the function again.
Note: Will automatically call `Purge()` and `Flush()` to ensure all queued and in-flight messages are purged before attempting to abort the transaction. The application MUST serve the `producer.Events()` channel for delivery reports in a separate go-routine during this time.
Returns nil on success or an error object on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether a fatal error has been raised by calling `err.(kafka.Error).IsFatal()`.
func (*Producer) BeginTransaction ¶ added in v1.0.7
func (p *Producer) BeginTransaction() error
BeginTransaction starts a new transaction.
`InitTransactions()` must have been called successfully (once) before this function is called.
Any messages produced, offsets sent (`SendOffsetsToTransaction()`), etc, after the successful return of this function will be part of the transaction and committed or aborted atomically.
Finish the transaction by calling `CommitTransaction()` or abort the transaction by calling `AbortTransaction()`.
Returns nil on success or an error object on failure. Check whether a fatal error has been raised by calling `err.(kafka.Error).IsFatal()`.
Note: With the transactional producer, `Produce()`, et al., are only allowed during an ongoing transaction, as started with this function. Any produce call outside an ongoing transaction, or for a failed transaction, will fail.
func (*Producer) Close ¶
func (p *Producer) Close()
Close a Producer instance. The Producer object or its channels are no longer usable after this call.
func (*Producer) CommitTransaction ¶ added in v1.0.7
func (p *Producer) CommitTransaction(ctx context.Context) error
CommitTransaction commits the current transaction.
Any outstanding messages will be flushed (delivered) before actually committing the transaction.
If any of the outstanding messages fail permanently the current transaction will enter the abortable error state and this function will return an abortable error. In this case the application must call `AbortTransaction()` before attempting a new transaction with `BeginTransaction()`.
Parameters:
- `ctx` - The maximum amount of time to block, or nil for indefinite.
Note: This function will block until all outstanding messages are delivered and the transaction commit request has been successfully handled by the transaction coordinator, or until the `ctx` expires, whichever comes first. On timeout the application may call the function again.
Note: Will automatically call `Flush()` to ensure all queued messages are delivered before attempting to commit the transaction. The application MUST serve the `producer.Events()` channel for delivery reports in a separate go-routine during this time.
Returns nil on success or an error object on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether an abortable or fatal error has been raised by calling `err.(kafka.Error).TxnRequiresAbort()` or `err.(kafka.Error).IsFatal()` respectively.
func (*Producer) Flush ¶
func (p *Producer) Flush(timeoutMs int) int
Flush and wait for outstanding messages and requests to complete delivery, including messages on ProduceChannel. Runs until the number of outstanding items reaches zero or timeoutMs elapses, whichever comes first. Returns the number of outstanding events still un-flushed.
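Because Flush() returns the number of still-outstanding events, shutdown code often loops with a deadline; a sketch:

package example

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// flushWithDeadline retries Flush in short rounds so the process can
// log progress, giving up after roughly 15 seconds in total.
func flushWithDeadline(p *kafka.Producer) {
    for i := 0; i < 15; i++ {
        remaining := p.Flush(1000) // wait up to 1s per round
        if remaining == 0 {
            return
        }
        log.Printf("still waiting for %d outstanding events", remaining)
    }
    log.Printf("gave up with %d events un-flushed", p.Flush(0))
}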
func (*Producer) GetFatalError ¶ added in v1.0.0
func (p *Producer) GetFatalError() error
GetFatalError returns an Error object if the client instance has raised a fatal error, else nil.
func (*Producer) GetMetadata ¶
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (*Producer) InitTransactions ¶ added in v1.0.7
func (p *Producer) InitTransactions(ctx context.Context) error
InitTransactions initializes transactions for the producer instance.
This function ensures any transactions initiated by previous instances of the producer with the same `transactional.id` are completed. If the previous instance failed with a transaction in progress the previous transaction will be aborted. This function needs to be called before any other transactional or produce functions are called when the `transactional.id` is configured.
If the last transaction had begun completion (following transaction commit) but not yet finished, this function will await the previous transaction's completion.
When any previous transactions have been fenced this function will acquire the internal producer id and epoch, used in all future transactional messages issued by this producer instance.
Upon successful return from this function the application has to perform at least one of the following operations within `transaction.timeout.ms` to avoid timing out the transaction on the broker:
- `Produce()` (et al.)
- `SendOffsetsToTransaction()`
- `CommitTransaction()`
- `AbortTransaction()`
Parameters:
- `ctx` - The maximum time to block, or nil for indefinite. On timeout the operation may continue in the background, depending on state, and it is okay to call `InitTransactions()` again.
Returns nil on success or an error on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether a fatal error has been raised by calling `err.(kafka.Error).IsFatal()`.
func (*Producer) Len ¶
func (p *Producer) Len() int
Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application. Includes messages on ProduceChannel.
func (*Producer) OffsetsForTimes ¶ added in v0.11.4
func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
OffsetsForTimes looks up offsets by timestamp for the given partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list.
The function will block for at most timeoutMs milliseconds.
Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field.
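For example, to find the first offset at or after a wall-clock time for one partition (topic and time are placeholders); note the query timestamp is passed in the `.Offset` field as milliseconds since the Unix epoch:

package example

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func offsetAt(p *kafka.Producer, topic string, partition int32, t time.Time) (kafka.Offset, error) {
    query := []kafka.TopicPartition{{
        Topic:     &topic,
        Partition: partition,
        Offset:    kafka.Offset(t.UnixMilli()), // timestamp goes in .Offset
    }}
    res, err := p.OffsetsForTimes(query, 5000)
    if err != nil {
        return 0, err
    }
    if res[0].Error != nil {
        return 0, fmt.Errorf("partition error: %w", res[0].Error)
    }
    return res[0].Offset, nil
}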
func (*Producer) Produce ¶
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
Produce single message. This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately. The delivery report will be sent on the provided deliveryChan if specified, or on the Producer object's Events() channel if not.
msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented), api.version.request=true, and broker >= 0.10.0.0.
msg.Headers requires librdkafka >= 0.11.4 (else returns ErrNotImplemented), api.version.request=true, and broker >= 0.11.0.0.
Returns an error if message could not be enqueued.
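A synchronous-style produce can be built by passing a private delivery channel and waiting on it; a sketch:

package example

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// produceSync enqueues one message and blocks until its delivery report arrives.
func produceSync(p *kafka.Producer, topic string, value []byte) error {
    deliveryChan := make(chan kafka.Event, 1)
    err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          value,
    }, deliveryChan)
    if err != nil {
        return err // could not enqueue (e.g. queue full)
    }
    ev := <-deliveryChan
    m := ev.(*kafka.Message)
    if m.TopicPartition.Error != nil {
        return fmt.Errorf("delivery failed: %w", m.TopicPartition.Error)
    }
    return nil
}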
func (*Producer) ProduceChannel ¶
func (p *Producer) ProduceChannel() chan *Message
ProduceChannel returns the produce *Message channel (write)
func (*Producer) Purge ¶ added in v1.0.7
func (p *Producer) Purge(flags int) error
Purge messages currently handled by this producer instance.
flags is a combination of PurgeQueue, PurgeInFlight and PurgeNonBlocking.
The application will need to call Poll(), Flush() or read the Events() channel after this call to serve delivery reports for the purged messages.
Messages purged from internal queues fail with the delivery report error code set to ErrPurgeQueue, while purged messages that are in-flight to or from the broker will fail with the error code set to ErrPurgeInflight.
Warning: Purging messages that are in-flight to or from the broker will ignore any subsequent acknowledgement for these messages received from the broker, effectively making it impossible for the application to know if the messages were successfully produced or not. This may result in duplicate messages if the application retries these messages at a later time.
Note: This call may block for a short time while background thread queues are purged.
Returns nil on success, ErrInvalidArg if the purge flags are invalid or unknown.
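For instance, a fast shutdown path might purge everything and then drain the resulting failed delivery reports; a sketch:

package example

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// abandonOutstanding drops all queued and in-flight messages; their delivery
// reports will arrive with ErrPurgeQueue / ErrPurgeInflight error codes.
func abandonOutstanding(p *kafka.Producer) error {
    if err := p.Purge(kafka.PurgeQueue | kafka.PurgeInFlight); err != nil {
        return err
    }
    // Serve the purged messages' delivery reports (see the note above).
    p.Flush(1000)
    return nil
}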
func (*Producer) QueryWatermarkOffsets ¶
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.
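One common use is estimating how many messages a partition currently holds (topic is a placeholder):

package example

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// partitionDepth returns high - low, a rough count of messages
// currently retained in the partition.
func partitionDepth(p *kafka.Producer, topic string, partition int32) (int64, error) {
    low, high, err := p.QueryWatermarkOffsets(topic, partition, 5000)
    if err != nil {
        return 0, err
    }
    return high - low, nil
}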
func (*Producer) SendOffsetsToTransaction ¶ added in v1.0.7
func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, consumerMetadata *ConsumerGroupMetadata) error
SendOffsetsToTransaction sends a list of topic partition offsets to the consumer group coordinator for `consumerMetadata`, and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.
The offsets should be the next message your application will consume, i.e., the last processed message's offset + 1 for each partition. Either track the offsets manually during processing or use `consumer.Position()` to get the current offsets for the partitions assigned to the consumer.
Use this method at the end of a consume-transform-produce loop prior to committing the transaction with `CommitTransaction()`.
Parameters:
- `ctx` - The maximum amount of time to block, or nil for indefinite.
- `offsets` - List of offsets to commit to the consumer group upon successful commit of the transaction. Offsets should be the next message to consume, e.g., last processed message + 1.
- `consumerMetadata` - The current consumer group metadata as returned by `consumer.GetConsumerGroupMetadata()` on the consumer instance the provided offsets were consumed from.
Note: The consumer must disable auto commits (set `enable.auto.commit` to false on the consumer).
Note: Logical and invalid offsets (e.g., OffsetInvalid) in `offsets` will be ignored. If there are no valid offsets in `offsets` the function will return nil and no action will be taken.
Returns nil on success or an error object on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether an abortable or fatal error has been raised by calling `err.(kafka.Error).TxnRequiresAbort()` or `err.(kafka.Error).IsFatal()` respectively.
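Putting the transactional calls together, one consume-transform-produce iteration might look like this sketch (assumes `InitTransactions()` already succeeded; error triage is simplified and `transform` is a hypothetical application function):

package example

import (
    "context"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func txnIteration(ctx context.Context, p *kafka.Producer, c *kafka.Consumer,
    in *kafka.Message, outTopic string, transform func([]byte) []byte) error {

    if err := p.BeginTransaction(); err != nil {
        return err
    }
    if err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &outTopic, Partition: kafka.PartitionAny},
        Value:          transform(in.Value),
    }, nil); err != nil {
        p.AbortTransaction(ctx)
        return err
    }

    // Commit the input offset as part of the transaction: next-to-consume,
    // i.e. the processed message's offset + 1.
    pos := in.TopicPartition
    pos.Offset++
    meta, err := c.GetConsumerGroupMetadata()
    if err != nil {
        p.AbortTransaction(ctx)
        return err
    }
    if err := p.SendOffsetsToTransaction(ctx, []kafka.TopicPartition{pos}, meta); err != nil {
        p.AbortTransaction(ctx)
        return err
    }

    if err := p.CommitTransaction(ctx); err != nil {
        if kErr, ok := err.(kafka.Error); ok && kErr.TxnRequiresAbort() {
            p.AbortTransaction(ctx)
        }
        return err
    }
    return nil
}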
func (*Producer) SetOAuthBearerToken ¶ added in v1.0.7
func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc7628#section-3.1); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
func (*Producer) SetOAuthBearerTokenFailure ¶ added in v1.0.7
func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.
type RebalanceCb ¶
type RebalanceCb func(*Consumer, Event) error
RebalanceCb provides a per-Subscribe*() rebalance event callback. The passed Event will be either AssignedPartitions or RevokedPartitions.
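A callback that logs assignments and applies them explicitly (required for the channel-based consumer, optional otherwise) might look like this sketch:

package example

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func rebalance(c *kafka.Consumer, ev kafka.Event) error {
    switch e := ev.(type) {
    case kafka.AssignedPartitions:
        log.Printf("assigned: %v", e.Partitions)
        // Initial offsets could be adjusted here before assigning.
        return c.Assign(e.Partitions)
    case kafka.RevokedPartitions:
        log.Printf("revoked: %v", e.Partitions)
        return c.Unassign()
    }
    return nil
}

// Usage: c.SubscribeTopics([]string{"topic-a", "topic-b"}, rebalance)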
type ResourcePatternType ¶ added in v1.9.0
type ResourcePatternType int
ResourcePatternType enumerates the different types of Kafka resource patterns.
func ResourcePatternTypeFromString ¶ added in v1.9.0
func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error)
ResourcePatternTypeFromString translates a resource pattern type name to a ResourcePatternType value.
func (ResourcePatternType) String ¶ added in v1.9.0
func (t ResourcePatternType) String() string
String returns the human-readable representation of a ResourcePatternType
type ResourceType ¶ added in v0.11.6
type ResourceType int
ResourceType represents an Apache Kafka resource type
func ResourceTypeFromString ¶ added in v0.11.6
func ResourceTypeFromString(typeString string) (ResourceType, error)
ResourceTypeFromString translates a resource type name/string to a ResourceType value.
func (ResourceType) String ¶ added in v0.11.6
func (t ResourceType) String() string
String returns the human-readable representation of a ResourceType
type RevokedPartitions ¶
type RevokedPartitions struct {
Partitions []TopicPartition
}
RevokedPartitions consumer group rebalance event: revoked partition set
func (RevokedPartitions) String ¶
func (e RevokedPartitions) String() string
type Stats ¶ added in v0.11.0
type Stats struct {
// contains filtered or unexported fields
}
Stats statistics event
type TimestampType ¶
type TimestampType int
TimestampType is the Message timestamp type or source
func (TimestampType) String ¶
func (t TimestampType) String() string
type TopicMetadata ¶
type TopicMetadata struct {
    Topic      string
    Partitions []PartitionMetadata
    Error      Error
}
TopicMetadata contains per-topic metadata
type TopicPartition ¶
type TopicPartition struct {
    Topic     *string
    Partition int32
    Offset    Offset
    Metadata  *string
    Error     error
}
TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
func (TopicPartition) String ¶
func (p TopicPartition) String() string
type TopicPartitions ¶ added in v0.9.4
type TopicPartitions []TopicPartition
TopicPartitions is a slice of TopicPartition that also implements the sort interface
func (TopicPartitions) Len ¶ added in v0.9.4
func (tps TopicPartitions) Len() int
func (TopicPartitions) Less ¶ added in v0.9.4
func (tps TopicPartitions) Less(i, j int) bool
func (TopicPartitions) Swap ¶ added in v0.9.4
func (tps TopicPartitions) Swap(i, j int)
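Because TopicPartitions satisfies sort.Interface, a partition list can be ordered directly:

package example

import (
    "sort"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func sortPartitions(parts []kafka.TopicPartition) {
    // Orders the slice by topic, then partition.
    sort.Sort(kafka.TopicPartitions(parts))
}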
type TopicResult ¶ added in v0.11.6
type TopicResult struct {
    // Topic name
    Topic string
    // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
    Error Error
}
TopicResult provides per-topic operation result (error) information.
func (TopicResult) String ¶ added in v0.11.6
func (t TopicResult) String() string
String returns a human-readable representation of a TopicResult.
type TopicSpecification ¶ added in v0.11.6
type TopicSpecification struct {
    // Topic name to create.
    Topic string
    // Number of partitions in topic.
    NumPartitions int
    // Default replication factor for the topic's partitions, or zero
    // if an explicit ReplicaAssignment is set.
    ReplicationFactor int
    // (Optional) Explicit replica assignment. The outer array is
    // indexed by the partition number, while the inner per-partition array
    // contains the replica broker ids. The first broker in each
    // broker id list will be the preferred replica.
    ReplicaAssignment [][]int32
    // Topic configuration.
    Config map[string]string
}
TopicSpecification holds parameters for creating a new topic. TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
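A TopicSpecification is consumed by the AdminClient's CreateTopics call; a sketch (broker address and topic settings are placeholders):

package example

import (
    "context"
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func createTopic(ctx context.Context) error {
    a, err := kafka.NewAdminClient(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092", // placeholder
    })
    if err != nil {
        return err
    }
    defer a.Close()

    results, err := a.CreateTopics(ctx, []kafka.TopicSpecification{{
        Topic:             "example-topic",
        NumPartitions:     6,
        ReplicationFactor: 3,
        Config:            map[string]string{"retention.ms": "604800000"},
    }}, kafka.SetAdminOperationTimeout(30*time.Second))
    if err != nil {
        return err
    }
    for _, r := range results {
        if r.Error.Code() != kafka.ErrNoError && r.Error.Code() != kafka.ErrTopicAlreadyExists {
            return fmt.Errorf("create %s: %v", r.Topic, r.Error)
        }
    }
    return nil
}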
Source Files ¶
Directories ¶
Path | Synopsis
---|---
| confluent-kafka-go internal tool to generate error constants from librdkafka