Documentation ¶
Index ¶
- Constants
- Variables
- func RegisterCompressionCodec(codec func() CompressionCodec)
- type Balancer
- type BalancerFunc
- type Batch
- type Broker
- type CompressionCodec
- type ConfigEntry
- type Conn
- func Dial(network string, address string) (*Conn, error)
- func DialContext(ctx context.Context, network string, address string) (*Conn, error)
- func DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error)
- func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)
- func NewConn(conn net.Conn, topic string, partition int) *Conn
- func NewConnWith(conn net.Conn, config ConnConfig) *Conn
- func (c *Conn) Close() error
- func (c *Conn) CreateTopics(topics ...TopicConfig) error
- func (c *Conn) DeleteTopics(topics ...string) error
- func (c *Conn) LocalAddr() net.Addr
- func (c *Conn) Offset() (offset int64, whence int)
- func (c *Conn) Read(b []byte) (int, error)
- func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch
- func (c *Conn) ReadFirstOffset() (int64, error)
- func (c *Conn) ReadLastOffset() (int64, error)
- func (c *Conn) ReadMessage(maxBytes int) (Message, error)
- func (c *Conn) ReadOffset(t time.Time) (int64, error)
- func (c *Conn) ReadOffsets() (first, last int64, err error)
- func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error)
- func (c *Conn) RemoteAddr() net.Addr
- func (c *Conn) Seek(offset int64, whence int) (int64, error)
- func (c *Conn) SetDeadline(t time.Time) error
- func (c *Conn) SetReadDeadline(t time.Time) error
- func (c *Conn) SetRequiredAcks(n int) error
- func (c *Conn) SetWriteDeadline(t time.Time) error
- func (c *Conn) Write(b []byte) (int, error)
- func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (int, error)
- func (c *Conn) WriteMessages(msgs ...Message) (int, error)
- type ConnConfig
- type Dialer
- func (d *Dialer) Dial(network string, address string) (*Conn, error)
- func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error)
- func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error)
- func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)
- func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, ...) (Broker, error)
- func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, ...) (Partition, error)
- func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error)
- type DurationStats
- type Error
- type GroupBalancer
- type GroupMember
- type GroupMemberAssignments
- type Hash
- type LeastBytes
- type ListGroupsResponseGroupV1
- type Message
- type Partition
- type RangeGroupBalancer
- type Reader
- func (r *Reader) Close() error
- func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error
- func (r *Reader) Config() ReaderConfig
- func (r *Reader) FetchMessage(ctx context.Context) (Message, error)
- func (r *Reader) Lag() int64
- func (r *Reader) Offset() int64
- func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error)
- func (r *Reader) ReadMessage(ctx context.Context) (Message, error)
- func (r *Reader) SetOffset(offset int64) error
- func (r *Reader) Stats() ReaderStats
- type ReaderConfig
- type ReaderStats
- type ReplicaAssignment
- type Resolver
- type RoundRobin
- type RoundRobinGroupBalancer
- type SummaryStats
- type TopicConfig
- type Writer
- type WriterConfig
- type WriterStats
Examples ¶
Constants ¶
const ( SeekStart = 0 // Seek relative to the first offset available in the partition. SeekAbsolute = 1 // Seek to an absolute offset. SeekEnd = 2 // Seek relative to the last offset available in the partition. SeekCurrent = 3 // Seek relative to the current offset. )
const ( LastOffset int64 = -1 // The most recent offset available for a partition. FirstOffset = -2 // The least recent offset available for a partition. )
const CompressionNoneCode = 0
const DefaultCompressionLevel int = -1
Variables ¶
var ( // DefaultClientID is the default value used as ClientID of kafka // connections. DefaultClientID string )
var DefaultDialer = &Dialer{ Timeout: 10 * time.Second, DualStack: true, }
DefaultDialer is the default dialer used when none is specified.
Functions ¶
func RegisterCompressionCodec ¶
func RegisterCompressionCodec(codec func() CompressionCodec)
RegisterCompressionCodec registers a compression codec so it can be used by a Writer.
Types ¶
type Balancer ¶
type Balancer interface { // Balance receives a message and a set of available partitions and // returns the partition number that the message should be routed to. // // An application should refrain from using a balancer to manage multiple // sets of partitions (from different topics for example), use one balancer // instance for each partition set, so the balancer can detect when the // partitions change and assume that the kafka topic has been rebalanced. Balance(msg Message, partitions ...int) (partition int) }
The Balancer interface provides an abstraction of the message distribution logic used by Writer instances to route messages to the partitions available on a kafka cluster.
Instances of Balancer do not have to be safe to use concurrently by multiple goroutines; the Writer implementation ensures that calls to Balance are synchronized.
type BalancerFunc ¶
BalancerFunc is an implementation of the Balancer interface that makes it possible to use regular functions to distribute messages across partitions.
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
A Batch is an iterator over a sequence of messages fetched from a kafka server.
Batches are created by calling (*Conn).ReadBatch. They hold an internal lock on the connection, which is released when the batch is closed. Failing to call a batch's Close method will likely result in a deadlock when trying to use the connection.
Batches are safe to use concurrently from multiple goroutines.
func (*Batch) Close ¶
Close closes the batch, releasing the connection lock and returning an error if reading the batch failed for any reason.
func (*Batch) HighWaterMark ¶
HighWaterMark returns the current highest watermark in a partition.
func (*Batch) Read ¶
Read reads the value of the next message from the batch into b, returning the number of bytes read, or an error if the next message couldn't be read.
If an error is returned the batch cannot be used anymore and calling Read again will keep returning that error. All errors except io.EOF (indicating that the program consumed all messages from the batch) are also returned by Close.
The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.
func (*Batch) ReadMessage ¶
ReadMessage reads and returns the next message from the batch.
Because this method allocates memory buffers for the message key and value, it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.
type CompressionCodec ¶
type CompressionCodec interface { // Code returns the compression codec code Code() int8 // Encode encodes the src data Encode(src []byte) ([]byte, error) // Decode decodes the src data Decode(src []byte) ([]byte, error) }
CompressionCodec represents a compression codec to encode and decode the messages. See: https://cwiki.apache.org/confluence/display/KAFKA/Compression
A CompressionCodec must be safe for concurrent access by multiple goroutines.
type ConfigEntry ¶
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn represents a connection to a kafka broker.
Instances of Conn are safe to use concurrently from multiple goroutines.
func DialContext ¶
DialContext is a convenience wrapper for DefaultDialer.DialContext.
func DialLeader ¶
func DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)
DialLeader is a convenience wrapper for DefaultDialer.DialLeader.
func DialPartition ¶
func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)
DialPartition is a convenience wrapper for DefaultDialer.DialPartition.
func NewConnWith ¶
func NewConnWith(conn net.Conn, config ConnConfig) *Conn
NewConnWith returns a new kafka connection configured with config. The offset is initialized to FirstOffset.
func (*Conn) CreateTopics ¶
func (c *Conn) CreateTopics(topics ...TopicConfig) error
CreateTopics creates one topic per provided configuration with idempotent operational semantics. In other words, if CreateTopics is invoked with a configuration for an existing topic, it will have no effect.
func (*Conn) DeleteTopics ¶
DeleteTopics deletes the specified topics.
func (*Conn) Offset ¶
Offset returns the current offset of the connection as a pair of integers, where the first one is an offset value and the second one indicates how to interpret it.
See Seek for more details about the offset and whence values.
func (*Conn) Read ¶
Read reads the message at the current offset from the connection, advancing the offset on success so the next call to a read method will produce the next message. The method returns the number of bytes read, or an error if something went wrong.
While it is safe to call Read concurrently from multiple goroutines it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines, they could read duplicates, or messages may be seen by only some of the goroutines.
The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.
This method is provided to satisfy the net.Conn interface but is much less efficient than using the more general purpose ReadBatch method.
func (*Conn) ReadBatch ¶
ReadBatch reads a batch of messages from the kafka server. The method always returns a non-nil Batch value. If an error occurred, either sending the fetch request or reading the response, the error will be made available by the returned value of the batch's Close method.
While it is safe to call ReadBatch concurrently from multiple goroutines it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines, they could read duplicates, or messages may be seen by only some of the goroutines.
A program doesn't specify the number of messages it wants from a batch, but gives the minimum and maximum number of bytes that it wants to receive from the kafka server.
func (*Conn) ReadFirstOffset ¶
ReadFirstOffset returns the first offset available on the connection.
func (*Conn) ReadLastOffset ¶
ReadLastOffset returns the last offset available on the connection.
func (*Conn) ReadMessage ¶
ReadMessage reads the message at the current offset from the connection, advancing the offset on success so the next call to a read method will produce the next message.
Because this method allocates memory buffers for the message key and value, it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.
While it is safe to call ReadMessage concurrently from multiple goroutines it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines, they could read duplicates, or messages may be seen by only some of the goroutines.
This method is provided for convenience purposes but is much less efficient than using the more general purpose ReadBatch method.
func (*Conn) ReadOffset ¶
ReadOffset returns the offset of the first message with a timestamp greater than or equal to t.
func (*Conn) ReadOffsets ¶
ReadOffsets returns the absolute first and last offsets of the topic used by the connection.
func (*Conn) ReadPartitions ¶
ReadPartitions returns the list of available partitions for the given list of topics.
If the method is called with no topic, it uses the topic configured on the connection. If there are none, the method fetches all partitions of the kafka cluster.
func (*Conn) RemoteAddr ¶
RemoteAddr returns the remote network address.
func (*Conn) Seek ¶
Seek sets the offset for the next read or write operation according to whence, which should be one of SeekStart, SeekAbsolute, SeekEnd, or SeekCurrent. When seeking relative to the end, the offset is subtracted from the current offset. Note that for historical reasons, these do not align with the usual whence constants as in lseek(2) or os.Seek. The method returns the new absolute offset of the connection.
func (*Conn) SetDeadline ¶
SetDeadline sets the read and write deadlines associated with the connection. It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
A deadline is an absolute time after which I/O operations fail with a timeout (see type Error) instead of blocking. The deadline applies to all future and pending I/O, not just the immediately following call to Read or Write. After a deadline has been exceeded, the connection may be closed if it was found to be in an unrecoverable state.
A zero value for t means I/O operations will not time out.
func (*Conn) SetReadDeadline ¶
SetReadDeadline sets the deadline for future Read calls and any currently-blocked Read call. A zero value for t means Read will not time out.
func (*Conn) SetRequiredAcks ¶
SetRequiredAcks sets the number of acknowledgements from replicas that the connection requests when producing messages.
func (*Conn) SetWriteDeadline ¶
SetWriteDeadline sets the deadline for future Write calls and any currently-blocked Write call. Even if write times out, it may return n > 0, indicating that some of the data was successfully written. A zero value for t means Write will not time out.
func (*Conn) Write ¶
Write writes a message to the kafka broker that this connection was established to. The method returns the number of bytes written, or an error if something went wrong.
The operation either succeeds or fails; it never partially writes the message.
This method is exposed to satisfy the net.Conn interface but is less efficient than the more general purpose WriteMessages method.
func (*Conn) WriteCompressedMessages ¶
func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (int, error)
WriteCompressedMessages writes a batch of messages to the connection's topic and partition, returning the number of bytes written. The write is an atomic operation, it either fully succeeds or fails.
If the compression codec is not nil, the messages will be compressed.
type ConnConfig ¶
ConnConfig is a configuration object used to create new instances of Conn.
type Dialer ¶
type Dialer struct { // Unique identifier for client connections established by this Dialer. ClientID string // Timeout is the maximum amount of time a dial will wait for a connect to // complete. If Deadline is also set, it may fail earlier. // // The default is no timeout. // // When dialing a name with multiple IP addresses, the timeout may be // divided between them. // // With or without a timeout, the operating system may impose its own // earlier timeout. For instance, TCP timeouts are often around 3 minutes. Timeout time.Duration // Deadline is the absolute point in time after which dials will fail. // If Timeout is set, it may fail earlier. // Zero means no deadline, or dependent on the operating system as with the // Timeout option. Deadline time.Time // LocalAddr is the local address to use when dialing an address. // The address must be of a compatible type for the network being dialed. // If nil, a local address is automatically chosen. LocalAddr net.Addr // DualStack enables RFC 6555-compliant "Happy Eyeballs" dialing when the // network is "tcp" and the destination is a host name with both IPv4 and // IPv6 addresses. This allows a client to tolerate networks where one // address family is silently broken. DualStack bool // FallbackDelay specifies the length of time to wait before spawning a // fallback connection, when DualStack is enabled. // If zero, a default delay of 300ms is used. FallbackDelay time.Duration // KeepAlive specifies the keep-alive period for an active network // connection. // If zero, keep-alives are not enabled. Network protocols that do not // support keep-alives ignore this field. KeepAlive time.Duration // Resolver optionally specifies an alternate resolver to use. Resolver Resolver // TLS enables Dialer to open secure connections. If nil, standard net.Conn // will be used. TLS *tls.Config }
The Dialer type mirrors the net.Dialer API but is designed to open kafka connections instead of raw network connections.
func (*Dialer) DialContext ¶
DialContext connects to the address on the named network using the provided context.
The provided Context must be non-nil. If the context expires before the connection is complete, an error is returned. Once successfully connected, any expiration of the context will not affect the connection.
When using TCP, and the host in the address parameter resolves to multiple network addresses, any dial timeout (from d.Timeout or ctx) is spread over each consecutive dial, such that each is given an appropriate fraction of the time to connect. For example, if a host has 4 IP addresses and the timeout is 1 minute, the connect to each single address will be given 15 seconds to complete before trying the next one.
func (*Dialer) DialLeader ¶
func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)
DialLeader opens a connection to the leader of the partition for a given topic.
The address given to the DialContext method may not be the one that the connection will end up being established to, because the dialer will look up the partition leader for the topic and return a connection to that server. The original address is only used as a mechanism to discover the configuration of the kafka cluster that we're connecting to.
func (*Dialer) DialPartition ¶
func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)
DialPartition opens a connection to the leader of the partition specified by the partition descriptor. It is strongly advised to use a partition descriptor returned by LookupPartition or LookupPartitions.
func (*Dialer) LookupLeader ¶
func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, partition int) (Broker, error)
LookupLeader searches for the kafka broker that is the leader of the partition for a given topic, returning a Broker value representing it.
type DurationStats ¶
type DurationStats struct { Avg time.Duration `metric:"avg" type:"gauge"` Min time.Duration `metric:"min" type:"gauge"` Max time.Duration `metric:"max" type:"gauge"` }
DurationStats is a data structure that carries a summary of observed duration values. The average, minimum, and maximum are reported.
type Error ¶
type Error int
Error represents the different error codes that may be returned by kafka.
const ( Unknown Error = -1 OffsetOutOfRange Error = 1 InvalidMessage Error = 2 UnknownTopicOrPartition Error = 3 InvalidMessageSize Error = 4 LeaderNotAvailable Error = 5 NotLeaderForPartition Error = 6 RequestTimedOut Error = 7 BrokerNotAvailable Error = 8 ReplicaNotAvailable Error = 9 MessageSizeTooLarge Error = 10 StaleControllerEpoch Error = 11 OffsetMetadataTooLarge Error = 12 GroupLoadInProgress Error = 14 GroupCoordinatorNotAvailable Error = 15 NotCoordinatorForGroup Error = 16 InvalidTopic Error = 17 RecordListTooLarge Error = 18 NotEnoughReplicas Error = 19 NotEnoughReplicasAfterAppend Error = 20 InvalidRequiredAcks Error = 21 IllegalGeneration Error = 22 InconsistentGroupProtocol Error = 23 InvalidGroupId Error = 24 UnknownMemberId Error = 25 InvalidSessionTimeout Error = 26 RebalanceInProgress Error = 27 InvalidCommitOffsetSize Error = 28 TopicAuthorizationFailed Error = 29 GroupAuthorizationFailed Error = 30 ClusterAuthorizationFailed Error = 31 InvalidTimestamp Error = 32 UnsupportedSASLMechanism Error = 33 IllegalSASLState Error = 34 UnsupportedVersion Error = 35 TopicAlreadyExists Error = 36 InvalidPartitionNumber Error = 37 InvalidReplicationFactor Error = 38 InvalidReplicaAssignment Error = 39 InvalidConfiguration Error = 40 NotController Error = 41 InvalidRequest Error = 42 UnsupportedForMessageFormat Error = 43 PolicyViolation Error = 44 OutOfOrderSequenceNumber Error = 45 DuplicateSequenceNumber Error = 46 InvalidProducerEpoch Error = 47 InvalidTransactionState Error = 48 InvalidProducerIDMapping Error = 49 InvalidTransactionTimeout Error = 50 ConcurrentTransactions Error = 51 TransactionCoordinatorFenced Error = 52 TransactionalIDAuthorizationFailed Error = 53 SecurityDisabled Error = 54 BrokerAuthorizationFailed Error = 55 )
func (Error) Description ¶
Description returns a human-readable description of the cause of the error.
func (Error) Temporary ¶
Temporary returns true if the operation that generated the error may succeed if retried at a later time.
type GroupBalancer ¶
type GroupBalancer interface { // ProtocolName of the GroupBalancer ProtocolName() string // UserData provides the GroupBalancer an opportunity to embed custom // UserData into the metadata. // // Will be used by JoinGroup to begin the consumer group handshake. // // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest UserData() ([]byte, error) // AssignGroups returns which members will be consuming // which topic partitions AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments }
GroupBalancer encapsulates the client-side rebalancing logic.
type GroupMember ¶
type GroupMember struct { // ID is the unique ID for this member as taken from the JoinGroup response. ID string // Topics is a list of topics that this member is consuming. Topics []string // UserData contains any information that the GroupBalancer sent to the // consumer group coordinator. UserData []byte }
GroupMember describes a single participant in a consumer group.
type GroupMemberAssignments ¶
GroupMemberAssignments holds MemberID => topic => partitions
type Hash ¶
Hash is a Balancer that uses the provided hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition.
The logic to calculate the partition is:
hasher.Sum32() % len(partitions) => partition
By default, Hash uses the FNV-1a algorithm. This is the same algorithm used by the Sarama Producer and ensures that messages produced by kafka-go will be delivered to the same partitions that the Sarama producer would deliver them to.
type LeastBytes ¶
type LeastBytes struct {
// contains filtered or unexported fields
}
LeastBytes is a Balancer implementation that routes messages to the partition that has received the least amount of data.
Note that no coordination is done between multiple producers; good balancing relies on each producer that uses a LeastBytes balancer producing well-balanced messages.
type Message ¶
type Message struct { // Topic is read-only and MUST NOT be set when writing messages Topic string // Partition is read-only and MUST NOT be set when writing messages Partition int Offset int64 Key []byte Value []byte // If not set at the creation, Time will be automatically set when // writing the message. Time time.Time }
Message is a data structure representing kafka messages.
type Partition ¶
Partition carries the metadata associated with a kafka partition.
type RangeGroupBalancer ¶
type RangeGroupBalancer struct{}
RangeGroupBalancer groups consumers by partition
Example: 5 partitions, 2 consumers
C0: [0, 1, 2] C1: [3, 4]
Example: 6 partitions, 3 consumers
C0: [0, 1] C1: [2, 3] C2: [4, 5]
func (RangeGroupBalancer) AssignGroups ¶
func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments
func (RangeGroupBalancer) ProtocolName ¶
func (r RangeGroupBalancer) ProtocolName() string
func (RangeGroupBalancer) UserData ¶
func (r RangeGroupBalancer) UserData() ([]byte, error)
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader provides a high-level API for consuming messages from kafka.
A Reader automatically manages reconnections to a kafka server, and blocking methods have context support for asynchronous cancellations.
func NewReader ¶
func NewReader(config ReaderConfig) *Reader
NewReader creates and returns a new Reader configured with config. The offset is initialized to FirstOffset.
func (*Reader) Close ¶
Close closes the stream, preventing the program from reading any more messages from it.
func (*Reader) CommitMessages ¶
CommitMessages commits the list of messages passed as argument. The program may pass a context to asynchronously cancel the commit operation when it was configured to be blocking.
func (*Reader) Config ¶
func (r *Reader) Config() ReaderConfig
Config returns the reader's configuration.
func (*Reader) FetchMessage ¶
FetchMessage reads and returns the next message from r. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.
The method returns io.EOF to indicate that the reader has been closed.
FetchMessage does not commit offsets automatically when using consumer groups. Use CommitMessages to commit the offset.
func (*Reader) Lag ¶
Lag returns the lag of the last message returned by ReadMessage, or -1 if r is backed by a consumer group.
func (*Reader) Offset ¶
Offset returns the current absolute offset of the reader, or -1 if r is backed by a consumer group.
func (*Reader) ReadLag ¶
ReadLag returns the current lag of the reader by fetching the last offset of the topic and partition and computing the difference between that value and the offset of the last message returned by ReadMessage.
This method is intended to be used in cases where a program may be unable to call ReadMessage to update the value returned by Lag, but still needs to get an up to date estimation of how far behind the reader is. For example when the consumer is not ready to process the next message.
The function returns a lag of zero when the reader's current offset is negative.
func (*Reader) ReadMessage ¶
ReadMessage reads and returns the next message from r. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.
The method returns io.EOF to indicate that the reader has been closed.
If consumer groups are used, ReadMessage will automatically commit the offset when called.
func (*Reader) SetOffset ¶
SetOffset changes the offset from which the next batch of messages will be read. The method fails with io.ErrClosedPipe if the reader has already been closed.
From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first or last available offset in the partition. Please note while -1 and -2 were accepted to indicate the first or last offset in previous versions, the meanings of the numbers were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol specification.
func (*Reader) Stats ¶
func (r *Reader) Stats() ReaderStats
Stats returns a snapshot of the reader stats since the last time the method was called, or since the reader was created if it is called for the first time.
A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka reader and report the metrics to a stats collection system.
type ReaderConfig ¶
type ReaderConfig struct { // The list of broker addresses used to connect to the kafka cluster. Brokers []string // GroupID holds the optional consumer group id. If GroupID is specified, then // Partition should NOT be specified e.g. 0 GroupID string // The topic to read messages from. Topic string // Partition to read messages from. Either Partition or GroupID may // be assigned, but not both Partition int // An dialer used to open connections to the kafka server. This field is // optional, if nil, the default dialer is used instead. Dialer *Dialer // The capacity of the internal message queue, defaults to 100 if none is // set. QueueCapacity int // Min and max number of bytes to fetch from kafka in each request. MinBytes int MaxBytes int // Maximum amount of time to wait for new data to come when fetching batches // of messages from kafka. MaxWait time.Duration // ReadLagInterval sets the frequency at which the reader lag is updated. // Setting this field to a negative value disables lag reporting. ReadLagInterval time.Duration // GroupBalancers is the priority-ordered list of client-side consumer group // balancing strategies that will be offered to the coordinator. The first // strategy that all group members support will be chosen by the leader. // // Default: [Range, RoundRobin] // // Only used when GroupID is set GroupBalancers []GroupBalancer // HeartbeatInterval sets the optional frequency at which the reader sends the consumer // group heartbeat update. // // Default: 3s // // Only used when GroupID is set HeartbeatInterval time.Duration // CommitInterval indicates the interval at which offsets are committed to // the broker. If 0, commits will be handled synchronously. // // Defaults to 1s // // Only used when GroupID is set CommitInterval time.Duration // SessionTimeout optionally sets the length of time that may pass without a heartbeat // before the coordinator considers the consumer dead and initiates a rebalance. 
// // Default: 30s // // Only used when GroupID is set SessionTimeout time.Duration // RebalanceTimeout optionally sets the length of time the coordinator will wait // for members to join as part of a rebalance. For kafka servers under higher // load, it may be useful to set this value higher. // // Default: 30s // // Only used when GroupID is set RebalanceTimeout time.Duration // RetentionTime optionally sets the length of time the consumer group will be saved // by the broker // // Default: 24h // // Only used when GroupID is set RetentionTime time.Duration // If not nil, specifies a logger used to report internal changes within the // reader. Logger *log.Logger // ErrorLogger is the logger used to report errors. If nil, the reader falls // back to using Logger instead. ErrorLogger *log.Logger }
ReaderConfig is a configuration object used to create new instances of Reader.
type ReaderStats ¶
type ReaderStats struct { Dials int64 `metric:"kafka.reader.dial.count" type:"counter"` Fetches int64 `metric:"kafka.reader.fetch.count" type:"counter"` Messages int64 `metric:"kafka.reader.message.count" type:"counter"` Bytes int64 `metric:"kafka.reader.message.bytes" type:"counter"` Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"` Timeouts int64 `metric:"kafka.reader.timeout.count" type:"counter"` Errors int64 `metric:"kafka.reader.error.count" type:"counter"` DialTime DurationStats `metric:"kafka.reader.dial.seconds"` ReadTime DurationStats `metric:"kafka.reader.read.seconds"` WaitTime DurationStats `metric:"kafka.reader.wait.seconds"` FetchSize SummaryStats `metric:"kafka.reader.fetch.size"` FetchBytes SummaryStats `metric:"kafka.reader.fetch.bytes"` Offset int64 `metric:"kafka.reader.offset" type:"gauge"` Lag int64 `metric:"kafka.reader.lag" type:"gauge"` MinBytes int64 `metric:"kafka.reader.fetch_bytes.min" type:"gauge"` MaxBytes int64 `metric:"kafka.reader.fetch_bytes.max" type:"gauge"` MaxWait time.Duration `metric:"kafka.reader.fetch_wait.max" type:"gauge"` QueueLength int64 `metric:"kafka.reader.queue.length" type:"gauge"` QueueCapacity int64 `metric:"kafka.reader.queue.capacity" type:"gauge"` ClientID string `tag:"client_id"` Topic string `tag:"topic"` Partition string `tag:"partition"` // The original `Fetches` field had a typo where the metric name was called // "kafak..." instead of "kafka...", in order to offer time to fix monitors // that may be relying on this mistake we are temporarily introducing this // field. DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"` }
ReaderStats is a data structure returned by a call to Reader.Stats that exposes details about the behavior of the reader.
type ReplicaAssignment ¶
type Resolver ¶
type Resolver interface { // LookupHost looks up the given host using the local resolver. // It returns a slice of that host's addresses. LookupHost(ctx context.Context, host string) (addrs []string, err error) }
The Resolver interface is used as an abstraction to provide service discovery of the hosts of a kafka cluster.
type RoundRobin ¶
type RoundRobin struct {
// contains filtered or unexported fields
}
RoundRobin is a Balancer implementation that equally distributes messages across all available partitions.
type RoundRobinGroupBalancer ¶
type RoundRobinGroupBalancer struct{}
RoundRobinGroupBalancer divides partitions evenly among consumers.
Example: 5 partitions, 2 consumers
C0: [0, 2, 4] C1: [1, 3]
Example: 6 partitions, 3 consumers
C0: [0, 3] C1: [1, 4] C2: [2, 5]
func (RoundRobinGroupBalancer) AssignGroups ¶
func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments
func (RoundRobinGroupBalancer) ProtocolName ¶
func (r RoundRobinGroupBalancer) ProtocolName() string
func (RoundRobinGroupBalancer) UserData ¶
func (r RoundRobinGroupBalancer) UserData() ([]byte, error)
type SummaryStats ¶
type SummaryStats struct { Avg int64 `metric:"avg" type:"gauge"` Min int64 `metric:"min" type:"gauge"` Max int64 `metric:"max" type:"gauge"` }
SummaryStats is a data structure that carries a summary of observed values. The average, minimum, and maximum are reported.
type TopicConfig ¶
type TopicConfig struct { // Topic name Topic string // NumPartitions created. -1 indicates unset. NumPartitions int // ReplicationFactor for the topic. -1 indicates unset. ReplicationFactor int // ReplicaAssignments among kafka brokers for this topic partitions. If this // is set num_partitions and replication_factor must be unset. ReplicaAssignments []ReplicaAssignment // ConfigEntries holds topic level configuration for topic to be set. ConfigEntries []ConfigEntry }
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
The Writer type provides the implementation of a producer of kafka messages that automatically distributes messages across partitions of a single topic using a configurable balancing policy.
Instances of Writer are safe to use concurrently from multiple goroutines.
Example ¶
w := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{"localhost:9092"}, Topic: "Topic-1", }) w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello World!"), }, ) w.Close()
Output:
func NewWriter ¶
func NewWriter(config WriterConfig) *Writer
NewWriter creates and returns a new Writer configured with config.
func (*Writer) Close ¶
Close flushes all buffered messages and closes the writer. The call to Close aborts any concurrent calls to WriteMessages, which then return with the io.ErrClosedPipe error.
func (*Writer) Stats ¶
func (w *Writer) Stats() WriterStats
Stats returns a snapshot of the writer stats since the last time the method was called, or since the writer was created if it is called for the first time.
A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka writer and report the metrics to a stats collection system.
func (*Writer) WriteMessages ¶
WriteMessages writes a batch of messages to the kafka topic configured on this writer.
Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written, or until the maximum number of attempts was reached.
When the method returns an error, there's no way to know yet which messages have succeeded or failed.
The context passed as first argument may also be used to asynchronously cancel the operation. Note that in this case there are no guarantees made on whether messages were written to kafka. The program should assume that the whole batch failed and re-write the messages later (which could then cause duplicates).
type WriterConfig ¶
type WriterConfig struct { // The list of brokers used to discover the partitions available on the // kafka cluster. // // This field is required, attempting to create a writer with an empty list // of brokers will panic. Brokers []string // The topic that the writer will produce messages to. // // This field is required, attempting to create a writer with an empty topic // will panic. Topic string // The dialer used by the writer to establish connections to the kafka // cluster. // // If nil, the default dialer is used instead. Dialer *Dialer // The balancer used to distribute messages across partitions. // // The default is to use a round-robin distribution. Balancer Balancer // Limit on how many attempts will be made to deliver a message. // // The default is to try at most 10 times. MaxAttempts int // A hint on the capacity of the writer's internal message queue. // // The default is to use a queue capacity of 100 messages. QueueCapacity int // Limit on how many messages will be buffered before being sent to a // partition. // // The default is to use a target batch size of 100 messages. BatchSize int // Time limit on how often incomplete message batches will be flushed to // kafka. // // The default is to flush at least every second. BatchTimeout time.Duration // Timeout for read operations performed by the Writer. // // Defaults to 10 seconds. ReadTimeout time.Duration // Timeout for write operation performed by the Writer. // // Defaults to 10 seconds. WriteTimeout time.Duration // This interval defines how often the list of partitions is refreshed from // kafka. It allows the writer to automatically handle when new partitions // are added to a topic. // // The default is to refresh partitions every 15 seconds. RebalanceInterval time.Duration // Number of acknowledges from partition replicas required before receiving // a response to a produce request (default to -1, which means to wait for // all replicas). 
RequiredAcks int // Setting this flag to true causes the WriteMessages method to never block. // It also means that errors are ignored since the caller will not receive // the returned value. Use this only if you don't care about guarantees of // whether the messages were written to kafka. Async bool // CompressionCodec set the codec to be used to compress Kafka messages. // Note that messages are allowed to overwrite the compression codec individually. CompressionCodec // If not nil, specifies a logger used to report internal changes within the // writer. Logger *log.Logger // ErrorLogger is the logger used to report errors. If nil, the writer falls // back to using Logger instead. ErrorLogger *log.Logger // contains filtered or unexported fields }
WriterConfig is a configuration type used to create new instances of Writer.
type WriterStats ¶
type WriterStats struct { Dials int64 `metric:"kafka.writer.dial.count" type:"counter"` Writes int64 `metric:"kafka.writer.write.count" type:"counter"` Messages int64 `metric:"kafka.writer.message.count" type:"counter"` Bytes int64 `metric:"kafka.writer.message.bytes" type:"counter"` Rebalances int64 `metric:"kafka.writer.rebalance.count" type:"counter"` Errors int64 `metric:"kafka.writer.error.count" type:"counter"` DialTime DurationStats `metric:"kafka.writer.dial.seconds"` WriteTime DurationStats `metric:"kafka.writer.write.seconds"` WaitTime DurationStats `metric:"kafka.writer.wait.seconds"` Retries SummaryStats `metric:"kafka.writer.retries.count"` BatchSize SummaryStats `metric:"kafka.writer.batch.size"` MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"` MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"` BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"` ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"` WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"` RebalanceInterval time.Duration `metric:"kafka.writer.rebalance.interval" type:"gauge"` RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"` Async bool `metric:"kafka.writer.async" type:"gauge"` QueueLength int64 `metric:"kafka.writer.queue.length" type:"gauge"` QueueCapacity int64 `metric:"kafka.writer.queue.capacity" type:"gauge"` ClientID string `tag:"client_id"` Topic string `tag:"topic"` }
WriterStats is a data structure returned by a call to Writer.Stats that exposes details about the behavior of the writer.
Source Files ¶
- balancer.go
- batch.go
- commit.go
- compression.go
- conn.go
- crc32.go
- createtopics.go
- deletetopics.go
- describegroups.go
- dialer.go
- discard.go
- error.go
- fetch.go
- findcoordinator.go
- groupbalancer.go
- heartbeat.go
- joingroup.go
- leavegroup.go
- listgroups.go
- listoffset.go
- message.go
- metadata.go
- offsetcommit.go
- offsetfetch.go
- produce.go
- protocol.go
- read.go
- reader.go
- rungroup.go
- sizeof.go
- stats.go
- syncgroup.go
- time.go
- write.go
- writer.go