producer

package
v0.33.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// KafkaDefaultBatchMessageCount is the Kafka default value for batch size.
	KafkaDefaultBatchMessageCount = 100

	// KafkaDefaultBatchMaxBytes is the Kafka default value for batch size.
	KafkaDefaultBatchMaxBytes = 1000012 - (100 * 1024)

	// KafkaMinBatchInterval is a reasonable minimum for batch timeout.
	KafkaMinBatchInterval = 250 * time.Millisecond
)
View Source
const (
	// RequireAllReplicas means that ALL nodes in the replica-set must to confirm the write for a write
	// to be considered durable.
	RequireAllReplicas = -1

	// DefaultWriterWriteTimeout is how much to wait for a write to go through.
	DefaultWriterWriteTimeout = 30 * time.Second

	// DefaultWriterReadTimeout is how much to wait for reads.
	DefaultWriterReadTimeout = 30 * time.Second

	// DefaultMaxRetryAttempts is how many times to retry a failed operation.
	DefaultMaxRetryAttempts = 3

	// DefaultMetadataTTL is the frequency of metadata refreshes.
	DefaultMetadataTTL = 5 * time.Second

	// DefaultIdleTimeout is the period during which an idle connection can be resued.
	DefaultIdleTimeout = 30 * time.Second
)

Variables

View Source
var ErrInvalidMessage = errors.New("kafka: message is invalid")

Functions

func NewWriter

func NewWriter(
	logger log.Logger,
	dialer *kafka.Dialer,
	bootstrap []string,
	opts ...WriterOption,
) (*kafka.Writer, error)

Types

type Producer

type Producer struct {
	Settings *Settings
	Writer   Writer
	Logger   log.Logger
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(ctx context.Context, conf cfg.Config, logger log.Logger, name string) (*Producer, error)

NewProducer returns a topic producer.

func NewProducerWithInterfaces

func NewProducerWithInterfaces(conf *Settings, logger log.Logger, writer Writer) (*Producer, error)

func (*Producer) Run

func (p *Producer) Run(ctx context.Context) error

Run starts background routine for flushing messages.

func (*Producer) Write

func (p *Producer) Write(ctx context.Context, ms ...kafka.Message) error

func (*Producer) WriteOne

func (p *Producer) WriteOne(ctx context.Context, m kafka.Message) error

type Settings

type Settings struct {
	ConnectionName string `cfg:"connection" validate:"required"`
	Topic          string `cfg:"topic" validate:"required"`
	// FQTopic is the fully-qualified topic name (with prefixes).
	FQTopic      string
	BatchSize    int           `cfg:"batch_size"`
	BatchTimeout time.Duration `cfg:"idle_timeout"`
	AsyncWrites  bool          `cfg:"async_writes"`
	// contains filtered or unexported fields
}

func ParseSettings

func ParseSettings(config cfg.Config, key string) *Settings

func (*Settings) Connection

func (s *Settings) Connection() *connection.Settings

func (*Settings) WithConnection

func (s *Settings) WithConnection(conn *connection.Settings) *Settings

type Writer

type Writer interface {
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
	Stats() kafka.WriterStats
	Close() error
}

type WriterOption

type WriterOption func(*kafka.WriterConfig)

func WithAsyncWrites

func WithAsyncWrites() WriterOption

WithAsyncWrites makes writes async.

func WithBatch

func WithBatch(maxCount int, interval time.Duration) WriterOption

WithBatch sets batching configuration.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL