Documentation ¶
Index ¶
- Constants
- Variables
- func Critical(tag interface{}, message interface{})
- func Criticalf(tag interface{}, message interface{}, params ...interface{})
- func Debug(tag interface{}, message interface{})
- func Debugf(tag interface{}, message interface{}, params ...interface{})
- func Error(tag interface{}, message interface{})
- func Errorf(tag interface{}, message interface{}, params ...interface{})
- func Info(tag interface{}, message interface{})
- func Infof(tag interface{}, message interface{}, params ...interface{})
- func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)
- func Trace(tag interface{}, message interface{})
- func Tracef(tag interface{}, message interface{}, params ...interface{})
- func Warn(tag interface{}, message interface{})
- func Warnf(tag interface{}, message interface{}, params ...interface{})
- type BinaryDecoder
- func (bd *BinaryDecoder) GetBytes() ([]byte, error)
- func (bd *BinaryDecoder) GetInt16() (int16, error)
- func (bd *BinaryDecoder) GetInt32() (int32, error)
- func (bd *BinaryDecoder) GetInt64() (int64, error)
- func (bd *BinaryDecoder) GetInt8() (int8, error)
- func (bd *BinaryDecoder) GetString() (string, error)
- func (bd *BinaryDecoder) Remaining() int
- type BinaryEncoder
- func (be *BinaryEncoder) Reserve(slice UpdatableSlice)
- func (be *BinaryEncoder) Size() int32
- func (be *BinaryEncoder) UpdateReserved()
- func (be *BinaryEncoder) WriteBytes(value []byte)
- func (be *BinaryEncoder) WriteInt16(value int16)
- func (be *BinaryEncoder) WriteInt32(value int32)
- func (be *BinaryEncoder) WriteInt64(value int64)
- func (be *BinaryEncoder) WriteInt8(value int8)
- func (be *BinaryEncoder) WriteString(value string)
- type Broker
- type BrokerConnection
- type Brokers
- type CompressionCodec
- type Connector
- type ConnectorConfig
- type ConsumerMetadataRequest
- type ConsumerMetadataResponse
- type CorrelationIDGenerator
- type CrcSlice
- type Decoder
- type DecodingError
- type DefaultConnector
- func (dc *DefaultConnector) Close() <-chan bool
- func (dc *DefaultConnector) CommitOffset(group string, topic string, partition int32, offset int64) error
- func (dc *DefaultConnector) Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)
- func (dc *DefaultConnector) GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)
- func (dc *DefaultConnector) GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)
- func (dc *DefaultConnector) GetLeader(topic string, partition int32) (*BrokerConnection, error)
- func (dc *DefaultConnector) GetOffset(group string, topic string, partition int32) (int64, error)
- func (dc *DefaultConnector) GetTopicMetadata(topics []string) (*MetadataResponse, error)
- func (dc *DefaultConnector) Metadata() *Metadata
- func (dc *DefaultConnector) String() string
- type DefaultLogger
- func (dl *DefaultLogger) Critical(message string, params ...interface{})
- func (dl *DefaultLogger) Debug(message string, params ...interface{})
- func (dl *DefaultLogger) Error(message string, params ...interface{})
- func (dl *DefaultLogger) Info(message string, params ...interface{})
- func (dl *DefaultLogger) Trace(message string, params ...interface{})
- func (dl *DefaultLogger) Warn(message string, params ...interface{})
- type DescribeGroupsRequest
- type DescribeGroupsResponse
- type Encoder
- type FetchRequest
- type FetchResponse
- type FetchResponsePartitionData
- type GroupDescription
- type GroupMemberDescription
- type GroupProtocol
- type HeartbeatRequest
- type HeartbeatResponse
- type Int32Slice
- type JoinGroupRequest
- type JoinGroupResponse
- type KafkaLogger
- type LeaveGroupRequest
- type LeaveGroupResponse
- type LengthSlice
- type ListGroupsRequest
- type ListGroupsResponse
- type LogLevel
- type Message
- type MessageAndMetadata
- type MessageAndOffset
- type Metadata
- func (m *Metadata) Invalidate(topic string)
- func (m *Metadata) Leader(topic string, partition int32) (int32, error)
- func (m *Metadata) OffsetCoordinator(group string) (*BrokerConnection, error)
- func (m *Metadata) PartitionsFor(topic string) ([]int32, error)
- func (m *Metadata) Refresh(topics []string) error
- func (m *Metadata) TopicMetadata(topic string) (map[int32]int32, error)
- type MetadataResponse
- type OffsetAndMetadata
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetMetadataAndError
- type OffsetRequest
- type OffsetResponse
- type PartitionFetchInfo
- type PartitionMetadata
- type PartitionOffsetRequestInfo
- type PartitionOffsetsResponse
- type ProduceRequest
- type ProduceResponse
- type ProduceResponseStatus
- type Request
- type RequestHeader
- type Response
- type SizingEncoder
- func (se *SizingEncoder) Reserve(slice UpdatableSlice)
- func (se *SizingEncoder) Size() int32
- func (se *SizingEncoder) UpdateReserved()
- func (se *SizingEncoder) WriteBytes(value []byte)
- func (se *SizingEncoder) WriteInt16(int16)
- func (se *SizingEncoder) WriteInt32(int32)
- func (se *SizingEncoder) WriteInt64(int64)
- func (se *SizingEncoder) WriteInt8(int8)
- func (se *SizingEncoder) WriteString(value string)
- type SyncGroupRequest
- type SyncGroupResponse
- type TopicMetadata
- type TopicMetadataRequest
- type UpdatableSlice
Constants ¶
const EarliestTime int64 = -2
EarliestTime is a value used to request for the earliest available offset.
const InvalidOffset int64 = -1
InvalidOffset is a constant that is used to denote an invalid or uninitialized offset.
const LatestTime int64 = -1
LatestTime is a value used to request for the latest offset (i.e. the offset of the next coming message).
Variables ¶
var BrokerErrors = map[int16]error{ -1: ErrUnknown, 0: ErrNoError, 1: ErrOffsetOutOfRange, 2: ErrInvalidMessage, 3: ErrUnknownTopicOrPartition, 4: ErrInvalidMessageSize, 5: ErrLeaderNotAvailable, 6: ErrNotLeaderForPartition, 7: ErrRequestTimedOut, 8: ErrBrokerNotAvailable, 9: ErrReplicaNotAvailable, 10: ErrMessageSizeTooLarge, 11: ErrStaleControllerEpochCode, 12: ErrOffsetMetadataTooLargeCode, 14: ErrOffsetsLoadInProgressCode, 15: ErrConsumerCoordinatorNotAvailableCode, 16: ErrNotCoordinatorForConsumerCode, 17: ErrInvalidTopicCode, 18: ErrRecordListTooLarge, 19: ErrNotEnoughReplicas, 20: ErrNotEnoughReplicasAfterAppend, 21: ErrInvalidRequiredAcks, 22: ErrIllegalGeneration, 23: ErrInconsistentGroupProtocol, 24: ErrInvalidGroupID, 25: ErrUnknownMemberID, 26: ErrInvalidSessionTimeout, 27: ErrRebalanceInProgress, 28: ErrInvalidCommitOffsetSize, 29: ErrTopicAuthorizationFailed, 30: ErrGroupAuthorizationFailed, 31: ErrClusterAuthorizationFailed, }
Mapping between Kafka error codes and actual error messages.
var ErrBrokerNotAvailable = errors.New("Broker is likely not alive.")
A mapping for Kafka error code 8.
var ErrClusterAuthorizationFailed = errors.New("The client is not authorized to use an inter-broker or administrative API.")
A mapping for Kafka error code 31
var ErrConsumerCoordinatorNotAvailableCode = errors.New("Offsets topic has not yet been created.")
A mapping for Kafka error code 15.
var ErrEOF = errors.New("End of file reached")
Signals that an end of file or stream has been reached unexpectedly.
var ErrFailedToGetLeader = errors.New("Failed to get leader.")
ErrFailedToGetLeader happens when TopicMetadataResponse does not contain leader metadata for requested topic and partition.
var ErrFailedToGetMetadata = errors.New("Failed to get topic metadata.")
ErrFailedToGetMetadata happens when TopicMetadataResponse does not contain metadata for requested topic.
var ErrGroupAuthorizationFailed = errors.New("The client is not authorized to access a particular groupId.")
A mapping for Kafka error code 30
var ErrIllegalGeneration = errors.New("The generation id provided in the request is not the current generation.")
A mapping for Kafka error code 22
var ErrInconsistentGroupProtocol = errors.New("Provided protocol type or set of protocols is not compatible with the current group.")
A mapping for Kafka error code 23
var ErrInvalidCommitOffsetSize = errors.New("Offset commit was rejected because of oversize metadata.")
A mapping for Kafka error code 28
var ErrInvalidGroupID = errors.New("The groupId is empty or null.")
A mapping for Kafka error code 24
var ErrInvalidMessage = errors.New("Message contents does not match its CRC")
A mapping for Kafka error code 2.
var ErrInvalidMessageSize = errors.New("The message has a negative size")
A mapping for Kafka error code 4.
var ErrInvalidRequiredAcks = errors.New("The requested requiredAcks is invalid.")
A mapping for Kafka error code 21
var ErrInvalidSessionTimeout = errors.New("The requested session timeout is outside of the allowed range on the broker.")
A mapping for Kafka error code 26
var ErrInvalidTopicCode = errors.New("Attempt to access an invalid topic.")
A mapping for Kafka error code 17
var ErrLeaderNotAvailable = errors.New("In the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.")
A mapping for Kafka error code 5.
var ErrMessageSizeTooLarge = errors.New("You've just attempted to produce a message of size larger than broker is allowed to accept.")
A mapping for Kafka error code 10.
var ErrNoDataToUncompress = errors.New("No data to uncompress")
Happens when a compressed message is empty.
var ErrNoError = errors.New("No error - it worked!")
A mapping for Kafka error code 0.
var ErrNotCoordinatorForConsumerCode = errors.New("There is no coordinator for this consumer.")
A mapping for Kafka error code 16.
var ErrNotEnoughReplicas = errors.New("The number of in-sync replicas is lower than the configured minimum.")
A mapping for Kafka error code 19
var ErrNotEnoughReplicasAfterAppend = errors.New("The message was written to the log, but with fewer in-sync replicas than required.")
A mapping for Kafka error code 20
var ErrNotLeaderForPartition = errors.New("You've just attempted to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.")
A mapping for Kafka error code 6.
var ErrOffsetMetadataTooLargeCode = errors.New("You've jsut specified a string larger than configured maximum for offset metadata.")
A mapping for Kafka error code 12.
var ErrOffsetOutOfRange = errors.New("The requested offset is outside the range of offsets maintained by the server for the given topic/partition.")
A mapping for Kafka error code 1.
var ErrOffsetsLoadInProgressCode = errors.New("Offset loading is in progress. (Usually happens after a leader change for that offsets topic partition).")
A mapping for Kafka error code 14.
var ErrRebalanceInProgress = errors.New("The coordinator has begun rebalancing the group. This indicates to the client that it should rejoin the group.")
A mapping for Kafka error code 27
var ErrRecordListTooLarge = errors.New("Message batch exceeds the maximum configured segment size.")
A mapping for Kafka error code 18
var ErrReplicaNotAvailable = errors.New("Replica is expected on a broker, but is not (this can be safely ignored).")
A mapping for Kafka error code 9.
var ErrRequestTimedOut = errors.New("Request exceeds the user-specified time limit in the request.")
A mapping for Kafka error code 7.
var ErrStaleControllerEpochCode = errors.New("Broker-to-broker communication fault.")
A mapping for Kafka error code 11.
var ErrTopicAuthorizationFailed = errors.New("The client is not authorized to access the requested topic.")
A mapping for Kafka error code 29
var ErrUnknown = errors.New("An unexpected server error")
A mapping for Kafka error code -1.
var ErrUnknownMemberID = errors.New("The memberId is not in the current generation.")
A mapping for Kafka error code 25
var ErrUnknownTopicOrPartition = errors.New("This request is for a topic or partition that does not exist on this broker.")
A mapping for Kafka error code 3.
Functions ¶
func Critical ¶
func Critical(tag interface{}, message interface{})
Critical writes a given message with a given tag to log with level Critical.
func Criticalf ¶
func Criticalf(tag interface{}, message interface{}, params ...interface{})
Criticalf formats a given message according to given params with a given tag to log with level Critical.
func Debug ¶
func Debug(tag interface{}, message interface{})
Debug writes a given message with a given tag to log with level Debug.
func Debugf ¶
func Debugf(tag interface{}, message interface{}, params ...interface{})
Debugf formats a given message according to given params with a given tag to log with level Debug.
func Error ¶
func Error(tag interface{}, message interface{})
Error writes a given message with a given tag to log with level Error.
func Errorf ¶
func Errorf(tag interface{}, message interface{}, params ...interface{})
Errorf formats a given message according to given params with a given tag to log with level Error.
func Info ¶
func Info(tag interface{}, message interface{})
Info writes a given message with a given tag to log with level Info.
func Infof ¶
func Infof(tag interface{}, message interface{}, params ...interface{})
Infof formats a given message according to given params with a given tag to log with level Info.
func ReadMessageSet ¶
func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)
ReadMessageSet decodes a nested message set if the MessageAndOffset is compressed.
func Trace ¶
func Trace(tag interface{}, message interface{})
Trace writes a given message with a given tag to log with level Trace.
func Tracef ¶
func Tracef(tag interface{}, message interface{}, params ...interface{})
Tracef formats a given message according to given params with a given tag to log with level Trace.
Types ¶
type BinaryDecoder ¶
type BinaryDecoder struct {
// contains filtered or unexported fields
}
BinaryDecoder implements Decoder and is able to decode a Kafka wire protocol message into actual data.
func NewBinaryDecoder ¶
func NewBinaryDecoder(raw []byte) *BinaryDecoder
NewBinaryDecoder creates a new BinaryDecoder that will decode a given []byte.
func (*BinaryDecoder) GetBytes ¶
func (bd *BinaryDecoder) GetBytes() ([]byte, error)
GetBytes gets a []byte from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt16 ¶
func (bd *BinaryDecoder) GetInt16() (int16, error)
GetInt16 gets an int16 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt32 ¶
func (bd *BinaryDecoder) GetInt32() (int32, error)
GetInt32 gets an int32 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt64 ¶
func (bd *BinaryDecoder) GetInt64() (int64, error)
GetInt64 gets an int64 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetInt8 ¶
func (bd *BinaryDecoder) GetInt8() (int8, error)
GetInt8 gets an int8 from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) GetString ¶
func (bd *BinaryDecoder) GetString() (string, error)
GetString gets a string from this decoder. Returns EOF if end of stream is reached.
func (*BinaryDecoder) Remaining ¶
func (bd *BinaryDecoder) Remaining() int
Remaining tells how many bytes left unread in this decoder.
type BinaryEncoder ¶
type BinaryEncoder struct {
// contains filtered or unexported fields
}
BinaryEncoder implements Decoder and is able to encode actual data into a Kafka wire protocol byte sequence.
func NewBinaryEncoder ¶
func NewBinaryEncoder(buffer []byte) *BinaryEncoder
NewBinaryEncoder creates a new BinaryEncoder that will write into a given []byte.
func (*BinaryEncoder) Reserve ¶
func (be *BinaryEncoder) Reserve(slice UpdatableSlice)
Reserve reserves a place for an updatable slice.
func (*BinaryEncoder) Size ¶
func (be *BinaryEncoder) Size() int32
Size returns the size in bytes written to this encoder.
func (*BinaryEncoder) UpdateReserved ¶
func (be *BinaryEncoder) UpdateReserved()
UpdateReserved tells the last reserved slice to be updated with new data.
func (*BinaryEncoder) WriteBytes ¶
func (be *BinaryEncoder) WriteBytes(value []byte)
WriteBytes writes a []byte to this encoder.
func (*BinaryEncoder) WriteInt16 ¶
func (be *BinaryEncoder) WriteInt16(value int16)
WriteInt16 writes an int16 to this encoder.
func (*BinaryEncoder) WriteInt32 ¶
func (be *BinaryEncoder) WriteInt32(value int32)
WriteInt32 writes an int32 to this encoder.
func (*BinaryEncoder) WriteInt64 ¶
func (be *BinaryEncoder) WriteInt64(value int64)
WriteInt64 writes an int64 to this encoder.
func (*BinaryEncoder) WriteInt8 ¶
func (be *BinaryEncoder) WriteInt8(value int8)
WriteInt8 writes an int8 to this encoder.
func (*BinaryEncoder) WriteString ¶
func (be *BinaryEncoder) WriteString(value string)
WriteString writes a string to this encoder.
type Broker ¶
Broker contains information about a Kafka broker in cluster - its ID, host name and port.
func (*Broker) Read ¶
func (n *Broker) Read(decoder Decoder) *DecodingError
type BrokerConnection ¶
type BrokerConnection struct {
// contains filtered or unexported fields
}
func NewBrokerConnection ¶
func NewBrokerConnection(broker *Broker, keepAliveTimeout time.Duration) *BrokerConnection
func (*BrokerConnection) GetConnection ¶
func (bc *BrokerConnection) GetConnection() (*net.TCPConn, error)
func (*BrokerConnection) ReleaseConnection ¶
func (bc *BrokerConnection) ReleaseConnection(conn *net.TCPConn)
type Brokers ¶
type Brokers struct {
// contains filtered or unexported fields
}
func NewBrokers ¶
func (*Brokers) Get ¶
func (b *Brokers) Get(id int32) *BrokerConnection
func (*Brokers) GetAll ¶
func (b *Brokers) GetAll() []*BrokerConnection
func (*Brokers) NextCorrelationID ¶
type CompressionCodec ¶
type CompressionCodec int
CompressionCodec is a compression codec id used to distinguish various compression types.
const ( // CompressionNone is a compression codec id for uncompressed data. CompressionNone CompressionCodec = 0 // CompressionGZIP is a compression codec id for GZIP compression. CompressionGZIP CompressionCodec = 1 // CompressionSnappy is a compression codec id for Snappy compression. CompressionSnappy CompressionCodec = 2 // CompressionLZ4 is a compression codec id for LZ4 compression. CompressionLZ4 CompressionCodec = 3 )
type Connector ¶
type Connector interface { // GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have. // Passing it an empty topic list will retrieve metadata for all topics in a cluster. GetTopicMetadata(topics []string) (*MetadataResponse, error) // GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time. // More on offset time here - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error) // Fetch issues a single fetch request to a broker responsible for a given topic and partition and returns a FetchResponse that contains messages starting from a given offset. Fetch(topic string, partition int32, offset int64) (*FetchResponse, error) // GetOffset gets the offset for a given group, topic and partition from Kafka. A part of new offset management API. GetOffset(group string, topic string, partition int32) (int64, error) // CommitOffset commits the offset for a given group, topic and partition to Kafka. A part of new offset management API. CommitOffset(group string, topic string, partition int32, offset int64) error GetLeader(topic string, partition int32) (*BrokerConnection, error) GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error) // Metadata returns a structure that holds all topic and broker metadata. Metadata() *Metadata // Tells the Connector to close all existing connections and stop. // This method is NOT blocking but returns a channel which will get a single value once the closing is finished. Close() <-chan bool }
Connector is an interface that should provide ways to clearly interact with Kafka cluster and hide all broker management stuff from user.
type ConnectorConfig ¶
type ConnectorConfig struct { // BrokerList is a bootstrap list to discover other brokers in a cluster. At least one broker is required. BrokerList []string // ReadTimeout is a timeout to read the response from a TCP socket. ReadTimeout time.Duration // WriteTimeout is a timeout to write the request to a TCP socket. WriteTimeout time.Duration // ConnectTimeout is a timeout to connect to a TCP socket. ConnectTimeout time.Duration // Sets whether the connection should be kept alive. KeepAlive bool // A keep alive period for a TCP connection. KeepAliveTimeout time.Duration // Maximum number of open connections for a connector. MaxConnections int // Maximum number of open connections for a single broker for a connector. MaxConnectionsPerBroker int // Maximum fetch size in bytes which will be used in all Consume() calls. FetchSize int32 // The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block FetchMinBytes int32 // The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes FetchMaxWaitTime int32 // Number of retries to get topic metadata. MetadataRetries int // Backoff value between topic metadata requests. MetadataBackoff time.Duration // MetadataTTL is how long topic metadata is considered valid. Used to refresh metadata from time to time even if no leader changes occurred. MetadataTTL time.Duration // Number of retries to commit an offset. CommitOffsetRetries int // Backoff value between commit offset requests. CommitOffsetBackoff time.Duration // Number of retries to get consumer metadata. ConsumerMetadataRetries int // Backoff value between consumer metadata requests. ConsumerMetadataBackoff time.Duration // ClientID that will be used by a connector to identify client requests by broker. ClientID string }
ConnectorConfig is used to pass multiple configuration values for a Connector
func NewConnectorConfig ¶
func NewConnectorConfig() *ConnectorConfig
NewConnectorConfig returns a new ConnectorConfig with sane defaults.
func (*ConnectorConfig) Validate ¶
func (cc *ConnectorConfig) Validate() error
Validate validates this ConnectorConfig. Returns a corresponding error if the ConnectorConfig is invalid and nil otherwise.
type ConsumerMetadataRequest ¶
type ConsumerMetadataRequest struct {
Group string
}
ConsumerMetadataRequest is used to discover the current offset coordinator to issue its offset commit and fetch requests.
func NewConsumerMetadataRequest ¶
func NewConsumerMetadataRequest(group string) *ConsumerMetadataRequest
NewConsumerMetadataRequest creates a new ConsumerMetadataRequest for a given consumer group.
func (*ConsumerMetadataRequest) Key ¶
func (cmr *ConsumerMetadataRequest) Key() int16
Key returns the Kafka API key for ConsumerMetadataRequest.
func (*ConsumerMetadataRequest) Version ¶
func (cmr *ConsumerMetadataRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*ConsumerMetadataRequest) Write ¶
func (cmr *ConsumerMetadataRequest) Write(encoder Encoder)
Write writes the ConsumerMetadataRequest to the given Encoder.
type ConsumerMetadataResponse ¶
ConsumerMetadataResponse contains information about the current offset coordinator and error if it occurred.
func (*ConsumerMetadataResponse) Read ¶
func (cmr *ConsumerMetadataResponse) Read(decoder Decoder) *DecodingError
type CorrelationIDGenerator ¶
type CorrelationIDGenerator struct {
// contains filtered or unexported fields
}
func (*CorrelationIDGenerator) NextCorrelationID ¶
func (c *CorrelationIDGenerator) NextCorrelationID() int32
type CrcSlice ¶
type CrcSlice struct {
// contains filtered or unexported fields
}
CrcSlice is used to calculate the CRC32 value of the message.
func (*CrcSlice) GetPosition ¶
GetPosition gets the position within the encoder to be updated later.
func (*CrcSlice) GetReserveLength ¶
GetReserveLength returns the length to reserve for this slice.
func (*CrcSlice) SetPosition ¶
SetPosition sets the current position within the encoder to be updated later.
type Decoder ¶
type Decoder interface { // Gets an int8 from this decoder. Returns EOF if end of stream is reached. GetInt8() (int8, error) // Gets an int16 from this decoder. Returns EOF if end of stream is reached. GetInt16() (int16, error) // Gets an int32 from this decoder. Returns EOF if end of stream is reached. GetInt32() (int32, error) // Gets an int64 from this decoder. Returns EOF if end of stream is reached. GetInt64() (int64, error) // Gets a []byte from this decoder. Returns EOF if end of stream is reached. GetBytes() ([]byte, error) // Gets a string from this decoder. Returns EOF if end of stream is reached. GetString() (string, error) // Tells how many bytes left unread in this decoder. Remaining() int }
Decoder is able to decode a Kafka wire protocol message into actual data.
type DecodingError ¶
type DecodingError struct {
// contains filtered or unexported fields
}
DecodingError is an error that also holds the information about why it happened.
func NewDecodingError ¶
func NewDecodingError(err error, reason string) *DecodingError
NewDecodingError creates a new DecodingError with a given error message and reason.
func (*DecodingError) Error ¶
func (de *DecodingError) Error() error
Error returns the error message for this DecodingError.
func (*DecodingError) Reason ¶
func (de *DecodingError) Reason() string
Reason returns the reason for this DecodingError.
type DefaultConnector ¶
type DefaultConnector struct {
// contains filtered or unexported fields
}
DefaultConnector is a default (and only one for now) Connector implementation for Siesta library.
func NewDefaultConnector ¶
func NewDefaultConnector(config *ConnectorConfig) (*DefaultConnector, error)
NewDefaultConnector creates a new DefaultConnector with a given ConnectorConfig. May return an error if the passed config is invalid.
func (*DefaultConnector) Close ¶
func (dc *DefaultConnector) Close() <-chan bool
Close tells the Connector to close all existing connections and stop. This method is NOT blocking but returns a channel which will get a single value once the closing is finished.
func (*DefaultConnector) CommitOffset ¶
func (dc *DefaultConnector) CommitOffset(group string, topic string, partition int32, offset int64) error
CommitOffset commits the offset for a given group, topic and partition to Kafka. A part of new offset management API.
func (*DefaultConnector) Fetch ¶
func (dc *DefaultConnector) Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)
Fetch issues a single fetch request to a broker responsible for a given topic and partition and returns a FetchResponse that contains messages starting from a given offset.
func (*DefaultConnector) GetAvailableOffset ¶
func (dc *DefaultConnector) GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)
GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
func (*DefaultConnector) GetConsumerMetadata ¶
func (dc *DefaultConnector) GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)
func (*DefaultConnector) GetLeader ¶
func (dc *DefaultConnector) GetLeader(topic string, partition int32) (*BrokerConnection, error)
func (*DefaultConnector) GetOffset ¶
GetOffset gets the offset for a given group, topic and partition from Kafka. A part of new offset management API.
func (*DefaultConnector) GetTopicMetadata ¶
func (dc *DefaultConnector) GetTopicMetadata(topics []string) (*MetadataResponse, error)
GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have. Passing it an empty topic list will retrieve metadata for all topics in a cluster.
func (*DefaultConnector) Metadata ¶
func (dc *DefaultConnector) Metadata() *Metadata
func (*DefaultConnector) String ¶
func (dc *DefaultConnector) String() string
Returns a string representation of this DefaultConnector.
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
DefaultLogger is a default implementation of KafkaLogger interface used in this client.
func NewDefaultLogger ¶
func NewDefaultLogger(Level LogLevel) *DefaultLogger
NewDefaultLogger creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.
func (*DefaultLogger) Critical ¶
func (dl *DefaultLogger) Critical(message string, params ...interface{})
Critical formats a given message according to given params to log with level Critical.
func (*DefaultLogger) Debug ¶
func (dl *DefaultLogger) Debug(message string, params ...interface{})
Debug formats a given message according to given params to log with level Debug.
func (*DefaultLogger) Error ¶
func (dl *DefaultLogger) Error(message string, params ...interface{})
Error formats a given message according to given params to log with level Error.
func (*DefaultLogger) Info ¶
func (dl *DefaultLogger) Info(message string, params ...interface{})
Info formats a given message according to given params to log with level Info.
func (*DefaultLogger) Trace ¶
func (dl *DefaultLogger) Trace(message string, params ...interface{})
Trace formats a given message according to given params to log with level Trace.
func (*DefaultLogger) Warn ¶
func (dl *DefaultLogger) Warn(message string, params ...interface{})
Warn formats a given message according to given params to log with level Warn.
type DescribeGroupsRequest ¶
type DescribeGroupsRequest struct {
Groups []string
}
func (*DescribeGroupsRequest) Key ¶
func (*DescribeGroupsRequest) Key() int16
Key returns the Kafka API key for DescribeGroupsRequest.
func (*DescribeGroupsRequest) Version ¶
func (*DescribeGroupsRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*DescribeGroupsRequest) Write ¶
func (dgr *DescribeGroupsRequest) Write(encoder Encoder)
type DescribeGroupsResponse ¶
type DescribeGroupsResponse struct {
Groups []*GroupDescription
}
func (*DescribeGroupsResponse) Read ¶
func (dgr *DescribeGroupsResponse) Read(decoder Decoder) *DecodingError
type Encoder ¶
type Encoder interface { // Writes an int8 to this encoder. WriteInt8(int8) // Writes an int16 to this encoder. WriteInt16(int16) // Writes an int32 to this encoder. WriteInt32(int32) // Writes an int64 to this encoder. WriteInt64(int64) // Writes a []byte to this encoder. WriteBytes([]byte) // Writes a string to this encoder. WriteString(string) // Returns the size in bytes written to this encoder. Size() int32 // Reserves a place for an updatable slice. // This is used as an optimization for length and crc fields. // The encoder reserves a place for this data and updates it later instead of pre-calculating it and doing redundant work. Reserve(UpdatableSlice) // Tells the last reserved slice to be updated with new data. UpdateReserved() }
Encoder is able to encode actual data into a Kafka wire protocol byte sequence.
type FetchRequest ¶
type FetchRequest struct { MaxWait int32 MinBytes int32 RequestInfo map[string][]*PartitionFetchInfo }
FetchRequest is used to fetch a chunk of one or more logs for some topic-partitions.
func (*FetchRequest) AddFetch ¶
func (fr *FetchRequest) AddFetch(topic string, partition int32, offset int64, fetchSize int32)
AddFetch is a convenience method to add a PartitionFetchInfo.
func (*FetchRequest) Key ¶
func (fr *FetchRequest) Key() int16
Key returns the Kafka API key for FetchRequest.
func (*FetchRequest) Version ¶
func (fr *FetchRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*FetchRequest) Write ¶
func (fr *FetchRequest) Write(encoder Encoder)
Write writes the FetchRequest to the given Encoder.
type FetchResponse ¶
type FetchResponse struct {
Data map[string]map[int32]*FetchResponsePartitionData
}
FetchResponse contains FetchResponseData for all requested topics and partitions.
func (*FetchResponse) CollectMessages ¶
func (fr *FetchResponse) CollectMessages(collector func(topic string, partition int32, offset int64, key []byte, value []byte) error) error
CollectMessages traverses this FetchResponse and applies a collector function to each message giving the possibility to avoid response -> siesta.Message -> other.Message conversion if necessary.
func (*FetchResponse) Error ¶
func (fr *FetchResponse) Error(topic string, partition int32) error
Error returns the error message for a given topic and pertion of this FetchResponse
func (*FetchResponse) GetMessages ¶
func (fr *FetchResponse) GetMessages() ([]*MessageAndMetadata, error)
GetMessages traverses this FetchResponse and collects all messages. Returns an error if FetchResponse contains one. Messages should be ordered by offset.
func (*FetchResponse) Read ¶
func (fr *FetchResponse) Read(decoder Decoder) *DecodingError
type FetchResponsePartitionData ¶
type FetchResponsePartitionData struct { Error error HighwaterMarkOffset int64 Messages []*MessageAndOffset }
FetchResponsePartitionData contains fetched messages for a single partition, the offset at the end of the log for this partition and an error code.
func (*FetchResponsePartitionData) Read ¶
func (frd *FetchResponsePartitionData) Read(decoder Decoder) *DecodingError
type GroupDescription ¶
type GroupDescription struct { Error error GroupID string State string ProtocolType string Protocol string Members []*GroupMemberDescription }
func (*GroupDescription) Read ¶
func (gd *GroupDescription) Read(decoder Decoder) *DecodingError
type GroupMemberDescription ¶
type GroupMemberDescription struct { MemberID string ClientID string ClientHost string MemberMetadata []byte MemberAssignment []byte }
func (*GroupMemberDescription) Read ¶
func (gmd *GroupMemberDescription) Read(decoder Decoder) *DecodingError
type GroupProtocol ¶
type HeartbeatRequest ¶
func (*HeartbeatRequest) Key ¶
func (*HeartbeatRequest) Key() int16
Key returns the Kafka API key for HeartbeatRequest.
func (*HeartbeatRequest) Version ¶
func (*HeartbeatRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*HeartbeatRequest) Write ¶
func (hr *HeartbeatRequest) Write(encoder Encoder)
type HeartbeatResponse ¶
type HeartbeatResponse struct {
Error error
}
func (*HeartbeatResponse) Read ¶
func (hr *HeartbeatResponse) Read(decoder Decoder) *DecodingError
type Int32Slice ¶
type Int32Slice []int32
func (Int32Slice) Len ¶
func (p Int32Slice) Len() int
func (Int32Slice) Less ¶
func (p Int32Slice) Less(i, j int) bool
func (Int32Slice) Swap ¶
func (p Int32Slice) Swap(i, j int)
type JoinGroupRequest ¶
type JoinGroupRequest struct { GroupID string SessionTimeout int32 MemberID string ProtocolType string GroupProtocols []*GroupProtocol }
func (*JoinGroupRequest) Key ¶
func (jgr *JoinGroupRequest) Key() int16
Key returns the Kafka API key for JoinGroupRequest.
func (*JoinGroupRequest) Version ¶
func (jgr *JoinGroupRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*JoinGroupRequest) Write ¶
func (jgr *JoinGroupRequest) Write(encoder Encoder)
type JoinGroupResponse ¶
type JoinGroupResponse struct { Error error GenerationID int32 GroupProtocol string LeaderID string MemberID string Members map[string][]byte }
func (*JoinGroupResponse) Read ¶
func (jgr *JoinGroupResponse) Read(decoder Decoder) *DecodingError
type KafkaLogger ¶
type KafkaLogger interface { //Formats a given message according to given params to log with level Trace. Trace(message string, params ...interface{}) //Formats a given message according to given params to log with level Debug. Debug(message string, params ...interface{}) //Formats a given message according to given params to log with level Info. Info(message string, params ...interface{}) //Formats a given message according to given params to log with level Warn. Warn(message string, params ...interface{}) //Formats a given message according to given params to log with level Error. Error(message string, params ...interface{}) //Formats a given message according to given params to log with level Critical. Critical(message string, params ...interface{}) }
KafkaLogger is a logger interface. Lets you plug-in your custom logging library instead of using built-in one.
var Logger KafkaLogger = NewDefaultLogger(InfoLevel)
Logger used by this client. Defaults to build-in logger with Info log level.
type LeaveGroupRequest ¶
func (*LeaveGroupRequest) Key ¶
func (*LeaveGroupRequest) Key() int16
Key returns the Kafka API key for LeaveGroupRequest.
func (*LeaveGroupRequest) Version ¶
func (*LeaveGroupRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*LeaveGroupRequest) Write ¶
func (lgr *LeaveGroupRequest) Write(encoder Encoder)
type LeaveGroupResponse ¶
type LeaveGroupResponse struct {
Error error
}
func (*LeaveGroupResponse) Read ¶
func (lgr *LeaveGroupResponse) Read(decoder Decoder) *DecodingError
type LengthSlice ¶
type LengthSlice struct {
// contains filtered or unexported fields
}
LengthSlice is used to determine the length of upcoming message.
func (*LengthSlice) GetPosition ¶
func (ls *LengthSlice) GetPosition() int
GetPosition gets the position within the encoder to be updated later.
func (*LengthSlice) GetReserveLength ¶
func (ls *LengthSlice) GetReserveLength() int
GetReserveLength returns the length to reserve for this slice.
func (*LengthSlice) SetPosition ¶
func (ls *LengthSlice) SetPosition(pos int)
SetPosition sets the current position within the encoder to be updated later.
func (*LengthSlice) Update ¶
func (ls *LengthSlice) Update(slice []byte)
Update this slice. At this point all necessary data should be written to encoder.
type ListGroupsRequest ¶
type ListGroupsRequest struct{}
func (*ListGroupsRequest) Key ¶
func (*ListGroupsRequest) Key() int16
Key returns the Kafka API key for ListGroupsRequest.
func (*ListGroupsRequest) Version ¶
func (*ListGroupsRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*ListGroupsRequest) Write ¶
func (*ListGroupsRequest) Write(encoder Encoder)
type ListGroupsResponse ¶
func (*ListGroupsResponse) Read ¶
func (lgr *ListGroupsResponse) Read(decoder Decoder) *DecodingError
type LogLevel ¶
type LogLevel string
LogLevel represents a logging level.
const ( // TraceLevel is used for debugging to find problems in functions, variables etc. TraceLevel LogLevel = "trace" // DebugLevel is used for detailed system reports and diagnostic messages. DebugLevel LogLevel = "debug" // InfoLevel is used for general information about a running application. InfoLevel LogLevel = "info" // WarnLevel is used to indicate small errors and failures that should not happen normally but are recovered automatically. WarnLevel LogLevel = "warn" // ErrorLevel is used to indicate severe errors that affect application workflow and are not handled automatically. ErrorLevel LogLevel = "error" // CriticalLevel is used to indicate fatal errors that may cause data corruption or loss. CriticalLevel LogLevel = "critical" )
type Message ¶
type Message struct { Crc int32 MagicByte int8 Attributes int8 Key []byte Value []byte Nested []*MessageAndOffset }
Message contains a single message and its metadata or a nested message set if compression is used.
func (*Message) Read ¶
func (md *Message) Read(decoder Decoder) *DecodingError
type MessageAndMetadata ¶
type MessageAndMetadata struct { Topic string Partition int32 Offset int64 Key []byte Value []byte }
MessageAndMetadata is a single message and its metadata.
type MessageAndOffset ¶
MessageAndOffset is a single message or a message set (if it is compressed) with its offset value.
func (*MessageAndOffset) Read ¶
func (mo *MessageAndOffset) Read(decoder Decoder) *DecodingError
func (*MessageAndOffset) Write ¶
func (mo *MessageAndOffset) Write(encoder Encoder)
type Metadata ¶
type Metadata struct { Brokers *Brokers // contains filtered or unexported fields }
func NewMetadata ¶
func (*Metadata) Invalidate ¶
func (*Metadata) OffsetCoordinator ¶
func (m *Metadata) OffsetCoordinator(group string) (*BrokerConnection, error)
type MetadataResponse ¶
type MetadataResponse struct { Brokers []*Broker TopicsMetadata []*TopicMetadata }
MetadataResponse contains information about brokers in cluster and topics that exist.
func (*MetadataResponse) Read ¶
func (tmr *MetadataResponse) Read(decoder Decoder) *DecodingError
type OffsetAndMetadata ¶
OffsetAndMetadata contains offset for a partition and optional metadata.
type OffsetCommitRequest ¶
type OffsetCommitRequest struct { GroupID string RequestInfo map[string]map[int32]*OffsetAndMetadata }
OffsetCommitRequest is used to commit offsets for a group/topic/partition.
func NewOffsetCommitRequest ¶
func NewOffsetCommitRequest(group string) *OffsetCommitRequest
NewOffsetCommitRequest creates a new OffsetCommitRequest for a given consumer group.
func (*OffsetCommitRequest) AddOffset ¶
func (ocr *OffsetCommitRequest) AddOffset(topic string, partition int32, offset int64, timestamp int64, metadata string)
AddOffset is a convenience method to add an offset for a topic partition.
func (*OffsetCommitRequest) Key ¶
func (ocr *OffsetCommitRequest) Key() int16
Key returns the Kafka API key for OffsetCommitRequest.
func (*OffsetCommitRequest) Version ¶
func (ocr *OffsetCommitRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*OffsetCommitRequest) Write ¶
func (ocr *OffsetCommitRequest) Write(encoder Encoder)
type OffsetCommitResponse ¶
OffsetCommitResponse contains errors for partitions if they occur.
func (*OffsetCommitResponse) Read ¶
func (ocr *OffsetCommitResponse) Read(decoder Decoder) *DecodingError
type OffsetFetchRequest ¶
OffsetFetchRequest is used to fetch offsets for a consumer group and given topic partitions.
func NewOffsetFetchRequest ¶
func NewOffsetFetchRequest(group string) *OffsetFetchRequest
NewOffsetFetchRequest creates a new OffsetFetchRequest for a given consumer group.
func (*OffsetFetchRequest) AddOffset ¶
func (ofr *OffsetFetchRequest) AddOffset(topic string, partition int32)
AddOffset is a convenience method to add a topic partition to this OffsetFetchRequest.
func (*OffsetFetchRequest) Key ¶
func (ofr *OffsetFetchRequest) Key() int16
Key returns the Kafka API key for OffsetFetchRequest.
func (*OffsetFetchRequest) Version ¶
func (ofr *OffsetFetchRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*OffsetFetchRequest) Write ¶
func (ofr *OffsetFetchRequest) Write(encoder Encoder)
type OffsetFetchResponse ¶
type OffsetFetchResponse struct {
Offsets map[string]map[int32]*OffsetMetadataAndError
}
OffsetFetchResponse contains fetched offsets for each requested topic partition.
func (*OffsetFetchResponse) Read ¶
func (ofr *OffsetFetchResponse) Read(decoder Decoder) *DecodingError
type OffsetMetadataAndError ¶
OffsetMetadataAndError contains a fetched offset for a topic partition, optional metadata and an error if it occurred.
func (*OffsetMetadataAndError) Read ¶
func (ofr *OffsetMetadataAndError) Read(decoder Decoder) *DecodingError
type OffsetRequest ¶
type OffsetRequest struct {
RequestInfo map[string][]*PartitionOffsetRequestInfo
}
OffsetRequest describes the valid offset range available for a set of topic-partitions.
func (*OffsetRequest) AddPartitionOffsetRequestInfo ¶
func (or *OffsetRequest) AddPartitionOffsetRequestInfo(topic string, partition int32, time int64, maxNumOffsets int32)
AddPartitionOffsetRequestInfo is a convenience method to add a PartitionOffsetRequestInfo to this request.
func (*OffsetRequest) Key ¶
func (or *OffsetRequest) Key() int16
Key returns the Kafka API key for OffsetRequest.
func (*OffsetRequest) Version ¶
func (or *OffsetRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*OffsetRequest) Write ¶
func (or *OffsetRequest) Write(encoder Encoder)
type OffsetResponse ¶
type OffsetResponse struct {
PartitionErrorAndOffsets map[string]map[int32]*PartitionOffsetsResponse
}
OffsetResponse contains the starting offset of each segment for the requested partition as well as the "log end offset" i.e. the offset of the next message that would be appended to the given partition.
func (*OffsetResponse) Read ¶
func (or *OffsetResponse) Read(decoder Decoder) *DecodingError
type PartitionFetchInfo ¶
PartitionFetchInfo contains information about what partition to fetch, what offset to fetch from and the maximum bytes to include in the message set for this partition.
type PartitionMetadata ¶
type PartitionMetadata struct { Error error PartitionID int32 Leader int32 Replicas []int32 ISR []int32 }
PartitionMetadata contains information about a topic partition - its id, leader, replicas, ISRs and error if it occurred.
func (*PartitionMetadata) Read ¶
func (pm *PartitionMetadata) Read(decoder Decoder) *DecodingError
type PartitionOffsetRequestInfo ¶
PartitionOffsetRequestInfo contains partition specific configurations to fetch offsets.
type PartitionOffsetsResponse ¶
PartitionOffsetsResponse contain offsets for a single partition and an error if it occurred.
func (*PartitionOffsetsResponse) Read ¶
func (po *PartitionOffsetsResponse) Read(decoder Decoder) *DecodingError
type ProduceRequest ¶
type ProduceRequest struct { RequiredAcks int16 AckTimeoutMs int32 Data map[string]map[int32][]*MessageAndOffset }
ProduceRequest is used to send message sets to the server.
func (*ProduceRequest) AddMessage ¶
func (pr *ProduceRequest) AddMessage(topic string, partition int32, message *Message)
AddMessage is a convenience method to add a single message to be produced to a topic partition.
func (*ProduceRequest) Key ¶
func (pr *ProduceRequest) Key() int16
Key returns the Kafka API key for ProduceRequest.
func (*ProduceRequest) Version ¶
func (pr *ProduceRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*ProduceRequest) Write ¶
func (pr *ProduceRequest) Write(encoder Encoder)
type ProduceResponse ¶
type ProduceResponse struct {
Status map[string]map[int32]*ProduceResponseStatus
}
ProduceResponse contains highest assigned offsets by topic partitions and errors if they occurred.
func (*ProduceResponse) Read ¶
func (pr *ProduceResponse) Read(decoder Decoder) *DecodingError
type ProduceResponseStatus ¶
ProduceResponseStatus contains a highest assigned offset from a ProduceRequest and an error if it occurred.
type Request ¶
type Request interface { // Writes the Request to the given Encoder. Write(Encoder) // Returns the Kafka API key for this Request. Key() int16 // Returns the Kafka request version for backwards compatibility. Version() int16 }
Request is a generic interface for any request issued to Kafka. Must be able to identify and write itself.
type RequestHeader ¶
type RequestHeader struct {
// contains filtered or unexported fields
}
RequestHeader is used to decouple the message header/metadata writing from the actual message. It is able to accept a request and encode/write it according to Kafka Wire Protocol format adding the correlation id and client id to the request.
func NewRequestHeader ¶
func NewRequestHeader(correlationID int32, clientID string, request Request) *RequestHeader
NewRequestHeader creates a new RequestHeader holding the correlation id, client id and the actual request.
func (*RequestHeader) Size ¶
func (rw *RequestHeader) Size() int32
Size returns the size in bytes needed to write this request, including the length field. This value will be used when allocating memory for a byte array.
func (*RequestHeader) Write ¶
func (rw *RequestHeader) Write(encoder Encoder)
Write writes this RequestHeader into a given Encoder.
type Response ¶
type Response interface { // Read the Response from the given Decoder. May return a DecodingError if the response is invalid. Read(Decoder) *DecodingError }
Response is a generic interface for any response received from Kafka. Must be able to read itself.
type SizingEncoder ¶
type SizingEncoder struct {
// contains filtered or unexported fields
}
SizingEncoder is used to determine the size for []byte that will hold the actual encoded data. This is used as an optimization as it is cheaper to run once and determine the size instead of growing the slice dynamically.
func NewSizingEncoder ¶
func NewSizingEncoder() *SizingEncoder
NewSizingEncoder creates a new SizingEncoder
func (*SizingEncoder) Reserve ¶
func (se *SizingEncoder) Reserve(slice UpdatableSlice)
Reserve reserves a place for an updatable slice.
func (*SizingEncoder) Size ¶
func (se *SizingEncoder) Size() int32
Size returns the size in bytes written to this encoder.
func (*SizingEncoder) UpdateReserved ¶
func (se *SizingEncoder) UpdateReserved()
UpdateReserved tells the last reserved slice to be updated with new data.
func (*SizingEncoder) WriteBytes ¶
func (se *SizingEncoder) WriteBytes(value []byte)
WriteBytes writes a []byte to this encoder.
func (*SizingEncoder) WriteInt16 ¶
func (se *SizingEncoder) WriteInt16(int16)
WriteInt16 writes an int16 to this encoder.
func (*SizingEncoder) WriteInt32 ¶
func (se *SizingEncoder) WriteInt32(int32)
WriteInt32 writes an int32 to this encoder.
func (*SizingEncoder) WriteInt64 ¶
func (se *SizingEncoder) WriteInt64(int64)
WriteInt64 writes an int64 to this encoder.
func (*SizingEncoder) WriteInt8 ¶
func (se *SizingEncoder) WriteInt8(int8)
WriteInt8 writes an int8 to this encoder.
func (*SizingEncoder) WriteString ¶
func (se *SizingEncoder) WriteString(value string)
WriteString writes a string to this encoder.
type SyncGroupRequest ¶
type SyncGroupRequest struct { GroupID string GenerationID int32 MemberID string GroupAssignment map[string][]byte }
func (*SyncGroupRequest) Key ¶
func (*SyncGroupRequest) Key() int16
Key returns the Kafka API key for SyncGroupRequest.
func (*SyncGroupRequest) Version ¶
func (*SyncGroupRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*SyncGroupRequest) Write ¶
func (sgr *SyncGroupRequest) Write(encoder Encoder)
type SyncGroupResponse ¶
func (*SyncGroupResponse) Read ¶
func (sgr *SyncGroupResponse) Read(decoder Decoder) *DecodingError
type TopicMetadata ¶
type TopicMetadata struct { Error error Topic string PartitionsMetadata []*PartitionMetadata }
TopicMetadata contains information about topic - its name, number of partitions, leaders, ISRs and errors if they occur.
func (*TopicMetadata) Read ¶
func (tm *TopicMetadata) Read(decoder Decoder) *DecodingError
type TopicMetadataRequest ¶
type TopicMetadataRequest struct {
Topics []string
}
TopicMetadataRequest is used to get topics, their partitions, leader brokers for them and where these brokers are located.
func NewMetadataRequest ¶
func NewMetadataRequest(topics []string) *TopicMetadataRequest
NewMetadataRequest creates a new MetadataRequest to fetch metadata for given topics. Passing it an empty slice will request metadata for all topics.
func (*TopicMetadataRequest) Key ¶
func (mr *TopicMetadataRequest) Key() int16
Key returns the Kafka API key for TopicMetadataRequest.
func (*TopicMetadataRequest) Version ¶
func (mr *TopicMetadataRequest) Version() int16
Version returns the Kafka request version for backwards compatibility.
func (*TopicMetadataRequest) Write ¶
func (mr *TopicMetadataRequest) Write(encoder Encoder)
type UpdatableSlice ¶
type UpdatableSlice interface { // Returns the length to reserve for this slice. GetReserveLength() int // Set the current position within the encoder to be updated later. SetPosition(int) // Get the position within the encoder to be updated later. GetPosition() int // Update this slice. At this point all necessary data should be written to encoder. Update([]byte) }
UpdatableSlice is an interface that is used when the encoder has to write the value based on bytes that are not yet written (e.g. calculate the CRC of the message).
Source Files ¶
- brokers.go
- connector.go
- consumer_metadata.go
- decoder.go
- describe_groups.go
- encoder.go
- errors.go
- fetch.go
- heartbeat.go
- join_group.go
- leave_group.go
- list_groups.go
- logger.go
- messages.go
- metadata.go
- offset.go
- offset_commit.go
- offset_fetch.go
- produce.go
- request_response.go
- snappy.go
- sync_group.go
- topic_metadata.go
- util.go