kafka

package
v1.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2024 License: Apache-2.0 Imports: 9 Imported by: 28

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CommittedOffsets

func CommittedOffsets(r BatchReader) ([]kafka.TopicPartition, error)

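CommittedOffsets takes a BatchReader and returns a slice of kafka.TopicPartition values together with an error. Judging by the name, these are the offsets committed for the reader's partitions; the exact semantics are not shown on this page.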

Types

type BatchReader

// BatchReader is the reader handle returned by Interface.NewBatchReader.
type BatchReader interface {
	// ReadN takes a caller-supplied buffer and a timeout and returns an int
	// and an error.
	// NOTE(review): presumably it fills buf with decoded messages, waits at
	// most timeout, and returns the number of entries written — confirm
	// against the implementation.
	ReadN(buf []interface{}, timeout time.Duration) (int, error)
	// Confirm returns an error.
	// NOTE(review): presumably commits the offsets of the messages returned
	// by earlier ReadN calls — confirm.
	Confirm() error
	// Close returns an error.
	// NOTE(review): presumably releases the underlying consumer — confirm.
	Close() error
}

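BatchReader is the interface returned by Interface.NewBatchReader. It exposes three methods: ReadN, Confirm, and Close.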

type BatchReaderConfig

// BatchReaderConfig is the configuration passed to Interface.NewBatchReader.
// Each field carries a `file` tag naming its configuration key and a `desc`
// tag with a short description.
type BatchReaderConfig struct {
	// Topics is the list of topics (config key "topics").
	Topics  []string               `file:"topics" desc:"topics"`
	// Group is the consumer group id (config key "group").
	Group   string                 `file:"group" desc:"consumer group id"`
	// Options is a free-form option map (config key "options").
	// NOTE(review): presumably forwarded to the underlying Kafka client
	// configuration — confirm.
	Options map[string]interface{} `file:"options" desc:"options"`
}

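BatchReaderConfig is the configuration passed to Interface.NewBatchReader: the topics, the consumer group id, and a map of additional options.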

type BatchReaderOption

// BatchReaderOption is an optional argument accepted by
// Interface.NewBatchReader. It is declared as the empty interface, so the
// type itself places no constraint on the value; WithReaderDecoder returns
// one.
type BatchReaderOption interface{}

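BatchReaderOption is an optional argument accepted by Interface.NewBatchReader. It is declared as an empty interface; WithReaderDecoder returns a value of this type.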

func WithReaderDecoder

func WithReaderDecoder(dec Decoder) BatchReaderOption

WithReaderDecoder returns a BatchReaderOption built from the given Decoder, for use with Interface.NewBatchReader.

type ConsumerConfig

// ConsumerConfig is the configuration passed to Interface.NewConsumer and
// Interface.NewConsumerWitchCreator. Each field carries a `file` tag naming
// its configuration key and a `desc` tag with a short description.
type ConsumerConfig struct {
	// Topics is the list of topics (config key "topics").
	Topics      []string               `file:"topics" desc:"topics"`
	// Group is the consumer group id (config key "group").
	Group       string                 `file:"group" desc:"consumer group id"`
	// Parallelism is the configured parallelism (config key "parallelism").
	// NOTE(review): presumably the number of concurrent consumers started —
	// confirm. No default is declared in the tags.
	Parallelism uint64                 `file:"parallelism" desc:"parallelism"`
	// Options is a free-form option map (config key "options").
	// NOTE(review): presumably forwarded to the underlying Kafka client
	// configuration — confirm.
	Options     map[string]interface{} `file:"options" desc:"options"`
}

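ConsumerConfig is the configuration passed to Interface.NewConsumer and Interface.NewConsumerWitchCreator: the topics, the consumer group id, the parallelism, and a map of additional options.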

type ConsumerFunc

// ConsumerFunc is the handler type accepted by Interface.NewConsumer. It
// receives a message's key, value, topic (as a *string) and timestamp, and
// returns an error.
// NOTE(review): how a non-nil error is handled (retry, skip, stop) is not
// visible here — confirm against the consumer implementation.
type ConsumerFunc func(key []byte, value []byte, topic *string, timestamp time.Time) error

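ConsumerFunc is the handler type accepted by Interface.NewConsumer. It receives a message's key, value, topic, and timestamp and returns an error.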

type ConsumerOption

// ConsumerOption is an optional argument accepted by Interface.NewConsumer
// and Interface.NewConsumerWitchCreator. It is declared as the empty
// interface, so the type itself places no constraint on the value;
// WithConsumerListener returns one.
type ConsumerOption interface{}

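ConsumerOption is an optional argument accepted by Interface.NewConsumer and Interface.NewConsumerWitchCreator. It is declared as an empty interface; WithConsumerListener returns a value of this type.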

func WithConsumerListener

func WithConsumerListener(fn func(i int, c *kafka.Consumer)) ConsumerOption

WithConsumerListener returns a ConsumerOption built from a callback that receives an int index and a *kafka.Consumer.

type Decoder

// Decoder converts a raw message — key, value, topic (as a *string) and
// timestamp — into an arbitrary value, or returns an error. A Decoder is
// supplied to a batch reader through WithReaderDecoder.
type Decoder func(key, value []byte, topic *string, timestamp time.Time) (interface{}, error)

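Decoder is a function type that converts a message's key, value, topic, and timestamp into an arbitrary value, or returns an error. It is supplied to a batch reader through WithReaderDecoder.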

type Interface

// Interface is the package's main entry point: it constructs batch readers,
// consumers, producers and an admin client, and exposes a few settings.
type Interface interface {
	// NewBatchReader builds a BatchReader from the given config and optional
	// BatchReaderOption values.
	NewBatchReader(c *BatchReaderConfig, options ...BatchReaderOption) (BatchReader, error)
	// NewConsumer takes a config, a ConsumerFunc handler and optional
	// ConsumerOption values, and returns only an error.
	// NOTE(review): whether the call blocks while consuming is not visible
	// here — confirm.
	NewConsumer(c *ConsumerConfig, handler ConsumerFunc, options ...ConsumerOption) error
	// NewConsumerWitchCreator is like NewConsumer but takes a creator
	// function that receives an int index and returns a ConsumerFunc or an
	// error.
	// NOTE(review): presumably creator is invoked once per parallel consumer
	// — confirm. "Witch" looks like a typo for "With"; the name is part of
	// the exported API, so renaming it would break existing implementers and
	// callers.
	NewConsumerWitchCreator(c *ConsumerConfig, creator func(i int) (ConsumerFunc, error), options ...ConsumerOption) error
	// NewProducer builds a writer.Writer from the given config and optional
	// ProducerOption values.
	NewProducer(c *ProducerConfig, options ...ProducerOption) (writer.Writer, error)
	// Servers returns a string.
	// NOTE(review): presumably the configured broker address list — confirm.
	Servers() string
	// ProduceChannelSize returns an int.
	// NOTE(review): presumably the size of the producer's produce channel —
	// confirm.
	ProduceChannelSize() int
	// ProduceEventsChannelSize returns an int.
	// NOTE(review): presumably the size of the producer's events channel —
	// confirm.
	ProduceEventsChannelSize() int
	// NewAdminClient returns a *kafka.AdminClient or an error.
	NewAdminClient() (*kafka.AdminClient, error)
}

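Interface is the package's main entry point. It constructs batch readers, consumers, producers, and an admin client, and exposes the Servers, ProduceChannelSize, and ProduceEventsChannelSize accessors.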

type Message

// Message is a Kafka message with a byte-slice payload and key.
// NOTE(review): presumably this is the value type handed to the
// writer.Writer returned by Interface.NewProducer — confirm.
type Message struct {
	// Topic is a pointer to the topic name.
	// NOTE(review): behavior when Topic is nil is not visible here;
	// presumably the producer's configured topic is used — confirm.
	Topic *string
	// Data is the message payload.
	Data  []byte
	// Key is the message key.
	Key   []byte
}

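Message is a Kafka message consisting of a topic pointer, a byte-slice payload (Data), and a byte-slice key (Key).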

type Producer

// Producer extends writer.Writer with two channel-size accessors.
type Producer interface {
	// Writer is the embedded write interface.
	writer.Writer
	// ProduceChannelSize returns an int.
	// NOTE(review): presumably the size of the produce channel — confirm.
	ProduceChannelSize() int
	// EventsChannelSize returns an int.
	// NOTE(review): presumably the size of the events channel — confirm.
	EventsChannelSize() int
}

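Producer is an interface that embeds writer.Writer and adds the ProduceChannelSize and EventsChannelSize accessors.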

type ProducerConfig

// ProducerConfig is the configuration passed to Interface.NewProducer. Field
// tags give the configuration key (`file`), the environment variable (`env`),
// the default value (`default`) and a short description (`desc`).
type ProducerConfig struct {
	// Topic is the topic (config key "topic", env KAFKA_P_TOPIC).
	Topic       string `file:"topic" env:"KAFKA_P_TOPIC" desc:"topic"`
	// Parallelism is the configured parallelism (config key "parallelism",
	// env KAFKA_P_PARALLELISM, default 4).
	Parallelism uint64 `file:"parallelism" env:"KAFKA_P_PARALLELISM" default:"4" desc:"parallelism"`
	// Batch groups the batch-write settings under the config key "batch".
	Batch       struct {
		// Size is the batch size (config key "size", env KAFKA_P_BATCH_SIZE,
		// default 100).
		Size    uint64        `file:"size" env:"KAFKA_P_BATCH_SIZE" default:"100" desc:"batch size"`
		// Timeout is the timeout to flush the buffer for a batch write
		// (config key "timeout", default 30s). Note that the environment
		// variable is KAFKA_P_BUFFER_TIMEOUT, not KAFKA_P_BATCH_TIMEOUT.
		Timeout time.Duration `file:"timeout" env:"KAFKA_P_BUFFER_TIMEOUT" default:"30s" desc:"timeout to flush buffer for batch write"`
	} `file:"batch"`
	// Shared selects a shared producer instance (config key "shared",
	// default true). No environment variable is declared for this field.
	Shared  bool                   `file:"shared" default:"true" desc:"shared producer instance"`
	// Options is a free-form option map (config key "options").
	// NOTE(review): presumably forwarded to the underlying Kafka client
	// configuration — confirm.
	Options map[string]interface{} `file:"options" desc:"options"`
}

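ProducerConfig is the configuration passed to Interface.NewProducer: the topic, the parallelism (default 4), the batch size (default 100) and flush timeout (default 30s), whether the producer instance is shared (default true), and a map of additional options.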

type ProducerOption

// ProducerOption is an optional argument accepted by Interface.NewProducer.
// Its method set is unexported, so values come from this package's
// constructors such as WithAsyncWriteErrorHandler.
type ProducerOption interface {
	// contains filtered or unexported methods
}

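ProducerOption is an optional argument accepted by Interface.NewProducer. Its methods are unexported; WithAsyncWriteErrorHandler returns a value of this type.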

func WithAsyncWriteErrorHandler

func WithAsyncWriteErrorHandler(eh func(error) error) ProducerOption

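WithAsyncWriteErrorHandler returns a ProducerOption built from the given error-handler callback, which receives an error and returns an error. Judging by the name, the callback is invoked for errors from asynchronous writes; the exact semantics are not shown on this page.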

type StringMessage

// StringMessage is a Kafka message whose payload is a string. Unlike Message,
// it has no Key field.
type StringMessage struct {
	// Topic is a pointer to the topic name.
	// NOTE(review): behavior when Topic is nil is not visible here;
	// presumably the producer's configured topic is used — confirm.
	Topic *string
	// Data is the message payload as a string.
	Data  string
}

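StringMessage is a Kafka message consisting of a topic pointer and a string payload (Data). Unlike Message, it has no key.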

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL