Documentation
¶
Index ¶
- Constants
- Variables
- func ConvertConfigResourceType(resourceType string) uint8
- func GetLogger() logr.Logger
- func SetLogger(l logr.Logger)
- type APIVersion
- type APIVersionsRequest
- type APIVersionsResponse
- type Acl
- type AclCreation
- type AclCreationResult
- type AclResource
- type AclsOperation
- type AclsPatternType
- type AclsPermissionType
- type AclsResourceType
- type AlterConfigsRequest
- type AlterConfigsRequestConfigEntry
- type AlterConfigsRequestResource
- type AlterConfigsResponse
- type AlterConfigsResponseResource
- type AlterPartitionReassignmentsPartition
- type AlterPartitionReassignmentsRequest
- type AlterPartitionReassignmentsResponse
- type AlterPartitionReassignmentsTopic
- type ApiKey
- type AssignmentStrategy
- type Broker
- func (broker *Broker) Close()
- func (broker *Broker) GetAddress() string
- func (broker *Broker) Request(r Request) (ReadParser, error)
- func (broker *Broker) RequestAndGet(r Request) (resp Response, err error)
- func (broker *Broker) RequestListGroups(clientID string) (r *ListGroupsResponse, err error)
- func (broker *Broker) String() string
- type BrokerConfig
- type BrokerInfo
- type Brokers
- func (brokers *Brokers) AlterPartitionReassignments(req *AlterPartitionReassignmentsRequest) (r *AlterPartitionReassignmentsResponse, err error)
- func (brokers *Brokers) BrokersInfo() map[int32]*BrokerInfo
- func (brokers *Brokers) Close()
- func (brokers *Brokers) Controller() int32
- func (brokers *Brokers) ElectLeaders(req *ElectLeadersRequest) (r *ElectLeadersResponse, err error)
- func (brokers *Brokers) FindCoordinator(clientID, groupID string) (r FindCoordinatorResponse, err error)
- func (brokers *Brokers) GetBroker(nodeID int32) (*Broker, error)
- func (brokers *Brokers) GetController() (*Broker, error)
- func (brokers *Brokers) ListPartitionReassignments(req ListPartitionReassignmentsRequest) (r *ListPartitionReassignmentsResponse, err error)
- func (brokers *Brokers) NewBroker(nodeID int32) (*Broker, error)
- func (brokers *Brokers) Request(req Request) (Response, error)
- func (brokers *Brokers) RequestMetaData(clientID string, topics []string) (r MetadataResponse, err error)
- func (brokers *Brokers) RequestOffsets(clientID, topic string, partitionID int32, timeValue int64, offsets uint32) ([]OffsetsResponse, error)
- type ByMemberID
- type ByPartitionID
- type Client
- func (c *Client) Close()
- func (c *Client) CreateAcls(creations []AclCreation) (*CreateAclsResponse, error)
- func (c *Client) DeleteAcls(filters []*DeleteAclsFilter) (*DeleteAclsResponse, error)
- func (c *Client) DeleteTopics(topics []string, timeoutMs int32) (r DeleteTopicsResponse, err error)
- func (c *Client) DescribeAcls(r DescribeAclsRequestBody) (DescribeAclsResponse, error)
- func (c *Client) DescribeConfigs(resourceType, resourceName string, keys []string) (r DescribeConfigsResponse, err error)
- func (c *Client) DescribeLogDirs(topics []string) (map[int32]DescribeLogDirsResponse, error)
- func (c *Client) ListGroups() (groups map[int32][]*Group, err error)
- func (c *Client) RefreshMetadata()
- func (client *Client) WithLogger(logger logr.Logger) *Client
- type Compressor
- type ConfigEntry
- type Consumer
- type ConsumerConfig
- type Coordinator
- type CreateAclsRequest
- type CreateAclsResponse
- type CreatePartitionsRequest
- type CreatePartitionsResponse
- type CreateTopicRequest
- type CreateTopicsRequest
- func (r *CreateTopicsRequest) AddReplicaAssignment(topic string, pid int32, replicas []int32) error
- func (r *CreateTopicsRequest) AddTopic(topic string, partitions int32, replicationFactor int16) error
- func (r *CreateTopicsRequest) Encode(version uint16) []byte
- func (r *CreateTopicsRequest) Length() int
- type CreateTopicsResponse
- type DeleteAclsFilter
- type DeleteAclsFilterResult
- type DeleteAclsMatchingAcl
- type DeleteAclsRequest
- type DeleteAclsResponse
- type DeleteGroupsRequest
- type DeleteGroupsResponse
- type DeleteTopicsRequest
- type DeleteTopicsResponse
- type DescribeAclsRequest
- type DescribeAclsRequestBody
- type DescribeAclsResponse
- type DescribeConfigsRequest
- type DescribeConfigsRequestResource
- type DescribeConfigsResponse
- type DescribeGroupsRequest
- type DescribeGroupsResponse
- type DescribeLogDirsRequest
- type DescribeLogDirsRequestTopic
- type DescribeLogDirsResponse
- type DescribeLogDirsResponsePartition
- type DescribeLogDirsResponseResult
- type DescribeLogDirsResponseTopic
- type ElectLeadersRequest
- type ElectLeadersResponse
- type Error
- type FetchRequest
- type FetchResponse
- type FindCoordinatorRequest
- type FindCoordinatorResponse
- type FullMessage
- type Group
- type GroupAssignment
- type GroupConsumer
- type GroupDetail
- type GroupProtocol
- type GzipCompressor
- type HealerError
- type HeartbeatRequest
- type HeartbeatResponse
- type Helper
- type IncrementalAlterConfigsRequest
- func (r *IncrementalAlterConfigsRequest) AddConfig(resourceType uint8, resourceName, configName, configValue string) error
- func (r IncrementalAlterConfigsRequest) Encode(version uint16) []byte
- func (r *IncrementalAlterConfigsRequest) SetValidateOnly(validateOnly bool) *IncrementalAlterConfigsRequest
- type IncrementalAlterConfigsRequestConfigEntry
- type IncrementalAlterConfigsRequestResource
- type IncrementalAlterConfigsResponse
- type IncrementalAlterConfigsResponseResource
- type JoinGroupRequest
- type JoinGroupResponse
- type KafkaError
- type LZ4Compressor
- type LeaveGroupRequest
- type LeaveGroupResponse
- type ListGroupsRequest
- type ListGroupsResponse
- type ListPartitionReassignmentsRequest
- type ListPartitionReassignmentsResponse
- type Member
- type MemberAssignment
- type MemberDetail
- type Message
- type MessageSet
- type MetaInfo
- type MetadataRequest
- type MetadataResponse
- type MockConn
- func (_m *MockConn) Close() error
- func (_m *MockConn) LocalAddr() net.Addr
- func (_m *MockConn) Read(p []byte) (n int, err error)
- func (_m *MockConn) RemoteAddr() net.Addr
- func (_m *MockConn) SetDeadline(t time.Time) error
- func (_m *MockConn) SetReadDeadline(t time.Time) error
- func (_m *MockConn) SetWriteDeadline(t time.Time) error
- func (_m *MockConn) Write(p []byte) (n int, err error)
- type NetConfig
- type NoneCompressor
- type OffsetCommitRequest
- func (r *OffsetCommitRequest) AddPartiton(topic string, partitionID int32, offset int64, metadata string)
- func (r *OffsetCommitRequest) Encode(version uint16) []byte
- func (r *OffsetCommitRequest) Length() int
- func (r *OffsetCommitRequest) SetGenerationID(generationID int32)
- func (r *OffsetCommitRequest) SetMemberID(memberID string)
- func (r *OffsetCommitRequest) SetRetentionTime(retentionTime int64)
- type OffsetCommitRequestPartition
- type OffsetCommitRequestTopic
- type OffsetCommitResponse
- type OffsetCommitResponsePartition
- type OffsetCommitResponseTopic
- type OffsetFetchRequest
- type OffsetFetchRequestTopic
- type OffsetFetchResponse
- type OffsetFetchResponsePartition
- type OffsetFetchResponseTopic
- type OffsetsRequest
- type OffsetsResponse
- type PartitionAssignment
- type PartitionBlock
- type PartitionMetadataInfo
- type PartitionOffset
- type PartitionOffsetRequestInfo
- type PartitionResponse
- type PartitionResult
- type PlainSasl
- type ProduceRequest
- type ProduceResponse
- type ProduceResponsePiece
- type ProduceResponse_PartitionResponse
- type Producer
- type ProducerConfig
- type ProtocolMetadata
- type ReadParser
- type Record
- type RecordBatch
- type RecordHeader
- type ReplicaAssignment
- type ReplicaElectionResult
- type Request
- type RequestHeader
- func (requestHeader *RequestHeader) API() uint16
- func (h *RequestHeader) EncodeTo(payload []byte) int
- func (h *RequestHeader) IsFlexible() bool
- func (requestHeader *RequestHeader) SetCorrelationID(c uint32)
- func (requestHeader *RequestHeader) SetVersion(version uint16)
- func (requestHeader *RequestHeader) Version() uint16
- type Response
- type ResponseHeader
- type SaslAuth
- type SaslAuthenticateRequest
- type SaslAuthenticateResponse
- type SaslConfig
- type SaslHandShakeRequest
- type SaslHandshakeResponse
- type SimpleConsumer
- type SimpleProducer
- type SnappyCompressor
- type SyncGroupRequest
- type SyncGroupResponse
- type TLSConfig
- type TaggedField
- type TaggedFields
- type TopicError
- type TopicMetadata
- type TopicPartition
Constants ¶
const ( AclsResourceTypeUnknown = 0 AclsResourceTypeAny = 1 AclsResourceTypeTopic = 2 AclsResourceTypeGroup = 3 AclsResourceTypeBroker = 4 AclsResourceTypeCluster = 4 AclsResourceTypeTransactionalID = 5 AclsResourceTypeDelegationToken = 6 AclsResourceTypeUser = 7 )
const ( AclsPatternTypeUnknown = 0 AclsPatternTypeAny = 1 AclsPatternTypeMatch = 2 AclsPatternTypeLiteral = 3 AclsPatternTypePrefixed = 4 )
const ( COMPRESSION_NONE int8 = 0 COMPRESSION_GZIP int8 = 1 COMPRESSION_SNAPPY int8 = 2 COMPRESSION_LZ4 int8 = 3 )
const ( API_ProduceRequest uint16 = 0 API_FetchRequest uint16 = 1 API_OffsetRequest uint16 = 2 API_MetadataRequest uint16 = 3 API_OffsetCommitRequest uint16 = 8 API_OffsetFetchRequest uint16 = 9 API_FindCoordinator uint16 = 10 API_JoinGroup uint16 = 11 API_Heartbeat uint16 = 12 API_LeaveGroup uint16 = 13 API_SyncGroup uint16 = 14 API_DescribeGroups uint16 = 15 API_ListGroups uint16 = 16 API_SaslHandshake uint16 = 17 API_ApiVersions uint16 = 18 API_CreateTopics uint16 = 19 API_DeleteTopics uint16 = 20 API_DescribeAcls uint16 = 29 API_CreateAcls uint16 = 30 API_DeleteAcls uint16 = 31 API_DescribeConfigs uint16 = 32 API_AlterConfigs uint16 = 33 API_DescribeLogDirs uint16 = 35 API_SaslAuthenticate uint16 = 36 API_CreatePartitions uint16 = 37 API_Delete_Groups uint16 = 42 API_ElectLeaders uint16 = 43 API_IncrementalAlterConfigs uint16 = 44 API_AlterPartitionReassignments uint16 = 45 API_ListPartitionReassignments uint16 = 46 )
const ( RESOURCETYPE_UNKNOWN uint8 = iota RESOURCETYPE_ANY RESOURCETYPE_TOPIC RESOURCETYPE_GROUP RESOURCETYPE_CLUSTER RESOURCETYPE_TRANSACTIONAL_ID RESOURCETYPE_DELEGATION_TOKEN )
const AclsOperationAll = 2
const AclsOperationAlter = 7
const AclsOperationAlterConfigs = 11
const AclsOperationAny = 1
const AclsOperationClusterAction = 9
const AclsOperationCreate = 5
const AclsOperationDelete = 6
const AclsOperationDescribe = 8
const AclsOperationDescribeConfigs = 10
const AclsOperationIdempotentWrite = 12
const AclsOperationRead = 3
const AclsOperationUnknown = 0
const AclsOperationWrite = 4
Variables ¶
var ErrProducerClosed = fmt.Errorf("producer closed")
ErrProducerClosed is returned when adding message while producer is closed
Functions ¶
func ConvertConfigResourceType ¶
ConvertConfigResourceType convert string to uint8 that's used in DescribeConfigsRequest
Types ¶
type APIVersion ¶
type APIVersion struct {
// contains filtered or unexported fields
}
APIVersion holds the parameters for a API version struct in the APIVersions response
type APIVersionsRequest ¶
type APIVersionsRequest struct {
*RequestHeader
}
func (*APIVersionsRequest) Encode ¶
func (req *APIVersionsRequest) Encode(version uint16) []byte
Encode encodes ApiVersionsRequest to []byte
type APIVersionsResponse ¶
type APIVersionsResponse struct { CorrelationID uint32 ErrorCode int16 APIVersions []APIVersion }
APIVersionsResponse holds the parameters for a APIVersions response
func (APIVersionsResponse) Error ¶
func (r APIVersionsResponse) Error() error
type Acl ¶
type Acl struct { Principal string Host string Operation AclsOperation PermissionType AclsPermissionType TaggedFields TaggedFields }
type AclCreation ¶
type AclCreation struct { ResourceType AclsResourceType ResourceName string PatternType AclsPatternType Principal string Host string Operation AclsOperation PermissionType AclsPermissionType TaggedFields TaggedFields }
type AclCreationResult ¶
type AclCreationResult struct { ErrorCode uint16 ErrorMessage *string TaggedFields TaggedFields }
type AclResource ¶
type AclResource struct { ResourceType AclsResourceType ResourceName string PatternType AclsPatternType Acls []Acl TaggedFields TaggedFields }
type AclsOperation ¶
type AclsOperation int8
func (AclsOperation) MarshalText ¶
func (o AclsOperation) MarshalText() ([]byte, error)
func (AclsOperation) String ¶
func (o AclsOperation) String() string
func (*AclsOperation) UnmarshalText ¶
func (o *AclsOperation) UnmarshalText(text []byte) error
type AclsPatternType ¶
type AclsPatternType int8
func (AclsPatternType) MarshalText ¶
func (t AclsPatternType) MarshalText() ([]byte, error)
func (AclsPatternType) String ¶
func (t AclsPatternType) String() string
func (*AclsPatternType) UnmarshalText ¶
func (t *AclsPatternType) UnmarshalText(text []byte) error
type AclsPermissionType ¶
type AclsPermissionType int8
const ( AclsPermissionTypeUnkown AclsPermissionType = 0 AclsPermissionTypeAny AclsPermissionType = 1 AclsPermissionTypeDeny AclsPermissionType = 2 AclsPermissionTypeAllow AclsPermissionType = 3 )
func (AclsPermissionType) MarshalText ¶
func (p AclsPermissionType) MarshalText() ([]byte, error)
func (AclsPermissionType) String ¶
func (p AclsPermissionType) String() string
func (*AclsPermissionType) UnmarshalText ¶
func (p *AclsPermissionType) UnmarshalText(text []byte) error
type AclsResourceType ¶
type AclsResourceType int8
func (AclsResourceType) MarshalText ¶
func (t AclsResourceType) MarshalText() ([]byte, error)
func (AclsResourceType) String ¶
func (t AclsResourceType) String() string
func (*AclsResourceType) UnmarshalText ¶
func (t *AclsResourceType) UnmarshalText(text []byte) error
type AlterConfigsRequest ¶
type AlterConfigsRequest struct { *RequestHeader Resources []AlterConfigsRequestResource // contains filtered or unexported fields }
AlterConfigsRequest struct holds params in AlterConfigsRequest
func NewAlterConfigsRequest ¶
func NewAlterConfigsRequest(clientID string) AlterConfigsRequest
NewAlterConfigsRequest create a new AlterConfigsRequest
func (*AlterConfigsRequest) AddConfig ¶
func (r *AlterConfigsRequest) AddConfig(resourceType uint8, resourceName, configName, configValue string) error
AddConfig add new config entry to request
func (AlterConfigsRequest) Encode ¶
func (r AlterConfigsRequest) Encode(version uint16) []byte
Encode encodes AlterConfigsRequest object to []byte. it implement Request Interface
func (*AlterConfigsRequest) SetValidateOnly ¶
func (r *AlterConfigsRequest) SetValidateOnly(validateOnly bool) *AlterConfigsRequest
SetValidateOnly set validateOnly in request
type AlterConfigsRequestConfigEntry ¶
AlterConfigsRequestConfigEntry is sub struct in AlterConfigsRequestResource
type AlterConfigsRequestResource ¶
type AlterConfigsRequestResource struct { ResourceType uint8 ResourceName string ConfigEntries []AlterConfigsRequestConfigEntry }
AlterConfigsRequestResource is sub struct in AlterConfigsRequest
type AlterConfigsResponse ¶
type AlterConfigsResponse struct { CorrelationID uint32 ThrottleTimeMS uint32 Resources []AlterConfigsResponseResource }
func NewAlterConfigsResponse ¶
func NewAlterConfigsResponse(payload []byte) (r AlterConfigsResponse, err error)
func (AlterConfigsResponse) Error ¶
func (r AlterConfigsResponse) Error() error
type AlterPartitionReassignmentsPartition ¶
type AlterPartitionReassignmentsPartition struct { PartitionID int32 `json:"partition_id"` Replicas []int32 `json:"replicas"` TaggedFields TaggedFields `json:"tagged_fields"` }
AlterPartitionReassignmentsPartition is the partition of AlterPartitionReassignmentsTopic
type AlterPartitionReassignmentsRequest ¶
type AlterPartitionReassignmentsRequest struct { *RequestHeader TimeoutMs int32 `json:"timeout_ms"` Topics []*AlterPartitionReassignmentsTopic `json:"topics"` TaggedFields TaggedFields `json:"tagged_fields"` }
AlterPartitionReassignmentsRequest is the request of AlterPartitionReassignmentsRequest
func DecodeAlterPartitionReassignmentsRequest ¶
func DecodeAlterPartitionReassignmentsRequest(payload []byte, version uint16) (r AlterPartitionReassignmentsRequest, err error)
just for test
func NewAlterPartitionReassignmentsRequest ¶
func NewAlterPartitionReassignmentsRequest(timeoutMs int32) (r AlterPartitionReassignmentsRequest)
NewAlterPartitionReassignmentsRequest is used to create a new AlterPartitionReassignmentsRequest
func (*AlterPartitionReassignmentsRequest) AddAssignment ¶
func (r *AlterPartitionReassignmentsRequest) AddAssignment(topic string, partitionID int32, replicas []int32)
AddAssignment is used to add a new assignment to AlterPartitionReassignmentsRequest It do not verify the assignment already exists or not
func (*AlterPartitionReassignmentsRequest) Encode ¶
func (r *AlterPartitionReassignmentsRequest) Encode(version uint16) (payload []byte)
Encode encodes AlterPartitionReassignmentsRequest to []byte
type AlterPartitionReassignmentsResponse ¶
type AlterPartitionReassignmentsResponse struct { ResponseHeader ThrottleTimeMs int32 `json:"throttle_time_ms"` ErrorCode int16 `json:"error_code"` ErrorMsg *string `json:"error_msg"` Responses []*alterPartitionReassignmentsResponseTopic `json:"responses"` TaggedFields TaggedFields `json:"tagged_fields"` }
AlterPartitionReassignmentsResponse is the response of AlterPartitionReassignmentsRequest
func NewAlterPartitionReassignmentsResponse ¶
func NewAlterPartitionReassignmentsResponse(payload []byte, version uint16) (*AlterPartitionReassignmentsResponse, error)
NewAlterPartitionReassignmentsResponse create a new AlterPartitionReassignmentsResponse
func (*AlterPartitionReassignmentsResponse) Encode ¶
func (r *AlterPartitionReassignmentsResponse) Encode(version uint16) (payload []byte)
just for test
func (*AlterPartitionReassignmentsResponse) Error ¶
func (r *AlterPartitionReassignmentsResponse) Error() error
type AlterPartitionReassignmentsTopic ¶
type AlterPartitionReassignmentsTopic struct { TopicName string `json:"topic_name"` Partitions []*AlterPartitionReassignmentsPartition `json:"partitions"` TaggedFields TaggedFields `json:"tagged_fields"` }
AlterPartitionReassignmentsTopic is the topic of AlterPartitionReassignmentsRequest
type AssignmentStrategy ¶
type AssignmentStrategy interface { // generally topicMetadatas is returned by metaDataRequest sent by GroupConsumer Assign([]Member, []TopicMetadata) GroupAssignment }
AssignmentStrategy is the interface for different assignment strategies, it returns GroupAssignment
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func NewBroker ¶
func NewBroker(address string, nodeID int32, config *BrokerConfig) (*Broker, error)
NewBroker is only called in NewBrokers, user must always init a Brokers instance by NewBrokers
func (*Broker) GetAddress ¶
GetAddress returns the broker address
func (*Broker) Request ¶
func (broker *Broker) Request(r Request) (ReadParser, error)
Request sends a request to the broker and returns a readParser user should call RequestAndGet() to get the response
func (*Broker) RequestAndGet ¶
RequestAndGet sends a request to the broker and returns the response
func (*Broker) RequestListGroups ¶
func (broker *Broker) RequestListGroups(clientID string) (r *ListGroupsResponse, err error)
type BrokerConfig ¶
type BrokerConfig struct { Net NetConfig `json:"net" mapstructure:"net"` Sasl SaslConfig `json:"sasl" mapstructure:"sasl"` MetadataRefreshIntervalMS int `json:"metadata.refresh.interval.ms,string" mapstructure:"metadata.refresh.interval.ms"` TLSEnabled bool `json:"tls.enabled,string" mapstructure:"tls.enabled"` TLS *TLSConfig `json:"tls" mapstructure:"tls"` }
func DefaultBrokerConfig ¶
func DefaultBrokerConfig() *BrokerConfig
type BrokerInfo ¶
BrokerInfo holds all the fields of broker info, which is used in metadata response
func (*BrokerInfo) NetAddress ¶
func (b *BrokerInfo) NetAddress() string
type Brokers ¶
type Brokers struct {
// contains filtered or unexported fields
}
func NewBrokers ¶
NewBrokers create a new broker with default config
func NewBrokersWithConfig ¶
func NewBrokersWithConfig(bootstrapServers string, config *BrokerConfig) (*Brokers, error)
NewBrokersWithConfig create a new broker with config
func (*Brokers) AlterPartitionReassignments ¶
func (brokers *Brokers) AlterPartitionReassignments(req *AlterPartitionReassignmentsRequest) (r *AlterPartitionReassignmentsResponse, err error)
AlterPartitionReassignments requests AlterPartitionReassignments from controller and returns response
func (*Brokers) BrokersInfo ¶
func (brokers *Brokers) BrokersInfo() map[int32]*BrokerInfo
BrokersInfo returns brokers info, it is a private member and should not be changed from outside
func (*Brokers) Controller ¶
Controller return controller broker id
func (*Brokers) ElectLeaders ¶
func (brokers *Brokers) ElectLeaders(req *ElectLeadersRequest) (r *ElectLeadersResponse, err error)
ElectLeaders requests ElectLeaders from controller and returns response
func (*Brokers) FindCoordinator ¶
func (brokers *Brokers) FindCoordinator(clientID, groupID string) (r FindCoordinatorResponse, err error)
FindCoordinator try to requests FindCoordinator from all brokers and returns response
func (*Brokers) GetController ¶
func (*Brokers) ListPartitionReassignments ¶
func (brokers *Brokers) ListPartitionReassignments(req ListPartitionReassignmentsRequest) (r *ListPartitionReassignmentsResponse, err error)
ListPartitionReassignments requests ListPartitionReassignments from controller and returns response
func (*Brokers) RequestMetaData ¶
func (brokers *Brokers) RequestMetaData(clientID string, topics []string) (r MetadataResponse, err error)
func (*Brokers) RequestOffsets ¶
func (brokers *Brokers) RequestOffsets(clientID, topic string, partitionID int32, timeValue int64, offsets uint32) ([]OffsetsResponse, error)
RequestOffsets return the offset values array. return all partitions if partitionID < 0
type ByMemberID ¶
type ByMemberID []string
func (ByMemberID) Len ¶
func (a ByMemberID) Len() int
func (ByMemberID) Less ¶
func (a ByMemberID) Less(i, j int) bool
func (ByMemberID) Swap ¶
func (a ByMemberID) Swap(i, j int)
type ByPartitionID ¶
type ByPartitionID []int32
func (ByPartitionID) Len ¶
func (a ByPartitionID) Len() int
func (ByPartitionID) Less ¶
func (a ByPartitionID) Less(i, j int) bool
func (ByPartitionID) Swap ¶
func (a ByPartitionID) Swap(i, j int)
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) CreateAcls ¶
func (c *Client) CreateAcls(creations []AclCreation) (*CreateAclsResponse, error)
func (*Client) DeleteAcls ¶
func (c *Client) DeleteAcls(filters []*DeleteAclsFilter) (*DeleteAclsResponse, error)
func (*Client) DeleteTopics ¶
func (c *Client) DeleteTopics(topics []string, timeoutMs int32) (r DeleteTopicsResponse, err error)
func (*Client) DescribeAcls ¶
func (c *Client) DescribeAcls(r DescribeAclsRequestBody) (DescribeAclsResponse, error)
func (*Client) DescribeConfigs ¶
func (c *Client) DescribeConfigs(resourceType, resourceName string, keys []string) (r DescribeConfigsResponse, err error)
func (*Client) DescribeLogDirs ¶
func (c *Client) DescribeLogDirs(topics []string) (map[int32]DescribeLogDirsResponse, error)
func (*Client) ListGroups ¶
ListGroups lists all consumer groups from all brokers
func (*Client) RefreshMetadata ¶
func (c *Client) RefreshMetadata()
RefreshMetadata refreshes metadata for c.brokers
type Compressor ¶
func NewCompressor ¶
func NewCompressor(cType string) Compressor
type ConfigEntry ¶
ConfigEntry is sub struct in CreateTopicRequest
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer instance is built to consume messages from kafka broker
func NewConsumer ¶
NewConsumer creates a new consumer instance
func (*Consumer) Assign ¶
Assign assigns the given partitions to the consumer, the consumer will only consume the given partitions Do not call this after calling Consume
func (*Consumer) AwaitClose ¶
func (*Consumer) Consume ¶
func (c *Consumer) Consume(messageChan chan *FullMessage) (<-chan *FullMessage, error)
Consume consumes messages from kafka broker, returns a channel of messages
type ConsumerConfig ¶
type ConsumerConfig struct { Net NetConfig `json:"net" mapstructure:"net"` Sasl SaslConfig `json:"sasl" mapstructure:"sasl"` BootstrapServers string `json:"bootstrap.servers" mapstructure:"bootstrap.servers"` ClientID string `json:"client.id" mapstructure:"client.id"` GroupID string `json:"group.id" mapstructure:"group.id"` RetryBackOffMS int `json:"retry.backoff.ms,string" mapstructure:"retry.backoff.ms"` MetadataMaxAgeMS int `json:"metadata.max.age.ms,string" mapstructure:"metadata.max.age.ms"` SessionTimeoutMS int32 `json:"session.timeout.ms,string" mapstructure:"session.timeout.ms"` FetchMaxWaitMS int32 `json:"fetch.max.wait.ms,string" mapstructure:"fetch.max.wait.ms"` FetchMaxBytes int32 `json:"fetch.max.bytes,string" mapstructure:"fetch.max.bytes"` // if this is too small, healer will double it automatically FetchMinBytes int32 `json:"fetch.min.bytes,string" mapstructure:"fetch.min.bytes"` FromBeginning bool `json:"from.beginning,string" mapstructure:"from.beginning"` AutoCommit bool `json:"auto.commit,string" mapstructure:"auto.commit"` AutoCommitIntervalMS int `json:"auto.commit.interval.ms,string" mapstructure:"auto.commit.interval.ms"` OffsetsStorage int `json:"offsets.storage,string" mapstructure:"offsets.storage"` MetadataRefreshIntervalMS int `json:"metadata.refresh.interval.ms,string" mapstructure:"metadata.refresh.interval.ms"` TLSEnabled bool `json:"tls.enabled,string" mapstructure:"tls.enabled"` TLS *TLSConfig `json:"tls" mapstructure:"tls"` }
func DefaultConsumerConfig ¶
func DefaultConsumerConfig() ConsumerConfig
type Coordinator ¶
Coordinator is the struct of coordinator, including nodeID, host and port
type CreateAclsRequest ¶
type CreateAclsRequest struct { RequestHeader Creations []AclCreation TaggedFields TaggedFields }
func DecodeCreateAclsRequest ¶
func DecodeCreateAclsRequest(payload []byte) (r CreateAclsRequest, err error)
just for test
func (*CreateAclsRequest) Encode ¶
func (r *CreateAclsRequest) Encode(version uint16) (payload []byte)
type CreateAclsResponse ¶
type CreateAclsResponse struct { ResponseHeader Results []AclCreationResult TaggedFields TaggedFields }
func DecodeCreateAclsResponse ¶
func DecodeCreateAclsResponse(payload []byte, version uint16) (*CreateAclsResponse, error)
func (*CreateAclsResponse) Encode ¶
func (r *CreateAclsResponse) Encode() (payload []byte)
just for test
func (*CreateAclsResponse) Error ¶
func (r *CreateAclsResponse) Error() error
type CreatePartitionsRequest ¶
type CreatePartitionsRequest struct { *RequestHeader Topics []createPartitionsRequestTopicBlock `json:"topics"` TimeoutMS int32 `json:"timeout_ms"` ValidateOnly bool `json:"validate_only"` TaggedFields TaggedFields `json:"tagged_fields"` }
CreatePartitionsRequest holds the parameters of a create-partitions request.
func NewCreatePartitionsRequest ¶
func NewCreatePartitionsRequest(clientID string, timeout uint32, validateOnly bool) CreatePartitionsRequest
NewCreatePartitionsRequest creates a new CreatePartitionsRequest.
func (*CreatePartitionsRequest) AddTopic ¶
func (r *CreatePartitionsRequest) AddTopic(topic string, count int32, assignments [][]int32)
AddTopic adds a topic to the request.
func (CreatePartitionsRequest) Encode ¶
func (r CreatePartitionsRequest) Encode(version uint16) []byte
Encode encodes CreatePartitionsRequest to []byte
type CreatePartitionsResponse ¶
type CreatePartitionsResponse struct { CorrelationID uint32 `json:"correlation_id"` ThrottleTimeMS int32 `json:"throttle_time_ms"` Results []createPartitionsResponseResultBlock `json:"results"` }
CreatePartitionsResponse holds the parameters of a create-partitions response
func NewCreatePartitionsResponse ¶
func NewCreatePartitionsResponse(payload []byte, version uint16) (r CreatePartitionsResponse, err error)
NewCreatePartitionsResponse creates a new CreatePartitionsResponse from []byte
func (CreatePartitionsResponse) Error ¶
func (r CreatePartitionsResponse) Error() error
Error implements the error interface, it returns error from error code in the response
type CreateTopicRequest ¶
type CreateTopicRequest struct { Topic string NumPartitions int32 ReplicationFactor int16 ReplicaAssignments []*ReplicaAssignment ConfigEntries []*ConfigEntry }
CreateTopicRequest is sub struct in CreateTopicsRequest
type CreateTopicsRequest ¶
type CreateTopicsRequest struct { *RequestHeader CreateTopicRequests []*CreateTopicRequest Timeout uint32 }
func NewCreateTopicsRequest ¶
func NewCreateTopicsRequest(clientID string, timeout uint32) *CreateTopicsRequest
func (*CreateTopicsRequest) AddReplicaAssignment ¶
func (r *CreateTopicsRequest) AddReplicaAssignment(topic string, pid int32, replicas []int32) error
AddReplicaAssignment add replicas of certain topic & pid to CreateTopicRequests. It returns errorTopicNotFound if topic has not beed added to the request; it overwrite replicas if pid exists
func (*CreateTopicsRequest) AddTopic ¶
func (r *CreateTopicsRequest) AddTopic(topic string, partitions int32, replicationFactor int16) error
AddTopic gives user easy way to fill CreateTopicsRequest. it set ReplicaAssignment and ConfigEntries as nil, user can set them by AddReplicaAssignment and ADDConfigEntry
func (*CreateTopicsRequest) Encode ¶
func (r *CreateTopicsRequest) Encode(version uint16) []byte
Encode encodes CreateTopicsRequest to binary bytes
func (*CreateTopicsRequest) Length ¶
func (r *CreateTopicsRequest) Length() int
Length returns the length of bytes returned by Encode func
type CreateTopicsResponse ¶
type CreateTopicsResponse struct { CorrelationID uint32 TopicErrors []TopicError }
CreateTopicsResponse is response of create_topics request
func NewCreateTopicsResponse ¶
func NewCreateTopicsResponse(payload []byte) (r CreateTopicsResponse, err error)
NewCreateTopicsResponse decode binary bytes to CreateTopicsResponse struct
func (CreateTopicsResponse) Error ¶
func (r CreateTopicsResponse) Error() error
type DeleteAclsFilter ¶
type DeleteAclsFilter struct { ResourceType AclsResourceType ResourceName *string PatternType AclsPatternType Principal *string Host *string Operation AclsOperation PermissionType AclsPermissionType TaggedFields TaggedFields }
type DeleteAclsFilterResult ¶
type DeleteAclsFilterResult struct { ErrorCode int16 ErrorMessage *string MatchingAcls []DeleteAclsMatchingAcl TaggedFields TaggedFields }
type DeleteAclsMatchingAcl ¶
type DeleteAclsMatchingAcl struct { ErrorCode int16 ErrorMessage *string ResourceType AclsResourceType ResourceName string PatternType AclsPatternType Principal string Host string Operation AclsOperation PermissionType AclsPermissionType TaggedFields TaggedFields }
func DecodeDeleteAclsMatchingAcl ¶
func DecodeDeleteAclsMatchingAcl(payload []byte, version uint16, isFlexible bool) (m DeleteAclsMatchingAcl, offset int)
type DeleteAclsRequest ¶
type DeleteAclsRequest struct { RequestHeader Filters []*DeleteAclsFilter TaggedFields TaggedFields }
func DecodeDeleteAclsRequest ¶
func DecodeDeleteAclsRequest(payload []byte) (*DeleteAclsRequest, error)
just for test
func NewDeleteAclsRequest ¶
func NewDeleteAclsRequest(clientID string, filters []*DeleteAclsFilter) *DeleteAclsRequest
func (*DeleteAclsRequest) Encode ¶
func (r *DeleteAclsRequest) Encode(version uint16) (payload []byte)
type DeleteAclsResponse ¶
type DeleteAclsResponse struct { ResponseHeader ThrottleTimeMs int32 FilterResults []DeleteAclsFilterResult TaggedFields TaggedFields }
func DecodeDeleteAclsResponse ¶
func DecodeDeleteAclsResponse(payload []byte, version uint16) (*DeleteAclsResponse, error)
func (*DeleteAclsResponse) Encode ¶
func (r *DeleteAclsResponse) Encode() (payload []byte)
just for test
func (*DeleteAclsResponse) Error ¶
func (r *DeleteAclsResponse) Error() (err error)
type DeleteGroupsRequest ¶
type DeleteGroupsRequest struct { *RequestHeader GroupsNames []string `json:"groups_names"` }
DeleteGroupsRequest Request holds the argument of DeleteGroupsRequest
func NewDeleteGroupsRequest ¶
func NewDeleteGroupsRequest(clientID string, groupsNames []string) DeleteGroupsRequest
NewDeleteGroupsRequest creates a new DeleteGroupsRequest
func (DeleteGroupsRequest) Encode ¶
func (r DeleteGroupsRequest) Encode(version uint16) []byte
Encode encodes DeleteGroupsRequest to []byte
type DeleteGroupsResponse ¶
type DeleteGroupsResponse struct { CorrelationID uint32 `json:"correlation_id"` ThrottleTimeMs int32 `json:"throttle_time_ms"` Results []struct { GroupID string `json:"group_id"` ErrorCode int16 `json:"error_code"` } `json:"results"` }
DeleteGroupsResponse Request holds the argument of DeleteGroupsResponse
func NewDeleteGroupsResponse ¶
func NewDeleteGroupsResponse(payload []byte) (r DeleteGroupsResponse, err error)
NewDeleteGroupsResponse creates a new DeleteGroupsResponse from []byte
func (DeleteGroupsResponse) Error ¶
func (r DeleteGroupsResponse) Error() error
FIXME: multiple error code
type DeleteTopicsRequest ¶
type DeleteTopicsRequest struct { *RequestHeader TopicsNames []string `json:"topics_names"` TimeoutMS int32 `json:"timeout_ms"` }
DeleteTopicsRequest Request holds the argument of DeleteTopicsRequest
func NewDeleteTopicsRequest ¶
func NewDeleteTopicsRequest(clientID string, topicsNames []string, timeoutMS int32) DeleteTopicsRequest
NewDeleteTopicsRequest creates a new DeleteTopicsRequest
func (DeleteTopicsRequest) Encode ¶
func (r DeleteTopicsRequest) Encode(version uint16) []byte
Encode encodes DeleteTopicsRequest to []byte
type DeleteTopicsResponse ¶
type DeleteTopicsResponse struct { CorrelationID uint32 `json:"correlation_id"` Results []struct { TopicName string `json:"topic_name"` ErrorCode int16 `json:"error_code"` } `json:"results"` }
DeleteTopicsResponse Request holds the argument of DeleteTopicsResponse
func NewDeleteTopicsResponse ¶
func NewDeleteTopicsResponse(payload []byte, version uint16) (r DeleteTopicsResponse, err error)
NewDeleteTopicsResponse creates a new DeleteTopicsResponse from []byte
func (DeleteTopicsResponse) Error ¶
func (r DeleteTopicsResponse) Error() error
Error returns error list of all failed topics
type DescribeAclsRequest ¶
type DescribeAclsRequest struct { RequestHeader DescribeAclsRequestBody }
func DecodeDescribeAclsRequest ¶
func DecodeDescribeAclsRequest(payload []byte, version uint16) (r DescribeAclsRequest, err error)
just for test
func (*DescribeAclsRequest) Encode ¶
func (r *DescribeAclsRequest) Encode(version uint16) (rst []byte)
type DescribeAclsRequestBody ¶
type DescribeAclsRequestBody struct { ResourceType AclsResourceType ResourceName *string PatternType AclsPatternType Principal *string Host *string Operation AclsOperation PermissionType AclsPermissionType TaggedFields TaggedFields }
type DescribeAclsResponse ¶
type DescribeAclsResponse struct { CorrelationID uint32 ThrottleTimeMs int32 ErrorCode int16 ErrorMessage *string Resources []AclResource TaggedFields TaggedFields }
func NewDescribeAclsResponse ¶
func NewDescribeAclsResponse(payload []byte, version uint16) (response DescribeAclsResponse, err error)
func (*DescribeAclsResponse) Encode ¶
func (r *DescribeAclsResponse) Encode(version uint16) (rst []byte, err error)
just for test
func (DescribeAclsResponse) Error ¶
func (r DescribeAclsResponse) Error() error
type DescribeConfigsRequest ¶
type DescribeConfigsRequest struct { *RequestHeader Resources []*DescribeConfigsRequestResource }
DescribeConfigsRequest holds the request parameters for DescribeConfigsRequest
func NewDescribeConfigsRequest ¶
func NewDescribeConfigsRequest(clientID string, resources []*DescribeConfigsRequestResource) *DescribeConfigsRequest
func (*DescribeConfigsRequest) Encode ¶
func (r *DescribeConfigsRequest) Encode(version uint16) []byte
func (*DescribeConfigsRequest) Length ¶
func (r *DescribeConfigsRequest) Length() int
type DescribeConfigsRequestResource ¶
type DescribeConfigsRequestResource struct { ResourceType uint8 ResourceName string ConfigNames []string }
DescribeConfigsRequestResource is part of DescribeConfigsRequest
type DescribeConfigsResponse ¶
type DescribeConfigsResponse struct { CorrelationID uint32 ThrottleTimeMS uint32 Resources []describeConfigsResponseResource }
DescribeConfigsResponse holds the parameters of a describe-configs response.
func NewDescribeConfigsResponse ¶
func NewDescribeConfigsResponse(payload []byte) (r DescribeConfigsResponse, err error)
NewDescribeConfigsResponse creates a new DescribeConfigsResponse from the given payload
func (DescribeConfigsResponse) Error ¶
func (r DescribeConfigsResponse) Error() error
type DescribeGroupsRequest ¶
type DescribeGroupsRequest struct { *RequestHeader Groups []string }
DescribeGroupsRequest holds the parameters for the DescribeGroups request API
func NewDescribeGroupsRequest ¶
func NewDescribeGroupsRequest(clientID string, groups []string) *DescribeGroupsRequest
NewDescribeGroupsRequest creates a new DescribeGroupsRequest
func (*DescribeGroupsRequest) Encode ¶
func (r *DescribeGroupsRequest) Encode(version uint16) []byte
Encode encodes the request into byte array, this implements the Request interface
type DescribeGroupsResponse ¶
type DescribeGroupsResponse struct { CorrelationID uint32 Groups []*GroupDetail }
func NewDescribeGroupsResponse ¶
func NewDescribeGroupsResponse(payload []byte) (r DescribeGroupsResponse, err error)
func (DescribeGroupsResponse) Error ¶
func (r DescribeGroupsResponse) Error() error
type DescribeLogDirsRequest ¶
type DescribeLogDirsRequest struct { *RequestHeader Topics []DescribeLogDirsRequestTopic }
DescribeLogDirsRequest is a request of DescribeLogDirsRequest
func NewDescribeLogDirsRequest ¶
func NewDescribeLogDirsRequest(clientID string, topics []string) (r DescribeLogDirsRequest)
NewDescribeLogDirsRequest returns a new DescribeLogDirsRequest
func (*DescribeLogDirsRequest) AddTopicPartition ¶
func (r *DescribeLogDirsRequest) AddTopicPartition(topic string, pid int32)
AddTopicPartition add a topic and partition to DescribeLogDirsRequest
func (DescribeLogDirsRequest) Encode ¶
func (r DescribeLogDirsRequest) Encode(version uint16) []byte
Encode encode DescribeLogDirsRequest to []byte
type DescribeLogDirsRequestTopic ¶
DescribeLogDirsRequestTopic is a topic in DescribeLogDirsRequest
type DescribeLogDirsResponse ¶
type DescribeLogDirsResponse struct { CoordinatorID uint32 `json:"-"` ThrottleTimeMS int32 `json:"throttle_time_ms"` Results []DescribeLogDirsResponseResult `json:"results"` }
DescribeLogDirsResponse is a response of DescribeLogDirsRequest
func NewDescribeLogDirsResponse ¶
func NewDescribeLogDirsResponse(payload []byte, version uint16) (r DescribeLogDirsResponse, err error)
NewDescribeLogDirsResponse create a DescribeLogDirsResponse from the given payload
func (DescribeLogDirsResponse) Error ¶
func (r DescribeLogDirsResponse) Error() error
type DescribeLogDirsResponseResult ¶
type DescribeLogDirsResponseResult struct { ErrorCode int16 `json:"error_code"` LogDir string `json:"log_dir"` Topics []DescribeLogDirsResponseTopic `json:"topics"` }
type DescribeLogDirsResponseTopic ¶
type DescribeLogDirsResponseTopic struct { TopicName string `json:"topic"` Partitions []DescribeLogDirsResponsePartition `json:"partitions"` }
type ElectLeadersRequest ¶
type ElectLeadersRequest struct { *RequestHeader ElectionType int8 `json:"election_type"` Topics []*TopicPartition `json:"topics"` TimeoutMS int32 `json:"timeout.ms"` }
func NewElectLeadersRequest ¶
func NewElectLeadersRequest(timeoutMS int32) ElectLeadersRequest
NewElectLeadersRequest returns a new ElectLeadersRequest
func (*ElectLeadersRequest) Add ¶
func (r *ElectLeadersRequest) Add(topic string, pid int32)
Add adds a topic partition to the request, it does not check if the topic partition already exists
func (*ElectLeadersRequest) Encode ¶
func (r *ElectLeadersRequest) Encode(version uint16) []byte
Encode encodes a create partitions request into []byte
type ElectLeadersResponse ¶
type ElectLeadersResponse struct { CorrelationID uint32 `json:"correlation_id"` ThrottleTimeMS int32 `json:"throttle_time_ms"` ReplicaElectionResults []*ReplicaElectionResult `json:"replica_election_results"` }
func NewElectLeadersResponse ¶
func NewElectLeadersResponse(payload []byte, version uint16) (r *ElectLeadersResponse, err error)
NewElectLeadersResponse creates a new ElectLeadersResponse.
func (*ElectLeadersResponse) Error ¶
func (r *ElectLeadersResponse) Error() error
type FetchRequest ¶
type FetchRequest struct { *RequestHeader ReplicaID int32 MaxWaitTime int32 MinBytes int32 MaxBytes int32 ISOLationLevel int8 SessionID int32 SessionEpoch int32 Topics map[string][]*PartitionBlock ForgottenTopicsDatas map[string][]int32 }
FetchRequest holds all the parameters of fetch request
func NewFetchRequest ¶
func NewFetchRequest(clientID string, maxWaitTime int32, minBytes int32) *FetchRequest
NewFetchRequest creates a new FetchRequest
func (*FetchRequest) Encode ¶
func (fetchRequest *FetchRequest) Encode(version uint16) []byte
Encode encodes request to []byte
type FetchResponse ¶
type FindCoordinatorRequest ¶
type FindCoordinatorRequest struct { *RequestHeader GroupID string }
func NewFindCoordinatorRequest ¶
func NewFindCoordinatorRequest(clientID, groupID string) *FindCoordinatorRequest
func (*FindCoordinatorRequest) Encode ¶
func (findCoordinatorR *FindCoordinatorRequest) Encode(version uint16) []byte
type FindCoordinatorResponse ¶
type FindCoordinatorResponse struct { CorrelationID uint32 ErrorCode int16 Coordinator Coordinator }
FindCoordinatorResponse is the response of findcoordinator request, including correlationID, errorCode, coordinator
func NewFindCoordinatorResponse ¶
func NewFindCoordinatorResponse(payload []byte, version uint16) (r FindCoordinatorResponse, err error)
NewFindCoordinatorResponse create a NewFindCoordinatorResponse instance from response payload bytes
func (FindCoordinatorResponse) Error ¶
func (r FindCoordinatorResponse) Error() error
type FullMessage ¶
FullMessage contains message value and topic and partition
type Group ¶
type Group struct { GroupID string ProtocolType string GroupState string `healer:"minVersion:4"` GroupType string `healer:"minVersion:5"` TaggedFields TaggedFields `json:",omitempty"` }
type GroupAssignment ¶
TODO map
type GroupConsumer ¶
type GroupConsumer struct {
// contains filtered or unexported fields
}
GroupConsumer can join one group with other GroupConsumers with the same groupID and they consume messages from Kafka they will rebalance when new GroupConsumer joins or one leaves
func NewGroupConsumer ¶
func NewGroupConsumer(topic string, config interface{}) (*GroupConsumer, error)
NewGroupConsumer cretae a new GroupConsumer
func (*GroupConsumer) AwaitClose ¶
func (c *GroupConsumer) AwaitClose(timeout time.Duration)
AwaitClose will wait all simple consumers stop and then return or timeout and return after some time
func (*GroupConsumer) Close ¶
func (c *GroupConsumer) Close()
Close will wait all simple consumers stop and then return or timeout and return after 30s
func (*GroupConsumer) CommitOffset ¶
func (c *GroupConsumer) CommitOffset()
CommitOffset commit offset to kafka server
func (*GroupConsumer) Consume ¶
func (c *GroupConsumer) Consume(messages chan *FullMessage) (<-chan *FullMessage, error)
Consume will join group and then cosumes messages from kafka. it return a chan, and client could get messages from the chan
type GroupDetail ¶
type GroupProtocol ¶
GroupProtocol is sub struct in JoinGroupRequest
type GzipCompressor ¶
type GzipCompressor struct { }
type HealerError ¶
type HealerError int32
func (*HealerError) Error ¶
func (healerError *HealerError) Error() string
type HeartbeatRequest ¶
type HeartbeatRequest struct { *RequestHeader GroupID string GenerationID int32 MemberID string }
TODO version0
func NewHeartbeatRequest ¶
func NewHeartbeatRequest(clientID, groupID string, generationID int32, memberID string) *HeartbeatRequest
func (*HeartbeatRequest) Encode ¶
func (heartbeatR *HeartbeatRequest) Encode(version uint16) []byte
func (*HeartbeatRequest) Length ¶
func (heartbeatR *HeartbeatRequest) Length() int
type HeartbeatResponse ¶
func NewHeartbeatResponse ¶
func NewHeartbeatResponse(payload []byte) (r HeartbeatResponse, err error)
func (HeartbeatResponse) Error ¶
func (r HeartbeatResponse) Error() error
type Helper ¶
type Helper struct {
// contains filtered or unexported fields
}
func NewHelperFromBrokers ¶
func (*Helper) UpdateMeta ¶
type IncrementalAlterConfigsRequest ¶
type IncrementalAlterConfigsRequest struct { *RequestHeader Resources []IncrementalAlterConfigsRequestResource `json:"resources"` ValidateOnly bool `json:"validate_only"` }
IncrementalAlterConfigsRequest struct holds params in AlterConfigsRequest
func DecodeIncrementalAlterConfigsRequest ¶
func DecodeIncrementalAlterConfigsRequest(payload []byte, version uint16) (r IncrementalAlterConfigsRequest)
just for test
func NewIncrementalAlterConfigsRequest ¶
func NewIncrementalAlterConfigsRequest(clientID string) IncrementalAlterConfigsRequest
NewIncrementalAlterConfigsRequest create a new IncrementalAlterConfigsRequest
func (*IncrementalAlterConfigsRequest) AddConfig ¶
func (r *IncrementalAlterConfigsRequest) AddConfig(resourceType uint8, resourceName, configName, configValue string) error
AddConfig add new config entry to request
func (IncrementalAlterConfigsRequest) Encode ¶
func (r IncrementalAlterConfigsRequest) Encode(version uint16) []byte
Encode encodes AlterConfigsRequest object to []byte. it implement Request Interface
func (*IncrementalAlterConfigsRequest) SetValidateOnly ¶
func (r *IncrementalAlterConfigsRequest) SetValidateOnly(validateOnly bool) *IncrementalAlterConfigsRequest
SetValidateOnly set validateOnly in request
type IncrementalAlterConfigsRequestConfigEntry ¶
type IncrementalAlterConfigsRequestConfigEntry struct { Name string `json:"name"` Operation int8 `json:"operation"` Value string `json:"value"` }
IncrementalAlterConfigsRequestConfigEntry is sub struct in AlterConfigsRequestResource
type IncrementalAlterConfigsRequestResource ¶
type IncrementalAlterConfigsRequestResource struct { ResourceType uint8 `json:"type"` ResourceName string `json:"name"` Entries []IncrementalAlterConfigsRequestConfigEntry `json:"entries"` }
IncrementalAlterConfigsRequestResource is sub struct in AlterConfigsRequest
type IncrementalAlterConfigsResponse ¶
type IncrementalAlterConfigsResponse struct { CorrelationID uint32 `json:"correlation_id"` ThrottleTimeMs uint32 `json:"throttle_time_ms"` Resources []IncrementalAlterConfigsResponseResource `json:"resources"` }
IncrementalAlterConfigsResponse struct holds params in AlterConfigsRequest
func NewIncrementalAlterConfigsResponse ¶
func NewIncrementalAlterConfigsResponse(payload []byte, version uint16) (r IncrementalAlterConfigsResponse, err error)
NewIncrementalAlterConfigsResponse create a new IncrementalAlterConfigsResponse. This does not return error in the response. user may need to check the error code in the response by themselves.
func (IncrementalAlterConfigsResponse) Error ¶
func (r IncrementalAlterConfigsResponse) Error() error
type IncrementalAlterConfigsResponseResource ¶
type IncrementalAlterConfigsResponseResource struct { ErrorCode int16 `json:"error_code"` ErrorMessage string `json:"error_message"` ResourceType uint8 `json:"resource_type"` ResourceName string `json:"resource_name"` }
IncrementalAlterConfigsResponseResource is sub struct in AlterConfigsRequest
type JoinGroupRequest ¶
type JoinGroupRequest struct { *RequestHeader GroupID string SessionTimeout int32 // ms RebalanceTimeout int32 // ms. this is NOT included in verions 0 MemberID string ProtocolType string GroupProtocols []*GroupProtocol }
JoinGroupRequest struct holds params in JoinGroupRequest
func NewJoinGroupRequest ¶
func NewJoinGroupRequest(apiVersion uint16, clientID string) *JoinGroupRequest
NewJoinGroupRequest create a JoinGroupRequest
func (*JoinGroupRequest) AddGroupProtocal ¶
func (r *JoinGroupRequest) AddGroupProtocal(gp *GroupProtocol)
AddGroupProtocal add new GroupProtocol to JoinGroupReuqest
func (*JoinGroupRequest) Encode ¶
func (r *JoinGroupRequest) Encode(version uint16) []byte
Encode encodes the JoinGroupRequest object to []byte. it implement Request Interface
type JoinGroupResponse ¶
type JoinGroupResponse struct { CorrelationID uint32 ErrorCode int16 GenerationID int32 GroupProtocol string LeaderID string MemberID string Members []Member }
func NewJoinGroupResponse ¶
func NewJoinGroupResponse(payload []byte) (r JoinGroupResponse, err error)
func (JoinGroupResponse) Error ¶
func (r JoinGroupResponse) Error() error
type KafkaError ¶
type KafkaError int16
func (KafkaError) Error ¶
func (kafkaError KafkaError) Error() string
func (KafkaError) IsRetriable ¶
func (kafkaError KafkaError) IsRetriable() bool
type LZ4Compressor ¶
type LZ4Compressor struct { }
type LeaveGroupRequest ¶
type LeaveGroupRequest struct { *RequestHeader GroupID string MemberID string }
version 0
func NewLeaveGroupRequest ¶
func NewLeaveGroupRequest(clientID, groupID, memberID string) *LeaveGroupRequest
func (*LeaveGroupRequest) Encode ¶
func (r *LeaveGroupRequest) Encode(version uint16) []byte
func (*LeaveGroupRequest) Length ¶
func (r *LeaveGroupRequest) Length() int
type LeaveGroupResponse ¶
func NewLeaveGroupResponse ¶
func NewLeaveGroupResponse(payload []byte) (r LeaveGroupResponse, err error)
func (LeaveGroupResponse) Error ¶
func (r LeaveGroupResponse) Error() error
type ListGroupsRequest ¶
type ListGroupsRequest struct { *RequestHeader StatesFilter []string `healer:"minVersion:4"` TypesFilter []string `healer:"minVersion:5"` TaggedFields TaggedFields }
version0
func DecodeListGroupsRequest ¶
func DecodeListGroupsRequest(payload []byte) (r *ListGroupsRequest, err error)
just for test
func NewListGroupsRequest ¶
func NewListGroupsRequest(clientID string) *ListGroupsRequest
func (*ListGroupsRequest) Encode ¶
func (r *ListGroupsRequest) Encode(version uint16) (payload []byte)
func (*ListGroupsRequest) SetStatesFilter ¶
func (r *ListGroupsRequest) SetStatesFilter(statesFilter []string)
func (*ListGroupsRequest) SetTypesFilter ¶
func (r *ListGroupsRequest) SetTypesFilter(typesFilter []string)
type ListGroupsResponse ¶
type ListGroupsResponse struct { ResponseHeader ThrottleTimeMS int32 `healer:"minVersion:1"` ErrorCode uint16 Groups []*Group TaggedFields TaggedFields }
func NewListGroupsResponse ¶
func NewListGroupsResponse(payload []byte, version uint16) (r *ListGroupsResponse, err error)
func (*ListGroupsResponse) Encode ¶
func (r *ListGroupsResponse) Encode(version uint16) (payload []byte)
just for test
func (*ListGroupsResponse) Error ¶
func (r *ListGroupsResponse) Error() error
type ListPartitionReassignmentsRequest ¶
type ListPartitionReassignmentsRequest struct { *RequestHeader TimeoutMS int32 Topics []struct { Name string Partitions []int32 TaggedFields TaggedFields } TaggedFields TaggedFields }
ListPartitionReassignmentsRequest is a request to kafka to list partition reassignments
func NewListPartitionReassignmentsRequest ¶
func NewListPartitionReassignmentsRequest(clientID string, timeoutMS int32) ListPartitionReassignmentsRequest
NewListPartitionReassignmentsRequest creates a new ListPartitionReassignmentsRequest
func (*ListPartitionReassignmentsRequest) AddTP ¶
func (r *ListPartitionReassignmentsRequest) AddTP(topicName string, pid int32)
AddTP adds a topic/partition to the request
func (ListPartitionReassignmentsRequest) Encode ¶
func (r ListPartitionReassignmentsRequest) Encode(version uint16) []byte
Encode encodes a ListPartitionReassignmentsRequest into a byte array.
type ListPartitionReassignmentsResponse ¶
type ListPartitionReassignmentsResponse struct { CorrelationID uint32 `json:"correlation_id"` ThrottleTimeMS int32 `json:"throttle_time_ms"` ErrorCode int16 `json:"error_code"` ErrorMessage *string `json:"error_message"` Topics []listPartitionReassignmentsTopicBlock `json:"topics"` }
ListPartitionReassignmentsResponse is a response from kafka to list partition reassignments
func NewListPartitionReassignmentsResponse ¶
func NewListPartitionReassignmentsResponse(payload []byte, version uint16) (r *ListPartitionReassignmentsResponse, err error)
NewListPartitionReassignmentsResponse decode byte array to ListPartitionReassignmentsResponse instance
func (*ListPartitionReassignmentsResponse) Error ¶
func (r *ListPartitionReassignmentsResponse) Error() error
FIXME: add error message too
type MemberAssignment ¶
type MemberAssignment struct { Version int16 PartitionAssignments []*PartitionAssignment UserData []byte }
MemberAssignment will be encoded to []byte that used as memeber of GroupAssignment in Sync Request. and sync and group response returns []byte that can be decoded to MemberAssignment
func NewMemberAssignment ¶
func NewMemberAssignment(payload []byte) (*MemberAssignment, error)
func (*MemberAssignment) Encode ¶
func (memberAssignment *MemberAssignment) Encode() []byte
func (*MemberAssignment) Length ¶
func (memberAssignment *MemberAssignment) Length() int
type MemberDetail ¶
type Message ¶
type Message struct { Offset int64 MessageSize int32 //Message Crc uint32 MagicByte int8 Attributes int8 Timestamp uint64 Key []byte Value []byte // only for version 2 Headers []RecordHeader }
Message is a message in a topic
type MessageSet ¶
type MessageSet []*Message
MessageSet is a batch of messages
func DecodeToMessageSet ¶
func DecodeToMessageSet(payload []byte) (MessageSet, error)
DecodeToMessageSet decodes a MessageSet from a byte array. MessageSet is [offset message_size message], but it only decode one message in healer generally, loops inside decodeMessageSetMagic0or1. if message.Value is compressed, it will uncompress the value and returns an array of messages.
func (*MessageSet) Length ¶
func (messageSet *MessageSet) Length() int
type MetadataRequest ¶
type MetadataRequest struct { *RequestHeader Topics []string AllowAutoTopicCreation bool }
func DecodeMetadataRequest ¶
func DecodeMetadataRequest(payload []byte) (r MetadataRequest, err error)
just for test
func NewMetadataRequest ¶
func NewMetadataRequest(clientID string, topics []string) *MetadataRequest
func (*MetadataRequest) Encode ¶
func (metadataRequest *MetadataRequest) Encode(version uint16) []byte
type MetadataResponse ¶
type MetadataResponse struct { CorrelationID uint32 ThrottleTimeMs int32 Brokers []*BrokerInfo ClusterID string ControllerID int32 TopicMetadatas []TopicMetadata }
MetadataResponse holds all the fields of metadata response, including the brokers and topics
func NewMetadataResponse ¶
func NewMetadataResponse(payload []byte, version uint16) (r MetadataResponse, err error)
NewMetadataResponse decodes a byte slice into a MetadataResponse object.
func (MetadataResponse) Error ¶
func (r MetadataResponse) Error() error
Error returns the error from the error code
type MockConn ¶
type MockConn struct { MockRead func(p []byte) (n int, err error) MockWrite func(p []byte) (n int, err error) MockClose func() error MockLocalAddr func() net.Addr MockRemoteAddr func() net.Addr MockSetDeadline func(t time.Time) error MockSetReadDeadline func(t time.Time) error MockSetWriteDeadline func(t time.Time) error }
MockConn is a mock struct for the Conn type
func (*MockConn) RemoteAddr ¶
type NetConfig ¶
type NetConfig struct { ConnectTimeoutMS int `json:"connect.timeout.ms,string" mapstructure:"connect.timeout.ms"` TimeoutMS int `json:"timeout.ms,string" mapstructure:"timeout.ms"` TimeoutMSForEachAPI []int `json:"timeout.ms.for.eachapi" mapstructure:"timeout.ms.for.eachapi"` KeepAliveMS int `json:"keepalive.ms,string" mapstructure:"keepalive.ms"` }
type NoneCompressor ¶
type NoneCompressor struct { }
type OffsetCommitRequest ¶
type OffsetCommitRequest struct { *RequestHeader GroupID string GenerationID int32 MemberID string RetentionTime int64 Topics []*OffsetCommitRequestTopic }
func NewOffsetCommitRequest ¶
func NewOffsetCommitRequest(apiVersion uint16, clientID, groupID string) *OffsetCommitRequest
request only ONE topic
func (*OffsetCommitRequest) AddPartiton ¶
func (r *OffsetCommitRequest) AddPartiton(topic string, partitionID int32, offset int64, metadata string)
func (*OffsetCommitRequest) Encode ¶
func (r *OffsetCommitRequest) Encode(version uint16) []byte
func (*OffsetCommitRequest) Length ¶
func (r *OffsetCommitRequest) Length() int
func (*OffsetCommitRequest) SetGenerationID ¶
func (r *OffsetCommitRequest) SetGenerationID(generationID int32)
func (*OffsetCommitRequest) SetMemberID ¶
func (r *OffsetCommitRequest) SetMemberID(memberID string)
func (*OffsetCommitRequest) SetRetentionTime ¶
func (r *OffsetCommitRequest) SetRetentionTime(retentionTime int64)
type OffsetCommitRequestTopic ¶
type OffsetCommitRequestTopic struct { Topic string Partitions []*OffsetCommitRequestPartition }
type OffsetCommitResponse ¶
type OffsetCommitResponse struct { CorrelationID uint32 Topics []*OffsetCommitResponseTopic }
func NewOffsetCommitResponse ¶
func NewOffsetCommitResponse(payload []byte) (r OffsetCommitResponse, err error)
func (OffsetCommitResponse) Error ¶
func (r OffsetCommitResponse) Error() error
type OffsetCommitResponseTopic ¶
type OffsetCommitResponseTopic struct { Topic string Partitions []*OffsetCommitResponsePartition }
type OffsetFetchRequest ¶
type OffsetFetchRequest struct { *RequestHeader GroupID string Topics []*OffsetFetchRequestTopic }
func NewOffsetFetchRequest ¶
func NewOffsetFetchRequest(apiVersion uint16, clientID, groupID string) *OffsetFetchRequest
request only ONE topic
func (*OffsetFetchRequest) AddPartiton ¶
func (r *OffsetFetchRequest) AddPartiton(topic string, partitionID int32)
func (*OffsetFetchRequest) Encode ¶
func (r *OffsetFetchRequest) Encode(version uint16) []byte
func (*OffsetFetchRequest) Length ¶
func (r *OffsetFetchRequest) Length() int
type OffsetFetchRequestTopic ¶
type OffsetFetchResponse ¶
type OffsetFetchResponse struct { CorrelationID uint32 Topics []*OffsetFetchResponseTopic }
func NewOffsetFetchResponse ¶
func NewOffsetFetchResponse(payload []byte) (r OffsetFetchResponse, err error)
NewOffsetFetchResponse decodes the response byte array to a OffsetFetchResponse struct
func (OffsetFetchResponse) Error ¶
func (r OffsetFetchResponse) Error() error
type OffsetFetchResponseTopic ¶
type OffsetFetchResponseTopic struct { Topic string Partitions []*OffsetFetchResponsePartition }
type OffsetsRequest ¶
type OffsetsRequest struct { *RequestHeader ReplicaId int32 RequestInfo map[string]map[int32]*PartitionOffsetRequestInfo }
func NewOffsetsRequest ¶
func NewOffsetsRequest(topic string, partitionIDs []int32, timeValue int64, offsets uint32, clientID string) *OffsetsRequest
request only ONE topic
func (*OffsetsRequest) Encode ¶
func (offsetR *OffsetsRequest) Encode(version uint16) []byte
type OffsetsResponse ¶
type OffsetsResponse struct { CorrelationID uint32 ThrottleTimeMs int32 TopicPartitionOffsets map[string][]PartitionOffset }
func NewOffsetsResponse ¶
func NewOffsetsResponse(payload []byte, version uint16) (r OffsetsResponse, err error)
func (OffsetsResponse) Error ¶
func (r OffsetsResponse) Error() error
type PartitionAssignment ¶
type PartitionBlock ¶
type PartitionBlock struct { Partition int32 CurrentLeaderEpoch int32 FetchOffset int64 LogStartOffset int64 MaxBytes int32 }
PartitionBlock is the partition to fetch.
type PartitionMetadataInfo ¶
type PartitionMetadataInfo struct { PartitionErrorCode int16 PartitionID int32 Leader int32 LeaderEpoch int32 Replicas []int32 Isr []int32 OfflineReplicas []int32 }
PartitionMetadataInfo holds all the fields of partition metadata info, which is used in metadata response
type PartitionOffset ¶
type PartitionOffset struct { Partition int32 ErrorCode int16 OldStyleOffsets []int64 Timestamp int64 Offset int64 }
func (*PartitionOffset) GetOffset ¶
func (p *PartitionOffset) GetOffset() int64
get the offset of the given partition from OldStyleOffsets or Offset
type PartitionResponse ¶
type PartitionResult ¶
type PlainSasl ¶
type PlainSasl struct {
// contains filtered or unexported fields
}
func NewPlainSasl ¶
type ProduceRequest ¶
type ProduceRequest struct { *RequestHeader RequiredAcks int16 Timeout int32 TopicBlocks []struct { TopicName string PartitonBlocks []struct { Partition int32 MessageSetSize int32 MessageSet MessageSet } } }
func (*ProduceRequest) Encode ¶
func (produceRequest *ProduceRequest) Encode(version uint16) []byte
func (*ProduceRequest) Length ¶
func (produceRequest *ProduceRequest) Length() int
type ProduceResponse ¶
type ProduceResponse struct { CorrelationID uint32 ProduceResponses []ProduceResponsePiece }
func NewProduceResponse ¶
func NewProduceResponse(payload []byte) (r ProduceResponse, err error)
func (ProduceResponse) Error ¶
func (r ProduceResponse) Error() error
type ProduceResponsePiece ¶
type ProduceResponsePiece struct { Topic string Partitions []ProduceResponse_PartitionResponse }
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
NewProducer creates a new console producer. config can be a map[string]interface{} or a ProducerConfig, use DefaultProducerConfig if config is nil
func (*Producer) AddMessage ¶
AddMessage add message to the producer, if key is nil, use current simple producer, else use the simple producer of the partition of the key if the simple producer of the partition of the key not exist, create a new one if the simple producer closed, retry 3 times
type ProducerConfig ¶
type ProducerConfig struct { Net NetConfig `json:"net" mapstructure:"net"` Sasl SaslConfig `json:"sasl" mapstructure:"sasl"` BootstrapServers string `json:"bootstrap.servers" mapstructure:"bootstrap.servers"` ClientID string `json:"client.id" mapstructure:"client.id"` Acks int16 `json:"acks,string" mapstructure:"acks"` CompressionType string `json:"compress.type" mapstructure:"compress.type"` BatchSize int `json:"batch.size,string" mapstructure:"batch.size"` MessageMaxCount int `json:"message.max.count,string" mapstructure:"message.max.count"` FlushIntervalMS int `json:"flush.interval.ms,string" mapstructure:"flush.interval.ms,string"` MetadataMaxAgeMS int `json:"metadata.max.age.ms,string" mapstructure:"metadata.max.age.ms"` FetchTopicMetaDataRetrys int `json:"fetch.topic.metadata.retrys,string" mapstructure:"fetch.topic.metadata.retrys"` ConnectionsMaxIdleMS int `json:"connections.max.idle.ms,string" mapstructure:"connections.max.idle.ms"` RetryBackOffMS int `json:"retry.backoff.ms,string" mapstructure:"retry.backoff.ms"` MetadataRefreshIntervalMS int `json:"metadata.refresh.interval.ms,string" mapstructure:"metadata.refresh.interval.ms"` TLSEnabled bool `json:"tls.enabled,string" mapstructure:"tls.enabled"` TLS *TLSConfig `json:"tls" mapstructure:"tls"` // TODO Retries int `json:"retries,string" mapstructure:"retries"` RequestTimeoutMS int32 `json:"request.timeout.ms,string" mapstructure:"request.timeout.ms"` // producer.AddMessage will use this config to assemble Message // only 0 and 1 is implemented for now HealerMagicByte int `json:"healer.magicbyte,string" mapstructure:"healer.magicbyte"` }
ProducerConfig is the config for producer
func DefaultProducerConfig ¶
func DefaultProducerConfig() ProducerConfig
DefaultProducerConfig returns a default ProducerConfig
type ProtocolMetadata ¶
ProtocolMetadata is used in join request/response
func NewProtocolMetadata ¶
func NewProtocolMetadata(payload []byte) *ProtocolMetadata
func (*ProtocolMetadata) Encode ¶
func (m *ProtocolMetadata) Encode() []byte
func (*ProtocolMetadata) Length ¶
func (m *ProtocolMetadata) Length() int
type ReadParser ¶
type ReadParser interface { Read() ([]byte, error) Parse(data []byte) (Response, error) ReadAndParse() (Response, error) }
ReadParser read data from a connection of broker and parse the response
type Record ¶
type Record struct { Headers []RecordHeader // contains filtered or unexported fields }
Record is element of Records
func DecodeToRecord ¶
DecodeToRecord decodes the struct Record from the given payload.
type RecordBatch ¶
type RecordHeader ¶
RecordHeader is concluded in Record
type ReplicaAssignment ¶
ReplicaAssignment is sub struct in CreateTopicRequest
type ReplicaElectionResult ¶
type ReplicaElectionResult struct { Topic string `json:"topic"` PartitionResults []*PartitionResult `json:"partition_result"` }
type Request ¶
type Request interface { Encode(version uint16) []byte API() uint16 SetCorrelationID(uint32) SetVersion(uint16) }
Request is implemented by all detailed request
func NewApiVersionsRequest ¶
type RequestHeader ¶
type RequestHeader struct { APIKey uint16 APIVersion uint16 CorrelationID uint32 ClientID *string TaggedFields TaggedFields // contains filtered or unexported fields }
RequestHeader is the request header, which is used in all requests. It contains apiKey, apiVersion, correlationID, clientID
func DecodeRequestHeader ¶
func DecodeRequestHeader(payload []byte) (h RequestHeader, offset int)
DecodeRequestHeader decodes request header from []byte, just used in test cases
func (*RequestHeader) API ¶
func (requestHeader *RequestHeader) API() uint16
API returns APiKey of the request(which hold the request header)
func (*RequestHeader) EncodeTo ¶
func (h *RequestHeader) EncodeTo(payload []byte) int
EncodeTo encodes request header to []byte. this is used the all request If the playload is too small, EncodeTo will panic. https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields https://kafka.apache.org/protocol#protocol_messages
func (*RequestHeader) IsFlexible ¶
func (h *RequestHeader) IsFlexible() bool
func (*RequestHeader) SetCorrelationID ¶
func (requestHeader *RequestHeader) SetCorrelationID(c uint32)
SetCorrelationID set request's correlationID
func (*RequestHeader) SetVersion ¶
func (requestHeader *RequestHeader) SetVersion(version uint16)
SetVersion set request's apiversion
func (*RequestHeader) Version ¶
func (requestHeader *RequestHeader) Version() uint16
Version returns API version of the request
type Response ¶
type Response interface {
Error() error
}
Response is the interface of all response. Error() returns the error abstracted from the error code of the response
type ResponseHeader ¶
type ResponseHeader struct { CorrelationID uint32 TaggedFields TaggedFields // version +1 // contains filtered or unexported fields }
func DecodeResponseHeader ¶
func DecodeResponseHeader(payload []byte, apiKey uint16, apiVersion uint16) (header ResponseHeader, offset int)
func NewResponseHeader ¶
func NewResponseHeader(apiKey, apiVersion uint16) ResponseHeader
func (*ResponseHeader) Encode ¶
func (h *ResponseHeader) Encode() []byte
func (*ResponseHeader) EncodeTo ¶
func (h *ResponseHeader) EncodeTo(payload []byte) (offset int)
func (*ResponseHeader) IsFlexible ¶
func (h *ResponseHeader) IsFlexible() bool
type SaslAuthenticateRequest ¶
type SaslAuthenticateRequest struct { *RequestHeader SaslAuthBytes []byte }
version0
func NewSaslAuthenticateRequest ¶
func NewSaslAuthenticateRequest(clientID string, user, password, typ string) (r SaslAuthenticateRequest)
func (SaslAuthenticateRequest) Encode ¶
func (r SaslAuthenticateRequest) Encode(version uint16) []byte
Encode encodes SaslAuthenticateRequest to []byte
func (*SaslAuthenticateRequest) Length ¶
func (r *SaslAuthenticateRequest) Length() int
type SaslAuthenticateResponse ¶
type SaslAuthenticateResponse struct { CorrelationID uint32 ErrorCode int16 ErrorMessage string SaslAuthBytes []byte }
SaslAuthenticateResponse is the response of saslauthenticate request
func NewSaslAuthenticateResponse ¶
func NewSaslAuthenticateResponse(payload []byte) (r SaslAuthenticateResponse, err error)
NewSaslAuthenticateResponse create a NewSaslAuthenticateResponse instance from response payload bytes
func (SaslAuthenticateResponse) Error ¶
func (r SaslAuthenticateResponse) Error() error
type SaslConfig ¶
type SaslHandShakeRequest ¶
type SaslHandShakeRequest struct { *RequestHeader Mechanism string }
version0
func NewSaslHandShakeRequest ¶
func NewSaslHandShakeRequest(clientID string, mechanism string) *SaslHandShakeRequest
func (*SaslHandShakeRequest) Encode ¶
func (r *SaslHandShakeRequest) Encode(version uint16) []byte
func (*SaslHandShakeRequest) Length ¶
func (r *SaslHandShakeRequest) Length() int
type SaslHandshakeResponse ¶
type SaslHandshakeResponse struct { CorrelationID uint32 ErrorCode int16 EnabledMechanisms []string }
SaslHandshakeResponse is the response of saslhandshake request
func NewSaslHandshakeResponse ¶
func NewSaslHandshakeResponse(payload []byte) (r SaslHandshakeResponse, err error)
NewSaslHandshakeResponse create a NewSaslHandshakeResponse instance from response payload bytes
func (SaslHandshakeResponse) Error ¶
func (r SaslHandshakeResponse) Error() error
type SimpleConsumer ¶
type SimpleConsumer struct {
// contains filtered or unexported fields
}
SimpleConsumer instance is built to consume messages from kafka broker TODO make messages have direction
func NewSimpleConsumer ¶
func NewSimpleConsumer(topic string, partitionID int32, config interface{}) (*SimpleConsumer, error)
NewSimpleConsumer create a simple consumer
func NewSimpleConsumerWithBrokers ¶
func NewSimpleConsumerWithBrokers(topic string, partitionID int32, config ConsumerConfig, brokers *Brokers) *SimpleConsumer
NewSimpleConsumerWithBrokers create a simple consumer with existing brokers
func (*SimpleConsumer) CommitOffset ¶
func (c *SimpleConsumer) CommitOffset()
CommitOffset commit offset to coordinator if simpleConsumer belong to a GroupConsumer, it uses groupconsumer to commit else if it has GroupId, it use its own coordinator to commit
func (*SimpleConsumer) Consume ¶
func (c *SimpleConsumer) Consume(offset int64, messageChan chan *FullMessage) (<-chan *FullMessage, error)
Consume begins to fetch messages. It create and return a new channel if you pass nil, or it returns the channel you passed.
func (*SimpleConsumer) Stop ¶
func (c *SimpleConsumer) Stop()
Stop the consumer and wait for all relating go-routines to exit
func (*SimpleConsumer) String ¶
func (c *SimpleConsumer) String() string
type SimpleProducer ¶
type SimpleProducer struct {
// contains filtered or unexported fields
}
SimpleProducer is a simple producer that send message to certain one topic-partition
func NewSimpleProducer ¶
func NewSimpleProducer(ctx context.Context, topic string, partition int32, config interface{}) (*SimpleProducer, error)
NewSimpleProducer creates a new simple producer config can be a map[string]interface{} or a ProducerConfig, use DefaultProducerConfig if config is nil
func (*SimpleProducer) AddMessage ¶
func (p *SimpleProducer) AddMessage(key []byte, value []byte) error
AddMessage add message to message set. If message set is full, send it to kafka synchronously
func (*SimpleProducer) Flush ¶
func (p *SimpleProducer) Flush() error
Flush send all messages to kafka
type SnappyCompressor ¶
type SnappyCompressor struct { }
type SyncGroupRequest ¶
type SyncGroupRequest struct { *RequestHeader GroupID string GenerationID int32 MemberID string GroupAssignment GroupAssignment }
func NewSyncGroupRequest ¶
func NewSyncGroupRequest(clientID, groupID string, generationID int32, memberID string, groupAssignment GroupAssignment) *SyncGroupRequest
func (*SyncGroupRequest) Encode ¶
func (r *SyncGroupRequest) Encode(version uint16) []byte
Encode encodes SyncGroupRequest to []byte
func (*SyncGroupRequest) Length ¶
func (r *SyncGroupRequest) Length() int
type SyncGroupResponse ¶
type SyncGroupResponse struct { CorrelationID uint32 `json:"correlation_id"` ErrorCode int16 `json:"error_code"` MemberAssignment []byte `json:"member_assignment"` }
SyncGroupResponse is the response of syncgroup request
func NewSyncGroupResponse ¶
func NewSyncGroupResponse(payload []byte) (r SyncGroupResponse, err error)
NewSyncGroupResponse create a NewSyncGroupResponse instance from response payload bytes
func (SyncGroupResponse) Error ¶
func (r SyncGroupResponse) Error() error
type TLSConfig ¶
type TLSConfig struct { Cert string `json:"cert" mapstructure:"cert"` Key string `json:"key" mapstructure:"key"` CA string `json:"ca" mapstructure:"ca"` InsecureSkipVerify bool `json:"insecure.skip.verify,string" mapstructure:"insecure.skip.verify"` ServerName string `json:"servername" mapstructure:"servername"` }
type TaggedField ¶
type TaggedFields ¶
type TaggedFields []TaggedField
func DecodeTaggedFields ¶
func DecodeTaggedFields(payload []byte) (r TaggedFields, length int)
func (TaggedFields) Encode ¶
func (r TaggedFields) Encode() []byte
func (TaggedFields) EncodeTo ¶
func (r TaggedFields) EncodeTo(payload []byte) (offset int)
type TopicError ¶
TopicError is sub struct in CreateTopicsResponse
type TopicMetadata ¶
type TopicMetadata struct { TopicErrorCode int16 TopicName string IsInternal bool PartitionMetadatas []*PartitionMetadataInfo }
TopicMetadata holds all the fields of topic metadata, which is used in metadata response
type TopicPartition ¶
Source Files
¶
- acl_types.go
- alter_config_request.go
- alter_config_response.go
- alter_partition_reassignments_request.go
- alter_partition_reassignments_response.go
- apiversions_request.go
- apiversions_response.go
- assign.go
- broker.go
- brokers.go
- client.go
- compressor.go
- config.go
- conn_mock.go
- consumer.go
- create_acls_request.go
- create_acls_response.go
- create_partitions_request.go
- create_partitions_response.go
- create_topics_request.go
- create_topics_response.go
- delete_acls_request.go
- delete_acls_response.go
- delete_groups_request.go
- delete_groups_response.go
- delete_topics_request.go
- delete_topics_response.go
- describe_acls_request.go
- describe_configs_request.go
- describe_configs_response.go
- describe_groups_request.go
- describe_groups_response.go
- describe_logdirs_request.go
- describe_logdirs_response.go
- desscribe_acls_response.go
- elect_leaders_request.go
- elect_leaders_response.go
- error.go
- errorcode.go
- fetch_request.go
- fetch_response.go
- findcoordinator_request.go
- findcoordinator_response.go
- group.go
- group_consumer.go
- gzip_compressor.go
- healer_tags.go
- heartbeat_request.go
- heartbeat_response.go
- helper.go
- incremental_alter_config_request.go
- incremental_alter_config_response.go
- joingroup_request.go
- joingroup_response.go
- leave_group_request.go
- leave_group_response.go
- list_groups_request.go
- list_groups_response.go
- list_partition_reassignments_request.go
- list_partition_reassignments_response.go
- log.go
- lz4_compressor.go
- message.go
- message_encode.go
- metadata_request.go
- metadata_response.go
- none_compressor.go
- offset_commit_request.go
- offset_commit_response.go
- offset_fetch_request.go
- offset_fetch_response.go
- offset_request.go
- offset_response.go
- primitive_type_encode.go
- primitive_types_decode.go
- produce_request.go
- produce_response.go
- producer.go
- request.go
- resource_types.go
- response.go
- sasl_authenticate_request.go
- sasl_authenticate_response.go
- sasl_handshake_request.go
- sasl_handshake_response.go
- sasl_plain.go
- simple_consumer.go
- simple_producer.go
- snappy_compressor.go
- sync_group_request.go
- sync_group_response.go
- tagged_fields.go