kafka

package
Version: v1.0.0-beta.168
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2024 | License: Apache-2.0 | Imports: 17 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	PartitionKeyMetadataKey = "x-kafka-partition-key"
)

Variables

This section is empty.

Functions

func AddPartitionKeyFromSubject

func AddPartitionKeyFromSubject(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error)

AddPartitionKeyFromSubject adds a partition key to the message based on the CloudEvent subject.

func NewPublisher

func NewPublisher(ctx context.Context, in PublisherOptions) (*kafka.Publisher, error)

func NewSubscriber

func NewSubscriber(in SubscriberOptions) (message.Subscriber, error)

Types

type AutoProvisionTopic

type AutoProvisionTopic struct {
	Topic         string
	NumPartitions int32
	Retention     time.Duration
}

type BrokerOptions

type BrokerOptions struct {
	KafkaConfig  config.KafkaConfiguration
	ClientID     string
	Logger       *slog.Logger
	MetricMeter  otelmetric.Meter
	MeterPrefix  string
	DebugLogging bool
}

func (*BrokerOptions) Validate

func (o *BrokerOptions) Validate() error

type LoggerFunc

type LoggerFunc func(fmt string, args ...any)

type PublisherOptions

type PublisherOptions struct {
	Broker          BrokerOptions
	ProvisionTopics []AutoProvisionTopic
}

func (*PublisherOptions) Validate

func (o *PublisherOptions) Validate() error

type SaramaLoggerAdaptor

type SaramaLoggerAdaptor struct {
	// contains filtered or unexported fields
}

func (*SaramaLoggerAdaptor) Print

func (s *SaramaLoggerAdaptor) Print(v ...interface{})

func (*SaramaLoggerAdaptor) Printf

func (s *SaramaLoggerAdaptor) Printf(format string, v ...interface{})

func (*SaramaLoggerAdaptor) Println

func (s *SaramaLoggerAdaptor) Println(v ...interface{})

type SubscriberOptions

type SubscriberOptions struct {
	Broker            BrokerOptions
	ConsumerGroupName string
}

func (*SubscriberOptions) Validate

func (o *SubscriberOptions) Validate() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL