producer

package
v1.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2022 License: MIT Imports: 11 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Builder

type Builder func(configs *Config) (Producer, error)

type Config

type Config struct {
	Id string
	*sarama.Config
	Pool struct {
		NumOfWorkers int
	}
	BootstrapServers []string
	RequiredAcks     RequiredAcks
	Partitioner      Partitioner
	Logger           log.Logger
	MetricsReporter  metrics.Reporter
}

func NewConfig

func NewConfig() *Config

type MockStreamProducer

type MockStreamProducer struct {
	// contains filtered or unexported fields
}

func NewMockProducer

func NewMockProducer(topics *admin.Topics) *MockStreamProducer

func (*MockStreamProducer) Close

func (msp *MockStreamProducer) Close() error

func (*MockStreamProducer) Produce

func (msp *MockStreamProducer) Produce(ctx context.Context, message *data.Record) (partition int32, offset int64, err error)

func (*MockStreamProducer) ProduceBatch

func (msp *MockStreamProducer) ProduceBatch(ctx context.Context, messages []*data.Record) error

type Partitioner

type Partitioner int
const (
	HashBased Partitioner = iota
	Manual
	Random
)

type Pool

type Pool struct {
	NumOfWorkers int64
	// contains filtered or unexported fields
}

func NewPool

func NewPool(NumOfWorkers int, builder Builder) (*Pool, error)

func (*Pool) Close

func (p *Pool) Close() error

func (*Pool) Produce

func (p *Pool) Produce(ctx context.Context, message *data.Record) (partition int32, offset int64, err error)

func (*Pool) ProduceBatch

func (p *Pool) ProduceBatch(ctx context.Context, messages []*data.Record) error

type Producer

type Producer interface {
	Produce(ctx context.Context, message *data.Record) (partition int32, offset int64, err error)
	ProduceBatch(ctx context.Context, messages []*data.Record) error
	Close() error
}

func NewProducer

func NewProducer(configs *Config) (Producer, error)

type RequiredAcks

type RequiredAcks int
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0

	// WaitForLeader waits for only the local commit to succeed before responding.
	WaitForLeader RequiredAcks = 1

	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)

func (RequiredAcks) String

func (ack RequiredAcks) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL