Documentation ¶
Index ¶
- Constants
- Variables
- func Decode(b []byte, in Decoder) error
- func Encode(e Encoder) ([]byte, error)
- func Read(r io.Reader, data interface{}) error
- func Size(v interface{}) int
- func Write(w io.Writer, data interface{}) error
- type APIVersion
- type APIVersionsRequest
- type APIVersionsResponse
- type Body
- type Broker
- type ByteDecoder
- func (d *ByteDecoder) ArrayLength() (int, error)
- func (d *ByteDecoder) Bool() (bool, error)
- func (d *ByteDecoder) Bytes() ([]byte, error)
- func (d *ByteDecoder) Int16() (int16, error)
- func (d *ByteDecoder) Int32() (int32, error)
- func (d *ByteDecoder) Int32Array() ([]int32, error)
- func (d *ByteDecoder) Int64() (int64, error)
- func (d *ByteDecoder) Int64Array() ([]int64, error)
- func (d *ByteDecoder) Int8() (int8, error)
- func (d *ByteDecoder) Offset() int
- func (d *ByteDecoder) Pop() error
- func (d *ByteDecoder) Push(pd PushDecoder) error
- func (d *ByteDecoder) String() (string, error)
- func (d *ByteDecoder) StringArray() ([]string, error)
- type ByteEncoder
- func (b *ByteEncoder) Bytes() []byte
- func (e *ByteEncoder) Pop()
- func (e *ByteEncoder) Push(pe PushEncoder)
- func (e *ByteEncoder) PutArrayLength(in int) error
- func (e *ByteEncoder) PutBool(in bool)
- func (e *ByteEncoder) PutBytes(in []byte) error
- func (e *ByteEncoder) PutInt16(in int16)
- func (e *ByteEncoder) PutInt32(in int32)
- func (e *ByteEncoder) PutInt32Array(in []int32) error
- func (e *ByteEncoder) PutInt64(in int64)
- func (e *ByteEncoder) PutInt64Array(in []int64) error
- func (e *ByteEncoder) PutInt8(in int8)
- func (e *ByteEncoder) PutRawBytes(in []byte) error
- func (e *ByteEncoder) PutString(in string) error
- func (e *ByteEncoder) PutStringArray(in []string) error
- type CRCField
- type Coordinator
- type CreateTopicRequest
- type CreateTopicRequests
- type CreateTopicsResponse
- type Data
- type Decoder
- type DeleteTopicsRequest
- type DeleteTopicsResponse
- type DescribeGroupsRequest
- type DescribeGroupsResponse
- type Encoder
- type Error
- type FetchPartition
- type FetchPartitionResponse
- type FetchRequest
- type FetchResponse
- type FetchResponses
- type FetchTopic
- type Group
- type GroupCoordinatorRequest
- type GroupCoordinatorResponse
- type GroupMember
- type GroupProtocol
- type HeartbeatRequest
- type HeartbeatResponse
- type JoinGroupRequest
- type JoinGroupResponse
- type LeaderAndISRPartition
- type LeaderAndISRRequest
- type LeaderAndISRResponse
- type LeaveGroupRequest
- type LeaveGroupResponse
- type LenEncoder
- func (e *LenEncoder) Pop()
- func (e *LenEncoder) Push(pe PushEncoder)
- func (e *LenEncoder) PutArrayLength(in int) error
- func (e *LenEncoder) PutBool(in bool)
- func (e *LenEncoder) PutBytes(in []byte) error
- func (e *LenEncoder) PutInt16(in int16)
- func (e *LenEncoder) PutInt32(in int32)
- func (e *LenEncoder) PutInt32Array(in []int32) error
- func (e *LenEncoder) PutInt64(in int64)
- func (e *LenEncoder) PutInt64Array(in []int64) error
- func (e *LenEncoder) PutInt8(in int8)
- func (e *LenEncoder) PutRawBytes(in []byte) error
- func (e *LenEncoder) PutString(in string) error
- func (e *LenEncoder) PutStringArray(in []string) error
- type ListGroupsRequest
- type ListGroupsResponse
- type LiveLeader
- type Member
- type Message
- type MessageSet
- type MetadataRequest
- type MetadataResponse
- type OffsetResponse
- type OffsetsPartition
- type OffsetsRequest
- type OffsetsResponse
- type OffsetsTopic
- type PacketDecoder
- type PacketEncoder
- type PartitionMetadata
- type PartitionResponse
- type PartitionState
- type ProducePartitionResponse
- type ProduceRequest
- type ProduceResponse
- type ProduceResponses
- type PushDecoder
- type PushEncoder
- type Request
- type RequestHeader
- type Response
- type ResponseBody
- type SizeField
- type StopReplicaPartition
- type StopReplicaRequest
- type StopReplicaResponse
- type StopReplicaResponsePartition
- type SyncGroupRequest
- type SyncGroupResponse
- type TopicData
- type TopicErrorCode
- type TopicMetadata
Constants ¶
View Source
const ( ProduceKey = 0 FetchKey = 1 OffsetsKey = 2 MetadataKey = 3 LeaderAndISRKey = 4 StopReplicaKey = 5 UpdateMetadataKey = 6 ControlledShutdownKey = 7 OffsetCommitKey = 8 OffsetFetchKey = 9 GroupCoordinatorKey = 10 JoinGroupKey = 11 HeartbeatKey = 12 LeaveGroupKey = 13 SyncGroupKey = 14 DescribeGroupsKey = 15 ListGroupsKey = 16 SaslHandshakeKey = 17 APIVersionsKey = 18 CreateTopicsKey = 19 DeleteTopicsKey = 20 )
Protocol API keys. See: https://kafka.apache.org/protocol#protocol_api_keys
Variables ¶
View Source
var ( ErrUnknown = Error{/* contains filtered or unexported fields */} ErrNone = Error{/* contains filtered or unexported fields */} ErrOffsetOutOfRange = Error{/* contains filtered or unexported fields */} ErrCorruptMessage = Error{/* contains filtered or unexported fields */} ErrUnknownTopicOrPartition = Error{/* contains filtered or unexported fields */} ErrInvalidFetchSize = Error{/* contains filtered or unexported fields */} ErrLeaderNotAvailable = Error{/* contains filtered or unexported fields */} ErrNotLeaderForPartition = Error{/* contains filtered or unexported fields */} ErrRequestTimedOut = Error{/* contains filtered or unexported fields */} ErrBrokerNotAvailable = Error{/* contains filtered or unexported fields */} ErrReplicaNotAvailable = Error{/* contains filtered or unexported fields */} ErrMessageTooLarge = Error{/* contains filtered or unexported fields */} ErrStaleControllerEpoch = Error{/* contains filtered or unexported fields */} ErrOffsetMetadataTooLarge = Error{/* contains filtered or unexported fields */} ErrNetworkException = Error{/* contains filtered or unexported fields */} ErrCoordinatorLoadInProgress = Error{/* contains filtered or unexported fields */} ErrCoordinatorNotAvailable = Error{/* contains filtered or unexported fields */} ErrNotCoordinator = Error{/* contains filtered or unexported fields */} ErrInvalidTopicException = Error{/* contains filtered or unexported fields */} ErrRecordListTooLarge = Error{/* contains filtered or unexported fields */} ErrNotEnoughReplicas = Error{/* contains filtered or unexported fields */} ErrNotEnoughReplicasAfterAppend = Error{/* contains filtered or unexported fields */} ErrInvalidRequiredAcks = Error{/* contains filtered or unexported fields */} ErrIllegalGeneration = Error{/* contains filtered or unexported fields */} ErrInconsistentGroupProtocol = Error{/* contains filtered or unexported fields */} ErrInvalidGroupId = Error{/* contains filtered or unexported fields */} ErrUnknownMemberId = 
Error{/* contains filtered or unexported fields */} ErrInvalidSessionTimeout = Error{/* contains filtered or unexported fields */} ErrRebalanceInProgress = Error{/* contains filtered or unexported fields */} ErrInvalidCommitOffsetSize = Error{/* contains filtered or unexported fields */} ErrTopicAuthorizationFailed = Error{/* contains filtered or unexported fields */} ErrGroupAuthorizationFailed = Error{/* contains filtered or unexported fields */} ErrClusterAuthorizationFailed = Error{/* contains filtered or unexported fields */} ErrInvalidTimestamp = Error{/* contains filtered or unexported fields */} ErrUnsupportedSaslMechanism = Error{/* contains filtered or unexported fields */} ErrIllegalSaslState = Error{/* contains filtered or unexported fields */} ErrUnsupportedVersion = Error{/* contains filtered or unexported fields */} ErrTopicAlreadyExists = Error{/* contains filtered or unexported fields */} ErrInvalidPartitions = Error{/* contains filtered or unexported fields */} ErrInvalidReplicationFactor = Error{/* contains filtered or unexported fields */} ErrInvalidReplicaAssignment = Error{/* contains filtered or unexported fields */} ErrInvalidConfig = Error{/* contains filtered or unexported fields */} ErrNotController = Error{/* contains filtered or unexported fields */} ErrInvalidRequest = Error{/* contains filtered or unexported fields */} ErrUnsupportedForMessageFormat = Error{/* contains filtered or unexported fields */} ErrPolicyViolation = Error{/* contains filtered or unexported fields */} ErrOutOfOrderSequenceNumber = Error{/* contains filtered or unexported fields */} ErrDuplicateSequenceNumber = Error{/* contains filtered or unexported fields */} ErrInvalidProducerEpoch = Error{/* contains filtered or unexported fields */} ErrInvalidTxnState = Error{/* contains filtered or unexported fields */} ErrInvalidProducerIdMapping = Error{/* contains filtered or unexported fields */} ErrInvalidTransactionTimeout = Error{/* contains filtered or unexported 
fields */} ErrConcurrentTransactions = Error{/* contains filtered or unexported fields */} ErrTransactionCoordinatorFenced = Error{/* contains filtered or unexported fields */} ErrTransactionalIdAuthorizationFailed = Error{/* contains filtered or unexported fields */} ErrSecurityDisabled = Error{/* contains filtered or unexported fields */} ErrOperationNotAttempted = Error{/* contains filtered or unexported fields */} // Errs maps err codes to their errs. Errs = map[int16]Error{ -1: ErrUnknown, 0: ErrNone, 1: ErrOffsetOutOfRange, 2: ErrCorruptMessage, 3: ErrUnknownTopicOrPartition, 4: ErrInvalidFetchSize, 5: ErrLeaderNotAvailable, 6: ErrNotLeaderForPartition, 7: ErrRequestTimedOut, 8: ErrBrokerNotAvailable, 9: ErrReplicaNotAvailable, 10: ErrMessageTooLarge, 11: ErrStaleControllerEpoch, 12: ErrOffsetMetadataTooLarge, 13: ErrNetworkException, 14: ErrCoordinatorLoadInProgress, 15: ErrCoordinatorNotAvailable, 16: ErrNotCoordinator, 17: ErrInvalidTopicException, 18: ErrRecordListTooLarge, 19: ErrNotEnoughReplicas, 20: ErrNotEnoughReplicasAfterAppend, 21: ErrInvalidRequiredAcks, 22: ErrIllegalGeneration, 23: ErrInconsistentGroupProtocol, 24: ErrInvalidGroupId, 25: ErrUnknownMemberId, 26: ErrInvalidSessionTimeout, 27: ErrRebalanceInProgress, 28: ErrInvalidCommitOffsetSize, 29: ErrTopicAuthorizationFailed, 30: ErrGroupAuthorizationFailed, 31: ErrClusterAuthorizationFailed, 32: ErrInvalidTimestamp, 33: ErrUnsupportedSaslMechanism, 34: ErrIllegalSaslState, 35: ErrUnsupportedVersion, 36: ErrTopicAlreadyExists, 37: ErrInvalidPartitions, 38: ErrInvalidReplicationFactor, 39: ErrInvalidReplicaAssignment, 40: ErrInvalidConfig, 41: ErrNotController, 42: ErrInvalidRequest, 43: ErrUnsupportedForMessageFormat, 44: ErrPolicyViolation, 45: ErrOutOfOrderSequenceNumber, 46: ErrDuplicateSequenceNumber, 47: ErrInvalidProducerEpoch, 48: ErrInvalidTxnState, 49: ErrInvalidProducerIdMapping, 50: ErrInvalidTransactionTimeout, 51: ErrConcurrentTransactions, 52: ErrTransactionCoordinatorFenced, 
53: ErrTransactionalIdAuthorizationFailed, 54: ErrSecurityDisabled, 55: ErrOperationNotAttempted, } )
View Source
var Encoding = binary.BigEndian
View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
View Source
var ErrInvalidArrayLength = errors.New("kafka: invalid array length")
View Source
var ErrInvalidByteSliceLength = errors.New("invalid byteslice length")
View Source
var ErrInvalidStringLength = errors.New("kafka: invalid string length")
Functions ¶
Types ¶
type APIVersion ¶
type APIVersionsRequest ¶
type APIVersionsRequest struct{}
func (*APIVersionsRequest) Decode ¶
func (c *APIVersionsRequest) Decode(_ PacketDecoder) error
func (*APIVersionsRequest) Encode ¶
func (c *APIVersionsRequest) Encode(_ PacketEncoder) error
func (*APIVersionsRequest) Key ¶
func (c *APIVersionsRequest) Key() int16
func (*APIVersionsRequest) Version ¶
func (c *APIVersionsRequest) Version() int16
type APIVersionsResponse ¶
type APIVersionsResponse struct { APIVersions []APIVersion ErrorCode int16 }
func (*APIVersionsResponse) Decode ¶
func (c *APIVersionsResponse) Decode(d PacketDecoder) error
func (*APIVersionsResponse) Encode ¶
func (c *APIVersionsResponse) Encode(e PacketEncoder) error
type ByteDecoder ¶
type ByteDecoder struct {
// contains filtered or unexported fields
}
func NewDecoder ¶
func NewDecoder(b []byte) *ByteDecoder
func (*ByteDecoder) ArrayLength ¶
func (d *ByteDecoder) ArrayLength() (int, error)
func (*ByteDecoder) Bool ¶
func (d *ByteDecoder) Bool() (bool, error)
func (*ByteDecoder) Bytes ¶
func (d *ByteDecoder) Bytes() ([]byte, error)
func (*ByteDecoder) Int16 ¶
func (d *ByteDecoder) Int16() (int16, error)
func (*ByteDecoder) Int32 ¶
func (d *ByteDecoder) Int32() (int32, error)
func (*ByteDecoder) Int32Array ¶
func (d *ByteDecoder) Int32Array() ([]int32, error)
func (*ByteDecoder) Int64 ¶
func (d *ByteDecoder) Int64() (int64, error)
func (*ByteDecoder) Int64Array ¶
func (d *ByteDecoder) Int64Array() ([]int64, error)
func (*ByteDecoder) Int8 ¶
func (d *ByteDecoder) Int8() (int8, error)
func (*ByteDecoder) Offset ¶
func (d *ByteDecoder) Offset() int
func (*ByteDecoder) Pop ¶
func (d *ByteDecoder) Pop() error
func (*ByteDecoder) Push ¶
func (d *ByteDecoder) Push(pd PushDecoder) error
func (*ByteDecoder) String ¶
func (d *ByteDecoder) String() (string, error)
func (*ByteDecoder) StringArray ¶
func (d *ByteDecoder) StringArray() ([]string, error)
type ByteEncoder ¶
type ByteEncoder struct {
// contains filtered or unexported fields
}
func NewByteEncoder ¶
func NewByteEncoder(b []byte) *ByteEncoder
func (*ByteEncoder) Bytes ¶
func (b *ByteEncoder) Bytes() []byte
func (*ByteEncoder) Pop ¶
func (e *ByteEncoder) Pop()
func (*ByteEncoder) Push ¶
func (e *ByteEncoder) Push(pe PushEncoder)
func (*ByteEncoder) PutArrayLength ¶
func (e *ByteEncoder) PutArrayLength(in int) error
func (*ByteEncoder) PutBool ¶
func (e *ByteEncoder) PutBool(in bool)
func (*ByteEncoder) PutBytes ¶
func (e *ByteEncoder) PutBytes(in []byte) error
func (*ByteEncoder) PutInt16 ¶
func (e *ByteEncoder) PutInt16(in int16)
func (*ByteEncoder) PutInt32 ¶
func (e *ByteEncoder) PutInt32(in int32)
func (*ByteEncoder) PutInt32Array ¶
func (e *ByteEncoder) PutInt32Array(in []int32) error
func (*ByteEncoder) PutInt64 ¶
func (e *ByteEncoder) PutInt64(in int64)
func (*ByteEncoder) PutInt64Array ¶
func (e *ByteEncoder) PutInt64Array(in []int64) error
func (*ByteEncoder) PutInt8 ¶
func (e *ByteEncoder) PutInt8(in int8)
func (*ByteEncoder) PutRawBytes ¶
func (e *ByteEncoder) PutRawBytes(in []byte) error
func (*ByteEncoder) PutString ¶
func (e *ByteEncoder) PutString(in string) error
func (*ByteEncoder) PutStringArray ¶
func (e *ByteEncoder) PutStringArray(in []string) error
type CRCField ¶
type CRCField struct {
StartOffset int
}
func (*CRCField) ReserveSize ¶
func (*CRCField) SaveOffset ¶
type Coordinator ¶
type CreateTopicRequest ¶
type CreateTopicRequests ¶
type CreateTopicRequests struct { Requests []*CreateTopicRequest Timeout int32 }
func (*CreateTopicRequests) Decode ¶
func (c *CreateTopicRequests) Decode(d PacketDecoder) error
func (*CreateTopicRequests) Encode ¶
func (c *CreateTopicRequests) Encode(e PacketEncoder) error
func (*CreateTopicRequests) Key ¶
func (c *CreateTopicRequests) Key() int16
func (*CreateTopicRequests) Version ¶
func (c *CreateTopicRequests) Version() int16
type CreateTopicsResponse ¶
type CreateTopicsResponse struct {
TopicErrorCodes []*TopicErrorCode
}
func (*CreateTopicsResponse) Decode ¶
func (c *CreateTopicsResponse) Decode(d PacketDecoder) error
func (*CreateTopicsResponse) Encode ¶
func (c *CreateTopicsResponse) Encode(e PacketEncoder) error
type Decoder ¶
type Decoder interface {
Decode(d PacketDecoder) error
}
type DeleteTopicsRequest ¶
func (*DeleteTopicsRequest) Decode ¶
func (c *DeleteTopicsRequest) Decode(d PacketDecoder) (err error)
func (*DeleteTopicsRequest) Encode ¶
func (c *DeleteTopicsRequest) Encode(e PacketEncoder) (err error)
func (*DeleteTopicsRequest) Key ¶
func (c *DeleteTopicsRequest) Key() int16
func (*DeleteTopicsRequest) Version ¶
func (c *DeleteTopicsRequest) Version() int16
type DeleteTopicsResponse ¶
type DeleteTopicsResponse struct {
TopicErrorCodes []*TopicErrorCode
}
func (*DeleteTopicsResponse) Decode ¶
func (c *DeleteTopicsResponse) Decode(d PacketDecoder) error
func (*DeleteTopicsResponse) Encode ¶
func (c *DeleteTopicsResponse) Encode(e PacketEncoder) error
type DescribeGroupsRequest ¶
type DescribeGroupsRequest struct {
GroupIDs []string
}
func (*DescribeGroupsRequest) Decode ¶
func (r *DescribeGroupsRequest) Decode(d PacketDecoder) (err error)
func (*DescribeGroupsRequest) Encode ¶
func (r *DescribeGroupsRequest) Encode(e PacketEncoder) error
func (*DescribeGroupsRequest) Key ¶
func (r *DescribeGroupsRequest) Key() int16
func (*DescribeGroupsRequest) Version ¶
func (r *DescribeGroupsRequest) Version() int16
type DescribeGroupsResponse ¶
type DescribeGroupsResponse struct {
Groups []*Group
}
func (*DescribeGroupsResponse) Decode ¶
func (r *DescribeGroupsResponse) Decode(d PacketDecoder) (err error)
func (*DescribeGroupsResponse) Encode ¶
func (r *DescribeGroupsResponse) Encode(e PacketEncoder) error
func (*DescribeGroupsResponse) Key ¶
func (r *DescribeGroupsResponse) Key() int16
func (*DescribeGroupsResponse) Version ¶
func (r *DescribeGroupsResponse) Version() int16
type Encoder ¶
type Encoder interface {
Encode(e PacketEncoder) error
}
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error represents a protocol error. It lets each error carry both its error code and its description.
type FetchPartition ¶
type FetchPartitionResponse ¶
type FetchRequest ¶
type FetchRequest struct { ReplicaID int32 MaxWaitTime int32 MinBytes int32 // MaxBytes int32 Topics []*FetchTopic }
func (*FetchRequest) Decode ¶
func (r *FetchRequest) Decode(d PacketDecoder) error
func (*FetchRequest) Encode ¶
func (r *FetchRequest) Encode(e PacketEncoder) error
func (*FetchRequest) Key ¶
func (r *FetchRequest) Key() int16
func (*FetchRequest) Version ¶
func (r *FetchRequest) Version() int16
type FetchResponse ¶
type FetchResponse struct { Topic string PartitionResponses []*FetchPartitionResponse }
type FetchResponses ¶
type FetchResponses struct { ThrottleTimeMs int32 Responses []*FetchResponse }
func (*FetchResponses) Decode ¶
func (r *FetchResponses) Decode(d PacketDecoder) error
func (*FetchResponses) Encode ¶
func (r *FetchResponses) Encode(e PacketEncoder) (err error)
type FetchTopic ¶
type FetchTopic struct { Topic string Partitions []*FetchPartition }
type Group ¶
type Group struct { ErrorCode int16 GroupID string State string ProtocolType string Protocol string GroupMembers map[string]*GroupMember }
func (*Group) Decode ¶
func (r *Group) Decode(d PacketDecoder) (err error)
func (*Group) Encode ¶
func (r *Group) Encode(e PacketEncoder) error
type GroupCoordinatorRequest ¶
type GroupCoordinatorRequest struct {
GroupID string
}
func (*GroupCoordinatorRequest) Decode ¶
func (r *GroupCoordinatorRequest) Decode(d PacketDecoder) (err error)
func (*GroupCoordinatorRequest) Encode ¶
func (r *GroupCoordinatorRequest) Encode(e PacketEncoder) error
func (*GroupCoordinatorRequest) Key ¶
func (r *GroupCoordinatorRequest) Key() int16
func (*GroupCoordinatorRequest) Version ¶
func (r *GroupCoordinatorRequest) Version() int16
type GroupCoordinatorResponse ¶
type GroupCoordinatorResponse struct { ErrorCode int16 Coordinator *Coordinator }
func (*GroupCoordinatorResponse) Decode ¶
func (r *GroupCoordinatorResponse) Decode(d PacketDecoder) (err error)
func (*GroupCoordinatorResponse) Encode ¶
func (r *GroupCoordinatorResponse) Encode(e PacketEncoder) error
type GroupMember ¶
type GroupMember struct { ClientID string ClientHost string GroupMemberMetadata []byte GroupMemberAssignment []byte }
func (*GroupMember) Decode ¶
func (r *GroupMember) Decode(d PacketDecoder) (err error)
func (*GroupMember) Encode ¶
func (r *GroupMember) Encode(e PacketEncoder) error
type GroupProtocol ¶
type HeartbeatRequest ¶
func (*HeartbeatRequest) Decode ¶
func (r *HeartbeatRequest) Decode(d PacketDecoder) (err error)
func (*HeartbeatRequest) Key ¶
func (r *HeartbeatRequest) Key() int16
func (*HeartbeatRequest) Version ¶
func (r *HeartbeatRequest) Version() int16
type HeartbeatResponse ¶
type HeartbeatResponse struct {
ErrorCode int16
}
func (*HeartbeatResponse) Decode ¶
func (r *HeartbeatResponse) Decode(d PacketDecoder) (err error)
func (*HeartbeatResponse) Encode ¶
func (r *HeartbeatResponse) Encode(e PacketEncoder) error
func (*HeartbeatResponse) Key ¶
func (r *HeartbeatResponse) Key() int16
func (*HeartbeatResponse) Version ¶
func (r *HeartbeatResponse) Version() int16
type JoinGroupRequest ¶
type JoinGroupRequest struct { GroupID string SessionTimeout int32 MemberID string ProtocolType string GroupProtocols []*GroupProtocol }
func (*JoinGroupRequest) Decode ¶
func (r *JoinGroupRequest) Decode(d PacketDecoder) error
func (*JoinGroupRequest) Encode ¶
func (r *JoinGroupRequest) Encode(e PacketEncoder) error
func (*JoinGroupRequest) Key ¶
func (r *JoinGroupRequest) Key() int16
func (*JoinGroupRequest) Version ¶
func (r *JoinGroupRequest) Version() int16
type JoinGroupResponse ¶
type JoinGroupResponse struct { ErrorCode int16 GenerationID int32 GroupProtocol string LeaderID string MemberID string Members map[string][]byte }
func (*JoinGroupResponse) Decode ¶
func (r *JoinGroupResponse) Decode(d PacketDecoder) error
func (*JoinGroupResponse) Encode ¶
func (r *JoinGroupResponse) Encode(e PacketEncoder) error
func (*JoinGroupResponse) Key ¶
func (r *JoinGroupResponse) Key() int16
func (*JoinGroupResponse) Version ¶
func (r *JoinGroupResponse) Version() int16
type LeaderAndISRPartition ¶
type LeaderAndISRRequest ¶
type LeaderAndISRRequest struct { ControllerID int32 ControllerEpoch int32 PartitionStates []*PartitionState LiveLeaders []*LiveLeader }
func (*LeaderAndISRRequest) Decode ¶
func (r *LeaderAndISRRequest) Decode(d PacketDecoder) error
func (*LeaderAndISRRequest) Encode ¶
func (r *LeaderAndISRRequest) Encode(e PacketEncoder) error
func (*LeaderAndISRRequest) Key ¶
func (r *LeaderAndISRRequest) Key() int16
func (*LeaderAndISRRequest) Version ¶
func (r *LeaderAndISRRequest) Version() int16
type LeaderAndISRResponse ¶
type LeaderAndISRResponse struct { ErrorCode int16 Partitions []*LeaderAndISRPartition }
func (*LeaderAndISRResponse) Decode ¶
func (r *LeaderAndISRResponse) Decode(d PacketDecoder) error
func (*LeaderAndISRResponse) Encode ¶
func (r *LeaderAndISRResponse) Encode(e PacketEncoder) error
func (*LeaderAndISRResponse) Key ¶
func (r *LeaderAndISRResponse) Key() int16
func (*LeaderAndISRResponse) Version ¶
func (r *LeaderAndISRResponse) Version() int16
type LeaveGroupRequest ¶
func (*LeaveGroupRequest) Decode ¶
func (r *LeaveGroupRequest) Decode(d PacketDecoder) (err error)
func (*LeaveGroupRequest) Encode ¶
func (r *LeaveGroupRequest) Encode(e PacketEncoder) error
func (*LeaveGroupRequest) Key ¶
func (r *LeaveGroupRequest) Key() int16
func (*LeaveGroupRequest) Version ¶
func (r *LeaveGroupRequest) Version() int16
type LeaveGroupResponse ¶
type LeaveGroupResponse struct {
ErrorCode int16
}
func (*LeaveGroupResponse) Decode ¶
func (r *LeaveGroupResponse) Decode(d PacketDecoder) (err error)
func (*LeaveGroupResponse) Encode ¶
func (r *LeaveGroupResponse) Encode(e PacketEncoder) error
func (*LeaveGroupResponse) Key ¶
func (r *LeaveGroupResponse) Key() int16
func (*LeaveGroupResponse) Version ¶
func (r *LeaveGroupResponse) Version() int16
type LenEncoder ¶
type LenEncoder struct {
Length int
// contains filtered or unexported fields
}
func (*LenEncoder) Pop ¶
func (e *LenEncoder) Pop()
func (*LenEncoder) Push ¶
func (e *LenEncoder) Push(pe PushEncoder)
func (*LenEncoder) PutArrayLength ¶
func (e *LenEncoder) PutArrayLength(in int) error
func (*LenEncoder) PutBool ¶
func (e *LenEncoder) PutBool(in bool)
func (*LenEncoder) PutBytes ¶
func (e *LenEncoder) PutBytes(in []byte) error
func (*LenEncoder) PutInt16 ¶
func (e *LenEncoder) PutInt16(in int16)
func (*LenEncoder) PutInt32 ¶
func (e *LenEncoder) PutInt32(in int32)
func (*LenEncoder) PutInt32Array ¶
func (e *LenEncoder) PutInt32Array(in []int32) error
func (*LenEncoder) PutInt64 ¶
func (e *LenEncoder) PutInt64(in int64)
func (*LenEncoder) PutInt64Array ¶
func (e *LenEncoder) PutInt64Array(in []int64) error
func (*LenEncoder) PutInt8 ¶
func (e *LenEncoder) PutInt8(in int8)
func (*LenEncoder) PutRawBytes ¶
func (e *LenEncoder) PutRawBytes(in []byte) error
func (*LenEncoder) PutString ¶
func (e *LenEncoder) PutString(in string) error
func (*LenEncoder) PutStringArray ¶
func (e *LenEncoder) PutStringArray(in []string) error
type ListGroupsRequest ¶
type ListGroupsRequest struct { }
func (*ListGroupsRequest) Decode ¶
func (r *ListGroupsRequest) Decode(d PacketDecoder) (err error)
func (*ListGroupsRequest) Encode ¶
func (r *ListGroupsRequest) Encode(e PacketEncoder) error
func (*ListGroupsRequest) Key ¶
func (r *ListGroupsRequest) Key() int16
func (*ListGroupsRequest) Version ¶
func (r *ListGroupsRequest) Version() int16
type ListGroupsResponse ¶
func (*ListGroupsResponse) Decode ¶
func (r *ListGroupsResponse) Decode(d PacketDecoder) (err error)
func (*ListGroupsResponse) Encode ¶
func (r *ListGroupsResponse) Encode(e PacketEncoder) error
func (*ListGroupsResponse) Key ¶
func (r *ListGroupsResponse) Key() int16
func (*ListGroupsResponse) Version ¶
func (r *ListGroupsResponse) Version() int16
type LiveLeader ¶
type Message ¶
func (*Message) Decode ¶
func (m *Message) Decode(d PacketDecoder) error
func (*Message) Encode ¶
func (m *Message) Encode(e PacketEncoder) error
type MessageSet ¶
func (*MessageSet) Decode ¶
func (ms *MessageSet) Decode(d PacketDecoder) error
func (*MessageSet) Encode ¶
func (ms *MessageSet) Encode(e PacketEncoder) error
type MetadataRequest ¶
type MetadataRequest struct {
Topics []string
}
func (*MetadataRequest) Decode ¶
func (r *MetadataRequest) Decode(d PacketDecoder) (err error)
func (*MetadataRequest) Encode ¶
func (r *MetadataRequest) Encode(e PacketEncoder) error
func (*MetadataRequest) Key ¶
func (r *MetadataRequest) Key() int16
func (*MetadataRequest) Version ¶
func (r *MetadataRequest) Version() int16
type MetadataResponse ¶
type MetadataResponse struct { Brokers []*Broker // unsupported: ClusterID *string // unsupported: ControllerID string TopicMetadata []*TopicMetadata }
func (*MetadataResponse) Decode ¶
func (r *MetadataResponse) Decode(d PacketDecoder) error
func (*MetadataResponse) Encode ¶
func (r *MetadataResponse) Encode(e PacketEncoder) (err error)
type OffsetResponse ¶
type OffsetResponse struct { Topic string PartitionResponses []*PartitionResponse }
type OffsetsPartition ¶
type OffsetsRequest ¶
type OffsetsRequest struct { ReplicaID int32 Topics []*OffsetsTopic MaxNumOffsets int32 }
func (*OffsetsRequest) Decode ¶
func (r *OffsetsRequest) Decode(d PacketDecoder) error
func (*OffsetsRequest) Encode ¶
func (r *OffsetsRequest) Encode(e PacketEncoder) error
func (*OffsetsRequest) Key ¶
func (r *OffsetsRequest) Key() int16
func (*OffsetsRequest) Version ¶
func (r *OffsetsRequest) Version() int16
type OffsetsResponse ¶
type OffsetsResponse struct {
Responses []*OffsetResponse
}
func (*OffsetsResponse) Decode ¶
func (r *OffsetsResponse) Decode(d PacketDecoder) error
func (*OffsetsResponse) Encode ¶
func (r *OffsetsResponse) Encode(e PacketEncoder) error
type OffsetsTopic ¶
type OffsetsTopic struct { Topic string Partitions []*OffsetsPartition }
type PacketDecoder ¶
type PacketDecoder interface {
Bool() (bool, error)
Int8() (int8, error)
Int16() (int16, error)
Int32() (int32, error)
Int64() (int64, error)
ArrayLength() (int, error)
Bytes() ([]byte, error)
String() (string, error)
Int32Array() ([]int32, error)
Int64Array() ([]int64, error)
StringArray() ([]string, error)
Push(pd PushDecoder) error
Pop() error
// contains filtered or unexported methods
}
type PacketEncoder ¶
type PacketEncoder interface { PutBool(in bool) PutInt8(in int8) PutInt16(in int16) PutInt32(in int32) PutInt64(in int64) PutArrayLength(in int) error PutRawBytes(in []byte) error PutBytes(in []byte) error PutString(in string) error PutStringArray(in []string) error PutInt32Array(in []int32) error PutInt64Array(in []int64) error Push(pe PushEncoder) Pop() }
type PartitionMetadata ¶
type PartitionResponse ¶
type PartitionState ¶
type ProduceRequest ¶
func (*ProduceRequest) Decode ¶
func (r *ProduceRequest) Decode(d PacketDecoder) (err error)
func (*ProduceRequest) Encode ¶
func (r *ProduceRequest) Encode(e PacketEncoder) (err error)
func (*ProduceRequest) Key ¶
func (r *ProduceRequest) Key() int16
func (*ProduceRequest) Version ¶
func (r *ProduceRequest) Version() int16
type ProduceResponse ¶
type ProduceResponse struct { Topic string PartitionResponses []*ProducePartitionResponse }
type ProduceResponses ¶
type ProduceResponses struct { Responses []*ProduceResponse ThrottleTimeMs int32 }
func (*ProduceResponses) Decode ¶
func (r *ProduceResponses) Decode(d PacketDecoder) error
func (*ProduceResponses) Encode ¶
func (r *ProduceResponses) Encode(e PacketEncoder) error
func (*ProduceResponses) Version ¶
func (r *ProduceResponses) Version() int16
type PushDecoder ¶
type PushEncoder ¶
type RequestHeader ¶
type RequestHeader struct {
// Size of the request
Size int32
// ID of the API (e.g. produce, fetch, metadata)
APIKey int16
// Version of the API to use
APIVersion int16
// User defined ID to correlate requests between server and client
CorrelationID int32
// ID of the client
ClientID string
}
func (*RequestHeader) Decode ¶
func (r *RequestHeader) Decode(d PacketDecoder) error
func (*RequestHeader) Encode ¶
func (r *RequestHeader) Encode(e PacketEncoder)
type Response ¶
type Response struct { Size int32 CorrelationID int32 Body ResponseBody }
func (*Response) Decode ¶
func (r *Response) Decode(pd PacketDecoder) (err error)
func (*Response) Encode ¶
func (r *Response) Encode(pe PacketEncoder) (err error)
type ResponseBody ¶
type SizeField ¶
type SizeField struct {
StartOffset int
}
func (*SizeField) ReserveSize ¶
func (*SizeField) SaveOffset ¶
type StopReplicaPartition ¶
type StopReplicaRequest ¶
type StopReplicaRequest struct { ControllerID int32 ControllerEpoch int32 DeletePartitions bool Partitions []*StopReplicaPartition }
func (*StopReplicaRequest) Decode ¶
func (r *StopReplicaRequest) Decode(d PacketDecoder) (err error)
func (*StopReplicaRequest) Encode ¶
func (r *StopReplicaRequest) Encode(e PacketEncoder) (err error)
func (*StopReplicaRequest) Key ¶
func (r *StopReplicaRequest) Key() int16
func (*StopReplicaRequest) Version ¶
func (r *StopReplicaRequest) Version() int16
type StopReplicaResponse ¶
type StopReplicaResponse struct { ErrorCode int16 Partitions []*StopReplicaResponsePartition }
func (*StopReplicaResponse) Decode ¶
func (r *StopReplicaResponse) Decode(d PacketDecoder) (err error)
func (*StopReplicaResponse) Encode ¶
func (r *StopReplicaResponse) Encode(e PacketEncoder) (err error)
type SyncGroupRequest ¶
type SyncGroupRequest struct { GroupID string GenerationID int32 MemberID string GroupAssignments map[string][]byte }
func (*SyncGroupRequest) Decode ¶
func (r *SyncGroupRequest) Decode(d PacketDecoder) (err error)
func (*SyncGroupRequest) Encode ¶
func (r *SyncGroupRequest) Encode(e PacketEncoder) error
func (*SyncGroupRequest) Key ¶
func (r *SyncGroupRequest) Key() int16
func (*SyncGroupRequest) Version ¶
func (r *SyncGroupRequest) Version() int16
type SyncGroupResponse ¶
func (*SyncGroupResponse) Decode ¶
func (r *SyncGroupResponse) Decode(d PacketDecoder) (err error)
func (*SyncGroupResponse) Encode ¶
func (r *SyncGroupResponse) Encode(e PacketEncoder) error
func (*SyncGroupResponse) Key ¶
func (r *SyncGroupResponse) Key() int16
func (*SyncGroupResponse) Version ¶
func (r *SyncGroupResponse) Version() int16
type TopicErrorCode ¶
type TopicMetadata ¶
type TopicMetadata struct { TopicErrorCode int16 Topic string PartitionMetadata []*PartitionMetadata }
Source Files ¶
- api_key.go
- api_versions_requests.go
- api_versions_response.go
- crc_field.go
- create_topic_requests.go
- create_topics_response.go
- decoder.go
- delete_topic_request.go
- delete_topic_response.go
- describe_groups_request.go
- describe_groups_response.go
- encoder.go
- error.go
- fetch_request.go
- fetch_response.go
- group_coordinator_request.go
- group_coordinator_response.go
- heartbeat_request.go
- heartbeat_response.go
- join_group_request.go
- join_group_response.go
- leader_and_isr_request.go
- leader_and_isr_response.go
- leave_group_request.go
- leave_group_response.go
- list_groups_request.go
- list_groups_response.go
- message.go
- message_set.go
- metadata_request.go
- metadata_response.go
- offsets_request.go
- offsets_response.go
- produce_request.go
- produce_response.go
- protocol.go
- request.go
- request_header.go
- response.go
- size_field.go
- stop_replica_request.go
- stop_replica_response.go
- sync_group_request.go
- sync_group_response.go
Click to show internal directories.
Click to hide internal directories.