Documentation ¶
Index ¶
- Constants
- Variables
- func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)
- type BinaryDecoder
- func (bd *BinaryDecoder) GetBytes() ([]byte, error)
- func (bd *BinaryDecoder) GetInt16() (int16, error)
- func (bd *BinaryDecoder) GetInt32() (int32, error)
- func (bd *BinaryDecoder) GetInt64() (int64, error)
- func (bd *BinaryDecoder) GetInt8() (int8, error)
- func (bd *BinaryDecoder) GetString() (string, error)
- func (bd *BinaryDecoder) Remaining() int
- type BinaryEncoder
- func (be *BinaryEncoder) Reserve(slice UpdatableSlice)
- func (be *BinaryEncoder) Reset()
- func (be *BinaryEncoder) Size() int32
- func (be *BinaryEncoder) UpdateReserved()
- func (be *BinaryEncoder) WriteBytes(value []byte)
- func (be *BinaryEncoder) WriteInt16(value int16)
- func (be *BinaryEncoder) WriteInt32(value int32)
- func (be *BinaryEncoder) WriteInt64(value int64)
- func (be *BinaryEncoder) WriteInt8(value int8)
- func (be *BinaryEncoder) WriteString(value string)
- type Broker
- type BrokerConnection
- type Brokers
- type Client
- type CompressionCodec
- type Config
- type ConsumerMetadataRequest
- type ConsumerMetadataResponse
- type CorrelationIDGenerator
- type CrcSlice
- type Decoder
- type DecodingError
- type DescribeGroupsRequest
- type DescribeGroupsResponse
- type Encoder
- type FetchRequest
- type FetchResponse
- type FetchResponsePartitionData
- type GroupDescription
- type GroupMemberDescription
- type GroupProtocol
- type HeartbeatRequest
- type HeartbeatResponse
- type JoinGroupRequest
- type JoinGroupResponse
- type KafkaClient
- func (c *KafkaClient) Close() <-chan struct{}
- func (c *KafkaClient) CommitOffset(group string, topic string, partition int32, offset int64) error
- func (c *KafkaClient) Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)
- func (c *KafkaClient) GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)
- func (c *KafkaClient) GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)
- func (c *KafkaClient) GetLeader(topic string, partition int32) (*BrokerConnection, error)
- func (c *KafkaClient) GetOffset(group string, topic string, partition int32) (int64, error)
- func (c *KafkaClient) GetTopicMetadata(topics []string) (*MetadataResponse, error)
- func (c *KafkaClient) Metadata() *Metadata
- type LeaveGroupRequest
- type LeaveGroupResponse
- type LengthSlice
- type ListGroupsRequest
- type ListGroupsResponse
- type Message
- type MessageAndMetadata
- type MessageAndOffset
- type Metadata
- func (m *Metadata) Invalidate(topic string)
- func (m *Metadata) Leader(topic string, partition int32) (int32, error)
- func (m *Metadata) OffsetCoordinator(group string) (*BrokerConnection, error)
- func (m *Metadata) PartitionsFor(topic string) ([]int32, error)
- func (m *Metadata) Refresh(topics []string) error
- func (m *Metadata) TopicMetadata(topic string) (map[int32]int32, error)
- type MetadataResponse
- type OffsetAndMetadata
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetMetadataAndError
- type OffsetRequest
- type OffsetResponse
- type PartitionFetchInfo
- type PartitionMetadata
- type PartitionOffsetRequestInfo
- type PartitionOffsetsResponse
- type ProduceRequest
- type ProduceResponse
- type ProduceResponseStatus
- type Request
- type RequestHeader
- type Response
- type SizingEncoder
- func (se *SizingEncoder) Reserve(slice UpdatableSlice)
- func (se *SizingEncoder) Reset()
- func (se *SizingEncoder) Size() int32
- func (se *SizingEncoder) UpdateReserved()
- func (se *SizingEncoder) WriteBytes(value []byte)
- func (se *SizingEncoder) WriteInt16(int16)
- func (se *SizingEncoder) WriteInt32(int32)
- func (se *SizingEncoder) WriteInt64(int64)
- func (se *SizingEncoder) WriteInt8(int8)
- func (se *SizingEncoder) WriteString(value string)
- type SyncGroupRequest
- type SyncGroupResponse
- type TopicMetadata
- type TopicMetadataRequest
- type UpdatableSlice
Constants ¶
const EarliestTime int64 = -2
EarliestTime is a value used to request the earliest available offset.
const InvalidOffset int64 = -1
InvalidOffset is a constant that is used to denote an invalid or uninitialized offset.
const LatestTime int64 = -1
LatestTime is a value used to request the latest offset (i.e. the offset of the next message to be appended).
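For illustration, a sketch of how these constants can be passed as the offsetTime argument of GetAvailableOffset (the topic name, partition and fmt usage are placeholders, not part of this package):

func printOffsetRange(client *KafkaClient) error {
    // EarliestTime requests the first available offset; LatestTime requests
    // the offset of the next message to be appended (the log end offset).
    earliest, err := client.GetAvailableOffset("my-topic", 0, EarliestTime)
    if err != nil {
        return err
    }
    latest, err := client.GetAvailableOffset("my-topic", 0, LatestTime)
    if err != nil {
        return err
    }
    fmt.Printf("offsets available: [%d, %d)\n", earliest, latest)
    return nil
}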
Variables ¶
var BrokerErrors = map[int16]error{
    -1: ErrUnknown,
    0:  ErrNoError,
    1:  ErrOffsetOutOfRange,
    2:  ErrInvalidMessage,
    3:  ErrUnknownTopicOrPartition,
    4:  ErrInvalidMessageSize,
    5:  ErrLeaderNotAvailable,
    6:  ErrNotLeaderForPartition,
    7:  ErrRequestTimedOut,
    8:  ErrBrokerNotAvailable,
    9:  ErrReplicaNotAvailable,
    10: ErrMessageSizeTooLarge,
    11: ErrStaleControllerEpochCode,
    12: ErrOffsetMetadataTooLargeCode,
    14: ErrOffsetsLoadInProgressCode,
    15: ErrConsumerCoordinatorNotAvailableCode,
    16: ErrNotCoordinatorForConsumerCode,
    17: ErrInvalidTopicCode,
    18: ErrRecordListTooLarge,
    19: ErrNotEnoughReplicas,
    20: ErrNotEnoughReplicasAfterAppend,
    21: ErrInvalidRequiredAcks,
    22: ErrIllegalGeneration,
    23: ErrInconsistentGroupProtocol,
    24: ErrInvalidGroupID,
    25: ErrUnknownMemberID,
    26: ErrInvalidSessionTimeout,
    27: ErrRebalanceInProgress,
    28: ErrInvalidCommitOffsetSize,
    29: ErrTopicAuthorizationFailed,
    30: ErrGroupAuthorizationFailed,
    31: ErrClusterAuthorizationFailed,
}
BrokerErrors are mappings between Kafka error codes and actual error messages.
var ErrBrokerNotAvailable = errors.New("Broker is likely not alive.")
ErrBrokerNotAvailable is a mapping for Kafka error code 8.
var ErrClusterAuthorizationFailed = errors.New("The client is not authorized to use an inter-broker or administrative API.")
ErrClusterAuthorizationFailed is a mapping for Kafka error code 31.
var ErrConfigEmptyClientID = errors.New("ClientID cannot be empty.")
ErrConfigEmptyClientID happens when trying to create a new client with empty ClientID value.
var ErrConfigInvalidCommitOffsetBackoff = errors.New("CommitOffsetBackoff must be at least 1ms.")
ErrConfigInvalidCommitOffsetBackoff happens when trying to create a new client with too small CommitOffsetBackoff value.
var ErrConfigInvalidCommitOffsetRetries = errors.New("CommitOffsetRetries cannot be less than 0.")
ErrConfigInvalidCommitOffsetRetries happens when trying to create a new client with too small CommitOffsetRetries value.
var ErrConfigInvalidConnectTimeout = errors.New("ConnectTimeout must be at least 1ms.")
ErrConfigInvalidConnectTimeout happens when trying to create a new client with too small ConnectTimeout value.
var ErrConfigInvalidConsumerMetadataBackoff = errors.New("ConsumerMetadataBackoff must be at least 1ms.")
ErrConfigInvalidConsumerMetadataBackoff happens when trying to create a new client with too small ConsumerMetadataBackoff value.
var ErrConfigInvalidConsumerMetadataRetries = errors.New("ConsumerMetadataRetries cannot be less than 0.")
ErrConfigInvalidConsumerMetadataRetries happens when trying to create a new client with too small ConsumerMetadataRetries value.
var ErrConfigInvalidFetchSize = errors.New("FetchSize cannot be less than 1.")
ErrConfigInvalidFetchSize happens when trying to create a new client with too small FetchSize value.
var ErrConfigInvalidKeepAliveTimeout = errors.New("KeepAliveTimeout must be at least 1ms.")
ErrConfigInvalidKeepAliveTimeout happens when trying to create a new client with too small KeepAliveTimeout value.
var ErrConfigInvalidMetadataBackoff = errors.New("MetadataBackoff must be at least 1ms.")
ErrConfigInvalidMetadataBackoff happens when trying to create a new client with too small MetadataBackoff value.
var ErrConfigInvalidMetadataRetries = errors.New("MetadataRetries cannot be less than 0.")
ErrConfigInvalidMetadataRetries happens when trying to create a new client with too small MetadataRetries value.
var ErrConfigInvalidMetadataTTL = errors.New("MetadataTTL must be at least 1ms.")
ErrConfigInvalidMetadataTTL happens when trying to create a new client with too small MetadataTTL value.
var ErrConfigInvalidReadTimeout = errors.New("ReadTimeout must be at least 1ms.")
ErrConfigInvalidReadTimeout happens when trying to create a new client with too small ReadTimeout value.
var ErrConfigInvalidWriteTimeout = errors.New("WriteTimeout must be at least 1ms.")
ErrConfigInvalidWriteTimeout happens when trying to create a new client with too small WriteTimeout value.
var ErrConfigNoBrokers = errors.New("BrokerList must have at least one broker.")
ErrConfigNoBrokers happens when trying to create a new client without bootstrap brokers specified.
var ErrConsumerCoordinatorNotAvailableCode = errors.New("Offsets topic has not yet been created.")
ErrConsumerCoordinatorNotAvailableCode is a mapping for Kafka error code 15.
var ErrEOF = errors.New("End of file reached")
ErrEOF signals that an end of file or stream has been reached unexpectedly.
var ErrFailedToGetLeader = errors.New("Failed to get leader.")
ErrFailedToGetLeader happens when TopicMetadataResponse does not contain leader metadata for requested topic and partition.
var ErrFailedToGetMetadata = errors.New("Failed to get topic metadata.")
ErrFailedToGetMetadata happens when TopicMetadataResponse does not contain metadata for requested topic.
var ErrGroupAuthorizationFailed = errors.New("The client is not authorized to access a particular groupId.")
ErrGroupAuthorizationFailed is a mapping for Kafka error code 30.
var ErrIllegalGeneration = errors.New("The generation id provided in the request is not the current generation.")
ErrIllegalGeneration is a mapping for Kafka error code 22.
var ErrInconsistentGroupProtocol = errors.New("Provided protocol type or set of protocols is not compatible with the current group.")
ErrInconsistentGroupProtocol is a mapping for Kafka error code 23.
var ErrInvalidCommitOffsetSize = errors.New("Offset commit was rejected because of oversize metadata.")
ErrInvalidCommitOffsetSize is a mapping for Kafka error code 28.
var ErrInvalidGroupID = errors.New("The groupId is empty or null.")
ErrInvalidGroupID is a mapping for Kafka error code 24.
var ErrInvalidMessage = errors.New("Message contents do not match its CRC")
ErrInvalidMessage is a mapping for Kafka error code 2.
var ErrInvalidMessageSize = errors.New("The message has a negative size")
ErrInvalidMessageSize is a mapping for Kafka error code 4.
var ErrInvalidRequiredAcks = errors.New("The requested requiredAcks is invalid.")
ErrInvalidRequiredAcks is a mapping for Kafka error code 21.
var ErrInvalidSessionTimeout = errors.New("The requested session timeout is outside of the allowed range on the broker.")
ErrInvalidSessionTimeout is a mapping for Kafka error code 26.
var ErrInvalidTopicCode = errors.New("Attempt to access an invalid topic.")
ErrInvalidTopicCode is a mapping for Kafka error code 17.
var ErrLeaderNotAvailable = errors.New("In the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.")
ErrLeaderNotAvailable is a mapping for Kafka error code 5.
var ErrMessageSizeTooLarge = errors.New("You've just attempted to produce a message of a size larger than the broker is allowed to accept.")
ErrMessageSizeTooLarge is a mapping for Kafka error code 10.
var ErrNoClientConfig = errors.New("ClientConfig cannot be nil.")
ErrNoClientConfig happens when trying to create a new client without a configuration.
var ErrNoDataToUncompress = errors.New("No data to uncompress")
ErrNoDataToUncompress happens when a compressed message is empty.
var ErrNoError = errors.New("No error - it worked!")
ErrNoError is a mapping for Kafka error code 0.
var ErrNotCoordinatorForConsumerCode = errors.New("There is no coordinator for this consumer.")
ErrNotCoordinatorForConsumerCode is a mapping for Kafka error code 16.
var ErrNotEnoughReplicas = errors.New("The number of in-sync replicas is lower than the configured minimum.")
ErrNotEnoughReplicas is a mapping for Kafka error code 19.
var ErrNotEnoughReplicasAfterAppend = errors.New("The message was written to the log, but with fewer in-sync replicas than required.")
ErrNotEnoughReplicasAfterAppend is a mapping for Kafka error code 20.
var ErrNotLeaderForPartition = errors.New("You've just attempted to send messages to a replica that is not the leader for some partition. It indicates that the client's metadata is out of date.")
ErrNotLeaderForPartition is a mapping for Kafka error code 6.
var ErrOffsetMetadataTooLargeCode = errors.New("You've just specified a string larger than the configured maximum for offset metadata.")
ErrOffsetMetadataTooLargeCode is a mapping for Kafka error code 12.
var ErrOffsetOutOfRange = errors.New("The requested offset is outside the range of offsets maintained by the server for the given topic/partition.")
ErrOffsetOutOfRange is a mapping for Kafka error code 1.
var ErrOffsetsLoadInProgressCode = errors.New("Offset loading is in progress. (Usually happens after a leader change for that offsets topic partition).")
ErrOffsetsLoadInProgressCode is a mapping for Kafka error code 14.
var ErrRebalanceInProgress = errors.New("The coordinator has begun rebalancing the group. This indicates to the client that it should rejoin the group.")
ErrRebalanceInProgress is a mapping for Kafka error code 27.
var ErrRecordListTooLarge = errors.New("Message batch exceeds the maximum configured segment size.")
ErrRecordListTooLarge is a mapping for Kafka error code 18.
var ErrReplicaNotAvailable = errors.New("Replica is expected on a broker, but is not (this can be safely ignored).")
ErrReplicaNotAvailable is a mapping for Kafka error code 9.
var ErrRequestTimedOut = errors.New("Request exceeds the user-specified time limit in the request.")
ErrRequestTimedOut is a mapping for Kafka error code 7.
var ErrStaleControllerEpochCode = errors.New("Broker-to-broker communication fault.")
ErrStaleControllerEpochCode is a mapping for Kafka error code 11.
var ErrTopicAuthorizationFailed = errors.New("The client is not authorized to access the requested topic.")
ErrTopicAuthorizationFailed is a mapping for Kafka error code 29.
var ErrUnknown = errors.New("An unexpected server error")
ErrUnknown is a mapping for Kafka error code -1.
var ErrUnknownMemberID = errors.New("The memberId is not in the current generation.")
ErrUnknownMemberID is a mapping for Kafka error code 25.
var ErrUnknownTopicOrPartition = errors.New("This request is for a topic or partition that does not exist on this broker.")
ErrUnknownTopicOrPartition is a mapping for Kafka error code 3.
Functions ¶
func ReadMessageSet ¶
func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)
ReadMessageSet reads a message set from the given Decoder, decoding the nested message set when a MessageAndOffset is compressed.
Types ¶
type BinaryDecoder ¶
type BinaryDecoder struct {
// contains filtered or unexported fields
}
BinaryDecoder implements Decoder and is able to decode a Kafka wire protocol message into actual data.
func NewBinaryDecoder ¶
func NewBinaryDecoder(raw []byte) *BinaryDecoder
NewBinaryDecoder creates a new BinaryDecoder that will decode a given []byte.
func (*BinaryDecoder) GetBytes ¶
func (bd *BinaryDecoder) GetBytes() ([]byte, error)
GetBytes gets a []byte from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt16 ¶
func (bd *BinaryDecoder) GetInt16() (int16, error)
GetInt16 gets an int16 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt32 ¶
func (bd *BinaryDecoder) GetInt32() (int32, error)
GetInt32 gets an int32 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt64 ¶
func (bd *BinaryDecoder) GetInt64() (int64, error)
GetInt64 gets an int64 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt8 ¶
func (bd *BinaryDecoder) GetInt8() (int8, error)
GetInt8 gets an int8 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetString ¶
func (bd *BinaryDecoder) GetString() (string, error)
GetString gets a string from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) Remaining ¶
func (bd *BinaryDecoder) Remaining() int
Remaining tells how many bytes are left unread in this decoder.
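A minimal decoding sketch, assuming raw was produced by matching encoder writes (an int16 followed by a string; the fmt usage is illustrative only):

func decodeExample(raw []byte) error {
    decoder := NewBinaryDecoder(raw)
    version, err := decoder.GetInt16()
    if err != nil {
        return err
    }
    name, err := decoder.GetString()
    if err != nil {
        return err
    }
    fmt.Println(version, name, "-", decoder.Remaining(), "bytes left unread")
    return nil
}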
type BinaryEncoder ¶
type BinaryEncoder struct {
// contains filtered or unexported fields
}
BinaryEncoder implements Encoder and is able to encode actual data into a Kafka wire protocol byte sequence.
func NewBinaryEncoder ¶
func NewBinaryEncoder(buffer []byte) *BinaryEncoder
NewBinaryEncoder creates a new BinaryEncoder that will write into a given []byte.
func (*BinaryEncoder) Reserve ¶
func (be *BinaryEncoder) Reserve(slice UpdatableSlice)
Reserve reserves a place for an updatable slice.
func (*BinaryEncoder) Reset ¶
func (be *BinaryEncoder) Reset()
Reset resets this encoder's write position so the underlying buffer can be reused.
func (*BinaryEncoder) Size ¶
func (be *BinaryEncoder) Size() int32
Size returns the size in bytes written to this encoder.
func (*BinaryEncoder) UpdateReserved ¶
func (be *BinaryEncoder) UpdateReserved()
UpdateReserved tells the last reserved slice to be updated with new data.
func (*BinaryEncoder) WriteBytes ¶
func (be *BinaryEncoder) WriteBytes(value []byte)
WriteBytes writes a []byte to this encoder.
func (*BinaryEncoder) WriteInt16 ¶
func (be *BinaryEncoder) WriteInt16(value int16)
WriteInt16 writes an int16 to this encoder.
func (*BinaryEncoder) WriteInt32 ¶
func (be *BinaryEncoder) WriteInt32(value int32)
WriteInt32 writes an int32 to this encoder.
func (*BinaryEncoder) WriteInt64 ¶
func (be *BinaryEncoder) WriteInt64(value int64)
WriteInt64 writes an int64 to this encoder.
func (*BinaryEncoder) WriteInt8 ¶
func (be *BinaryEncoder) WriteInt8(value int8)
WriteInt8 writes an int8 to this encoder.
func (*BinaryEncoder) WriteString ¶
func (be *BinaryEncoder) WriteString(value string)
WriteString writes a string to this encoder.
type Broker ¶
Broker contains information about a Kafka broker in a cluster - its ID, host name and port.
func (*Broker) Read ¶
func (n *Broker) Read(decoder Decoder) *DecodingError
type BrokerConnection ¶
type BrokerConnection struct {
// contains filtered or unexported fields
}
BrokerConnection manages TCP connections to a single broker.
func NewBrokerConnection ¶
func NewBrokerConnection(broker *Broker, keepAliveTimeout time.Duration) *BrokerConnection
NewBrokerConnection creates a new BrokerConnection to a given broker with a given TCP keep alive timeout.
func (*BrokerConnection) GetConnection ¶
func (bc *BrokerConnection) GetConnection() (*net.TCPConn, error)
GetConnection either gets an existing connection from the pool or creates a new one. May return an error if it fails to open a new connection.
func (*BrokerConnection) ReleaseConnection ¶
func (bc *BrokerConnection) ReleaseConnection(conn *net.TCPConn)
ReleaseConnection puts an existing connection back into the pool to be reused later.
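A sketch of the intended borrow/release pattern (the function body is a placeholder):

func withConnection(bc *BrokerConnection) error {
    conn, err := bc.GetConnection()
    if err != nil {
        return err
    }
    // Put the connection back into the pool once done so it can be reused.
    defer bc.ReleaseConnection(conn)

    // ... write a request to conn and read the response ...
    return nil
}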
type Brokers ¶
type Brokers struct {
// contains filtered or unexported fields
}
Brokers provides information about the Kafka cluster and exposes convenience functions to simplify interaction with it.
func NewBrokers ¶
func NewBrokers(keepAliveTimeout time.Duration) *Brokers
NewBrokers creates a new Brokers with the provided TCP keep alive timeout for all connection pools that will be created by this structure.
func (*Brokers) Get ¶
func (b *Brokers) Get(id int32) *BrokerConnection
Get gets a BrokerConnection for a given broker ID.
func (*Brokers) GetAll ¶
func (b *Brokers) GetAll() []*BrokerConnection
GetAll gets all BrokerConnections for this Brokers structure.
func (*Brokers) NextCorrelationID ¶
func (b *Brokers) NextCorrelationID() int32
NextCorrelationID returns the next sequential request correlation ID.
type Client ¶
type Client interface {
    // GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have.
    // Passing it an empty topic list will retrieve metadata for all topics in a cluster.
    GetTopicMetadata(topics []string) (*MetadataResponse, error)

    // GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
    // More on offset time here - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
    GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)

    // Fetch issues a single fetch request to a broker responsible for a given topic and partition and returns a FetchResponse that contains messages starting from a given offset.
    Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)

    // GetOffset gets the offset for a given group, topic and partition from Kafka. A part of the new offset management API.
    GetOffset(group string, topic string, partition int32) (int64, error)

    // CommitOffset commits the offset for a given group, topic and partition to Kafka. A part of the new offset management API.
    CommitOffset(group string, topic string, partition int32, offset int64) error

    // GetLeader returns a BrokerConnection for the leader of a given topic and partition.
    GetLeader(topic string, partition int32) (*BrokerConnection, error)

    // GetConsumerMetadata returns a ConsumerMetadataResponse for a given consumer group.
    GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)

    // Metadata returns a structure that holds all topic and broker metadata.
    Metadata() *Metadata

    // Close tells the Client to close all existing connections and stop.
    // This method is NOT blocking but returns a channel which will be closed once the closing is finished.
    Close() <-chan struct{}
}
Client is an interface that provides ways to interact with a Kafka cluster and hides broker management details from the user.
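A sketch of a fetch-process-commit cycle built only from the methods above. Whether CommitOffset expects the last processed offset or the next offset to consume is not specified here, so the sketch simply commits the offset of the last message it saw:

func consumeOnce(client Client, group string, topic string, partition int32) error {
    offset, err := client.GetOffset(group, topic, partition)
    if err != nil {
        return err
    }
    response, err := client.Fetch(topic, partition, offset)
    if err != nil {
        return err
    }
    messages, err := response.GetMessages()
    if err != nil {
        return err
    }
    for _, message := range messages {
        // ... process message.Value ...
        offset = message.Offset
    }
    return client.CommitOffset(group, topic, partition, offset)
}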
type CompressionCodec ¶
type CompressionCodec int
CompressionCodec is a compression codec id used to distinguish various compression types.
const (
    // CompressionNone is a compression codec id for uncompressed data.
    CompressionNone CompressionCodec = 0

    // CompressionGZIP is a compression codec id for GZIP compression.
    CompressionGZIP CompressionCodec = 1

    // CompressionSnappy is a compression codec id for Snappy compression.
    CompressionSnappy CompressionCodec = 2

    // CompressionLZ4 is a compression codec id for LZ4 compression.
    CompressionLZ4 CompressionCodec = 3
)
type Config ¶
type Config struct {
    // BrokerList is a bootstrap list to discover other brokers in a cluster. At least one broker is required.
    BrokerList []string

    // ReadTimeout is a timeout to read the response from a TCP socket.
    ReadTimeout time.Duration

    // WriteTimeout is a timeout to write the request to a TCP socket.
    WriteTimeout time.Duration

    // ConnectTimeout is a timeout to connect to a TCP socket.
    ConnectTimeout time.Duration

    // Sets whether the connection should be kept alive.
    KeepAlive bool

    // A keep alive period for a TCP connection.
    KeepAliveTimeout time.Duration

    // Maximum fetch size in bytes which will be used in all Consume() calls.
    FetchSize int32

    // The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block.
    FetchMinBytes int32

    // The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes.
    FetchMaxWaitTime int32

    // Number of retries to get topic metadata.
    MetadataRetries int

    // Backoff value between topic metadata requests.
    MetadataBackoff time.Duration

    // MetadataTTL is how long topic metadata is considered valid. Used to refresh metadata from time to time even if no leader changes occurred.
    MetadataTTL time.Duration

    // Number of retries to commit an offset.
    CommitOffsetRetries int

    // Backoff value between commit offset requests.
    CommitOffsetBackoff time.Duration

    // Number of retries to get consumer metadata.
    ConsumerMetadataRetries int

    // Backoff value between consumer metadata requests.
    ConsumerMetadataBackoff time.Duration

    // ClientID that will be used by a Client to identify client requests by broker.
    ClientID string
}
Config is used to pass multiple configuration values for a Client.
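An illustrative configuration (all values are placeholders; judging by the ErrConfig* variables above, the timeouts and backoffs must be at least 1ms, the retry counts non-negative, FetchSize at least 1, and BrokerList and ClientID non-empty):

config := &Config{
    BrokerList:              []string{"localhost:9092"},
    ReadTimeout:             5 * time.Second,
    WriteTimeout:            5 * time.Second,
    ConnectTimeout:          5 * time.Second,
    KeepAlive:               true,
    KeepAliveTimeout:        time.Minute,
    FetchSize:               1024 * 1024,
    MetadataRetries:         3,
    MetadataBackoff:         200 * time.Millisecond,
    MetadataTTL:             5 * time.Minute,
    CommitOffsetRetries:     3,
    CommitOffsetBackoff:     200 * time.Millisecond,
    ConsumerMetadataRetries: 3,
    ConsumerMetadataBackoff: 200 * time.Millisecond,
    ClientID:                "example-client",
}

client, err := New(config)
if err != nil {
    // one of the ErrConfig* validation errors above
}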
type ConsumerMetadataRequest ¶
type ConsumerMetadataRequest struct {
Group string
}
ConsumerMetadataRequest is used to discover the current offset coordinator to issue its offset commit and fetch requests.
func NewConsumerMetadataRequest ¶
func NewConsumerMetadataRequest(group string) *ConsumerMetadataRequest
NewConsumerMetadataRequest creates a new ConsumerMetadataRequest for a given consumer group.
func (*ConsumerMetadataRequest) Key ¶
func (cmr *ConsumerMetadataRequest) Key() int16
Key returns the Kafka API key for ConsumerMetadataRequest.
func (*ConsumerMetadataRequest) Version ¶
func (cmr *ConsumerMetadataRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*ConsumerMetadataRequest) Write ¶
func (cmr *ConsumerMetadataRequest) Write(encoder Encoder)
Write writes the ConsumerMetadataRequest to the given Encoder.
type ConsumerMetadataResponse ¶
ConsumerMetadataResponse contains information about the current offset coordinator and error if it occurred.
func (*ConsumerMetadataResponse) Read ¶
func (cmr *ConsumerMetadataResponse) Read(decoder Decoder) *DecodingError
type CorrelationIDGenerator ¶
type CorrelationIDGenerator struct {
// contains filtered or unexported fields
}
CorrelationIDGenerator is a simple structure that provides thread-safe correlation ID generation.
func (*CorrelationIDGenerator) NextCorrelationID ¶
func (c *CorrelationIDGenerator) NextCorrelationID() int32
NextCorrelationID returns the next sequential request correlation ID.
type CrcSlice ¶
type CrcSlice struct {
// contains filtered or unexported fields
}
CrcSlice is used to calculate the CRC32 value of the message.
func (*CrcSlice) GetPosition ¶
func (cs *CrcSlice) GetPosition() int
GetPosition gets the position within the encoder to be updated later.
func (*CrcSlice) GetReserveLength ¶
func (cs *CrcSlice) GetReserveLength() int
GetReserveLength returns the length to reserve for this slice.
func (*CrcSlice) SetPosition ¶
func (cs *CrcSlice) SetPosition(pos int)
SetPosition sets the current position within the encoder to be updated later.
type Decoder ¶
type Decoder interface {
    // Gets an int8 from this decoder. Returns EOF if end of stream is reached.
    GetInt8() (int8, error)
    // Gets an int16 from this decoder. Returns EOF if end of stream is reached.
    GetInt16() (int16, error)
    // Gets an int32 from this decoder. Returns EOF if end of stream is reached.
    GetInt32() (int32, error)
    // Gets an int64 from this decoder. Returns EOF if end of stream is reached.
    GetInt64() (int64, error)
    // Gets a []byte from this decoder. Returns EOF if end of stream is reached.
    GetBytes() ([]byte, error)
    // Gets a string from this decoder. Returns EOF if end of stream is reached.
    GetString() (string, error)
    // Tells how many bytes are left unread in this decoder.
    Remaining() int
}
Decoder is able to decode a Kafka wire protocol message into actual data.
type DecodingError ¶
type DecodingError struct {
// contains filtered or unexported fields
}
DecodingError is an error that also holds the information about why it happened.
func NewDecodingError ¶
func NewDecodingError(err error, reason string) *DecodingError
NewDecodingError creates a new DecodingError with a given error and reason.
func (*DecodingError) Error ¶
func (de *DecodingError) Error() error
Error returns the underlying error for this DecodingError.
func (*DecodingError) Reason ¶
func (de *DecodingError) Reason() string
Reason returns the reason for this DecodingError.
type DescribeGroupsRequest ¶
type DescribeGroupsRequest struct {
Groups []string
}
DescribeGroupsRequest is used to fetch consumer group state information from Kafka cluster.
func (*DescribeGroupsRequest) Key ¶
func (*DescribeGroupsRequest) Key() int16
Key returns the Kafka API key for DescribeGroupsRequest.
func (*DescribeGroupsRequest) Version ¶
func (*DescribeGroupsRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*DescribeGroupsRequest) Write ¶
func (dgr *DescribeGroupsRequest) Write(encoder Encoder)
type DescribeGroupsResponse ¶
type DescribeGroupsResponse struct {
Groups []*GroupDescription
}
DescribeGroupsResponse is used to hold consumer group state information from Kafka cluster.
func (*DescribeGroupsResponse) Read ¶
func (dgr *DescribeGroupsResponse) Read(decoder Decoder) *DecodingError
type Encoder ¶
type Encoder interface {
    // Writes an int8 to this encoder.
    WriteInt8(int8)
    // Writes an int16 to this encoder.
    WriteInt16(int16)
    // Writes an int32 to this encoder.
    WriteInt32(int32)
    // Writes an int64 to this encoder.
    WriteInt64(int64)
    // Writes a []byte to this encoder.
    WriteBytes([]byte)
    // Writes a string to this encoder.
    WriteString(string)
    // Returns the size in bytes written to this encoder.
    Size() int32
    // Reserves a place for an updatable slice.
    // This is used as an optimization for length and crc fields.
    // The encoder reserves a place for this data and updates it later instead of pre-calculating it and doing redundant work.
    Reserve(UpdatableSlice)
    // Tells the last reserved slice to be updated with new data.
    UpdateReserved()
}
Encoder is able to encode actual data into a Kafka wire protocol byte sequence.
type FetchRequest ¶
type FetchRequest struct {
    MaxWait     int32
    MinBytes    int32
    RequestInfo map[string][]*PartitionFetchInfo
}
FetchRequest is used to fetch a chunk of one or more logs for some topic-partitions.
func (*FetchRequest) AddFetch ¶
func (fr *FetchRequest) AddFetch(topic string, partition int32, offset int64, fetchSize int32)
AddFetch is a convenience method to add a PartitionFetchInfo.
func (*FetchRequest) Key ¶
func (fr *FetchRequest) Key() int16
Key returns the Kafka API key for FetchRequest.
func (*FetchRequest) Version ¶
func (fr *FetchRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*FetchRequest) Write ¶
func (fr *FetchRequest) Write(encoder Encoder)
Write writes the FetchRequest to the given Encoder.
type FetchResponse ¶
type FetchResponse struct {
Data map[string]map[int32]*FetchResponsePartitionData
}
FetchResponse contains FetchResponseData for all requested topics and partitions.
func (*FetchResponse) CollectMessages ¶
func (fr *FetchResponse) CollectMessages(collector func(topic string, partition int32, offset int64, key []byte, value []byte) error) error
CollectMessages traverses this FetchResponse and applies a collector function to each message, making it possible to avoid intermediate conversions (e.g. response -> kafka-client.Message -> other.Message) when they are not needed.
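A collector sketch that counts messages without building intermediate structs (that a non-nil return aborts collection is an assumption):

count := 0
err := response.CollectMessages(func(topic string, partition int32, offset int64, key []byte, value []byte) error {
    // Operate directly on the raw key/value bytes.
    count++
    return nil // a non-nil error is presumably propagated by CollectMessages
})
if err != nil {
    // handle the error
}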
func (*FetchResponse) Error ¶
func (fr *FetchResponse) Error(topic string, partition int32) error
Error returns the error for a given topic and partition of this FetchResponse.
func (*FetchResponse) GetMessages ¶
func (fr *FetchResponse) GetMessages() ([]*MessageAndMetadata, error)
GetMessages traverses this FetchResponse and collects all messages. Returns an error if FetchResponse contains one. Messages should be ordered by offset.
func (*FetchResponse) Read ¶
func (fr *FetchResponse) Read(decoder Decoder) *DecodingError
type FetchResponsePartitionData ¶
type FetchResponsePartitionData struct {
    Error               error
    HighwaterMarkOffset int64
    Messages            []*MessageAndOffset
}
FetchResponsePartitionData contains fetched messages for a single partition, the offset at the end of the log for this partition and an error code.
func (*FetchResponsePartitionData) Read ¶
func (frd *FetchResponsePartitionData) Read(decoder Decoder) *DecodingError
type GroupDescription ¶
type GroupDescription struct {
    Error        error
    GroupID      string
    State        string
    ProtocolType string
    Protocol     string
    Members      []*GroupMemberDescription
}
GroupDescription holds information about a single consumer group in a Kafka cluster.
func (*GroupDescription) Read ¶
func (gd *GroupDescription) Read(decoder Decoder) *DecodingError
type GroupMemberDescription ¶
type GroupMemberDescription struct {
    MemberID         string
    ClientID         string
    ClientHost       string
    MemberMetadata   []byte
    MemberAssignment []byte
}
GroupMemberDescription holds information about a single consumer group member in a Kafka cluster.
func (*GroupMemberDescription) Read ¶
func (gmd *GroupMemberDescription) Read(decoder Decoder) *DecodingError
type GroupProtocol ¶
GroupProtocol carries additional protocol information for a ProtocolType in JoinGroupRequest.
type HeartbeatRequest ¶
HeartbeatRequest is used to keep a member alive in a group.
func (*HeartbeatRequest) Key ¶
func (*HeartbeatRequest) Key() int16
Key returns the Kafka API key for HeartbeatRequest.
func (*HeartbeatRequest) Version ¶
func (*HeartbeatRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*HeartbeatRequest) Write ¶
func (hr *HeartbeatRequest) Write(encoder Encoder)
type HeartbeatResponse ¶
type HeartbeatResponse struct {
Error error
}
HeartbeatResponse signals whether the sent HeartbeatRequest succeeded or not, and tells why if not.
func (*HeartbeatResponse) Read ¶
func (hr *HeartbeatResponse) Read(decoder Decoder) *DecodingError
type JoinGroupRequest ¶
type JoinGroupRequest struct {
    GroupID        string
    SessionTimeout int32
    MemberID       string
    ProtocolType   string
    GroupProtocols []*GroupProtocol
}
JoinGroupRequest is used to become a member of a group, creating it if there are no active members.
func (*JoinGroupRequest) Key ¶
func (jgr *JoinGroupRequest) Key() int16
Key returns the Kafka API key for JoinGroupRequest.
func (*JoinGroupRequest) Version ¶
func (jgr *JoinGroupRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*JoinGroupRequest) Write ¶
func (jgr *JoinGroupRequest) Write(encoder Encoder)
type JoinGroupResponse ¶
type JoinGroupResponse struct {
    Error         error
    GenerationID  int32
    GroupProtocol string
    LeaderID      string
    MemberID      string
    Members       map[string][]byte
}
JoinGroupResponse contains the result of a JoinGroupRequest: an error if one occurred, the group generation ID, the selected group protocol, the leader ID, the member ID assigned to this client and the metadata for each group member.
func (*JoinGroupResponse) Read ¶
func (jgr *JoinGroupResponse) Read(decoder Decoder) *DecodingError
type KafkaClient ¶
type KafkaClient struct {
// contains filtered or unexported fields
}
KafkaClient is the default (and currently the only) Client implementation for the kafka-client library.
func New ¶
func New(config *Config) (*KafkaClient, error)
New creates a new KafkaClient with a given Config. May return an error if the passed config is invalid.
func (*KafkaClient) Close ¶
func (c *KafkaClient) Close() <-chan struct{}
Close tells the Client to close all existing connections and stop. This method is NOT blocking but returns a channel which will be closed once the closing is finished.
func (*KafkaClient) CommitOffset ¶
func (c *KafkaClient) CommitOffset(group string, topic string, partition int32, offset int64) error
CommitOffset commits the offset for a given group, topic and partition to Kafka. A part of the new offset management API.
func (*KafkaClient) Fetch ¶
func (c *KafkaClient) Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)
Fetch issues a single fetch request to a broker responsible for a given topic and partition and returns a FetchResponse that contains messages starting from a given offset.
func (*KafkaClient) GetAvailableOffset ¶
func (c *KafkaClient) GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)
GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
func (*KafkaClient) GetConsumerMetadata ¶
func (c *KafkaClient) GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)
GetConsumerMetadata returns a ConsumerMetadataResponse for a given consumer group. May return an error if it fails to get consumer metadata within ConsumerMetadataRetries retries.
func (*KafkaClient) GetLeader ¶
func (c *KafkaClient) GetLeader(topic string, partition int32) (*BrokerConnection, error)
GetLeader returns a leader broker for a given topic and partition. Returns an error if it fails to get the leader within MetadataRetries retries.
func (*KafkaClient) GetOffset ¶
func (c *KafkaClient) GetOffset(group string, topic string, partition int32) (int64, error)
GetOffset gets the offset for a given group, topic and partition from Kafka. A part of the new offset management API.
func (*KafkaClient) GetTopicMetadata ¶
func (c *KafkaClient) GetTopicMetadata(topics []string) (*MetadataResponse, error)
GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have. Passing it an empty topic list will retrieve metadata for all topics in a cluster.
func (*KafkaClient) Metadata ¶
func (c *KafkaClient) Metadata() *Metadata
Metadata returns Metadata structure used by this Client.
type LeaveGroupRequest ¶
LeaveGroupRequest is used to directly depart a group.
func (*LeaveGroupRequest) Key ¶
func (*LeaveGroupRequest) Key() int16
Key returns the Kafka API key for LeaveGroupRequest.
func (*LeaveGroupRequest) Version ¶
func (*LeaveGroupRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*LeaveGroupRequest) Write ¶
func (lgr *LeaveGroupRequest) Write(encoder Encoder)
type LeaveGroupResponse ¶
type LeaveGroupResponse struct {
Error error
}
LeaveGroupResponse contains whether the member successfully left a group and contains a failure reason if not.
func (*LeaveGroupResponse) Read ¶
func (lgr *LeaveGroupResponse) Read(decoder Decoder) *DecodingError
type LengthSlice ¶
type LengthSlice struct {
// contains filtered or unexported fields
}
LengthSlice is used to determine the length of the upcoming message.
func (*LengthSlice) GetPosition ¶
func (ls *LengthSlice) GetPosition() int
GetPosition gets the position within the encoder to be updated later.
func (*LengthSlice) GetReserveLength ¶
func (ls *LengthSlice) GetReserveLength() int
GetReserveLength returns the length to reserve for this slice.
func (*LengthSlice) SetPosition ¶
func (ls *LengthSlice) SetPosition(pos int)
SetPosition sets the current position within the encoder to be updated later.
func (*LengthSlice) Update ¶
func (ls *LengthSlice) Update(slice []byte)
Update this slice. At this point all necessary data should be written to encoder.
type ListGroupsRequest ¶
type ListGroupsRequest struct{}
ListGroupsRequest is used to list the current groups managed by a broker.
func (*ListGroupsRequest) Key ¶
func (*ListGroupsRequest) Key() int16
Key returns the Kafka API key for ListGroupsRequest.
func (*ListGroupsRequest) Version ¶
func (*ListGroupsRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*ListGroupsRequest) Write ¶
func (*ListGroupsRequest) Write(encoder Encoder)
type ListGroupsResponse ¶
ListGroupsResponse lists the current groups managed by a broker and contains an error if one occurred.
func (*ListGroupsResponse) Read ¶
func (lgr *ListGroupsResponse) Read(decoder Decoder) *DecodingError
type Message ¶
type Message struct {
    Crc        int32
    MagicByte  int8
    Attributes int8
    Key        []byte
    Value      []byte
    Nested     []*MessageAndOffset
}
Message contains a single message and its metadata or a nested message set if compression is used.
func (*Message) Read ¶
func (md *Message) Read(decoder Decoder) *DecodingError
type MessageAndMetadata ¶
type MessageAndMetadata struct {
    Topic     string
    Partition int32
    Offset    int64
    Key       []byte
    Value     []byte
}
MessageAndMetadata is a single message and its metadata.
type MessageAndOffset ¶
MessageAndOffset is a single message or a message set (if it is compressed) with its offset value.
func (*MessageAndOffset) Read ¶
func (mo *MessageAndOffset) Read(decoder Decoder) *DecodingError
func (*MessageAndOffset) Write ¶
func (mo *MessageAndOffset) Write(encoder Encoder)
type Metadata ¶
type Metadata struct {
    Brokers *Brokers
    // contains filtered or unexported fields
}
Metadata is a helper structure that provides access to topic/partition leaders and offset coordinators, caches them, refreshes and invalidates when necessary.
func NewMetadata ¶
func NewMetadata(client Client, brokers *Brokers, metadataTTL time.Duration) *Metadata
NewMetadata creates a new Metadata for a given Client and Brokers with the metadata cache TTL set to metadataTTL.
func (*Metadata) Invalidate ¶
func (m *Metadata) Invalidate(topic string)
Invalidate forcibly invalidates the metadata cache for a given topic so that the next time it is requested it is guaranteed to be refreshed.
func (*Metadata) Leader ¶
func (m *Metadata) Leader(topic string, partition int32) (int32, error)
Leader tries to get a leader for a topic and partition from the cache. Automatically refreshes if the leader information is missing or expired. May return an error if it fails to get a leader for whatever reason.
func (*Metadata) OffsetCoordinator ¶
func (m *Metadata) OffsetCoordinator(group string) (*BrokerConnection, error)
OffsetCoordinator returns a BrokerConnection for an offset coordinator for a given group ID. May return an error if fails to to get metadata for whatever reason.
func (*Metadata) PartitionsFor ¶
func (m *Metadata) PartitionsFor(topic string) ([]int32, error)
PartitionsFor returns a sorted slice of partitions for a given topic. Automatically refreshes metadata if it is missing or expired. May return an error if it fails to get metadata for whatever reason.
func (*Metadata) Refresh ¶
func (m *Metadata) Refresh(topics []string) error
Refresh forces a metadata refresh for given topics. If the argument is empty or nil, metadata for all topics known to the Kafka cluster will be requested. May return an error if it fails to get metadata for whatever reason.
func (*Metadata) TopicMetadata ¶
func (m *Metadata) TopicMetadata(topic string) (map[int32]int32, error)
TopicMetadata returns a map where keys are partitions of a topic and values are leader broker IDs. Automatically refreshes metadata if it is missing or expired. May return an error if it fails to get metadata for whatever reason.
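A sketch that ties the accessors above together, mapping each partition of a topic to its leader broker ID (TopicMetadata returns the same mapping in a single call; the loop only illustrates PartitionsFor and Leader):

func leadersFor(m *Metadata, topic string) (map[int32]int32, error) {
    partitions, err := m.PartitionsFor(topic)
    if err != nil {
        return nil, err
    }
    leaders := make(map[int32]int32, len(partitions))
    for _, partition := range partitions {
        leader, err := m.Leader(topic, partition)
        if err != nil {
            return nil, err
        }
        leaders[partition] = leader
    }
    return leaders, nil
}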
type MetadataResponse ¶
type MetadataResponse struct {
    Brokers        []*Broker
    TopicsMetadata []*TopicMetadata
}
MetadataResponse contains information about the brokers in the cluster and the topics that exist.
func (*MetadataResponse) Read ¶
func (tmr *MetadataResponse) Read(decoder Decoder) *DecodingError
type OffsetAndMetadata ¶
OffsetAndMetadata contains offset for a partition and optional metadata.
type OffsetCommitRequest ¶
type OffsetCommitRequest struct {
    GroupID     string
    RequestInfo map[string]map[int32]*OffsetAndMetadata
}
OffsetCommitRequest is used to commit offsets for a group/topic/partition.
func NewOffsetCommitRequest ¶
func NewOffsetCommitRequest(group string) *OffsetCommitRequest
NewOffsetCommitRequest creates a new OffsetCommitRequest for a given consumer group.
func (*OffsetCommitRequest) AddOffset ¶
func (ocr *OffsetCommitRequest) AddOffset(topic string, partition int32, offset int64, timestamp int64, metadata string)
AddOffset is a convenience method to add an offset for a topic partition.
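A sketch of building a commit request by hand (the group, topic, partition and offset are placeholders; treating the timestamp as milliseconds since the epoch is an assumption):

request := NewOffsetCommitRequest("my-group")
// Commit offset 42 for partition 0 of "my-topic" with empty metadata.
request.AddOffset("my-topic", 0, 42, time.Now().UnixNano()/int64(time.Millisecond), "")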
func (*OffsetCommitRequest) Key ¶
func (ocr *OffsetCommitRequest) Key() int16
Key returns the Kafka API key for OffsetCommitRequest.
func (*OffsetCommitRequest) Version ¶
func (ocr *OffsetCommitRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*OffsetCommitRequest) Write ¶
func (ocr *OffsetCommitRequest) Write(encoder Encoder)
type OffsetCommitResponse ¶
OffsetCommitResponse contains errors for partitions if they occur.
func (*OffsetCommitResponse) Read ¶
func (ocr *OffsetCommitResponse) Read(decoder Decoder) *DecodingError
type OffsetFetchRequest ¶
OffsetFetchRequest is used to fetch offsets for a consumer group and given topic partitions.
func NewOffsetFetchRequest ¶
func NewOffsetFetchRequest(group string) *OffsetFetchRequest
NewOffsetFetchRequest creates a new OffsetFetchRequest for a given consumer group.
func (*OffsetFetchRequest) AddOffset ¶
func (ofr *OffsetFetchRequest) AddOffset(topic string, partition int32)
AddOffset is a convenience method to add a topic partition to this OffsetFetchRequest.
func (*OffsetFetchRequest) Key ¶
func (ofr *OffsetFetchRequest) Key() int16
Key returns the Kafka API key for OffsetFetchRequest.
func (*OffsetFetchRequest) Version ¶
func (ofr *OffsetFetchRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*OffsetFetchRequest) Write ¶
func (ofr *OffsetFetchRequest) Write(encoder Encoder)
type OffsetFetchResponse ¶
type OffsetFetchResponse struct {
Offsets map[string]map[int32]*OffsetMetadataAndError
}
OffsetFetchResponse contains fetched offsets for each requested topic partition.
func (*OffsetFetchResponse) Read ¶
func (ofr *OffsetFetchResponse) Read(decoder Decoder) *DecodingError
type OffsetMetadataAndError ¶
OffsetMetadataAndError contains a fetched offset for a topic partition, optional metadata and an error if it occurred.
func (*OffsetMetadataAndError) Read ¶
func (ofr *OffsetMetadataAndError) Read(decoder Decoder) *DecodingError
type OffsetRequest ¶
type OffsetRequest struct {
RequestInfo map[string][]*PartitionOffsetRequestInfo
}
OffsetRequest describes the valid offset range available for a set of topic-partitions.
func (*OffsetRequest) AddPartitionOffsetRequestInfo ¶
func (or *OffsetRequest) AddPartitionOffsetRequestInfo(topic string, partition int32, time int64, maxNumOffsets int32)
AddPartitionOffsetRequestInfo is a convenience method to add a PartitionOffsetRequestInfo to this request.
func (*OffsetRequest) Key ¶
func (or *OffsetRequest) Key() int16
Key returns the Kafka API key for OffsetRequest.
func (*OffsetRequest) Version ¶
func (or *OffsetRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*OffsetRequest) Write ¶
func (or *OffsetRequest) Write(encoder Encoder)
type OffsetResponse ¶
type OffsetResponse struct {
PartitionErrorAndOffsets map[string]map[int32]*PartitionOffsetsResponse
}
OffsetResponse contains the starting offset of each segment for the requested partition as well as the "log end offset" i.e. the offset of the next message that would be appended to the given partition.
func (*OffsetResponse) Read ¶
func (or *OffsetResponse) Read(decoder Decoder) *DecodingError
type PartitionFetchInfo ¶
PartitionFetchInfo contains information about what partition to fetch, what offset to fetch from and the maximum bytes to include in the message set for this partition.
type PartitionMetadata ¶
type PartitionMetadata struct {
    Error       error
    PartitionID int32
    Leader      int32
    Replicas    []int32
    ISR         []int32
}
PartitionMetadata contains information about a topic partition - its id, leader, replicas, ISRs and error if it occurred.
func (*PartitionMetadata) Read ¶
func (pm *PartitionMetadata) Read(decoder Decoder) *DecodingError
type PartitionOffsetRequestInfo ¶
PartitionOffsetRequestInfo contains partition specific configurations to fetch offsets.
type PartitionOffsetsResponse ¶
PartitionOffsetsResponse contains offsets for a single partition and an error if one occurred.
func (*PartitionOffsetsResponse) Read ¶
func (po *PartitionOffsetsResponse) Read(decoder Decoder) *DecodingError
type ProduceRequest ¶
type ProduceRequest struct {
    RequiredAcks int16
    AckTimeoutMs int32
    Data         map[string]map[int32][]*MessageAndOffset
}
ProduceRequest is used to send message sets to the server.
func (*ProduceRequest) AddMessage ¶
func (pr *ProduceRequest) AddMessage(topic string, partition int32, message *Message)
AddMessage is a convenience method to add a single message to be produced to a topic partition.
func (*ProduceRequest) Key ¶
func (pr *ProduceRequest) Key() int16
Key returns the Kafka API key for ProduceRequest.
func (*ProduceRequest) Version ¶
func (pr *ProduceRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*ProduceRequest) Write ¶
func (pr *ProduceRequest) Write(encoder Encoder)
type ProduceResponse ¶
type ProduceResponse struct {
Status map[string]map[int32]*ProduceResponseStatus
}
ProduceResponse contains highest assigned offsets by topic partitions and errors if they occurred.
func (*ProduceResponse) Read ¶
func (pr *ProduceResponse) Read(decoder Decoder) *DecodingError
type ProduceResponseStatus ¶
ProduceResponseStatus contains a highest assigned offset from a ProduceRequest and an error if it occurred.
type Request ¶
type Request interface {
    // Writes the Request to the given Encoder.
    Write(Encoder)
    // Returns the Kafka API key for this Request.
    Key() int16
    // Returns the Kafka request version for backwards compatibility.
    Version() int16
}
Request is a generic interface for any request issued to Kafka. Must be able to identify and write itself.
type RequestHeader ¶
type RequestHeader struct {
// contains filtered or unexported fields
}
RequestHeader is used to decouple the message header/metadata writing from the actual message. It is able to accept a request and encode/write it according to the Kafka wire protocol format, adding the correlation ID and client ID to the request.
func NewRequestHeader ¶
func NewRequestHeader(correlationID int32, clientID string, request Request) *RequestHeader
NewRequestHeader creates a new RequestHeader holding the correlation id, client id and the actual request.
func (*RequestHeader) Size ¶
func (rw *RequestHeader) Size() int32
Size returns the size in bytes needed to write this request, including the length field. This value will be used when allocating memory for a byte array.
func (*RequestHeader) Write ¶
func (rw *RequestHeader) Write(encoder Encoder)
Write writes this RequestHeader into a given Encoder.
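A sketch of the intended allocate-then-write flow (correlationID and request are assumed to be in scope, e.g. from Brokers.NextCorrelationID and any Request implementation; the client ID is a placeholder):

header := NewRequestHeader(correlationID, "example-client", request)
// Size includes the length field, so one exact allocation is enough.
buf := make([]byte, header.Size())
header.Write(NewBinaryEncoder(buf))
// buf now holds the complete wire-format request.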
type Response ¶
type Response interface {
    // Read the Response from the given Decoder. May return a DecodingError if the response is invalid.
    Read(Decoder) *DecodingError
}
Response is a generic interface for any response received from Kafka. Must be able to read itself.
type SizingEncoder ¶
type SizingEncoder struct {
// contains filtered or unexported fields
}
SizingEncoder is used to determine the size for []byte that will hold the actual encoded data. This is used as an optimization as it is cheaper to run once and determine the size instead of growing the slice dynamically.
func NewSizingEncoder ¶
func NewSizingEncoder() *SizingEncoder
NewSizingEncoder creates a new SizingEncoder.
func (*SizingEncoder) Reserve ¶
func (se *SizingEncoder) Reserve(slice UpdatableSlice)
Reserve reserves a place for an updatable slice.
func (*SizingEncoder) Reset ¶
func (se *SizingEncoder) Reset()
Reset resets this encoder's value to 0 so it can be reused.
func (*SizingEncoder) Size ¶
func (se *SizingEncoder) Size() int32
Size returns the size in bytes written to this encoder.
func (*SizingEncoder) UpdateReserved ¶
func (se *SizingEncoder) UpdateReserved()
UpdateReserved tells the last reserved slice to be updated with new data.
func (*SizingEncoder) WriteBytes ¶
func (se *SizingEncoder) WriteBytes(value []byte)
WriteBytes writes a []byte to this encoder.
func (*SizingEncoder) WriteInt16 ¶
func (se *SizingEncoder) WriteInt16(int16)
WriteInt16 writes an int16 to this encoder.
func (*SizingEncoder) WriteInt32 ¶
func (se *SizingEncoder) WriteInt32(int32)
WriteInt32 writes an int32 to this encoder.
func (*SizingEncoder) WriteInt64 ¶
func (se *SizingEncoder) WriteInt64(int64)
WriteInt64 writes an int64 to this encoder.
func (*SizingEncoder) WriteInt8 ¶
func (se *SizingEncoder) WriteInt8(int8)
WriteInt8 writes an int8 to this encoder.
func (*SizingEncoder) WriteString ¶
func (se *SizingEncoder) WriteString(value string)
WriteString writes a string to this encoder.
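The two-pass pattern this encoder enables, sketched for any Request value (request is assumed to be in scope):

// First pass: compute the exact encoded size without allocating a buffer.
sizer := NewSizingEncoder()
request.Write(sizer)

// Second pass: allocate once and perform the real encoding.
buf := make([]byte, sizer.Size())
request.Write(NewBinaryEncoder(buf))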
type SyncGroupRequest ¶
type SyncGroupRequest struct {
    GroupID         string
    GenerationID    int32
    MemberID        string
    GroupAssignment map[string][]byte
}
SyncGroupRequest is used to synchronize state for all members of a group.
func (*SyncGroupRequest) Key ¶
func (*SyncGroupRequest) Key() int16
Key returns the Kafka API key for SyncGroupRequest.
func (*SyncGroupRequest) Version ¶
func (*SyncGroupRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*SyncGroupRequest) Write ¶
func (sgr *SyncGroupRequest) Write(encoder Encoder)
type SyncGroupResponse ¶
SyncGroupResponse contains information about partition distribution within a group.
func (*SyncGroupResponse) Read ¶
func (sgr *SyncGroupResponse) Read(decoder Decoder) *DecodingError
type TopicMetadata ¶
type TopicMetadata struct {
    Error              error
    Topic              string
    PartitionsMetadata []*PartitionMetadata
}
TopicMetadata contains information about a topic - its name, number of partitions, leaders, ISRs and errors if they occurred.
func (*TopicMetadata) Read ¶
func (tm *TopicMetadata) Read(decoder Decoder) *DecodingError
type TopicMetadataRequest ¶
type TopicMetadataRequest struct {
Topics []string
}
TopicMetadataRequest is used to get topics, their partitions, leader brokers for them and where these brokers are located.
func NewMetadataRequest ¶
func NewMetadataRequest(topics []string) *TopicMetadataRequest
NewMetadataRequest creates a new TopicMetadataRequest to fetch metadata for given topics. Passing it an empty slice will request metadata for all topics.
func (*TopicMetadataRequest) Key ¶
func (mr *TopicMetadataRequest) Key() int16
Key returns the Kafka API key for TopicMetadataRequest.
func (*TopicMetadataRequest) Version ¶
func (mr *TopicMetadataRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*TopicMetadataRequest) Write ¶
func (mr *TopicMetadataRequest) Write(encoder Encoder)
type UpdatableSlice ¶
type UpdatableSlice interface {
    // Returns the length to reserve for this slice.
    GetReserveLength() int
    // Set the current position within the encoder to be updated later.
    SetPosition(int)
    // Get the position within the encoder to be updated later.
    GetPosition() int
    // Update this slice. At this point all necessary data should be written to encoder.
    Update([]byte)
}
UpdatableSlice is an interface that is used when the encoder has to write the value based on bytes that are not yet written (e.g. calculate the CRC of the message).
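A sketch of the reserve/backfill flow using the LengthSlice implementation above (encoder is assumed to be an Encoder in scope; that UpdateReserved backfills the reserved bytes from everything written since Reserve is an assumption based on the descriptions above):

// Reserve 4 bytes for a length field whose value is not known yet.
encoder.Reserve(&LengthSlice{})
encoder.WriteString("some payload")
// Backfill the reserved length field now that the payload has been written.
encoder.UpdateReserved()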
Source Files ¶
- brokers.go
- client.go
- consumer_metadata.go
- decoder.go
- describe_groups.go
- encoder.go
- errors.go
- fetch.go
- heartbeat.go
- join_group.go
- leave_group.go
- list_groups.go
- messages.go
- metadata.go
- offset.go
- offset_commit.go
- offset_fetch.go
- produce.go
- request_response.go
- snappy.go
- sync_group.go
- topic_metadata.go
- util.go