kafka

package module
v2.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: MIT Imports: 14 Imported by: 0

README

kafka support

Documentation

Overview

Работа с kafka

Используется библиотека github.com/confluentinc/confluent-kafka-go/kafka, основанная на нативном клиенте.

Так как под windows такового нет, то под ними работать не будет. Ну и не надо. Но если приспичит, есть другие библиотеки, где клиент реализован на go. Но они, по очевидным причинам, сильно медленнее и прожорливее по памяти.

Index

Constants

View Source
const (
	// Смешение - начало
	OffsetBeginning = Offset(kafka.OffsetBeginning)
	// Смещение - конец
	OffsetEnd = Offset(kafka.OffsetEnd)
	// Смещение - сохраненное в kafka
	OffsetStored = Offset(kafka.OffsetStored)

	// PartitionAny represents any partition (for partitioning),
	// or unspecified value (for all other cases)
	PartitionAny = kafka.PartitionAny
)

Variables

View Source
var (
	// Log facility
	Log = log.NewFacility("kafka")

	// Ошибка - конец данных
	ErrPartitionEOF = errors.New("partition EOF")
)

Functions

func LibraryVersion

func LibraryVersion() (version string)

Версия нативной библиотки

Types

type AdminClient

type AdminClient struct {
	// contains filtered or unexported fields
}

Админский клиент

func (*AdminClient) Close

func (c *AdminClient) Close()

Закрыть админское соединение

func (*AdminClient) CreateTopic

func (c *AdminClient) CreateTopic(name string, topic *ProducerTopicConfig) (err error)

Создать топик

func (*AdminClient) DeleteTopic

func (c *AdminClient) DeleteTopic(topic string) (err error)

Удалить топик

func (*AdminClient) GetMetadata

func (c *AdminClient) GetMetadata(topic string) (m *Metadata, err error)

Получить метаданные для топика. Если передано пустое имя, то всех.

type AssignedPartitions

type AssignedPartitions map[string]AssignedPartitionsList // [topic]

func GetAssignedPartitions added in v2.0.1

func GetAssignedPartitions(servers string) (parts AssignedPartitions)

type AssignedPartitionsList

type AssignedPartitionsList []int

type Config

type Config struct {
	Servers string `toml:"servers"` // Список kafka серверов

	User     string `toml:"user"`     // Пользователь
	Password string `toml:"password"` // Пароль

	Timeout config.Duration `toml:"timeout"` // Таймаут

	RetryTimeout config.Duration `toml:"retry-timeout"` // Таймаут повторной отправки

	MaxRequestSize   int    `toml:"max-request-size"`   // Максимальный размер сообщения
	ConsumerQueueLen int    `toml:"consumer-queue-len"` // Длина внутренней очереди для консьюмера
	AutoCommit       bool   `toml:"auto-commit"`        // Использовать auto commit для консьюмера?
	Group            string `toml:"group"`              // Группа для консьюмера
	Acks             string `toml:"acks"`               // Producer acks

	ProducerTopics map[string]*ProducerTopicConfig `toml:"producer-topics"` // Список топиков продюсера с их параметрами map[virtualName]*config
	ConsumerTopics map[string]*ConsumerTopicConfig `toml:"consumer-topics"` // Список топиков консьюмера с их параметрами map[virtualName]*config
}

Конфигурация

func (*Config) Check

func (c *Config) Check(cfg any) (err error)

Проверка валидности Config

func (*Config) NewAdmin

func (c *Config) NewAdmin() (client *AdminClient, err error)

Создать новое админское соединение

func (*Config) NewAdminEx

func (c *Config) NewAdminEx(extra misc.InterfaceMap) (client *AdminClient, err error)

func (*Config) NewConsumer

func (c *Config) NewConsumer() (client *Consumer, err error)

Создать новое консьюмерское соединение

func (*Config) NewConsumerEx

func (c *Config) NewConsumerEx(extra misc.InterfaceMap) (client *Consumer, err error)

func (*Config) NewProducer

func (c *Config) NewProducer() (client *Producer, err error)

Создать новое продюсерское соединение

func (*Config) NewProducerEx

func (c *Config) NewProducerEx(extra misc.InterfaceMap) (client *Producer, err error)

type Consumer

type Consumer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

консьюмер

func (*Consumer) AddEventHandler added in v2.0.6

func (c *Consumer) AddEventHandler(h EventHandler)

func (*Consumer) Close

func (c *Consumer) Close()

Закрыть консьюмерское соединение

func (*Consumer) Commit

func (c *Consumer) Commit(message *Message) (err error)

Зафиксировать последнюю прочитанную позицию для топика

func (*Consumer) DelEventHandler added in v2.0.6

func (c *Consumer) DelEventHandler(h EventHandler)

func (*Consumer) GetAssignedPartitions added in v2.0.6

func (c *Consumer) GetAssignedPartitions() (parts AssignedPartitions)

func (*Consumer) Offsets

func (c *Consumer) Offsets(topics []string) (offsets []Offset, err error)

Получить текущие смещения для списка топиков

func (*Consumer) Read

func (c *Consumer) Read(timeout time.Duration) (message *Message, err error)

Получить сообщение из топика, если оно там есть

func (*Consumer) Seek

func (c *Consumer) Seek(topic string, offset Offset) (err error)

Установить указатель чтения для топика

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topics []string) (err error)

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe() (err error)

Отписаться от всех подписок

func (*Consumer) WaitingForAssign

func (c *Consumer) WaitingForAssign()

Ожидание получения первого AssignedPartitions

type ConsumerTopicConfig

type ConsumerTopicConfig struct {
	Active   bool   `toml:"active"`   // Активный?
	Type     string `toml:"type"`     // Тип топика. Произвольное необязательное значение на усмотрение разработчика
	Encoding string `toml:"encoding"` // Формат данных

	Extra any `toml:"extra"` // Произвольные дополнительные данные
}

Параметры топика консьюмера

func (*ConsumerTopicConfig) Check

func (c *ConsumerTopicConfig) Check(cfg any) (err error)

Проверка валидности ProducerTopicConfig

type Error

type Error kafka.Error

Ошибка

func (Error) Error

func (e Error) Error() string

type EventHandler added in v2.0.6

type EventHandler func(c *Consumer, assigned bool, partitions TopicPartitions)

type Message

type Message kafka.Message

Сообщение

func NewMessage

func NewMessage(topic string, key []byte, value []byte) Message

Создать сообщение

func NewMessageEx

func NewMessageEx(topic string, partition int32, key []byte, value []byte) Message

type Messages

type Messages []Message

Набор сообщений

type Metadata

type Metadata kafka.Metadata

Метаданные

type Offset

type Offset kafka.Offset

Смещение

type Producer

type Producer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Продюсер

func (*Producer) Close

func (c *Producer) Close()

Закрыть продюсерское соединение

func (*Producer) SaveMessages

func (c *Producer) SaveMessages(m Messages) (err error)

Сохранить сообщения в kafka

type ProducerTopicConfig

type ProducerTopicConfig struct {
	Active   bool   `toml:"active"`   // Активный?
	Type     string `toml:"type"`     // Тип топика. Произвольное необязательное значение на усмотрение разработчика
	Encoding string `toml:"encoding"` // Формат данных

	NumPartitions     int             `toml:"num-partitions"`     // Количество партиций при создании
	ReplicationFactor int             `toml:"replication-factor"` // Фактор репликации при создании
	RetentionTime     config.Duration `toml:"retention-time"`     // Время жизни данных
	RetentionSize     int64           `toml:"retention-size"`     // Максимальный размер для очистки по размеру

	Extra any `toml:"extra"` // Произвольные дополнительные данные
}

Параметры топика продюсера

func (*ProducerTopicConfig) Check

func (c *ProducerTopicConfig) Check(cfg any) (err error)

Проверка валидности ProducerTopicConfig

type TopicPartition added in v2.0.6

type TopicPartition kafka.TopicPartition

type TopicPartitions added in v2.0.6

type TopicPartitions kafka.TopicPartitions

Directories

Path Synopsis
kafka reader
kafka reader

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL