kafka

package
v1.6.6
Published: Feb 3, 2020 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package kafka provides a library to parse Kafka requests and responses and to apply policy rules to them.

Constants

This section is empty.

Variables

var (
	ErrUnknown                                 = -1
	ErrNone                                    = 0
	ErrOffsetOutOfRange                        = 1
	ErrInvalidMessage                          = 2
	ErrUnknownTopicOrPartition                 = 3
	ErrInvalidMessageSize                      = 4
	ErrLeaderNotAvailable                      = 5
	ErrNotLeaderForPartition                   = 6
	ErrRequestTimeout                          = 7
	ErrBrokerNotAvailable                      = 8
	ErrReplicaNotAvailable                     = 9
	ErrMessageSizeTooLarge                     = 10
	ErrScaleControllerEpoch                    = 11
	ErrOffsetMetadataTooLarge                  = 12
	ErrNetwork                                 = 13
	ErrOffsetLoadInProgress                    = 14
	ErrNoCoordinator                           = 15
	ErrNotCoordinator                          = 16
	ErrInvalidTopic                            = 17
	ErrRecordListTooLarge                      = 18
	ErrNotEnoughReplicas                       = 19
	ErrNotEnoughReplicasAfterAppend            = 20
	ErrInvalidRequiredAcks                     = 21
	ErrIllegalGeneration                       = 22
	ErrInconsistentPartitionAssignmentStrategy = 23
	ErrUnknownParititonAssignmentStrategy      = 24
	ErrUnknownConsumerID                       = 25
	ErrInvalidSessionTimeout                   = 26
	ErrRebalanceInProgress                     = 27
	ErrInvalidCommitOffsetSize                 = 28
	ErrTopicAuthorizationFailed                = 29
	ErrGroupAuthorizationFailed                = 30
	ErrClusterAuthorizationFailed              = 31
	ErrInvalidTimeStamp                        = 32
)

List of possible Kafka error codes.
Reference: https://kafka.apache.org/protocol#protocol_error_codes
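As a hedged illustration of how these numeric codes might be consumed, the sketch below redeclares a few of the values listed above and maps them to the names used in the Kafka protocol reference. The `names` table and the `errorName` helper are hypothetical and are not part of this package.

```go
package main

import "fmt"

// A few codes copied from the list above, redeclared locally so that the
// sketch compiles without importing the package.
const (
	ErrNone                     = 0
	ErrOffsetOutOfRange         = 1
	ErrUnknownTopicOrPartition  = 3
	ErrTopicAuthorizationFailed = 29
)

// names is a hypothetical lookup table; the package itself does not
// document such a helper.
var names = map[int]string{
	ErrNone:                     "NONE",
	ErrOffsetOutOfRange:         "OFFSET_OUT_OF_RANGE",
	ErrUnknownTopicOrPartition:  "UNKNOWN_TOPIC_OR_PARTITION",
	ErrTopicAuthorizationFailed: "TOPIC_AUTHORIZATION_FAILED",
}

// errorName returns a readable name for code, or a numeric fallback when
// the code is not in the table.
func errorName(code int) string {
	if name, ok := names[code]; ok {
		return name
	}
	return fmt.Sprintf("unknown error code %d", code)
}

func main() {
	fmt.Println(errorName(ErrTopicAuthorizationFailed))
	fmt.Println(errorName(99))
}
```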

var (
	// RequestLifetime specifies the maximum time a request can stay in the
	// correlation cache without getting correlated. After this time has
	// passed, the request will be removed from the cache
	RequestLifetime = 5 * time.Minute
)

Functions

This section is empty.

Types

type CorrelationCache

type CorrelationCache struct {
	// contains filtered or unexported fields
}

CorrelationCache is a cache used to correlate requests with responses.

It consists of two main functions:

cache.HandleRequest(request)

Must be called when a request is forwarded to the broker. It keeps track
of the request and rewrites the correlation ID inside the request to
a sequence number. This sequence number is guaranteed to be unique within
the connection covered by the cache.

cache.CorrelateResponse(response)

Must be called when a response is received from the broker. It returns
the original request that corresponds to the response and restores the
correlation ID in the response to the value that was found in the original
request.

A garbage collector runs frequently and expires requests that have not been correlated within `RequestLifetime`.
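The rewrite-and-restore idea described above can be sketched with a small, self-contained toy. This is not the package's implementation: `request`, `response`, `toyCache` and their methods are hypothetical stand-ins, and expiry of stale entries is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// request and response are hypothetical stand-ins for RequestMessage and
// ResponseMessage; only the correlation ID is modelled.
type request struct{ correlationID uint32 }
type response struct{ correlationID uint32 }

// pendingEntry remembers a forwarded request and its original ID.
type pendingEntry struct {
	req    *request
	origID uint32
}

// toyCache tracks forwarded requests under a per-connection sequence number.
type toyCache struct {
	mu      sync.Mutex
	nextSeq uint32
	pending map[uint32]pendingEntry
}

func newToyCache() *toyCache {
	return &toyCache{pending: make(map[uint32]pendingEntry)}
}

// handleRequest stores the request and overwrites its correlation ID with
// the next sequence number.
func (c *toyCache) handleRequest(req *request) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nextSeq++
	c.pending[c.nextSeq] = pendingEntry{req: req, origID: req.correlationID}
	req.correlationID = c.nextSeq
}

// correlateResponse finds the request by the sequence number carried in the
// response, restores the original ID in the response, and returns the
// request. It returns nil when no pending request matches.
func (c *toyCache) correlateResponse(res *response) *request {
	c.mu.Lock()
	defer c.mu.Unlock()
	entry, ok := c.pending[res.correlationID]
	if !ok {
		return nil
	}
	delete(c.pending, res.correlationID)
	res.correlationID = entry.origID
	return entry.req
}

func main() {
	cache := newToyCache()
	req := &request{correlationID: 4711}
	cache.handleRequest(req)
	fmt.Println("forwarded with sequence number", req.correlationID)

	// The broker echoes the rewritten ID back in its response.
	res := &response{correlationID: req.correlationID}
	orig := cache.correlateResponse(res)
	fmt.Println("matched:", orig == req, "restored ID:", res.correlationID)
}
```

Using a sequence number as the map key keeps lookups unambiguous even if two clients behind the same connection happen to choose the same correlation ID.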

func NewCorrelationCache

func NewCorrelationCache() *CorrelationCache

NewCorrelationCache returns a new correlation cache

func (*CorrelationCache) CorrelateResponse

func (cc *CorrelationCache) CorrelateResponse(res *ResponseMessage) *RequestMessage

CorrelateResponse extracts the correlation ID from the response message, looks up the corresponding request, restores the original correlation ID in the response, and returns the original request.

func (*CorrelationCache) DeleteCache

func (cc *CorrelationCache) DeleteCache()

DeleteCache releases the cache and stops the garbage collector. This function must be called when the cache is no longer required; otherwise goroutines are leaked.
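Why a missing DeleteCache call leaks a goroutine can be illustrated with a generic pattern: a background collector that only exits when it is told to. The sketch below is an assumption about the general shape, not the package's code; `expiringCache`, `collect`, `expire`, `shutdown` and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// expiringCache is a hypothetical cache that records when each pending
// request was added.
type expiringCache struct {
	mu       sync.Mutex
	added    map[uint32]time.Time
	lifetime time.Duration
	stop     chan struct{} // closed to ask the collector to exit
	done     chan struct{} // closed once the collector has exited
}

func newExpiringCache(lifetime, interval time.Duration) *expiringCache {
	c := &expiringCache{
		added:    make(map[uint32]time.Time),
		lifetime: lifetime,
		stop:     make(chan struct{}),
		done:     make(chan struct{}),
	}
	go c.collect(interval)
	return c
}

// collect expires old entries on every tick until stop is closed.
func (c *expiringCache) collect(interval time.Duration) {
	defer close(c.done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-c.stop:
			return
		case now := <-ticker.C:
			c.expire(now)
		}
	}
}

// expire removes entries older than lifetime and returns how many it removed.
func (c *expiringCache) expire(now time.Time) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	removed := 0
	for id, t := range c.added {
		if now.Sub(t) > c.lifetime {
			delete(c.added, id)
			removed++
		}
	}
	return removed
}

// shutdown stops the collector and waits for its goroutine to exit.
func (c *expiringCache) shutdown() {
	close(c.stop)
	<-c.done
}

// demo adds one stale and one fresh entry, expires once, and shuts down.
func demo() (removed, remaining int) {
	c := newExpiringCache(5*time.Minute, time.Hour)
	now := time.Now()
	c.mu.Lock()
	c.added[1] = now.Add(-10 * time.Minute) // older than lifetime
	c.added[2] = now                        // still fresh
	c.mu.Unlock()
	removed = c.expire(now)
	c.mu.Lock()
	remaining = len(c.added)
	c.mu.Unlock()
	c.shutdown()
	return removed, remaining
}

func main() {
	removed, remaining := demo()
	fmt.Println("removed:", removed, "remaining:", remaining)
}
```

Without the call to `shutdown`, the `collect` goroutine would keep waiting on its ticker for the lifetime of the process.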

func (*CorrelationCache) HandleRequest

func (cc *CorrelationCache) HandleRequest(req *RequestMessage, finishFunc FinishFunc)

HandleRequest must be called when a request is forwarded to the broker. It keeps track of the request and rewrites the correlation ID inside the request to a sequence number. This sequence number is guaranteed to be unique within the connection covered by the cache.

type CorrelationID

type CorrelationID uint32

CorrelationID represents the correlation id as defined in the Kafka protocol specification

type FinishFunc

type FinishFunc func(req *RequestMessage)

FinishFunc is the function called when a request has been correlated with its response

type RequestMessage

type RequestMessage struct {
	// contains filtered or unexported fields
}

RequestMessage represents a Kafka request message

func ReadRequest

func ReadRequest(reader io.Reader) (*RequestMessage, error)

ReadRequest will read a Kafka request from an io.Reader and return the message or an error.
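For orientation, the start of a Kafka request on the wire is a 4-byte size followed by the API key, API version and correlation ID, all big-endian. The sketch below decodes only those fixed-size fields from an io.Reader; it is not how ReadRequest is implemented, and `requestHeader`, `readRequestHeader` and `parseRequestHeader` are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// requestHeader holds the fixed-size fields at the start of a Kafka
// request. Fields are exported so that encoding/binary can fill them.
type requestHeader struct {
	Size          int32 // number of bytes that follow the size field
	APIKey        int16
	APIVersion    int16
	CorrelationID int32
}

// readRequestHeader decodes the size prefix and the first header fields.
// The client ID and the request body that follow are not read.
func readRequestHeader(r io.Reader) (requestHeader, error) {
	var h requestHeader
	if err := binary.Read(r, binary.BigEndian, &h); err != nil {
		return requestHeader{}, fmt.Errorf("reading request header: %w", err)
	}
	return h, nil
}

// parseRequestHeader is a convenience wrapper for in-memory data.
func parseRequestHeader(b []byte) (requestHeader, error) {
	return readRequestHeader(bytes.NewReader(b))
}

func main() {
	frame := []byte{
		0, 0, 0, 8, // size
		0, 0, // API key 0
		0, 2, // API version 2
		0, 0, 0, 42, // correlation ID 42
	}
	h, err := parseRequestHeader(frame)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", h)
}
```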

func (*RequestMessage) CreateResponse

func (req *RequestMessage) CreateResponse(err error) (*ResponseMessage, error)

CreateResponse creates a response message based on the provided request message. The response will have the specified error code set in all topics and embedded partitions.

func (*RequestMessage) GetAPIKey

func (req *RequestMessage) GetAPIKey() int16

GetAPIKey returns the kind of Kafka request

func (*RequestMessage) GetCorrelationID

func (req *RequestMessage) GetCorrelationID() CorrelationID

GetCorrelationID returns the correlation ID of the Kafka request

func (*RequestMessage) GetRaw

func (req *RequestMessage) GetRaw() []byte

GetRaw returns the raw Kafka request

func (*RequestMessage) GetTopics

func (req *RequestMessage) GetTopics() []string

GetTopics returns the Kafka request list of topics

func (*RequestMessage) GetVersion

func (req *RequestMessage) GetVersion() int16

GetVersion returns the version of the Kafka request

func (*RequestMessage) MatchesRule

func (req *RequestMessage) MatchesRule(rules []api.PortRuleKafka) bool

MatchesRule validates the Kafka request message against the provided list of rules. The function will return true if the policy allows the message, otherwise false is returned.
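The exact matching semantics are not spelled out here, so the following is only one plausible reading, sketched with simplified stand-in types: a request is allowed when every topic it names is covered by at least one rule. `rule`, `request`, `ruleAllows` and `matchesRules` are hypothetical, and the real api.PortRuleKafka type is not reproduced.

```go
package main

import "fmt"

// rule is a hypothetical, simplified stand-in for api.PortRuleKafka.
// An empty Topic matches any topic; a negative APIKey matches any API key.
type rule struct {
	APIKey int16
	Topic  string
}

// request is a hypothetical stand-in for RequestMessage.
type request struct {
	apiKey int16
	topics []string
}

// ruleAllows reports whether a single rule covers the API key and topic.
func ruleAllows(r rule, apiKey int16, topic string) bool {
	if r.APIKey >= 0 && r.APIKey != apiKey {
		return false
	}
	return r.Topic == "" || r.Topic == topic
}

// matchesRules reports whether every topic in req is covered by at least
// one rule. A request without topics is trivially allowed in this sketch.
func matchesRules(req request, rules []rule) bool {
	for _, topic := range req.topics {
		allowed := false
		for _, r := range rules {
			if ruleAllows(r, req.apiKey, topic) {
				allowed = true
				break
			}
		}
		if !allowed {
			return false
		}
	}
	return true
}

func main() {
	rules := []rule{{APIKey: 0, Topic: "orders"}}
	fmt.Println(matchesRules(request{apiKey: 0, topics: []string{"orders"}}, rules))
	fmt.Println(matchesRules(request{apiKey: 0, topics: []string{"orders", "payments"}}, rules))
}
```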

func (*RequestMessage) SetCorrelationID

func (req *RequestMessage) SetCorrelationID(id CorrelationID)

SetCorrelationID modifies the correlation ID of the Kafka request

func (*RequestMessage) String

func (req *RequestMessage) String() string

String returns a human readable representation of the request message

type ResponseMessage

type ResponseMessage struct {
	// contains filtered or unexported fields
}

ResponseMessage represents a Kafka response message.

func ReadResponse

func ReadResponse(reader io.Reader) (*ResponseMessage, error)

ReadResponse will read a Kafka response from an io.Reader and return the message or an error.
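A Kafka response is framed as a 4-byte big-endian size followed by that many bytes, of which the first four carry the correlation ID. The sketch below reads one such frame from an io.Reader. It illustrates the framing only and is not the package's ReadResponse; `readResponseFrame`, `parseResponseFrame` and the `maxFrameSize` limit are assumptions made for the example.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// maxFrameSize is a hypothetical safety limit so that a corrupt size field
// cannot trigger a huge allocation.
const maxFrameSize = 1 << 20

// readResponseFrame reads one length-prefixed response and returns its
// correlation ID together with the bytes that follow the size field.
func readResponseFrame(r io.Reader) (int32, []byte, error) {
	var size int32
	if err := binary.Read(r, binary.BigEndian, &size); err != nil {
		return 0, nil, fmt.Errorf("reading size: %w", err)
	}
	if size < 4 || size > maxFrameSize {
		return 0, nil, fmt.Errorf("invalid response size %d", size)
	}
	body := make([]byte, size)
	if _, err := io.ReadFull(r, body); err != nil {
		return 0, nil, fmt.Errorf("reading body: %w", err)
	}
	// The correlation ID occupies the first four bytes of the body.
	id := int32(binary.BigEndian.Uint32(body[:4]))
	return id, body, nil
}

// parseResponseFrame is a convenience wrapper for in-memory data.
func parseResponseFrame(b []byte) (int32, []byte, error) {
	return readResponseFrame(bytes.NewReader(b))
}

func main() {
	frame := []byte{
		0, 0, 0, 6, // size
		0, 0, 0, 7, // correlation ID 7
		0xAA, 0xBB, // remaining payload
	}
	id, body, err := parseResponseFrame(frame)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("correlation ID:", id, "body length:", len(body))
}
```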

func (*ResponseMessage) GetCorrelationID

func (res *ResponseMessage) GetCorrelationID() CorrelationID

GetCorrelationID returns the correlation ID of the Kafka response

func (*ResponseMessage) GetRaw

func (res *ResponseMessage) GetRaw() []byte

GetRaw returns the raw Kafka response

func (*ResponseMessage) SetCorrelationID

func (res *ResponseMessage) SetCorrelationID(id CorrelationID)

SetCorrelationID modifies the correlation ID of the Kafka response

func (*ResponseMessage) String

func (res *ResponseMessage) String() string

String returns a human readable representation of the response message
