azbus

package
v0.21.0
Published: Feb 6, 2025 License: MIT Imports: 14 Imported by: 2

Documentation

Constants

const (
	DebugLevel = logger.DebugLevel
)
const (
	// DefaultRenewalTime is how often the message peek lock is renewed.
	// If RenewMessageLock is true then this is the default value for RenewMessageTime.
	//
	// Note that the default aligns with the default value for topics and queues in Azure Service Bus.
	// Unless the topic or queue has been configured with a different value, you should not need to change this.
	//
	// Inspection of the topics and subscription shows that the PeekLock timeout is one minute.
	//
	// This clarifies the peeklock duration as 60s: https://github.com/MicrosoftDocs/azure-docs/issues/106047
	//
	// "The default lock duration is indeed 1 minute, we will get this updated in our documentation."
	// "As for your question about RenewLock, it's best to set the lock duration to something higher than your normal"
	// "processing time, so you don't have to call the RenewLock. Note that the maximum value is 5 minutes, so you will"
	// "need to call RenewLock if you want to have this longer. Also note that having a longer lock duration then needed"
	// "has some implications as well, f.e. when your client stops working, the message will only become available again"
	// "after the lock duration has passed."
	//
	// An analysis of elapsed times when processing msgs shows no message takes longer than 10s to process during our
	// normal test suites.
	//
	// Set to 50 seconds, well within the 60 seconds peek lock timeout
	DefaultRenewalTime = 50 * time.Second
)

Variables

var (
	ErrConnectionLost     = errors.New("connection lost")
	ErrLockLost           = errors.New("lock lost")
	ErrUnauthorizedAccess = errors.New("unauthorized")
	ErrTimeout            = errors.New("timeout")
)

The Azure SDK expects callers to inspect errors like so:

    var servicebusError *azservicebus.Error
    if errors.As(err, &servicebusError) && servicebusError.Code == azservicebus.CodeUnauthorizedAccess {
        ...

which is rather clumsy.

This package maps the SDK error code to a sentinel error so that callers can write:

    if errors.Is(err, azbus.ErrConnectionLost) {
        ...

which is simpler and more idiomatic.

var (
	ErrMessageOversized = errors.New("message is too large")
)
var (
	ErrNoHandler = errors.New("no handler defined")
)
var (
	ErrPeekLockTimeout = errors.New("peeklock deadline reached")
)

Set a timeout for processing the message; this should be no later than the message lock time. It is quite surprising that the Azure Service Bus package does not add a deadline to the context passed to the message handler.

NB: this has no effect, because cancellation is removed from the context so that the Azure SDK for Go retry logic can run, which increases reliability.

Inspection of logs shows that the deadline is always 60s in the future, which we will never exceed.

The use of the context returned here is problematic. Inspection of code that uses it shows that the submethods called do not generally obey cancellation: they do not even take a context.Context as their first argument.

Code that follows from calling this method should be wrapped in a select statement that terminates when the timeout expires, i.e. one that waits on ctx.Done(). Even this is not bulletproof, as it is unclear how to terminate any of these submethods.

Probably the best solution is to remove this entirely and rely on RenewMessageLock. If it does time out then it is too late anyway, as the peek lock will already have been released.

For the time being we impose a timeout, as it is safe.

Functions

func EnableAzureLogging

func EnableAzureLogging(log Logger)

EnableAzureLogging emits Azure log messages using the local logger. It must be called before any senders or receivers are opened. TODO: generalise this for any Azure downstream package.

func NewAzbusError

func NewAzbusError(err error) error

func OutMessageProperties added in v0.13.3

func OutMessageProperties(o *OutMessage) map[string]any

func OutMessageSetProperty added in v0.13.3

func OutMessageSetProperty(o *OutMessage, k string, v any)

OutMessageSetProperty adds a key-value pair to the OutMessage.

func ReceivedProperties added in v0.13.3

func ReceivedProperties(r *ReceivedMessage) map[string]any

func ReceivedSetProperty added in v0.13.3

func ReceivedSetProperty(r *ReceivedMessage, k string, v any)

ReceivedSetProperty adds a key-value pair to the ReceivedMessage.

Types

type AZClient

type AZClient struct {
	// ConnectionString contains all the details necessary to connect,
	// authenticate and authorize a client for communicating with azure servicebus.
	ConnectionString string
	// contains filtered or unexported fields
}

func NewAZClient

func NewAZClient(connectionString string) AZClient

type BatchHandler added in v0.19.1

type BatchHandler interface {
	Handle(context.Context, Disposer, []*ReceivedMessage) error
	Open() error
	Close()
}

BatchHandler is completely responsible for the processing of a batch of messages. Implementations take complete responsibility for the peek lock renewal and disposal of messages.

type BatchReceiver added in v0.19.1

type BatchReceiver struct {
	Cfg BatchReceiverConfig

	Receiver *azservicebus.Receiver
	Options  *azservicebus.ReceiverOptions
	Handler  BatchHandler
	Cancel   context.CancelFunc
	// contains filtered or unexported fields
}

BatchReceiver receives batches of messages from a queue or topic subscription.

func NewBatchReceiver added in v0.19.1

func NewBatchReceiver(log Logger, handler BatchHandler, cfg BatchReceiverConfig, opts ...BatchReceiverOption) *BatchReceiver

NewBatchReceiver creates a new BatchReceiver.

func (*BatchReceiver) CreateBatchReceivedMessageTracingContext added in v0.19.1

func (r *BatchReceiver) CreateBatchReceivedMessageTracingContext(ctx context.Context, spanProps map[string]string) (context.Context, opentracing.Span)

func (*BatchReceiver) Dispose added in v0.19.1

func (r *BatchReceiver) Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage)

func (*BatchReceiver) Listen added in v0.19.1

func (r *BatchReceiver) Listen() error

Listen and Shutdown together satisfy the startup.Listener interface.

func (*BatchReceiver) Shutdown added in v0.19.1

func (r *BatchReceiver) Shutdown(ctx context.Context) error

func (*BatchReceiver) String added in v0.19.1

func (r *BatchReceiver) String() string

String - returns string representation of receiver.

type BatchReceiverConfig added in v0.19.1

type BatchReceiverConfig struct {
	ConnectionString string

	// TopicOrQueueName is the name of the queue or topic.
	TopicOrQueueName string

	// SubscriptionName is the name of the topic subscription.
	// If blank then messages are received from a queue.
	SubscriptionName string

	// Deadletter is true if this is a deadletter receiver.
	Deadletter bool

	// Receive messages in a batch and completely delegate processing to a single dedicated handler
	BatchSize int

	// A batch operation must abandon any message that takes longer than this to process.
	// Defaults to DefaultRenewalTime.
	BatchDeadline time.Duration
}

BatchReceiverConfig provides the configuration for receivers that work with Azure batched send.

* There is no automatic message lock renewal for the batched receiver.
* There is no support for deadletter queues on the batched receiver.

type BatchReceiverOption added in v0.19.1

type BatchReceiverOption func(*BatchReceiver)

func WithBatchDeadline added in v0.19.1

func WithBatchDeadline(d time.Duration) BatchReceiverOption

WithBatchDeadline sets the context deadline used for the batch operation. If this is not set, the default is DefaultRenewalTime.

type Disposer added in v0.19.1

type Disposer interface {
	Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage)
}

type Disposition added in v0.10.3

type Disposition int

Disposition describes the eventual demise of the message after processing by the client. Upstream is notified whether the message can be deleted, deadlettered or will be reprocessed later.

const (
	DeadletterDisposition Disposition = iota
	AbandonDisposition
	RescheduleDisposition
	CompleteDisposition
)

func (Disposition) String added in v0.10.3

func (d Disposition) String() string

type Handler

type Handler interface {
	Handle(context.Context, *ReceivedMessage) (Disposition, context.Context, error)
	Open() error
	Close()
}

Handler processes a ReceivedMessage. Use this style of handler to take advantage of the automatic peek lock renewal and disposal of messages.

type Logger

type Logger = logger.Logger

type MsgReceiver

type MsgReceiver interface {
	// Listener interface
	Listen() error
	Shutdown(context.Context) error

	String() string
}

type MsgSender

type MsgSender interface {
	Open() error
	Close(context.Context)

	Send(context.Context, *OutMessage) error
	NewMessageBatch(context.Context) (*OutMessageBatch, error)
	BatchAddMessage(batch *OutMessageBatch, m *OutMessage, options *azservicebus.AddMessageOptions) error

	SendBatch(context.Context, *OutMessageBatch) error
	String() string
}

type OutMessage

type OutMessage = azservicebus.Message

OutMessage abstracts the output message interface.

func NewOutMessage

func NewOutMessage(data []byte) *OutMessage

We don't use With-style options as this is executed in the hot path.

type OutMessageBatch added in v0.19.1

type OutMessageBatch = azservicebus.MessageBatch

OutMessageBatch aliases the azure service bus batch message type.

type ReceivedMessage

type ReceivedMessage = azservicebus.ReceivedMessage

type Receiver

type Receiver struct {
	Cfg ReceiverConfig
	// contains filtered or unexported fields
}

Receiver receives messages from a queue or topic subscription.

func NewReceiver

func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiver

NewReceiver creates a new Receiver that will process a number of messages simultaneously. Each handler executes in its own goroutine.

func (*Receiver) CreateReceivedMessageTracingContext added in v0.19.1

func (r *Receiver) CreateReceivedMessageTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (context.Context, opentracing.Span)

func (*Receiver) Listen

func (r *Receiver) Listen() error

Listen and Shutdown together satisfy the startup.Listener interface.

func (*Receiver) Shutdown

func (r *Receiver) Shutdown(ctx context.Context) error

func (*Receiver) String

func (r *Receiver) String() string

String - returns string representation of receiver.

type ReceiverConfig

type ReceiverConfig struct {
	ConnectionString string

	// TopicOrQueueName is the name of the queue or topic.
	TopicOrQueueName string

	// SubscriptionName is the name of the topic subscription.
	// If blank then messages are received from a queue.
	SubscriptionName string

	// See azbus/receiver.go
	// Note: RenewMessageLock has no effect when using the batched handler (BatchSize > 0)
	RenewMessageLock bool

	// RenewMessageTime is how often the message peek lock is renewed.
	RenewMessageTime time.Duration

	// Deadletter is true if this is a deadletter receiver.
	Deadletter bool
}

ReceiverConfig is the configuration for a receiver on an Azure Service Bus queue or topic subscription.

type ReceiverOption

type ReceiverOption func(*Receiver)

func WithHandlers added in v0.10.3

func WithHandlers(h ...Handler) ReceiverOption

WithHandlers adds individual message handlers to the receiver. Mutually exclusive with WithBatchHandler.

func WithRenewalTime

func WithRenewalTime(t int) ReceiverOption

WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less than the peek lock timeout. For example: the default peek lock timeout is 60s and the default renewal time is 50s.

Note! Only use this if you know what you're doing and you require custom timeout behaviour.

type Sender

type Sender struct {
	Cfg SenderConfig
	// contains filtered or unexported fields
}

Sender sends messages to a queue or topic.

func NewSender

func NewSender(log Logger, cfg SenderConfig) *Sender

NewSender creates a new Sender.

func (*Sender) BatchAddMessage added in v0.19.1

func (s *Sender) BatchAddMessage(batch *OutMessageBatch, m *OutMessage, options *azservicebus.AddMessageOptions) error

BatchAddMessage calls AddMessage on batch. Note: this method is a direct pass-through and exists only to provide a mockable interface for adding messages to a batch.

func (*Sender) Close

func (s *Sender) Close(ctx context.Context)

func (*Sender) NewMessageBatch added in v0.19.1

func (s *Sender) NewMessageBatch(ctx context.Context) (*OutMessageBatch, error)

func (*Sender) Open

func (s *Sender) Open() error

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, message *OutMessage) error

Send submits a message to the queue. Ignores cancellation.

func (*Sender) SendBatch added in v0.19.1

func (s *Sender) SendBatch(ctx context.Context, batch *OutMessageBatch) error

SendBatch submits a message batch to the broker. Ignores cancellation.

func (*Sender) String

func (s *Sender) String() string

type SenderConfig

type SenderConfig struct {
	ConnectionString string

	// Name is the name of the queue or topic to send to.
	TopicOrQueueName string
}

SenderConfig is the configuration for an Azure Service Bus namespace and queue.
