Documentation
¶
Overview ¶
Package broker is an interface used for asynchronous messaging
Index ¶
- Variables
- func BodyOnly(b bool) options.Option
- func ErrorHandler(h interface{}) options.Option
- func NewContext(ctx context.Context, s Broker) context.Context
- func PublishTopic(t string) options.Option
- func SubscribeAutoAck(b bool) options.Option
- func SubscribeBatchSize(n int) options.Option
- func SubscribeBatchWait(td time.Duration) options.Option
- func SubscribeQueueGroup(n string) options.Option
- func ValidateSubscriber(sub interface{}) error
- type Broker
- type MemoryBroker
- func (m *MemoryBroker) Address() string
- func (m *MemoryBroker) Connect(ctx context.Context) error
- func (m *MemoryBroker) Disconnect(ctx context.Context) error
- func (m *MemoryBroker) Init(opts ...options.Option) error
- func (m *MemoryBroker) Name() string
- func (m *MemoryBroker) Options() Options
- func (m *MemoryBroker) Publish(ctx context.Context, message interface{}, opts ...options.Option) error
- func (m *MemoryBroker) String() string
- func (m *MemoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...options.Option) (Subscriber, error)
- type Message
- type MessageHandler
- type MessagesHandler
- type Options
- type PublishOptions
- type SubscribeOptions
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNotConnected returns when broker used but not connected yet ErrNotConnected = errors.New("broker not connected") // ErrDisconnected returns when broker disconnected ErrDisconnected = errors.New("broker disconnected") // ErrInvalidMessage returns when message has nvalid format ErrInvalidMessage = errors.New("broker message has invalid format") )
Functions ¶
func ErrorHandler ¶
ErrorHandler will catch all broker errors that cant be handled in normal way, for example Codec errors
func NewContext ¶
NewContext savess broker in context
func PublishTopic ¶ added in v4.0.6
PublishTopic pass topic for messages
func SubscribeAutoAck ¶
SubscribeAutoAck contol auto acking of messages after they have been handled.
func SubscribeBatchSize ¶
SubscribeBatchSize specifies max batch size
func SubscribeBatchWait ¶
SubscribeBatchWait specifies max batch wait time
func SubscribeQueueGroup ¶ added in v4.0.2
SubscribeQueueGroup sets the shared queue name distributed messages across subscribers
func ValidateSubscriber ¶ added in v4.0.2
func ValidateSubscriber(sub interface{}) error
ValidateSubscriber func signature
Types ¶
type Broker ¶
type Broker interface { // Name returns broker instance name Name() string // Init initilize broker Init(opts ...options.Option) error // Options returns broker options Options() Options // Address return configured address Address() string // Connect connects to broker Connect(ctx context.Context) error // Disconnect disconnect from broker Disconnect(ctx context.Context) error // Publish message, msg can be single broker.Message or []broker.Message Publish(ctx context.Context, msg interface{}, opts ...options.Option) error // Subscribe subscribes to topic message via handler Subscribe(ctx context.Context, topic string, handler interface{}, opts ...options.Option) (Subscriber, error) // String type of broker String() string }
Broker is an interface used for asynchronous messaging.
type MemoryBroker ¶ added in v4.0.6
func NewBroker ¶
func NewBroker(opts ...options.Option) *MemoryBroker
NewBroker return new memory broker
func (*MemoryBroker) Address ¶ added in v4.0.6
func (m *MemoryBroker) Address() string
func (*MemoryBroker) Connect ¶ added in v4.0.6
func (m *MemoryBroker) Connect(ctx context.Context) error
func (*MemoryBroker) Disconnect ¶ added in v4.0.6
func (m *MemoryBroker) Disconnect(ctx context.Context) error
func (*MemoryBroker) Init ¶ added in v4.0.6
func (m *MemoryBroker) Init(opts ...options.Option) error
func (*MemoryBroker) Name ¶ added in v4.0.6
func (m *MemoryBroker) Name() string
func (*MemoryBroker) Options ¶ added in v4.0.6
func (m *MemoryBroker) Options() Options
func (*MemoryBroker) String ¶ added in v4.0.6
func (m *MemoryBroker) String() string
func (*MemoryBroker) Subscribe ¶ added in v4.0.6
func (m *MemoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...options.Option) (Subscriber, error)
type Message ¶
type Message interface { // Context for the message Context() context.Context // Topic Topic() string // Header returns message headers Header() metadata.Metadata // Body returns broker message may be []byte slice or some go struct Body() interface{} // Ack acknowledge message Ack() error // Error returns message error (like decoding errors or some other) // In this case Body contains raw []byte from broker Error() error }
Message is given to a subscription handler for processing
type MessageHandler ¶ added in v4.0.6
MessageHandler func signature for single message processing
type MessagesHandler ¶ added in v4.0.6
MessagesHandler func signature for batch message processing
type Options ¶
type Options struct { // Tracer used for tracing Tracer tracer.Tracer // Register can be used for clustering Register register.Register // Codecs holds the codec for marshal/unmarshal Codecs map[string]codec.Codec // Logger used for logging Logger logger.Logger // Meter used for metrics Meter meter.Meter // Context holds external options Context context.Context // TLSConfig holds tls.TLSConfig options TLSConfig *tls.Config // ErrorHandler used when broker have error while processing message ErrorHandler interface{} // Name holds the broker name Name string // Address holds the broker address Address []string }
Options struct
type PublishOptions ¶
type PublishOptions struct { // Context holds external options Context context.Context // BodyOnly flag says the message contains raw body bytes BodyOnly bool // Message metadata usually passed as message headers Metadata metadata.Metadata // Content-Type of message for marshal ContentType string // Topic destination Topic string }
PublishOptions struct
func NewPublishOptions ¶
func NewPublishOptions(opts ...options.Option) PublishOptions
NewPublishOptions creates PublishOptions struct
type SubscribeOptions ¶
type SubscribeOptions struct { // Context holds external options Context context.Context // ErrorHandler used when broker have error while processing message ErrorHandler interface{} // QueueGroup holds consumer group QueueGroup string // AutoAck flag specifies auto ack of incoming message when no error happens AutoAck bool // BodyOnly flag specifies that message contains only body bytes without header BodyOnly bool // BatchSize flag specifies max batch size BatchSize int // BatchWait flag specifies max wait time for batch filling BatchWait time.Duration }
SubscribeOptions struct
func NewSubscribeOptions ¶
func NewSubscribeOptions(opts ...options.Option) SubscribeOptions
NewSubscribeOptions creates new SubscribeOptions
type Subscriber ¶
type Subscriber interface { // Options returns subscriber options Options() SubscribeOptions // Topic returns topic for subscription Topic() string // Unsubscribe from topic Unsubscribe(ctx context.Context) error }
Subscriber is a convenience return type for the Subscribe method