Documentation ¶
Overview ¶
Работа с kafka
Используется библиотека github.com/confluentinc/confluent-kafka-go/kafka, основанная на нативном клиенте.
Так как под windows такового нет, то под ними работать не будет. Ну и не надо. Но если приспичит, есть другие библиотеки, где клиент реализован на go. Но они, по очевидным причинам, сильно медленнее и прожорливее по памяти.
Index ¶
- Constants
- Variables
- func LibraryVersion() (version string)
- type AdminClient
- type AssignedPartitions
- type AssignedPartitionsList
- type Config
- func (c *Config) Check(cfg any) (err error)
- func (c *Config) NewAdmin() (client *AdminClient, err error)
- func (c *Config) NewAdminEx(extra misc.InterfaceMap) (client *AdminClient, err error)
- func (c *Config) NewConsumer() (client *Consumer, err error)
- func (c *Config) NewConsumerEx(extra misc.InterfaceMap) (client *Consumer, err error)
- func (c *Config) NewProducer() (client *Producer, err error)
- func (c *Config) NewProducerEx(extra misc.InterfaceMap) (client *Producer, err error)
- type Consumer
- func (c *Consumer) AddEventHandler(h EventHandler)
- func (c *Consumer) Close()
- func (c *Consumer) Commit(message *Message) (err error)
- func (c *Consumer) DelEventHandler(h EventHandler)
- func (c *Consumer) GetAssignedPartitions() (parts AssignedPartitions)
- func (c *Consumer) Offsets(topics []string) (offsets []Offset, err error)
- func (c *Consumer) Read(timeout time.Duration) (message *Message, err error)
- func (c *Consumer) Seek(topic string, offset Offset) (err error)
- func (c *Consumer) Subscribe(topics []string) (err error)
- func (c *Consumer) Unsubscribe() (err error)
- func (c *Consumer) WaitingForAssign()
- type ConsumerTopicConfig
- type Error
- type EventHandler
- type Message
- type Messages
- type Metadata
- type Offset
- type Producer
- type ProducerTopicConfig
- type TopicPartition
- type TopicPartitions
Constants ¶
const ( // Смешение - начало OffsetBeginning = Offset(kafka.OffsetBeginning) // Смещение - конец OffsetEnd = Offset(kafka.OffsetEnd) // Смещение - сохраненное в kafka OffsetStored = Offset(kafka.OffsetStored) // PartitionAny represents any partition (for partitioning), // or unspecified value (for all other cases) PartitionAny = kafka.PartitionAny )
Variables ¶
var ( // Log facility Log = log.NewFacility("kafka") // Ошибка - конец данных ErrPartitionEOF = errors.New("partition EOF") )
Functions ¶
Types ¶
type AdminClient ¶
type AdminClient struct {
// contains filtered or unexported fields
}
Админский клиент
func (*AdminClient) CreateTopic ¶
func (c *AdminClient) CreateTopic(name string, topic *ProducerTopicConfig) (err error)
Создать топик
func (*AdminClient) DeleteTopic ¶
func (c *AdminClient) DeleteTopic(topic string) (err error)
Удалить топик
func (*AdminClient) GetMetadata ¶
func (c *AdminClient) GetMetadata(topic string) (m *Metadata, err error)
Получить метаданные для топика. Если передано пустое имя, то всех.
type AssignedPartitions ¶
type AssignedPartitions map[string]AssignedPartitionsList // [topic]
func GetAssignedPartitions ¶ added in v2.0.1
func GetAssignedPartitions(servers string) (parts AssignedPartitions)
type AssignedPartitionsList ¶
type AssignedPartitionsList []int
type Config ¶
type Config struct { Servers string `toml:"servers"` // Список kafka серверов User string `toml:"user"` // Пользователь Password string `toml:"password"` // Пароль Timeout config.Duration `toml:"timeout"` // Таймаут RetryTimeout config.Duration `toml:"retry-timeout"` // Таймаут повторной отправки MaxRequestSize int `toml:"max-request-size"` // Максимальный размер сообщения ConsumerQueueLen int `toml:"consumer-queue-len"` // Длина внутренней очереди для консьюмера AutoCommit bool `toml:"auto-commit"` // Использовать auto commit для консьюмера? Group string `toml:"group"` // Группа для консьюмера Acks string `toml:"acks"` // Producer acks ProducerTopics map[string]*ProducerTopicConfig `toml:"producer-topics"` // Список топиков продюсера с их параметрами map[virtualName]*config ConsumerTopics map[string]*ConsumerTopicConfig `toml:"consumer-topics"` // Список топиков консьюмера с их параметрами map[virtualName]*config }
Конфигурация
func (*Config) NewAdmin ¶
func (c *Config) NewAdmin() (client *AdminClient, err error)
Создать новое админское соединение
func (*Config) NewAdminEx ¶
func (c *Config) NewAdminEx(extra misc.InterfaceMap) (client *AdminClient, err error)
func (*Config) NewConsumer ¶
Создать новое консьюмерское соединение
func (*Config) NewConsumerEx ¶
func (c *Config) NewConsumerEx(extra misc.InterfaceMap) (client *Consumer, err error)
func (*Config) NewProducer ¶
Создать новое продюсерское соединение
func (*Config) NewProducerEx ¶
func (c *Config) NewProducerEx(extra misc.InterfaceMap) (client *Producer, err error)
type Consumer ¶
консьюмер
func (*Consumer) AddEventHandler ¶ added in v2.0.6
func (c *Consumer) AddEventHandler(h EventHandler)
func (*Consumer) DelEventHandler ¶ added in v2.0.6
func (c *Consumer) DelEventHandler(h EventHandler)
func (*Consumer) GetAssignedPartitions ¶ added in v2.0.6
func (c *Consumer) GetAssignedPartitions() (parts AssignedPartitions)
func (*Consumer) Unsubscribe ¶
Отписаться от всех подписок
func (*Consumer) WaitingForAssign ¶
func (c *Consumer) WaitingForAssign()
Ожидание получения первого AssignedPartitions
type ConsumerTopicConfig ¶
type ConsumerTopicConfig struct { Active bool `toml:"active"` // Активный? Type string `toml:"type"` // Тип топика. Произвольное необязательное значение на усмотрение разработчика Encoding string `toml:"encoding"` // Формат данных Extra any `toml:"extra"` // Произвольные дополнительные данные }
Параметры топика консьюмера
func (*ConsumerTopicConfig) Check ¶
func (c *ConsumerTopicConfig) Check(cfg any) (err error)
Проверка валидности ProducerTopicConfig
type EventHandler ¶ added in v2.0.6
type EventHandler func(c *Consumer, assigned bool, partitions TopicPartitions)
type ProducerTopicConfig ¶
type ProducerTopicConfig struct { Active bool `toml:"active"` // Активный? Type string `toml:"type"` // Тип топика. Произвольное необязательное значение на усмотрение разработчика Encoding string `toml:"encoding"` // Формат данных NumPartitions int `toml:"num-partitions"` // Количество партиций при создании ReplicationFactor int `toml:"replication-factor"` // Фактор репликации при создании RetentionTime config.Duration `toml:"retention-time"` // Время жизни данных RetentionSize int64 `toml:"retention-size"` // Максимальный размер для очистки по размеру Extra any `toml:"extra"` // Произвольные дополнительные данные }
Параметры топика продюсера
func (*ProducerTopicConfig) Check ¶
func (c *ProducerTopicConfig) Check(cfg any) (err error)
Проверка валидности ProducerTopicConfig
type TopicPartition ¶ added in v2.0.6
type TopicPartition kafka.TopicPartition
type TopicPartitions ¶ added in v2.0.6
type TopicPartitions kafka.TopicPartitions