healer

package module
v0.6.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2024 License: MIT Imports: 29 Imported by: 63

README

INSTALL

go install github.com/childe/healer/command/healer@latest

Docker

docker pull rmself/healer:latest

what can healer command do

  • produce messages
  • consume messages
  • get metadata
  • create topics
  • delete topics
  • create (increase) partitions
  • describe configs
  • alter configs
  • alter partition assignments
  • get offsets
  • get pendings
  • reset offsets
  • (re)elect leaders
  • describe acls
  • create acls
  • delete acls
  • REST APIs for performing the jobs above

Code Examples

Group Consumer

group consumer(cluster style)

https://github.com/childe/healer/blob/master/command/healer/cmd/group-consumer.go

Producer

https://github.com/childe/healer/blob/master/command/healer/cmd/console-producer.go

Console Consumer

one consumer consumes messages from all partitions

https://github.com/childe/healer/blob/master/command/healer/cmd/console-consumer.go

Simple Consumer

consume from only one certain partition

https://github.com/childe/healer/blob/master/command/healer/cmd/simple-consumer.go

Documentation

Index

Constants

View Source
// Untyped integer constants named after ACL resource types.
// NOTE(review): AclsResourceTypeBroker and AclsResourceTypeCluster are both 4,
// so the two names cannot be told apart by value — presumably a deliberate
// alias; confirm before relying on either name alone.
const (
	AclsResourceTypeUnknown         = 0
	AclsResourceTypeAny             = 1
	AclsResourceTypeTopic           = 2
	AclsResourceTypeGroup           = 3
	AclsResourceTypeBroker          = 4
	AclsResourceTypeCluster         = 4
	AclsResourceTypeTransactionalID = 5
	AclsResourceTypeDelegationToken = 6
	AclsResourceTypeUser            = 7
)
View Source
// ACL pattern-type codes: untyped integer constants numbered consecutively
// from 0 in declaration order.
const (
	AclsPatternTypeUnknown  = iota // 0
	AclsPatternTypeAny             // 1
	AclsPatternTypeMatch           // 2
	AclsPatternTypeLiteral         // 3
	AclsPatternTypePrefixed        // 4
)
View Source
// Compression codec identifiers. Every constant is typed int8 and the values
// run consecutively from 0 in declaration order.
const (
	COMPRESSION_NONE   int8 = iota // 0
	COMPRESSION_GZIP               // 1
	COMPRESSION_SNAPPY             // 2
	COMPRESSION_LZ4                // 3
)
View Source
// Request API identifiers, each typed uint16 (presumably the Kafka protocol
// API keys — confirm against the protocol reference).
// The numbering is sparse: only the values listed here are defined (for
// example nothing is declared for 4-7, 21-28 or 34).
// NOTE(review): these are plain uint16 values rather than the ApiKey type
// declared in this package, and API_Delete_Groups is the only name with an
// underscore inside the API name.
const (
	API_ProduceRequest              uint16 = 0
	API_FetchRequest                uint16 = 1
	API_OffsetRequest               uint16 = 2
	API_MetadataRequest             uint16 = 3
	API_OffsetCommitRequest         uint16 = 8
	API_OffsetFetchRequest          uint16 = 9
	API_FindCoordinator             uint16 = 10
	API_JoinGroup                   uint16 = 11
	API_Heartbeat                   uint16 = 12
	API_LeaveGroup                  uint16 = 13
	API_SyncGroup                   uint16 = 14
	API_DescribeGroups              uint16 = 15
	API_ListGroups                  uint16 = 16
	API_SaslHandshake               uint16 = 17
	API_ApiVersions                 uint16 = 18
	API_CreateTopics                uint16 = 19
	API_DeleteTopics                uint16 = 20
	API_DescribeAcls                uint16 = 29
	API_CreateAcls                  uint16 = 30
	API_DeleteAcls                  uint16 = 31
	API_DescribeConfigs             uint16 = 32
	API_AlterConfigs                uint16 = 33
	API_DescribeLogDirs             uint16 = 35
	API_SaslAuthenticate            uint16 = 36
	API_CreatePartitions            uint16 = 37
	API_Delete_Groups               uint16 = 42
	API_ElectLeaders                uint16 = 43
	API_IncrementalAlterConfigs     uint16 = 44
	API_AlterPartitionReassignments uint16 = 45
	API_ListPartitionReassignments  uint16 = 46
)
View Source
// Resource type identifiers, each typed uint8, with their numeric values
// spelled out explicitly.
const (
	RESOURCETYPE_UNKNOWN          uint8 = 0
	RESOURCETYPE_ANY              uint8 = 1
	RESOURCETYPE_TOPIC            uint8 = 2
	RESOURCETYPE_GROUP            uint8 = 3
	RESOURCETYPE_CLUSTER          uint8 = 4
	RESOURCETYPE_TRANSACTIONAL_ID uint8 = 5
	RESOURCETYPE_DELEGATION_TOKEN uint8 = 6
)
View Source
// The AclsOperation* codes are declared as separate untyped constants and are
// listed alphabetically, not by value; together they cover every value from
// 0 (Unknown) to 12 (IdempotentWrite).
const AclsOperationAll = 2
View Source
const AclsOperationAlter = 7
View Source
const AclsOperationAlterConfigs = 11
View Source
const AclsOperationAny = 1
View Source
const AclsOperationClusterAction = 9
View Source
const AclsOperationCreate = 5
View Source
const AclsOperationDelete = 6
View Source
const AclsOperationDescribe = 8
View Source
const AclsOperationDescribeConfigs = 10
View Source
const AclsOperationIdempotentWrite = 12
View Source
const AclsOperationRead = 3
View Source
const AclsOperationUnknown = 0
View Source
const AclsOperationWrite = 4

Variables

View Source
// AllError maps an int16 error code to its *Error. It starts out as an empty,
// non-nil map.
var AllError = make(map[int16]*Error)
View Source
// ErrProducerClosed is a package-level sentinel error whose message is
// "producer closed".
var ErrProducerClosed = fmt.Errorf("producer closed")

ErrProducerClosed is returned when a message is added while the producer is closed

Functions

func ConvertConfigResourceType

func ConvertConfigResourceType(resourceType string) uint8

ConvertConfigResourceType converts a string to the uint8 value that is used in DescribeConfigsRequest

func GetLogger

func GetLogger() logr.Logger

GetLogger returns the logger in healer lib

func SetLogger

func SetLogger(l logr.Logger)

SetLogger sets the logger used in healer

Types

type APIVersion

type APIVersion struct {
	// contains filtered or unexported fields
}

APIVersion holds the parameters for an API version struct in the APIVersions response

type APIVersionsRequest

// APIVersionsRequest has no fields of its own; it consists solely of the
// embedded *RequestHeader.
type APIVersionsRequest struct {
	*RequestHeader
}

func (*APIVersionsRequest) Encode

func (req *APIVersionsRequest) Encode(version uint16) []byte

Encode encodes ApiVersionsRequest to []byte

type APIVersionsResponse

// APIVersionsResponse holds the parameters of an APIVersions response.
type APIVersionsResponse struct {
	CorrelationID uint32
	// ErrorCode is an int16, the same type that keys the AllError map.
	ErrorCode     int16
	// APIVersions holds one APIVersion value per entry.
	APIVersions   []APIVersion
}

APIVersionsResponse holds the parameters for an APIVersions response

func (APIVersionsResponse) Error

func (r APIVersionsResponse) Error() error

type Acl

// Acl is the element type of AclResource.Acls.
type Acl struct {
	Principal      string
	Host           string
	// Operation and PermissionType use the package's int8-backed
	// AclsOperation and AclsPermissionType types.
	Operation      AclsOperation
	PermissionType AclsPermissionType
	TaggedFields   TaggedFields
}

type AclCreation

// AclCreation is the element type of CreateAclsRequest.Creations and of the
// slice passed to Client.CreateAcls.
// Unlike DeleteAclsFilter, its ResourceName, Principal and Host are plain
// strings rather than *string.
type AclCreation struct {
	ResourceType   AclsResourceType
	ResourceName   string
	PatternType    AclsPatternType
	Principal      string
	Host           string
	Operation      AclsOperation
	PermissionType AclsPermissionType
	TaggedFields   TaggedFields
}

type AclCreationResult

// AclCreationResult is the element type of CreateAclsResponse.Results.
type AclCreationResult struct {
	// NOTE(review): ErrorCode is uint16 here, whereas the other result and
	// response structs in this package (e.g. DeleteAclsFilterResult,
	// AlterConfigsResponseResource) declare it as int16 and AllError is keyed
	// by int16 — confirm whether the unsigned type is intended.
	ErrorCode    uint16
	// ErrorMessage is a pointer, so it can be nil.
	ErrorMessage *string
	TaggedFields TaggedFields
}

type AclResource

// AclResource is the element type of DescribeAclsResponse.Resources. It pairs
// a resource (type, name, pattern type) with a slice of Acl values.
type AclResource struct {
	ResourceType AclsResourceType
	ResourceName string
	PatternType  AclsPatternType
	Acls         []Acl
	TaggedFields TaggedFields
}

type AclsOperation

type AclsOperation int8

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java

func (AclsOperation) MarshalText

func (o AclsOperation) MarshalText() ([]byte, error)

func (AclsOperation) String

func (o AclsOperation) String() string

func (*AclsOperation) UnmarshalText

func (o *AclsOperation) UnmarshalText(text []byte) error

type AclsPatternType

type AclsPatternType int8

func (AclsPatternType) MarshalText

func (t AclsPatternType) MarshalText() ([]byte, error)

func (AclsPatternType) String

func (t AclsPatternType) String() string

func (*AclsPatternType) UnmarshalText

func (t *AclsPatternType) UnmarshalText(text []byte) error

type AclsPermissionType

type AclsPermissionType int8

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java

// Permission-type codes, each typed AclsPermissionType and numbered
// consecutively from 0 in declaration order.
// NOTE(review): "Unkown" in the first name is misspelled, but the identifier
// is exported and is kept as-is for compatibility.
const (
	AclsPermissionTypeUnkown AclsPermissionType = iota // 0
	AclsPermissionTypeAny                              // 1
	AclsPermissionTypeDeny                             // 2
	AclsPermissionTypeAllow                            // 3
)

func (AclsPermissionType) MarshalText

func (p AclsPermissionType) MarshalText() ([]byte, error)

func (AclsPermissionType) String

func (p AclsPermissionType) String() string

func (*AclsPermissionType) UnmarshalText

func (p *AclsPermissionType) UnmarshalText(text []byte) error

type AclsResourceType

type AclsResourceType int8

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java

func (AclsResourceType) MarshalText

func (t AclsResourceType) MarshalText() ([]byte, error)

func (AclsResourceType) String

func (t AclsResourceType) String() string

func (*AclsResourceType) UnmarshalText

func (t *AclsResourceType) UnmarshalText(text []byte) error

type AlterConfigsRequest

// AlterConfigsRequest holds the parameters of an AlterConfigs request.
type AlterConfigsRequest struct {
	*RequestHeader
	// Resources holds one AlterConfigsRequestResource per entry.
	Resources []AlterConfigsRequestResource
	// contains filtered or unexported fields
	// (presumably including the value set by SetValidateOnly — confirm)
}

AlterConfigsRequest struct holds params in AlterConfigsRequest

func NewAlterConfigsRequest

func NewAlterConfigsRequest(clientID string) AlterConfigsRequest

NewAlterConfigsRequest creates a new AlterConfigsRequest

func (*AlterConfigsRequest) AddConfig

func (r *AlterConfigsRequest) AddConfig(resourceType uint8, resourceName, configName, configValue string) error

AddConfig adds a new config entry to the request

func (AlterConfigsRequest) Encode

func (r AlterConfigsRequest) Encode(version uint16) []byte

Encode encodes an AlterConfigsRequest object to []byte. It implements the Request interface

func (*AlterConfigsRequest) SetValidateOnly

func (r *AlterConfigsRequest) SetValidateOnly(validateOnly bool) *AlterConfigsRequest

SetValidateOnly sets validateOnly in the request

type AlterConfigsRequestConfigEntry

// AlterConfigsRequestConfigEntry is the element type of
// AlterConfigsRequestResource.ConfigEntries. It has the same two string
// fields as ConfigEntry.
type AlterConfigsRequestConfigEntry struct {
	ConfigName  string
	ConfigValue string
}

AlterConfigsRequestConfigEntry is sub struct in AlterConfigsRequestResource

type AlterConfigsRequestResource

