kafka

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the package.
var (
	// ErrInvalidOffsetFormat is the error value with the message
	// "invalid format for kafka offset".
	// NOTE(review): presumably returned by Parser.FromString when the input
	// string cannot be turned into an Offset — confirm against the
	// implementation.
	ErrInvalidOffsetFormat = errors.New("invalid format for kafka offset")
)

Functions

This section is empty.

Types

type ConnConfig

// ConnConfig groups the connection settings: a list of servers, the topic
// settings (TopicConfig) and the TLS settings (tlslib.Config). Both
// ReaderConfig and WriterConfig hold one in their Conn field.
// NOTE(review): Servers presumably contains kafka broker addresses — confirm
// the expected format against the implementation.
type ConnConfig struct {
	Servers []string
	Topic   TopicConfig
	TLS     tlslib.Config
}

type Message

// Message is a wrapper around the kafkago library message. It is a defined
// type whose underlying type is kafka.Message, so values can be converted
// directly between the two.
type Message kafka.Message

Message is a wrapper around the kafkago library message

type MessageReader

// MessageReader is the interface for fetching messages and committing
// offsets. *Reader in this package declares methods with these same
// signatures.
type MessageReader interface {
	// FetchMessage returns a message or an error. For the Reader
	// implementation the call blocks until a message is available or an
	// error occurs, and can be stopped by canceling ctx.
	FetchMessage(ctx context.Context) (*Message, error)
	// CommitOffsets takes zero or more offsets and reports failure through
	// the returned error.
	// NOTE(review): presumably commits them on behalf of the consumer
	// group — confirm.
	CommitOffsets(ctx context.Context, offsets ...*Offset) error
	// Close returns an error.
	// NOTE(review): presumably releases the underlying reader — confirm.
	Close() error
}

type MessageWriter

// MessageWriter is the interface for writing messages. *Writer in this
// package declares methods with these same signatures.
type MessageWriter interface {
	// WriteMessages takes a context and zero or more messages and reports
	// failure through the returned error.
	WriteMessages(context.Context, ...Message) error
	// Close returns an error.
	// NOTE(review): presumably flushes and releases the underlying
	// writer — confirm.
	Close() error
}

type Offset

// Offset holds a topic name, a partition number and an offset value. It is
// the type accepted by CommitOffsets and converted to and from strings by
// OffsetParser.
// NOTE(review): presumably identifies a position within one kafka topic
// partition — confirm.
type Offset struct {
	Topic     string
	Partition int
	Offset    int64
}

type OffsetParser

// OffsetParser is the interface for converting an Offset to and from its
// string form. *Parser in this package declares methods with these same
// signatures.
type OffsetParser interface {
	// ToString returns the string form of o.
	ToString(o *Offset) string
	// FromString returns the Offset parsed from s, or an error.
	// NOTE(review): the accepted string format is not visible here —
	// confirm against the implementation.
	FromString(s string) (*Offset, error)
}

type Parser

// Parser is a field-less type returned by NewOffsetParser. Its ToString and
// FromString methods have the signatures declared by OffsetParser.
type Parser struct{}

func NewOffsetParser

func NewOffsetParser() *Parser

func (*Parser) FromString

func (p *Parser) FromString(s string) (*Offset, error)

func (*Parser) ToString

func (p *Parser) ToString(o *Offset) string

type Reader

// Reader is returned by NewReader. Its FetchMessage, CommitOffsets and Close
// methods have the signatures declared by MessageReader.
// NOTE(review): presumably wraps the kafkago library reader, mirroring
// Writer — confirm.
type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(config ReaderConfig, logger loglib.Logger) (*Reader, error)

func (*Reader) Close

func (r *Reader) Close() error

func (*Reader) CommitOffsets

func (r *Reader) CommitOffsets(ctx context.Context, offsets ...*Offset) error

func (*Reader) FetchMessage

func (r *Reader) FetchMessage(ctx context.Context) (*Message, error)

FetchMessage returns the next message from the reader. This call will block until a message is available, or an error occurs. It can be stopped by canceling the context. The message offset needs to be explicitly committed by using CommitOffsets.

type ReaderConfig

// ReaderConfig holds the settings passed to NewReader: the connection
// settings plus a consumer group ID and a consumer group start offset, both
// given as strings.
// NOTE(review): the accepted values for ConsumerGroupStartOffset are not
// visible here — confirm against the implementation.
type ReaderConfig struct {
	Conn                     ConnConfig
	ConsumerGroupID          string
	ConsumerGroupStartOffset string
}

type TopicConfig

// TopicConfig describes a topic: its name, the number of partitions and
// replication factor to use when creating it, and whether it should be
// created when it doesn't exist.
type TopicConfig struct {
	// Name is the name of the topic.
	Name string
	// Number of partitions to be created for the topic. Defaults to 1.
	NumPartitions int
	// Replication factor for the topic. Defaults to 1.
	ReplicationFactor int
	// AutoCreate defines if the topic should be created if it doesn't exist.
	// Defaults to false.
	AutoCreate bool
}

type Writer

// Writer is a wrapper around the kafkago library writer. It is returned by
// NewWriter, and its WriteMessages and Close methods have the signatures
// declared by MessageWriter.
type Writer struct {
	// contains filtered or unexported fields
}

Writer is a wrapper around the kafkago library writer

func NewWriter

func NewWriter(config WriterConfig, logger loglib.Logger) (*Writer, error)

NewWriter returns a kafka writer that produces messages to the configured topic, using the CRC32 hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition.

If the topic auto create setting (TopicConfig.AutoCreate) is enabled in the config, NewWriter will create the topic.

func (*Writer) Close

func (w *Writer) Close() error

func (*Writer) WriteMessages

func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error

type WriterConfig

// WriterConfig holds the settings passed to NewWriter: the connection
// settings plus the limits that control how messages are batched.
type WriterConfig struct {
	// Conn holds the connection settings (servers, topic and TLS).
	Conn ConnConfig
	// BatchTimeout is the time limit on how often incomplete message batches
	// will be flushed to kafka. Defaults to 1s.
	BatchTimeout time.Duration
	// BatchBytes limits the maximum size of a request in bytes before being
	// sent to a partition. Defaults to 1048576 bytes.
	BatchBytes int64
	// BatchSize limits how many messages will be buffered before being sent to
	// a partition. Defaults to 100 messages.
	BatchSize int
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL