Documentation ¶
Overview ¶
Package kafka provides a library to parse Kafka requests and responses and apply policy rules
Index ¶
- Variables
- type CorrelationCache
- type CorrelationID
- type FinishFunc
- type RequestMessage
- func (req *RequestMessage) CreateResponse(err error) (*ResponseMessage, error)
- func (req *RequestMessage) GetAPIKey() int16
- func (req *RequestMessage) GetCorrelationID() CorrelationID
- func (req *RequestMessage) GetRaw() []byte
- func (req *RequestMessage) GetTopics() []string
- func (req *RequestMessage) GetVersion() int16
- func (req *RequestMessage) MatchesRule(rules []api.PortRuleKafka) bool
- func (req *RequestMessage) SetCorrelationID(id CorrelationID)
- func (req *RequestMessage) String() string
- type ResponseMessage
Variables ¶
var (
	ErrUnknown = -1
	ErrNone = 0
	ErrOffsetOutOfRange = 1
	ErrInvalidMessage = 2
	ErrUnknownTopicOrPartition = 3
	ErrInvalidMessageSize = 4
	ErrLeaderNotAvailable = 5
	ErrNotLeaderForPartition = 6
	ErrRequestTimeout = 7
	ErrBrokerNotAvailable = 8
	ErrReplicaNotAvailable = 9
	ErrMessageSizeTooLarge = 10
	ErrScaleControllerEpoch = 11
	ErrOffsetMetadataTooLarge = 12
	ErrNetwork = 13
	ErrOffsetLoadInProgress = 14
	ErrNoCoordinator = 15
	ErrNotCoordinator = 16
	ErrInvalidTopic = 17
	ErrRecordListTooLarge = 18
	ErrNotEnoughReplicas = 19
	ErrNotEnoughReplicasAfterAppend = 20
	ErrInvalidRequiredAcks = 21
	ErrIllegalGeneration = 22
	ErrInconsistentPartitionAssignmentStrategy = 23
	ErrUnknownParititonAssignmentStrategy = 24
	ErrUnknownConsumerID = 25
	ErrInvalidSessionTimeout = 26
	ErrRebalanceInProgress = 27
	ErrInvalidCommitOffsetSize = 28
	ErrTopicAuthorizationFailed = 29
	ErrGroupAuthorizationFailed = 30
	ErrClusterAuthorizationFailed = 31
	ErrInvalidTimeStamp = 32
)
List of possible Kafka error codes. Reference: https://kafka.apache.org/protocol#protocol_error_codes
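Because the codes are plain integers, log output is easier to read when they are mapped to short descriptions. The following is a minimal sketch of such a mapping, not something the package provides: `errorName` is a hypothetical helper, and only a handful of the codes are redeclared locally (as constants) so the example compiles on its own.

```go
package main

import "fmt"

// A few of the codes listed above, redeclared locally so that this sketch
// compiles without importing the package.
const (
	ErrUnknown                  = -1
	ErrNone                     = 0
	ErrUnknownTopicOrPartition  = 3
	ErrTopicAuthorizationFailed = 29
)

// errorName is a hypothetical helper (not part of the package) that maps a
// numeric Kafka error code to a short description for logging.
func errorName(code int) string {
	switch code {
	case ErrUnknown:
		return "unknown server error"
	case ErrNone:
		return "no error"
	case ErrUnknownTopicOrPartition:
		return "unknown topic or partition"
	case ErrTopicAuthorizationFailed:
		return "topic authorization failed"
	default:
		return fmt.Sprintf("unrecognized error code %d", code)
	}
}

func main() {
	fmt.Println(errorName(ErrTopicAuthorizationFailed))
	fmt.Println(errorName(99))
}
```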
var (
	// RequestLifetime specifies the maximum time a request can stay in the
	// correlation cache without getting correlated. After this time has
	// passed, the request will be removed from the cache.
	RequestLifetime = 5 * time.Minute
)
Types ¶
type CorrelationCache ¶
type CorrelationCache struct {
// contains filtered or unexported fields
}
CorrelationCache is a cache used to correlate requests with responses.
It consists of two main functions:
cache.HandleRequest(request)
Must be called when a request is forwarded to the broker. It keeps track of the request and rewrites the correlation ID inside the request to a sequence number. This sequence number is guaranteed to be unique within the connection covered by the cache.
cache.CorrelateResponse(response)
Must be called when a response is received from the broker. It returns the original request that corresponds to the response and restores the correlation ID in the response to the value found in the original request.
A garbage collector runs periodically and expires requests that have not been correlated within `RequestLifetime`.
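The rewrite-and-restore flow described above can be sketched with a map keyed by sequence number. This is a simplified illustration under stated assumptions, not the package's code: `request`, `response`, `cache`, and their methods are hypothetical stand-ins, and expiry and the finish callback are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// request and response are hypothetical stand-ins for RequestMessage and
// ResponseMessage; only the correlation ID matters for this sketch.
type request struct{ correlationID uint32 }
type response struct{ correlationID uint32 }

// entry remembers a forwarded request and the ID the client chose for it.
type entry struct {
	req    *request
	origID uint32
}

// cache is a simplified, hypothetical correlation cache for one connection.
type cache struct {
	mu      sync.Mutex
	nextSeq uint32
	pending map[uint32]entry
}

func newCache() *cache {
	return &cache{nextSeq: 1, pending: make(map[uint32]entry)}
}

// handleRequest records the request and rewrites its correlation ID to the
// next sequence number, which is unique within this cache.
func (c *cache) handleRequest(req *request) {
	c.mu.Lock()
	defer c.mu.Unlock()
	seq := c.nextSeq
	c.nextSeq++
	c.pending[seq] = entry{req: req, origID: req.correlationID}
	req.correlationID = seq
}

// correlateResponse finds the request for the sequence number carried by the
// response, restores the client's original ID in the response, and returns
// the request. It returns nil when no pending request matches.
func (c *cache) correlateResponse(res *response) *request {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.pending[res.correlationID]
	if !ok {
		return nil
	}
	delete(c.pending, res.correlationID)
	res.correlationID = e.origID
	return e.req
}

func main() {
	c := newCache()
	req := &request{correlationID: 4242}
	c.handleRequest(req)
	fmt.Println("forwarded with ID", req.correlationID)

	// The broker echoes the rewritten ID back in its response.
	res := &response{correlationID: req.correlationID}
	orig := c.correlateResponse(res)
	fmt.Println("matched:", orig == req, "restored ID:", res.correlationID)
}
```

Rewriting the ID lets a proxy keep requests apart even if a client reuses correlation IDs, while the client still sees the value it sent.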
func NewCorrelationCache ¶
func NewCorrelationCache() *CorrelationCache
NewCorrelationCache returns a new correlation cache
func (*CorrelationCache) CorrelateResponse ¶
func (cc *CorrelationCache) CorrelateResponse(res *ResponseMessage) *RequestMessage
CorrelateResponse extracts the correlation ID from the response message, looks up the corresponding request, restores the original correlation ID in the response, and returns the original request.
func (*CorrelationCache) DeleteCache ¶
func (cc *CorrelationCache) DeleteCache()
DeleteCache releases the cache and stops the garbage collector. This function must be called when the cache is no longer required; otherwise goroutines are leaked.
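The leak arises because a background goroutine keeps running until it is told to stop. A common way to structure this in Go is sketched below; `collector`, `startCollector`, `Delete`, and `sweepInterval` are hypothetical names, and the package may implement its collector differently.

```go
package main

import (
	"fmt"
	"time"
)

// sweepInterval is a hypothetical value chosen for this sketch.
const sweepInterval = 10 * time.Millisecond

// collector owns a background goroutine that periodically calls sweep.
type collector struct {
	stop chan struct{} // closed to ask the goroutine to exit
	done chan struct{} // closed by the goroutine when it has exited
}

// startCollector launches the background goroutine.
func startCollector(sweep func()) *collector {
	c := &collector{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(c.done)
		ticker := time.NewTicker(sweepInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				sweep()
			case <-c.stop:
				return
			}
		}
	}()
	return c
}

// Delete stops the goroutine and waits for it to exit. Without this call the
// goroutine and its ticker would live for the rest of the process.
func (c *collector) Delete() {
	close(c.stop)
	<-c.done
}

func main() {
	c := startCollector(func() {})
	c.Delete()
	fmt.Println("collector stopped")
}
```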
func (*CorrelationCache) HandleRequest ¶
func (cc *CorrelationCache) HandleRequest(req *RequestMessage, finishFunc FinishFunc)
HandleRequest must be called when a request is forwarded to the broker. It keeps track of the request and rewrites the correlation ID inside the request to a sequence number. This sequence number is guaranteed to be unique within the connection covered by the cache.
type CorrelationID ¶
type CorrelationID uint32
CorrelationID represents the correlation id as defined in the Kafka protocol specification
type FinishFunc ¶
type FinishFunc func(req *RequestMessage)
FinishFunc is the function called when a request has been correlated with its response
type RequestMessage ¶
type RequestMessage struct {
// contains filtered or unexported fields
}
RequestMessage represents a Kafka request message
func ReadRequest ¶
func ReadRequest(reader io.Reader) (*RequestMessage, error)
ReadRequest will read a Kafka request from an io.Reader and return the message or an error.
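For orientation, a Kafka request on the wire starts with a 4-byte big-endian size, followed by a header whose first fields are the API key (int16), API version (int16), and correlation ID (int32). The sketch below decodes just those fields from an io.Reader. It is not the package's parser: `header`, `readRequestHeader`, `decode`, and the `maxSize` limit are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// maxSize is a hypothetical upper bound that protects against allocating a
// huge buffer for a corrupt size prefix.
const maxSize = 1 << 20

// header holds the fixed-size fields at the start of a Kafka request.
type header struct {
	APIKey        int16
	APIVersion    int16
	CorrelationID int32
}

// readRequestHeader reads one size-prefixed request from r and decodes the
// first three header fields. The rest of the body is read but not parsed.
func readRequestHeader(r io.Reader) (header, error) {
	var size int32
	if err := binary.Read(r, binary.BigEndian, &size); err != nil {
		return header{}, err
	}
	if size < 8 || size > maxSize {
		return header{}, fmt.Errorf("invalid request size %d", size)
	}
	body := make([]byte, size)
	if _, err := io.ReadFull(r, body); err != nil {
		return header{}, err
	}
	return header{
		APIKey:        int16(binary.BigEndian.Uint16(body[0:2])),
		APIVersion:    int16(binary.BigEndian.Uint16(body[2:4])),
		CorrelationID: int32(binary.BigEndian.Uint32(body[4:8])),
	}, nil
}

// decode is a convenience wrapper for callers that already hold the bytes.
func decode(frame []byte) (header, error) {
	return readRequestHeader(bytes.NewReader(frame))
}

func main() {
	// size=8, API key 3 (Metadata), version 1, correlation ID 42.
	frame := []byte{0, 0, 0, 8, 0, 3, 0, 1, 0, 0, 0, 42}
	h, err := decode(frame)
	fmt.Println(h.APIKey, h.APIVersion, h.CorrelationID, err)
}
```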
func (*RequestMessage) CreateResponse ¶
func (req *RequestMessage) CreateResponse(err error) (*ResponseMessage, error)
CreateResponse creates a response message based on the provided request message. The response will have the specified error code set in all topics and embedded partitions.
func (*RequestMessage) GetAPIKey ¶
func (req *RequestMessage) GetAPIKey() int16
GetAPIKey returns the kind of Kafka request
func (*RequestMessage) GetCorrelationID ¶
func (req *RequestMessage) GetCorrelationID() CorrelationID
GetCorrelationID returns the correlation ID of the Kafka request
func (*RequestMessage) GetRaw ¶
func (req *RequestMessage) GetRaw() []byte
GetRaw returns the raw Kafka request
func (*RequestMessage) GetTopics ¶
func (req *RequestMessage) GetTopics() []string
GetTopics returns the list of topics in the Kafka request
func (*RequestMessage) GetVersion ¶
func (req *RequestMessage) GetVersion() int16
GetVersion returns the version of the Kafka request
func (*RequestMessage) MatchesRule ¶
func (req *RequestMessage) MatchesRule(rules []api.PortRuleKafka) bool
MatchesRule validates the Kafka request message against the provided list of rules. The function will return true if the policy allows the message, otherwise false is returned.
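The general shape of such an allow-list check is sketched below. It makes several assumptions that the text above does not confirm: the `rule` struct is a simplified, hypothetical stand-in for api.PortRuleKafka; empty fields act as wildcards; every topic in the request must be allowed by at least one rule; and a request that names no topics is allowed.

```go
package main

import "fmt"

// rule is a simplified, hypothetical stand-in for api.PortRuleKafka.
type rule struct {
	APIKeys []int16 // allowed API keys; empty means any
	Topic   string  // allowed topic; empty means any
}

// allows reports whether a single rule permits the API key on the topic.
func (r rule) allows(apiKey int16, topic string) bool {
	if r.Topic != "" && r.Topic != topic {
		return false
	}
	if len(r.APIKeys) == 0 {
		return true
	}
	for _, k := range r.APIKeys {
		if k == apiKey {
			return true
		}
	}
	return false
}

// matches returns true only if every topic is allowed by at least one rule.
// A request without topics is allowed here; that is an assumption of this
// sketch, and the real package may decide differently.
func matches(apiKey int16, topics []string, rules []rule) bool {
	for _, topic := range topics {
		allowed := false
		for _, r := range rules {
			if r.allows(apiKey, topic) {
				allowed = true
				break
			}
		}
		if !allowed {
			return false
		}
	}
	return true
}

func main() {
	// API key 0 is Produce and 1 is Fetch in the Kafka protocol.
	rules := []rule{{APIKeys: []int16{0}, Topic: "orders"}}
	fmt.Println(matches(0, []string{"orders"}, rules))
	fmt.Println(matches(0, []string{"orders", "secrets"}, rules))
	fmt.Println(matches(1, []string{"orders"}, rules))
}
```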
func (*RequestMessage) SetCorrelationID ¶
func (req *RequestMessage) SetCorrelationID(id CorrelationID)
SetCorrelationID modifies the correlation ID of the Kafka request
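In the Kafka request header the correlation ID is a 4-byte big-endian integer that follows the 2-byte API key and 2-byte API version, so it can be rewritten in place. The sketch below operates on a buffer that starts at the header. Whether the package's raw buffer also contains the 4-byte size prefix is not stated here, so the offset is an assumption, and `correlationID` and `setCorrelationID` are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// correlationIDOffset assumes the buffer starts at the request header:
// 2 bytes API key + 2 bytes API version precede the correlation ID.
const correlationIDOffset = 4

var errShortHeader = errors.New("header too short for a correlation ID")

// correlationID reads the correlation ID from a request header.
func correlationID(hdr []byte) (uint32, error) {
	if len(hdr) < correlationIDOffset+4 {
		return 0, errShortHeader
	}
	return binary.BigEndian.Uint32(hdr[correlationIDOffset:]), nil
}

// setCorrelationID overwrites the correlation ID in place and leaves all
// other bytes untouched.
func setCorrelationID(hdr []byte, id uint32) error {
	if len(hdr) < correlationIDOffset+4 {
		return errShortHeader
	}
	binary.BigEndian.PutUint32(hdr[correlationIDOffset:], id)
	return nil
}

func main() {
	hdr := []byte{0, 3, 0, 1, 0, 0, 0, 42} // API key 3, version 1, ID 42
	if err := setCorrelationID(hdr, 7); err != nil {
		fmt.Println("error:", err)
		return
	}
	id, _ := correlationID(hdr)
	fmt.Println("new correlation ID:", id)
}
```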
func (*RequestMessage) String ¶
func (req *RequestMessage) String() string
String returns a human readable representation of the request message
type ResponseMessage ¶
type ResponseMessage struct {
// contains filtered or unexported fields
}
ResponseMessage represents a Kafka response message.
func ReadResponse ¶
func ReadResponse(reader io.Reader) (*ResponseMessage, error)
ReadResponse will read a Kafka response from an io.Reader and return the message or an error.
func (*ResponseMessage) GetCorrelationID ¶
func (res *ResponseMessage) GetCorrelationID() CorrelationID
GetCorrelationID returns the correlation ID of the Kafka response
func (*ResponseMessage) GetRaw ¶
func (res *ResponseMessage) GetRaw() []byte
GetRaw returns the raw Kafka response
func (*ResponseMessage) SetCorrelationID ¶
func (res *ResponseMessage) SetCorrelationID(id CorrelationID)
SetCorrelationID modifies the correlation ID of the Kafka response
func (*ResponseMessage) String ¶
func (res *ResponseMessage) String() string
String returns a human readable representation of the response message