// AlterConfigsRequestResource is the element type of
// AlterConfigsRequest.Resources.
type AlterConfigsRequestResource struct {
	// ResourceType is a uint8, the same type as the RESOURCETYPE_* constants
	// and the return value of ConvertConfigResourceType.
	ResourceType  uint8
	ResourceName  string
	ConfigEntries []AlterConfigsRequestConfigEntry
}

AlterConfigsRequestResource is sub struct in AlterConfigsRequest

type AlterConfigsResponse

// AlterConfigsResponse is the type returned by NewAlterConfigsResponse.
type AlterConfigsResponse struct {
	CorrelationID  uint32
	// NOTE(review): ThrottleTimeMS is uint32 here, while
	// CreatePartitionsResponse.ThrottleTimeMS and the ThrottleTimeMs fields of
	// other responses in this package are int32 — confirm which is intended.
	ThrottleTimeMS uint32
	Resources      []AlterConfigsResponseResource
}

func NewAlterConfigsResponse

func NewAlterConfigsResponse(payload []byte) (r AlterConfigsResponse, err error)

func (AlterConfigsResponse) Error

func (r AlterConfigsResponse) Error() error

type AlterConfigsResponseResource

// AlterConfigsResponseResource is the element type of
// AlterConfigsResponse.Resources.
type AlterConfigsResponseResource struct {
	ErrorCode    int16
	// ErrorMessage is a plain string here, not *string as in the ACL result
	// structs of this package.
	ErrorMessage string
	ResourceType uint8
	ResourceName string
}

type AlterPartitionReassignmentsPartition

// AlterPartitionReassignmentsPartition is the element type of
// AlterPartitionReassignmentsTopic.Partitions.
type AlterPartitionReassignmentsPartition struct {
	// PartitionID and Replicas have the same types as the partitionID and
	// replicas arguments of AlterPartitionReassignmentsRequest.AddAssignment.
	PartitionID int32   `json:"partition_id"`
	Replicas    []int32 `json:"replicas"`

	TaggedFields TaggedFields `json:"tagged_fields"`
}

AlterPartitionReassignmentsPartition is the partition of AlterPartitionReassignmentsTopic

type AlterPartitionReassignmentsRequest

// AlterPartitionReassignmentsRequest is the request type accepted by
// Brokers.AlterPartitionReassignments.
type AlterPartitionReassignmentsRequest struct {
	*RequestHeader
	// TimeoutMs has the same type as the timeoutMs argument of
	// NewAlterPartitionReassignmentsRequest.
	TimeoutMs int32                               `json:"timeout_ms"`
	// Topics holds pointers to one AlterPartitionReassignmentsTopic per entry.
	Topics    []*AlterPartitionReassignmentsTopic `json:"topics"`

	TaggedFields TaggedFields `json:"tagged_fields"`
}

AlterPartitionReassignmentsRequest is the request of the AlterPartitionReassignments API

func DecodeAlterPartitionReassignmentsRequest

func DecodeAlterPartitionReassignmentsRequest(payload []byte, version uint16) (r AlterPartitionReassignmentsRequest, err error)

just for test

func NewAlterPartitionReassignmentsRequest

func NewAlterPartitionReassignmentsRequest(timeoutMs int32) (r AlterPartitionReassignmentsRequest)

NewAlterPartitionReassignmentsRequest is used to create a new AlterPartitionReassignmentsRequest

func (*AlterPartitionReassignmentsRequest) AddAssignment

func (r *AlterPartitionReassignmentsRequest) AddAssignment(topic string, partitionID int32, replicas []int32)

AddAssignment is used to add a new assignment to AlterPartitionReassignmentsRequest. It does not check whether the assignment already exists

func (*AlterPartitionReassignmentsRequest) Encode

func (r *AlterPartitionReassignmentsRequest) Encode(version uint16) (payload []byte)

Encode encodes AlterPartitionReassignmentsRequest to []byte

type AlterPartitionReassignmentsResponse

// AlterPartitionReassignmentsResponse is the type returned by
// NewAlterPartitionReassignmentsResponse and by
// Brokers.AlterPartitionReassignments.
type AlterPartitionReassignmentsResponse struct {
	ResponseHeader
	ThrottleTimeMs int32                                       `json:"throttle_time_ms"`
	ErrorCode      int16                                       `json:"error_code"`
	// ErrorMsg is a pointer, so it can be nil.
	ErrorMsg       *string                                     `json:"error_msg"`
	// Responses has an unexported element type, so code outside this package
	// cannot name that type explicitly.
	Responses      []*alterPartitionReassignmentsResponseTopic `json:"responses"`

	TaggedFields TaggedFields `json:"tagged_fields"`
}

AlterPartitionReassignmentsResponse is the response of AlterPartitionReassignmentsRequest

func NewAlterPartitionReassignmentsResponse

func NewAlterPartitionReassignmentsResponse(payload []byte, version uint16) (*AlterPartitionReassignmentsResponse, error)

NewAlterPartitionReassignmentsResponse create a new AlterPartitionReassignmentsResponse

func (*AlterPartitionReassignmentsResponse) Encode

func (r *AlterPartitionReassignmentsResponse) Encode(version uint16) (payload []byte)

just for test

func (*AlterPartitionReassignmentsResponse) Error

type AlterPartitionReassignmentsTopic

// AlterPartitionReassignmentsTopic is the element type of
// AlterPartitionReassignmentsRequest.Topics: a topic name plus pointers to
// its AlterPartitionReassignmentsPartition entries.
type AlterPartitionReassignmentsTopic struct {
	TopicName  string                                  `json:"topic_name"`
	Partitions []*AlterPartitionReassignmentsPartition `json:"partitions"`

	TaggedFields TaggedFields `json:"tagged_fields"`
}

AlterPartitionReassignmentsTopic is the topic of AlterPartitionReassignmentsRequest

type ApiKey

type ApiKey uint16

func (ApiKey) String

func (k ApiKey) String() string

type AssignmentStrategy

// AssignmentStrategy is a single-method interface: an implementation takes the
// group members and topic metadata and returns a GroupAssignment.
type AssignmentStrategy interface {
	// Assign receives the members and the topic metadata; generally the
	// topic metadata is returned by the metadata request sent by GroupConsumer.
	Assign([]Member, []TopicMetadata) GroupAssignment
}

AssignmentStrategy is the interface for different assignment strategies, it returns GroupAssignment

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(address string, nodeID int32, config *BrokerConfig) (*Broker, error)

NewBroker is only called in NewBrokers, user must always init a Brokers instance by NewBrokers

func (*Broker) Close

func (broker *Broker) Close()

Close closes the connection to the broker

func (*Broker) GetAddress

func (broker *Broker) GetAddress() string

GetAddress returns the broker address

func (*Broker) Request

func (broker *Broker) Request(r Request) (ReadParser, error)

Request sends a request to the broker and returns a ReadParser. To get the response, the user should call RequestAndGet()

func (*Broker) RequestAndGet

func (broker *Broker) RequestAndGet(r Request) (resp Response, err error)

RequestAndGet sends a request to the broker and returns the response

func (*Broker) RequestListGroups

func (broker *Broker) RequestListGroups(clientID string) (r *ListGroupsResponse, err error)

func (*Broker) String

func (broker *Broker) String() string

type BrokerConfig

// BrokerConfig is the configuration type accepted by NewBroker and
// NewBrokersWithConfig; DefaultBrokerConfig returns a *BrokerConfig.
// The ",string" option in the json tags of MetadataRefreshIntervalMS and
// TLSEnabled makes encoding/json read and write those values as JSON strings.
type BrokerConfig struct {
	Net                       NetConfig  `json:"net" mapstructure:"net"`
	Sasl                      SaslConfig `json:"sasl" mapstructure:"sasl"`
	MetadataRefreshIntervalMS int        `json:"metadata.refresh.interval.ms,string" mapstructure:"metadata.refresh.interval.ms"`
	TLSEnabled                bool       `json:"tls.enabled,string" mapstructure:"tls.enabled"`
	// TLS is a pointer, so it can be nil.
	TLS                       *TLSConfig `json:"tls" mapstructure:"tls"`
}

func DefaultBrokerConfig

func DefaultBrokerConfig() *BrokerConfig

type BrokerInfo

// BrokerInfo is the value type of the map returned by Brokers.BrokersInfo.
// It has the same NodeID, Host and Port fields as Coordinator, plus Rack.
type BrokerInfo struct {
	NodeID int32
	Host   string
	Port   int32
	Rack   string
}

BrokerInfo holds all the fields of broker info, which is used in metadata response

func (*BrokerInfo) NetAddress

func (b *BrokerInfo) NetAddress() string

type Brokers

type Brokers struct {
	// contains filtered or unexported fields
}

func NewBrokers

func NewBrokers(bootstrapServers string) (*Brokers, error)

NewBrokers creates a new Brokers with the default config

func NewBrokersWithConfig

func NewBrokersWithConfig(bootstrapServers string, config *BrokerConfig) (*Brokers, error)

NewBrokersWithConfig creates a new Brokers with the given config

func (*Brokers) AlterPartitionReassignments

func (brokers *Brokers) AlterPartitionReassignments(req *AlterPartitionReassignmentsRequest) (r *AlterPartitionReassignmentsResponse, err error)

AlterPartitionReassignments requests AlterPartitionReassignments from controller and returns response

func (*Brokers) BrokersInfo

func (brokers *Brokers) BrokersInfo() map[int32]*BrokerInfo

BrokersInfo returns brokers info, it is a private member and should not be changed from outside

func (*Brokers) Close

func (brokers *Brokers) Close()

Close closes all brokers

func (*Brokers) Controller

func (brokers *Brokers) Controller() int32

Controller returns the controller's broker ID

func (*Brokers) ElectLeaders

func (brokers *Brokers) ElectLeaders(req *ElectLeadersRequest) (r *ElectLeadersResponse, err error)

ElectLeaders requests ElectLeaders from controller and returns response

func (*Brokers) FindCoordinator

func (brokers *Brokers) FindCoordinator(clientID, groupID string) (r FindCoordinatorResponse, err error)

FindCoordinator tries to request FindCoordinator from all brokers and returns the response

func (*Brokers) GetBroker

func (brokers *Brokers) GetBroker(nodeID int32) (*Broker, error)

GetBroker returns broker from cache or create a new one

func (*Brokers) GetController

func (brokers *Brokers) GetController() (*Broker, error)

func (*Brokers) ListPartitionReassignments

func (brokers *Brokers) ListPartitionReassignments(req ListPartitionReassignmentsRequest) (r *ListPartitionReassignmentsResponse, err error)

ListPartitionReassignments requests ListPartitionReassignments from controller and returns response

func (*Brokers) NewBroker

func (brokers *Brokers) NewBroker(nodeID int32) (*Broker, error)

TODO merge with GetBroker

func (*Brokers) Request

func (brokers *Brokers) Request(req Request) (Response, error)

Request tries the request on all brokers until it gets a response

func (*Brokers) RequestMetaData

func (brokers *Brokers) RequestMetaData(clientID string, topics []string) (r MetadataResponse, err error)

func (*Brokers) RequestOffsets

func (brokers *Brokers) RequestOffsets(clientID, topic string, partitionID int32, timeValue int64, offsets uint32) ([]OffsetsResponse, error)

RequestOffsets returns the array of offset values. It returns the offsets of all partitions if partitionID < 0

type ByMemberID

type ByMemberID []string

func (ByMemberID) Len

func (a ByMemberID) Len() int

func (ByMemberID) Less

func (a ByMemberID) Less(i, j int) bool

func (ByMemberID) Swap

func (a ByMemberID) Swap(i, j int)

type ByPartitionID

type ByPartitionID []int32

func (ByPartitionID) Len

func (a ByPartitionID) Len() int

func (ByPartitionID) Less

func (a ByPartitionID) Less(i, j int) bool

func (ByPartitionID) Swap

func (a ByPartitionID) Swap(i, j int)

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(bs, clientID string) (*Client, error)

NewClient creates a new Client

func (*Client) Close

func (c *Client) Close()

Close closes the connections to kafka brokers

func (*Client) CreateAcls

func (c *Client) CreateAcls(creations []AclCreation) (*CreateAclsResponse, error)

func (*Client) DeleteAcls

func (c *Client) DeleteAcls(filters []*DeleteAclsFilter) (*DeleteAclsResponse, error)

func (*Client) DeleteTopics

func (c *Client) DeleteTopics(topics []string, timeoutMs int32) (r DeleteTopicsResponse, err error)

func (*Client) DescribeAcls

func (*Client) DescribeConfigs

func (c *Client) DescribeConfigs(resourceType, resourceName string, keys []string) (r DescribeConfigsResponse, err error)

func (*Client) DescribeLogDirs

func (c *Client) DescribeLogDirs(topics []string) (map[int32]DescribeLogDirsResponse, error)

func (*Client) ListGroups

func (c *Client) ListGroups() (groups map[int32][]*Group, err error)

ListGroups lists all consumer groups from all brokers

func (*Client) RefreshMetadata

func (c *Client) RefreshMetadata()

RefreshMetadata refreshes metadata for c.brokers

func (*Client) WithLogger

func (client *Client) WithLogger(logger logr.Logger) *Client

type Compressor

// Compressor is a single-method interface; NewCompressor returns a value of
// this type for a given compression type name.
type Compressor interface {
	// Compress takes a byte slice and returns a byte slice and an error.
	Compress([]byte) ([]byte, error)
}

func NewCompressor

func NewCompressor(cType string) Compressor

type ConfigEntry

// ConfigEntry is the element type (by pointer) of
// CreateTopicRequest.ConfigEntries: a config name and its value.
type ConfigEntry struct {
	ConfigName  string
	ConfigValue string
}

ConfigEntry is sub struct in CreateTopicRequest

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer instance is built to consume messages from kafka broker

func NewConsumer

func NewConsumer(config interface{}, topics ...string) (*Consumer, error)

NewConsumer creates a new consumer instance

func (*Consumer) Assign

func (c *Consumer) Assign(topicPartitons map[string][]int)

Assign assigns the given partitions to the consumer; the consumer will only consume the given partitions. Do not call this after calling Consume

func (*Consumer) AwaitClose

func (consumer *Consumer) AwaitClose(timeout time.Duration)

func (*Consumer) Consume

func (c *Consumer) Consume(messageChan chan *FullMessage) (<-chan *FullMessage, error)

Consume consumes messages from kafka broker, returns a channel of messages

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topics ...string)

Subscribe subscribes to the given list of topics, consume all the partitions of the topics. Do not call this after calling Consume

type ConsumerConfig

// ConsumerConfig is the value type returned by DefaultConsumerConfig.
// Every int, int32 and bool field carries the ",string" json option, so
// encoding/json reads and writes those values as JSON strings.
// It repeats the Net, Sasl, MetadataRefreshIntervalMS, TLSEnabled and TLS
// fields of BrokerConfig with the same tags.
type ConsumerConfig struct {
	Net                  NetConfig  `json:"net" mapstructure:"net"`
	Sasl                 SaslConfig `json:"sasl" mapstructure:"sasl"`
	BootstrapServers     string     `json:"bootstrap.servers" mapstructure:"bootstrap.servers"`
	ClientID             string     `json:"client.id" mapstructure:"client.id"`
	GroupID              string     `json:"group.id" mapstructure:"group.id"`
	RetryBackOffMS       int        `json:"retry.backoff.ms,string" mapstructure:"retry.backoff.ms"`
	MetadataMaxAgeMS     int        `json:"metadata.max.age.ms,string" mapstructure:"metadata.max.age.ms"`
	SessionTimeoutMS     int32      `json:"session.timeout.ms,string" mapstructure:"session.timeout.ms"`
	FetchMaxWaitMS       int32      `json:"fetch.max.wait.ms,string" mapstructure:"fetch.max.wait.ms"`
	FetchMaxBytes        int32      `json:"fetch.max.bytes,string" mapstructure:"fetch.max.bytes"` // if this is too small, healer will double it automatically
	FetchMinBytes        int32      `json:"fetch.min.bytes,string" mapstructure:"fetch.min.bytes"`
	FromBeginning        bool       `json:"from.beginning,string" mapstructure:"from.beginning"`
	AutoCommit           bool       `json:"auto.commit,string" mapstructure:"auto.commit"`
	AutoCommitIntervalMS int        `json:"auto.commit.interval.ms,string" mapstructure:"auto.commit.interval.ms"`
	OffsetsStorage       int        `json:"offsets.storage,string" mapstructure:"offsets.storage"`

	MetadataRefreshIntervalMS int `json:"metadata.refresh.interval.ms,string" mapstructure:"metadata.refresh.interval.ms"`

	TLSEnabled bool       `json:"tls.enabled,string" mapstructure:"tls.enabled"`
	// TLS is a pointer, so it can be nil.
	TLS        *TLSConfig `json:"tls" mapstructure:"tls"`
}

func DefaultConsumerConfig

func DefaultConsumerConfig() ConsumerConfig

type Coordinator

// Coordinator identifies a coordinator by node ID, host and port. These are
// the same three fields, with the same types, that BrokerInfo starts with.
type Coordinator struct {
	NodeID int32
	Host   string
	Port   int32
}

Coordinator is the struct of coordinator, including nodeID, host and port

type CreateAclsRequest

// CreateAclsRequest carries a slice of AclCreation values.
// NOTE(review): RequestHeader is embedded by value here, whereas many other
// request types in this package (e.g. AlterConfigsRequest) embed
// *RequestHeader.
type CreateAclsRequest struct {
	RequestHeader
	Creations    []AclCreation
	TaggedFields TaggedFields
}

func DecodeCreateAclsRequest

func DecodeCreateAclsRequest(payload []byte) (r CreateAclsRequest, err error)

just for test

func (*CreateAclsRequest) Encode

func (r *CreateAclsRequest) Encode(version uint16) (payload []byte)

type CreateAclsResponse

// CreateAclsResponse is the type returned by DecodeCreateAclsResponse and
// Client.CreateAcls. It embeds ResponseHeader and holds one AclCreationResult
// per entry of Results.
type CreateAclsResponse struct {
	ResponseHeader
	Results      []AclCreationResult
	TaggedFields TaggedFields
}

func DecodeCreateAclsResponse

func DecodeCreateAclsResponse(payload []byte, version uint16) (*CreateAclsResponse, error)

func (*CreateAclsResponse) Encode

func (r *CreateAclsResponse) Encode() (payload []byte)

just for test

func (*CreateAclsResponse) Error

func (r *CreateAclsResponse) Error() error

type CreatePartitionsRequest

// CreatePartitionsRequest holds the parameters of a create-partitions request.
type CreatePartitionsRequest struct {
	*RequestHeader
	// Topics has an unexported element type, so code outside this package
	// cannot name that type explicitly; AddTopic adds a topic to the request.
	Topics       []createPartitionsRequestTopicBlock `json:"topics"`
	// NOTE(review): TimeoutMS is int32, but NewCreatePartitionsRequest takes
	// its timeout argument as uint32 — confirm the conversion is safe.
	TimeoutMS    int32                               `json:"timeout_ms"`
	ValidateOnly bool                                `json:"validate_only"`

	TaggedFields TaggedFields `json:"tagged_fields"`
}

CreatePartitionsRequest holds the parameters of a create-partitions request.

func NewCreatePartitionsRequest

func NewCreatePartitionsRequest(clientID string, timeout uint32, validateOnly bool) CreatePartitionsRequest

NewCreatePartitionsRequest creates a new CreatePartitionsRequest.

func (*CreatePartitionsRequest) AddTopic

func (r *CreatePartitionsRequest) AddTopic(topic string, count int32, assignments [][]int32)

AddTopic adds a topic to the request.

func (CreatePartitionsRequest) Encode

func (r CreatePartitionsRequest) Encode(version uint16) []byte

Encode encodes CreatePartitionsRequest to []byte

type CreatePartitionsResponse

// CreatePartitionsResponse holds the parameters of a create-partitions
// response; NewCreatePartitionsResponse returns this type.
type CreatePartitionsResponse struct {
	CorrelationID  uint32                                `json:"correlation_id"`
	ThrottleTimeMS int32                                 `json:"throttle_time_ms"`
	// Results has an unexported element type.
	Results        []createPartitionsResponseResultBlock `json:"results"`
}

CreatePartitionsResponse holds the parameters of a create-partitions response

func NewCreatePartitionsResponse

func NewCreatePartitionsResponse(payload []byte, version uint16) (r CreatePartitionsResponse, err error)

NewCreatePartitionsResponse creates a new CreatePartitionsResponse from []byte

func (CreatePartitionsResponse) Error

func (r CreatePartitionsResponse) Error() error

Error implements the error interface, it returns error from error code in the response

type CreateTopicRequest

// CreateTopicRequest is the element type (by pointer) of
// CreateTopicsRequest.CreateTopicRequests.
type CreateTopicRequest struct {
	// Topic, NumPartitions and ReplicationFactor have the same types as the
	// topic, partitions and replicationFactor arguments of
	// CreateTopicsRequest.AddTopic.
	Topic              string
	NumPartitions      int32
	ReplicationFactor  int16
	ReplicaAssignments []*ReplicaAssignment
	ConfigEntries      []*ConfigEntry
}

CreateTopicRequest is sub struct in CreateTopicsRequest

type CreateTopicsRequest

// CreateTopicsRequest is the type returned (by pointer) by
// NewCreateTopicsRequest.
type CreateTopicsRequest struct {
	*RequestHeader
	CreateTopicRequests []*CreateTopicRequest
	// NOTE(review): Timeout is uint32 and its unit is not visible here;
	// DeleteTopicsRequest uses an int32 field named TimeoutMS instead.
	Timeout             uint32
}

func NewCreateTopicsRequest

func NewCreateTopicsRequest(clientID string, timeout uint32) *CreateTopicsRequest

func (*CreateTopicsRequest) AddReplicaAssignment

func (r *CreateTopicsRequest) AddReplicaAssignment(topic string, pid int32, replicas []int32) error

AddReplicaAssignment adds the replicas of a certain topic & pid to CreateTopicRequests. It returns errorTopicNotFound if the topic has not been added to the request; it overwrites the replicas if the pid already exists

func (*CreateTopicsRequest) AddTopic

func (r *CreateTopicsRequest) AddTopic(topic string, partitions int32, replicationFactor int16) error

AddTopic gives the user an easy way to fill a CreateTopicsRequest. It sets ReplicaAssignment and ConfigEntries to nil; the user can set them with AddReplicaAssignment and ADDConfigEntry

func (*CreateTopicsRequest) Encode

func (r *CreateTopicsRequest) Encode(version uint16) []byte

Encode encodes CreateTopicsRequest to binary bytes

func (*CreateTopicsRequest) Length

func (r *CreateTopicsRequest) Length() int

Length returns the length of bytes returned by Encode func

type CreateTopicsResponse

// CreateTopicsResponse is the response of a create_topics request;
// NewCreateTopicsResponse returns this type.
type CreateTopicsResponse struct {
	CorrelationID uint32
	// TopicErrors holds one TopicError value per entry.
	TopicErrors   []TopicError
}

CreateTopicsResponse is response of create_topics request

func NewCreateTopicsResponse

func NewCreateTopicsResponse(payload []byte) (r CreateTopicsResponse, err error)

NewCreateTopicsResponse decode binary bytes to CreateTopicsResponse struct

func (CreateTopicsResponse) Error

func (r CreateTopicsResponse) Error() error

type DeleteAclsFilter

// DeleteAclsFilter is the element type (by pointer) of
// DeleteAclsRequest.Filters and of the slice passed to Client.DeleteAcls.
// Its field names and types are the same as those of DescribeAclsRequestBody.
type DeleteAclsFilter struct {
	ResourceType   AclsResourceType
	// ResourceName, Principal and Host are pointers, so each can be nil.
	ResourceName   *string
	PatternType    AclsPatternType
	Principal      *string
	Host           *string
	Operation      AclsOperation
	PermissionType AclsPermissionType
	TaggedFields   TaggedFields
}

func (*DeleteAclsFilter) EncodeTo

func (f *DeleteAclsFilter) EncodeTo(payload []byte, version uint16, isFlexible bool) (offset int)

type DeleteAclsFilterResult

// DeleteAclsFilterResult is the element type of
// DeleteAclsResponse.FilterResults.
type DeleteAclsFilterResult struct {
	ErrorCode    int16
	// ErrorMessage is a pointer, so it can be nil.
	ErrorMessage *string
	// MatchingAcls holds one DeleteAclsMatchingAcl per entry; each entry has
	// its own ErrorCode and ErrorMessage.
	MatchingAcls []DeleteAclsMatchingAcl
	TaggedFields TaggedFields
}

type DeleteAclsMatchingAcl

// DeleteAclsMatchingAcl is the element type of
// DeleteAclsFilterResult.MatchingAcls and the type produced by
// DecodeDeleteAclsMatchingAcl.
type DeleteAclsMatchingAcl struct {
	ErrorCode      int16
	// ErrorMessage is a pointer, so it can be nil.
	ErrorMessage   *string
	ResourceType   AclsResourceType
	// ResourceName, Principal and Host are plain strings here, unlike the
	// *string fields of DeleteAclsFilter.
	ResourceName   string
	PatternType    AclsPatternType
	Principal      string
	Host           string
	Operation      AclsOperation
	PermissionType AclsPermissionType
	TaggedFields   TaggedFields
}

func DecodeDeleteAclsMatchingAcl

func DecodeDeleteAclsMatchingAcl(payload []byte, version uint16, isFlexible bool) (m DeleteAclsMatchingAcl, offset int)

type DeleteAclsRequest

// DeleteAclsRequest is the type returned (by pointer) by NewDeleteAclsRequest
// and DecodeDeleteAclsRequest. RequestHeader is embedded by value.
type DeleteAclsRequest struct {
	RequestHeader
	// Filters has the same type as the filters argument of
	// NewDeleteAclsRequest.
	Filters      []*DeleteAclsFilter
	TaggedFields TaggedFields
}

func DecodeDeleteAclsRequest

func DecodeDeleteAclsRequest(payload []byte) (*DeleteAclsRequest, error)

just for test

func NewDeleteAclsRequest

func NewDeleteAclsRequest(clientID string, filters []*DeleteAclsFilter) *DeleteAclsRequest

func (*DeleteAclsRequest) Encode

func (r *DeleteAclsRequest) Encode(version uint16) (payload []byte)

type DeleteAclsResponse

// DeleteAclsResponse is the type returned (by pointer) by
// DecodeDeleteAclsResponse and Client.DeleteAcls. It embeds ResponseHeader.
type DeleteAclsResponse struct {
	ResponseHeader
	ThrottleTimeMs int32
	// FilterResults holds one DeleteAclsFilterResult per entry.
	FilterResults  []DeleteAclsFilterResult
	TaggedFields   TaggedFields
}

func DecodeDeleteAclsResponse

func DecodeDeleteAclsResponse(payload []byte, version uint16) (*DeleteAclsResponse, error)

func (*DeleteAclsResponse) Encode

func (r *DeleteAclsResponse) Encode() (payload []byte)

just for test

func (*DeleteAclsResponse) Error

func (r *DeleteAclsResponse) Error() (err error)

type DeleteGroupsRequest

// DeleteGroupsRequest is the type returned by NewDeleteGroupsRequest.
type DeleteGroupsRequest struct {
	*RequestHeader
	// GroupsNames has the same type as the groupsNames argument of
	// NewDeleteGroupsRequest.
	GroupsNames []string `json:"groups_names"`
}

DeleteGroupsRequest holds the arguments of a DeleteGroups request

func NewDeleteGroupsRequest

func NewDeleteGroupsRequest(clientID string, groupsNames []string) DeleteGroupsRequest

NewDeleteGroupsRequest creates a new DeleteGroupsRequest

func (DeleteGroupsRequest) Encode

func (r DeleteGroupsRequest) Encode(version uint16) []byte

Encode encodes DeleteGroupsRequest to []byte

type DeleteGroupsResponse

// DeleteGroupsResponse is the type returned by NewDeleteGroupsResponse.
type DeleteGroupsResponse struct {
	CorrelationID  uint32 `json:"correlation_id"`
	ThrottleTimeMs int32  `json:"throttle_time_ms"`
	// Results is a slice of an anonymous struct type: each entry pairs a
	// group ID with its own error code.
	Results        []struct {
		GroupID   string `json:"group_id"`
		ErrorCode int16  `json:"error_code"`
	} `json:"results"`
}

DeleteGroupsResponse holds the parameters of a DeleteGroups response

func NewDeleteGroupsResponse

func NewDeleteGroupsResponse(payload []byte) (r DeleteGroupsResponse, err error)

NewDeleteGroupsResponse creates a new DeleteGroupsResponse from []byte

func (DeleteGroupsResponse) Error

func (r DeleteGroupsResponse) Error() error

FIXME: multiple error code

type DeleteTopicsRequest

// DeleteTopicsRequest is the type returned by NewDeleteTopicsRequest.
type DeleteTopicsRequest struct {
	*RequestHeader
	// TopicsNames and TimeoutMS have the same types as the topicsNames and
	// timeoutMS arguments of NewDeleteTopicsRequest.
	TopicsNames []string `json:"topics_names"`
	TimeoutMS   int32    `json:"timeout_ms"`
}

DeleteTopicsRequest holds the arguments of a DeleteTopics request

func NewDeleteTopicsRequest

func NewDeleteTopicsRequest(clientID string, topicsNames []string, timeoutMS int32) DeleteTopicsRequest

NewDeleteTopicsRequest creates a new DeleteTopicsRequest

func (DeleteTopicsRequest) Encode

func (r DeleteTopicsRequest) Encode(version uint16) []byte

Encode encodes DeleteTopicsRequest to []byte

type DeleteTopicsResponse

// DeleteTopicsResponse is the type returned by NewDeleteTopicsResponse and
// Client.DeleteTopics. Unlike DeleteGroupsResponse it declares no throttle
// time field.
type DeleteTopicsResponse struct {
	CorrelationID uint32 `json:"correlation_id"`
	// Results is a slice of an anonymous struct type: each entry pairs a
	// topic name with its own error code.
	Results       []struct {
		TopicName string `json:"topic_name"`
		ErrorCode int16  `json:"error_code"`
	} `json:"results"`
}

DeleteTopicsResponse holds the parameters of a DeleteTopics response

func NewDeleteTopicsResponse

func NewDeleteTopicsResponse(payload []byte, version uint16) (r DeleteTopicsResponse, err error)

NewDeleteTopicsResponse creates a new DeleteTopicsResponse from []byte

func (DeleteTopicsResponse) Error

func (r DeleteTopicsResponse) Error() error

Error returns error list of all failed topics

type DescribeAclsRequest

// DescribeAclsRequest has no named fields of its own: it embeds RequestHeader
// (by value) and DescribeAclsRequestBody, whose fields are promoted.
type DescribeAclsRequest struct {
	RequestHeader
	DescribeAclsRequestBody
}

func DecodeDescribeAclsRequest

func DecodeDescribeAclsRequest(payload []byte, version uint16) (r DescribeAclsRequest, err error)

just for test

func (*DescribeAclsRequest) Encode

func (r *DescribeAclsRequest) Encode(version uint16) (rst []byte)

type DescribeAclsRequestBody

type DescribeAclsRequestBody struct {
	ResourceType   AclsResourceType
	ResourceName   *string
	PatternType    AclsPatternType
	Principal      *string
	Host           *string
	Operation      AclsOperation
	PermissionType AclsPermissionType
	TaggedFields   TaggedFields
}

type DescribeAclsResponse

type DescribeAclsResponse struct {
	CorrelationID uint32

	ThrottleTimeMs int32
	ErrorCode      int16
	ErrorMessage   *string
	Resources      []AclResource
	TaggedFields   TaggedFields
}

func NewDescribeAclsResponse

func NewDescribeAclsResponse(payload []byte, version uint16) (response DescribeAclsResponse, err error)

func (*DescribeAclsResponse) Encode

func (r *DescribeAclsResponse) Encode(version uint16) (rst []byte, err error)

just for test

func (DescribeAclsResponse) Error

func (r DescribeAclsResponse) Error() error

type DescribeConfigsRequest

type DescribeConfigsRequest struct {
	*RequestHeader
	Resources []*DescribeConfigsRequestResource
}

DescribeConfigsRequest holds the request parameters for DescribeConfigsRequest

func NewDescribeConfigsRequest

func NewDescribeConfigsRequest(clientID string, resources []*DescribeConfigsRequestResource) *DescribeConfigsRequest

func (*DescribeConfigsRequest) Encode

func (r *DescribeConfigsRequest) Encode(version uint16) []byte

func (*DescribeConfigsRequest) Length

func (r *DescribeConfigsRequest) Length() int

type DescribeConfigsRequestResource

type DescribeConfigsRequestResource struct {
	ResourceType uint8
	ResourceName string
	ConfigNames  []string
}

DescribeConfigsRequestResource is part of DescribeConfigsRequest

type DescribeConfigsResponse

type DescribeConfigsResponse struct {
	CorrelationID  uint32
	ThrottleTimeMS uint32
	Resources      []describeConfigsResponseResource
}

DescribeConfigsResponse holds the parameters of a describe-configs response.

func NewDescribeConfigsResponse

func NewDescribeConfigsResponse(payload []byte) (r DescribeConfigsResponse, err error)

NewDescribeConfigsResponse creates a new DescribeConfigsResponse from the given payload

func (DescribeConfigsResponse) Error

func (r DescribeConfigsResponse) Error() error

type DescribeGroupsRequest

type DescribeGroupsRequest struct {
	*RequestHeader
	Groups []string
}

DescribeGroupsRequest holds the parameters for the DescribeGroups request API

func NewDescribeGroupsRequest

func NewDescribeGroupsRequest(clientID string, groups []string) *DescribeGroupsRequest

NewDescribeGroupsRequest creates a new DescribeGroupsRequest

func (*DescribeGroupsRequest) Encode

func (r *DescribeGroupsRequest) Encode(version uint16) []byte

Encode encodes the request into byte array, this implements the Request interface

type DescribeGroupsResponse

type DescribeGroupsResponse struct {
	CorrelationID uint32
	Groups        []*GroupDetail
}

func NewDescribeGroupsResponse

func NewDescribeGroupsResponse(payload []byte) (r DescribeGroupsResponse, err error)

func (DescribeGroupsResponse) Error

func (r DescribeGroupsResponse) Error() error

type DescribeLogDirsRequest

type DescribeLogDirsRequest struct {
	*RequestHeader
	Topics []DescribeLogDirsRequestTopic
}

DescribeLogDirsRequest is a request of DescribeLogDirsRequest

func NewDescribeLogDirsRequest

func NewDescribeLogDirsRequest(clientID string, topics []string) (r DescribeLogDirsRequest)

NewDescribeLogDirsRequest returns a new DescribeLogDirsRequest

func (*DescribeLogDirsRequest) AddTopicPartition

func (r *DescribeLogDirsRequest) AddTopicPartition(topic string, pid int32)

AddTopicPartition adds a topic and partition to the DescribeLogDirsRequest

func (DescribeLogDirsRequest) Encode

func (r DescribeLogDirsRequest) Encode(version uint16) []byte

Encode encodes DescribeLogDirsRequest to []byte

type DescribeLogDirsRequestTopic

type DescribeLogDirsRequestTopic struct {
	TopicName  string
	Partitions []int32
}

DescribeLogDirsRequestTopic is a topic in DescribeLogDirsRequest

type DescribeLogDirsResponse

type DescribeLogDirsResponse struct {
	CoordinatorID  uint32                          `json:"-"`
	ThrottleTimeMS int32                           `json:"throttle_time_ms"`
	Results        []DescribeLogDirsResponseResult `json:"results"`
}

DescribeLogDirsResponse is a response of DescribeLogDirsRequest

func NewDescribeLogDirsResponse

func NewDescribeLogDirsResponse(payload []byte, version uint16) (r DescribeLogDirsResponse, err error)

NewDescribeLogDirsResponse creates a DescribeLogDirsResponse from the given payload

func (DescribeLogDirsResponse) Error

func (r DescribeLogDirsResponse) Error() error

type DescribeLogDirsResponsePartition

type DescribeLogDirsResponsePartition struct {
	PartitionID int32 `json:"partition_id"`
	Size        int64 `json:"size"`
	OffsetLag   int64 `json:"offset_lag"`
	IsFutureKey bool  `json:"is_future_key"`
}

type DescribeLogDirsResponseResult

type DescribeLogDirsResponseResult struct {
	ErrorCode int16                          `json:"error_code"`
	LogDir    string                         `json:"log_dir"`
	Topics    []DescribeLogDirsResponseTopic `json:"topics"`
}

type DescribeLogDirsResponseTopic

type DescribeLogDirsResponseTopic struct {
	TopicName  string                             `json:"topic"`
	Partitions []DescribeLogDirsResponsePartition `json:"partitions"`
}

type ElectLeadersRequest

type ElectLeadersRequest struct {
	*RequestHeader

	ElectionType int8              `json:"election_type"`
	Topics       []*TopicPartition `json:"topics"`
	TimeoutMS    int32             `json:"timeout.ms"`
}

func NewElectLeadersRequest

func NewElectLeadersRequest(timeoutMS int32) ElectLeadersRequest

NewElectLeadersRequest returns a new ElectLeadersRequest

func (*ElectLeadersRequest) Add

func (r *ElectLeadersRequest) Add(topic string, pid int32)

Add adds a topic partition to the request, it does not check if the topic partition already exists

func (*ElectLeadersRequest) Encode

func (r *ElectLeadersRequest) Encode(version uint16) []byte

Encode encodes an elect leaders request into []byte

type ElectLeadersResponse

type ElectLeadersResponse struct {
	CorrelationID uint32 `json:"correlation_id"`

	ThrottleTimeMS         int32                    `json:"throttle_time_ms"`
	ReplicaElectionResults []*ReplicaElectionResult `json:"replica_election_results"`
}

func NewElectLeadersResponse

func NewElectLeadersResponse(payload []byte, version uint16) (r *ElectLeadersResponse, err error)

NewElectLeadersResponse creates a new ElectLeadersResponse.

func (*ElectLeadersResponse) Error

func (r *ElectLeadersResponse) Error() error

type Error

type Error struct {
	ErrorCode int16
	ErrorMsg  string
	ErrorDesc string
	Retriable bool
}

type FetchRequest

type FetchRequest struct {
	*RequestHeader
	ReplicaID            int32
	MaxWaitTime          int32
	MinBytes             int32
	MaxBytes             int32
	ISOLationLevel       int8
	SessionID            int32
	SessionEpoch         int32
	Topics               map[string][]*PartitionBlock
	ForgottenTopicsDatas map[string][]int32
}

FetchRequest holds all the parameters of fetch request

func NewFetchRequest

func NewFetchRequest(clientID string, maxWaitTime int32, minBytes int32) *FetchRequest

NewFetchRequest creates a new FetchRequest

func (*FetchRequest) Encode

func (fetchRequest *FetchRequest) Encode(version uint16) []byte

Encode encodes request to []byte

type FetchResponse

type FetchResponse struct {
	CorrelationID  int32
	ThrottleTimeMs int32
	ErrorCode      int16
	SessionID      int32
	Responses      map[string][]PartitionResponse
}

func (*FetchResponse) Encode

func (r *FetchResponse) Encode(version uint16) ([]byte, error)

type FindCoordinatorRequest

type FindCoordinatorRequest struct {
	*RequestHeader
	GroupID string
}

func NewFindCoordinatorRequest

func NewFindCoordinatorRequest(clientID, groupID string) *FindCoordinatorRequest

func (*FindCoordinatorRequest) Encode

func (findCoordinatorR *FindCoordinatorRequest) Encode(version uint16) []byte

type FindCoordinatorResponse

type FindCoordinatorResponse struct {
	CorrelationID uint32
	ErrorCode     int16
	Coordinator   Coordinator
}

FindCoordinatorResponse is the response of findcoordinator request, including correlationID, errorCode, coordinator

func NewFindCoordinatorResponse

func NewFindCoordinatorResponse(payload []byte, version uint16) (r FindCoordinatorResponse, err error)

NewFindCoordinatorResponse creates a FindCoordinatorResponse instance from the response payload bytes

func (FindCoordinatorResponse) Error

func (r FindCoordinatorResponse) Error() error

type FullMessage

type FullMessage struct {
	TopicName   string
	PartitionID int32
	Error       error
	Message     *Message
}

FullMessage contains message value and topic and partition

type Group

type Group struct {
	GroupID      string
	ProtocolType string
	GroupState   string       `healer:"minVersion:4"`
	GroupType    string       `healer:"minVersion:5"`
	TaggedFields TaggedFields `json:",omitempty"`
}

type GroupAssignment

type GroupAssignment []struct {
	MemberID         string
	MemberAssignment []byte
}

TODO map

type GroupConsumer

type GroupConsumer struct {
	// contains filtered or unexported fields
}

GroupConsumer can join a group together with other GroupConsumers that share the same groupID, and they consume messages from Kafka. They rebalance when a new GroupConsumer joins or an existing one leaves.

func NewGroupConsumer

func NewGroupConsumer(topic string, config interface{}) (*GroupConsumer, error)

NewGroupConsumer creates a new GroupConsumer

func (*GroupConsumer) AwaitClose

func (c *GroupConsumer) AwaitClose(timeout time.Duration)

AwaitClose waits for all simple consumers to stop and then returns, or returns once the timeout has elapsed

func (*GroupConsumer) Close

func (c *GroupConsumer) Close()

Close waits for all simple consumers to stop and then returns, or times out and returns after 30s

func (*GroupConsumer) CommitOffset

func (c *GroupConsumer) CommitOffset()

CommitOffset commits the offset to the Kafka server

func (*GroupConsumer) Consume

func (c *GroupConsumer) Consume(messages chan *FullMessage) (<-chan *FullMessage, error)

Consume joins the group and then consumes messages from Kafka. It returns a chan from which the client can receive messages

type GroupDetail

type GroupDetail struct {
	ErrorCode    int16
	GroupID      string
	State        string
	ProtocolType string
	Protocol     string
	Members      []MemberDetail
}

type GroupProtocol

type GroupProtocol struct {
	ProtocolName     string
	ProtocolMetadata []byte
}

GroupProtocol is sub struct in JoinGroupRequest

type GzipCompressor

type GzipCompressor struct {
}

func (*GzipCompressor) Compress

func (c *GzipCompressor) Compress(value []byte) ([]byte, error)

type HealerError

type HealerError int32

func (*HealerError) Error

func (healerError *HealerError) Error() string

type HeartbeatRequest

type HeartbeatRequest struct {
	*RequestHeader
	GroupID      string
	GenerationID int32
	MemberID     string
}

TODO version0

func NewHeartbeatRequest

func NewHeartbeatRequest(clientID, groupID string, generationID int32, memberID string) *HeartbeatRequest

func (*HeartbeatRequest) Encode

func (heartbeatR *HeartbeatRequest) Encode(version uint16) []byte

func (*HeartbeatRequest) Length

func (heartbeatR *HeartbeatRequest) Length() int

type HeartbeatResponse

type HeartbeatResponse struct {
	CorrelationID uint32
	ErrorCode     int16
}

func NewHeartbeatResponse

func NewHeartbeatResponse(payload []byte) (r HeartbeatResponse, err error)

func (HeartbeatResponse) Error

func (r HeartbeatResponse) Error() error

type Helper

type Helper struct {
	// contains filtered or unexported fields
}

func NewHelper

func NewHelper(brokerList, clientID string, config *BrokerConfig) (*Helper, error)

func NewHelperFromBrokers

func NewHelperFromBrokers(brokers *Brokers, clientID string) *Helper

func (*Helper) GetGroups

func (h *Helper) GetGroups() []string

func (*Helper) UpdateMeta

func (h *Helper) UpdateMeta() error

type IncrementalAlterConfigsRequest

type IncrementalAlterConfigsRequest struct {
	*RequestHeader
	Resources    []IncrementalAlterConfigsRequestResource `json:"resources"`
	ValidateOnly bool                                     `json:"validate_only"`
}

IncrementalAlterConfigsRequest holds the parameters of an IncrementalAlterConfigs request

func DecodeIncrementalAlterConfigsRequest

func DecodeIncrementalAlterConfigsRequest(payload []byte, version uint16) (r IncrementalAlterConfigsRequest)

just for test

func NewIncrementalAlterConfigsRequest

func NewIncrementalAlterConfigsRequest(clientID string) IncrementalAlterConfigsRequest

NewIncrementalAlterConfigsRequest creates a new IncrementalAlterConfigsRequest

func (*IncrementalAlterConfigsRequest) AddConfig

func (r *IncrementalAlterConfigsRequest) AddConfig(resourceType uint8, resourceName, configName, configValue string) error

AddConfig adds a new config entry to the request

func (IncrementalAlterConfigsRequest) Encode

func (r IncrementalAlterConfigsRequest) Encode(version uint16) []byte

Encode encodes the IncrementalAlterConfigsRequest object to []byte. It implements the Request interface

func (*IncrementalAlterConfigsRequest) SetValidateOnly

func (r *IncrementalAlterConfigsRequest) SetValidateOnly(validateOnly bool) *IncrementalAlterConfigsRequest

SetValidateOnly sets validateOnly in the request

type IncrementalAlterConfigsRequestConfigEntry

type IncrementalAlterConfigsRequestConfigEntry struct {
	Name      string `json:"name"`
	Operation int8   `json:"operation"`
	Value     string `json:"value"`
}

IncrementalAlterConfigsRequestConfigEntry is a sub-struct of IncrementalAlterConfigsRequestResource

type IncrementalAlterConfigsRequestResource

type IncrementalAlterConfigsRequestResource struct {
	ResourceType uint8                                       `json:"type"`
	ResourceName string                                      `json:"name"`
	Entries      []IncrementalAlterConfigsRequestConfigEntry `json:"entries"`
}

IncrementalAlterConfigsRequestResource is a sub-struct of IncrementalAlterConfigsRequest

type IncrementalAlterConfigsResponse

type IncrementalAlterConfigsResponse struct {
	CorrelationID  uint32                                    `json:"correlation_id"`
	ThrottleTimeMs uint32                                    `json:"throttle_time_ms"`
	Resources      []IncrementalAlterConfigsResponseResource `json:"resources"`
}

IncrementalAlterConfigsResponse holds the fields of an IncrementalAlterConfigs response

func NewIncrementalAlterConfigsResponse

func NewIncrementalAlterConfigsResponse(payload []byte, version uint16) (r IncrementalAlterConfigsResponse, err error)

NewIncrementalAlterConfigsResponse creates a new IncrementalAlterConfigsResponse. It does not return the errors carried in the response; users may need to check the error codes in the response themselves.

func (IncrementalAlterConfigsResponse) Error

type IncrementalAlterConfigsResponseResource

type IncrementalAlterConfigsResponseResource struct {
	ErrorCode    int16  `json:"error_code"`
	ErrorMessage string `json:"error_message"`
	ResourceType uint8  `json:"resource_type"`
	ResourceName string `json:"resource_name"`
}

IncrementalAlterConfigsResponseResource is a sub-struct of IncrementalAlterConfigsResponse

type JoinGroupRequest

type JoinGroupRequest struct {
	*RequestHeader
	GroupID          string
	SessionTimeout   int32 // ms
	RebalanceTimeout int32 // ms. this is NOT included in verions 0
	MemberID         string
	ProtocolType     string
	GroupProtocols   []*GroupProtocol
}

JoinGroupRequest struct holds params in JoinGroupRequest

func NewJoinGroupRequest

func NewJoinGroupRequest(apiVersion uint16, clientID string) *JoinGroupRequest

NewJoinGroupRequest creates a JoinGroupRequest

func (*JoinGroupRequest) AddGroupProtocal

func (r *JoinGroupRequest) AddGroupProtocal(gp *GroupProtocol)

AddGroupProtocal adds a new GroupProtocol to the JoinGroupRequest

func (*JoinGroupRequest) Encode

func (r *JoinGroupRequest) Encode(version uint16) []byte

Encode encodes the JoinGroupRequest object to []byte. It implements the Request interface

type JoinGroupResponse

type JoinGroupResponse struct {
	CorrelationID uint32
	ErrorCode     int16
	GenerationID  int32
	GroupProtocol string
	LeaderID      string
	MemberID      string
	Members       []Member
}

func NewJoinGroupResponse

func NewJoinGroupResponse(payload []byte) (r JoinGroupResponse, err error)

func (JoinGroupResponse) Error

func (r JoinGroupResponse) Error() error

type KafkaError

type KafkaError int16

func (KafkaError) Error

func (kafkaError KafkaError) Error() string

func (KafkaError) IsRetriable

func (kafkaError KafkaError) IsRetriable() bool

type LZ4Compressor

type LZ4Compressor struct {
}

func (*LZ4Compressor) Compress

func (c *LZ4Compressor) Compress(value []byte) ([]byte, error)

type LeaveGroupRequest

type LeaveGroupRequest struct {
	*RequestHeader
	GroupID  string
	MemberID string
}

version 0

func NewLeaveGroupRequest

func NewLeaveGroupRequest(clientID, groupID, memberID string) *LeaveGroupRequest

func (*LeaveGroupRequest) Encode

func (r *LeaveGroupRequest) Encode(version uint16) []byte

func (*LeaveGroupRequest) Length

func (r *LeaveGroupRequest) Length() int

type LeaveGroupResponse

type LeaveGroupResponse struct {
	CorrelationID uint32
	ErrorCode     int16
}

func NewLeaveGroupResponse

func NewLeaveGroupResponse(payload []byte) (r LeaveGroupResponse, err error)

func (LeaveGroupResponse) Error

func (r LeaveGroupResponse) Error() error

type ListGroupsRequest

type ListGroupsRequest struct {
	*RequestHeader
	StatesFilter []string `healer:"minVersion:4"`
	TypesFilter  []string `healer:"minVersion:5"`
	TaggedFields TaggedFields
}

version0

func DecodeListGroupsRequest

func DecodeListGroupsRequest(payload []byte) (r *ListGroupsRequest, err error)

just for test

func NewListGroupsRequest

func NewListGroupsRequest(clientID string) *ListGroupsRequest

func (*ListGroupsRequest) Encode

func (r *ListGroupsRequest) Encode(version uint16) (payload []byte)

func (*ListGroupsRequest) SetStatesFilter

func (r *ListGroupsRequest) SetStatesFilter(statesFilter []string)

func (*ListGroupsRequest) SetTypesFilter

func (r *ListGroupsRequest) SetTypesFilter(typesFilter []string)

type ListGroupsResponse

type ListGroupsResponse struct {
	ResponseHeader
	ThrottleTimeMS int32 `healer:"minVersion:1"`
	ErrorCode      uint16
	Groups         []*Group
	TaggedFields   TaggedFields
}

func NewListGroupsResponse

func NewListGroupsResponse(payload []byte, version uint16) (r *ListGroupsResponse, err error)

func (*ListGroupsResponse) Encode

func (r *ListGroupsResponse) Encode(version uint16) (payload []byte)

just for test

func (*ListGroupsResponse) Error

func (r *ListGroupsResponse) Error() error

type ListPartitionReassignmentsRequest

type ListPartitionReassignmentsRequest struct {
	*RequestHeader
	TimeoutMS int32
	Topics    []struct {
		Name         string
		Partitions   []int32
		TaggedFields TaggedFields
	}
	TaggedFields TaggedFields
}

ListPartitionReassignmentsRequest is a request to kafka to list partition reassignments

func NewListPartitionReassignmentsRequest

func NewListPartitionReassignmentsRequest(clientID string, timeoutMS int32) ListPartitionReassignmentsRequest

NewListPartitionReassignmentsRequest creates a new ListPartitionReassignmentsRequest

func (*ListPartitionReassignmentsRequest) AddTP

func (r *ListPartitionReassignmentsRequest) AddTP(topicName string, pid int32)

AddTP adds a topic/partition to the request

func (ListPartitionReassignmentsRequest) Encode

func (r ListPartitionReassignmentsRequest) Encode(version uint16) []byte

Encode encodes a ListPartitionReassignmentsRequest into a byte array.

type ListPartitionReassignmentsResponse

type ListPartitionReassignmentsResponse struct {
	CorrelationID  uint32                                 `json:"correlation_id"`
	ThrottleTimeMS int32                                  `json:"throttle_time_ms"`
	ErrorCode      int16                                  `json:"error_code"`
	ErrorMessage   *string                                `json:"error_message"`
	Topics         []listPartitionReassignmentsTopicBlock `json:"topics"`
}

ListPartitionReassignmentsResponse is a response from kafka to list partition reassignments

func NewListPartitionReassignmentsResponse

func NewListPartitionReassignmentsResponse(payload []byte, version uint16) (r *ListPartitionReassignmentsResponse, err error)

NewListPartitionReassignmentsResponse decodes a byte array into a ListPartitionReassignmentsResponse instance

func (*ListPartitionReassignmentsResponse) Error

FIXME: add error message too

type Member

type Member struct {
	MemberID       string
	MemberMetadata []byte
}

version 0

type MemberAssignment

type MemberAssignment struct {
	Version              int16
	PartitionAssignments []*PartitionAssignment
	UserData             []byte
}

MemberAssignment is encoded to []byte and used as a member of GroupAssignment in the sync request. The sync and group responses return []byte that can be decoded into a MemberAssignment

func NewMemberAssignment

func NewMemberAssignment(payload []byte) (*MemberAssignment, error)

func (*MemberAssignment) Encode

func (memberAssignment *MemberAssignment) Encode() []byte

func (*MemberAssignment) Length

func (memberAssignment *MemberAssignment) Length() int

type MemberDetail

type MemberDetail struct {
	MemberID            string
	ClientID            string
	ClientHost          string
	MemberMetadata      []byte
	RawMemberAssignment []byte `json:"-"`
	MemberAssignment    *MemberAssignment
}

type Message

type Message struct {
	Offset      int64
	MessageSize int32

	//Message
	Crc        uint32
	MagicByte  int8
	Attributes int8
	Timestamp  uint64
	Key        []byte
	Value      []byte

	// only for version 2
	Headers []RecordHeader
}

Message is a message in a topic

type MessageSet

type MessageSet []*Message

MessageSet is a batch of messages

func DecodeToMessageSet

func DecodeToMessageSet(payload []byte) (MessageSet, error)

DecodeToMessageSet decodes a MessageSet from a byte array. A MessageSet is [offset message_size message], but in healer it generally decodes only one message; the loop is inside decodeMessageSetMagic0or1. If message.Value is compressed, it uncompresses the value and returns an array of messages.

func (*MessageSet) Encode

func (messageSet *MessageSet) Encode(payload []byte, offset int) int

func (*MessageSet) Length

func (messageSet *MessageSet) Length() int

type MetaInfo

type MetaInfo struct {
	// contains filtered or unexported fields
}

type MetadataRequest

type MetadataRequest struct {
	*RequestHeader
	Topics                 []string
	AllowAutoTopicCreation bool
}

func DecodeMetadataRequest

func DecodeMetadataRequest(payload []byte) (r MetadataRequest, err error)

just for test

func NewMetadataRequest

func NewMetadataRequest(clientID string, topics []string) *MetadataRequest

func (*MetadataRequest) Encode

func (metadataRequest *MetadataRequest) Encode(version uint16) []byte

type MetadataResponse

type MetadataResponse struct {
	CorrelationID  uint32
	ThrottleTimeMs int32
	Brokers        []*BrokerInfo
	ClusterID      string
	ControllerID   int32
	TopicMetadatas []TopicMetadata
}

MetadataResponse holds all the fields of metadata response, including the brokers and topics

func NewMetadataResponse

func NewMetadataResponse(payload []byte, version uint16) (r MetadataResponse, err error)

NewMetadataResponse decodes a byte slice into a MetadataResponse object.

func (MetadataResponse) Error

func (r MetadataResponse) Error() error

Error returns the error from the error code

type MockConn

type MockConn struct {
	MockRead             func(p []byte) (n int, err error)
	MockWrite            func(p []byte) (n int, err error)
	MockClose            func() error
	MockLocalAddr        func() net.Addr
	MockRemoteAddr       func() net.Addr
	MockSetDeadline      func(t time.Time) error
	MockSetReadDeadline  func(t time.Time) error
	MockSetWriteDeadline func(t time.Time) error
}

MockConn is a mock struct for the Conn type

func (*MockConn) Close

func (_m *MockConn) Close() error

func (*MockConn) LocalAddr

func (_m *MockConn) LocalAddr() net.Addr

func (*MockConn) Read

func (_m *MockConn) Read(p []byte) (n int, err error)

func (*MockConn) RemoteAddr

func (_m *MockConn) RemoteAddr() net.Addr

func (*MockConn) SetDeadline

func (_m *MockConn) SetDeadline(t time.Time) error

func (*MockConn) SetReadDeadline

func (_m *MockConn) SetReadDeadline(t time.Time) error

func (*MockConn) SetWriteDeadline

func (_m *MockConn) SetWriteDeadline(t time.Time) error

func (*MockConn) Write

func (_m *MockConn) Write(p []byte) (n int, err error)

type NetConfig

type NetConfig struct {
	ConnectTimeoutMS    int   `json:"connect.timeout.ms,string" mapstructure:"connect.timeout.ms"`
	TimeoutMS           int   `json:"timeout.ms,string" mapstructure:"timeout.ms"`
	TimeoutMSForEachAPI []int `json:"timeout.ms.for.eachapi" mapstructure:"timeout.ms.for.eachapi"`
	KeepAliveMS         int   `json:"keepalive.ms,string" mapstructure:"keepalive.ms"`
}

type NoneCompressor

type NoneCompressor struct {
}

func (*NoneCompressor) Compress

func (c *NoneCompressor) Compress(value []byte) ([]byte, error)

type OffsetCommitRequest

type OffsetCommitRequest struct {
	*RequestHeader
	GroupID       string
	GenerationID  int32
	MemberID      string
	RetentionTime int64
	Topics        []*OffsetCommitRequestTopic
}

func NewOffsetCommitRequest

func NewOffsetCommitRequest(apiVersion uint16, clientID, groupID string) *OffsetCommitRequest

request only ONE topic

func (*OffsetCommitRequest) AddPartiton

func (r *OffsetCommitRequest) AddPartiton(topic string, partitionID int32, offset int64, metadata string)

func (*OffsetCommitRequest) Encode

func (r *OffsetCommitRequest) Encode(version uint16) []byte

func (*OffsetCommitRequest) Length

func (r *OffsetCommitRequest) Length() int

func (*OffsetCommitRequest) SetGenerationID

func (r *OffsetCommitRequest) SetGenerationID(generationID int32)

func (*OffsetCommitRequest) SetMemberID

func (r *OffsetCommitRequest) SetMemberID(memberID string)

func (*OffsetCommitRequest) SetRetentionTime

func (r *OffsetCommitRequest) SetRetentionTime(retentionTime int64)

type OffsetCommitRequestPartition

type OffsetCommitRequestPartition struct {
	PartitionID int32
	Offset      int64
	Metadata    string
}

type OffsetCommitRequestTopic

type OffsetCommitRequestTopic struct {
	Topic      string
	Partitions []*OffsetCommitRequestPartition
}

type OffsetCommitResponse

type OffsetCommitResponse struct {
	CorrelationID uint32
	Topics        []*OffsetCommitResponseTopic
}

func NewOffsetCommitResponse

func NewOffsetCommitResponse(payload []byte) (r OffsetCommitResponse, err error)

func (OffsetCommitResponse) Error

func (r OffsetCommitResponse) Error() error

type OffsetCommitResponsePartition

type OffsetCommitResponsePartition struct {
	PartitionID uint32
	ErrorCode   int16
}

type OffsetCommitResponseTopic

type OffsetCommitResponseTopic struct {
	Topic      string
	Partitions []*OffsetCommitResponsePartition
}

type OffsetFetchRequest

type OffsetFetchRequest struct {
	*RequestHeader
	GroupID string
	Topics  []*OffsetFetchRequestTopic
}

func NewOffsetFetchRequest

func NewOffsetFetchRequest(apiVersion uint16, clientID, groupID string) *OffsetFetchRequest

request only ONE topic

func (*OffsetFetchRequest) AddPartiton

func (r *OffsetFetchRequest) AddPartiton(topic string, partitionID int32)

func (*OffsetFetchRequest) Encode

func (r *OffsetFetchRequest) Encode(version uint16) []byte

func (*OffsetFetchRequest) Length

func (r *OffsetFetchRequest) Length() int

type OffsetFetchRequestTopic

type OffsetFetchRequestTopic struct {
	Topic      string
	Partitions []int32
}

type OffsetFetchResponse

type OffsetFetchResponse struct {
	CorrelationID uint32
	Topics        []*OffsetFetchResponseTopic
}

func NewOffsetFetchResponse

func NewOffsetFetchResponse(payload []byte) (r OffsetFetchResponse, err error)

NewOffsetFetchResponse decodes the response byte array to a OffsetFetchResponse struct

func (OffsetFetchResponse) Error

func (r OffsetFetchResponse) Error() error

type OffsetFetchResponsePartition

type OffsetFetchResponsePartition struct {
	PartitionID int32
	Offset      int64
	Metadata    string
	ErrorCode   int16
}

type OffsetFetchResponseTopic

type OffsetFetchResponseTopic struct {
	Topic      string
	Partitions []*OffsetFetchResponsePartition
}

type OffsetsRequest

type OffsetsRequest struct {
	*RequestHeader
	ReplicaId   int32
	RequestInfo map[string]map[int32]*PartitionOffsetRequestInfo
}

func NewOffsetsRequest

func NewOffsetsRequest(topic string, partitionIDs []int32, timeValue int64, offsets uint32, clientID string) *OffsetsRequest

request only ONE topic

func (*OffsetsRequest) Encode

func (offsetR *OffsetsRequest) Encode(version uint16) []byte

