Documentation ¶
Overview ¶
Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.
To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
To consume messages, use Consumer or Consumer-Group API.
For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire; the Client provides higher-level metadata management that is shared between the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Metrics are exposed through https://github.com/rcrowley/go-metrics library in a local registry.
Broker related metrics:
+----------------------------------------------+------------+---------------------------------------------------------------+ | Name | Type | Description | +----------------------------------------------+------------+---------------------------------------------------------------+ | incoming-byte-rate | meter | Bytes/second read off all brokers | | incoming-byte-rate-for-broker-<broker-id> | meter | Bytes/second read off a given broker | | outgoing-byte-rate | meter | Bytes/second written off all brokers | | outgoing-byte-rate-for-broker-<broker-id> | meter | Bytes/second written off a given broker | | request-rate | meter | Requests/second sent to all brokers | | request-rate-for-broker-<broker-id> | meter | Requests/second sent to a given broker | | request-size | histogram | Distribution of the request size in bytes for all brokers | | request-size-for-broker-<broker-id> | histogram | Distribution of the request size in bytes for a given broker | | request-latency-in-ms | histogram | Distribution of the request latency in ms for all brokers | | request-latency-in-ms-for-broker-<broker-id> | histogram | Distribution of the request latency in ms for a given broker | | response-rate | meter | Responses/second received from all brokers | | response-rate-for-broker-<broker-id> | meter | Responses/second received from a given broker | | response-size | histogram | Distribution of the response size in bytes for all brokers | | response-size-for-broker-<broker-id> | histogram | Distribution of the response size in bytes for a given broker | +----------------------------------------------+------------+---------------------------------------------------------------+
Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
Producer related metrics:
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+ | Name | Type | Description | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ | batch-size | histogram | Distribution of the number of bytes sent per partition per request for all topics | | batch-size-for-topic-<topic> | histogram | Distribution of the number of bytes sent per partition per request for a given topic | | record-send-rate | meter | Records/second sent to all topics | | record-send-rate-for-topic-<topic> | meter | Records/second sent to a given topic | | records-per-request | histogram | Distribution of the number of records sent per request for all topics | | records-per-request-for-topic-<topic> | histogram | Distribution of the number of records sent per request for a given topic | | compression-ratio | histogram | Distribution of the compression ratio times 100 of record batches for all topics | | compression-ratio-for-topic-<topic> | histogram | Distribution of the compression ratio times 100 of record batches for a given topic | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
Consumer related metrics:
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+ | Name | Type | Description | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ | consumer-batch-size | histogram | Distribution of the number of messages in a batch | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
Index ¶
- Constants
- Variables
- type ACL
- type ACLCreation
- type ACLCreationResponse
- type ACLFilter
- type ACLOperation
- type ACLPermissionType
- type ACLResourcePatternType
- type ACLResourceType
- type APIVersionsRequest
- type APIVersionsResponse
- type APIVersionsResponseBlock
- type AbortedTransaction
- type AccessToken
- type AccessTokenProvider
- type AddOffsetsToTxnRequest
- type AddOffsetsToTxnResponse
- type AddPartitionsToTxnRequest
- type AddPartitionsToTxnResponse
- type AlterConfigsRequest
- type AlterConfigsResource
- type AlterConfigsResourceResponse
- type AlterConfigsResponse
- type AsyncProducer
- type BalanceStrategy
- type BalanceStrategyPlan
- type Broker
- func (b *Broker) APIVersions(request *APIVersionsRequest) (*APIVersionsResponse, error)
- func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error)
- func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error)
- func (b *Broker) Addr() string
- func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error)
- func (b *Broker) Close() error
- func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error)
- func (b *Broker) Connected() (bool, error)
- func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error)
- func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error)
- func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error)
- func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error)
- func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error)
- func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error)
- func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error)
- func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error)
- func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error)
- func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error)
- func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error)
- func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error)
- func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error)
- func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error)
- func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error)
- func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)
- func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error)
- func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error)
- func (b *Broker) ID() int32
- func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error)
- func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error)
- func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error)
- func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error)
- func (b *Broker) Open(conf *Config) error
- func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error)
- func (b *Broker) Rack() string
- func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error)
- func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error)
- type ByteEncoder
- type Client
- type ClusterAdmin
- type CompressionCodec
- type Config
- type ConfigEntry
- type ConfigResource
- type ConfigResourceType
- type ConfigSource
- type ConfigSynonym
- type ConfigurationError
- type Consumer
- type ConsumerError
- type ConsumerErrors
- type ConsumerGroup
- type ConsumerGroupClaim
- type ConsumerGroupHandler
- type ConsumerGroupMemberAssignment
- type ConsumerGroupMemberMetadata
- type ConsumerGroupSession
- type ConsumerMessage
- type ConsumerMetadataRequest
- type ConsumerMetadataResponse
- type ControlRecord
- type ControlRecordType
- type CoordinatorType
- type CreateAclsRequest
- type CreateAclsResponse
- type CreatePartitionsRequest
- type CreatePartitionsResponse
- type CreateTopicsRequest
- type CreateTopicsResponse
- type DeleteAclsRequest
- type DeleteAclsResponse
- type DeleteGroupsRequest
- type DeleteGroupsResponse
- type DeleteRecordsRequest
- type DeleteRecordsRequestTopic
- type DeleteRecordsResponse
- type DeleteRecordsResponsePartition
- type DeleteRecordsResponseTopic
- type DeleteTopicsRequest
- type DeleteTopicsResponse
- type DescribeAclsRequest
- type DescribeAclsResponse
- type DescribeConfigsRequest
- type DescribeConfigsResponse
- type DescribeGroupsRequest
- type DescribeGroupsResponse
- type DynamicConsistencyPartitioner
- type Encoder
- type EndTxnRequest
- type EndTxnResponse
- type FetchRequest
- type FetchResponse
- func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, ...)
- func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, ...)
- func (r *FetchResponse) AddError(topic string, partition int32, err KError)
- func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)
- func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...)
- func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64)
- func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, ...)
- func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...)
- func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...)
- func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
- func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32)
- func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64)
- type FetchResponseBlock
- type FilterResponse
- type FindCoordinatorRequest
- type FindCoordinatorResponse
- type GroupDescription
- type GroupMemberDescription
- type GroupProtocol
- type HashPartitionerOption
- type HeartbeatRequest
- type HeartbeatResponse
- type InitProducerIDRequest
- type InitProducerIDResponse
- type IsolationLevel
- type JoinGroupRequest
- type JoinGroupResponse
- type KError
- type KafkaVersion
- type LeaveGroupRequest
- type LeaveGroupResponse
- type ListGroupsRequest
- type ListGroupsResponse
- type MatchingACL
- type Message
- type MessageBlock
- type MessageSet
- type MetadataRequest
- type MetadataResponse
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetFetchResponseBlock
- type OffsetManager
- type OffsetRequest
- type OffsetResponse
- type OffsetResponseBlock
- type PacketDecodingError
- type PacketEncodingError
- type PartitionConsumer
- type PartitionError
- type PartitionMetadata
- type PartitionOffsetManager
- type PartitionOffsetMetadata
- type Partitioner
- type PartitionerConstructor
- type ProduceRequest
- type ProduceResponse
- type ProduceResponseBlock
- type ProducerError
- type ProducerErrors
- type ProducerMessage
- type Record
- type RecordBatch
- type RecordHeader
- type Records
- type RequiredAcks
- type Resource
- type ResourceAcls
- type ResourceResponse
- type SASLMechanism
- type SCRAMClient
- type SaslAuthenticateRequest
- type SaslAuthenticateResponse
- type SaslHandshakeRequest
- type SaslHandshakeResponse
- type StdLogger
- type StringEncoder
- type SyncGroupRequest
- type SyncGroupResponse
- type SyncProducer
- type Timestamp
- type TopicDetail
- type TopicError
- type TopicMetadata
- type TopicPartition
- type TopicPartitionError
- type TxnOffsetCommitRequest
- type TxnOffsetCommitResponse
Constants ¶
const ( // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+) SASLTypeOAuth = "OAUTHBEARER" // SASLTypePlaintext represents the SASL/PLAIN mechanism SASLTypePlaintext = "PLAIN" // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism. SASLTypeSCRAMSHA256 = "SCRAM-SHA-256" // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512" // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and // server negotiate SASL auth using opaque packets. SASLHandshakeV0 = int16(0) // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and // server negotiate SASL by wrapping tokens with Kafka protocol headers. SASLHandshakeV1 = int16(1) // SASLExtKeyAuth is the reserved extension key name sent as part of the // SASL/OAUTHBEARER initial client response SASLExtKeyAuth = "auth" )
const ( // OffsetNewest stands for the log head offset, i.e. the offset that will be // assigned to the next message that will be produced to the partition. You // can send this to a client's GetOffset method to get this offset, or when // calling ConsumePartition to start consuming new messages. OffsetNewest int64 = -1 // OffsetOldest stands for the oldest offset available on the broker for a // partition. You can send this to a client's GetOffset method to get this // offset, or when calling ConsumePartition to start consuming from the // oldest offset that is still available on the broker. OffsetOldest int64 = -2 )
const APIKeySASLAuth = 36
APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API
const GroupGenerationUndefined = -1
GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.
const ReceiveTime int64 = -1
ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
Variables ¶
var ( // Logger is the instance of a StdLogger interface that Sarama writes connection // management events to. By default it is set to discard all log messages via ioutil.Discard, // but you can set it to redirect wherever you want. Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) // PanicHandler is called for recovering from panics spawned internally to the library (and thus // not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered. PanicHandler func(interface{}) // MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying // to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned // with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt // to process. MaxRequestSize int32 = 100 * 1024 * 1024 // MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If // a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to // protect the client from running out of memory. Please note that brokers do not have any natural limit on // the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers // (see https://issues.apache.org/jira/browse/KAFKA-2063). MaxResponseSize int32 = 100 * 1024 * 1024 )
var ( V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) V1_1_1_0 = newKafkaVersion(1, 1, 1, 0) V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) V2_0_1_0 = newKafkaVersion(2, 0, 1, 0) V2_1_0_0 = newKafkaVersion(2, 1, 0, 0) V2_2_0_0 = newKafkaVersion(2, 2, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, V0_8_2_1, V0_8_2_2, V0_9_0_0, V0_9_0_1, V0_10_0_0, V0_10_0_1, V0_10_1_0, V0_10_1_1, V0_10_2_0, V0_10_2_1, V0_11_0_0, V0_11_0_1, V0_11_0_2, V1_0_0_0, V1_1_0_0, V1_1_1_0, V2_0_0_0, V2_0_1_0, V2_1_0_0, V2_2_0_0, } MinVersion = V0_8_2_0 MaxVersion = V2_2_0_0 )
Effective constants defining the supported kafka versions.
var BalanceStrategyRange = &balanceStrategy{ name: "range", coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { step := float64(len(partitions)) / float64(len(memberIDs)) for i, memberID := range memberIDs { pos := float64(i) min := int(math.Floor(pos*step + 0.5)) max := int(math.Floor((pos+1)*step + 0.5)) plan.Add(memberID, topic, partitions[min:max]...) } }, }
BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members. Example with one topic T with six partitions (0..5) and two members (M1, M2):
M1: {T: [0, 1, 2]} M2: {T: [3, 4, 5]}
var BalanceStrategyRoundRobin = &balanceStrategy{ name: "roundrobin", coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { for i, part := range partitions { memberID := memberIDs[i%len(memberIDs)] plan.Add(memberID, topic, part) } }, }
BalanceStrategyRoundRobin assigns partitions to members in alternating order. Example with topic T with six partitions (0..5) and two members (M1, M2):
M1: {T: [0, 2, 4]} M2: {T: [1, 3, 5]}
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")
ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")
ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.
var ErrControllerNotAvailable = errors.New("kafka: controller is not available")
ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version is lower than 0.10.0.0.
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")
ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")
ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update the metadata.
var ErrNotConnected = errors.New("kafka: broker not connected")
ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")
ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
ErrShuttingDown is returned when a producer receives a message during shutdown.
var NoNode = &Broker{id: -1, addr: ":-1"}
NoNode node
Functions ¶
This section is empty.
Types ¶
type ACL ¶
type ACL struct { Principal string Host string Operation ACLOperation PermissionType ACLPermissionType }
ACL holds information about acl type
type ACLCreation ¶
ACLCreation is a wrapper around Resource and ACL type
type ACLCreationResponse ¶
ACLCreationResponse is an acl creation response type
type ACLFilter ¶
type ACLFilter struct { Version int ResourceType ACLResourceType ResourceName *string ResourcePatternTypeFilter ACLResourcePatternType Principal *string Host *string Operation ACLOperation PermissionType ACLPermissionType }
ACLFilter acl filter
type ACLOperation ¶
type ACLOperation int
ACLOperation operation type
const (
ACLOperationWrite ACLOperation = iota
)
type ACLPermissionType ¶
type ACLPermissionType int
ACLPermissionType permission type
const (
ACLPermissionAllow ACLPermissionType = iota
)
type ACLResourcePatternType ¶
type ACLResourcePatternType int
ACLResourcePatternType resource pattern type
const ( ACLPatternUnknown ACLResourcePatternType = iota ACLPatternLiteral )
type ACLResourceType ¶
type ACLResourceType int
ACLResourceType resource type
const (
ACLResourceTopic ACLResourceType = iota
)
type APIVersionsResponse ¶
type APIVersionsResponse struct { Err KError APIVersions []*APIVersionsResponseBlock }
APIVersionsResponse is an api version response type
type APIVersionsResponseBlock ¶
APIVersionsResponseBlock is an api version response block type
type AbortedTransaction ¶
AbortedTransaction transaction
type AccessToken ¶
type AccessToken struct { // Token is the access token payload. Token string // Extensions is a optional map of arbitrary key-value pairs that can be // sent with the SASL/OAUTHBEARER initial client response. These values are // ignored by the SASL server if they are unexpected. This feature is only // supported by Kafka >= 2.1.0. Extensions map[string]string }
AccessToken contains an access token used to authenticate a SASL/OAUTHBEARER client along with associated metadata.
type AccessTokenProvider ¶
type AccessTokenProvider interface { // Token returns an access token. The implementation should ensure token // reuse so that multiple calls at connect time do not create multiple // tokens. The implementation should also periodically refresh the token in // order to guarantee that each call returns an unexpired token. This // method should not block indefinitely--a timeout error should be returned // after a short period of inactivity so that the broker connection logic // can log debugging information and retry. Token() (*AccessToken, error) }
AccessTokenProvider is the interface that encapsulates how implementors can generate access tokens for Kafka broker authentication.
type AddOffsetsToTxnRequest ¶
type AddOffsetsToTxnRequest struct { TransactionalID string ProducerID int64 ProducerEpoch int16 GroupID string }
AddOffsetsToTxnRequest adds offsets to a transaction request
type AddOffsetsToTxnResponse ¶
AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddPartitionsToTxnRequest ¶
type AddPartitionsToTxnRequest struct { TransactionalID string ProducerID int64 ProducerEpoch int16 TopicPartitions map[string][]int32 }
AddPartitionsToTxnRequest is a add paartition request
type AddPartitionsToTxnResponse ¶
type AddPartitionsToTxnResponse struct { ThrottleTime time.Duration Errors map[string][]*PartitionError }
AddPartitionsToTxnResponse is a partition errors to transaction type
type AlterConfigsRequest ¶
type AlterConfigsRequest struct { Resources []*AlterConfigsResource ValidateOnly bool }
AlterConfigsRequest is an alter config request type
type AlterConfigsResource ¶
type AlterConfigsResource struct { Type ConfigResourceType Name string ConfigEntries map[string]*string }
AlterConfigsResource is an alter config resource type
type AlterConfigsResourceResponse ¶
type AlterConfigsResourceResponse struct { ErrorCode int16 ErrorMsg string Type ConfigResourceType Name string }
AlterConfigsResourceResponse is a response type for alter config resource
type AlterConfigsResponse ¶
type AlterConfigsResponse struct { ThrottleTime time.Duration Resources []*AlterConfigsResourceResponse }
AlterConfigsResponse is a response type for alter config
type AsyncProducer ¶
type AsyncProducer interface { // AsyncClose triggers a shutdown of the producer. The shutdown has completed // when both the Errors and Successes channels have been closed. When calling // AsyncClose, you *must* continue to read from those channels in order to // drain the results of any messages in flight. AsyncClose() // Close shuts down the producer and waits for any buffered messages to be // flushed. You must call this function before a producer object passes out of // scope, as it may otherwise leak memory. You must call this before calling // Close on the underlying client. Close() error // Input is the input channel for the user to write messages to that they // wish to send. Input() chan<- *ProducerMessage // Successes is the success output channel back to the user when Return.Successes is // enabled. If Return.Successes is true, you MUST read from this channel or the // Producer will deadlock. It is suggested that you send and read messages // together in a single select statement. Successes() <-chan *ProducerMessage // Errors is the error output channel back to the user. You MUST read from this // channel or the Producer will deadlock when the channel is full. Alternatively, // you can set Producer.Return.Errors in your config to false, which prevents // errors to be returned. Errors() <-chan *ProducerError }
AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope.
func NewAsyncProducer ¶
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error)
NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducerFromClient ¶
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error)
NewAsyncProducerFromClient creates a new Producer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.
type BalanceStrategy ¶
type BalanceStrategy interface { // Name uniquely identifies the strategy. Name() string // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions` // and returns a distribution plan. Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) }
BalanceStrategy is used to balance topics and partitions across members of a consumer group
type BalanceStrategyPlan ¶
BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt. It contains an allocation of topic/partitions by memberID in the form of a `memberID -> topic -> partitions` map.
func (BalanceStrategyPlan) Add ¶
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32)
Add assigns a topic with a number partitions to a member.
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
func NewBroker ¶
NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect, you have to call Open() for that.
func (*Broker) APIVersions ¶
func (b *Broker) APIVersions(request *APIVersionsRequest) (*APIVersionsResponse, error)
APIVersions return api version response or error
func (*Broker) AddOffsetsToTxn ¶
func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error)
AddOffsetsToTxn sends a request to add offsets to txn and returns a response or error
func (*Broker) AddPartitionsToTxn ¶
func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error)
AddPartitionsToTxn send a request to add partition to txn and returns a response or error
func (*Broker) Addr ¶
Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
func (*Broker) AlterConfigs ¶
func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error)
AlterConfigs sends a request to alter config and return a response or error
func (*Broker) CommitOffset ¶
func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error)
CommitOffset return an Offset commit response or error
func (*Broker) Connected ¶
Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.
func (*Broker) CreateAcls ¶
func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error)
CreateAcls sends a create acl request and returns a response or error
func (*Broker) CreatePartitions ¶
func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error)
CreatePartitions sends a create partition request and returns create partitions response or error
func (*Broker) CreateTopics ¶
func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error)
CreateTopics send a create topic request and returns create topic response
func (*Broker) DeleteAcls ¶
func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error)
DeleteAcls sends a delete acl request and returns a response or error
func (*Broker) DeleteGroups ¶
func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error)
DeleteGroups sends a request to delete groups and returns a response or error
func (*Broker) DeleteRecords ¶
func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error)
DeleteRecords send a request to delete records and return delete record response or error
func (*Broker) DeleteTopics ¶
func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error)
DeleteTopics sends a delete topic request and returns delete topic response
func (*Broker) DescribeAcls ¶
func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error)
DescribeAcls sends a describe acl request and returns a response or error
func (*Broker) DescribeConfigs ¶
func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error)
DescribeConfigs sends a request to describe config and returns a response or error
func (*Broker) DescribeGroups ¶
func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error)
DescribeGroups return describe group response or error
func (*Broker) EndTxn ¶
func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error)
EndTxn sends a request to end txn and returns a response or error
func (*Broker) Fetch ¶
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error)
Fetch returns a FetchResponse or error
func (*Broker) FetchOffset ¶
func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error)
FetchOffset returns an offset fetch response or error
func (*Broker) FindCoordinator ¶
func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error)
FindCoordinator sends a find coordinate request and returns a response or error
func (*Broker) GetAvailableOffsets ¶
func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error)
GetAvailableOffsets return an offset response or error
func (*Broker) GetConsumerMetadata ¶
func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)
GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
func (*Broker) GetMetadata ¶
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error)
GetMetadata send a metadata request and returns a metadata response or error
func (*Broker) Heartbeat ¶
func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error)
Heartbeat returns a heartbeat response or error
func (*Broker) ID ¶
ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
func (*Broker) InitProducerID ¶
func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error)
InitProducerID sends an init producer request and returns a response or error
func (*Broker) JoinGroup ¶
func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error)
JoinGroup returns a join group response or error
func (*Broker) LeaveGroup ¶
func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error)
LeaveGroup return a leave group response or error
func (*Broker) ListGroups ¶
func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error)
ListGroups return a list group response or error
func (*Broker) Open ¶
Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewConfig() is used.
func (*Broker) Produce ¶
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error)
Produce returns a produce response or error
func (*Broker) Rack ¶
Rack returns the broker's rack as retrieved from Kafka's metadata or the empty string if it is not known. The returned value corresponds to the broker's broker.rack configuration setting. Requires protocol version to be at least v0.10.0.0.
func (*Broker) SyncGroup ¶
func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error)
SyncGroup returns a sync group response or error
func (*Broker) TxnOffsetCommit ¶
func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error)
TxnOffsetCommit sends a request to commit transaction offsets and returns a response or error
type ByteEncoder ¶
type ByteEncoder []byte
ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
Encode byte encoder encode
type Client ¶
type Client interface { // Config returns the Config struct of the client. This struct should not be // altered after it has been created. Config() *Config // Controller returns the cluster controller broker. Requires Kafka 0.10 or higher. Controller() (*Broker, error) // Brokers returns the current set of active brokers as retrieved from cluster metadata. Brokers() []*Broker // Topics returns the set of available topics as retrieved from cluster metadata. Topics() ([]string, error) // Partitions returns the sorted list of all partition IDs for the given topic. Partitions(topic string) ([]int32, error) // WritablePartitions returns the sorted list of all writable partition IDs for // the given topic, where "writable" means "having a valid leader accepting // writes". WritablePartitions(topic string) ([]int32, error) // Leader returns the broker object that is the leader of the current // topic/partition, as determined by querying the cluster metadata. Leader(topic string, partitionID int32) (*Broker, error) // Replicas returns the set of all replica IDs for the given partition. Replicas(topic string, partitionID int32) ([]int32, error) // InSyncReplicas returns the set of all in-sync replica IDs for the given // partition. In-sync replicas are replicas which are fully caught up with // the partition leader. InSyncReplicas(topic string, partitionID int32) ([]int32, error) // OfflineReplicas returns the set of all offline replica IDs for the given // partition. Offline replicas are replicas which are offline OfflineReplicas(topic string, partitionID int32) ([]int32, error) // RefreshMetadata takes a list of topics and queries the cluster to refresh the // available metadata for those topics. If no topics are provided, it will refresh // metadata for all topics. RefreshMetadata(topics ...string) error // GetOffset queries the cluster to get the most recent available offset at the // given time (in milliseconds) on the topic/partition combination. // Time should be OffsetOldest for the earliest available offset, // OffsetNewest for the offset of the message that will be produced next, or a time. GetOffset(topic string, partitionID int32, time int64) (int64, error) // Coordinator returns the coordinating broker for a consumer group. It will // return a locally cached value if it's available. You can call // RefreshCoordinator to update the cached value. This function only works on // Kafka 0.8.2 and higher. Coordinator(consumerGroup string) (*Broker, error) // RefreshCoordinator retrieves the coordinator for a consumer group and stores it // in local cache. This function only works on Kafka 0.8.2 and higher. RefreshCoordinator(consumerGroup string) error // InitProducerID retrieves information required for Idempotent Producer InitProducerID() (*InitProducerIDResponse, error) // Close shuts down all broker connections managed by this client. It is required // to call this function before a client object passes out of scope, as it will // otherwise leak memory. You must close any Producers or Consumers using a client // before you close the client. Close() error // Closed returns true if the client has already had Close called on it Closed() bool }
Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks, it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users, however Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default one client per producer/consumer.
type ClusterAdmin ¶
type ClusterAdmin interface { // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher. // It may take several seconds after CreateTopic returns success for all the brokers // to become aware that the topic has been created. During this time, listTopics // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0. CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error // List the topics available in the cluster with the default options. ListTopics() (map[string]TopicDetail, error) // Describe some topics in the cluster. DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) // Delete a topic. It may take several seconds after the DeleteTopic to returns success // and for all the brokers to become aware that the topics are gone. // During this time, listTopics may continue to return information about the deleted topic. // If delete.topic.enable is false on the brokers, deleteTopic will mark // the topic for deletion, but not actually delete them. // This operation is supported by brokers with version 0.10.1.0 or higher. DeleteTopic(topic string) error // Increase the number of partitions of the topics according to the corresponding values. // If partitions are increased for a topic that has a key, the partition logic or ordering of // the messages will be affected. It may take several seconds after this method returns // success for all the brokers to become aware that the partitions have been created. // During this time, ClusterAdmin#describeTopics may not return information about the // new partitions. This operation is supported by brokers with version 1.0.0 or higher. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error // Delete records whose offset is smaller than the given offset of the corresponding partition. // This operation is supported by brokers with version 0.11.0.0 or higher. DeleteRecords(topic string, partitionOffsets map[int32]int64) error // Get the configuration for the specified resources. // The returned configuration includes default values and the Default is true // can be used to distinguish them from user supplied values. // Config entries where ReadOnly is true cannot be updated. // The value of config entries where Sensitive is true is always nil so // sensitive information is not disclosed. // This operation is supported by brokers with version 0.11.0.0 or higher. DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) // Update the configuration for the specified resources with the default options. // This operation is supported by brokers with version 0.11.0.0 or higher. // The resources with their configs (topic is the only resource type with configs // that can be updated currently Updates are not transactional so they may succeed // for some resources while fail for others. The configs for a particular resource are updated automatically. AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error // Creates access control lists (ACLs) which are bound to specific resources. // This operation is not transactional so it may succeed for some ACLs while fail for others. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher. CreateACL(resource Resource, acl ACL) error // Lists access control lists (ACLs) according to the supplied filter. // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls // This operation is supported by brokers with version 0.11.0.0 or higher. ListAcls(filter ACLFilter) ([]ResourceAcls, error) // Deletes access control lists (ACLs) according to the supplied filters. // This operation is not transactional so it may succeed for some ACLs while fail for others. // This operation is supported by brokers with version 0.11.0.0 or higher. DeleteACL(filter ACLFilter, validateOnly bool) ([]MatchingACL, error) // List the consumer groups available in the cluster. ListConsumerGroups() (map[string]string, error) // Describe the given consumer groups. DescribeConsumerGroups(groups []string) ([]*GroupDescription, error) // List the consumer group offsets available in the cluster. ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) // Get information about the nodes in the cluster. DescribeCluster() (brokers []*Broker, controllerID int32, err error) // Close shuts down the admin and closes underlying client. Close() error }
ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker version required. You MUST call Close() on a client to avoid leaks
func NewClusterAdmin ¶
func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error)
NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
type CompressionCodec ¶
type CompressionCodec int8
CompressionCodec represents the various compression codecs recognized by Kafka in messages.
const ( //CompressionNone no compression CompressionNone CompressionCodec = iota //CompressionGZIP compression using GZIP CompressionGZIP //CompressionSnappy compression using snappy CompressionSnappy //CompressionLZ4 compression using LZ4 CompressionLZ4 //CompressionZSTD compression using ZSTD CompressionZSTD // CompressionLevelDefault is the constant to use in CompressionLevel // to have the default compression level for any codec. The value is picked // that we don't use any existing compression levels. CompressionLevelDefault = -1000 )
func (CompressionCodec) String ¶
func (cc CompressionCodec) String() string
type Config ¶
type Config struct { // Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client. Admin struct { // The maximum duration the administrative Kafka client will wait for ClusterAdmin operations, // including topics, brokers, configurations and ACLs (defaults to 3 seconds). Timeout time.Duration } // Net is the namespace for network-level properties used by the Broker, and // shared by the Client/Producer/Consumer. Net struct { // How many outstanding requests a connection is allowed to have before // sending on it blocks (default 5). MaxOpenRequests int // All three of the below configurations are similar to the // `socket.timeout.ms` setting in JVM kafka. All of them default // to 30 seconds. DialTimeout time.Duration // How long to wait for the initial connection. ReadTimeout time.Duration // How long to wait for a response. WriteTimeout time.Duration // How long to wait for a transmit. TLS struct { // Whether or not to use TLS when connecting to the broker // (defaults to false). Enable bool // The TLS configuration to use for secure connections if // enabled (defaults to nil). Config *tls.Config } // SASL based authentication with broker. While there are multiple SASL authentication methods // the current implementation is limited to plaintext (SASL/PLAIN) authentication SASL struct { // Whether or not to use SASL authentication when connecting to the broker // (defaults to false). Enable bool // SASLMechanism is the name of the enabled SASL mechanism. // Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN). Mechanism SASLMechanism // Whether or not to send the Kafka SASL handshake first if enabled // (defaults to true). You should only set this to false if you're using // a non-Kafka SASL proxy. Handshake bool //username and password for SASL/PLAIN or SASL/SCRAM authentication User string Password string // authz id used for SASL/SCRAM authentication SCRAMAuthzID string // SCRAMClient is a user provided implementation of a SCRAM // client used to perform the SCRAM exchange with the server. SCRAMClient SCRAMClient // TokenProvider is a user-defined callback for generating // access tokens for SASL/OAUTHBEARER auth. See the // AccessTokenProvider interface docs for proper implementation // guidelines. TokenProvider AccessTokenProvider } // KeepAlive specifies the keep-alive period for an active network connection. // If zero, keep-alives are disabled. (default is 0: disabled). KeepAlive time.Duration // LocalAddr is the local address to use when dialing an // address. The address must be of a compatible type for the // network being dialed. // If nil, a local address is automatically chosen. LocalAddr net.Addr Proxy struct { // Whether or not to use proxy when connecting to the broker // (defaults to false). Enable bool // The proxy dialer to use enabled (defaults to nil). Dialer proxy.Dialer } } // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. Metadata struct { Retry struct { // The total number of times to retry a metadata request when the // cluster is in the middle of a leader election (default 3). Max int // How long to wait for leader election to occur before retrying // (default 250ms). Similar to the JVM's `retry.backoff.ms`. Backoff time.Duration // Called to compute backoff time dynamically. Useful for implementing // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration } // How frequently to refresh the cluster metadata in the background. // Defaults to 10 minutes. Set to 0 to disable. Similar to // `topic.metadata.refresh.interval.ms` in the JVM version. RefreshFrequency time.Duration // Whether to maintain a full set of metadata for all topics, or just // the minimal set that has been necessary so far. The full set is simpler // and usually more convenient, but can take up a substantial amount of // memory if you have many topics and partitions. Defaults to true. Full bool } // Producer is the namespace for configuration related to producing messages, // used by the Producer. Producer struct { // The maximum permitted size of a message (defaults to 1000000). Should be // set equal to or smaller than the broker's `message.max.bytes`. MaxMessageBytes int // The level of acknowledgement reliability needed from the broker (defaults // to WaitForLocal). Equivalent to the `request.required.acks` setting of the // JVM producer. RequiredAcks RequiredAcks // The maximum duration the broker will wait the receipt of the number of // RequiredAcks (defaults to 10 seconds). This is only relevant when // RequiredAcks is set to WaitForAll or a number > 1. Only supports // millisecond resolution, nanoseconds will be truncated. Equivalent to // the JVM producer's `request.timeout.ms` setting. Timeout time.Duration // The type of compression to use on messages (defaults to no compression). // Similar to `compression.codec` setting of the JVM producer. Compression CompressionCodec // The level of compression to use on messages. The meaning depends // on the actual compression type used and defaults to default compression // level for the codec. CompressionLevel int // Generates partitioners for choosing the partition to send messages to // (defaults to hashing the message key). Similar to the `partitioner.class` // setting for the JVM producer. Partitioner PartitionerConstructor // If enabled, the producer will ensure that exactly one copy of each message is // written. Idempotent bool // Return specifies what channels will be populated. If they are set to true, // you must read from the respective channels to prevent deadlock. If, // however, this config is used to create a `SyncProducer`, both must be set // to true and you shall not read from the channels since the producer does // this internally. Return struct { // If enabled, successfully delivered messages will be returned on the // Successes channel (default disabled). Successes bool // If enabled, messages that failed to deliver will be returned on the // Errors channel, including error (default enabled). Errors bool } // The following config options control how often messages are batched up and // sent to the broker. By default, messages are sent as fast as possible, and // all messages received while the current batch is in-flight are placed // into the subsequent batch. Flush struct { // The best-effort number of bytes needed to trigger a flush. Use the // global sarama.MaxRequestSize to set a hard upper limit. Bytes int // The best-effort number of messages needed to trigger a flush. Use // `MaxMessages` to set a hard upper limit. Messages int // The best-effort frequency of flushes. Equivalent to // `queue.buffering.max.ms` setting of JVM producer. Frequency time.Duration // The maximum number of messages the producer will send in a single // broker request. Defaults to 0 for unlimited. Similar to // `queue.buffering.max.messages` in the JVM producer. MaxMessages int } Retry struct { // The total number of times to retry sending a message (default 3). // Similar to the `message.send.max.retries` setting of the JVM producer. Max int // How long to wait for the cluster to settle between retries // (default 100ms). Similar to the `retry.backoff.ms` setting of the // JVM producer. Backoff time.Duration // Called to compute backoff time dynamically. Useful for implementing // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration } } // Consumer is the namespace for configuration related to consuming messages, // used by the Consumer. Consumer struct { // Group is the namespace for configuring consumer group. Group struct { Session struct { // The timeout used to detect consumer failures when using Kafka's group management facility. // The consumer sends periodic heartbeats to indicate its liveness to the broker. // If no heartbeats are received by the broker before the expiration of this session timeout, // then the broker will remove this consumer from the group and initiate a rebalance. // Note that the value must be in the allowable range as configured in the broker configuration // by `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (default 10s) Timeout time.Duration } Heartbeat struct { // The expected time between heartbeats to the consumer coordinator when using Kafka's group // management facilities. Heartbeats are used to ensure that the consumer's session stays active and // to facilitate rebalancing when new consumers join or leave the group. // The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no // higher than 1/3 of that value. // It can be adjusted even lower to control the expected time for normal rebalances (default 3s) Interval time.Duration } Rebalance struct { // Strategy for allocating topic partitions to members (default BalanceStrategyRange) Strategy BalanceStrategy // The maximum allowed time for each worker to join the group once a rebalance has begun. // This is basically a limit on the amount of time needed for all tasks to flush any pending // data and commit offsets. If the timeout is exceeded, then the worker will be removed from // the group, which will cause offset commit failures (default 60s). Timeout time.Duration Retry struct { // When a new consumer joins a consumer group the set of consumers attempt to "rebalance" // the load to assign partitions to each consumer. If the set of consumers changes while // this assignment is taking place the rebalance will fail and retry. This setting controls // the maximum number of attempts before giving up (default 4). Max int // Backoff time between retries during rebalance (default 2s) Backoff time.Duration } } Member struct { // Custom metadata to include when joining the group. The user data for all joined members // can be retrieved by sending a DescribeGroupRequest to the broker that is the // coordinator for the group. UserData []byte } } Retry struct { // How long to wait after a failing to read from a partition before // trying again (default 2s). Backoff time.Duration // Called to compute backoff time dynamically. Useful for implementing // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries int) time.Duration } // Fetch is the namespace for controlling how many bytes are retrieved by any // given request. Fetch struct { // The minimum number of message bytes to fetch in a request - the broker // will wait until at least this many are available. The default is 1, // as 0 causes the consumer to spin when no messages are available. // Equivalent to the JVM's `fetch.min.bytes`. Min int32 // The default number of message bytes to fetch from the broker in each // request (default 1MB). This should be larger than the majority of // your messages, or else the consumer will spend a lot of time // negotiating sizes and not actually consuming. Similar to the JVM's // `fetch.message.max.bytes`. Default int32 // The maximum number of message bytes to fetch from the broker in a // single request. Messages larger than this will return // ErrMessageTooLarge and will not be consumable, so you must be sure // this is at least as large as your largest message. Defaults to 0 // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The // global `sarama.MaxResponseSize` still applies. Max int32 } // The maximum amount of time the broker will wait for Consumer.Fetch.Min // bytes to become available before it returns fewer than that anyways. The // default is 250ms, since 0 causes the consumer to spin when no events are // available. 100-500ms is a reasonable range for most cases. Kafka only // supports precision up to milliseconds; nanoseconds will be truncated. // Equivalent to the JVM's `fetch.wait.max.ms`. MaxWaitTime time.Duration // The maximum amount of time the consumer expects a message takes to // process for the user. If writing to the Messages channel takes longer // than this, that partition will stop fetching more messages until it // can proceed again. // Note that, since the Messages channel is buffered, the actual grace time is // (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms. // If a message is not written to the Messages channel between two ticks // of the expiryTicker then a timeout is detected. // Using a ticker instead of a timer to detect timeouts should typically // result in many fewer calls to Timer functions which may result in a // significant performance improvement if many messages are being sent // and timeouts are infrequent. // The disadvantage of using a ticker instead of a timer is that // timeouts will be less accurate. That is, the effective timeout could // be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For // example, if `MaxProcessingTime` is 100ms then a delay of 180ms // between two messages being sent may not be recognized as a timeout. MaxProcessingTime time.Duration // Return specifies what channels will be populated. If they are set to true, // you must read from them to prevent deadlock. Return struct { // If enabled, any errors that occurred while consuming are returned on // the Errors channel (default disabled). Errors bool } // Offsets specifies configuration for how and when to commit consumed // offsets. This currently requires the manual use of an OffsetManager // but will eventually be automated. Offsets struct { // How frequently to commit updated offsets. Defaults to 1s. CommitInterval time.Duration // The initial offset to use if no offset was previously committed. // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest. Initial int64 // The retention duration for committed offsets. If zero, disabled // (in which case the `offsets.retention.minutes` option on the // broker will be used). Kafka only supports precision up to // milliseconds; nanoseconds will be truncated. Requires Kafka // broker version 0.9.0 or later. // (default is 0: disabled). Retention time.Duration Retry struct { // The total number of times to retry failing commit // requests during OffsetManager shutdown (default 3). Max int } } // IsolationLevel support 2 mode: // - use `ReadUncommitted` (default) to consume and return all messages in message channel // - use `ReadCommitted` to hide messages that are part of an aborted transaction IsolationLevel IsolationLevel } // A user-provided string sent with every request to the brokers for logging, // debugging, and auditing purposes. Defaults to "sarama", but you should // probably set it to something specific to your application. ClientID string // The number of events to buffer in internal and external channels. This // permits the producer and consumer to continue processing some messages // in the background while user code is working, greatly improving throughput. // Defaults to 256. ChannelBufferSize int // The version of Kafka that Sarama will assume it is running against. // Defaults to the oldest supported stable version. Since Kafka provides // backwards-compatibility, setting it to a version older than you have // will not break anything, although it may prevent you from using the // latest features. Setting it to a version greater than you are actually // running may lead to random breakage. Version KafkaVersion // The registry to define metrics into. // Defaults to a local registry. // If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true" // prior to starting Sarama. // See Examples on how to use the metrics registry MetricRegistry metrics.Registry }
Config is used to pass multiple configuration options to Sarama's constructors.
type ConfigEntry ¶
type ConfigEntry struct { Name string Value string ReadOnly bool Default bool Source ConfigSource Sensitive bool Synonyms []*ConfigSynonym }
ConfigEntry entry
type ConfigResource ¶
type ConfigResource struct { Type ConfigResourceType Name string ConfigNames []string }
ConfigResource resource
type ConfigResourceType ¶
type ConfigResourceType int8
ConfigResourceType is a type for config resource
const ( //UnknownResource constant type UnknownResource ConfigResourceType = iota //AnyResource constant type AnyResource //TopicResource constant type TopicResource //GroupResource constant type GroupResource //ClusterResource constant type ClusterResource //BrokerResource constant type BrokerResource )
type ConfigSource ¶
type ConfigSource int8
ConfigSource config source
const ( // SourceUnknown unknown SourceUnknown ConfigSource = iota // SourceTopic topic SourceTopic // SourceDynamicBroker dynamic broker SourceDynamicBroker // SourceDynamicDefaultBroker dynamic default broker SourceDynamicDefaultBroker // SourceStaticBroker static broker SourceStaticBroker // SourceDefault default SourceDefault )
func (ConfigSource) String ¶
func (s ConfigSource) String() string
type ConfigSynonym ¶
type ConfigSynonym struct { ConfigName string ConfigValue string Source ConfigSource }
ConfigSynonym synonym
type ConfigurationError ¶
type ConfigurationError string
ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.
func (ConfigurationError) Error ¶
func (err ConfigurationError) Error() string
type Consumer ¶
type Consumer interface { // Topics returns the set of available topics as retrieved from the cluster // metadata. This method is the same as Client.Topics(), and is provided for // convenience. Topics() ([]string, error) // Partitions returns the sorted list of all partition IDs for the given topic. // This method is the same as Client.Partitions(), and is provided for convenience. Partitions(topic string) ([]int32, error) // ConsumePartition creates a PartitionConsumer on the given topic/partition with // the given offset. It will return an error if this Consumer is already consuming // on the given topic/partition. Offset can be a literal offset, or OffsetNewest // or OffsetOldest ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) // HighWaterMarks returns the current high water marks for each topic and partition. // Consistency between partitions is not guaranteed since high water marks are updated separately. HighWaterMarks() map[string]map[int32]int64 // Close shuts down the consumer. It must be called after all child // PartitionConsumers have already been closed. Close() error }
Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.
func NewConsumer ¶
NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumerFromClient ¶
NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.
type ConsumerError ¶
ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.
func (ConsumerError) Error ¶
func (ce ConsumerError) Error() string
type ConsumerErrors ¶
type ConsumerErrors []*ConsumerError
ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors when stopping.
func (ConsumerErrors) Error ¶
func (ce ConsumerErrors) Error() string
type ConsumerGroup ¶
type ConsumerGroup interface { // Consume joins a cluster of consumers for a given list of topics and // starts a blocking ConsumerGroupSession through the ConsumerGroupHandler. // // The life-cycle of a session is represented by the following steps: // // 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers) // and is assigned their "fair share" of partitions, aka 'claims'. // 2. Before processing starts, the handler's Setup() hook is called to notify the user // of the claims and allow any necessary preparation or alteration of state. // 3. For each of the assigned claims the handler's ConsumeClaim() function is then called // in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected // from concurrent reads/writes. // 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the // parent context is cancelled or when a server-side rebalance cycle is initiated. // 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called // to allow the user to perform any final tasks before a rebalance. // 6. Finally, marked offsets are committed one last time before claims are released. // // Please note, that once a rebalance is triggered, sessions must be completed within // Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit // as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout // is exceeded, the consumer will be removed from the group by Kafka, which will cause offset // commit failures. Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error // Errors returns a read channel of errors that occurred during the consumer life-cycle. // By default, errors are logged and not returned over this channel. // If you want to implement any custom error handling, set your config's // Consumer.Return.Errors setting to true, and read from this channel. Errors() <-chan error // Close stops the ConsumerGroup and detaches any running sessions. It is required to call // this function before the object passes out of scope, as it will otherwise leak memory. Close() error }
ConsumerGroup is responsible for dividing up processing of topics and partitions over a collection of processes (the members of the consumer group).
func NewConsumerGroup ¶
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error)
NewConsumerGroup creates a new consumer group the given broker addresses and configuration.
func NewConsumerGroupFromClient ¶
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error)
NewConsumerGroupFromClient creates a new consumer group using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer. PLEASE NOTE: consumer groups can only re-use but not share clients.
type ConsumerGroupClaim ¶
type ConsumerGroupClaim interface { // Topic returns the consumed topic name. Topic() string // Partition returns the consumed partition. Partition() int32 // InitialOffset returns the initial offset that was used as a starting point for this claim. InitialOffset() int64 // HighWaterMarkOffset returns the high water mark offset of the partition, // i.e. the offset that will be used for the next message that will be produced. // You can use this to determine how far behind the processing is. HighWaterMarkOffset() int64 // Messages returns the read channel for the messages that are returned by // the broker. The messages channel will be closed when a new rebalance cycle // is due. You must finish processing and mark offsets within // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually // re-assigned to another group member. Messages() <-chan *ConsumerMessage }
ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupHandler ¶
type ConsumerGroupHandler interface { // Setup is run at the beginning of a new session, before ConsumeClaim. Setup(ConsumerGroupSession) error // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited // but before the offsets are committed for the very last time. Cleanup(ConsumerGroupSession) error // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). // Once the Messages() channel is closed, the Handler must finish its processing // loop and exit. ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error }
ConsumerGroupHandler instances are used to handle individual topic/partition claims. It also provides hooks for your consumer group session life-cycle and allow you to trigger logic before or after the consume loop(s).
PLEASE NOTE that handlers are likely be called from several goroutines concurrently, ensure that all state is safely protected against race conditions.
type ConsumerGroupMemberAssignment ¶
type ConsumerGroupMemberAssignment struct { Version int16 Topics map[string][]int32 UserData []byte }
ConsumerGroupMemberAssignment holds the member assignment for a consume group
type ConsumerGroupMemberMetadata ¶
ConsumerGroupMemberMetadata holds the metadata for consumer group
type ConsumerGroupSession ¶
type ConsumerGroupSession interface { // Claims returns information about the claimed partitions by topic. Claims() map[string][]int32 // MemberID returns the cluster member ID. MemberID() string // GenerationID returns the current generation ID. GenerationID() int32 // MarkOffset marks the provided offset, alongside a metadata string // that represents the state of the partition consumer at that point in time. The // metadata string can be used by another consumer to restore that state, so it // can resume consumption. // // To follow upstream conventions, you are expected to mark the offset of the // next message to read, not the last message read. Thus, when calling `MarkOffset` // you should typically add one to the offset of the last consumed message. // // Note: calling MarkOffset does not necessarily commit the offset to the backend // store immediately for efficiency reasons, and it may never be committed if // your application crashes. This means that you may end up processing the same // message twice, and your processing should ideally be idempotent. MarkOffset(topic string, partition int32, offset int64, metadata string) // ResetOffset resets to the provided offset, alongside a metadata string that // represents the state of the partition consumer at that point in time. Reset // acts as a counterpart to MarkOffset, the difference being that it allows to // reset an offset to an earlier or smaller value, where MarkOffset only // allows incrementing the offset. cf MarkOffset for more details. ResetOffset(topic string, partition int32, offset int64, metadata string) // MarkMessage marks a message as consumed. MarkMessage(msg *ConsumerMessage, metadata string) // Context returns the session context. Context() context.Context }
ConsumerGroupSession represents a consumer group member session.
type ConsumerMessage ¶
type ConsumerMessage struct { Headers []*RecordHeader // only set if kafka is version 0.11+ Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp Key, Value []byte Topic string Partition int32 Offset int64 }
ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMetadataRequest ¶
type ConsumerMetadataRequest struct {
ConsumerGroup string
}
ConsumerMetadataRequest is used for metadata requests
type ConsumerMetadataResponse ¶
type ConsumerMetadataResponse struct { Err KError Coordinator *Broker CoordinatorID int32 // deprecated: use Coordinator.ID() CoordinatorHost string // deprecated: use Coordinator.Addr() CoordinatorPort int32 // deprecated: use Coordinator.Addr() }
ConsumerMetadataResponse holds the response for a consumer group meta data requests
type ControlRecord ¶
type ControlRecord struct { Version int16 CoordinatorEpoch int32 Type ControlRecordType }
ControlRecord are returned as a record by fetchRequest However unlike "normal" records, they mean nothing application wise. They only serve internal logic for supporting transactions.
type ControlRecordType ¶
type ControlRecordType int
ControlRecordType ...
const ( //ControlRecordAbort is a control record for abort ControlRecordAbort ControlRecordType = iota //ControlRecordCommit is a control record for commit ControlRecordCommit //ControlRecordUnknown is a control record of unknown type ControlRecordUnknown )
type CoordinatorType ¶
type CoordinatorType int8
CoordinatorType type
const ( // CoordinatorGroup group CoordinatorGroup CoordinatorType = iota // CoordinatorTransaction transaction CoordinatorTransaction )
type CreateAclsRequest ¶
type CreateAclsRequest struct { Version int16 ACLCreations []*ACLCreation }
CreateAclsRequest is an acl creation request
type CreateAclsResponse ¶
type CreateAclsResponse struct { ThrottleTime time.Duration ACLCreationResponses []*ACLCreationResponse }
CreateAclsResponse is a an acl response creation type
type CreatePartitionsRequest ¶
type CreatePartitionsRequest struct { TopicPartitions map[string]*TopicPartition Timeout time.Duration ValidateOnly bool }
CreatePartitionsRequest request
type CreatePartitionsResponse ¶
type CreatePartitionsResponse struct { ThrottleTime time.Duration TopicPartitionErrors map[string]*TopicPartitionError }
CreatePartitionsResponse response
type CreateTopicsRequest ¶
type CreateTopicsRequest struct { Version int16 TopicDetails map[string]*TopicDetail Timeout time.Duration ValidateOnly bool }
CreateTopicsRequest request
type CreateTopicsResponse ¶
type CreateTopicsResponse struct { Version int16 ThrottleTime time.Duration TopicErrors map[string]*TopicError }
CreateTopicsResponse response
type DeleteAclsRequest ¶
DeleteAclsRequest is a delete acl request
type DeleteAclsResponse ¶
type DeleteAclsResponse struct { Version int16 ThrottleTime time.Duration FilterResponses []*FilterResponse }
DeleteAclsResponse is a delete acl response
type DeleteGroupsRequest ¶
type DeleteGroupsRequest struct {
Groups []string
}
DeleteGroupsRequest request
func (*DeleteGroupsRequest) AddGroup ¶
func (r *DeleteGroupsRequest) AddGroup(group string)
AddGroup add group
type DeleteGroupsResponse ¶
DeleteGroupsResponse response
type DeleteRecordsRequest ¶
type DeleteRecordsRequest struct { Topics map[string]*DeleteRecordsRequestTopic Timeout time.Duration }
DeleteRecordsRequest request
type DeleteRecordsRequestTopic ¶
DeleteRecordsRequestTopic request topic
type DeleteRecordsResponse ¶
type DeleteRecordsResponse struct { Version int16 ThrottleTime time.Duration Topics map[string]*DeleteRecordsResponseTopic }
DeleteRecordsResponse response
type DeleteRecordsResponsePartition ¶
DeleteRecordsResponsePartition response partition
type DeleteRecordsResponseTopic ¶
type DeleteRecordsResponseTopic struct {
Partitions map[int32]*DeleteRecordsResponsePartition
}
DeleteRecordsResponseTopic response topic
type DeleteTopicsRequest ¶
DeleteTopicsRequest request
type DeleteTopicsResponse ¶
type DeleteTopicsResponse struct { Version int16 ThrottleTime time.Duration TopicErrorCodes map[string]KError }
DeleteTopicsResponse delete topics response
type DescribeAclsRequest ¶
DescribeAclsRequest is a secribe acl request type
type DescribeAclsResponse ¶
type DescribeAclsResponse struct { Version int16 ThrottleTime time.Duration Err KError ErrMsg *string ResourceAcls []*ResourceAcls }
DescribeAclsResponse is a describe acl response type
type DescribeConfigsRequest ¶
type DescribeConfigsRequest struct { Version int16 Resources []*ConfigResource IncludeSynonyms bool }
DescribeConfigsRequest request
type DescribeConfigsResponse ¶
type DescribeConfigsResponse struct { Version int16 ThrottleTime time.Duration Resources []*ResourceResponse }
DescribeConfigsResponse response
type DescribeGroupsRequest ¶
type DescribeGroupsRequest struct {
Groups []string
}
DescribeGroupsRequest group request
func (*DescribeGroupsRequest) AddGroup ¶
func (r *DescribeGroupsRequest) AddGroup(group string)
AddGroup add group
type DescribeGroupsResponse ¶
type DescribeGroupsResponse struct {
Groups []*GroupDescription
}
DescribeGroupsResponse response
type DynamicConsistencyPartitioner ¶
type DynamicConsistencyPartitioner interface { Partitioner // MessageRequiresConsistency is similar to Partitioner.RequiresConsistency, // but takes in the message being partitioned so that the partitioner can // make a per-message determination. MessageRequiresConsistency(message *ProducerMessage) bool }
DynamicConsistencyPartitioner can optionally be implemented by Partitioners in order to allow more flexibility than is originally allowed by the RequiresConsistency method in the Partitioner interface. This allows partitioners to require consistency sometimes, but not all times. It's useful for, e.g., the HashPartitioner, which does not require consistency if the message key is nil.
type Encoder ¶
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
type EndTxnRequest ¶
type EndTxnRequest struct { TransactionalID string ProducerID int64 ProducerEpoch int16 TransactionResult bool }
EndTxnRequest request
type EndTxnResponse ¶
EndTxnResponse response
type FetchRequest ¶
type FetchRequest struct { MaxWaitTime int32 MinBytes int32 MaxBytes int32 Version int16 Isolation IsolationLevel // contains filtered or unexported fields }
FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchResponse ¶
type FetchResponse struct { Blocks map[string]map[int32]*FetchResponseBlock ThrottleTime time.Duration Version int16 // v1 requires 0.9+, v2 requires 0.10+ LogAppendTime bool Timestamp time.Time }
FetchResponse response
func (*FetchResponse) AddControlRecord ¶
func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType)
AddControlRecord add control record
func (*FetchResponse) AddControlRecordWithTimestamp ¶
func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time)
AddControlRecordWithTimestamp add control record with timestamp
func (*FetchResponse) AddError ¶
func (r *FetchResponse) AddError(topic string, partition int32, err KError)
AddError add error
func (*FetchResponse) AddMessage ¶
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)
AddMessage add message
func (*FetchResponse) AddMessageWithTimestamp ¶
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8)
AddMessageWithTimestamp add message with timestamp
func (*FetchResponse) AddRecord ¶
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64)
AddRecord add record
func (*FetchResponse) AddRecordBatch ¶
func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool)
AddRecordBatch add record batch
func (*FetchResponse) AddRecordBatchWithTimestamp ¶
func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time)
AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
func (*FetchResponse) AddRecordWithTimestamp ¶
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time)
AddRecordWithTimestamp add record with timestamp
func (*FetchResponse) GetBlock ¶
func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
GetBlock get block
func (*FetchResponse) SetLastOffsetDelta ¶
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32)
SetLastOffsetDelta set last offset delta
func (*FetchResponse) SetLastStableOffset ¶
func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64)
SetLastStableOffset set last stable offset
type FetchResponseBlock ¶
type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 LastStableOffset int64 AbortedTransactions []*AbortedTransaction Records *Records // deprecated: use FetchResponseBlock.RecordsSet RecordsSet []*Records Partial bool }
FetchResponseBlock block
type FilterResponse ¶
type FilterResponse struct { Err KError ErrMsg *string MatchingAcls []*MatchingACL }
FilterResponse is a filter response type
type FindCoordinatorRequest ¶
type FindCoordinatorRequest struct { Version int16 CoordinatorKey string CoordinatorType CoordinatorType }
FindCoordinatorRequest request
type FindCoordinatorResponse ¶
type FindCoordinatorResponse struct { Version int16 ThrottleTime time.Duration Err KError ErrMsg *string Coordinator *Broker }
FindCoordinatorResponse response
type GroupDescription ¶
type GroupDescription struct { Err KError GroupID string State string ProtocolType string Protocol string Members map[string]*GroupMemberDescription }
GroupDescription description
type GroupMemberDescription ¶
type GroupMemberDescription struct { ClientID string ClientHost string MemberMetadata []byte MemberAssignment []byte }
GroupMemberDescription description
func (*GroupMemberDescription) GetMemberAssignment ¶
func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)
GetMemberAssignment get member assignment
func (*GroupMemberDescription) GetMemberMetadata ¶
func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)
GetMemberMetadata get member metadata
type GroupProtocol ¶
GroupProtocol group protocol
type HashPartitionerOption ¶
type HashPartitionerOption func(*hashPartitioner)
HashPartitionerOption lets you modify default values of the partitioner
func WithAbsFirst ¶
func WithAbsFirst() HashPartitionerOption
WithAbsFirst means that the partitioner handles absolute values in the same way as the reference Java implementation
func WithCustomFallbackPartitioner ¶
func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption
WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
func WithCustomHashFunction ¶
func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption
WithCustomHashFunction lets you specify what hash function to use for the partitioning
type HeartbeatRequest ¶
HeartbeatRequest request
type InitProducerIDRequest ¶
InitProducerIDRequest request
type InitProducerIDResponse ¶
type InitProducerIDResponse struct { ThrottleTime time.Duration Err KError ProducerID int64 ProducerEpoch int16 }
InitProducerIDResponse response
type IsolationLevel ¶
type IsolationLevel int8
IsolationLevel level
const ( // ReadUncommitted un committed ReadUncommitted IsolationLevel = iota // ReadCommitted committed ReadCommitted )
type JoinGroupRequest ¶
type JoinGroupRequest struct { Version int16 GroupID string SessionTimeout int32 RebalanceTimeout int32 MemberID string ProtocolType string GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols OrderedGroupProtocols []*GroupProtocol }
JoinGroupRequest join group request
func (*JoinGroupRequest) AddGroupProtocol ¶
func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)
AddGroupProtocol add group protocol
func (*JoinGroupRequest) AddGroupProtocolMetadata ¶
func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error
AddGroupProtocolMetadata add group protocol metadata
type JoinGroupResponse ¶
type JoinGroupResponse struct { Version int16 ThrottleTime int32 Err KError GenerationID int32 GroupProtocol string LeaderID string MemberID string Members map[string][]byte }
JoinGroupResponse response
func (*JoinGroupResponse) GetMembers ¶
func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error)
GetMembers get members
type KError ¶
type KError int16
KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
const ( ErrNoError KError = 0 ErrUnknown KError = -1 ErrOffsetOutOfRange KError = 1 ErrInvalidMessage KError = 2 ErrUnknownTopicOrPartition KError = 3 ErrInvalidMessageSize KError = 4 ErrLeaderNotAvailable KError = 5 ErrNotLeaderForPartition KError = 6 ErrRequestTimedOut KError = 7 ErrBrokerNotAvailable KError = 8 ErrReplicaNotAvailable KError = 9 ErrMessageSizeTooLarge KError = 10 ErrStaleControllerEpochCode KError = 11 ErrOffsetMetadataTooLarge KError = 12 ErrNetworkException KError = 13 ErrOffsetsLoadInProgress KError = 14 ErrConsumerCoordinatorNotAvailable KError = 15 ErrNotCoordinatorForConsumer KError = 16 ErrInvalidTopic KError = 17 ErrMessageSetSizeTooLarge KError = 18 ErrNotEnoughReplicas KError = 19 ErrNotEnoughReplicasAfterAppend KError = 20 ErrInvalidRequiredAcks KError = 21 ErrIllegalGeneration KError = 22 ErrInconsistentGroupProtocol KError = 23 ErrInvalidGroupID KError = 24 ErrUnknownMemberID KError = 25 ErrInvalidSessionTimeout KError = 26 ErrRebalanceInProgress KError = 27 ErrInvalidCommitOffsetSize KError = 28 ErrTopicAuthorizationFailed KError = 29 ErrGroupAuthorizationFailed KError = 30 ErrClusterAuthorizationFailed KError = 31 ErrInvalidTimestamp KError = 32 ErrUnsupportedSASLMechanism KError = 33 ErrIllegalSASLState KError = 34 ErrUnsupportedVersion KError = 35 ErrTopicAlreadyExists KError = 36 ErrInvalidPartitions KError = 37 ErrInvalidReplicationFactor KError = 38 ErrInvalidReplicaAssignment KError = 39 ErrInvalidConfig KError = 40 ErrNotController KError = 41 ErrInvalidRequest KError = 42 ErrUnsupportedForMessageFormat KError = 43 ErrPolicyViolation KError = 44 ErrOutOfOrderSequenceNumber KError = 45 ErrDuplicateSequenceNumber KError = 46 ErrInvalidProducerEpoch KError = 47 ErrInvalidTxnState KError = 48 ErrInvalidProducerIDMapping KError = 49 ErrInvalidTransactionTimeout KError = 50 ErrConcurrentTransactions KError = 51 ErrTransactionCoordinatorFenced KError = 52 ErrTransactionalIDAuthorizationFailed KError = 53 ErrSecurityDisabled KError = 54 ErrOperationNotAttempted KError = 55 ErrKafkaStorageError KError = 56 ErrLogDirNotFound KError = 57 ErrSASLAuthenticationFailed KError = 58 ErrUnknownProducerID KError = 59 ErrReassignmentInProgress KError = 60 ErrDelegationTokenAuthDisabled KError = 61 ErrDelegationTokenNotFound KError = 62 ErrDelegationTokenOwnerMismatch KError = 63 ErrDelegationTokenRequestNotAllowed KError = 64 ErrDelegationTokenAuthorizationFailed KError = 65 ErrDelegationTokenExpired KError = 66 ErrInvalidPrincipalType KError = 67 ErrNonEmptyGroup KError = 68 ErrGroupIDNotFound KError = 69 ErrFetchSessionIDNotFound KError = 70 ErrInvalidFetchSessionEpoch KError = 71 ErrListenerNotFound KError = 72 ErrTopicDeletionDisabled KError = 73 ErrFencedLeaderEpoch KError = 74 ErrUnknownLeaderEpoch KError = 75 ErrUnsupportedCompressionType KError = 76 )
Numeric error codes returned by the Kafka server.
type KafkaVersion ¶
type KafkaVersion struct {
// contains filtered or unexported fields
}
KafkaVersion instances represent versions of the upstream Kafka broker.
func ParseKafkaVersion ¶
func ParseKafkaVersion(s string) (KafkaVersion, error)
ParseKafkaVersion parses and returns kafka version or error from a string
func (KafkaVersion) IsAtLeast ¶
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool
IsAtLeast return true if and only if the version it is called on is greater than or equal to the version passed in:
V1.IsAtLeast(V2) // false V2.IsAtLeast(V1) // true
func (KafkaVersion) String ¶
func (v KafkaVersion) String() string
type LeaveGroupRequest ¶
LeaveGroupRequest request
type ListGroupsResponse ¶
ListGroupsResponse response
type MatchingACL ¶
MatchingACL is a matching acl type
type Message ¶
type Message struct { Codec CompressionCodec // codec used to compress the message contents CompressionLevel int // compression level LogAppendTime bool // the used timestamp is LogAppendTime Key []byte // the message key, may be nil Value []byte // the message contents Set *MessageSet // the message set a message might wrap Version int8 // v1 requires Kafka 0.10 Timestamp time.Time // the timestamp of the message (version 1+ only) // contains filtered or unexported fields }
Message is a kafka message type
type MessageBlock ¶
MessageBlock message block
func (*MessageBlock) Messages ¶
func (msb *MessageBlock) Messages() []*MessageBlock
Messages convenience helper which returns either all the messages that are wrapped in this block
type MessageSet ¶
type MessageSet struct { PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock OverflowMessage bool // whether the set on the wire contained an overflow message Messages []*MessageBlock }
MessageSet message set
type MetadataRequest ¶
MetadataRequest request
type MetadataResponse ¶
type MetadataResponse struct { Version int16 ThrottleTimeMs int32 Brokers []*Broker ClusterID *string ControllerID int32 Topics []*TopicMetadata }
MetadataResponse response
func (*MetadataResponse) AddBroker ¶
func (r *MetadataResponse) AddBroker(addr string, id int32)
AddBroker add broker
func (*MetadataResponse) AddTopic ¶
func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata
AddTopic add topic
func (*MetadataResponse) AddTopicPartition ¶
func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError)
AddTopicPartition add topic partition
type OffsetCommitRequest ¶
type OffsetCommitRequest struct { ConsumerGroup string ConsumerGroupGeneration int32 // v1 or later ConsumerID string // v1 or later RetentionTime int64 // v2 or later // Version can be: // - 0 (kafka 0.8.1 and later) // - 1 (kafka 0.8.2 and later) // - 2 (kafka 0.9.0 and later) // - 3 (kafka 0.11.0 and later) // - 4 (kafka 2.0.0 and later) Version int16 // contains filtered or unexported fields }
OffsetCommitRequest request
type OffsetCommitResponse ¶
type OffsetCommitResponse struct { Version int16 ThrottleTimeMs int32 Errors map[string]map[int32]KError }
OffsetCommitResponse response
type OffsetFetchRequest ¶
type OffsetFetchRequest struct { Version int16 ConsumerGroup string // contains filtered or unexported fields }
OffsetFetchRequest request
func (*OffsetFetchRequest) AddPartition ¶
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)
AddPartition add partition
func (*OffsetFetchRequest) ZeroPartitions ¶
func (r *OffsetFetchRequest) ZeroPartitions()
ZeroPartitions zero partitions
type OffsetFetchResponse ¶
type OffsetFetchResponse struct { Version int16 ThrottleTimeMs int32 Blocks map[string]map[int32]*OffsetFetchResponseBlock Err KError }
OffsetFetchResponse response
func (*OffsetFetchResponse) AddBlock ¶
func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)
AddBlock add block
func (*OffsetFetchResponse) GetBlock ¶
func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock
GetBlock get block
type OffsetFetchResponseBlock ¶
OffsetFetchResponseBlock response block
type OffsetManager ¶
type OffsetManager interface { // ManagePartition creates a PartitionOffsetManager on the given topic/partition. // It will return an error if this OffsetManager is already managing the given // topic/partition. ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) // Close stops the OffsetManager from managing offsets. It is required to call // this function before an OffsetManager object passes out of scope, as it // will otherwise leak memory. You must call this after all the // PartitionOffsetManagers are closed. Close() error }
OffsetManager uses Kafka to store and fetch consumed partition offsets.
func NewOffsetManagerFromClient ¶
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error)
NewOffsetManagerFromClient creates a new OffsetManager from the given client. It is still necessary to call Close() on the underlying client when finished with the partition manager.
type OffsetRequest ¶
type OffsetRequest struct { Version int16 // contains filtered or unexported fields }
OffsetRequest request
func (*OffsetRequest) AddBlock ¶
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32)
AddBlock add block
func (*OffsetRequest) SetReplicaID ¶
func (r *OffsetRequest) SetReplicaID(id int32)
SetReplicaID set replica id
type OffsetResponse ¶
type OffsetResponse struct { Version int16 Blocks map[string]map[int32]*OffsetResponseBlock }
OffsetResponse response
func (*OffsetResponse) AddTopicPartition ¶
func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)
AddTopicPartition add topic partition
func (*OffsetResponse) GetBlock ¶
func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock
GetBlock block
type OffsetResponseBlock ¶
type OffsetResponseBlock struct { Err KError Offsets []int64 // Version 0 Offset int64 // Version 1 Timestamp int64 // Version 1 }
OffsetResponseBlock block
type PacketDecodingError ¶
type PacketDecodingError struct {
Info string
}
PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
func (PacketDecodingError) Error ¶
func (err PacketDecodingError) Error() string
type PacketEncodingError ¶
type PacketEncodingError struct {
Info string
}
PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
func (PacketEncodingError) Error ¶
func (err PacketEncodingError) Error() string
type PartitionConsumer ¶
type PartitionConsumer interface { // AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you // should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this // function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call // this before calling Close on the underlying client. AsyncClose() // Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain // the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service // the Messages channel when this function is called, you will be competing with Close for messages; consider // calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes // out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client. Close() error // Messages returns the read channel for the messages that are returned by // the broker. Messages() <-chan *ConsumerMessage // Errors returns a read channel of errors that occurred during consuming, if // enabled. By default, errors are logged and not returned over this channel. // If you want to implement any custom error handling, set your config's // Consumer.Return.Errors setting to true, and read from this channel. Errors() <-chan *ConsumerError // HighWaterMarkOffset returns the high water mark offset of the partition, // i.e. the offset that will be used for the next message that will be produced. // You can use this to determine how far behind the processing is. HighWaterMarkOffset() int64 }
PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.
The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionError ¶
PartitionError is a partition error type
type PartitionMetadata ¶
type PartitionMetadata struct { Err KError ID int32 Leader int32 Replicas []int32 Isr []int32 OfflineReplicas []int32 }
PartitionMetadata metadata
type PartitionOffsetManager ¶
type PartitionOffsetManager interface { // NextOffset returns the next offset that should be consumed for the managed // partition, accompanied by metadata which can be used to reconstruct the state // of the partition consumer when it resumes. NextOffset() will return // `config.Consumer.Offsets.Initial` and an empty metadata string if no offset // was committed for this partition yet. NextOffset() (int64, string) // MarkOffset marks the provided offset, alongside a metadata string // that represents the state of the partition consumer at that point in time. The // metadata string can be used by another consumer to restore that state, so it // can resume consumption. // // To follow upstream conventions, you are expected to mark the offset of the // next message to read, not the last message read. Thus, when calling `MarkOffset` // you should typically add one to the offset of the last consumed message. // // Note: calling MarkOffset does not necessarily commit the offset to the backend // store immediately for efficiency reasons, and it may never be committed if // your application crashes. This means that you may end up processing the same // message twice, and your processing should ideally be idempotent. MarkOffset(offset int64, metadata string) // ResetOffset resets to the provided offset, alongside a metadata string that // represents the state of the partition consumer at that point in time. Reset // acts as a counterpart to MarkOffset, the difference being that it allows to // reset an offset to an earlier or smaller value, where MarkOffset only // allows incrementing the offset. cf MarkOffset for more details. ResetOffset(offset int64, metadata string) // Errors returns a read channel of errors that occur during offset management, if // enabled. By default, errors are logged and not returned over this channel. If // you want to implement any custom error handling, set your config's // Consumer.Return.Errors setting to true, and read from this channel. Errors() <-chan *ConsumerError // AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will // return immediately, after which you should wait until the 'errors' channel has // been drained and closed. It is required to call this function, or Close before // a consumer object passes out of scope, as it will otherwise leak memory. You // must call this before calling Close on the underlying client. AsyncClose() // Close stops the PartitionOffsetManager from managing offsets. It is required to // call this function (or AsyncClose) before a PartitionOffsetManager object // passes out of scope, as it will otherwise leak memory. You must call this // before calling Close on the underlying client. Close() error }
PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close() on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.
type PartitionOffsetMetadata ¶
PartitionOffsetMetadata metadata
type Partitioner ¶
type Partitioner interface { // Partition takes a message and partition count and chooses a partition Partition(message *ProducerMessage, numPartitions int32) (int32, error) // RequiresConsistency indicates to the user of the partitioner whether the // mapping of key->partition is consistent or not. Specifically, if a // partitioner requires consistency then it must be allowed to choose from all // partitions (even ones known to be unavailable), and its choice must be // respected by the caller. The obvious example is the HashPartitioner. RequiresConsistency() bool }
Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.
func NewHashPartitioner ¶
func NewHashPartitioner(topic string) Partitioner
NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.
func NewManualPartitioner ¶
func NewManualPartitioner(topic string) Partitioner
NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided ProducerMessage's Partition field as the partition to produce to.
func NewRandomPartitioner ¶
func NewRandomPartitioner(topic string) Partitioner
NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
func NewReferenceHashPartitioner ¶
func NewReferenceHashPartitioner(topic string) Partitioner
NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values in the same way as the reference Java implementation. NewHashPartitioner was supposed to do that but it had a mistake and now there are people depending on both behaviours. This will all go away on the next major version bump.
func NewRoundRobinPartitioner ¶
func NewRoundRobinPartitioner(topic string) Partitioner
NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
type PartitionerConstructor ¶
type PartitionerConstructor func(topic string) Partitioner
PartitionerConstructor is the type for a function capable of constructing new Partitioners.
func NewCustomHashPartitioner ¶
func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor
NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher. The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
func NewCustomPartitioner ¶
func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor
NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
type ProduceRequest ¶
type ProduceRequest struct { TransactionalID *string RequiredAcks RequiredAcks Timeout int32 Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11 // contains filtered or unexported fields }
ProduceRequest request
func (*ProduceRequest) AddBatch ¶
func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch)
AddBatch add batch
func (*ProduceRequest) AddMessage ¶
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)
AddMessage add message
func (*ProduceRequest) AddSet ¶
func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)
AddSet add set
type ProduceResponse ¶
type ProduceResponse struct { Blocks map[string]map[int32]*ProduceResponseBlock Version int16 ThrottleTime time.Duration // only provided if Version >= 1 }
ProduceResponse response
func (*ProduceResponse) AddTopicPartition ¶
func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)
AddTopicPartition add topic partition
func (*ProduceResponse) GetBlock ¶
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock
GetBlock get block
type ProduceResponseBlock ¶
type ProduceResponseBlock struct { Err KError Offset int64 // only provided if Version >= 2 and the broker is configured with `LogAppendTime` Timestamp time.Time }
ProduceResponseBlock block
type ProducerError ¶
type ProducerError struct { Msg *ProducerMessage Err error }
ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.
func (ProducerError) Error ¶
func (pe ProducerError) Error() string
type ProducerErrors ¶
type ProducerErrors []*ProducerError
ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.
func (ProducerErrors) Error ¶
func (pe ProducerErrors) Error() string
type ProducerMessage ¶
type ProducerMessage struct { Topic string // The Kafka topic for this message. // The partitioning key for this message. Pre-existing Encoders include // StringEncoder and ByteEncoder. Key Encoder // The actual message to store in Kafka. Pre-existing Encoders include // StringEncoder and ByteEncoder. Value Encoder // The headers are key-value pairs that are transparently passed // by Kafka between producers and consumers. Headers []RecordHeader // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for // pass-through data. Metadata interface{} // Offset is the offset of the message stored on the broker. This is only // guaranteed to be defined if the message was successfully delivered and // RequiredAcks is not NoResponse. Offset int64 // Partition is the partition that the message was sent to. This is only // guaranteed to be defined if the message was successfully delivered. Partition int32 // Timestamp is the timestamp assigned to the message by the broker. This // is only guaranteed to be defined if the message was successfully // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at // least version 0.10.0. Timestamp time.Time // contains filtered or unexported fields }
ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type Record ¶
type Record struct { Headers []*RecordHeader Attributes int8 TimestampDelta time.Duration OffsetDelta int64 Key []byte Value []byte // contains filtered or unexported fields }
Record is kafka record type
type RecordBatch ¶
type RecordBatch struct { FirstOffset int64 PartitionLeaderEpoch int32 Version int8 Codec CompressionCodec CompressionLevel int Control bool LogAppendTime bool LastOffsetDelta int32 FirstTimestamp time.Time MaxTimestamp time.Time ProducerID int64 ProducerEpoch int16 FirstSequence int32 Records []*Record PartialTrailingRecord bool IsTransactional bool // contains filtered or unexported fields }
RecordBatch batch
type RecordHeader ¶
RecordHeader stores key and value for a record header
type Records ¶
type Records struct { MsgSet *MessageSet RecordBatch *RecordBatch // contains filtered or unexported fields }
Records implements a union type containing either a RecordBatch or a legacy MessageSet.
type RequiredAcks ¶
type RequiredAcks int16
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration).
const ( // NoResponse doesn't send any response, the TCP ACK is all you get. NoResponse RequiredAcks = 0 // WaitForLocal waits for only the local commit to succeed before responding. WaitForLocal RequiredAcks = 1 // WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via // the `min.insync.replicas` configuration key. WaitForAll RequiredAcks = -1 )
type Resource ¶
type Resource struct { ResourceType ACLResourceType ResourceName string ResoucePatternType ACLResourcePatternType }
Resource holds information about acl resource type
type ResourceAcls ¶
ResourceAcls is an acl resource type
type ResourceResponse ¶
type ResourceResponse struct { ErrorCode int16 ErrorMsg string Type ConfigResourceType Name string Configs []*ConfigEntry }
ResourceResponse response
type SASLMechanism ¶
type SASLMechanism string
SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
type SCRAMClient ¶
type SCRAMClient interface { // Begin prepares the client for the SCRAM exchange // with the server with a user name and a password Begin(userName, password, authzID string) error // Step steps client through the SCRAM exchange. It is // called repeatedly until it errors or `Done` returns true. Step(challenge string) (response string, err error) // Done should return true when the SCRAM conversation // is over. Done() bool }
SCRAMClient is a an interface to a SCRAM client implementation.
type SaslAuthenticateRequest ¶
type SaslAuthenticateRequest struct {
SaslAuthBytes []byte
}
SaslAuthenticateRequest sasl authenticate request
type SaslAuthenticateResponse ¶
SaslAuthenticateResponse response
type SaslHandshakeRequest ¶
SaslHandshakeRequest request
type SaslHandshakeResponse ¶
SaslHandshakeResponse response
type StdLogger ¶
type StdLogger interface { Print(v ...interface{}) Printf(format string, v ...interface{}) Println(v ...interface{}) }
StdLogger is used to log error messages.
type StringEncoder ¶
type StringEncoder string
StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
Encode string 2 byte slice
type SyncGroupRequest ¶
type SyncGroupRequest struct { GroupID string GenerationID int32 MemberID string GroupAssignments map[string][]byte }
SyncGroupRequest request
func (*SyncGroupRequest) AddGroupAssignment ¶
func (r *SyncGroupRequest) AddGroupAssignment(memberID string, memberAssignment []byte)
AddGroupAssignment add group assignment
func (*SyncGroupRequest) AddGroupAssignmentMember ¶
func (r *SyncGroupRequest) AddGroupAssignmentMember(memberID string, memberAssignment *ConsumerGroupMemberAssignment) error
AddGroupAssignmentMember add group assignment member
type SyncGroupResponse ¶
SyncGroupResponse response
func (*SyncGroupResponse) GetMemberAssignment ¶
func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)
GetMemberAssignment get member assignment
type SyncProducer ¶
type SyncProducer interface { // SendMessage produces a given message, and returns only when it either has // succeeded or failed to produce. It will return the partition and the offset // of the produced message, or an error if the message failed to produce. SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) // SendMessages produces a given set of messages, and returns only when all // messages in the set have either succeeded or failed. Note that messages // can succeed and fail individually; if some succeed and some fail, // SendMessages will return an error. SendMessages(msgs []*ProducerMessage) error // Close shuts down the producer and waits for any buffered messages to be // flushed. You must call this function before a producer object passes out of // scope, as it may otherwise leak memory. You must call this before calling // Close on the underlying client. Close() error }
SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.
func NewSyncProducer ¶
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error)
NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
func NewSyncProducerFromClient ¶
func NewSyncProducerFromClient(client Client) (SyncProducer, error)
NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.
type TopicDetail ¶
type TopicDetail struct { NumPartitions int32 ReplicationFactor int16 ReplicaAssignment map[int32][]int32 ConfigEntries map[string]*string }
TopicDetail detail
type TopicError ¶
TopicError topic error
func (*TopicError) Error ¶
func (t *TopicError) Error() string
type TopicMetadata ¶
type TopicMetadata struct { Err KError Name string IsInternal bool // Only valid for Version >= 1 Partitions []*PartitionMetadata }
TopicMetadata metadata
type TopicPartition ¶
TopicPartition topic partition
type TopicPartitionError ¶
TopicPartitionError topic partition error
func (*TopicPartitionError) Error ¶
func (t *TopicPartitionError) Error() string
type TxnOffsetCommitRequest ¶
type TxnOffsetCommitRequest struct { TransactionalID string GroupID string ProducerID int64 ProducerEpoch int16 Topics map[string][]*PartitionOffsetMetadata }
TxnOffsetCommitRequest request
type TxnOffsetCommitResponse ¶
type TxnOffsetCommitResponse struct { ThrottleTime time.Duration Topics map[string][]*PartitionError }
TxnOffsetCommitResponse response
Source Files ¶
- acl_bindings.go
- acl_create_request.go
- acl_create_response.go
- acl_delete_request.go
- acl_delete_response.go
- acl_describe_request.go
- acl_describe_response.go
- acl_filter.go
- acl_types.go
- add_offsets_to_txn_request.go
- add_offsets_to_txn_response.go
- add_partitions_to_txn_request.go
- add_partitions_to_txn_response.go
- admin.go
- alter_configs_request.go
- alter_configs_response.go
- api_versions_request.go
- api_versions_response.go
- async_producer.go
- balance_strategy.go
- broker.go
- client.go
- compress.go
- config.go
- config_resource_type.go
- consumer.go
- consumer_group.go
- consumer_group_members.go
- consumer_metadata_request.go
- consumer_metadata_response.go
- control_record.go
- crc32_field.go
- create_partitions_request.go
- create_partitions_response.go
- create_topics_request.go
- create_topics_response.go
- decompress.go
- delete_groups_request.go
- delete_groups_response.go
- delete_records_request.go
- delete_records_response.go
- delete_topics_request.go
- delete_topics_response.go
- describe_configs_request.go
- describe_configs_response.go
- describe_groups_request.go
- describe_groups_response.go
- encoder_decoder.go
- end_txn_request.go
- end_txn_response.go
- errors.go
- fetch_request.go
- fetch_response.go
- find_coordinator_request.go
- find_coordinator_response.go
- heartbeat_request.go
- heartbeat_response.go
- init_producer_id_request.go
- init_producer_id_response.go
- join_group_request.go
- join_group_response.go
- leave_group_request.go
- leave_group_response.go
- length_field.go
- list_groups_request.go
- list_groups_response.go
- message.go
- message_set.go
- metadata_request.go
- metadata_response.go
- metrics.go
- offset_commit_request.go
- offset_commit_response.go
- offset_fetch_request.go
- offset_fetch_response.go
- offset_manager.go
- offset_request.go
- offset_response.go
- packet_decoder.go
- packet_encoder.go
- partitioner.go
- prep_encoder.go
- produce_request.go
- produce_response.go
- produce_set.go
- real_decoder.go
- real_encoder.go
- record.go
- record_batch.go
- records.go
- request.go
- response_header.go
- sarama.go
- sasl_authenticate_request.go
- sasl_authenticate_response.go
- sasl_handshake_request.go
- sasl_handshake_response.go
- sync_group_request.go
- sync_group_response.go
- sync_producer.go
- timestamp.go
- txn_offset_commit_request.go
- txn_offset_commit_response.go
- utils.go
- zstd_cgo.go