siesta

Version: v0.0.0-...-fea3d88
Published: Apr 25, 2016 License: Apache-2.0 Imports: 18 Imported by: 12

README

siesta

A low-level Apache Kafka library in Go

Installation:

  1. Install Golang http://golang.org/doc/install
  2. Make sure env variables GOPATH and GOROOT exist and point to correct places
  3. go get github.com/elodina/siesta
  4. go test -v to make sure it works

You may also want to spin up a local broker at localhost:9092 for the functional test to work as well (it will be skipped otherwise).

Documentation

Index

Constants

const EarliestTime int64 = -2

EarliestTime is a value used to request the earliest available offset.

const InvalidOffset int64 = -1

InvalidOffset is a constant that is used to denote an invalid or uninitialized offset.

const LatestTime int64 = -1

LatestTime is a value used to request the latest offset (i.e. the offset of the next incoming message).
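
As an illustration of how these sentinel values are typically interpreted, the sketch below resolves them against a hypothetical in-memory partition. The partitionLog type and its resolve method are assumptions made up for this example and are not part of siesta; only the two constant values come from the declarations above.

```go
package main

import "fmt"

// Local copies of the sentinel values documented above.
const (
	earliestTime int64 = -2
	latestTime   int64 = -1
)

// partitionLog is a hypothetical stand-in for the retained part of a partition.
type partitionLog struct {
	firstOffset int64 // offset of the oldest retained message
	count       int64 // number of retained messages
}

// resolve maps a sentinel offset time to a concrete offset.
// It reports false for any value that is not one of the two sentinels.
func (p partitionLog) resolve(offsetTime int64) (int64, bool) {
	switch offsetTime {
	case earliestTime:
		return p.firstOffset, true
	case latestTime:
		// The "latest" offset is the offset the next message will receive.
		return p.firstOffset + p.count, true
	default:
		return 0, false
	}
}

func main() {
	p := partitionLog{firstOffset: 100, count: 5}
	earliest, _ := p.resolve(earliestTime)
	latest, _ := p.resolve(latestTime)
	fmt.Println(earliest, latest) // 100 105
}
```

Note that InvalidOffset and LatestTime share the value -1, so the meaning of -1 depends on whether it is used as an offset or as an offset time.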

Variables

Mapping between Kafka error codes and actual error messages.

var ErrBrokerNotAvailable = errors.New("Broker is likely not alive.")

A mapping for Kafka error code 8.

var ErrClusterAuthorizationFailed = errors.New("The client is not authorized to use an inter-broker or administrative API.")

A mapping for Kafka error code 31

var ErrConsumerCoordinatorNotAvailableCode = errors.New("Offsets topic has not yet been created.")

A mapping for Kafka error code 15.

var ErrEOF = errors.New("End of file reached")

Signals that an end of file or stream has been reached unexpectedly.

var ErrFailedToGetLeader = errors.New("Failed to get leader.")

ErrFailedToGetLeader is returned when TopicMetadataResponse does not contain leader metadata for the requested topic and partition.

var ErrFailedToGetMetadata = errors.New("Failed to get topic metadata.")

ErrFailedToGetMetadata is returned when TopicMetadataResponse does not contain metadata for the requested topic.

var ErrGroupAuthorizationFailed = errors.New("The client is not authorized to access a particular groupId.")

A mapping for Kafka error code 30

var ErrIllegalGeneration = errors.New("The generation id provided in the request is not the current generation.")

A mapping for Kafka error code 22

var ErrInconsistentGroupProtocol = errors.New("Provided protocol type or set of protocols is not compatible with the current group.")

A mapping for Kafka error code 23

var ErrInvalidCommitOffsetSize = errors.New("Offset commit was rejected because of oversize metadata.")

A mapping for Kafka error code 28

var ErrInvalidGroupID = errors.New("The groupId is empty or null.")

A mapping for Kafka error code 24

var ErrInvalidMessage = errors.New("Message contents does not match its CRC")

A mapping for Kafka error code 2.

var ErrInvalidMessageSize = errors.New("The message has a negative size")

A mapping for Kafka error code 4.

var ErrInvalidRequiredAcks = errors.New("The requested requiredAcks is invalid.")

A mapping for Kafka error code 21

var ErrInvalidSessionTimeout = errors.New("The requested session timeout is outside of the allowed range on the broker.")

A mapping for Kafka error code 26

var ErrInvalidTopicCode = errors.New("Attempt to access an invalid topic.")

A mapping for Kafka error code 17

var ErrLeaderNotAvailable = errors.New("In the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.")

A mapping for Kafka error code 5.

var ErrMessageSizeTooLarge = errors.New("You've just attempted to produce a message of size larger than broker is allowed to accept.")

A mapping for Kafka error code 10.

var ErrNoDataToUncompress = errors.New("No data to uncompress")

Happens when a compressed message is empty.

var ErrNoError = errors.New("No error - it worked!")

A mapping for Kafka error code 0.

var ErrNotCoordinatorForConsumerCode = errors.New("There is no coordinator for this consumer.")

A mapping for Kafka error code 16.

var ErrNotEnoughReplicas = errors.New("The number of in-sync replicas is lower than the configured minimum.")

A mapping for Kafka error code 19

var ErrNotEnoughReplicasAfterAppend = errors.New("The message was written to the log, but with fewer in-sync replicas than required.")

A mapping for Kafka error code 20

var ErrNotLeaderForPartition = errors.New("You've just attempted to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.")

A mapping for Kafka error code 6.

var ErrOffsetMetadataTooLargeCode = errors.New("You've jsut specified a string larger than configured maximum for offset metadata.")

A mapping for Kafka error code 12.

var ErrOffsetOutOfRange = errors.New("The requested offset is outside the range of offsets maintained by the server for the given topic/partition.")

A mapping for Kafka error code 1.

var ErrOffsetsLoadInProgressCode = errors.New("Offset loading is in progress. (Usually happens after a leader change for that offsets topic partition).")

A mapping for Kafka error code 14.

var ErrRebalanceInProgress = errors.New("The coordinator has begun rebalancing the group. This indicates to the client that it should rejoin the group.")

A mapping for Kafka error code 27

var ErrRecordListTooLarge = errors.New("Message batch exceeds the maximum configured segment size.")

A mapping for Kafka error code 18

var ErrReplicaNotAvailable = errors.New("Replica is expected on a broker, but is not (this can be safely ignored).")

A mapping for Kafka error code 9.

var ErrRequestTimedOut = errors.New("Request exceeds the user-specified time limit in the request.")

A mapping for Kafka error code 7.

var ErrStaleControllerEpochCode = errors.New("Broker-to-broker communication fault.")

A mapping for Kafka error code 11.

var ErrTopicAuthorizationFailed = errors.New("The client is not authorized to access the requested topic.")

A mapping for Kafka error code 29

var ErrUnknown = errors.New("An unexpected server error")

A mapping for Kafka error code -1.

var ErrUnknownMemberID = errors.New("The memberId is not in the current generation.")

A mapping for Kafka error code 25

var ErrUnknownTopicOrPartition = errors.New("This request is for a topic or partition that does not exist on this broker.")

A mapping for Kafka error code 3.
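
A minimal sketch of how a numeric Kafka error code could be translated into a sentinel error of this kind. The errorsByCode map, the errorForCode function and the shortened messages are hypothetical; only three of the codes listed above are included, and the sketch returns nil for code 0 instead of a value such as ErrNoError, so the package's own lookup may behave differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for three of the sentinel errors listed above.
var (
	errUnknown                 = errors.New("unexpected server error")
	errOffsetOutOfRange        = errors.New("offset out of range")
	errUnknownTopicOrPartition = errors.New("unknown topic or partition")
)

// errorsByCode uses the code numbers given in the documentation above.
var errorsByCode = map[int16]error{
	-1: errUnknown,
	1:  errOffsetOutOfRange,
	3:  errUnknownTopicOrPartition,
}

// errorForCode returns nil for code 0 and falls back to errUnknown
// for codes that are missing from the table.
func errorForCode(code int16) error {
	if code == 0 {
		return nil
	}
	if err, ok := errorsByCode[code]; ok {
		return err
	}
	return errUnknown
}

func main() {
	fmt.Println(errorForCode(3))        // unknown topic or partition
	fmt.Println(errorForCode(0) == nil) // true
}
```

Because the values are package-level variables, callers can compare a returned error against them directly with == or errors.Is.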

Functions

func Critical

func Critical(tag interface{}, message interface{})

Critical writes a given message with a given tag to log with level Critical.

func Criticalf

func Criticalf(tag interface{}, message interface{}, params ...interface{})

Criticalf formats a given message according to given params with a given tag to log with level Critical.

func Debug

func Debug(tag interface{}, message interface{})

Debug writes a given message with a given tag to log with level Debug.

func Debugf

func Debugf(tag interface{}, message interface{}, params ...interface{})

Debugf formats a given message according to given params with a given tag to log with level Debug.

func Error

func Error(tag interface{}, message interface{})

Error writes a given message with a given tag to log with level Error.

func Errorf

func Errorf(tag interface{}, message interface{}, params ...interface{})

Errorf formats a given message according to given params with a given tag to log with level Error.

func Info

func Info(tag interface{}, message interface{})

Info writes a given message with a given tag to log with level Info.

func Infof

func Infof(tag interface{}, message interface{}, params ...interface{})

Infof formats a given message according to given params with a given tag to log with level Info.

func ReadMessageSet

func ReadMessageSet(decoder Decoder) ([]*MessageAndOffset, *DecodingError)

ReadMessageSet decodes a nested message set if the MessageAndOffset is compressed.

func Trace

func Trace(tag interface{}, message interface{})

Trace writes a given message with a given tag to log with level Trace.

func Tracef

func Tracef(tag interface{}, message interface{}, params ...interface{})

Tracef formats a given message according to given params with a given tag to log with level Trace.

func Warn

func Warn(tag interface{}, message interface{})

Warn writes a given message with a given tag to log with level Warn.

func Warnf

func Warnf(tag interface{}, message interface{}, params ...interface{})

Warnf formats a given message according to given params with a given tag to log with level Warn.

Types

type BinaryDecoder

type BinaryDecoder struct {
	// contains filtered or unexported fields
}

BinaryDecoder implements Decoder and is able to decode a Kafka wire protocol message into actual data.

func NewBinaryDecoder

func NewBinaryDecoder(raw []byte) *BinaryDecoder

NewBinaryDecoder creates a new BinaryDecoder that will decode a given []byte.

func (*BinaryDecoder) GetBytes

func (bd *BinaryDecoder) GetBytes() ([]byte, error)

GetBytes gets a []byte from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt16

func (bd *BinaryDecoder) GetInt16() (int16, error)

GetInt16 gets an int16 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt32

func (bd *BinaryDecoder) GetInt32() (int32, error)

GetInt32 gets an int32 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt64

func (bd *BinaryDecoder) GetInt64() (int64, error)

GetInt64 gets an int64 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetInt8

func (bd *BinaryDecoder) GetInt8() (int8, error)

GetInt8 gets an int8 from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) GetString

func (bd *BinaryDecoder) GetString() (string, error)

GetString gets a string from this decoder. Returns EOF if end of stream is reached.

func (*BinaryDecoder) Remaining

func (bd *BinaryDecoder) Remaining() int

Remaining tells how many bytes are left unread in this decoder.
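
The following sketch shows the general idea behind such a decoder, assuming the usual Kafka wire conventions: integers are big-endian and strings are prefixed with an int16 length. The decoder type, its method names and errEOF are local to this example and are not siesta's implementation.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errEOF is a local stand-in for the end-of-stream error.
var errEOF = errors.New("end of stream")

// decoder reads big-endian values from raw, tracking the read position.
type decoder struct {
	raw []byte
	pos int
}

func (d *decoder) remaining() int { return len(d.raw) - d.pos }

func (d *decoder) getInt16() (int16, error) {
	if d.remaining() < 2 {
		return 0, errEOF
	}
	v := int16(binary.BigEndian.Uint16(d.raw[d.pos:]))
	d.pos += 2
	return v, nil
}

func (d *decoder) getInt32() (int32, error) {
	if d.remaining() < 4 {
		return 0, errEOF
	}
	v := int32(binary.BigEndian.Uint32(d.raw[d.pos:]))
	d.pos += 4
	return v, nil
}

// getString reads an int16 length followed by that many bytes.
// A negative length denotes a null string and is returned as "".
func (d *decoder) getString() (string, error) {
	n, err := d.getInt16()
	if err != nil {
		return "", err
	}
	if n < 0 {
		return "", nil
	}
	if d.remaining() < int(n) {
		return "", errEOF
	}
	s := string(d.raw[d.pos : d.pos+int(n)])
	d.pos += int(n)
	return s, nil
}

func main() {
	d := &decoder{raw: []byte{0, 0, 0, 7, 0, 2, 'h', 'i'}}
	id, _ := d.getInt32()
	name, _ := d.getString()
	fmt.Println(id, name, d.remaining()) // 7 hi 0
}
```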

type BinaryEncoder

type BinaryEncoder struct {
	// contains filtered or unexported fields
}

BinaryEncoder implements Encoder and is able to encode actual data into a Kafka wire protocol byte sequence.

func NewBinaryEncoder

func NewBinaryEncoder(buffer []byte) *BinaryEncoder

NewBinaryEncoder creates a new BinaryEncoder that will write into a given []byte.

func (*BinaryEncoder) Reserve

func (be *BinaryEncoder) Reserve(slice UpdatableSlice)

Reserve reserves a place for an updatable slice.

func (*BinaryEncoder) Size

func (be *BinaryEncoder) Size() int32

Size returns the size in bytes written to this encoder.

func (*BinaryEncoder) UpdateReserved

func (be *BinaryEncoder) UpdateReserved()

UpdateReserved tells the last reserved slice to be updated with new data.

func (*BinaryEncoder) WriteBytes

func (be *BinaryEncoder) WriteBytes(value []byte)

WriteBytes writes a []byte to this encoder.

func (*BinaryEncoder) WriteInt16

func (be *BinaryEncoder) WriteInt16(value int16)

WriteInt16 writes an int16 to this encoder.

func (*BinaryEncoder) WriteInt32

func (be *BinaryEncoder) WriteInt32(value int32)

WriteInt32 writes an int32 to this encoder.

func (*BinaryEncoder) WriteInt64

func (be *BinaryEncoder) WriteInt64(value int64)

WriteInt64 writes an int64 to this encoder.

func (*BinaryEncoder) WriteInt8

func (be *BinaryEncoder) WriteInt8(value int8)

WriteInt8 writes an int8 to this encoder.

func (*BinaryEncoder) WriteString

func (be *BinaryEncoder) WriteString(value string)

WriteString writes a string to this encoder.
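
For illustration, an encoder of this shape can be sketched as a cursor over a caller-supplied buffer. This is a simplified stand-in with hypothetical names, not siesta's code; it assumes big-endian integers and int16 length-prefixed strings, and that the caller has sized the buffer correctly (the writes panic otherwise).

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encoder writes big-endian values into a pre-allocated buffer.
type encoder struct {
	buf []byte
	pos int
}

func (e *encoder) writeInt16(v int16) {
	binary.BigEndian.PutUint16(e.buf[e.pos:], uint16(v))
	e.pos += 2
}

func (e *encoder) writeInt32(v int32) {
	binary.BigEndian.PutUint32(e.buf[e.pos:], uint32(v))
	e.pos += 4
}

// writeString writes an int16 length followed by the string bytes.
func (e *encoder) writeString(s string) {
	e.writeInt16(int16(len(s)))
	e.pos += copy(e.buf[e.pos:], s)
}

// size reports how many bytes have been written so far.
func (e *encoder) size() int32 { return int32(e.pos) }

func main() {
	buf := make([]byte, 8) // 4 bytes for the int32, 2 for the length, 2 for "hi"
	e := &encoder{buf: buf}
	e.writeInt32(7)
	e.writeString("hi")
	fmt.Printf("% x (%d bytes)\n", buf, e.size()) // 00 00 00 07 00 02 68 69 (8 bytes)
}
```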

type Broker

type Broker struct {
	ID   int32
	Host string
	Port int32
}

Broker contains information about a Kafka broker in a cluster: its ID, host name and port.

func (*Broker) Read

func (n *Broker) Read(decoder Decoder) *DecodingError

func (*Broker) String

func (n *Broker) String() string

type BrokerConnection

type BrokerConnection struct {
	// contains filtered or unexported fields
}

func NewBrokerConnection

func NewBrokerConnection(broker *Broker, keepAliveTimeout time.Duration) *BrokerConnection

func (*BrokerConnection) GetConnection

func (bc *BrokerConnection) GetConnection() (*net.TCPConn, error)

func (*BrokerConnection) ReleaseConnection

func (bc *BrokerConnection) ReleaseConnection(conn *net.TCPConn)

type Brokers

type Brokers struct {
	// contains filtered or unexported fields
}

func NewBrokers

func NewBrokers(keepAliveTimeout time.Duration) *Brokers

func (*Brokers) Add

func (b *Brokers) Add(broker *Broker)

func (*Brokers) Get

func (b *Brokers) Get(id int32) *BrokerConnection

func (*Brokers) GetAll

func (b *Brokers) GetAll() []*BrokerConnection

func (*Brokers) NextCorrelationID

func (b *Brokers) NextCorrelationID() int32

func (*Brokers) Remove

func (b *Brokers) Remove(id int32)

func (*Brokers) Update

func (b *Brokers) Update(broker *Broker)

type CompressionCodec

type CompressionCodec int

CompressionCodec is a compression codec id used to distinguish various compression types.

const (
	// CompressionNone is a compression codec id for uncompressed data.
	CompressionNone CompressionCodec = 0

	// CompressionGZIP is a compression codec id for GZIP compression.
	CompressionGZIP CompressionCodec = 1

	// CompressionSnappy is a compression codec id for Snappy compression.
	CompressionSnappy CompressionCodec = 2

	// CompressionLZ4 is a compression codec id for LZ4 compression.
	CompressionLZ4 CompressionCodec = 3
)

type Connector

type Connector interface {
	// GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have.
	// Passing it an empty topic list will retrieve metadata for all topics in a cluster.
	GetTopicMetadata(topics []string) (*MetadataResponse, error)

	// GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.
	// More on offset time here - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
	GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)

	// Fetch issues a single fetch request to a broker responsible for a given topic and partition and returns a FetchResponse that contains messages starting from a given offset.
	Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)

	// GetOffset gets the offset for a given group, topic and partition from Kafka. A part of new offset management API.
	GetOffset(group string, topic string, partition int32) (int64, error)

	// CommitOffset commits the offset for a given group, topic and partition to Kafka. A part of new offset management API.
	CommitOffset(group string, topic string, partition int32, offset int64) error

	GetLeader(topic string, partition int32) (*BrokerConnection, error)

	GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)

	// Metadata returns a structure that holds all topic and broker metadata.
	Metadata() *Metadata

	// Tells the Connector to close all existing connections and stop.
	// This method is NOT blocking but returns a channel which will get a single value once the closing is finished.
	Close() <-chan bool
}

Connector is an interface for interacting with a Kafka cluster; it hides all broker management from the user.
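
The non-blocking Close contract described in the interface comments can be sketched as follows. The closer type and its conns field are hypothetical placeholders for a real connector and its connections; the point is only the pattern of returning a channel that receives a single value once shutdown has finished.

```go
package main

import "fmt"

// closer is a hypothetical stand-in for a connector holding open connections.
type closer struct {
	conns []string
}

// Close starts the shutdown in the background and returns immediately.
// The returned channel receives exactly one value when shutdown is done.
func (c *closer) Close() <-chan bool {
	done := make(chan bool, 1) // buffered so the goroutine never blocks on send
	go func() {
		c.conns = nil // placeholder for closing every connection
		done <- true
	}()
	return done
}

func main() {
	c := &closer{conns: []string{"broker-1", "broker-2"}}
	closed := c.Close()
	// The caller decides when (and whether) to wait for completion.
	<-closed
	fmt.Println("open connections:", len(c.conns)) // open connections: 0
}
```

A caller that must not wait indefinitely can receive from the returned channel inside a select together with time.After.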

type ConnectorConfig

type ConnectorConfig struct {
	// BrokerList is a bootstrap list to discover other brokers in a cluster. At least one broker is required.
	BrokerList []string

	// ReadTimeout is a timeout to read the response from a TCP socket.
	ReadTimeout time.Duration

	// WriteTimeout is a timeout to write the request to a TCP socket.
	WriteTimeout time.Duration

	// ConnectTimeout is a timeout to connect to a TCP socket.
	ConnectTimeout time.Duration

	// Sets whether the connection should be kept alive.
	KeepAlive bool

	// A keep alive period for a TCP connection.
	KeepAliveTimeout time.Duration

	// Maximum number of open connections for a connector.
	MaxConnections int

	// Maximum number of open connections for a single broker for a connector.
	MaxConnectionsPerBroker int

	// Maximum fetch size in bytes which will be used in all Consume() calls.
	FetchSize int32

	// The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block
	FetchMinBytes int32

	// The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes
	FetchMaxWaitTime int32

	// Number of retries to get topic metadata.
	MetadataRetries int

	// Backoff value between topic metadata requests.
	MetadataBackoff time.Duration

	// MetadataTTL is how long topic metadata is considered valid. Used to refresh metadata from time to time even if no leader changes occurred.
	MetadataTTL time.Duration

	// Number of retries to commit an offset.
	CommitOffsetRetries int

	// Backoff value between commit offset requests.
	CommitOffsetBackoff time.Duration

	// Number of retries to get consumer metadata.
	ConsumerMetadataRetries int

	// Backoff value between consumer metadata requests.
	ConsumerMetadataBackoff time.Duration

	// ClientID that will be used by a connector to identify client requests by broker.
	ClientID string
}

ConnectorConfig is used to pass multiple configuration values for a Connector

func NewConnectorConfig

func NewConnectorConfig() *ConnectorConfig

NewConnectorConfig returns a new ConnectorConfig with sane defaults.

func (*ConnectorConfig) Validate

func (cc *ConnectorConfig) Validate() error

Validate validates this ConnectorConfig. Returns a corresponding error if the ConnectorConfig is invalid and nil otherwise.
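
A rough sketch of what validation of such a config might look like. The config struct below is a cut-down local copy with two fields, and the rules are assumptions for illustration; only the "at least one broker is required" rule comes from the BrokerList comment above, and the checks siesta actually performs are not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config is a reduced, hypothetical version of ConnectorConfig.
type config struct {
	BrokerList  []string
	ReadTimeout time.Duration
}

// validate returns a descriptive error for the first violated rule, or nil.
func (c *config) validate() error {
	if c == nil {
		return errors.New("config is nil")
	}
	if len(c.BrokerList) == 0 {
		return errors.New("BrokerList must contain at least one broker")
	}
	if c.ReadTimeout <= 0 {
		// Assumed rule: a non-positive timeout is treated as a mistake.
		return errors.New("ReadTimeout must be positive")
	}
	return nil
}

func main() {
	bad := &config{}
	fmt.Println(bad.validate()) // BrokerList must contain at least one broker

	good := &config{BrokerList: []string{"localhost:9092"}, ReadTimeout: 5 * time.Second}
	fmt.Println(good.validate()) // <nil>
}
```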

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	Group string
}

ConsumerMetadataRequest is used to discover the current offset coordinator to issue its offset commit and fetch requests.

func NewConsumerMetadataRequest

func NewConsumerMetadataRequest(group string) *ConsumerMetadataRequest

NewConsumerMetadataRequest creates a new ConsumerMetadataRequest for a given consumer group.

func (*ConsumerMetadataRequest) Key

func (cmr *ConsumerMetadataRequest) Key() int16

Key returns the Kafka API key for ConsumerMetadataRequest.

func (*ConsumerMetadataRequest) Version

func (cmr *ConsumerMetadataRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*ConsumerMetadataRequest) Write

func (cmr *ConsumerMetadataRequest) Write(encoder Encoder)

Write writes the ConsumerMetadataRequest to the given Encoder.

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Error       error
	Coordinator *Broker
}

ConsumerMetadataResponse contains information about the current offset coordinator and error if it occurred.

func (*ConsumerMetadataResponse) Read

func (cmr *ConsumerMetadataResponse) Read(decoder Decoder) *DecodingError

type CorrelationIDGenerator

type CorrelationIDGenerator struct {
	// contains filtered or unexported fields
}

func (*CorrelationIDGenerator) NextCorrelationID

func (c *CorrelationIDGenerator) NextCorrelationID() int32

type CrcSlice

type CrcSlice struct {
	// contains filtered or unexported fields
}

CrcSlice is used to calculate the CRC32 value of the message.

func (*CrcSlice) GetPosition

func (cs *CrcSlice) GetPosition() int

GetPosition gets the position within the encoder to be updated later.

func (*CrcSlice) GetReserveLength

func (cs *CrcSlice) GetReserveLength() int

GetReserveLength returns the length to reserve for this slice.

func (*CrcSlice) SetPosition

func (cs *CrcSlice) SetPosition(pos int)

SetPosition sets the current position within the encoder to be updated later.

func (*CrcSlice) Update

func (cs *CrcSlice) Update(slice []byte)

Update this slice. At this point all necessary data should be written to encoder.
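
To make the checksum step concrete, the sketch below computes and verifies a CRC32 over a payload. It assumes the IEEE polynomial and a layout in which a 4-byte big-endian CRC field is followed by the bytes it covers, as in the classic Kafka message format; frameWithCRC and verifyCRC are hypothetical helpers, not part of siesta.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// frameWithCRC leaves 4 bytes free at the front, copies the payload after
// them, and then fills in the CRC32 of the payload.
func frameWithCRC(payload []byte) []byte {
	frame := make([]byte, 4+len(payload))
	copy(frame[4:], payload)
	binary.BigEndian.PutUint32(frame[:4], crc32.ChecksumIEEE(frame[4:]))
	return frame
}

// verifyCRC recomputes the checksum and compares it with the stored one.
func verifyCRC(frame []byte) bool {
	if len(frame) < 4 {
		return false
	}
	return binary.BigEndian.Uint32(frame[:4]) == crc32.ChecksumIEEE(frame[4:])
}

func main() {
	frame := frameWithCRC([]byte("123456789"))
	fmt.Printf("%08x %v\n", binary.BigEndian.Uint32(frame[:4]), verifyCRC(frame)) // cbf43926 true

	frame[4] ^= 0xff // corrupt one payload byte
	fmt.Println(verifyCRC(frame)) // false
}
```

A mismatch of this kind is what an error such as ErrInvalidMessage ("Message contents does not match its CRC") reports.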

type Decoder

type Decoder interface {
	// Gets an int8 from this decoder. Returns EOF if end of stream is reached.
	GetInt8() (int8, error)

	// Gets an int16 from this decoder. Returns EOF if end of stream is reached.
	GetInt16() (int16, error)

	// Gets an int32 from this decoder. Returns EOF if end of stream is reached.
	GetInt32() (int32, error)

	// Gets an int64 from this decoder. Returns EOF if end of stream is reached.
	GetInt64() (int64, error)

	// Gets a []byte from this decoder. Returns EOF if end of stream is reached.
	GetBytes() ([]byte, error)

	// Gets a string from this decoder. Returns EOF if end of stream is reached.
	GetString() (string, error)

	// Tells how many bytes are left unread in this decoder.
	Remaining() int
}

Decoder is able to decode a Kafka wire protocol message into actual data.

type DecodingError

type DecodingError struct {
	// contains filtered or unexported fields
}

DecodingError is an error that also holds the information about why it happened.

func NewDecodingError

func NewDecodingError(err error, reason string) *DecodingError

NewDecodingError creates a new DecodingError with a given error message and reason.

func (*DecodingError) Error

func (de *DecodingError) Error() error

Error returns the error message for this DecodingError.

func (*DecodingError) Reason

func (de *DecodingError) Reason() string

Reason returns the reason for this DecodingError.

type DefaultConnector

type DefaultConnector struct {
	// contains filtered or unexported fields
}

DefaultConnector is the default (and currently the only) Connector implementation in the Siesta library.

func NewDefaultConnector

func NewDefaultConnector(config *ConnectorConfig) (*DefaultConnector, error)

NewDefaultConnector creates a new DefaultConnector with a given ConnectorConfig. May return an error if the passed config is invalid.

func (*DefaultConnector) Close

func (dc *DefaultConnector) Close() <-chan bool

Close tells the Connector to close all existing connections and stop. This method is NOT blocking but returns a channel which will get a single value once the closing is finished.

func (*DefaultConnector) CommitOffset

func (dc *DefaultConnector) CommitOffset(group string, topic string, partition int32, offset int64) error

CommitOffset commits the offset for a given group, topic and partition to Kafka. A part of new offset management API.

func (*DefaultConnector) Fetch

func (dc *DefaultConnector) Fetch(topic string, partition int32, offset int64) (*FetchResponse, error)

Fetch issues a single fetch request to a broker responsible for a given topic and partition and returns a FetchResponse that contains messages starting from a given offset.

func (*DefaultConnector) GetAvailableOffset

func (dc *DefaultConnector) GetAvailableOffset(topic string, partition int32, offsetTime int64) (int64, error)

GetAvailableOffset issues an offset request to a specified topic and partition with a given offset time.

func (*DefaultConnector) GetConsumerMetadata

func (dc *DefaultConnector) GetConsumerMetadata(group string) (*ConsumerMetadataResponse, error)

func (*DefaultConnector) GetLeader

func (dc *DefaultConnector) GetLeader(topic string, partition int32) (*BrokerConnection, error)

func (*DefaultConnector) GetOffset

func (dc *DefaultConnector) GetOffset(group string, topic string, partition int32) (int64, error)

GetOffset gets the offset for a given group, topic and partition from Kafka. A part of new offset management API.

func (*DefaultConnector) GetTopicMetadata

func (dc *DefaultConnector) GetTopicMetadata(topics []string) (*MetadataResponse, error)

GetTopicMetadata is primarily used to discover leaders for given topics and how many partitions these topics have. Passing it an empty topic list will retrieve metadata for all topics in a cluster.

func (*DefaultConnector) Metadata

func (dc *DefaultConnector) Metadata() *Metadata

func (*DefaultConnector) String

func (dc *DefaultConnector) String() string

String returns a string representation of this DefaultConnector.

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

DefaultLogger is a default implementation of KafkaLogger interface used in this client.

func NewDefaultLogger

func NewDefaultLogger(Level LogLevel) *DefaultLogger

NewDefaultLogger creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.

func (*DefaultLogger) Critical

func (dl *DefaultLogger) Critical(message string, params ...interface{})

Critical formats a given message according to given params to log with level Critical.

func (*DefaultLogger) Debug

func (dl *DefaultLogger) Debug(message string, params ...interface{})

Debug formats a given message according to given params to log with level Debug.

func (*DefaultLogger) Error

func (dl *DefaultLogger) Error(message string, params ...interface{})

Error formats a given message according to given params to log with level Error.

func (*DefaultLogger) Info

func (dl *DefaultLogger) Info(message string, params ...interface{})

Info formats a given message according to given params to log with level Info.

func (*DefaultLogger) Trace

func (dl *DefaultLogger) Trace(message string, params ...interface{})

Trace formats a given message according to given params to log with level Trace.

func (*DefaultLogger) Warn

func (dl *DefaultLogger) Warn(message string, params ...interface{})

Warn formats a given message according to given params to log with level Warn.

type DescribeGroupsRequest

type DescribeGroupsRequest struct {
	Groups []string
}

func (*DescribeGroupsRequest) Key

func (*DescribeGroupsRequest) Key() int16

Key returns the Kafka API key for DescribeGroupsRequest.

func (*DescribeGroupsRequest) Version

func (*DescribeGroupsRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*DescribeGroupsRequest) Write

func (dgr *DescribeGroupsRequest) Write(encoder Encoder)

type DescribeGroupsResponse

type DescribeGroupsResponse struct {
	Groups []*GroupDescription
}

func (*DescribeGroupsResponse) Read

func (dgr *DescribeGroupsResponse) Read(decoder Decoder) *DecodingError

type Encoder

type Encoder interface {
	// Writes an int8 to this encoder.
	WriteInt8(int8)

	// Writes an int16 to this encoder.
	WriteInt16(int16)

	// Writes an int32 to this encoder.
	WriteInt32(int32)

	// Writes an int64 to this encoder.
	WriteInt64(int64)

	// Writes a []byte to this encoder.
	WriteBytes([]byte)

	// Writes a string to this encoder.
	WriteString(string)

	// Returns the size in bytes written to this encoder.
	Size() int32

	// Reserves a place for an updatable slice.
	// This is used as an optimization for length and crc fields.
	// The encoder reserves a place for this data and updates it later instead of pre-calculating it and doing redundant work.
	Reserve(UpdatableSlice)

	// Tells the last reserved slice to be updated with new data.
	UpdateReserved()
}

Encoder is able to encode actual data into a Kafka wire protocol byte sequence.
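
The reserve-then-update optimization mentioned in the Reserve comment can be sketched for a length field as follows. The encoder here is an append-based toy with hypothetical method names, assuming a 4-byte big-endian length that counts the bytes written after it; siesta's UpdatableSlice mechanism is more general.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encoder appends to buf and remembers the positions of reserved length fields.
type encoder struct {
	buf      []byte
	reserved []int // stack of positions of pending 4-byte length fields
}

// reserveLength leaves 4 zero bytes to be filled in later.
func (e *encoder) reserveLength() {
	e.reserved = append(e.reserved, len(e.buf))
	e.buf = append(e.buf, 0, 0, 0, 0)
}

// writeRaw appends bytes as they are.
func (e *encoder) writeRaw(b []byte) {
	e.buf = append(e.buf, b...)
}

// updateReserved fills the most recently reserved field with the number of
// bytes written after it, so the length never has to be computed up front.
func (e *encoder) updateReserved() {
	last := len(e.reserved) - 1
	pos := e.reserved[last]
	e.reserved = e.reserved[:last]
	binary.BigEndian.PutUint32(e.buf[pos:], uint32(len(e.buf)-pos-4))
}

func main() {
	e := &encoder{}
	e.reserveLength() // outer length
	e.writeRaw([]byte("ab"))
	e.reserveLength() // nested length
	e.writeRaw([]byte("xyz"))
	e.updateReserved() // nested field becomes 3
	e.updateReserved() // outer field becomes 2 + 4 + 3 = 9
	fmt.Printf("% x\n", e.buf) // 00 00 00 09 61 62 00 00 00 03 78 79 7a
}
```

Keeping the pending positions on a stack lets nested structures, such as a message inside a message set, each backfill their own length.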

type FetchRequest

type FetchRequest struct {
	MaxWait     int32
	MinBytes    int32
	RequestInfo map[string][]*PartitionFetchInfo
}

FetchRequest is used to fetch a chunk of one or more logs for some topic-partitions.

func (*FetchRequest) AddFetch

func (fr *FetchRequest) AddFetch(topic string, partition int32, offset int64, fetchSize int32)

AddFetch is a convenience method to add a PartitionFetchInfo.

func (*FetchRequest) Key

func (fr *FetchRequest) Key() int16

Key returns the Kafka API key for FetchRequest.

func (*FetchRequest) Version

func (fr *FetchRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*FetchRequest) Write

func (fr *FetchRequest) Write(encoder Encoder)

Write writes the FetchRequest to the given Encoder.

type FetchResponse

type FetchResponse struct {
	Data map[string]map[int32]*FetchResponsePartitionData
}

FetchResponse contains FetchResponsePartitionData for all requested topics and partitions.

func (*FetchResponse) CollectMessages

func (fr *FetchResponse) CollectMessages(collector func(topic string, partition int32, offset int64, key []byte, value []byte) error) error

CollectMessages traverses this FetchResponse and applies a collector function to each message giving the possibility to avoid response -> siesta.Message -> other.Message conversion if necessary.
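
The collector pattern can be illustrated with a self-contained sketch. The response and message types below are simplified local stand-ins (a map of topic to partition to messages), and collect is a hypothetical analogue of CollectMessages with the same callback signature; iteration order across topics and partitions is not defined in this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errStop is returned by a collector to abort the traversal early.
var errStop = errors.New("stop")

type message struct {
	offset     int64
	key, value []byte
}

// response maps topic -> partition -> messages.
type response map[string]map[int32][]message

// collect calls fn for every message and stops at the first error.
func (r response) collect(fn func(topic string, partition int32, offset int64, key, value []byte) error) error {
	for topic, partitions := range r {
		for partition, msgs := range partitions {
			for _, m := range msgs {
				if err := fn(topic, partition, m.offset, m.key, m.value); err != nil {
					return err
				}
			}
		}
	}
	return nil
}

func main() {
	r := response{
		"events": {
			0: {{offset: 10, value: []byte("a")}, {offset: 11, value: []byte("bc")}},
		},
	}

	// Convert straight into the caller's own representation (here: a byte count)
	// without building intermediate message structs.
	total := 0
	err := r.collect(func(topic string, partition int32, offset int64, key, value []byte) error {
		total += len(value)
		return nil
	})
	fmt.Println(total, err) // 3 <nil>
}
```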

func (*FetchResponse) Error

func (fr *FetchResponse) Error(topic string, partition int32) error

Error returns the error message for a given topic and partition of this FetchResponse.

func (*FetchResponse) GetMessages

func (fr *FetchResponse) GetMessages() ([]*MessageAndMetadata, error)

GetMessages traverses this FetchResponse and collects all messages. Returns an error if FetchResponse contains one. Messages should be ordered by offset.

func (*FetchResponse) Read

func (fr *FetchResponse) Read(decoder Decoder) *DecodingError

type FetchResponsePartitionData

type FetchResponsePartitionData struct {
	Error               error
	HighwaterMarkOffset int64
	Messages            []*MessageAndOffset
}

FetchResponsePartitionData contains fetched messages for a single partition, the offset at the end of the log for this partition and an error code.

func (*FetchResponsePartitionData) Read

func (frd *FetchResponsePartitionData) Read(decoder Decoder) *DecodingError

type GroupDescription

type GroupDescription struct {
	Error        error
	GroupID      string
	State        string
	ProtocolType string
	Protocol     string
	Members      []*GroupMemberDescription
}

func (*GroupDescription) Read

func (gd *GroupDescription) Read(decoder Decoder) *DecodingError

type GroupMemberDescription

type GroupMemberDescription struct {
	MemberID         string
	ClientID         string
	ClientHost       string
	MemberMetadata   []byte
	MemberAssignment []byte
}

func (*GroupMemberDescription) Read

func (gmd *GroupMemberDescription) Read(decoder Decoder) *DecodingError

type GroupProtocol

type GroupProtocol struct {
	ProtocolName     string
	ProtocolMetadata []byte
}

type HeartbeatRequest

type HeartbeatRequest struct {
	GroupID      string
	GenerationID int32
	MemberID     string
}

func (*HeartbeatRequest) Key

func (*HeartbeatRequest) Key() int16

Key returns the Kafka API key for HeartbeatRequest.

func (*HeartbeatRequest) Version

func (*HeartbeatRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*HeartbeatRequest) Write

func (hr *HeartbeatRequest) Write(encoder Encoder)

type HeartbeatResponse

type HeartbeatResponse struct {
	Error error
}

func (*HeartbeatResponse) Read

func (hr *HeartbeatResponse) Read(decoder Decoder) *DecodingError

type Int32Slice

type Int32Slice []int32

func (Int32Slice) Len

func (p Int32Slice) Len() int

func (Int32Slice) Less

func (p Int32Slice) Less(i, j int) bool

func (Int32Slice) Swap

func (p Int32Slice) Swap(i, j int)
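
The Len, Less and Swap methods match the standard sort.Interface, so a slice of this type can be passed to sort.Sort. A minimal sketch with a local int32Slice type, assuming Less orders values ascending (the ordering is not stated in this document); sortedPartitions is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// int32Slice is a local type with the same method set as Int32Slice.
type int32Slice []int32

func (p int32Slice) Len() int           { return len(p) }
func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] } // assumed: ascending
func (p int32Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

// sortedPartitions returns a sorted copy and leaves the input untouched.
func sortedPartitions(ids []int32) []int32 {
	out := make(int32Slice, len(ids))
	copy(out, ids)
	sort.Sort(out)
	return out
}

func main() {
	fmt.Println(sortedPartitions([]int32{2, 0, 1})) // [0 1 2]
}
```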

type JoinGroupRequest

type JoinGroupRequest struct {
	GroupID        string
	SessionTimeout int32
	MemberID       string
	ProtocolType   string
	GroupProtocols []*GroupProtocol
}

func (*JoinGroupRequest) Key

func (jgr *JoinGroupRequest) Key() int16

Key returns the Kafka API key for JoinGroupRequest.

func (*JoinGroupRequest) Version

func (jgr *JoinGroupRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*JoinGroupRequest) Write

func (jgr *JoinGroupRequest) Write(encoder Encoder)

type JoinGroupResponse

type JoinGroupResponse struct {
	Error         error
	GenerationID  int32
	GroupProtocol string
	LeaderID      string
	MemberID      string
	Members       map[string][]byte
}

func (*JoinGroupResponse) Read

func (jgr *JoinGroupResponse) Read(decoder Decoder) *DecodingError

type KafkaLogger

type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})

	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})

	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})

	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})

	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})

	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})
}

KafkaLogger is a logger interface. It lets you plug in a custom logging library instead of using the built-in one.

Logger used by this client. Defaults to the built-in logger with Info log level.
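
A custom logger only needs the six methods listed in the interface. The following is a minimal sketch with a hypothetical memoryLogger type that keeps formatted lines in memory. The interface is repeated locally so the example compiles on its own, and printf-style handling of params is an assumption based on the method comments.

```go
package main

import "fmt"

// KafkaLogger repeats the interface listed above so the sketch compiles on its own.
type KafkaLogger interface {
	Trace(message string, params ...interface{})
	Debug(message string, params ...interface{})
	Info(message string, params ...interface{})
	Warn(message string, params ...interface{})
	Error(message string, params ...interface{})
	Critical(message string, params ...interface{})
}

// memoryLogger is a hypothetical implementation that keeps formatted lines in memory.
type memoryLogger struct{ lines []string }

// record formats printf-style, which is an assumption about how params are used.
func (m *memoryLogger) record(level, message string, params ...interface{}) {
	m.lines = append(m.lines, "["+level+"] "+fmt.Sprintf(message, params...))
}

func (m *memoryLogger) Trace(msg string, p ...interface{})    { m.record("trace", msg, p...) }
func (m *memoryLogger) Debug(msg string, p ...interface{})    { m.record("debug", msg, p...) }
func (m *memoryLogger) Info(msg string, p ...interface{})     { m.record("info", msg, p...) }
func (m *memoryLogger) Warn(msg string, p ...interface{})     { m.record("warn", msg, p...) }
func (m *memoryLogger) Error(msg string, p ...interface{})    { m.record("error", msg, p...) }
func (m *memoryLogger) Critical(msg string, p ...interface{}) { m.record("critical", msg, p...) }

// Compile-time check that memoryLogger satisfies the interface.
var _ KafkaLogger = (*memoryLogger)(nil)

func main() {
	logger := &memoryLogger{}
	logger.Info("connected to %s", "localhost:9092")
	logger.Warn("retrying, attempt %d", 2)
	for _, line := range logger.lines {
		fmt.Println(line)
	}
}
```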

type LeaveGroupRequest

type LeaveGroupRequest struct {
	GroupID  string
	MemberID string
}

func (*LeaveGroupRequest) Key

func (*LeaveGroupRequest) Key() int16

Key returns the Kafka API key for LeaveGroupRequest.

func (*LeaveGroupRequest) Version

func (*LeaveGroupRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*LeaveGroupRequest) Write

func (lgr *LeaveGroupRequest) Write(encoder Encoder)

type LeaveGroupResponse

type LeaveGroupResponse struct {
	Error error
}

func (*LeaveGroupResponse) Read

func (lgr *LeaveGroupResponse) Read(decoder Decoder) *DecodingError

type LengthSlice

type LengthSlice struct {
	// contains filtered or unexported fields
}

LengthSlice is used to determine the length of the upcoming message.

func (*LengthSlice) GetPosition

func (ls *LengthSlice) GetPosition() int

GetPosition gets the position within the encoder to be updated later.

func (*LengthSlice) GetReserveLength

func (ls *LengthSlice) GetReserveLength() int

GetReserveLength returns the length to reserve for this slice.

func (*LengthSlice) SetPosition

func (ls *LengthSlice) SetPosition(pos int)

SetPosition sets the current position within the encoder to be updated later.

func (*LengthSlice) Update

func (ls *LengthSlice) Update(slice []byte)

Update this slice. At this point all necessary data should be written to encoder.

type ListGroupsRequest

type ListGroupsRequest struct{}

func (*ListGroupsRequest) Key

func (*ListGroupsRequest) Key() int16

Key returns the Kafka API key for ListGroupsRequest.

func (*ListGroupsRequest) Version

func (*ListGroupsRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*ListGroupsRequest) Write

func (*ListGroupsRequest) Write(encoder Encoder)

type ListGroupsResponse

type ListGroupsResponse struct {
	Error  error
	Groups map[string]string
}

func (*ListGroupsResponse) Read

func (lgr *ListGroupsResponse) Read(decoder Decoder) *DecodingError

type LogLevel

type LogLevel string

LogLevel represents a logging level.

const (
	// TraceLevel is used for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"

	// DebugLevel is used for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"

	// InfoLevel is used for general information about a running application.
	InfoLevel LogLevel = "info"

	// WarnLevel is used to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"

	// ErrorLevel is used to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"

	// CriticalLevel is used to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)
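
Because LogLevel is a string, comparing two levels needs an explicit ranking. The sketch below shows one way to do that. The severity table and the enabled helper are hypothetical, and the ordering is an assumption read off the constant descriptions above; how the built-in logger actually filters is not shown here.

```go
package main

import "fmt"

// LogLevel and its constants are repeated locally so the sketch is self-contained.
type LogLevel string

const (
	TraceLevel    LogLevel = "trace"
	DebugLevel    LogLevel = "debug"
	InfoLevel     LogLevel = "info"
	WarnLevel     LogLevel = "warn"
	ErrorLevel    LogLevel = "error"
	CriticalLevel LogLevel = "critical"
)

// severity ranks levels from least to most severe (assumed ordering).
var severity = map[LogLevel]int{
	TraceLevel:    0,
	DebugLevel:    1,
	InfoLevel:     2,
	WarnLevel:     3,
	ErrorLevel:    4,
	CriticalLevel: 5,
}

// enabled reports whether a message at level should be emitted when the
// logger is configured with threshold. Unknown levels are never enabled.
func enabled(threshold, level LogLevel) bool {
	t, okT := severity[threshold]
	l, okL := severity[level]
	return okT && okL && l >= t
}

func main() {
	fmt.Println(enabled(InfoLevel, DebugLevel), enabled(InfoLevel, ErrorLevel))
}
```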

type Message

type Message struct {
	Crc        int32
	MagicByte  int8
	Attributes int8
	Key        []byte
	Value      []byte

	Nested []*MessageAndOffset
}

Message contains a single message and its metadata or a nested message set if compression is used.

func (*Message) Read

func (md *Message) Read(decoder Decoder) *DecodingError

func (*Message) Write

func (md *Message) Write(encoder Encoder)

TODO compress and write if needed

type MessageAndMetadata

type MessageAndMetadata struct {
	Topic     string
	Partition int32
	Offset    int64
	Key       []byte
	Value     []byte
}

MessageAndMetadata is a single message and its metadata.

type MessageAndOffset

type MessageAndOffset struct {
	Offset  int64
	Message *Message
}

MessageAndOffset is a single message or a message set (if it is compressed) with its offset value.
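
Since a compressed message carries its payload as a nested message set, a consumer usually wants the leaf messages only. The following sketch flattens such a set recursively. It uses reduced local copies of the two structs, and it assumes a wrapper with a non-empty Nested field has no payload of its own to deliver.

```go
package main

import "fmt"

// Reduced local copies of the structs above; only the fields used here are kept.
type Message struct {
	Key    []byte
	Value  []byte
	Nested []*MessageAndOffset
}

type MessageAndOffset struct {
	Offset  int64
	Message *Message
}

// flatten walks a message set depth-first and returns only the leaf entries.
// Wrappers with a non-empty Nested field are assumed to carry no payload.
func flatten(set []*MessageAndOffset) []*MessageAndOffset {
	var out []*MessageAndOffset
	for _, mo := range set {
		if mo == nil || mo.Message == nil {
			continue
		}
		if len(mo.Message.Nested) > 0 {
			out = append(out, flatten(mo.Message.Nested)...)
			continue
		}
		out = append(out, mo)
	}
	return out
}

func main() {
	set := []*MessageAndOffset{
		{Offset: 0, Message: &Message{Value: []byte("plain")}},
		{Offset: 2, Message: &Message{Nested: []*MessageAndOffset{
			{Offset: 1, Message: &Message{Value: []byte("inner-1")}},
			{Offset: 2, Message: &Message{Value: []byte("inner-2")}},
		}}},
	}
	for _, mo := range flatten(set) {
		fmt.Println(mo.Offset, string(mo.Message.Value))
	}
}
```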

func (*MessageAndOffset) Read

func (mo *MessageAndOffset) Read(decoder Decoder) *DecodingError

func (*MessageAndOffset) Write

func (mo *MessageAndOffset) Write(encoder Encoder)

type Metadata

type Metadata struct {
	Brokers *Brokers
	// contains filtered or unexported fields
}

func NewMetadata

func NewMetadata(connector Connector, brokers *Brokers, metadataTTL time.Duration) *Metadata

func (*Metadata) Invalidate

func (m *Metadata) Invalidate(topic string)

func (*Metadata) Leader

func (m *Metadata) Leader(topic string, partition int32) (int32, error)

func (*Metadata) OffsetCoordinator

func (m *Metadata) OffsetCoordinator(group string) (*BrokerConnection, error)

func (*Metadata) PartitionsFor

func (m *Metadata) PartitionsFor(topic string) ([]int32, error)

func (*Metadata) Refresh

func (m *Metadata) Refresh(topics []string) error

func (*Metadata) TopicMetadata

func (m *Metadata) TopicMetadata(topic string) (map[int32]int32, error)

type MetadataResponse

type MetadataResponse struct {
	Brokers        []*Broker
	TopicsMetadata []*TopicMetadata
}

MetadataResponse contains information about the brokers in the cluster and the topics that exist.

func (*MetadataResponse) Read

func (tmr *MetadataResponse) Read(decoder Decoder) *DecodingError

type OffsetAndMetadata

type OffsetAndMetadata struct {
	Offset    int64
	Timestamp int64
	Metadata  string
}

OffsetAndMetadata contains offset for a partition and optional metadata.

type OffsetCommitRequest

type OffsetCommitRequest struct {
	GroupID     string
	RequestInfo map[string]map[int32]*OffsetAndMetadata
}

OffsetCommitRequest is used to commit offsets for a group/topic/partition.

func NewOffsetCommitRequest

func NewOffsetCommitRequest(group string) *OffsetCommitRequest

NewOffsetCommitRequest creates a new OffsetCommitRequest for a given consumer group.

func (*OffsetCommitRequest) AddOffset

func (ocr *OffsetCommitRequest) AddOffset(topic string, partition int32, offset int64, timestamp int64, metadata string)

AddOffset is a convenience method to add an offset for a topic partition.

func (*OffsetCommitRequest) Key

func (ocr *OffsetCommitRequest) Key() int16

Key returns the Kafka API key for OffsetCommitRequest.

func (*OffsetCommitRequest) Version

func (ocr *OffsetCommitRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*OffsetCommitRequest) Write

func (ocr *OffsetCommitRequest) Write(encoder Encoder)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	CommitStatus map[string]map[int32]error
}

OffsetCommitResponse contains errors for partitions if they occur.

func (*OffsetCommitResponse) Read

func (ocr *OffsetCommitResponse) Read(decoder Decoder) *DecodingError

type OffsetFetchRequest

type OffsetFetchRequest struct {
	GroupID     string
	RequestInfo map[string][]int32
}

OffsetFetchRequest is used to fetch offsets for a consumer group and given topic partitions.

func NewOffsetFetchRequest

func NewOffsetFetchRequest(group string) *OffsetFetchRequest

NewOffsetFetchRequest creates a new OffsetFetchRequest for a given consumer group.

func (*OffsetFetchRequest) AddOffset

func (ofr *OffsetFetchRequest) AddOffset(topic string, partition int32)

AddOffset is a convenience method to add a topic partition to this OffsetFetchRequest.

func (*OffsetFetchRequest) Key

func (ofr *OffsetFetchRequest) Key() int16

Key returns the Kafka API key for OffsetFetchRequest.

func (*OffsetFetchRequest) Version

func (ofr *OffsetFetchRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*OffsetFetchRequest) Write

func (ofr *OffsetFetchRequest) Write(encoder Encoder)

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Offsets map[string]map[int32]*OffsetMetadataAndError
}

OffsetFetchResponse contains fetched offsets for each requested topic partition.

func (*OffsetFetchResponse) Read

func (ofr *OffsetFetchResponse) Read(decoder Decoder) *DecodingError

type OffsetMetadataAndError

type OffsetMetadataAndError struct {
	Offset   int64
	Metadata string
	Error    error
}

OffsetMetadataAndError contains a fetched offset for a topic partition, optional metadata and an error if it occurred.

func (*OffsetMetadataAndError) Read

func (ofr *OffsetMetadataAndError) Read(decoder Decoder) *DecodingError

type OffsetRequest

type OffsetRequest struct {
	RequestInfo map[string][]*PartitionOffsetRequestInfo
}

OffsetRequest describes the valid offset range available for a set of topic-partitions.

func (*OffsetRequest) AddPartitionOffsetRequestInfo

func (or *OffsetRequest) AddPartitionOffsetRequestInfo(topic string, partition int32, time int64, maxNumOffsets int32)

AddPartitionOffsetRequestInfo is a convenience method to add a PartitionOffsetRequestInfo to this request.
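
The time argument is where the EarliestTime and LatestTime constants come in. The sketch below builds one request that asks several partitions for a single offset at a given time marker. The types are local stand-ins, the method body is a guess at what the convenience method does, and offsetsAt is a hypothetical helper.

```go
package main

import "fmt"

// Values copied from the package constants.
const (
	EarliestTime int64 = -2
	LatestTime   int64 = -1
)

// Local stand-ins for the structs documented in this package.
type PartitionOffsetRequestInfo struct {
	Partition     int32
	Time          int64
	MaxNumOffsets int32
}

type OffsetRequest struct {
	RequestInfo map[string][]*PartitionOffsetRequestInfo
}

// AddPartitionOffsetRequestInfo is a guess at the convenience method's behavior:
// it lazily creates the map and appends one entry for the topic.
func (or *OffsetRequest) AddPartitionOffsetRequestInfo(topic string, partition int32, time int64, maxNumOffsets int32) {
	if or.RequestInfo == nil {
		or.RequestInfo = make(map[string][]*PartitionOffsetRequestInfo)
	}
	or.RequestInfo[topic] = append(or.RequestInfo[topic],
		&PartitionOffsetRequestInfo{Partition: partition, Time: time, MaxNumOffsets: maxNumOffsets})
}

// offsetsAt builds one request asking each listed partition for a single
// offset at the given time marker.
func offsetsAt(topic string, partitions []int32, time int64) *OffsetRequest {
	req := &OffsetRequest{}
	for _, p := range partitions {
		req.AddPartitionOffsetRequestInfo(topic, p, time, 1)
	}
	return req
}

func main() {
	req := offsetsAt("events", []int32{0, 1}, EarliestTime)
	for _, info := range req.RequestInfo["events"] {
		fmt.Println(info.Partition, info.Time, info.MaxNumOffsets)
	}
}
```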

func (*OffsetRequest) Key

func (or *OffsetRequest) Key() int16

Key returns the Kafka API key for OffsetRequest.

func (*OffsetRequest) Version

func (or *OffsetRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*OffsetRequest) Write

func (or *OffsetRequest) Write(encoder Encoder)

type OffsetResponse

type OffsetResponse struct {
	PartitionErrorAndOffsets map[string]map[int32]*PartitionOffsetsResponse
}

OffsetResponse contains the starting offset of each segment for the requested partition as well as the "log end offset" i.e. the offset of the next message that would be appended to the given partition.

func (*OffsetResponse) Read

func (or *OffsetResponse) Read(decoder Decoder) *DecodingError

type PartitionFetchInfo

type PartitionFetchInfo struct {
	Partition int32
	Offset    int64
	FetchSize int32
}

PartitionFetchInfo contains information about what partition to fetch, what offset to fetch from and the maximum bytes to include in the message set for this partition.

type PartitionMetadata

type PartitionMetadata struct {
	Error       error
	PartitionID int32
	Leader      int32
	Replicas    []int32
	ISR         []int32
}

PartitionMetadata contains information about a topic partition - its id, leader, replicas, ISRs and error if it occurred.

func (*PartitionMetadata) Read

func (pm *PartitionMetadata) Read(decoder Decoder) *DecodingError

type PartitionOffsetRequestInfo

type PartitionOffsetRequestInfo struct {
	Partition     int32
	Time          int64
	MaxNumOffsets int32
}

PartitionOffsetRequestInfo contains partition specific configurations to fetch offsets.

type PartitionOffsetsResponse

type PartitionOffsetsResponse struct {
	Error   error
	Offsets []int64
}

PartitionOffsetsResponse contains offsets for a single partition and an error if it occurred.

func (*PartitionOffsetsResponse) Read

func (po *PartitionOffsetsResponse) Read(decoder Decoder) *DecodingError

type ProduceRequest

type ProduceRequest struct {
	RequiredAcks int16
	AckTimeoutMs int32
	Data         map[string]map[int32][]*MessageAndOffset
}

ProduceRequest is used to send message sets to the server.

func (*ProduceRequest) AddMessage

func (pr *ProduceRequest) AddMessage(topic string, partition int32, message *Message)

AddMessage is a convenience method to add a single message to be produced to a topic partition.

func (*ProduceRequest) Key

func (pr *ProduceRequest) Key() int16

Key returns the Kafka API key for ProduceRequest.

func (*ProduceRequest) Version

func (pr *ProduceRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*ProduceRequest) Write

func (pr *ProduceRequest) Write(encoder Encoder)

type ProduceResponse

type ProduceResponse struct {
	Status map[string]map[int32]*ProduceResponseStatus
}

ProduceResponse contains the highest assigned offset for each topic partition and an error if one occurred.

func (*ProduceResponse) Read

func (pr *ProduceResponse) Read(decoder Decoder) *DecodingError

type ProduceResponseStatus

type ProduceResponseStatus struct {
	Error  error
	Offset int64
}

ProduceResponseStatus contains the highest assigned offset from a ProduceRequest and an error if it occurred.
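
After a produce call, the nested Status map has to be walked to find partitions that failed. The sketch below does that with local stand-in types. How the library represents success (a nil Error or a sentinel error value) is not visible in this section, so the hypothetical failedPartitions helper takes the success sentinel as a parameter and also treats nil as success.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Local stand-ins for the structs documented above.
type ProduceResponseStatus struct {
	Error  error
	Offset int64
}

type ProduceResponse struct {
	Status map[string]map[int32]*ProduceResponseStatus
}

// failedPartitions returns "topic/partition" labels for entries whose Error
// is neither nil nor the given success sentinel, sorted for stable output.
func failedPartitions(resp *ProduceResponse, success error) []string {
	var failed []string
	for topic, partitions := range resp.Status {
		for partition, status := range partitions {
			if status == nil || status.Error == nil || status.Error == success {
				continue
			}
			failed = append(failed, fmt.Sprintf("%s/%d", topic, partition))
		}
	}
	sort.Strings(failed)
	return failed
}

// sampleResponse builds made-up data for the demonstration.
func sampleResponse(success error) *ProduceResponse {
	return &ProduceResponse{Status: map[string]map[int32]*ProduceResponseStatus{
		"logs": {
			0: {Error: success, Offset: 41},
			1: {Error: errors.New("sample failure"), Offset: -1},
		},
	}}
}

func main() {
	success := errors.New("no error") // hypothetical sentinel value
	fmt.Println(failedPartitions(sampleResponse(success), success))
}
```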

type Request

type Request interface {
	// Writes the Request to the given Encoder.
	Write(Encoder)

	// Returns the Kafka API key for this Request.
	Key() int16

	// Returns the Kafka request version for backwards compatibility.
	Version() int16
}

Request is a generic interface for any request issued to Kafka. Must be able to identify and write itself.

type RequestHeader

type RequestHeader struct {
	// contains filtered or unexported fields
}

RequestHeader is used to decouple the message header/metadata writing from the actual message. It is able to accept a request and encode/write it according to Kafka Wire Protocol format adding the correlation id and client id to the request.
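
For orientation, the Kafka wire protocol frames every request as a big-endian int32 size, an int16 API key, an int16 API version, an int32 correlation id and a client id string (int16 length followed by the bytes), then the request body. The sketch below writes that layout by hand with encoding/binary. The encodeRequest function is hypothetical and does not use this package's Encoder.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRequest frames body with a Kafka request header:
// size (int32), API key (int16), API version (int16),
// correlation id (int32), client id (int16 length + bytes).
func encodeRequest(apiKey, apiVersion int16, correlationID int32, clientID string, body []byte) []byte {
	headerLen := 2 + 2 + 4 + 2 + len(clientID)
	buf := make([]byte, 4+headerLen+len(body))
	// The size field counts the bytes that follow it.
	binary.BigEndian.PutUint32(buf[0:], uint32(headerLen+len(body)))
	binary.BigEndian.PutUint16(buf[4:], uint16(apiKey))
	binary.BigEndian.PutUint16(buf[6:], uint16(apiVersion))
	binary.BigEndian.PutUint32(buf[8:], uint32(correlationID))
	binary.BigEndian.PutUint16(buf[12:], uint16(len(clientID)))
	copy(buf[14:], clientID)
	copy(buf[14+len(clientID):], body)
	return buf
}

func main() {
	// API key 3 is the Metadata request; the body is an empty topic array.
	frame := encodeRequest(3, 0, 1, "siesta", []byte{0, 0, 0, 0})
	fmt.Printf("% x\n", frame)
}
```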

func NewRequestHeader

func NewRequestHeader(correlationID int32, clientID string, request Request) *RequestHeader

NewRequestHeader creates a new RequestHeader holding the correlation id, client id and the actual request.

func (*RequestHeader) Size

func (rw *RequestHeader) Size() int32

Size returns the size in bytes needed to write this request, including the length field. This value will be used when allocating memory for a byte array.

func (*RequestHeader) Write

func (rw *RequestHeader) Write(encoder Encoder)

Write writes this RequestHeader into a given Encoder.

type Response

type Response interface {
	// Read the Response from the given Decoder. May return a DecodingError if the response is invalid.
	Read(Decoder) *DecodingError
}

Response is a generic interface for any response received from Kafka. Must be able to read itself.

type SizingEncoder

type SizingEncoder struct {
	// contains filtered or unexported fields
}

SizingEncoder is used to determine the size for []byte that will hold the actual encoded data. This is used as an optimization as it is cheaper to run once and determine the size instead of growing the slice dynamically.
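
The measure-then-write idea can be sketched as two passes over the same write function: the first pass only counts bytes, the second fills a buffer allocated at exactly that size. The encoder interface, sizer, writer and encodeExact below are hypothetical, reduced stand-ins written for this example, not the package's own types.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encoder is a reduced, hypothetical stand-in for the package's Encoder.
type encoder interface {
	WriteInt32(int32)
	WriteString(string)
}

// sizer only counts the bytes a write would take.
type sizer struct{ size int32 }

func (s *sizer) WriteInt32(int32)     { s.size += 4 }
func (s *sizer) WriteString(v string) { s.size += 2 + int32(len(v)) }

// writer fills a preallocated buffer using Kafka's big-endian layout.
type writer struct {
	buf []byte
	pos int
}

func (w *writer) WriteInt32(v int32) {
	binary.BigEndian.PutUint32(w.buf[w.pos:], uint32(v))
	w.pos += 4
}

func (w *writer) WriteString(v string) {
	binary.BigEndian.PutUint16(w.buf[w.pos:], uint16(len(v)))
	w.pos += 2
	w.pos += copy(w.buf[w.pos:], v)
}

// encodeExact runs write twice: once to measure, once to fill.
func encodeExact(write func(encoder)) []byte {
	s := &sizer{}
	write(s)
	w := &writer{buf: make([]byte, s.size)}
	write(w)
	return w.buf
}

func main() {
	out := encodeExact(func(e encoder) {
		e.WriteString("group-1")
		e.WriteInt32(7)
	})
	fmt.Println(len(out), cap(out))
}
```

The buffer is allocated once and never grows, which is the saving the description refers to.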

func NewSizingEncoder

func NewSizingEncoder() *SizingEncoder

NewSizingEncoder creates a new SizingEncoder.

func (*SizingEncoder) Reserve

func (se *SizingEncoder) Reserve(slice UpdatableSlice)

Reserve reserves a place for an updatable slice.

func (*SizingEncoder) Size

func (se *SizingEncoder) Size() int32

Size returns the size in bytes written to this encoder.

func (*SizingEncoder) UpdateReserved

func (se *SizingEncoder) UpdateReserved()

UpdateReserved tells the last reserved slice to be updated with new data.

func (*SizingEncoder) WriteBytes

func (se *SizingEncoder) WriteBytes(value []byte)

WriteBytes writes a []byte to this encoder.

func (*SizingEncoder) WriteInt16

func (se *SizingEncoder) WriteInt16(int16)

WriteInt16 writes an int16 to this encoder.

func (*SizingEncoder) WriteInt32

func (se *SizingEncoder) WriteInt32(int32)

WriteInt32 writes an int32 to this encoder.

func (*SizingEncoder) WriteInt64

func (se *SizingEncoder) WriteInt64(int64)

WriteInt64 writes an int64 to this encoder.

func (*SizingEncoder) WriteInt8

func (se *SizingEncoder) WriteInt8(int8)

WriteInt8 writes an int8 to this encoder.

func (*SizingEncoder) WriteString

func (se *SizingEncoder) WriteString(value string)

WriteString writes a string to this encoder.

type SyncGroupRequest

type SyncGroupRequest struct {
	GroupID         string
	GenerationID    int32
	MemberID        string
	GroupAssignment map[string][]byte
}

func (*SyncGroupRequest) Key

func (*SyncGroupRequest) Key() int16

Key returns the Kafka API key for SyncGroupRequest.

func (*SyncGroupRequest) Version

func (*SyncGroupRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*SyncGroupRequest) Write

func (sgr *SyncGroupRequest) Write(encoder Encoder)

type SyncGroupResponse

type SyncGroupResponse struct {
	Error            error
	MemberAssignment []byte
}

func (*SyncGroupResponse) Read

func (sgr *SyncGroupResponse) Read(decoder Decoder) *DecodingError

type TopicMetadata

type TopicMetadata struct {
	Error              error
	Topic              string
	PartitionsMetadata []*PartitionMetadata
}

TopicMetadata contains information about a topic - its name, number of partitions, leaders, ISRs and errors if they occur.
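
A common use of this struct is to derive a partition-to-leader map, which is presumably what the map[int32]int32 returned by Metadata.TopicMetadata holds. The sketch below uses local stand-in types and a hypothetical leaders helper. It skips partitions whose leader id is negative, since the Kafka protocol reports -1 when no leader is available.

```go
package main

import "fmt"

// Local stand-ins for the structs documented in this package.
type PartitionMetadata struct {
	Error       error
	PartitionID int32
	Leader      int32
	Replicas    []int32
	ISR         []int32
}

type TopicMetadata struct {
	Error              error
	Topic              string
	PartitionsMetadata []*PartitionMetadata
}

// leaders maps partition id to leader broker id, skipping partitions
// that currently report no leader (a negative broker id).
func leaders(tm *TopicMetadata) map[int32]int32 {
	out := make(map[int32]int32)
	for _, pm := range tm.PartitionsMetadata {
		if pm == nil || pm.Leader < 0 {
			continue
		}
		out[pm.PartitionID] = pm.Leader
	}
	return out
}

func main() {
	tm := &TopicMetadata{Topic: "events", PartitionsMetadata: []*PartitionMetadata{
		{PartitionID: 0, Leader: 5},
		{PartitionID: 1, Leader: -1},
	}}
	fmt.Println(leaders(tm))
}
```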

func (*TopicMetadata) Read

func (tm *TopicMetadata) Read(decoder Decoder) *DecodingError

type TopicMetadataRequest

type TopicMetadataRequest struct {
	Topics []string
}

TopicMetadataRequest is used to get topics, their partitions, leader brokers for them and where these brokers are located.

func NewMetadataRequest

func NewMetadataRequest(topics []string) *TopicMetadataRequest

NewMetadataRequest creates a new TopicMetadataRequest to fetch metadata for the given topics. Passing it an empty slice will request metadata for all topics.

func (*TopicMetadataRequest) Key

func (mr *TopicMetadataRequest) Key() int16

Key returns the Kafka API key for TopicMetadataRequest.

func (*TopicMetadataRequest) Version

func (mr *TopicMetadataRequest) Version() int16

Version returns the Kafka request version for backwards compatibility.

func (*TopicMetadataRequest) Write

func (mr *TopicMetadataRequest) Write(encoder Encoder)

type UpdatableSlice

type UpdatableSlice interface {
	// Returns the length to reserve for this slice.
	GetReserveLength() int

	// Set the current position within the encoder to be updated later.
	SetPosition(int)

	// Get the position within the encoder to be updated later.
	GetPosition() int

	// Update this slice. At this point all necessary data should be written to encoder.
	Update([]byte)
}

UpdatableSlice is an interface that is used when the encoder has to write the value based on bytes that are not yet written (e.g. calculate the CRC of the message).
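
The reserve-then-update pattern can be illustrated with the CRC case mentioned above: reserve four bytes, write the payload, then go back and fill in the checksum. In the sketch below, crcSlot, encodeWithCRC and verifyCRC are hypothetical. What the real encoder passes to Update is not shown in this listing; here it is assumed to be the buffer starting at the reserved position.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// UpdatableSlice repeats the interface listed above.
type UpdatableSlice interface {
	GetReserveLength() int
	SetPosition(int)
	GetPosition() int
	Update([]byte)
}

// crcSlot is a hypothetical UpdatableSlice that back-fills a CRC32 checksum.
type crcSlot struct{ pos int }

func (c *crcSlot) GetReserveLength() int { return 4 }
func (c *crcSlot) SetPosition(pos int)   { c.pos = pos }
func (c *crcSlot) GetPosition() int      { return c.pos }

// Update assumes buf starts at the reserved slot: 4 placeholder bytes
// followed by the payload the checksum should cover.
func (c *crcSlot) Update(buf []byte) {
	binary.BigEndian.PutUint32(buf[:4], crc32.ChecksumIEEE(buf[4:]))
}

var _ UpdatableSlice = (*crcSlot)(nil)

// encodeWithCRC is a toy stand-in for an encoder's reserve and update steps.
func encodeWithCRC(payload []byte) []byte {
	slot := &crcSlot{}
	var buf []byte
	slot.SetPosition(len(buf))
	buf = append(buf, make([]byte, slot.GetReserveLength())...) // reserve
	buf = append(buf, payload...)                                // write the body
	slot.Update(buf[slot.GetPosition():])                        // back-fill
	return buf
}

// verifyCRC recomputes the checksum on the reading side.
func verifyCRC(frame []byte) bool {
	if len(frame) < 4 {
		return false
	}
	return binary.BigEndian.Uint32(frame[:4]) == crc32.ChecksumIEEE(frame[4:])
}

func main() {
	frame := encodeWithCRC([]byte("hello"))
	fmt.Printf("% x valid=%v\n", frame, verifyCRC(frame))
}
```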