type OffsetsResponse

type OffsetsResponse struct {
	CorrelationID         uint32
	ThrottleTimeMs        int32
	TopicPartitionOffsets map[string][]PartitionOffset
}

func NewOffsetsResponse

func NewOffsetsResponse(payload []byte, version uint16) (r OffsetsResponse, err error)

func (OffsetsResponse) Error

func (r OffsetsResponse) Error() error

type PartitionAssignment

type PartitionAssignment struct {
	Topic      string
	Partitions []int32
}

type PartitionBlock

type PartitionBlock struct {
	Partition          int32
	CurrentLeaderEpoch int32
	FetchOffset        int64
	LogStartOffset     int64
	MaxBytes           int32
}

PartitionBlock is the partition to fetch.

type PartitionMetadataInfo

type PartitionMetadataInfo struct {
	PartitionErrorCode int16
	PartitionID        int32
	Leader             int32
	LeaderEpoch        int32
	Replicas           []int32
	Isr                []int32
	OfflineReplicas    []int32
}

PartitionMetadataInfo holds all the fields of partition metadata info, which is used in metadata response

type PartitionOffset

type PartitionOffset struct {
	Partition       int32
	ErrorCode       int16
	OldStyleOffsets []int64
	Timestamp       int64
	Offset          int64
}

func (*PartitionOffset) GetOffset

func (p *PartitionOffset) GetOffset() int64

GetOffset gets the offset of the given partition from OldStyleOffsets or Offset

type PartitionOffsetRequestInfo

type PartitionOffsetRequestInfo struct {
	Time               int64
	MaxNumberOfOffsets uint32
}

type PartitionResponse

type PartitionResponse struct {
	PartitionID         int32
	ErrorCode           int16
	HighWatermark       int64
	LastStableOffset    int64
	LogStartOffset      int64
	AbortedTransactions []struct {
		ProducerID  int64
		FirstOffset int64
	}
	RecordBatchLength int32
	RecordBatch       RecordBatch
}

type PartitionResult

type PartitionResult struct {
	PartitionID  int32  `json:"partition_id"`
	ErrorCode    int16  `json:"error_code"`
	ErrorMessage string `json:"error_message"`
}

type PlainSasl

type PlainSasl struct {
	// contains filtered or unexported fields
}

func NewPlainSasl

func NewPlainSasl(user, password string) *PlainSasl

func (*PlainSasl) Encode

func (p *PlainSasl) Encode() []byte

type ProduceRequest

type ProduceRequest struct {
	*RequestHeader
	RequiredAcks int16
	Timeout      int32
	TopicBlocks  []struct {
		TopicName      string
		PartitonBlocks []struct {
			Partition      int32
			MessageSetSize int32
			MessageSet     MessageSet
		}
	}
}

func (*ProduceRequest) Encode

func (produceRequest *ProduceRequest) Encode(version uint16) []byte

func (*ProduceRequest) Length

func (produceRequest *ProduceRequest) Length() int

type ProduceResponse

type ProduceResponse struct {
	CorrelationID    uint32
	ProduceResponses []ProduceResponsePiece
}

func NewProduceResponse

func NewProduceResponse(payload []byte) (r ProduceResponse, err error)

func (ProduceResponse) Error

func (r ProduceResponse) Error() error

type ProduceResponsePiece

type ProduceResponsePiece struct {
	Topic      string
	Partitions []ProduceResponse_PartitionResponse
}

type ProduceResponse_PartitionResponse

// ProduceResponse_PartitionResponse is one element of
// ProduceResponsePiece.Partitions: a partition ID, an error code and a base
// offset.
// NOTE(review): BaseOffset is presumably the offset assigned to the first
// produced message — confirm against the decoder.
type ProduceResponse_PartitionResponse struct {
	PartitionID int32
	ErrorCode   int16
	BaseOffset  int64
}

type Producer

// Producer is created with NewProducer for a single topic. Messages are
// handed to it with AddMessage, and Close shuts it down. All of its state is
// unexported.
type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(topic string, config interface{}) (*Producer, error)

NewProducer creates a new console producer. config can be a map[string]interface{} or a ProducerConfig; if config is nil, DefaultProducerConfig is used.

func (*Producer) AddMessage

func (p *Producer) AddMessage(key []byte, value []byte) error

AddMessage adds a message to the producer. If key is nil, the current simple producer is used; otherwise the simple producer for the key's partition is used. If the simple producer for that partition does not exist, a new one is created. If the simple producer is closed, AddMessage retries 3 times.

func (*Producer) Close

func (p *Producer) Close()

Close closes all simple producers in the console producer.

type ProducerConfig

// ProducerConfig is the config for producer.
//
// Every field carries a json tag and a mapstructure tag that use the same
// dotted key. Numeric and boolean fields add the json ",string" option, so in
// JSON their values are written as quoted strings. The mapstructure tags hold
// only the key name.
type ProducerConfig struct {
	Net                      NetConfig  `json:"net" mapstructure:"net"`
	Sasl                     SaslConfig `json:"sasl" mapstructure:"sasl"`
	BootstrapServers         string     `json:"bootstrap.servers" mapstructure:"bootstrap.servers"`
	ClientID                 string     `json:"client.id" mapstructure:"client.id"`
	Acks                     int16      `json:"acks,string" mapstructure:"acks"`
	CompressionType          string     `json:"compress.type" mapstructure:"compress.type"`
	BatchSize                int        `json:"batch.size,string" mapstructure:"batch.size"`
	MessageMaxCount          int        `json:"message.max.count,string" mapstructure:"message.max.count"`
	FlushIntervalMS          int        `json:"flush.interval.ms,string" mapstructure:"flush.interval.ms"`
	MetadataMaxAgeMS         int        `json:"metadata.max.age.ms,string" mapstructure:"metadata.max.age.ms"`
	FetchTopicMetaDataRetrys int        `json:"fetch.topic.metadata.retrys,string" mapstructure:"fetch.topic.metadata.retrys"`
	ConnectionsMaxIdleMS     int        `json:"connections.max.idle.ms,string" mapstructure:"connections.max.idle.ms"`
	RetryBackOffMS           int        `json:"retry.backoff.ms,string" mapstructure:"retry.backoff.ms"`

	MetadataRefreshIntervalMS int `json:"metadata.refresh.interval.ms,string" mapstructure:"metadata.refresh.interval.ms"`

	// TLS is a pointer, so it may be nil.
	// NOTE(review): presumably TLS is only consulted when TLSEnabled is
	// true — confirm against the connection code.
	TLSEnabled bool       `json:"tls.enabled,string" mapstructure:"tls.enabled"`
	TLS        *TLSConfig `json:"tls" mapstructure:"tls"`

	// TODO
	Retries          int   `json:"retries,string" mapstructure:"retries"`
	RequestTimeoutMS int32 `json:"request.timeout.ms,string" mapstructure:"request.timeout.ms"`

	// producer.AddMessage will use this config to assemble Message
	// only 0 and 1 is implemented for now
	HealerMagicByte int `json:"healer.magicbyte,string" mapstructure:"healer.magicbyte"`
}

ProducerConfig is the config for producer

func DefaultProducerConfig

func DefaultProducerConfig() ProducerConfig

DefaultProducerConfig returns a default ProducerConfig

type ProtocolMetadata

// ProtocolMetadata is used in join request/response. It carries a version,
// a list of subscription strings and opaque user data. NewProtocolMetadata
// builds one from payload bytes, and Encode turns it back into bytes.
// NOTE(review): Subscription presumably lists topic names — confirm.
type ProtocolMetadata struct {
	Version      uint16
	Subscription []string
	UserData     []byte
}

ProtocolMetadata is used in join request/response

func NewProtocolMetadata

func NewProtocolMetadata(payload []byte) *ProtocolMetadata

func (*ProtocolMetadata) Encode

func (m *ProtocolMetadata) Encode() []byte

func (*ProtocolMetadata) Length

func (m *ProtocolMetadata) Length() int

type ReadParser

// ReadParser groups the three steps of receiving a response.
type ReadParser interface {
	// Read returns raw bytes, or an error.
	Read() ([]byte, error)
	// Parse turns the given bytes into a Response, or returns an error.
	Parse(data []byte) (Response, error)
	// ReadAndParse takes no input and returns a Response.
	// NOTE(review): presumably Read followed by Parse — confirm in the
	// implementations.
	ReadAndParse() (Response, error)
}

ReadParser reads data from a broker connection and parses the response.

type Record

// Record is the element type of RecordBatch.Records. Only its headers are
// exported; the remaining fields are unexported. DecodeToRecord builds a
// Record from payload bytes.
type Record struct {
	Headers []RecordHeader
	// contains filtered or unexported fields
}

Record is an element of Records.

func DecodeToRecord

func DecodeToRecord(payload []byte) (record Record, offset int, err error)

DecodeToRecord decodes the struct Record from the given payload.

func (*Record) Encode

func (r *Record) Encode(version uint16) (payload []byte, err error)

Encode encodes a record to a byte slice; it is intended for tests only.

type RecordBatch

// RecordBatch is a batch of records preceded by its header values (offsets,
// length, magic, CRC, attributes, timestamps and producer identifiers).
// PartitionResponse carries one of these in its RecordBatch field.
// NOTE(review): the field set looks like the Kafka record batch layout —
// confirm the on-wire order against Encode before relying on it.
type RecordBatch struct {
	BaseOffset           int64
	BatchLength          int32
	PartitionLeaderEpoch int32
	Magic                int8
	CRC                  uint32
	Attributes           int16
	LastOffsetDelta      int32
	BaseTimestamp        int64
	MaxTimestamp         int64
	ProducerID           int64
	ProducerEpoch        int16
	BaseSequence         int32
	Records              []Record
}

func (*RecordBatch) Encode

func (r *RecordBatch) Encode(version uint16) (payload []byte, err error)

type RecordHeader

// RecordHeader is the element type of Record.Headers: a string key and a
// byte-slice value, plus unexported fields.
type RecordHeader struct {
	Key string

	Value []byte
	// contains filtered or unexported fields
}

RecordHeader is included in Record.

type ReplicaAssignment

// ReplicaAssignment is a sub-struct of CreateTopicRequest. It pairs a
// partition number with a list of replicas.
// NOTE(review): Replicas presumably holds broker IDs — confirm.
type ReplicaAssignment struct {
	Partition int32
	Replicas  []int32
}

ReplicaAssignment is a sub-struct of CreateTopicRequest.

type ReplicaElectionResult

// ReplicaElectionResult groups the per-partition results for one topic.
// Note that the JSON key for PartitionResults is the singular
// "partition_result".
type ReplicaElectionResult struct {
	Topic            string             `json:"topic"`
	PartitionResults []*PartitionResult `json:"partition_result"`
}

type Request

// Request is implemented by all request types. *RequestHeader already
// provides API, SetCorrelationID and SetVersion, so a type that embeds
// *RequestHeader only has to add Encode to satisfy this interface.
type Request interface {
	// Encode serializes the request for the given API version.
	Encode(version uint16) []byte
	// API returns the request's API key.
	API() uint16
	SetCorrelationID(uint32)
	SetVersion(uint16)
}

Request is implemented by all concrete request types.

func NewApiVersionsRequest

func NewApiVersionsRequest(clientID string) Request

type RequestHeader

// RequestHeader is the header used in all requests. It holds the API key,
// API version, correlation ID, client ID and tagged fields, plus unexported
// state.
type RequestHeader struct {
	APIKey        uint16
	APIVersion    uint16
	CorrelationID uint32
	// ClientID is a pointer, so it can be nil.
	// NOTE(review): presumably nil and "" are encoded differently —
	// confirm in EncodeTo.
	ClientID      *string
	TaggedFields  TaggedFields
	// contains filtered or unexported fields
}

RequestHeader is the request header used in all requests. It contains apiKey, apiVersion, correlationID, and clientID.

func DecodeRequestHeader

func DecodeRequestHeader(payload []byte) (h RequestHeader, offset int)

DecodeRequestHeader decodes a request header from []byte. It is only used in test cases.

func (*RequestHeader) API

func (requestHeader *RequestHeader) API() uint16

API returns the APIKey of the request (which holds the request header).

func (*RequestHeader) EncodeTo

func (h *RequestHeader) EncodeTo(payload []byte) int

EncodeTo encodes the request header into payload. It is used by all requests. If the payload is too small, EncodeTo will panic. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields and https://kafka.apache.org/protocol#protocol_messages

func (*RequestHeader) IsFlexible

func (h *RequestHeader) IsFlexible() bool

func (*RequestHeader) SetCorrelationID

func (requestHeader *RequestHeader) SetCorrelationID(c uint32)

SetCorrelationID sets the request's correlationID.

func (*RequestHeader) SetVersion

func (requestHeader *RequestHeader) SetVersion(version uint16)

SetVersion sets the request's API version.

func (*RequestHeader) Version

func (requestHeader *RequestHeader) Version() uint16

Version returns API version of the request

type Response

// Response is the interface implemented by all responses.
type Response interface {
	// Error returns the error derived from the response's error code.
	Error() error
}

Response is the interface implemented by all responses. Error() returns the error derived from the error code of the response.

type ResponseHeader

// ResponseHeader holds a response's correlation ID and tagged fields, plus
// unexported state. NewResponseHeader and DecodeResponseHeader both take the
// API key and API version of the request being answered.
type ResponseHeader struct {
	CorrelationID uint32
	// NOTE(review): "version +1" presumably means the tagged fields are only
	// present from header version 1 on — confirm against IsFlexible.
	TaggedFields  TaggedFields // version +1
	// contains filtered or unexported fields
}

func DecodeResponseHeader

func DecodeResponseHeader(payload []byte, apiKey uint16, apiVersion uint16) (header ResponseHeader, offset int)

func NewResponseHeader

func NewResponseHeader(apiKey, apiVersion uint16) ResponseHeader

func (*ResponseHeader) Encode

func (h *ResponseHeader) Encode() []byte

func (*ResponseHeader) EncodeTo

func (h *ResponseHeader) EncodeTo(payload []byte) (offset int)

func (*ResponseHeader) IsFlexible

func (h *ResponseHeader) IsFlexible() bool

type SaslAuth

// SaslAuth is satisfied by any type that can encode itself to bytes.
// *PlainSasl has a matching Encode method.
type SaslAuth interface {
	// Encode returns the encoded bytes.
	// NOTE(review): presumably these become
	// SaslAuthenticateRequest.SaslAuthBytes — confirm.
	Encode() []byte
}

type SaslAuthenticateRequest

// SaslAuthenticateRequest embeds *RequestHeader and carries the SASL auth
// bytes. NewSaslAuthenticateRequest builds one from a client ID, user,
// password and type string. Note that Encode has a value receiver while
// Length has a pointer receiver.
type SaslAuthenticateRequest struct {
	*RequestHeader
	SaslAuthBytes []byte
}

version0

func NewSaslAuthenticateRequest

func NewSaslAuthenticateRequest(clientID string, user, password, typ string) (r SaslAuthenticateRequest)

func (SaslAuthenticateRequest) Encode

func (r SaslAuthenticateRequest) Encode(version uint16) []byte

Encode encodes SaslAuthenticateRequest to []byte

func (*SaslAuthenticateRequest) Length

func (r *SaslAuthenticateRequest) Length() int

type SaslAuthenticateResponse

// SaslAuthenticateResponse is the response of a saslauthenticate request.
// It holds the correlation ID, an error code with its message, and SASL auth
// bytes. NewSaslAuthenticateResponse builds it from payload bytes.
type SaslAuthenticateResponse struct {
	CorrelationID uint32
	ErrorCode     int16
	ErrorMessage  string
	SaslAuthBytes []byte
}

SaslAuthenticateResponse is the response of saslauthenticate request

func NewSaslAuthenticateResponse

func NewSaslAuthenticateResponse(payload []byte) (r SaslAuthenticateResponse, err error)

NewSaslAuthenticateResponse creates a SaslAuthenticateResponse instance from the response payload bytes.

func (SaslAuthenticateResponse) Error

func (r SaslAuthenticateResponse) Error() error

type SaslConfig

// SaslConfig holds the SASL mechanism name, user and password. It is used as
// the Sasl field of ProducerConfig. The json and mapstructure keys are the
// lower-case field names.
type SaslConfig struct {
	Mechanism string `json:"mechanism" mapstructure:"mechanism"`
	User      string `json:"user" mapstructure:"user"`
	Password  string `json:"password" mapstructure:"password"`
}

type SaslHandShakeRequest

// SaslHandShakeRequest embeds *RequestHeader and carries the SASL mechanism
// name. NewSaslHandShakeRequest builds one from a client ID and a mechanism.
// Note the capital "S" in "HandShake"; the matching response type is spelled
// SaslHandshakeResponse.
type SaslHandShakeRequest struct {
	*RequestHeader
	Mechanism string
}

version0

func NewSaslHandShakeRequest

func NewSaslHandShakeRequest(clientID string, mechanism string) *SaslHandShakeRequest

func (*SaslHandShakeRequest) Encode

func (r *SaslHandShakeRequest) Encode(version uint16) []byte

func (*SaslHandShakeRequest) Length

func (r *SaslHandShakeRequest) Length() int

type SaslHandshakeResponse

// SaslHandshakeResponse is the response of a saslhandshake request. It holds
// the correlation ID, an error code and the list of enabled mechanisms.
// NewSaslHandshakeResponse builds it from payload bytes.
type SaslHandshakeResponse struct {
	CorrelationID     uint32
	ErrorCode         int16
	EnabledMechanisms []string
}

SaslHandshakeResponse is the response of saslhandshake request

func NewSaslHandshakeResponse

func NewSaslHandshakeResponse(payload []byte) (r SaslHandshakeResponse, err error)

NewSaslHandshakeResponse creates a SaslHandshakeResponse instance from the response payload bytes.

func (SaslHandshakeResponse) Error

func (r SaslHandshakeResponse) Error() error

type SimpleConsumer

// SimpleConsumer is created for one topic and partition ID by
// NewSimpleConsumer or NewSimpleConsumerWithBrokers. Consume starts fetching
// from a given offset, CommitOffset commits, and Stop shuts it down. All of
// its state is unexported.
type SimpleConsumer struct {
	// contains filtered or unexported fields
}

SimpleConsumer is built to consume messages from a Kafka broker. TODO: make messages have direction.

func NewSimpleConsumer

func NewSimpleConsumer(topic string, partitionID int32, config interface{}) (*SimpleConsumer, error)

NewSimpleConsumer creates a simple consumer.

func NewSimpleConsumerWithBrokers

func NewSimpleConsumerWithBrokers(topic string, partitionID int32, config ConsumerConfig, brokers *Brokers) *SimpleConsumer

NewSimpleConsumerWithBrokers creates a simple consumer with existing brokers.

func (*SimpleConsumer) CommitOffset

func (c *SimpleConsumer) CommitOffset()

CommitOffset commits the offset to the coordinator. If the SimpleConsumer belongs to a GroupConsumer, it uses the GroupConsumer to commit; otherwise, if it has a GroupId, it uses its own coordinator to commit.

func (*SimpleConsumer) Consume

func (c *SimpleConsumer) Consume(offset int64, messageChan chan *FullMessage) (<-chan *FullMessage, error)

Consume begins to fetch messages. It creates and returns a new channel if you pass nil; otherwise it returns the channel you passed.

func (*SimpleConsumer) Stop

func (c *SimpleConsumer) Stop()

Stop stops the consumer and waits for all related goroutines to exit.

func (*SimpleConsumer) String

func (c *SimpleConsumer) String() string

type SimpleProducer

// SimpleProducer is created by NewSimpleProducer for one topic and
// partition. AddMessage queues a message, Flush sends what is queued, and
// Close closes the producer. All of its state is unexported.
type SimpleProducer struct {
	// contains filtered or unexported fields
}

SimpleProducer is a simple producer that sends messages to one specific topic-partition.

func NewSimpleProducer

func NewSimpleProducer(ctx context.Context, topic string, partition int32, config interface{}) (*SimpleProducer, error)

NewSimpleProducer creates a new simple producer. config can be a map[string]interface{} or a ProducerConfig; if config is nil, DefaultProducerConfig is used.

func (*SimpleProducer) AddMessage

func (p *SimpleProducer) AddMessage(key []byte, value []byte) error

AddMessage adds a message to the message set. If the message set is full, it is sent to Kafka synchronously.

func (*SimpleProducer) Close

func (p *SimpleProducer) Close()

Close closes the producer

func (*SimpleProducer) Flush

func (p *SimpleProducer) Flush() error

Flush sends all messages to Kafka.

type SnappyCompressor

// SnappyCompressor has no fields. Its Compress method takes a byte slice and
// returns a byte slice and an error.
type SnappyCompressor struct {
}

func (*SnappyCompressor) Compress

func (c *SnappyCompressor) Compress(value []byte) ([]byte, error)

type SyncGroupRequest

// SyncGroupRequest embeds *RequestHeader and carries the group ID,
// generation ID, member ID and group assignment. NewSyncGroupRequest takes
// the same values plus a client ID.
type SyncGroupRequest struct {
	*RequestHeader
	GroupID         string
	GenerationID    int32
	MemberID        string
	GroupAssignment GroupAssignment
}

func NewSyncGroupRequest

func NewSyncGroupRequest(clientID, groupID string, generationID int32, memberID string, groupAssignment GroupAssignment) *SyncGroupRequest

func (*SyncGroupRequest) Encode

func (r *SyncGroupRequest) Encode(version uint16) []byte

Encode encodes SyncGroupRequest to []byte

func (*SyncGroupRequest) Length

func (r *SyncGroupRequest) Length() int

type SyncGroupResponse

// SyncGroupResponse is the response of a syncgroup request. It holds the
// correlation ID, an error code and the member assignment as raw bytes.
// The JSON keys are snake_case, as set by the tags.
type SyncGroupResponse struct {
	CorrelationID    uint32 `json:"correlation_id"`
	ErrorCode        int16  `json:"error_code"`
	MemberAssignment []byte `json:"member_assignment"`
}

SyncGroupResponse is the response of syncgroup request

func NewSyncGroupResponse

func NewSyncGroupResponse(payload []byte) (r SyncGroupResponse, err error)

NewSyncGroupResponse creates a SyncGroupResponse instance from the response payload bytes.

func (SyncGroupResponse) Error

func (r SyncGroupResponse) Error() error

type TLSConfig

// TLSConfig holds the TLS settings referenced by ProducerConfig.TLS.
// NOTE(review): whether Cert, Key and CA are file paths or inline PEM data is
// not visible here — confirm where the config is turned into a tls.Config.
type TLSConfig struct {
	Cert               string `json:"cert" mapstructure:"cert"`
	Key                string `json:"key" mapstructure:"key"`
	CA                 string `json:"ca" mapstructure:"ca"`
	// The json ",string" option means this bool is written as a quoted
	// string in JSON.
	InsecureSkipVerify bool   `json:"insecure.skip.verify,string" mapstructure:"insecure.skip.verify"`
	ServerName         string `json:"servername" mapstructure:"servername"`
}

type TaggedField

// TaggedField is the element type of TaggedFields: an integer tag and its
// raw data bytes.
// NOTE(review): presumably these are the optional tagged fields of KIP-482,
// which RequestHeader.EncodeTo's documentation links to — confirm.
type TaggedField struct {
	Tag  int
	Data []byte
}

type TaggedFields

// TaggedFields is a slice of TaggedField. It appears in both RequestHeader
// and ResponseHeader. DecodeTaggedFields reads one from payload bytes, and
// Encode/EncodeTo write it back out.
type TaggedFields []TaggedField

func DecodeTaggedFields

func DecodeTaggedFields(payload []byte) (r TaggedFields, length int)

func (TaggedFields) Encode

func (r TaggedFields) Encode() []byte

func (TaggedFields) EncodeTo

func (r TaggedFields) EncodeTo(payload []byte) (offset int)

type TopicError

// TopicError is a sub-struct of CreateTopicsResponse. It pairs a topic name
// with an error code.
type TopicError struct {
	Topic     string
	ErrorCode int16
}

TopicError is a sub-struct of CreateTopicsResponse.

type TopicMetadata

// TopicMetadata holds all the fields of topic metadata and is used in the
// metadata response: an error code, the topic name, an internal-topic flag
// and pointers to the metadata of each partition.
type TopicMetadata struct {
	TopicErrorCode     int16
	TopicName          string
	IsInternal         bool
	PartitionMetadatas []*PartitionMetadataInfo
}

TopicMetadata holds all the fields of topic metadata, which is used in metadata response

type TopicPartition

// TopicPartition pairs a topic name with a list of partition numbers. The
// JSON keys are "topic" and "partitions".
type TopicPartition struct {
	Topic      string  `json:"topic"`
	Partitions []int32 `json:"partitions"`
}

Source Files

Directories

Path Synopsis
command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL