Documentation
¶
Index ¶
- Constants
- func BytesLen(bytes []byte) int
- func CompactBytesLen(bytes []byte) int
- func CompactNullableStrLen(str *string) int
- func CompactStrLen(str string) int
- func StrLen(str string) int
- type ApiReq
- type ApiRespVersion
- type ApiResponse
- type BaseReq
- type BaseResp
- type BrokerMetadata
- type EnableMechanism
- type FetchPartitionReq
- type FetchPartitionResp
- type FetchReq
- type FetchResp
- type FetchTopicReq
- type FetchTopicResp
- type FindCoordinatorReq
- type FindCoordinatorResp
- type GroupAssignment
- type GroupProtocol
- type HeartBeatReq
- type HeartBeatResp
- type JoinGroupReq
- type JoinGroupResp
- type KafkaProtocolConfig
- type LeaveGroupMember
- type LeaveGroupReq
- type LeaveGroupResp
- type ListOffsetPartition
- type ListOffsetPartitionResp
- type ListOffsetReq
- type ListOffsetResp
- type ListOffsetTopic
- type ListOffsetTopicResp
- type Member
- type MetadataReq
- type MetadataResp
- type MetadataTopicReq
- type OffsetCommitPartitionReq
- type OffsetCommitPartitionResp
- type OffsetCommitReq
- type OffsetCommitResp
- type OffsetCommitTopicReq
- type OffsetCommitTopicResp
- type OffsetFetchPartitionReq
- type OffsetFetchPartitionResp
- type OffsetFetchReq
- type OffsetFetchResp
- type OffsetFetchTopicReq
- type OffsetFetchTopicResp
- type PartitionMetadata
- type Record
- type RecordBatch
- type Replica
- type SaslAuthenticateReq
- type SaslAuthenticateResp
- type SaslHandshakeReq
- type SaslHandshakeResp
- type SyncGroupReq
- type SyncGroupResp
- type TopicMetadata
Constants ¶
View Source
const ( LenCorrId = 4 LenErrorCode = 2 LenArray = 4 LenApiV0 = 6 LenApiV3 = 7 LenThrottleTime = 4 LenFetchSessionId = 4 LenPartitionId = 4 LenOffset = 8 LenLastStableOffset = 8 LenStartOffset = 8 LenAbortTransactions = 4 LenReplicaId = 4 LenMessageSize = 4 LenNodeId = 4 LenPort = 4 LenGenerationId = 4 LenTime = 8 LenLeaderId = 4 LenLeaderEpoch = 4 LenControllerId = 4 LenIsInternal = 4 LenTopicAuthOperation = 4 LenClusterAuthOperation = 4 LenRecordAttributes = 1 LenMagicByte = 1 LenCrc32 = 4 LenOffsetDelta = 4 LenProducerId = 8 LenProducerEpoch = 2 LenBaseSequence = 4 LenSessionTimeout = 8 )
View Source
const LenTaggedField = 1
Variables ¶
This section is empty.
Functions ¶
func CompactBytesLen ¶
func CompactNullableStrLen ¶
func CompactStrLen ¶
Types ¶
type ApiRespVersion ¶
type ApiResponse ¶
type ApiResponse struct { BaseResp ErrorCode int16 ApiRespVersions []*ApiRespVersion ThrottleTime int }
func NewApiVersionResp ¶
func NewApiVersionResp(corrId int) *ApiResponse
func (*ApiResponse) Bytes ¶
func (a *ApiResponse) Bytes(version int16) []byte
func (*ApiResponse) BytesLength ¶
func (a *ApiResponse) BytesLength(version int16) int
type EnableMechanism ¶
type EnableMechanism struct {
SaslMechanism string
}
type FetchPartitionReq ¶
type FetchPartitionResp ¶
type FetchReq ¶
type FetchResp ¶
type FetchResp struct { BaseResp ThrottleTime int ErrorCode int16 SessionId int TopicResponses []*FetchTopicResp }
func NewFetchResp ¶
func (*FetchResp) BytesLength ¶
type FetchTopicReq ¶
type FetchTopicReq struct { Topic string FetchPartitions []*FetchPartitionReq }
type FetchTopicResp ¶
type FetchTopicResp struct { Topic string PartitionDataList []*FetchPartitionResp }
type FindCoordinatorReq ¶
func DecodeFindCoordinatorReq ¶
func DecodeFindCoordinatorReq(bytes []byte, version int16) (findCoordinatorReq *FindCoordinatorReq, err error)
type FindCoordinatorResp ¶
type FindCoordinatorResp struct { BaseResp ErrorCode int16 ThrottleTime int ErrorMessage *string NodeId int Host string Port int }
func NewFindCoordinatorResp ¶
func NewFindCoordinatorResp(corrId int, config *KafkaProtocolConfig) *FindCoordinatorResp
func (*FindCoordinatorResp) Bytes ¶
func (f *FindCoordinatorResp) Bytes() []byte
func (*FindCoordinatorResp) BytesLength ¶
func (f *FindCoordinatorResp) BytesLength() int
type GroupAssignment ¶
type GroupProtocol ¶
type HeartBeatReq ¶
type HeartBeatReq struct { BaseReq GroupId string GenerationId int MemberId string GroupInstanceId *string }
func DecodeHeartbeatReq ¶
func DecodeHeartbeatReq(bytes []byte, version int16) (heartBeatReq *HeartBeatReq, err error)
type HeartBeatResp ¶
func NewHeartBeatResp ¶
func NewHeartBeatResp(corrId int) *HeartBeatResp
func (*HeartBeatResp) Bytes ¶
func (h *HeartBeatResp) Bytes() []byte
func (*HeartBeatResp) BytesLength ¶
func (h *HeartBeatResp) BytesLength() int
type JoinGroupReq ¶
type JoinGroupReq struct { BaseReq GroupId string SessionTimeout int RebalanceTimeout int MemberId string GroupInstanceId *string ProtocolType string GroupProtocols []*GroupProtocol }
func DecodeJoinGroupReq ¶
func DecodeJoinGroupReq(bytes []byte, version int16) (joinGroupReq *JoinGroupReq, err error)
type JoinGroupResp ¶
type JoinGroupResp struct { BaseResp ErrorCode int16 ThrottleTime int GenerationId int ProtocolType *string ProtocolName string LeaderId string MemberId string Members []*Member }
func ErrorJoinGroupResp ¶
func ErrorJoinGroupResp(corrId int, errorCode int16) *JoinGroupResp
func NewJoinGroupResp ¶
func NewJoinGroupResp(corrId int) *JoinGroupResp
func (*JoinGroupResp) Bytes ¶
func (j *JoinGroupResp) Bytes(version int16) []byte
func (*JoinGroupResp) BytesLength ¶
func (j *JoinGroupResp) BytesLength(version int16) int
type KafkaProtocolConfig ¶
type LeaveGroupMember ¶
type LeaveGroupReq ¶
type LeaveGroupReq struct { BaseReq GroupId string Members []*LeaveGroupMember }
func DecodeLeaveGroupReq ¶
func DecodeLeaveGroupReq(bytes []byte, version int16) (leaveGroupReq *LeaveGroupReq, err error)
type LeaveGroupResp ¶
type LeaveGroupResp struct { BaseResp ErrorCode int16 ThrottleTime int Members []*LeaveGroupMember MemberErrorCode int16 }
func NewLeaveGroupResp ¶
func NewLeaveGroupResp(corrId int) *LeaveGroupResp
func (*LeaveGroupResp) Bytes ¶
func (l *LeaveGroupResp) Bytes() []byte
func (*LeaveGroupResp) BytesLength ¶
func (l *LeaveGroupResp) BytesLength() int
type ListOffsetPartition ¶
type ListOffsetPartitionResp ¶
type ListOffsetReq ¶
type ListOffsetReq struct { BaseReq ReplicaId int32 IsolationLevel byte OffsetTopics []*ListOffsetTopic }
func DecodeListOffsetReq ¶
func DecodeListOffsetReq(bytes []byte, version int16) (offsetReq *ListOffsetReq, err error)
type ListOffsetResp ¶
type ListOffsetResp struct { BaseResp ErrorCode int16 ThrottleTime int OffsetTopics []*ListOffsetTopicResp }
func NewListOffsetResp ¶
func NewListOffsetResp(corrId int) *ListOffsetResp
func (*ListOffsetResp) Bytes ¶
func (o *ListOffsetResp) Bytes(version int16) []byte
func (*ListOffsetResp) BytesLength ¶
func (o *ListOffsetResp) BytesLength(version int16) int
type ListOffsetTopic ¶
type ListOffsetTopic struct { Topic string ListOffsetPartitions []*ListOffsetPartition }
type ListOffsetTopicResp ¶
type ListOffsetTopicResp struct { Topic string ListOffsetPartitions []*ListOffsetPartitionResp }
type MetadataReq ¶
type MetadataReq struct { BaseReq Topics []*MetadataTopicReq AllowAutoTopicCreation bool IncludeClusterAuthorizedOperations bool IncludeTopicAuthorizedOperations bool }
func DecodeMetadataTopicReq ¶
func DecodeMetadataTopicReq(bytes []byte, version int16) (metadataReq *MetadataReq, err error)
type MetadataResp ¶
type MetadataResp struct { BaseResp ThrottleTime int ErrorCode int16 BrokerMetadataList []*BrokerMetadata ClusterId string ControllerId int TopicMetadataList []*TopicMetadata ClusterAuthorizedOperation int }
func NewMetadataResp ¶
func NewMetadataResp(corrId int, config *KafkaProtocolConfig, topicName string, errorCode int16) *MetadataResp
func (*MetadataResp) Bytes ¶
func (m *MetadataResp) Bytes() []byte
func (*MetadataResp) BytesLength ¶
func (m *MetadataResp) BytesLength() int
type MetadataTopicReq ¶
type MetadataTopicReq struct {
Topic string
}
type OffsetCommitReq ¶
type OffsetCommitReq struct { BaseReq GroupId string GenerationId int MemberId string GroupInstanceId *string OffsetCommitTopicReqList []*OffsetCommitTopicReq }
func DecodeOffsetCommitReq ¶
func DecodeOffsetCommitReq(bytes []byte, version int16) (offsetReq *OffsetCommitReq, err error)
type OffsetCommitResp ¶
type OffsetCommitResp struct { BaseResp ThrottleTime int Topics []*OffsetCommitTopicResp }
func NewOffsetCommitResp ¶
func NewOffsetCommitResp(corrId int) *OffsetCommitResp
func (*OffsetCommitResp) Bytes ¶
func (o *OffsetCommitResp) Bytes(version int16) []byte
func (*OffsetCommitResp) BytesLength ¶
func (o *OffsetCommitResp) BytesLength(version int16) int
type OffsetCommitTopicReq ¶
type OffsetCommitTopicReq struct { Topic string OffsetPartitions []*OffsetCommitPartitionReq }
type OffsetCommitTopicResp ¶
type OffsetCommitTopicResp struct { Topic string Partitions []*OffsetCommitPartitionResp }
type OffsetFetchPartitionReq ¶
type OffsetFetchPartitionReq struct {
PartitionId int
}
type OffsetFetchReq ¶
type OffsetFetchReq struct { BaseReq GroupId string TopicReqList []*OffsetFetchTopicReq RequireStableOffset bool }
func DecodeOffsetFetchReq ¶
func DecodeOffsetFetchReq(bytes []byte, version int16) (fetchReq *OffsetFetchReq, err error)
type OffsetFetchResp ¶
type OffsetFetchResp struct { BaseResp ThrottleTime int ErrorCode int16 TopicRespList []*OffsetFetchTopicResp }
func NewOffsetFetchResp ¶
func NewOffsetFetchResp(corrId int) *OffsetFetchResp
func (*OffsetFetchResp) Bytes ¶
func (o *OffsetFetchResp) Bytes(version int16) []byte
func (*OffsetFetchResp) BytesLength ¶
func (o *OffsetFetchResp) BytesLength(version int16) int
type OffsetFetchTopicReq ¶
type OffsetFetchTopicReq struct { Topic string PartitionReqList []*OffsetFetchPartitionReq }
type OffsetFetchTopicResp ¶
type OffsetFetchTopicResp struct { Topic string PartitionRespList []*OffsetFetchPartitionResp }
type PartitionMetadata ¶
type Record ¶
type Record struct { RecordAttributes byte RelativeTimestamp int64 RelativeOffset int Key []byte Value string Headers []byte }
func (*Record) BytesLength ¶
type RecordBatch ¶
type RecordBatch struct {
	Offset      int64
	MessageSize int
	LeaderEpoch int
	MagicByte   byte
	// 8-bit flag byte
	Flags           uint16
	LastOffsetDelta int
	FirstTimestamp  int64
	LastTimestamp   int64
	ProducerId      int64
	ProducerEpoch   int16
	BaseSequence    int
	Records         []*Record
}
func (*RecordBatch) Bytes ¶
func (r *RecordBatch) Bytes() []byte
func (*RecordBatch) BytesLength ¶
func (r *RecordBatch) BytesLength() int
type SaslAuthenticateReq ¶
func DecodeSaslHandshakeAuthReq ¶
func DecodeSaslHandshakeAuthReq(bytes []byte, version int16) (authReq *SaslAuthenticateReq, err error)
type SaslAuthenticateResp ¶
type SaslAuthenticateResp struct { BaseResp ErrorCode int16 ErrorMessage string AuthBytes []byte SessionLifetime int64 }
func NewSaslHandshakeAuthResp ¶
func NewSaslHandshakeAuthResp(corrId int) *SaslAuthenticateResp
func (*SaslAuthenticateResp) Bytes ¶
func (s *SaslAuthenticateResp) Bytes(version int16) []byte
Bytes converts the response into a byte array. Tagged fields are not implemented yet.
func (*SaslAuthenticateResp) BytesLength ¶
func (s *SaslAuthenticateResp) BytesLength(version int16) int
type SaslHandshakeReq ¶
func DecodeSaslHandshakeReq ¶
func DecodeSaslHandshakeReq(bytes []byte, version int16) (saslHandshakeReq *SaslHandshakeReq, err error)
DecodeSaslHandshakeReq decodes a SaslHandshakeReq from the given bytes for the given request version.
type SaslHandshakeResp ¶
type SaslHandshakeResp struct { BaseResp ErrorCode int16 EnableMechanisms []*EnableMechanism }
func NewSaslHandshakeResp ¶
func NewSaslHandshakeResp(corrId int) *SaslHandshakeResp
func (*SaslHandshakeResp) BytesLength ¶
func (s *SaslHandshakeResp) BytesLength() int
type SyncGroupReq ¶
type SyncGroupReq struct { BaseReq GroupId string GenerationId int MemberId string GroupInstanceId *string ProtocolType string ProtocolName string GroupAssignments []*GroupAssignment }
func DecodeSyncGroupReq ¶
func DecodeSyncGroupReq(bytes []byte, version int16) (groupReq *SyncGroupReq, err error)
type SyncGroupResp ¶
type SyncGroupResp struct { BaseResp ThrottleTime int ErrorCode int16 ProtocolType string ProtocolName string MemberAssignment string }
func NewSyncGroupResp ¶
func NewSyncGroupResp(corrId int) *SyncGroupResp
func (*SyncGroupResp) Bytes ¶
func (s *SyncGroupResp) Bytes(version int16) []byte
func (*SyncGroupResp) BytesLength ¶
func (s *SyncGroupResp) BytesLength(version int16) int
type TopicMetadata ¶
type TopicMetadata struct { ErrorCode int16 Topic string IsInternal bool PartitionMetadataList []*PartitionMetadata TopicAuthorizedOperation int }
Source Files
¶
- api_resp_builder.go
- api_versions_req.go
- api_versions_resp.go
- base.go
- config.go
- const.go
- crc32.go
- fetch_req.go
- fetch_resp.go
- find_coordinator_req.go
- find_coordinator_resp.go
- heartbeat_req.go
- heartbeat_resp.go
- join_group_req.go
- join_group_resp.go
- leave_group_common.go
- leave_group_req.go
- leave_group_resp.go
- list_offsets_req.go
- list_offsets_resp.go
- metadata_req.go
- metadata_resp.go
- offset_commit_req.go
- offset_commit_resp.go
- offset_fetch_req.go
- offset_fetch_resp.go
- record.go
- record_batch.go
- sasl_authenticate_req.go
- sasl_authenticate_resp.go
- sasl_handshake_req.go
- sasl_handshake_resp.go
- sync_group_req.go
- sync_group_resp.go
- test_util.go
- util_general_bool.go
- util_general_bytes.go
- util_general_integer.go
- util_general_string.go
- util_general_varint.go
- util_kafka_array.go
- util_kafka_byte.go
- util_kafka_bytes.go
- util_kafka_field.go
- util_kafka_int.go
- util_kafka_int16.go
- util_kafka_int64.go
- util_kafka_sasl.go
- util_kafka_string.go
Click to show internal directories.
Click to hide internal directories.