Documentation ¶
Overview ¶
Package kafka provides a library to parse Kafka requests and responses and to apply policy rules to them.
Index ¶
- Constants
- type CorrelationID
- type RequestMessage
- func (req *RequestMessage) CreateAuthErrorResponse() (*ResponseMessage, error)
- func (req *RequestMessage) CreateResponse(err error) (*ResponseMessage, error)
- func (req *RequestMessage) GetAPIKey() int16
- func (req *RequestMessage) GetCorrelationID() CorrelationID
- func (req *RequestMessage) GetRaw() []byte
- func (req *RequestMessage) GetTopics() []string
- func (req *RequestMessage) GetVersion() int16
- func (req *RequestMessage) SetCorrelationID(id CorrelationID)
- func (req *RequestMessage) String() string
- type ResponseMessage
- type Rule
Constants ¶
const (
	ErrUnknown = -1
	ErrNone = 0
	ErrOffsetOutOfRange = 1
	ErrInvalidMessage = 2
	ErrUnknownTopicOrPartition = 3
	ErrInvalidMessageSize = 4
	ErrLeaderNotAvailable = 5
	ErrNotLeaderForPartition = 6
	ErrRequestTimeout = 7
	ErrBrokerNotAvailable = 8
	ErrReplicaNotAvailable = 9
	ErrMessageSizeTooLarge = 10
	ErrScaleControllerEpoch = 11
	ErrOffsetMetadataTooLarge = 12
	ErrNetwork = 13
	ErrOffsetLoadInProgress = 14
	ErrNoCoordinator = 15
	ErrNotCoordinator = 16
	ErrInvalidTopic = 17
	ErrRecordListTooLarge = 18
	ErrNotEnoughReplicas = 19
	ErrNotEnoughReplicasAfterAppend = 20
	ErrInvalidRequiredAcks = 21
	ErrIllegalGeneration = 22
	ErrInconsistentPartitionAssignmentStrategy = 23
	ErrUnknownParititonAssignmentStrategy = 24
	ErrUnknownConsumerID = 25
	ErrInvalidSessionTimeout = 26
	ErrRebalanceInProgress = 27
	ErrInvalidCommitOffsetSize = 28
	ErrTopicAuthorizationFailed = 29
	ErrGroupAuthorizationFailed = 30
	ErrClusterAuthorizationFailed = 31
	ErrInvalidTimeStamp = 32
)
List of possible Kafka error codes.
Reference: https://kafka.apache.org/protocol#protocol_error_codes
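The numeric codes are easier to read in logs when mapped to a short description. The following is a minimal sketch of such a lookup. It redeclares a few of the constants locally so that it compiles on its own, and `errorName` is a hypothetical helper, not a function exported by this package.

```go
package main

import "fmt"

// A few of the error codes from the list, redeclared locally so that
// this sketch is self-contained.
const (
	ErrUnknown                  = -1
	ErrNone                     = 0
	ErrUnknownTopicOrPartition  = 3
	ErrTopicAuthorizationFailed = 29
)

// errorName is a hypothetical helper that turns a Kafka error code into
// a short description. Codes it does not know are reported numerically.
func errorName(code int16) string {
	switch code {
	case ErrNone:
		return "none"
	case ErrUnknown:
		return "unknown"
	case ErrUnknownTopicOrPartition:
		return "unknown topic or partition"
	case ErrTopicAuthorizationFailed:
		return "topic authorization failed"
	default:
		return fmt.Sprintf("unrecognized error code %d", code)
	}
}

func main() {
	for _, code := range []int16{ErrNone, ErrTopicAuthorizationFailed, 99} {
		fmt.Printf("%d: %s\n", code, errorName(code))
	}
}
```

A real lookup would cover every code in the list; a map keyed by code would work equally well.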
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CorrelationID ¶
type CorrelationID uint32
CorrelationID represents the correlation id as defined in the Kafka protocol specification
type RequestMessage ¶
type RequestMessage struct {
// contains filtered or unexported fields
}
RequestMessage represents a Kafka request message
func ReadRequest ¶
func ReadRequest(reader io.Reader) (*RequestMessage, error)
ReadRequest will read a Kafka request from an io.Reader and return the message or an error.
func (*RequestMessage) CreateAuthErrorResponse ¶
func (req *RequestMessage) CreateAuthErrorResponse() (*ResponseMessage, error)
CreateAuthErrorResponse creates an authorization error response message for 'req'.
func (*RequestMessage) CreateResponse ¶
func (req *RequestMessage) CreateResponse(err error) (*ResponseMessage, error)
CreateResponse creates a response message based on the provided request message. The response will have the specified error code set in all topics and embedded partitions.
func (*RequestMessage) GetAPIKey ¶
func (req *RequestMessage) GetAPIKey() int16
GetAPIKey returns the kind of Kafka request
func (*RequestMessage) GetCorrelationID ¶
func (req *RequestMessage) GetCorrelationID() CorrelationID
GetCorrelationID returns the Kafka request correlationID
func (*RequestMessage) GetRaw ¶
func (req *RequestMessage) GetRaw() []byte
GetRaw returns the raw Kafka request
func (*RequestMessage) GetTopics ¶
func (req *RequestMessage) GetTopics() []string
GetTopics returns the list of topics in the Kafka request
func (*RequestMessage) GetVersion ¶
func (req *RequestMessage) GetVersion() int16
GetVersion returns the version of the Kafka request
func (*RequestMessage) SetCorrelationID ¶
func (req *RequestMessage) SetCorrelationID(id CorrelationID)
SetCorrelationID modifies the correlation ID of the Kafka request
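One way a setter like this can operate is by overwriting the correlation ID in place in the raw message bytes. The sketch below assumes that the raw buffer starts with the 4-byte size field, which puts the correlation ID at byte offset 8. That layout, and the helper names `setCorrelationID` and `getCorrelationID`, are assumptions for illustration and may not match the package's internals.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// CorrelationID mirrors the type documented above.
type CorrelationID uint32

// correlationOffset assumes the buffer begins with a 4-byte size,
// a 2-byte API key and a 2-byte API version.
const correlationOffset = 8

var errShort = errors.New("buffer too short to hold a correlation ID")

// setCorrelationID overwrites the correlation ID in raw in place.
func setCorrelationID(raw []byte, id CorrelationID) error {
	if len(raw) < correlationOffset+4 {
		return errShort
	}
	binary.BigEndian.PutUint32(raw[correlationOffset:], uint32(id))
	return nil
}

// getCorrelationID reads the correlation ID back from raw.
func getCorrelationID(raw []byte) (CorrelationID, error) {
	if len(raw) < correlationOffset+4 {
		return 0, errShort
	}
	return CorrelationID(binary.BigEndian.Uint32(raw[correlationOffset:])), nil
}

func main() {
	// size=8, API key=3, API version=1, correlation ID=42
	raw := []byte{0, 0, 0, 8, 0, 3, 0, 1, 0, 0, 0, 42}
	if err := setCorrelationID(raw, 7); err != nil {
		fmt.Println(err)
		return
	}
	id, _ := getCorrelationID(raw)
	fmt.Println("correlation ID is now", id)
}
```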
func (*RequestMessage) String ¶
func (req *RequestMessage) String() string
String returns a human readable representation of the request message
type ResponseMessage ¶
type ResponseMessage struct {
// contains filtered or unexported fields
}
ResponseMessage represents a Kafka response message.
func ReadResponse ¶
func ReadResponse(reader io.Reader) (*ResponseMessage, error)
ReadResponse will read a Kafka response from an io.Reader and return the message or an error.
func (*ResponseMessage) GetCorrelationID ¶
func (res *ResponseMessage) GetCorrelationID() CorrelationID
GetCorrelationID returns the Kafka response correlationID
func (*ResponseMessage) GetRaw ¶
func (res *ResponseMessage) GetRaw() []byte
GetRaw returns the raw Kafka response
func (*ResponseMessage) SetCorrelationID ¶
func (res *ResponseMessage) SetCorrelationID(id CorrelationID)
SetCorrelationID modifies the correlation ID of the Kafka response
func (*ResponseMessage) String ¶
func (res *ResponseMessage) String() string
String returns a human readable representation of the response message
type Rule ¶
type Rule struct {
	// APIVersion is the allowed version, or < 0 if all versions
	// are to be allowed.
	APIVersion int16

	// APIKeys is the set of all numerical apiKeys that are allowed.
	// If empty, all API keys are allowed.
	APIKeys map[int16]struct{}

	// ClientID is the client identifier as provided in the request.
	//
	// From Kafka protocol documentation:
	// This is a user supplied identifier for the client application. The
	// user can use any identifier they like and it will be used when
	// logging errors, monitoring aggregates, etc. For example, one might
	// want to monitor not just the requests per second overall, but the
	// number coming from each client application (each of which could
	// reside on multiple servers). This id acts as a logical grouping
	// across all requests from a particular client.
	//
	// If empty, all client identifiers are allowed.
	ClientID string

	// Topic is the topic name contained in the message. If a Kafka request
	// contains multiple topics, then all topics must be allowed or the
	// message will be rejected.
	//
	// This constraint is ignored if the matched request message type
	// doesn't contain any topic. Maximum size of Topic can be 249
	// characters as per recent Kafka spec and allowed characters are
	// a-z, A-Z, 0-9, -, . and _
	// Older Kafka versions had longer topic lengths of 255, but in Kafka 0.10
	// version the length was changed from 255 to 249. For compatibility
	// reasons we are allowing 255.
	//
	// If empty, all topics are allowed.
	Topic string
}
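The constraints on Topic described in the struct comment (at most 255 characters, drawn from a-z, A-Z, 0-9, '-', '.' and '_', with the empty string meaning "all topics") can be sketched as a small validator. The function `validRuleTopic` and the names `maxTopicLength` and `topicChars` are hypothetical and are not part of this package.

```go
package main

import (
	"fmt"
	"regexp"
)

// maxTopicLength follows the comment on Rule.Topic: 255 is accepted for
// compatibility even though newer Kafka versions limit topics to 249.
const maxTopicLength = 255

// topicChars matches a non-empty string of the allowed topic characters.
var topicChars = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`)

// validRuleTopic reports whether topic is acceptable as Rule.Topic.
// The empty string is accepted because it means "all topics".
func validRuleTopic(topic string) bool {
	if topic == "" {
		return true
	}
	if len(topic) > maxTopicLength {
		return false
	}
	return topicChars.MatchString(topic)
}

func main() {
	for _, t := range []string{"orders.v1", "bad topic!", ""} {
		fmt.Printf("%q valid: %v\n", t, validRuleTopic(t))
	}
}
```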
func (*Rule) CheckAPIKeyRole ¶
CheckAPIKeyRole checks the apiKey value in the request and returns true if it is allowed, false otherwise
func (*Rule) CheckAPIVersion ¶
CheckAPIVersion returns true if 'apiVersion' is allowed
func (*Rule) CheckClientID ¶
CheckClientID returns true if 'clientID' is allowed
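The matching semantics described in the Rule field comments can be summarized in code. The following sketch uses a local `rule` type that mirrors the documented fields, with hypothetical method names. It illustrates the documented behavior (a negative version, an empty key set, an empty client ID or an empty topic each act as a wildcard) and is not the package's actual implementation.

```go
package main

import "fmt"

// rule is a local stand-in mirroring the documented Rule fields.
type rule struct {
	APIVersion int16
	APIKeys    map[int16]struct{}
	ClientID   string
	Topic      string
}

// checkAPIVersion allows every version when APIVersion is negative.
func (r *rule) checkAPIVersion(v int16) bool {
	return r.APIVersion < 0 || r.APIVersion == v
}

// checkAPIKey allows every key when the set is empty.
func (r *rule) checkAPIKey(key int16) bool {
	if len(r.APIKeys) == 0 {
		return true
	}
	_, ok := r.APIKeys[key]
	return ok
}

// checkClientID allows every client when ClientID is empty.
func (r *rule) checkClientID(id string) bool {
	return r.ClientID == "" || r.ClientID == id
}

// checkTopics requires every topic in the request to be allowed.
// A request without topics, or a rule without a topic, always passes.
func (r *rule) checkTopics(topics []string) bool {
	if r.Topic == "" {
		return true
	}
	for _, t := range topics {
		if t != r.Topic {
			return false
		}
	}
	return true
}

func main() {
	r := &rule{
		APIVersion: -1,                           // any version
		APIKeys:    map[int16]struct{}{0: {}},    // only API key 0
		Topic:      "orders",
	}
	allowed := r.checkAPIVersion(2) &&
		r.checkAPIKey(0) &&
		r.checkClientID("billing") &&
		r.checkTopics([]string{"orders"})
	fmt.Println("request allowed:", allowed)
}
```

This sketch evaluates a single rule in isolation; how several rules combine to cover a request is outside what the field comments specify.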