admin

package
v0.0.5-beta-2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2020 License: MIT Imports: 7 Imported by: 4

Documentation

Overview

Package admin provides an interface for kafka administrative operations

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewKafkaAdmin

func NewKafkaAdmin(bootstrapServer []string, options ...KafkaAdminOption) *kafkaAdmin

Types

type KafkaAdmin

type KafkaAdmin interface {
	FetchInfo(topics []string) (map[string]*Topic, error)
	CreateTopics(topics map[string]*Topic) error
	DeleteTopics(topics []string) (map[string]error, error)
	Close()
}

KafkaAdmin

type KafkaAdminOption

type KafkaAdminOption func(*kafkaAdminOptions)

func WithKafkaVersion

func WithKafkaVersion(version sarama.KafkaVersion) KafkaAdminOption

func WithLogger

func WithLogger(logger log.Logger) KafkaAdminOption

type MockKafkaAdmin

type MockKafkaAdmin struct {
	Topics *Topics
}

func NewMockAdminWithTopics

func NewMockAdminWithTopics(tps map[string]*Topic) *MockKafkaAdmin

func (*MockKafkaAdmin) Close

func (m *MockKafkaAdmin) Close()

func (*MockKafkaAdmin) CreateTopics

func (m *MockKafkaAdmin) CreateTopics(topics map[string]*Topic) error

func (*MockKafkaAdmin) DeleteTopics

func (m *MockKafkaAdmin) DeleteTopics(topics []string) (map[string]error, error)

func (*MockKafkaAdmin) FetchInfo

func (m *MockKafkaAdmin) FetchInfo(topics []string) (map[string]*Topic, error)

type MockPartition

type MockPartition struct {
	*sync.Mutex
	// contains filtered or unexported fields
}

func (*MockPartition) Append

func (p *MockPartition) Append(r *data.Record) error

func (*MockPartition) Fetch

func (p *MockPartition) Fetch(start int64, limit int) (records []*data.Record, err error)

func (*MockPartition) FetchAll

func (p *MockPartition) FetchAll() (records []*data.Record)

func (*MockPartition) Latest

func (p *MockPartition) Latest() int64

type MockTopic

type MockTopic struct {
	Name string

	Meta *Topic
	// contains filtered or unexported fields
}

func (*MockTopic) AddPartition

func (tp *MockTopic) AddPartition(id int) error

func (*MockTopic) FetchAll

func (tp *MockTopic) FetchAll() (records []*data.Record)

func (*MockTopic) Partition

func (tp *MockTopic) Partition(id int) (*MockPartition, error)

func (*MockTopic) Partitions

func (tp *MockTopic) Partitions() []*MockPartition

type Partition

type Partition struct {
	Id    int32
	Error error
}

type Topic

type Topic struct {
	Name              string
	Partitions        []Partition
	Error             error
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	ConfigEntries     map[string]string
}

type Topics

type Topics struct {
	*sync.Mutex
	// contains filtered or unexported fields
}

func NewMockTopics

func NewMockTopics() *Topics

func (*Topics) AddTopic

func (td *Topics) AddTopic(topic *MockTopic) error

func (*Topics) RemoveTopic

func (td *Topics) RemoveTopic(name string) error

func (*Topics) Topic

func (td *Topics) Topic(name string) (*MockTopic, error)

func (*Topics) Topics

func (td *Topics) Topics() map[string]*MockTopic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL