kafka

package
v0.5.7
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package kafka implements the Kafka service handling requests and responses.

Index

Constants

View Source
const (
	SetConfigOperation    int8 = 0
	DeleteConfigOperation int8 = 1
)

Config operation types.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConfigOperation

type ConfigOperation struct {
	Name  string
	Value *string
	Op    int8 // 0: SET, 1: DELETE, 2: APPEND, 3: SUBTRACT.
}

ConfigOperation represents an alter config operation.

type ConfigOperations

type ConfigOperations []ConfigOperation

ConfigOperations represents a slice of ConfigOperation.

func (ConfigOperations) Contains

func (c ConfigOperations) Contains(name string) bool

Contains reports whether the slice contains an operation for the specified config key name.

func (ConfigOperations) ContainsOp

func (c ConfigOperations) ContainsOp(operation int8) bool

ContainsOp reports whether the slice contains an operation of the specified operation type.

type Metadata

type Metadata struct {
	ClusterID string
	Brokers   meta.Brokers
	Topics    []TopicMetadata
}

Metadata represents cluster metadata.

type ResourceACLs added in v0.1.0

type ResourceACLs struct {
	ResourceName        string
	ResourceType        string
	ResourcePatternType string
	ACLs                def.ACLEntryGroups
}

ResourceACLs represents ACLs for a named resource.

type ResourceConfigs

type ResourceConfigs struct {
	ResourceName string
	Configs      def.Configs
}

ResourceConfigs represents configs for a named resource.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service represents a Kafka service.

func NewService

func NewService(
	cl *client.Client,
) *Service

NewService creates a new Kafka service.

func (*Service) AlterAllBrokerConfigs

func (s *Service) AlterAllBrokerConfigs(
	ctx context.Context,
	configOps ConfigOperations,
	validateOnly bool,
) error

AlterAllBrokerConfigs executes a request to alter cluster-wide broker configs (Kafka 0.11.0+ for AlterConfigs, 2.3.0+ for IncrementalAlterConfigs).

func (*Service) AlterBrokerConfigs

func (s *Service) AlterBrokerConfigs(
	ctx context.Context,
	brokerID string,
	configOps ConfigOperations,
	validateOnly bool,
) error

AlterBrokerConfigs executes a request to alter broker configs (Kafka 0.11.0+ for AlterConfigs, 2.3.0+ for IncrementalAlterConfigs).

func (*Service) AlterPartitionAssignments

func (s *Service) AlterPartitionAssignments(
	ctx context.Context,
	topic string,
	assignments def.PartitionAssignments,
) error

AlterPartitionAssignments executes a request to alter partition assignments (Kafka 2.4.0+).

func (*Service) AlterTopicConfigs

func (s *Service) AlterTopicConfigs(
	ctx context.Context,
	topic string,
	configOps ConfigOperations,
	validateOnly bool,
) error

AlterTopicConfigs executes a request to alter topic configs (Kafka 0.11.0+ for AlterConfigs, 2.3.0+ for IncrementalAlterConfigs).

func (*Service) CreateACLs added in v0.1.0

func (s *Service) CreateACLs(
	ctx context.Context,
	name string,
	resourceType string,
	resourcePatternType string,
	acls def.ACLEntryGroups,
) error

CreateACLs executes a request to create ACLs (Kafka 0.11.0+).

func (*Service) CreatePartitions

func (s *Service) CreatePartitions(
	ctx context.Context,
	topic string,
	partitions int,
	assignments def.PartitionAssignments,
	validateOnly bool,
) error

CreatePartitions executes a request to create partitions (Kafka 1.0.0+).

func (*Service) CreateTopic

func (s *Service) CreateTopic(
	ctx context.Context,
	topicDef def.TopicDefinition,
	assignments def.PartitionAssignments,
	validateOnly bool,
) error

CreateTopic executes a request to create a topic (Kafka 0.10.1+).

func (*Service) DeleteACLs added in v0.1.0

func (s *Service) DeleteACLs(
	ctx context.Context,
	name string,
	resourceType string,
	resourcePatternType string,
	acls def.ACLEntryGroups,
) error

DeleteACLs executes a request to delete ACLs (Kafka 0.11.0+).

func (*Service) DescribeAllBrokerConfigs

func (s *Service) DescribeAllBrokerConfigs(ctx context.Context) (def.Configs, error)

DescribeAllBrokerConfigs executes a request to describe all broker configs (Kafka 0.11.0+).

func (*Service) DescribeAllResourceACLs added in v0.1.0

func (s *Service) DescribeAllResourceACLs(
	ctx context.Context,
	resourceType string,
) ([]ResourceACLs, error)

DescribeAllResourceACLs executes a request to describe ACLs for all resources (Kafka 0.11.0+).

func (*Service) DescribeBrokerConfigs

func (s *Service) DescribeBrokerConfigs(ctx context.Context, brokerID string) (def.Configs, error)

DescribeBrokerConfigs executes a request to describe broker configs (Kafka 0.11.0+).

func (*Service) DescribeMetadata

func (s *Service) DescribeMetadata(
	ctx context.Context,
	topics []string,
	errorOnNonExistence bool,
) (*Metadata, error)

DescribeMetadata executes a request for metadata (Kafka 0.8.0+).

func (*Service) DescribeResourceACLs added in v0.1.0

func (s *Service) DescribeResourceACLs(
	ctx context.Context,
	name string,
	resourceType string,
	resourcePatternType string,
) (def.ACLEntryGroups, error)

DescribeResourceACLs executes a request to describe ACLs of a specific resource (Kafka 0.11.0+).

func (*Service) DescribeTopicConfigs

func (s *Service) DescribeTopicConfigs(ctx context.Context, topics []string) ([]ResourceConfigs, error)

DescribeTopicConfigs executes a request to describe topic configs (Kafka 0.11.0+).

func (*Service) ElectLeaders added in v0.2.0

func (s *Service) ElectLeaders(
	ctx context.Context,
	topic string,
	partitions []int32,
) error

ElectLeaders executes a request to elect preferred partition leaders (Kafka 2.4.0+).

func (*Service) IsKafkaReady

func (s *Service) IsKafkaReady(ctx context.Context, minBrokers int, timeoutSec int) bool

IsKafkaReady executes describe cluster requests until a minimum number of brokers are alive (Kafka 2.8.0+).

func (*Service) ListPartitionReassignments

func (s *Service) ListPartitionReassignments(
	ctx context.Context,
	topic string,
	partitions []int32,
) (meta.PartitionReassignments, error)

ListPartitionReassignments executes a request to list partition reassignments (Kafka 2.4.0+).

func (*Service) NewConfigOps

func (s *Service) NewConfigOps(
	ctx context.Context,
	localConfigs def.ConfigsMap,
	remoteConfigsMap def.ConfigsMap,
	remoteConfigs def.Configs,
	deleteUndefinedConfigs bool,
) (ConfigOperations, error)

NewConfigOps creates alter configs operations.

func (*Service) TryRequestTopic

TryRequestTopic executes a request for the metadata of a topic that may or may not exist (Kafka 0.11.0+).

type TopicMetadata

type TopicMetadata struct {
	Topic                string
	PartitionAssignments def.PartitionAssignments
	PartitionRacks       def.PartitionRacks
	PartitionLeaders     def.PartitionLeaders
	PartitionISR         def.PartitionAssignments
	Exists               bool
}

TopicMetadata represents topic metadata.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL