broker

package
v1.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultCodec encoding.Codec = nil
)

Functions

func Marshal

func Marshal(codec encoding.Codec, msg Any) ([]byte, error)

func Unmarshal

func Unmarshal(codec encoding.Codec, inputData []byte, outValue interface{}) error

Types

type Any

type Any interface{}

type Binder

type Binder func() Any

type Broker

type Broker interface {
	Name() string

	Options() Options

	Address() string

	Init(...Option) error

	Connect() error

	Disconnect() error

	Publish(ctx context.Context, topic string, msg Any, opts ...PublishOption) error

	Subscribe(topic string, handler Handler, binder Binder, opts ...SubscribeOption) (Subscriber, error)

	Request(ctx context.Context, topic string, msg Any, opts ...RequestOption) (Any, error)
}

type Event

type Event interface {
	Topic() string

	Message() *Message
	RawMessage() interface{}

	Ack() error

	Error() error
}

type Handler

type Handler func(ctx context.Context, evt Event) error

type Headers

type Headers map[string]string

type Message

type Message struct {
	Headers   Headers
	Body      Any
	Partition int
	Offset    int64
	Msg       Any
}

func (Message) GetHeader

func (m Message) GetHeader(key string) string

func (Message) GetHeaders

func (m Message) GetHeaders() Headers

type MiddlewareFunc

type MiddlewareFunc func(Handler) Handler

type Option

type Option func(*Options)

func OptionContextWithValue

func OptionContextWithValue(k, v interface{}) Option

func WithAddress

func WithAddress(addressList ...string) Option

WithAddress sets the list of broker addresses.

func WithCodec

func WithCodec(name string) Option

WithCodec sets the codec by name. Supported codecs: json, proto.

func WithEnableSecure

func WithEnableSecure(enable bool) Option

func WithErrorHandler

func WithErrorHandler(handler Handler) Option

func WithGlobalPropagator

func WithGlobalPropagator() Option

func WithGlobalTracerProvider

func WithGlobalTracerProvider() Option

func WithOptionContext

func WithOptionContext(ctx context.Context) Option

func WithPropagator

func WithPropagator(propagators propagation.TextMapPropagator) Option

func WithTLSConfig

func WithTLSConfig(config *tls.Config) Option

func WithTracerProvider

func WithTracerProvider(provider trace.TracerProvider, tracerName string) Option

type Options

type Options struct {
	Addrs []string

	Codec encoding.Codec

	ErrorHandler Handler

	Secure    bool
	TLSConfig *tls.Config

	Context context.Context

	Tracings []tracing.Option
}

func NewOptions

func NewOptions() Options

func NewOptionsAndApply

func NewOptionsAndApply(opts ...Option) Options

func (*Options) Apply

func (o *Options) Apply(opts ...Option)

type PublishOption

type PublishOption func(*PublishOptions)

func PublishContextWithValue

func PublishContextWithValue(k, v interface{}) PublishOption

func WithPublishContext

func WithPublishContext(ctx context.Context) PublishOption

type PublishOptions

type PublishOptions struct {
	Context context.Context
}

func NewPublishOptions

func NewPublishOptions(opts ...PublishOption) PublishOptions

func (*PublishOptions) Apply

func (o *PublishOptions) Apply(opts ...PublishOption)

type RequestOption

type RequestOption func(*RequestOptions)

func RequestContextWithValue

func RequestContextWithValue(k, v interface{}) RequestOption

func WithRequestContext

func WithRequestContext(ctx context.Context) RequestOption

type RequestOptions

type RequestOptions struct {
	Context context.Context
}

func NewRequestOptions

func NewRequestOptions(opts ...RequestOption) RequestOptions

func (*RequestOptions) Apply

func (o *RequestOptions) Apply(opts ...RequestOption)

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func DisableAutoAck

func DisableAutoAck() SubscribeOption

func SubscribeContextWithValue

func SubscribeContextWithValue(k, v interface{}) SubscribeOption

func WithQueueName

func WithQueueName(name string) SubscribeOption

func WithSubscribeContext

func WithSubscribeContext(ctx context.Context) SubscribeOption

type SubscribeOptions

type SubscribeOptions struct {
	AutoAck bool
	Queue   string
	Context context.Context
}

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

func (*SubscribeOptions) Apply

func (o *SubscribeOptions) Apply(opts ...SubscribeOption)

type Subscriber

type Subscriber interface {
	// Options returns the SubscribeOptions of this subscriber.
	Options() SubscribeOptions

	// Topic returns the topic of this subscriber.
	Topic() string

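	// Unsubscribe cancels the subscription and reports any error.
	// NOTE(review): removeFromManager presumably controls whether the
	// subscriber is also removed from the broker's subscriber registry;
	// confirm against the implementation.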
	Unsubscribe(removeFromManager bool) error
}

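Subscriber is the handle returned by Broker.Subscribe. It exposes the subscription's options and topic, and allows the subscription to be cancelled via Unsubscribe.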

func Subscribe

func Subscribe[T any](broker Broker, topic string, handler func(context.Context, string, Headers, *T) error, opts ...SubscribeOption) (Subscriber, error)

type SubscriberMap

type SubscriberMap map[string]Subscriber

type SubscriberSyncMap

type SubscriberSyncMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSubscriberSyncMap

func NewSubscriberSyncMap() *SubscriberSyncMap

func (*SubscriberSyncMap) Add

func (sm *SubscriberSyncMap) Add(topic string, sub Subscriber)

func (*SubscriberSyncMap) Clear

func (sm *SubscriberSyncMap) Clear()

func (*SubscriberSyncMap) ForceClear

func (sm *SubscriberSyncMap) ForceClear()

func (*SubscriberSyncMap) Foreach

func (sm *SubscriberSyncMap) Foreach(fnc func(topic string, sub Subscriber))

func (*SubscriberSyncMap) Get

func (sm *SubscriberSyncMap) Get(topic string) Subscriber

func (*SubscriberSyncMap) Remove

func (sm *SubscriberSyncMap) Remove(topic string) error

func (*SubscriberSyncMap) RemoveOnly

func (sm *SubscriberSyncMap) RemoveOnly(topic string) bool

Directories

Path Synopsis
kafka module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL