Documentation
¶
Index ¶
- func NewKafkaConsumer(address []string, topics string, opts SubscribeOptions) (*kgo.Client, error)
- func NewKafkaProducer(address []string, autoAck ...bool) (*kgo.Client, error)
- func WithKafkaKey(ctx context.Context, key string) context.Context
- func WithKafkaPartition(ctx context.Context, partition int32) context.Context
- type Broker
- type Event
- type Handler
- type KafkaBroker
- type KafkaSubscriber
- type Option
- type Options
- type SubscribeOption
- type SubscribeOptions
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewKafkaConsumer ¶
func NewKafkaProducer ¶
Types ¶
type Broker ¶
type Broker interface { Connect() error Disconnect() error Publish(ctx context.Context, topic string, m *transport.Message) error Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) }
Broker is an interface used for asynchronous messaging.
type Handler ¶
Handler is used to process messages via a subscription of a topic. The handler is passed a publication interface which contains the message and optional Ack method to acknowledge receipt of the message.
type KafkaBroker ¶
type KafkaBroker struct {
// contains filtered or unexported fields
}
func NewKafkaBroker ¶
func NewKafkaBroker(opts ...Option) *KafkaBroker
func (*KafkaBroker) Connect ¶
func (k *KafkaBroker) Connect() error
func (*KafkaBroker) Disconnect ¶
func (k *KafkaBroker) Disconnect() error
func (*KafkaBroker) Subscribe ¶
func (k *KafkaBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)
type KafkaSubscriber ¶
type KafkaSubscriber struct {
// contains filtered or unexported fields
}
func (*KafkaSubscriber) Topic ¶
func (s *KafkaSubscriber) Topic() string
func (*KafkaSubscriber) Unsubscribe ¶
func (s *KafkaSubscriber) Unsubscribe() error
type Option ¶
type Option func(*Options)
func ErrorHandler ¶
ErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors.
type Options ¶
type Options struct { // Registry s Registry micro.Registry // Handler executed when error happens in broker message // processing ErrorHandler func(uint8, *kgo.Record, error) TLSConfig *tls.Config Address []string Secure bool }
func NewOptions ¶
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func DisableAutoAck ¶
func DisableAutoAck() SubscribeOption
func UnmarshalHander ¶
func UnmarshalHander(unmarshal func([]byte) (*transport.Message, error)) SubscribeOption
func WithQueue ¶
func WithQueue(name string) SubscribeOption
WithQueue sets the name of the queue to share messages on.
type SubscribeOptions ¶
type SubscribeOptions struct { // Subscribers with the same queue name // will create a shared subscription where each // receives a subset of messages. Queue string // AutoAck defaults to true. When a handler returns // with a nil error the message is acked. AutoAck bool // 解析 Unmarshal func([]byte) (*transport.Message, error) }
func NewSubscribeOptions ¶
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions
type Subscriber ¶
Subscriber is a convenience return type for the Subscribe method.
Click to show internal directories.
Click to hide internal directories.