producer

package
v1.0.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 17, 2025 | License: MIT | Imports: 10 | Imported by: 0

README

Kafka Producer

The package provides two types of Kafka producers:

Sync Producer

  • Blocking message delivery
  • Guaranteed message ordering
  • Returns delivery status (partition, offset, error)
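  • Suitable for critical messages requiring confirmation

// Named syncProducer so the variable does not shadow the producer package.
syncProducer, err := producer.NewSyncProducer(ctx, "example-service", brokers)
if err != nil {
    return nil, fmt.Errorf("failed to create producer: %w", err)
}

msg := &sarama.ProducerMessage{
    Topic: "example-topic",
    Value: sarama.ByteEncoder(jsonData),
}

// Send a single message and wait for delivery confirmation
partition, offset, err := syncProducer.SendMessage(ctx, msg)
if err != nil {
    return nil, fmt.Errorf("failed to send message: %w", err)
}
log.Printf("delivered to partition %d at offset %d", partition, offset)

// Send multiple messages (msg1 and msg2 are built the same way as msg)
if err := syncProducer.SendMessages(ctx, []*sarama.ProducerMessage{msg1, msg2}); err != nil {
    return nil, fmt.Errorf("failed to send messages: %w", err)
}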
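The snippet above uses `jsonData` without showing where it comes from. As a minimal sketch, assuming a hypothetical `OrderCreated` event type (not part of this package), the payload could be produced with the standard library and then wrapped in `sarama.ByteEncoder`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderCreated is a hypothetical event type used only for illustration.
type OrderCreated struct {
	OrderID string `json:"order_id"`
	Amount  int64  `json:"amount"`
}

// encodeEvent serializes an event into the JSON bytes that would become the
// message value, for example via sarama.ByteEncoder(jsonData).
func encodeEvent(ev OrderCreated) ([]byte, error) {
	jsonData, err := json.Marshal(ev)
	if err != nil {
		return nil, fmt.Errorf("failed to encode event: %w", err)
	}
	return jsonData, nil
}

func main() {
	jsonData, err := encodeEvent(OrderCreated{OrderID: "o-1", Amount: 1250})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(jsonData)) // prints {"order_id":"o-1","amount":1250}
}
```

Encoding before constructing the message keeps serialization errors separate from delivery errors.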

Async Producer

  • Non-blocking message delivery
  • Higher throughput
  • No delivery confirmation
  • Suitable for high-volume, non-critical messages

// Named asyncProducer so the variable does not shadow the producer package.
asyncProducer, err := producer.NewAsyncProducer(ctx, "example-service", brokers)
if err != nil {
    return nil, fmt.Errorf("failed to create producer: %w", err)
}

// Send a single message; the call does not wait for delivery and returns nothing
asyncProducer.SendMessage(ctx, msg)

// Send multiple messages
asyncProducer.SendMessages(ctx, []*sarama.ProducerMessage{msg1, msg2})

Producer Configuration

Both producer types support the following configuration options:

// Common options
WithConfig(config *sarama.Config)         // Custom Sarama configuration
WithName(name string)                     // Producer name for metrics
WithTelemetry(telemetry ITelemetry)       // Custom telemetry implementation
WithErrorLogger(logger IErrorLogger)      // Custom error logger
WithRestartPolicy(policy backoff.BackOff) // Restart policy on error

// Sync Producer specific (listed in the README but absent from the v1.0.3 API index below)
WithRetryMax(max int)                      // Maximum retry attempts
WithRetryBackoff(backoff time.Duration)    // Retry backoff duration
WithRequiredAcks(acks sarama.RequiredAcks) // Required acknowledgements

Telemetry Integration

Implement the ITelemetry interface to collect producer metrics:

type CustomTelemetry struct{}

func (t *CustomTelemetry) CollectWriteLatency(
    ctx context.Context,
    serviceName string,
    duration time.Duration,
    topic string,
    partition int32,
    typeName ProducerType,
    clientID string,
    success bool,
) {
    // Collect message delivery latency
}

func (t *CustomTelemetry) CollectWriteSize(
    ctx context.Context,
    serviceName string,
    size int,
    topic string,
    partition int32,
    typeName ProducerType,
    clientID string,
    success bool,
) {
    // Collect message size metrics
}

Error Handling

Implement the IErrorLogger interface for custom error logging:

type CustomErrorLogger struct{}

func (l *CustomErrorLogger) LogError(ctx context.Context, err error) {
    // Log producer errors with context
    // Includes message delivery failures and connection errors
}

Documentation

Overview

Package producer is a generated GoMock package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultAsyncConfig

func DefaultAsyncConfig() *sarama.Config

DefaultAsyncConfig returns the default configuration for AsyncProducer.

func DefaultAsyncConfigSASL

func DefaultAsyncConfigSASL(
	mechanism sarama.SASLMechanism,
	user string,
	password string,
) *sarama.Config

DefaultAsyncConfigSASL returns the default configuration for AsyncProducer with SASL.

func DefaultSyncConfig

func DefaultSyncConfig() *sarama.Config

DefaultSyncConfig returns the default configuration for SyncProducer.

func DefaultSyncConfigSASL

func DefaultSyncConfigSASL(
	mechanism sarama.SASLMechanism,
	user string,
	password string,
) *sarama.Config

DefaultSyncConfigSASL returns the default configuration for SyncProducer with SASL.

Types

type AsyncProducer

type AsyncProducer struct {
	// contains filtered or unexported fields
}

AsyncProducer is an asynchronous producer.

func NewAsyncProducer

func NewAsyncProducer(_ context.Context, serviceName string, brokers []string,
	opts ...Option,
) (*AsyncProducer, error)

NewAsyncProducer creates a new instance of asynchronous producer.

func (*AsyncProducer) SendMessage

func (a *AsyncProducer) SendMessage(ctx context.Context, msg *sarama.ProducerMessage)

SendMessage sends a message to the queue.

func (*AsyncProducer) SendMessages

func (a *AsyncProducer) SendMessages(ctx context.Context, msgs []*sarama.ProducerMessage)

SendMessages sends messages to the queue.

func (*AsyncProducer) Start

func (a *AsyncProducer) Start(ctx context.Context) error

Start starts the producer.

func (*AsyncProducer) Stop

func (a *AsyncProducer) Stop(_ context.Context) error

Stop stops the producer.

type IAsyncProducer

type IAsyncProducer interface {
	// SendMessage sends message to queue.
	// Producer doesn't support context operations (using WithTimeout doesn't make sense).
	// Context is here just in case (e.g. for metrics).
	// When sending a message, the start time is added to its header,
	// so resending the same message is not allowed.
	SendMessage(ctx context.Context, msg *sarama.ProducerMessage)

	// SendMessages sends messages to queue.
	// If context is canceled, messages won't be sent (the method has no return value).
	// Producer doesn't support context operations (using WithTimeout doesn't make sense).
	// Context is here just in case (e.g. for metrics).
	// When sending messages, the start time is added to their headers,
	// so resending the same messages is not allowed.
	SendMessages(ctx context.Context, msgs []*sarama.ProducerMessage)
}

IAsyncProducer is the interface of an asynchronous producer.

type IErrorLogger

type IErrorLogger interface {
	LogError(ctx context.Context, err error)
}

IErrorLogger is the interface for error logging.

type ISyncProducer

type ISyncProducer interface {
	// SendMessage synchronously sends message to queue.
	// Producer doesn't support context operations (using WithTimeout doesn't make sense).
	// Context is here for working with metrics.
	SendMessage(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

	// SendMessages synchronously sends messages to queue.
	// Synchronous producer doesn't support context operations (using WithTimeout doesn't make sense).
	// Context is here for working with metrics.
	SendMessages(ctx context.Context, msgs []*sarama.ProducerMessage) error
}

ISyncProducer is the interface of a synchronous producer.

type ITelemetry

type ITelemetry interface {
	CollectWriteLatency(
		ctx context.Context,
		serviceName string,
		duration time.Duration,
		topic string,
		partition int32,
		typeName ProducerType,
		clientID string,
		success bool,
	)
	CollectWriteSize(
		ctx context.Context,
		serviceName string,
		size int,
		topic string,
		partition int32,
		typeName ProducerType,
		clientID string,
		success bool,
	)
}

ITelemetry is the interface for collecting producer metrics.

type MockAsyncProducer

type MockAsyncProducer struct {
	// contains filtered or unexported fields
}

MockAsyncProducer is a mock of AsyncProducer interface.

func NewMockAsyncProducer

func NewMockAsyncProducer(ctrl *gomock.Controller) *MockAsyncProducer

NewMockAsyncProducer creates a new mock instance.

func (*MockAsyncProducer) AbortTxn

func (m *MockAsyncProducer) AbortTxn() error

AbortTxn mocks base method.

func (*MockAsyncProducer) AddMessageToTxn

func (m *MockAsyncProducer) AddMessageToTxn(arg0 *sarama.ConsumerMessage, arg1 string, arg2 *string) error

AddMessageToTxn mocks base method.

func (*MockAsyncProducer) AddOffsetsToTxn

func (m *MockAsyncProducer) AddOffsetsToTxn(arg0 map[string][]*sarama.PartitionOffsetMetadata, arg1 string) error

AddOffsetsToTxn mocks base method.

func (*MockAsyncProducer) AsyncClose

func (m *MockAsyncProducer) AsyncClose()

AsyncClose mocks base method.

func (*MockAsyncProducer) BeginTxn

func (m *MockAsyncProducer) BeginTxn() error

BeginTxn mocks base method.

func (*MockAsyncProducer) Close

func (m *MockAsyncProducer) Close() error

Close mocks base method.

func (*MockAsyncProducer) CommitTxn

func (m *MockAsyncProducer) CommitTxn() error

CommitTxn mocks base method.

func (*MockAsyncProducer) EXPECT

func (m *MockAsyncProducer) EXPECT() *MockAsyncProducerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockAsyncProducer) Errors

func (m *MockAsyncProducer) Errors() <-chan *sarama.ProducerError

Errors mocks base method.

func (*MockAsyncProducer) Input

func (m *MockAsyncProducer) Input() chan<- *sarama.ProducerMessage

Input mocks base method.

func (*MockAsyncProducer) IsTransactional

func (m *MockAsyncProducer) IsTransactional() bool

IsTransactional mocks base method.

func (*MockAsyncProducer) Successes

func (m *MockAsyncProducer) Successes() <-chan *sarama.ProducerMessage

Successes mocks base method.

func (*MockAsyncProducer) TxnStatus

func (m *MockAsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag

TxnStatus mocks base method.

type MockAsyncProducerMockRecorder

type MockAsyncProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockAsyncProducerMockRecorder is the mock recorder for MockAsyncProducer.

func (*MockAsyncProducerMockRecorder) AbortTxn

func (mr *MockAsyncProducerMockRecorder) AbortTxn() *gomock.Call

AbortTxn indicates an expected call of AbortTxn.

func (*MockAsyncProducerMockRecorder) AddMessageToTxn

func (mr *MockAsyncProducerMockRecorder) AddMessageToTxn(arg0, arg1, arg2 any) *gomock.Call

AddMessageToTxn indicates an expected call of AddMessageToTxn.

func (*MockAsyncProducerMockRecorder) AddOffsetsToTxn

func (mr *MockAsyncProducerMockRecorder) AddOffsetsToTxn(arg0, arg1 any) *gomock.Call

AddOffsetsToTxn indicates an expected call of AddOffsetsToTxn.

func (*MockAsyncProducerMockRecorder) AsyncClose

func (mr *MockAsyncProducerMockRecorder) AsyncClose() *gomock.Call

AsyncClose indicates an expected call of AsyncClose.

func (*MockAsyncProducerMockRecorder) BeginTxn

func (mr *MockAsyncProducerMockRecorder) BeginTxn() *gomock.Call

BeginTxn indicates an expected call of BeginTxn.

func (*MockAsyncProducerMockRecorder) Close

func (mr *MockAsyncProducerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close.

func (*MockAsyncProducerMockRecorder) CommitTxn

func (mr *MockAsyncProducerMockRecorder) CommitTxn() *gomock.Call

CommitTxn indicates an expected call of CommitTxn.

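func (*MockAsyncProducerMockRecorder) Errors

func (mr *MockAsyncProducerMockRecorder) Errors() *gomock.Call

Errors indicates an expected call of Errors.

func (*MockAsyncProducerMockRecorder) Input

func (mr *MockAsyncProducerMockRecorder) Input() *gomock.Call

Input indicates an expected call of Input.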

func (*MockAsyncProducerMockRecorder) IsTransactional

func (mr *MockAsyncProducerMockRecorder) IsTransactional() *gomock.Call

IsTransactional indicates an expected call of IsTransactional.

func (*MockAsyncProducerMockRecorder) Successes

func (mr *MockAsyncProducerMockRecorder) Successes() *gomock.Call

Successes indicates an expected call of Successes.

func (*MockAsyncProducerMockRecorder) TxnStatus

func (mr *MockAsyncProducerMockRecorder) TxnStatus() *gomock.Call

TxnStatus indicates an expected call of TxnStatus.

type MockIAsyncProducer

type MockIAsyncProducer struct {
	// contains filtered or unexported fields
}

MockIAsyncProducer is a mock of IAsyncProducer interface.

func NewMockIAsyncProducer

func NewMockIAsyncProducer(ctrl *gomock.Controller) *MockIAsyncProducer

NewMockIAsyncProducer creates a new mock instance.

func (*MockIAsyncProducer) EXPECT

func (m *MockIAsyncProducer) EXPECT() *MockIAsyncProducerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockIAsyncProducer) SendMessage

func (m *MockIAsyncProducer) SendMessage(ctx context.Context, msg *sarama.ProducerMessage)

SendMessage mocks base method.

func (*MockIAsyncProducer) SendMessages

func (m *MockIAsyncProducer) SendMessages(ctx context.Context, msgs []*sarama.ProducerMessage)

SendMessages mocks base method.

type MockIAsyncProducerMockRecorder

type MockIAsyncProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockIAsyncProducerMockRecorder is the mock recorder for MockIAsyncProducer.

func (*MockIAsyncProducerMockRecorder) SendMessage

func (mr *MockIAsyncProducerMockRecorder) SendMessage(ctx, msg any) *gomock.Call

SendMessage indicates an expected call of SendMessage.

func (*MockIAsyncProducerMockRecorder) SendMessages

func (mr *MockIAsyncProducerMockRecorder) SendMessages(ctx, msgs any) *gomock.Call

SendMessages indicates an expected call of SendMessages.

type MockIErrorLogger

type MockIErrorLogger struct {
	// contains filtered or unexported fields
}

MockIErrorLogger is a mock of IErrorLogger interface.

func NewMockIErrorLogger

func NewMockIErrorLogger(ctrl *gomock.Controller) *MockIErrorLogger

NewMockIErrorLogger creates a new mock instance.

func (*MockIErrorLogger) EXPECT

func (m *MockIErrorLogger) EXPECT() *MockIErrorLoggerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockIErrorLogger) LogError

func (m *MockIErrorLogger) LogError(ctx context.Context, err error)

LogError mocks base method.

type MockIErrorLoggerMockRecorder

type MockIErrorLoggerMockRecorder struct {
	// contains filtered or unexported fields
}

MockIErrorLoggerMockRecorder is the mock recorder for MockIErrorLogger.

func (*MockIErrorLoggerMockRecorder) LogError

func (mr *MockIErrorLoggerMockRecorder) LogError(ctx, err any) *gomock.Call

LogError indicates an expected call of LogError.

type MockISyncProducer

type MockISyncProducer struct {
	// contains filtered or unexported fields
}

MockISyncProducer is a mock of ISyncProducer interface.

func NewMockISyncProducer

func NewMockISyncProducer(ctrl *gomock.Controller) *MockISyncProducer

NewMockISyncProducer creates a new mock instance.

func (*MockISyncProducer) EXPECT

func (m *MockISyncProducer) EXPECT() *MockISyncProducerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockISyncProducer) SendMessage

func (m *MockISyncProducer) SendMessage(ctx context.Context, msg *sarama.ProducerMessage) (int32, int64, error)

SendMessage mocks base method.

func (*MockISyncProducer) SendMessages

func (m *MockISyncProducer) SendMessages(ctx context.Context, msgs []*sarama.ProducerMessage) error

SendMessages mocks base method.

type MockISyncProducerMockRecorder

type MockISyncProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockISyncProducerMockRecorder is the mock recorder for MockISyncProducer.

func (*MockISyncProducerMockRecorder) SendMessage

func (mr *MockISyncProducerMockRecorder) SendMessage(ctx, msg any) *gomock.Call

SendMessage indicates an expected call of SendMessage.

func (*MockISyncProducerMockRecorder) SendMessages

func (mr *MockISyncProducerMockRecorder) SendMessages(ctx, msgs any) *gomock.Call

SendMessages indicates an expected call of SendMessages.

type MockITelemetry

type MockITelemetry struct {
	// contains filtered or unexported fields
}

MockITelemetry is a mock of ITelemetry interface.

func NewMockITelemetry

func NewMockITelemetry(ctrl *gomock.Controller) *MockITelemetry

NewMockITelemetry creates a new mock instance.

func (*MockITelemetry) CollectWriteLatency

func (m *MockITelemetry) CollectWriteLatency(ctx context.Context, serviceName string, duration time.Duration, topic string, partition int32, typeName ProducerType, clientID string, success bool)

CollectWriteLatency mocks base method.

func (*MockITelemetry) CollectWriteSize

func (m *MockITelemetry) CollectWriteSize(ctx context.Context, serviceName string, size int, topic string, partition int32, typeName ProducerType, clientID string, success bool)

CollectWriteSize mocks base method.

func (*MockITelemetry) EXPECT

func (m *MockITelemetry) EXPECT() *MockITelemetryMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

type MockITelemetryMockRecorder

type MockITelemetryMockRecorder struct {
	// contains filtered or unexported fields
}

MockITelemetryMockRecorder is the mock recorder for MockITelemetry.

func (*MockITelemetryMockRecorder) CollectWriteLatency

func (mr *MockITelemetryMockRecorder) CollectWriteLatency(ctx, serviceName, duration, topic, partition, typeName, clientID, success any) *gomock.Call

CollectWriteLatency indicates an expected call of CollectWriteLatency.

func (*MockITelemetryMockRecorder) CollectWriteSize

func (mr *MockITelemetryMockRecorder) CollectWriteSize(ctx, serviceName, size, topic, partition, typeName, clientID, success any) *gomock.Call

CollectWriteSize indicates an expected call of CollectWriteSize.

type MockSyncProducer

type MockSyncProducer struct {
	// contains filtered or unexported fields
}

MockSyncProducer is a mock of SyncProducer interface.

func NewMockSyncProducer

func NewMockSyncProducer(ctrl *gomock.Controller) *MockSyncProducer

NewMockSyncProducer creates a new mock instance.

func (*MockSyncProducer) AbortTxn

func (m *MockSyncProducer) AbortTxn() error

AbortTxn mocks base method.

func (*MockSyncProducer) AddMessageToTxn

func (m *MockSyncProducer) AddMessageToTxn(arg0 *sarama.ConsumerMessage, arg1 string, arg2 *string) error

AddMessageToTxn mocks base method.

func (*MockSyncProducer) AddOffsetsToTxn

func (m *MockSyncProducer) AddOffsetsToTxn(arg0 map[string][]*sarama.PartitionOffsetMetadata, arg1 string) error

AddOffsetsToTxn mocks base method.

func (*MockSyncProducer) BeginTxn

func (m *MockSyncProducer) BeginTxn() error

BeginTxn mocks base method.

func (*MockSyncProducer) Close

func (m *MockSyncProducer) Close() error

Close mocks base method.

func (*MockSyncProducer) CommitTxn

func (m *MockSyncProducer) CommitTxn() error

CommitTxn mocks base method.

func (*MockSyncProducer) EXPECT

func (m *MockSyncProducer) EXPECT() *MockSyncProducerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSyncProducer) IsTransactional

func (m *MockSyncProducer) IsTransactional() bool

IsTransactional mocks base method.

func (*MockSyncProducer) SendMessage

func (m *MockSyncProducer) SendMessage(arg0 *sarama.ProducerMessage) (int32, int64, error)

SendMessage mocks base method.

func (*MockSyncProducer) SendMessages

func (m *MockSyncProducer) SendMessages(arg0 []*sarama.ProducerMessage) error

SendMessages mocks base method.

func (*MockSyncProducer) TxnStatus

func (m *MockSyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag

TxnStatus mocks base method.

type MockSyncProducerMockRecorder

type MockSyncProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockSyncProducerMockRecorder is the mock recorder for MockSyncProducer.

func (*MockSyncProducerMockRecorder) AbortTxn

func (mr *MockSyncProducerMockRecorder) AbortTxn() *gomock.Call

AbortTxn indicates an expected call of AbortTxn.

func (*MockSyncProducerMockRecorder) AddMessageToTxn

func (mr *MockSyncProducerMockRecorder) AddMessageToTxn(arg0, arg1, arg2 any) *gomock.Call

AddMessageToTxn indicates an expected call of AddMessageToTxn.

func (*MockSyncProducerMockRecorder) AddOffsetsToTxn

func (mr *MockSyncProducerMockRecorder) AddOffsetsToTxn(arg0, arg1 any) *gomock.Call

AddOffsetsToTxn indicates an expected call of AddOffsetsToTxn.

func (*MockSyncProducerMockRecorder) BeginTxn

func (mr *MockSyncProducerMockRecorder) BeginTxn() *gomock.Call

BeginTxn indicates an expected call of BeginTxn.

func (*MockSyncProducerMockRecorder) Close

func (mr *MockSyncProducerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close.

func (*MockSyncProducerMockRecorder) CommitTxn

func (mr *MockSyncProducerMockRecorder) CommitTxn() *gomock.Call

CommitTxn indicates an expected call of CommitTxn.

func (*MockSyncProducerMockRecorder) IsTransactional

func (mr *MockSyncProducerMockRecorder) IsTransactional() *gomock.Call

IsTransactional indicates an expected call of IsTransactional.

func (*MockSyncProducerMockRecorder) SendMessage

func (mr *MockSyncProducerMockRecorder) SendMessage(arg0 any) *gomock.Call

SendMessage indicates an expected call of SendMessage.

func (*MockSyncProducerMockRecorder) SendMessages

func (mr *MockSyncProducerMockRecorder) SendMessages(arg0 any) *gomock.Call

SendMessages indicates an expected call of SendMessages.

func (*MockSyncProducerMockRecorder) TxnStatus

func (mr *MockSyncProducerMockRecorder) TxnStatus() *gomock.Call

TxnStatus indicates an expected call of TxnStatus.

type Option

type Option func(*baseProducer)

Option is a functional option for configuring a producer.
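Since `Option` is a function over an unexported struct, the constructors presumably apply each option to a set of defaults. A minimal sketch of that functional-options pattern, using a hypothetical `settings` struct in place of `baseProducer`; the default values and the `WithRetries` option are assumptions made for the example and are not part of this package:

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the unexported baseProducer.
type settings struct {
	name    string
	retries int
}

// Option mutates the settings, mirroring the shape of the package's Option.
type Option func(*settings)

// WithName overrides the producer name.
func WithName(name string) Option {
	return func(s *settings) { s.name = name }
}

// WithRetries is a hypothetical option used only to show a second field.
func WithRetries(n int) Option {
	return func(s *settings) { s.retries = n }
}

// newSettings starts from defaults and applies the options in order, so a
// later option overrides an earlier one that touches the same field.
func newSettings(serviceName string, opts ...Option) *settings {
	s := &settings{name: serviceName, retries: 3} // assumed defaults
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	def := newSettings("example-service")
	custom := newSettings("example-service", WithName("orders-producer"), WithRetries(5))

	fmt.Println(def.name, def.retries)       // prints example-service 3
	fmt.Println(custom.name, custom.retries) // prints orders-producer 5
}
```

The variadic `opts ...Option` parameter is what lets `NewSyncProducer` and `NewAsyncProducer` be called with no options at all or with any combination of them.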

func WithConfig

func WithConfig(config *sarama.Config) Option

WithConfig sets producer configuration.

func WithErrorLogger

func WithErrorLogger(errorLogger IErrorLogger) Option

WithErrorLogger sets error logger interface for producer.

func WithName

func WithName(name string) Option

WithName sets producer name.

func WithRestartPolicy

func WithRestartPolicy(policy backoff.BackOff) Option

WithRestartPolicy sets producer restart policy on error.

func WithTelemetry

func WithTelemetry(telemetry ITelemetry) Option

WithTelemetry sets telemetry interface for producer.

type ProducerType

type ProducerType string
const (
	SyncProducerType  ProducerType = "sync-producer"
	AsyncProducerType ProducerType = "async-producer"
)

type StubTelemetry

type StubTelemetry struct{}

StubTelemetry is a no-op implementation of ITelemetry interface.

func (*StubTelemetry) CollectWriteLatency

func (s *StubTelemetry) CollectWriteLatency(
	ctx context.Context,
	serviceName string,
	duration time.Duration,
	topic string,
	partition int32,
	typeName ProducerType,
	clientID string,
	success bool,
)

CollectWriteLatency does nothing.

func (*StubTelemetry) CollectWriteSize

func (s *StubTelemetry) CollectWriteSize(
	ctx context.Context,
	serviceName string,
	size int,
	topic string,
	partition int32,
	typeName ProducerType,
	clientID string,
	success bool,
)

CollectWriteSize does nothing.

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

SyncProducer is a synchronous producer.

func NewSyncProducer

func NewSyncProducer(_ context.Context, serviceName string, brokers []string, opts ...Option,
) (*SyncProducer, error)

NewSyncProducer creates a new instance of synchronous producer.

func (*SyncProducer) SendMessage

func (s *SyncProducer) SendMessage(ctx context.Context,
	msg *sarama.ProducerMessage,
) (partition int32, offset int64, err error)

SendMessage synchronously sends a message to the queue.

func (*SyncProducer) SendMessages

func (s *SyncProducer) SendMessages(ctx context.Context, msgs []*sarama.ProducerMessage) (err error)

SendMessages synchronously sends messages to the queue.

func (*SyncProducer) Start

func (s *SyncProducer) Start(_ context.Context) error

Start starts the producer.

func (*SyncProducer) Stop

func (s *SyncProducer) Stop(_ context.Context) error

Stop stops the producer.
