Documentation ¶
Index ¶
- Variables
- type Batch
- type Broker
- type Conn
- func Dial(network string, address string) (*Conn, error)
- func DialContext(ctx context.Context, network string, address string) (*Conn, error)
- func DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error)
- func NewConn(conn net.Conn, topic string, partition int) *Conn
- func NewConnWith(conn net.Conn, config ConnConfig) *Conn
- func (c *Conn) Close() error
- func (c *Conn) DescribeGroupsV1(request DescribeGroupsRequestV1) (DescribeGroupsResponseV1, error)
- func (c *Conn) FindCoordinatorV1(request FindCoordinatorRequestV1) (FindCoordinatorResponseV1, error)
- func (c *Conn) ListGroupsV1(request ListGroupsRequestV1) (ListGroupsResponseV1, error)
- func (c *Conn) ListOffsetsV1(request ListOffsetRequestV1) (ListOffsetResponseV1, error)
- func (c *Conn) LocalAddr() net.Addr
- func (c *Conn) MetadataV0(request MetadataRequestV0) (*MetadataResponseV0, error)
- func (c *Conn) OffsetFetchV3(request OffsetFetchRequestV3) (OffsetFetchResponseV3, error)
- func (c *Conn) RemoteAddr() net.Addr
- type ConnConfig
- type DescribeGroupsRequestV1
- type DescribeGroupsResponseV1
- type DescribeGroupsResponseV1Group
- type DescribeGroupsResponseV1Member
- type Dialer
- func (d *Dialer) Dial(network string, address string) (*Conn, error)
- func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error)
- func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error)
- func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, ...) (Broker, error)
- func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error)
- type DurationStats
- type Error
- type FindCoordinatorRequestV1
- type FindCoordinatorResponseV1
- type FindCoordinatorResponseV1Coordinator
- type ListGroupsRequestV1
- type ListGroupsResponseV1
- type ListGroupsResponseV1Group
- type ListOffsetRequestV1
- type ListOffsetRequestV1Partition
- type ListOffsetRequestV1Topic
- type ListOffsetResponseV1
- type ListOffsetResponseV1Partition
- type ListOffsetResponseV1Response
- type Message
- type MetadataRequestV0
- type MetadataResponseV0
- type MetadataResponseV0Broker
- type MetadataResponseV0Partition
- type MetadataResponseV0Topic
- type OffsetFetchRequestV3
- type OffsetFetchRequestV3Topic
- type OffsetFetchResponseV3
- type OffsetFetchResponseV3PartitionResponse
- type OffsetFetchResponseV3Response
- type Partition
- type Resolver
- type SummaryStats
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultClientID is the default value used as ClientID of kafka // connections. DefaultClientID string )
var DefaultDialer = &Dialer{ Timeout: 10 * time.Second, DualStack: true, }
DefaultDialer is the default dialer used when none is specified.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
A Batch is an iterator over a sequence of messages fetched from a kafka server.
Batches are created by calling (*Conn).ReadBatch. They hold a internal lock on the connection, which is released when the batch is closed. Failing to call a batch's Close method will likely result in a dead-lock when trying to use the connection.
Batches are safe to use concurrently from multiple goroutines.
func (*Batch) Close ¶
Close closes the batch, releasing the connection lock and returning an error if reading the batch failed for any reason.
func (*Batch) HighWaterMark ¶
Watermark returns the current highest watermark in a partition.
func (*Batch) Read ¶
Read reads the value of the next message from the batch into b, returning the number of bytes read, or an error if the next message couldn't be read.
If an error is returned the batch cannot be used anymore and calling Read again will keep returning that error. All errors except io.EOF (indicating that the program consumed all messages from the batch) are also returned by Close.
The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.
func (*Batch) ReadMessage ¶
ReadMessage reads and return the next message from the batch.
Because this method allocate memory buffers for the message key and value it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn represents a connection to a kafka broker.
Instances of Conn are safe to use concurrently from multiple goroutines.
func DialContext ¶
DialContext is a convenience wrapper for DefaultDialer.DialContext.
func DialLeader ¶
func DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)
DialLeader is a convenience wrapper for DefaultDialer.DialLeader.
func NewConnWith ¶
func NewConnWith(conn net.Conn, config ConnConfig) *Conn
NewConnWith returns a new kafka connection configured with config.
func (*Conn) DescribeGroupsV1 ¶
func (c *Conn) DescribeGroupsV1(request DescribeGroupsRequestV1) (DescribeGroupsResponseV1, error)
describeGroups retrieves the specified groups
See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
func (*Conn) FindCoordinatorV1 ¶
func (c *Conn) FindCoordinatorV1(request FindCoordinatorRequestV1) (FindCoordinatorResponseV1, error)
findCoordinator finds the coordinator for the specified group or transaction
See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
func (*Conn) ListGroupsV1 ¶
func (c *Conn) ListGroupsV1(request ListGroupsRequestV1) (ListGroupsResponseV1, error)
listGroups lists all the consumer groups
See http://kafka.apache.org/protocol.html#The_Messages_ListGroups
func (*Conn) ListOffsetsV1 ¶
func (c *Conn) ListOffsetsV1(request ListOffsetRequestV1) (ListOffsetResponseV1, error)
func (*Conn) MetadataV0 ¶
func (c *Conn) MetadataV0(request MetadataRequestV0) (*MetadataResponseV0, error)
offsetFetch fetches the offsets for the specified topic partitions
See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
func (*Conn) OffsetFetchV3 ¶
func (c *Conn) OffsetFetchV3(request OffsetFetchRequestV3) (OffsetFetchResponseV3, error)
offsetFetch fetches the offsets for the specified topic partitions
See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
func (*Conn) RemoteAddr ¶
RemoteAddr returns the remote network address.
type ConnConfig ¶
ConnConfig is a configuration object used to create new instances of Conn.
type DescribeGroupsRequestV1 ¶
type DescribeGroupsRequestV1 struct { // List of groupIds to request metadata for (an empty groupId array // will return empty group metadata). GroupIDs []string }
See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
type DescribeGroupsResponseV1 ¶
type DescribeGroupsResponseV1 struct { // Duration in milliseconds for which the request was throttled due // to quota violation (Zero if the request did not violate any quota) ThrottleTimeMS int32 // Groups holds selected group information Groups []DescribeGroupsResponseV1Group }
type DescribeGroupsResponseV1Group ¶
type DescribeGroupsResponseV1Group struct { // ErrorCode holds response error code ErrorCode int16 // GroupID holds the unique group identifier GroupID string // State holds current state of the group (one of: Dead, Stable, AwaitingSync, // PreparingRebalance, or empty if there is no active group) State string // ProtocolType holds the current group protocol type (will be empty if there is // no active group) ProtocolType string // Protocol holds the current group protocol (only provided if the group is Stable) Protocol string // Members contains the current group members (only provided if the group is not Dead) Members []DescribeGroupsResponseV1Member }
type DescribeGroupsResponseV1Member ¶
type DescribeGroupsResponseV1Member struct { // MemberID assigned by the group coordinator MemberID string // ClientID used in the member's latest join group request ClientID string // ClientHost used in the request session corresponding to the member's // join group. ClientHost string // MemberMetadata the metadata corresponding to the current group protocol // in use (will only be present if the group is stable). MemberMetadata []byte // MemberAssignments provided by the group leader (will only be present if // the group is stable). // // See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol MemberAssignments []byte }
type Dialer ¶
type Dialer struct { // Unique identifier for client connections established by this Dialer. ClientID string // Timeout is the maximum amount of time a dial will wait for a connect to // complete. If Deadline is also set, it may fail earlier. // // The default is no timeout. // // When dialing a name with multiple IP addresses, the timeout may be // divided between them. // // With or without a timeout, the operating system may impose its own // earlier timeout. For instance, TCP timeouts are often around 3 minutes. Timeout time.Duration // Deadline is the absolute point in time after which dials will fail. // If Timeout is set, it may fail earlier. // Zero means no deadline, or dependent on the operating system as with the // Timeout option. Deadline time.Time // LocalAddr is the local address to use when dialing an address. // The address must be of a compatible type for the network being dialed. // If nil, a local address is automatically chosen. LocalAddr net.Addr // DualStack enables RFC 6555-compliant "Happy Eyeballs" dialing when the // network is "tcp" and the destination is a host name with both IPv4 and // IPv6 addresses. This allows a client to tolerate networks where one // address family is silently broken. DualStack bool // FallbackDelay specifies the length of time to wait before spawning a // fallback connection, when DualStack is enabled. // If zero, a default delay of 300ms is used. FallbackDelay time.Duration // KeepAlive specifies the keep-alive period for an active network // connection. // If zero, keep-alives are not enabled. Network protocols that do not // support keep-alives ignore this field. KeepAlive time.Duration // Resolver optionally specifies an alternate resolver to use. Resolver Resolver // TLS enables Dialer to open secure connections. If nil, standard net.Conn // will be used. TLS *tls.Config }
The Dialer type mirrors the net.Dialer API but is designed to open kafka connections instead of raw network connections.
func (*Dialer) DialContext ¶
DialContext connects to the address on the named network using the provided context.
The provided Context must be non-nil. If the context expires before the connection is complete, an error is returned. Once successfully connected, any expiration of the context will not affect the connection.
When using TCP, and the host in the address parameter resolves to multiple network addresses, any dial timeout (from d.Timeout or ctx) is spread over each consecutive dial, such that each is given an appropriate fraction of the time to connect. For example, if a host has 4 IP addresses and the timeout is 1 minute, the connect to each single address will be given 15 seconds to complete before trying the next one.
func (*Dialer) DialLeader ¶
func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)
DialLeader opens a connection to the leader of the partition for a given topic.
The address given to the DialContext method may not be the one that the connection will end up being established to, because the dialer will lookup the partition leader for the topic and return a connection to that server. The original address is only used as a mechanism to discover the configuration of the kafka cluster that we're connecting to.
type DurationStats ¶
type DurationStats struct { Avg time.Duration `metric:"avg" type:"gauge"` Min time.Duration `metric:"min" type:"gauge"` Max time.Duration `metric:"max" type:"gauge"` }
DurationStats is a data structure that carries a summary of observed duration values. The average, minimum, and maximum are reported.
type Error ¶
type Error int
Error represents the different error codes that may be returned by kafka.
const ( Unknown Error = -1 OffsetOutOfRange Error = 1 InvalidMessage Error = 2 UnknownTopicOrPartition Error = 3 InvalidMessageSize Error = 4 LeaderNotAvailable Error = 5 NotLeaderForPartition Error = 6 RequestTimedOut Error = 7 BrokerNotAvailable Error = 8 ReplicaNotAvailable Error = 9 MessageSizeTooLarge Error = 10 StaleControllerEpoch Error = 11 OffsetMetadataTooLarge Error = 12 GroupLoadInProgress Error = 14 GroupCoordinatorNotAvailable Error = 15 NotCoordinatorForGroup Error = 16 InvalidTopic Error = 17 RecordListTooLarge Error = 18 NotEnoughReplicas Error = 19 NotEnoughReplicasAfterAppend Error = 20 InvalidRequiredAcks Error = 21 IllegalGeneration Error = 22 InconsistentGroupProtocol Error = 23 InvalidGroupId Error = 24 UnknownMemberId Error = 25 InvalidSessionTimeout Error = 26 RebalanceInProgress Error = 27 InvalidCommitOffsetSize Error = 28 TopicAuthorizationFailed Error = 29 GroupAuthorizationFailed Error = 30 ClusterAuthorizationFailed Error = 31 InvalidTimestamp Error = 32 UnsupportedSASLMechanism Error = 33 IllegalSASLState Error = 34 UnsupportedVersion Error = 35 TopicAlreadyExists Error = 36 InvalidPartitionNumber Error = 37 InvalidReplicationFactor Error = 38 InvalidReplicaAssignment Error = 39 InvalidConfiguration Error = 40 NotController Error = 41 InvalidRequest Error = 42 UnsupportedForMessageFormat Error = 43 PolicyViolation Error = 44 OutOfOrderSequenceNumber Error = 45 DuplicateSequenceNumber Error = 46 InvalidProducerEpoch Error = 47 InvalidTransactionState Error = 48 InvalidProducerIDMapping Error = 49 InvalidTransactionTimeout Error = 50 ConcurrentTransactions Error = 51 TransactionCoordinatorFenced Error = 52 TransactionalIDAuthorizationFailed Error = 53 SecurityDisabled Error = 54 BrokerAuthorizationFailed Error = 55 )
func (Error) Description ¶
Description returns a human readable description of cause of the error.
func (Error) Temporary ¶
Temporary returns true if the operation that generated the error may succeed if retried at a later time.
type FindCoordinatorRequestV1 ¶
type FindCoordinatorRequestV1 struct { // CoordinatorKey holds id to use for finding the coordinator (for groups, this is // the groupId, for transactional producers, this is the transactional id) CoordinatorKey string // CoordinatorType indicates type of coordinator to find (0 = group, 1 = transaction) CoordinatorType int8 }
FindCoordinatorRequestV1 requests the coordinator for the specified group or transaction
See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
type FindCoordinatorResponseV1 ¶
type FindCoordinatorResponseV1 struct { // ThrottleTimeMS holds the duration in milliseconds for which the request // was throttled due to quota violation (Zero if the request did not violate // any quota) ThrottleTimeMS int32 // ErrorCode holds response error code ErrorCode int16 // ErrorMessage holds response error message ErrorMessage string // Coordinator holds host and port information for the coordinator Coordinator FindCoordinatorResponseV1Coordinator }
type ListGroupsRequestV1 ¶
type ListGroupsRequestV1 struct { }
type ListGroupsResponseV1 ¶
type ListGroupsResponseV1 struct { // ThrottleTimeMS holds the duration in milliseconds for which the request // was throttled due to quota violation (Zero if the request did not violate // any quota) ThrottleTimeMS int32 // ErrorCode holds response error code ErrorCode int16 Groups []ListGroupsResponseV1Group }
type ListOffsetRequestV1 ¶
type ListOffsetRequestV1 struct { ReplicaID int32 Topics []ListOffsetRequestV1Topic }
type ListOffsetRequestV1Partition ¶
type ListOffsetRequestV1Partition struct { Partition int32 // Time is used to ask for all messages before a certain time (ms). // // There are two special values. Specify -1 to receive the latest offset (i.e. // the offset of the next coming message) and -2 to receive the earliest // available offset. This applies to all versions of the API. Note that because // offsets are pulled in descending order, asking for the earliest offset will // always return you a single element. Time int64 }
type ListOffsetRequestV1Topic ¶
type ListOffsetRequestV1Topic struct { TopicName string Partitions []ListOffsetRequestV1Partition }
type ListOffsetResponseV1 ¶
type ListOffsetResponseV1 struct {
Responses []ListOffsetResponseV1Response
}
type ListOffsetResponseV1Response ¶
type ListOffsetResponseV1Response struct { Topic string PartitionResponses []ListOffsetResponseV1Partition }
type Message ¶
type Message struct { // Topic is reads only and MUST NOT be set when writing messages Topic string // Partition is reads only and MUST NOT be set when writing messages Partition int Offset int64 Key []byte Value []byte Time time.Time }
Message is a data structure representing kafka messages.
type MetadataRequestV0 ¶
type MetadataRequestV0 []string
type MetadataResponseV0 ¶
type MetadataResponseV0 struct { Brokers []*MetadataResponseV0Broker Topics []*MetadataResponseV0Topic }
func (*MetadataResponseV0) Free ¶
func (t *MetadataResponseV0) Free()
type MetadataResponseV0Broker ¶
func (*MetadataResponseV0Broker) Free ¶
func (t *MetadataResponseV0Broker) Free()
type MetadataResponseV0Topic ¶
type MetadataResponseV0Topic struct { TopicErrorCode int16 TopicName string Partitions []MetadataResponseV0Partition }
func (*MetadataResponseV0Topic) Free ¶
func (t *MetadataResponseV0Topic) Free()
type OffsetFetchRequestV3 ¶
type OffsetFetchRequestV3 struct { // GroupID holds the unique group identifier GroupID string // Topics to fetch offsets. Topics []OffsetFetchRequestV3Topic }
type OffsetFetchResponseV3 ¶
type OffsetFetchResponseV3 struct { // ThrottleTimeMS holds the duration in milliseconds for which the request // was throttled due to quota violation (Zero if the request did not violate // any quota) ThrottleTimeMS int32 // Responses holds topic partition offsets Responses []OffsetFetchResponseV3Response // ErrorCode holds response error code ErrorCode int16 }
type OffsetFetchResponseV3Response ¶
type OffsetFetchResponseV3Response struct { // Topic name Topic string // PartitionResponses holds offsets by partition PartitionResponses []OffsetFetchResponseV3PartitionResponse }
type Resolver ¶
type Resolver interface { // LookupHost looks up the given host using the local resolver. // It returns a slice of that host's addresses. LookupHost(ctx context.Context, host string) (addrs []string, err error) }
The Resolver interface is used as an abstraction to provide service discovery of the hosts of a kafka cluster.
type SummaryStats ¶
type SummaryStats struct { Avg int64 `metric:"avg" type:"gauge"` Min int64 `metric:"min" type:"gauge"` Max int64 `metric:"max" type:"gauge"` }
SummaryStats is a data structure that carries a summary of observed values. The average, minimum, and maximum are reported.
Source Files ¶
- 01_fetch.go
- 02_list_offset.go
- 03_metadata.go
- 08_offset_commit.go
- 09_offset_fetch.go
- 10_find_coordinator.go
- 11_join_group.go
- 12_heartbeat.go
- 13_leave_group.go
- 14_sync_group.go
- 15_describe_groups.go
- 16_list_groups.go
- 19_create_topics.go
- batch.go
- conn.go
- crc32.go
- dialer.go
- discard.go
- error.go
- message.go
- produce.go
- protocol.go
- read.go
- sizeof.go
- stats.go
- strategy.go
- time.go
- write.go