kafka

package
v0.0.0-...-e36b76f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package kafka provides a library to parse Kafka requests and responses and apply policy rules

Index

Constants

View Source
const (
	ErrUnknown                                 = -1
	ErrNone                                    = 0
	ErrOffsetOutOfRange                        = 1
	ErrInvalidMessage                          = 2
	ErrUnknownTopicOrPartition                 = 3
	ErrInvalidMessageSize                      = 4
	ErrLeaderNotAvailable                      = 5
	ErrNotLeaderForPartition                   = 6
	ErrRequestTimeout                          = 7
	ErrBrokerNotAvailable                      = 8
	ErrReplicaNotAvailable                     = 9
	ErrMessageSizeTooLarge                     = 10
	ErrScaleControllerEpoch                    = 11
	ErrOffsetMetadataTooLarge                  = 12
	ErrNetwork                                 = 13
	ErrOffsetLoadInProgress                    = 14
	ErrNoCoordinator                           = 15
	ErrNotCoordinator                          = 16
	ErrInvalidTopic                            = 17
	ErrRecordListTooLarge                      = 18
	ErrNotEnoughReplicas                       = 19
	ErrNotEnoughReplicasAfterAppend            = 20
	ErrInvalidRequiredAcks                     = 21
	ErrIllegalGeneration                       = 22
	ErrInconsistentPartitionAssignmentStrategy = 23
	ErrUnknownParititonAssignmentStrategy      = 24
	ErrUnknownConsumerID                       = 25
	ErrInvalidSessionTimeout                   = 26
	ErrRebalanceInProgress                     = 27
	ErrInvalidCommitOffsetSize                 = 28
	ErrTopicAuthorizationFailed                = 29
	ErrGroupAuthorizationFailed                = 30
	ErrClusterAuthorizationFailed              = 31
	ErrInvalidTimeStamp                        = 32
)

List of possible Kafka error codes Reference: https://kafka.apache.org/protocol#protocol_error_codes

Variables

This section is empty.

Functions

This section is empty.

Types

type CorrelationID

type CorrelationID uint32

CorrelationID represents the correlation id as defined in the Kafka protocol specification

type RequestMessage

type RequestMessage struct {
	// contains filtered or unexported fields
}

RequestMessage represents a Kafka request message

func ReadRequest

func ReadRequest(reader io.Reader) (*RequestMessage, error)

ReadRequest will read a Kafka request from an io.Reader and return the message or an error.

func (*RequestMessage) CreateAuthErrorResponse

func (req *RequestMessage) CreateAuthErrorResponse() (*ResponseMessage, error)

CreateAuthErrorResponse creates Authorization error response message for 'req'

func (*RequestMessage) CreateResponse

func (req *RequestMessage) CreateResponse(err error) (*ResponseMessage, error)

CreateResponse creates a response message based on the provided request message. The response will have the specified error code set in all topics and embedded partitions.

func (*RequestMessage) GetAPIKey

func (req *RequestMessage) GetAPIKey() int16

GetAPIKey returns the kind of Kafka request

func (*RequestMessage) GetCorrelationID

func (req *RequestMessage) GetCorrelationID() CorrelationID

GetCorrelationID returns the Kafka request correlationID

func (*RequestMessage) GetRaw

func (req *RequestMessage) GetRaw() []byte

GetRaw returns the raw Kafka request

func (*RequestMessage) GetTopics

func (req *RequestMessage) GetTopics() []string

GetTopics returns the Kafka request list of topics

func (*RequestMessage) GetVersion

func (req *RequestMessage) GetVersion() int16

GetVersion returns the version Kafka request

func (*RequestMessage) SetCorrelationID

func (req *RequestMessage) SetCorrelationID(id CorrelationID)

SetCorrelationID modified the correlation ID of the Kafka request

func (*RequestMessage) String

func (req *RequestMessage) String() string

String returns a human readable representation of the request message

type ResponseMessage

type ResponseMessage struct {
	// contains filtered or unexported fields
}

ResponseMessage represents a Kafka response message.

func ReadResponse

func ReadResponse(reader io.Reader) (*ResponseMessage, error)

ReadResponse will read a Kafka response from an io.Reader and return the message or an error.

func (*ResponseMessage) GetCorrelationID

func (res *ResponseMessage) GetCorrelationID() CorrelationID

GetCorrelationID returns the Kafka request correlationID

func (*ResponseMessage) GetRaw

func (res *ResponseMessage) GetRaw() []byte

GetRaw returns the raw Kafka response

func (*ResponseMessage) SetCorrelationID

func (res *ResponseMessage) SetCorrelationID(id CorrelationID)

SetCorrelationID modified the correlation ID of the Kafka request

func (*ResponseMessage) String

func (res *ResponseMessage) String() string

String returns a human readable representation of the response message

type Rule

type Rule struct {
	// ApiVersion is the allowed version, or < 0 if all versions
	// are to be allowed
	APIVersion int16

	// ApiKeys is the set of all numerical apiKeys that are allowed.
	// If empty, all API keys are allowed.
	APIKeys map[int16]struct{}

	// ClientID is the client identifier as provided in the request.
	//
	// From Kafka protocol documentation:
	// This is a user supplied identifier for the client application. The
	// user can use any identifier they like and it will be used when
	// logging errors, monitoring aggregates, etc. For example, one might
	// want to monitor not just the requests per second overall, but the
	// number coming from each client application (each of which could
	// reside on multiple servers). This id acts as a logical grouping
	// across all requests from a particular client.
	//
	// If empty, all client identifiers are allowed.
	ClientID string

	// Topic is the topic name contained in the message. If a Kafka request
	// contains multiple topics, then all topics must be allowed or the
	// message will be rejected.
	//
	// This constraint is ignored if the matched request message type
	// doesn't contain any topic. Maximum size of Topic can be 249
	// characters as per recent Kafka spec and allowed characters are
	// a-z, A-Z, 0-9, -, . and _
	// Older Kafka versions had longer topic lengths of 255, but in Kafka 0.10
	// version the length was changed from 255 to 249. For compatibility
	// reasons we are allowing 255.
	//
	// If empty, all topics are allowed.
	Topic string
}

func NewRule

func NewRule(apiVersion int32, apiKeys []int32, clientID, topic string) Rule

NewRule creates a new rule from already sanitized inputs

func (*Rule) CheckAPIKeyRole

func (r *Rule) CheckAPIKeyRole(kind int16) bool

CheckAPIKeyRole checks the apiKey value in the request, and returns true if it is allowed else false

func (*Rule) CheckAPIVersion

func (r *Rule) CheckAPIVersion(apiVersion int16) bool

CheckAPIVersion returns true if 'apiVersion' is allowed

func (*Rule) CheckClientID

func (r *Rule) CheckClientID(clientID string) bool

CheckClientID returns true if 'clientID' is allowed

func (Rule) Matches

func (r Rule) Matches(data interface{}) bool

Matches returns true if Rule matches the request and and all required topics have matched.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL