broker

package
v0.0.0-...-3e22abe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewKafkaConsumer

func NewKafkaConsumer(address []string, topics string, opts SubscribeOptions) (*kgo.Client, error)

func NewKafkaProducer

func NewKafkaProducer(address []string, autoAck ...bool) (*kgo.Client, error)

func WithKafkaKey

func WithKafkaKey(ctx context.Context, key string) context.Context

func WithKafkaPartition

func WithKafkaPartition(ctx context.Context, partition int32) context.Context

Types

type Broker

type Broker interface {
	// Connect and Disconnect each report failure through the returned error.
	// NOTE(review): presumably they open and close the connection to the
	// underlying message broker; the implementation is not visible here.
	Connect() error
	Disconnect() error
	// Publish takes a context, a topic name and the message m, and returns an
	// error on failure.
	// NOTE(review): WithKafkaKey / WithKafkaPartition return contexts, so the
	// Kafka implementation presumably reads key/partition from ctx; confirm.
	Publish(ctx context.Context, topic string, m *transport.Message) error
	// Subscribe takes a topic, a Handler and optional SubscribeOption values,
	// and returns a Subscriber or an error.
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
}

Broker is an interface used for asynchronous messaging.

type Event

type Event interface {
	// Message returns the *transport.Message carried by this event.
	Message() *transport.Message
	// Ack acknowledges receipt of the message and returns an error on failure.
	Ack() error
}

Event is given to a subscription handler for processing.

type Handler

// Handler receives an Event and returns an error. Per the AutoAck option,
// a nil return causes the message to be acked when auto-ack is enabled.
type Handler func(Event) error

Handler is used to process messages received via a subscription to a topic. The handler is passed an Event, which contains the message and an Ack method that can be used to acknowledge receipt of the message.

type KafkaBroker

type KafkaBroker struct {
	// contains filtered or unexported fields
}

func NewKafkaBroker

func NewKafkaBroker(opts ...Option) *KafkaBroker

func (*KafkaBroker) Connect

func (k *KafkaBroker) Connect() error

func (*KafkaBroker) Disconnect

func (k *KafkaBroker) Disconnect() error

func (*KafkaBroker) Publish

func (k *KafkaBroker) Publish(ctx context.Context, topic string, msg *transport.Message) error

func (*KafkaBroker) Subscribe

func (k *KafkaBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)

type KafkaSubscriber

type KafkaSubscriber struct {
	// contains filtered or unexported fields
}

func (*KafkaSubscriber) Topic

func (s *KafkaSubscriber) Topic() string

func (*KafkaSubscriber) Unsubscribe

func (s *KafkaSubscriber) Unsubscribe() error

type Option

// Option is a function that modifies an Options value. NewKafkaBroker and
// NewOptions accept a variadic list of them.
type Option func(*Options)

func Address

func Address(address ...string) Option

Address sets the host addresses to be used by the broker.

func ErrorHandler

func ErrorHandler(fallback func(uint8, *kgo.Record, error)) Option

ErrorHandler catches all broker errors that can't be handled in the normal way, for example codec errors.

func Registry

func Registry(registry micro.Registry) Option

func Secure

func Secure(b bool) Option

Secure communication with the broker.

type Options

// Options holds the broker configuration. It is populated by applying
// Option functions (see NewOptions).
type Options struct {
	// Registry is the micro.Registry supplied through the Registry option.
	// NOTE(review): the original comment was truncated ("Registry s"); how
	// the broker uses the registry is not visible here. Confirm.
	Registry micro.Registry

	// Handler executed when error happens in broker message
	// processing
	ErrorHandler func(uint8, *kgo.Record, error)

	// TLSConfig is the TLS configuration for the broker.
	// NOTE(review): presumably consulted when Secure is true; confirm.
	TLSConfig *tls.Config
	// Address lists the host addresses to be used by the broker.
	Address   []string
	// Secure requests secure communication with the broker.
	Secure    bool
}

func NewOptions

func NewOptions(opts ...Option) *Options

type SubscribeOption

// SubscribeOption is a function that modifies a SubscribeOptions value.
// Subscribe and NewSubscribeOptions accept a variadic list of them.
type SubscribeOption func(*SubscribeOptions)

func DisableAutoAck

func DisableAutoAck() SubscribeOption

func UnmarshalHander

func UnmarshalHander(unmarshal func([]byte) (*transport.Message, error)) SubscribeOption

func WithQueue

func WithQueue(name string) SubscribeOption

WithQueue sets the name of the queue to share messages on.

type SubscribeOptions

// SubscribeOptions holds per-subscription settings. It is populated by
// applying SubscribeOption functions (see NewSubscribeOptions).
type SubscribeOptions struct {

	// Subscribers with the same queue name
	// will create a shared subscription where each
	// receives a subset of messages.
	Queue string

	// AutoAck defaults to true. When a handler returns
	// with a nil error the message is acked.
	AutoAck bool

	// Unmarshal parses raw message bytes into a *transport.Message,
	// returning an error if the payload cannot be parsed.
	Unmarshal func([]byte) (*transport.Message, error)
}

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

type Subscriber

type Subscriber interface {
	// Topic returns a string.
	// NOTE(review): presumably the topic this subscription was created for;
	// inferred from the name, confirm against the implementation.
	Topic() string
	// Unsubscribe returns an error on failure.
	// NOTE(review): presumably cancels the subscription; confirm.
	Unsubscribe() error
}

Subscriber is a convenience return type for the Subscribe method.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL