kafka

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AdminInterface

type AdminInterface interface {
}

type AsyncProducer

type AsyncProducer struct {
	// contains filtered or unexported fields
}

func (*AsyncProducer) Close

func (a *AsyncProducer) Close() error

func (*AsyncProducer) Write

func (a *AsyncProducer) Write(data interface{}) error

func (*AsyncProducer) WriteN

func (a *AsyncProducer) WriteN(data ...interface{}) (int, error)

type BatchReaderConfig

// BatchReaderConfig has ConsumerConfig as its underlying type and is the
// configuration accepted by Interface.NewBatchReader.
type BatchReaderConfig ConsumerConfig

type BatchReaderOption

type BatchReaderOption interface{}

BatchReaderOption is an option value accepted by NewBatchReader.

func WithReaderDecoder

func WithReaderDecoder(dec Decoder) BatchReaderOption

WithReaderDecoder returns a BatchReaderOption built from the given Decoder.

type ConsumerConfig

// ConsumerConfig is the configuration accepted by Interface.NewConsumerGroup
// and the deprecated Interface.NewConsumer / NewConsumerWitchCreator.
//
// NOTE(review): the `file`, `default`, `env` and `desc` struct tags are
// presumably read by a configuration loader that is not visible here — confirm
// how (and in which precedence) they are applied.
type ConsumerConfig struct {
	// Topics is the list of topics (file key "topics").
	Topics []string `file:"topics" desc:"topics"`
	// group related
	// Group is the consumer group id (file key "group").
	Group             string        `file:"group" desc:"consumer group id"`
	// SessionTimeout is tagged with default "2m" and env
	// KAFKA_C_G_SESSION_TIMEOUT; presumably the consumer-group session
	// timeout — confirm against the code that consumes it.
	SessionTimeout    time.Duration `file:"session_timeout" default:"2m" env:"KAFKA_C_G_SESSION_TIMEOUT"`
	// MaxProcessingTime is tagged with default "30s" and env
	// KAFKA_C_MAX_PROCESSING_TIME.
	MaxProcessingTime time.Duration `file:"max_processing_time" default:"30s" env:"KAFKA_C_MAX_PROCESSING_TIME"`
	// ChannelBufferSize is tagged with default "500" and env
	// KAFKA_CHANNEL_BUFFER_SIZE.
	ChannelBufferSize int           `file:"channel_buffer_size" default:"500" env:"KAFKA_CHANNEL_BUFFER_SIZE"`

	// Offsets groups the offset-related settings (file key "offsets").
	Offsets struct {
		// AutoCommit groups the auto-commit settings (file key "auto_commit").
		AutoCommit struct {
			// Enable is tagged with default "true".
			Enable   bool          `file:"enable" default:"true"`
			// Interval is tagged with default "1s".
			Interval time.Duration `file:"interval" default:"1s"`
		} `file:"auto_commit"`
		// Initial is tagged with default "latest"; presumably the starting
		// offset position — the set of accepted values is not visible here.
		Initial string `file:"initial" default:"latest"`
	} `file:"offsets"`
}

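ConsumerConfig holds the consumer settings: topics, consumer group id, session and processing timeouts, channel buffer size, and offset handling (auto-commit and initial offset).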

type ConsumerFunc

type ConsumerFunc func(key []byte, value []byte, topic *string, timestamp time.Time) error

Deprecated: use ConsumerFuncV2 with NewConsumerGroup instead.

type ConsumerFuncV2

// ConsumerFuncV2 is the handler type accepted by Interface.NewConsumerGroup.
// It receives a *sarama.ConsumerMessage and returns an error.
type ConsumerFuncV2 func(msg *sarama.ConsumerMessage) error

type ConsumerGroupManager

type ConsumerGroupManager struct {
	// contains filtered or unexported fields
}

func NewConsumerGroupManager

func NewConsumerGroupManager(cg sarama.ConsumerGroup, handler sarama.ConsumerGroupHandler, topics []string) *ConsumerGroupManager

func (*ConsumerGroupManager) Close

func (cgm *ConsumerGroupManager) Close() error

type Decoder

// Decoder maps a message's key, value, topic and timestamp to an arbitrary
// value, or returns an error. A Decoder is supplied through WithReaderDecoder.
type Decoder func(key, value []byte, topic *string, timestamp time.Time) (interface{}, error)

type Interface

// Interface is the package's entry point: it creates producers, consumer
// groups and admin clients, and reports the broker list. The batch-reader and
// plain-consumer constructors are deprecated in favor of NewConsumerGroup.
type Interface interface {
	// NewProducer returns an *AsyncProducer for the given ProducerConfig.
	NewProducer(c *ProducerConfig) (*AsyncProducer, error)
	// NewConsumerGroup returns a *ConsumerGroupManager for the given
	// ConsumerConfig and ConsumerFuncV2 handler.
	NewConsumerGroup(c *ConsumerConfig, handler ConsumerFuncV2) (*ConsumerGroupManager, error)
	// NewAdminClient returns an AdminInterface.
	NewAdminClient() (AdminInterface, error)
	// Brokers returns a list of strings; presumably the configured broker
	// addresses — confirm against the implementation.
	Brokers() []string
	// NewBatchReader returns a storekit.BatchReader for the given
	// BatchReaderConfig and options.
	//
	// Deprecated: use NewConsumerGroup instead.
	NewBatchReader(c *BatchReaderConfig, options ...BatchReaderOption) (storekit.BatchReader, error)
	// NewConsumer takes a ConsumerConfig and a ConsumerFunc handler.
	//
	// Deprecated: use NewConsumerGroup instead.
	NewConsumer(c *ConsumerConfig, handler ConsumerFunc) error
	// NewConsumerWitchCreator takes a ConsumerConfig and a creator that is
	// given an index and returns a ConsumerFunc.
	// NOTE(review): "Witch" looks like a typo for "With"; renaming would break
	// existing implementers and callers, so the name is left unchanged.
	//
	// Deprecated: use NewConsumerGroup instead.
	NewConsumerWitchCreator(c *ConsumerConfig, creator func(i int) (ConsumerFunc, error)) error
}

type Message

// Message bundles a topic, a key and a data payload.
type Message struct {
	// Topic points to the topic name.
	// NOTE(review): how a nil Topic is handled is not visible here — confirm.
	Topic *string
	// Data is the message payload bytes.
	Data  []byte
	// Key is the message key bytes.
	Key   []byte
}

Message is a message made up of a topic, a key, and a data payload.

type ProducerConfig

// ProducerConfig is the configuration accepted by Interface.NewProducer.
type ProducerConfig struct {
	// Topic is the topic name (file key "topic", env tag KAFKA_P_TOPIC).
	Topic string `file:"topic" env:"KAFKA_P_TOPIC" desc:"topic"`
}

type ProducerOption

type ProducerOption interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL