kafka

package
v2.3.0
Published: Aug 23, 2023 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(options *Options)

Option is a function that modifies Options.
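
The Option signature follows the functional options pattern. The sketch below illustrates that pattern in isolation. The lowercase `options`, `option`, `withBrokers`, `withTopics`, and `applyOptions` names are hypothetical stand-ins written for this example, not the package's own code, and the option bodies are assumptions.

```go
package main

import "fmt"

// options is a hypothetical stand-in for the package's Options struct;
// only two of its fields are modelled here.
type options struct {
	Brokers []string
	Topics  []string
}

// option mirrors the shape of `type Option func(options *Options)`.
type option func(o *options)

// withBrokers and withTopics are illustrative counterparts of
// WithBrokers and WithTopics; their bodies are assumptions.
func withBrokers(brokers ...string) option {
	return func(o *options) { o.Brokers = brokers }
}

func withTopics(topics ...string) option {
	return func(o *options) { o.Topics = topics }
}

// applyOptions runs every option against a zero-valued struct, in order,
// so a later option overrides an earlier one that sets the same field.
func applyOptions(opts ...option) options {
	var o options
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(
		withBrokers("localhost:9092"),
		withTopics("topic1", "topic2"),
	)
	fmt.Println(o.Brokers, o.Topics)
}
```

Because each option is an ordinary function value, callers can build the option list conditionally before passing it to a constructor.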

func WithBrokers

func WithBrokers(brokers ...string) Option

WithBrokers overrides the list of brokers to connect to.

func WithClientID

func WithClientID(clientID string) Option

WithClientID sets the client ID.

func WithCompression

func WithCompression(codec sarama.CompressionCodec, level int) Option

WithCompression sets the compression codec and compression level.

func WithErrorHandler

func WithErrorHandler(h nc.ErrorHandler) Option

WithErrorHandler sets the handler for message-processing errors.

func WithFlashFrequency

func WithFlashFrequency(frequency time.Duration) Option

WithFlashFrequency sets the flush frequency.

func WithFlashMessages

func WithFlashMessages(messageCount int) Option

WithFlashMessages sets the minimum number of messages per flush.

func WithGroupName

func WithGroupName(name string) Option

WithGroupName sets the name of the subscription group used for consuming messages.

func WithKafkaURL

func WithKafkaURL(urlString string) Option

WithKafkaURL is an Option to set the URL the client should connect to. The URL can contain username/password semantics, e.g.

kafka://derek:pass@localhost:4222/{groupName}?topics=topic1,topic2

Comma-separated arrays are also supported, e.g. urlA, urlB.
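
To make the URL layout concrete, here is a minimal sketch that splits such a URL into credentials, broker address, group name, and topics using only the standard library. The `kafkaURLParts` type and `parseKafkaURL` function are hypothetical and written for this example; the package's actual parsing may differ, and `mygroup` is a made-up value for the `{groupName}` placeholder.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// kafkaURLParts holds the pieces this sketch extracts from a URL.
// The type and its field names are hypothetical.
type kafkaURLParts struct {
	User, Password string
	Broker         string
	GroupName      string
	Topics         []string
}

// parseKafkaURL splits one kafka:// URL into its parts.
func parseKafkaURL(raw string) (kafkaURLParts, error) {
	u, err := url.Parse(strings.TrimSpace(raw))
	if err != nil {
		return kafkaURLParts{}, err
	}
	parts := kafkaURLParts{
		Broker:    u.Host,                    // host:port
		GroupName: strings.Trim(u.Path, "/"), // path segment after the host
	}
	if u.User != nil {
		parts.User = u.User.Username()
		parts.Password, _ = u.User.Password()
	}
	// The topics query parameter is a comma-separated list.
	if t := u.Query().Get("topics"); t != "" {
		parts.Topics = strings.Split(t, ",")
	}
	return parts, nil
}

func main() {
	p, err := parseKafkaURL("kafka://derek:pass@localhost:4222/mygroup?topics=topic1,topic2")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```

A comma-separated list of URLs could be handled by splitting the input on commas and calling the same function for each element.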

func WithKafkaVersion

func WithKafkaVersion(version sarama.KafkaVersion) Option

WithKafkaVersion sets the minimum Kafka version.

func WithPanicHandler

func WithPanicHandler(h nc.PanicHandler) Option

WithPanicHandler sets the handler for panics.

func WithPublisherErrorHandler

func WithPublisherErrorHandler(h PublisherErrorHandler) Option

WithPublisherErrorHandler sets the handler for sarama producer errors.

func WithPublisherSuccessHandler

func WithPublisherSuccessHandler(h PublisherSuccessHandler) Option

WithPublisherSuccessHandler sets the handler for messages that sarama sent successfully.

func WithSaramaConfig

func WithSaramaConfig(streamConfig *sarama.Config) Option

WithSaramaConfig sets a custom sarama config.

func WithTopics

func WithTopics(topics ...string) Option

WithTopics sets the list of topics for publishing or subscribing.

type Options

type Options struct {
	ClusterConfig sarama.Config

	// IsSynchronous type of producer
	// TODO: make it work for sync publisher
	IsSynchronous bool

	// Brokers contains list of broker hosts with port
	Brokers []string

	// Name of the subscription group
	GroupName string

	// Names of topics for subscribing or publishing
	Topics []string

	// ErrorHandler of message processing
	ErrorHandler nc.ErrorHandler

	// PanicHandler process panic
	PanicHandler nc.PanicHandler

	// PublisherErrorHandler provides handler of message send errors
	PublisherErrorHandler PublisherErrorHandler

	// PublisherSuccessHandler provides handler of message send success
	PublisherSuccessHandler PublisherSuccessHandler

	// Message encoder interface
	Encoder encoder.Encoder

	// Logger of subscriber
	Logger loggerInterface
}

Options holds the configuration for a publisher or subscriber.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher implements the Publisher interface for the Kafka driver.

func MustNewPublisher

func MustNewPublisher(ctx context.Context, options ...Option) *Publisher

MustNewPublisher creates a new Publisher connection and panics on error.

func NewPublisher

func NewPublisher(ctx context.Context, options ...Option) (*Publisher, error)

NewPublisher creates a Kafka publisher that sends messages to the configured brokers and topics.
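
The relationship between the two constructors is the common Go "Must" convention: one variant returns an error, the other panics. A minimal sketch of that convention follows. The `publisher`, `newPublisher`, and `mustNewPublisher` names are hypothetical stand-ins, and the "no brokers" validation rule is an assumption made only so the example has an error path.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is a hypothetical stand-in for *Publisher.
type publisher struct{ brokers []string }

// newPublisher returns an error when no brokers are given. This validation
// rule is an assumption made for the example.
func newPublisher(brokers ...string) (*publisher, error) {
	if len(brokers) == 0 {
		return nil, errors.New("no brokers configured")
	}
	return &publisher{brokers: brokers}, nil
}

// mustNewPublisher wraps newPublisher and panics instead of returning an error.
func mustNewPublisher(brokers ...string) *publisher {
	p, err := newPublisher(brokers...)
	if err != nil {
		panic(err)
	}
	return p
}

func main() {
	p := mustNewPublisher("localhost:9092")
	fmt.Println(p.brokers)
}
```

The panicking variant suits program start-up, where a missing connection is fatal anyway; the error-returning variant suits code that can retry or degrade.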

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the Kafka producer.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, messages ...any) (err error)

Publish sends one or more messages to the pub-service.

type PublisherErrorHandler

type PublisherErrorHandler func(*sarama.ProducerError)

PublisherErrorHandler is a callback function that receives a sarama producer error.

type PublisherSuccessHandler

type PublisherSuccessHandler func(*sarama.ProducerMessage)

PublisherSuccessHandler is a callback function that receives a successfully sent sarama producer message.

type Subscriber

type Subscriber struct {
	notificationcenter.ModelSubscriber
	// contains filtered or unexported fields
}

Subscriber for kafka

func NewSubscriber

func NewSubscriber(options ...Option) (*Subscriber, error)

NewSubscriber creates a connection to a Kafka "group" that consumes from the list of topics.

func (*Subscriber) Cleanup

func (s *Subscriber) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes the Kafka consumer.

func (*Subscriber) ConsumeClaim

func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
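
The loop contract described above can be sketched with a plain channel. Here `message` is a hypothetical stand-in for `*sarama.ConsumerMessage`, the channel stands in for the claim's `Messages()` channel, and the `handle` callback is an assumption of this example rather than part of the package.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Topic string
	Value []byte
}

// consumeClaim ranges over the channel and returns once it is closed,
// or earlier if the handler reports an error.
func consumeClaim(messages <-chan message, handle func(message) error) error {
	for msg := range messages {
		if err := handle(msg); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ch := make(chan message, 2)
	ch <- message{Topic: "topic1", Value: []byte("a")}
	ch <- message{Topic: "topic1", Value: []byte("b")}
	close(ch) // closing the channel ends the loop

	count := 0
	err := consumeClaim(ch, func(m message) error {
		count++
		return nil
	})
	fmt.Println(count, err)
}
```

In a real sarama handler the loop body would typically also mark each processed message on the session so that its offset can be committed.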

func (*Subscriber) Listen

func (s *Subscriber) Listen(ctx context.Context) error

Listen starts the Kafka consumer and processes incoming messages.

func (*Subscriber) Setup

func (s *Subscriber) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.
