Documentation ¶
Index ¶
- Constants
- Variables
- func EnableAzureLogging(log Logger)
- func NewAzbusError(err error) error
- func OutMessageProperties(o *OutMessage) map[string]any
- func OutMessageSetProperty(o *OutMessage, k string, v any)
- func ReceivedProperties(r *ReceivedMessage) map[string]any
- func ReceivedSetProperty(r *ReceivedMessage, k string, v any)
- type AZClient
- type BatchHandler
- type BatchReceiver
- func (r *BatchReceiver) CreateBatchReceivedMessageTracingContext(ctx context.Context, spanProps map[string]string) (context.Context, opentracing.Span)
- func (r *BatchReceiver) Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage)
- func (r *BatchReceiver) Listen() error
- func (r *BatchReceiver) Shutdown(ctx context.Context) error
- func (r *BatchReceiver) String() string
- type BatchReceiverConfig
- type BatchReceiverOption
- type Disposer
- type Disposition
- type Handler
- type Logger
- type MsgReceiver
- type MsgSender
- type OutMessage
- type OutMessageBatch
- type ReceivedMessage
- type Receiver
- type ReceiverConfig
- type ReceiverOption
- type Sender
- func (s *Sender) BatchAddMessage(batch *OutMessageBatch, m *OutMessage, options *azservicebus.AddMessageOptions) error
- func (s *Sender) Close(ctx context.Context)
- func (s *Sender) NewMessageBatch(ctx context.Context) (*OutMessageBatch, error)
- func (s *Sender) Open() error
- func (s *Sender) Send(ctx context.Context, message *OutMessage) error
- func (s *Sender) SendBatch(ctx context.Context, batch *OutMessageBatch) error
- func (s *Sender) String() string
- type SenderConfig
Constants ¶
const (
DebugLevel = logger.DebugLevel
)
const (
// DefaultRenewalTime is how often we want to renew the message PEEK lock.
// If RenewMessageLock is true then this is the default value for RenewMessageTime.
//
// Note that the default aligns with the default value for topics and queues in Azure Service Bus.
// Unless the topic or queue has been configured with a different value, you should not need to change this.
//
// Inspection of the topics and subscription shows that the PeekLock timeout is one minute.
//
// This clarifies the peeklock duration as 60s: https://github.com/MicrosoftDocs/azure-docs/issues/106047
//
// "The default lock duration is indeed 1 minute, we will get this updated in our documentation."
// "As for your question about RenewLock, it's best to set the lock duration to something higher than your normal"
// "processing time, so you don't have to call the RenewLock. Note that the maximum value is 5 minutes, so you will"
// "need to call RenewLock if you want to have this longer. Also note that having a longer lock duration then needed"
// "has some implications as well, f.e. when your client stops working, the message will only become available again"
// "after the lock duration has passed."
//
// An analysis of elapsed times when processing msgs shows no message takes longer than 10s to process during our
// normal test suites.
//
// Set to 50 seconds, well within the 60 seconds peek lock timeout.
DefaultRenewalTime = 50 * time.Second
)
Variables ¶
var (
ErrConnectionLost = errors.New("connection lost")
ErrLockLost = errors.New("lock lost")
ErrTimeout = errors.New("timeout")
)
The Azure package expects the user to inspect errors like so:
var servicebusError *azservicebus.Error
if errors.As(err, &servicebusError) && servicebusError.Code == azservicebus.CodeUnauthorizedAccess { ...
which is rather clumsy.
This code maps the internal code to an actual error so one can write:
if errors.Is(err, azbus.ErrConnectionLost) { ...
which is easier and more idiomatic.
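The mapping idea can be sketched with the standard library alone. This is a minimal illustration, not the package's NewAzbusError: `codedError`, its string codes, `mapError` and `classify` are hypothetical names invented here, and only the three sentinel errors are taken from the variables above.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors with the same names and messages as the package variables.
var (
	ErrConnectionLost = errors.New("connection lost")
	ErrLockLost       = errors.New("lock lost")
	ErrTimeout        = errors.New("timeout")
)

// codedError is a hypothetical stand-in for an SDK error that carries a code.
type codedError struct {
	Code string
	Msg  string
}

func (e *codedError) Error() string { return e.Code + ": " + e.Msg }

// mapError is a hypothetical mapping function: it wraps one of the sentinels
// around a coded error and leaves every other error untouched.
func mapError(err error) error {
	var ce *codedError
	if !errors.As(err, &ce) {
		return err
	}
	switch ce.Code {
	case "connlost":
		return fmt.Errorf("%w: %v", ErrConnectionLost, err)
	case "locklost":
		return fmt.Errorf("%w: %v", ErrLockLost, err)
	case "timeout":
		return fmt.Errorf("%w: %v", ErrTimeout, err)
	}
	return err
}

// classify shows the caller's side: plain errors.Is checks, no type juggling.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrConnectionLost):
		return "reconnect"
	case errors.Is(err, ErrLockLost):
		return "abandon"
	case errors.Is(err, ErrTimeout):
		return "retry"
	default:
		return "fail"
	}
}

func main() {
	err := mapError(&codedError{Code: "connlost", Msg: "socket closed"})
	fmt.Println(classify(err), "-", err)
}
```

Wrapping with `%w` keeps the sentinel reachable through `errors.Is` while the original message stays in the error text for logging.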
var (
ErrMessageOversized = errors.New("message is too large")
)
var (
ErrNoHandler = errors.New("no handler defined")
)
var (
ErrPeekLockTimeout = errors.New("peeklock deadline reached")
)
Set a timeout for processing the message; this should be no later than the message lock time. It is quite surprising that the Azure Service Bus package does not add a deadline to the context passed to the message handler.
NB: this has no effect, as cancellation is removed in order to get the Azure SDK for Go retry logic, which increases reliability.
Inspection of logs shows that the deadline is always 60s in the future, which we will never exceed.
The use of the context returned here is problematic. Inspection of code that uses it shows that submethods called do not generally obey cancellation; they do not even have a context.Context as first argument.
Code that follows from calling this method should be wrapped in a select statement that terminates when the timeout expires, i.e. waits on ctx.Done(). Even this is not bulletproof, as it is unclear how to terminate any of these submethods.
Probably the best solution is to remove this entirely and rely on RenewMessageLock. If it does time out then it is too late anyway, as the peeklock will already be released.
For the time being we impose a timeout as it is safe.
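The select-based wrapping described above might look like the following sketch. It assumes the work is a plain function that takes no context; `runWithDeadline` and `tryFor` are hypothetical helpers, and only ErrPeekLockTimeout mirrors a name from this package. As the text warns, the goroutine doing the work is abandoned on timeout, not stopped.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ErrPeekLockTimeout mirrors the package variable of the same name.
var ErrPeekLockTimeout = errors.New("peeklock deadline reached")

// runWithDeadline runs work in its own goroutine and returns as soon as
// either the work finishes or ctx is done. The work itself is NOT stopped
// on timeout; it keeps running in the background until it returns.
func runWithDeadline(ctx context.Context, work func() error) error {
	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- work() }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ErrPeekLockTimeout
	}
}

// tryFor is a small driver: it simulates work that takes workMs milliseconds
// under a deadline of timeoutMs milliseconds.
func tryFor(timeoutMs, workMs int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return runWithDeadline(ctx, func() error {
		time.Sleep(time.Duration(workMs) * time.Millisecond)
		return nil
	})
}

func main() {
	fmt.Println(tryFor(500, 1))  // work finishes well inside the deadline
	fmt.Println(tryFor(10, 500)) // deadline expires first
}
```

The buffered channel matters: without it, a worker that finishes after the timeout would block forever on the send and leak.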
Functions ¶
func EnableAzureLogging ¶
func EnableAzureLogging(log Logger)
EnableAzureLogging emits log messages using the local logger. This must be called before any senders or receivers are opened.
TODO: Generalise this for any Azure downstream package.
func NewAzbusError ¶
func OutMessageProperties ¶ added in v0.13.3
func OutMessageProperties(o *OutMessage) map[string]any
func OutMessageSetProperty ¶ added in v0.13.3
func OutMessageSetProperty(o *OutMessage, k string, v any)
OutMessageSetProperty adds a key-value pair to the OutMessage.
func ReceivedProperties ¶ added in v0.13.3
func ReceivedProperties(r *ReceivedMessage) map[string]any
func ReceivedSetProperty ¶ added in v0.13.3
func ReceivedSetProperty(r *ReceivedMessage, k string, v any)
ReceivedSetProperty adds a key-value pair to the ReceivedMessage.
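As a rough illustration of what a pair of property helpers like these has to do, the sketch below uses a hypothetical `message` type with an `ApplicationProperties` map. It is an assumption about the general shape, not this package's implementation; `setProperty` and `properties` are invented names.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a service bus message; only the
// property map matters for this sketch.
type message struct {
	ApplicationProperties map[string]any
}

// setProperty stores a key-value pair, allocating the map on first use so
// that callers never write to a nil map (which would panic).
func setProperty(m *message, k string, v any) {
	if m.ApplicationProperties == nil {
		m.ApplicationProperties = make(map[string]any)
	}
	m.ApplicationProperties[k] = v
}

// properties returns the property map, which is nil until a property is set.
func properties(m *message) map[string]any {
	return m.ApplicationProperties
}

func main() {
	m := &message{}
	setProperty(m, "tenant", "acme")
	setProperty(m, "attempt", 2)
	fmt.Println(len(properties(m)), properties(m)["tenant"])
}
```

Reading from a nil map is safe in Go, so only the setter needs the nil check.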
Types ¶
type AZClient ¶
type AZClient struct {
// ConnectionString contains all the details necessary to connect,
// authenticate and authorize a client for communicating with azure servicebus.
ConnectionString string
// contains filtered or unexported fields
}
func NewAZClient ¶
type BatchHandler ¶ added in v0.19.1
type BatchHandler interface {
Handle(context.Context, Disposer, []*ReceivedMessage) error
Open() error
Close()
}
BatchHandler is completely responsible for the processing of a batch of messages. Implementations take complete responsibility for the peek lock renewal and disposal of messages.
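A handler of this shape could be sketched as follows. To stay runnable without the Azure SDK, the example declares its own `message` and `disposer` stand-ins for ReceivedMessage and Disposer; `jsonBatchHandler`, `recorder` and `runBatch` are hypothetical, and the choice of disposition per case is only one plausible policy.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Disposition mirrors the constants documented for this package.
type Disposition int

const (
	DeadletterDisposition Disposition = iota
	AbandonDisposition
	RescheduleDisposition
	CompleteDisposition
)

// message and disposer are hypothetical stand-ins for ReceivedMessage and Disposer.
type message struct{ Body []byte }

type disposer interface {
	Dispose(ctx context.Context, d Disposition, err error, msg *message)
}

var errNotJSON = errors.New("body is not valid JSON")

// jsonBatchHandler disposes of every message in the batch exactly once.
type jsonBatchHandler struct{}

func (jsonBatchHandler) Open() error { return nil }
func (jsonBatchHandler) Close()      {}

func (jsonBatchHandler) Handle(ctx context.Context, d disposer, msgs []*message) error {
	for _, m := range msgs {
		switch {
		case ctx.Err() != nil:
			// Batch deadline reached: hand the message back for redelivery.
			d.Dispose(ctx, AbandonDisposition, ctx.Err(), m)
		case !json.Valid(m.Body):
			// Retrying a malformed message cannot succeed.
			d.Dispose(ctx, DeadletterDisposition, errNotJSON, m)
		default:
			d.Dispose(ctx, CompleteDisposition, nil, m)
		}
	}
	return nil
}

// recorder is a test double that remembers the disposition of each message.
type recorder struct{ got []Disposition }

func (r *recorder) Dispose(_ context.Context, d Disposition, _ error, _ *message) {
	r.got = append(r.got, d)
}

// runBatch feeds the bodies through the handler and returns the dispositions.
func runBatch(bodies ...string) []Disposition {
	msgs := make([]*message, 0, len(bodies))
	for _, b := range bodies {
		msgs = append(msgs, &message{Body: []byte(b)})
	}
	rec := &recorder{}
	_ = jsonBatchHandler{}.Handle(context.Background(), rec, msgs)
	return rec.got
}

func main() {
	fmt.Println(runBatch(`{"id":1}`, `not json`))
}
```

Because the disposer is passed in as an interface, the handler can be exercised with a recording double like the one above, with no broker involved.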
type BatchReceiver ¶ added in v0.19.1
type BatchReceiver struct {
Cfg BatchReceiverConfig
Receiver *azservicebus.Receiver
Options *azservicebus.ReceiverOptions
Handler BatchHandler
Cancel context.CancelFunc
// contains filtered or unexported fields
}
BatchReceiver to receive messages on a queue
func NewBatchReceiver ¶ added in v0.19.1
func NewBatchReceiver(log Logger, handler BatchHandler, cfg BatchReceiverConfig, opts ...BatchReceiverOption) *BatchReceiver
NewBatchReceiver creates a new BatchReceiver.
func (*BatchReceiver) CreateBatchReceivedMessageTracingContext ¶ added in v0.19.1
func (r *BatchReceiver) CreateBatchReceivedMessageTracingContext(ctx context.Context, spanProps map[string]string) (context.Context, opentracing.Span)
func (*BatchReceiver) Dispose ¶ added in v0.19.1
func (r *BatchReceiver) Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage)
func (*BatchReceiver) Listen ¶ added in v0.19.1
func (r *BatchReceiver) Listen() error
The following 2 methods satisfy the startup.Listener interface.
func (*BatchReceiver) Shutdown ¶ added in v0.19.1
func (r *BatchReceiver) Shutdown(ctx context.Context) error
func (*BatchReceiver) String ¶ added in v0.19.1
func (r *BatchReceiver) String() string
String - returns string representation of receiver.
type BatchReceiverConfig ¶ added in v0.19.1
type BatchReceiverConfig struct {
ConnectionString string
// TopicOrQueueName is the name of the queue or topic.
TopicOrQueueName string
// SubscriptionName is the name of the topic subscription.
// If blank then messages are received from a Queue.
SubscriptionName string
// If a deadletter receiver then this is true.
Deadletter bool
// Receive messages in a batch and completely delegate processing to a single dedicated handler.
BatchSize int
// A batch operation must abandon any message that takes longer than this to process.
// Defaults to DefaultRenewalTime.
BatchDeadline time.Duration
}
BatchReceiverConfig provides the configuration for receivers that work with Azure batched send.
* There is no automatic message lock renewal provision for the batched receiver.
* There is no support for deadletter queues on the batched receiver.
type BatchReceiverOption ¶ added in v0.19.1
type BatchReceiverOption func(*BatchReceiver)
func WithBatchDeadline ¶ added in v0.19.1
func WithBatchDeadline(d time.Duration) BatchReceiverOption
WithBatchDeadline sets the context deadline used for the batch operation. If this is not set, the default is DefaultRenewalTime.
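The option types in this package follow the functional-options pattern. A minimal sketch of how such an option typically overrides a default is shown below; `receiver`, `option`, `withDeadline` and `newReceiver` are hypothetical stand-ins, and only the 50-second default is taken from DefaultRenewalTime.

```go
package main

import (
	"fmt"
	"time"
)

// defaultDeadline stands in for DefaultRenewalTime (50 seconds).
const defaultDeadline = 50 * time.Second

// receiver is a hypothetical stand-in for BatchReceiver.
type receiver struct {
	deadline time.Duration
}

// option mutates a receiver during construction.
type option func(*receiver)

// withDeadline overrides the default batch deadline.
func withDeadline(d time.Duration) option {
	return func(r *receiver) { r.deadline = d }
}

// newReceiver applies the defaults first, then each option in order, so a
// later option wins over an earlier one.
func newReceiver(opts ...option) *receiver {
	r := &receiver{deadline: defaultDeadline}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	fmt.Println(newReceiver().deadline)
	fmt.Println(newReceiver(withDeadline(20 * time.Second)).deadline)
}
```

Callers that are happy with the default pass no options at all, which keeps the constructor signature stable as new settings are added.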
type Disposer ¶ added in v0.19.1
type Disposer interface {
Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage)
}
type Disposition ¶ added in v0.10.3
type Disposition int
Disposition describes the eventual demise of the message after processing by the client. Upstream is notified whether the message can be deleted, deadlettered or will be reprocessed later.
const (
DeadletterDisposition Disposition = iota
AbandonDisposition
RescheduleDisposition
CompleteDisposition
)
func (Disposition) String ¶ added in v0.10.3
func (d Disposition) String() string
type Handler ¶
type Handler interface {
Handle(context.Context, *ReceivedMessage) (Disposition, context.Context, error)
Open() error
Close()
}
Handler processes a ReceivedMessage. Use this style of handler to take advantage of the automatic peek lock renewal and disposal of messages.
type MsgReceiver ¶
type MsgSender ¶
type MsgSender interface {
Open() error
Close(context.Context)
Send(context.Context, *OutMessage) error
NewMessageBatch(context.Context) (*OutMessageBatch, error)
BatchAddMessage(batch *OutMessageBatch, m *OutMessage, options *azservicebus.AddMessageOptions) error
SendBatch(context.Context, *OutMessageBatch) error
String() string
}
type OutMessage ¶
type OutMessage = azservicebus.Message
OutMessage abstracts the output message interface.
func NewOutMessage ¶
func NewOutMessage(data []byte) *OutMessage
We don't use With-style options as this is executed in the hot path.
type OutMessageBatch ¶ added in v0.19.1
type OutMessageBatch = azservicebus.MessageBatch
OutMessageBatch aliases the azure service bus batch message type.
type ReceivedMessage ¶
type ReceivedMessage = azservicebus.ReceivedMessage
type Receiver ¶
type Receiver struct {
Cfg ReceiverConfig
// contains filtered or unexported fields
}
Receiver to receive messages on a queue
func NewReceiver ¶
func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiver
NewReceiver creates a new Receiver that will process a number of messages simultaneously. Each handler executes in its own goroutine.
func (*Receiver) CreateReceivedMessageTracingContext ¶ added in v0.19.1
func (r *Receiver) CreateReceivedMessageTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (context.Context, opentracing.Span)
type ReceiverConfig ¶
type ReceiverConfig struct {
ConnectionString string
// TopicOrQueueName is the name of the queue or topic.
TopicOrQueueName string
// SubscriptionName is the name of the topic subscription.
// If blank then messages are received from a Queue.
SubscriptionName string
// See azbus/receiver.go
// Note: RenewMessageLock has no effect when using the batched handler (BatchSize > 0).
RenewMessageLock bool
// RenewMessageTime is how often we want to renew the message PEEK lock.
RenewMessageTime time.Duration
// If a deadletter receiver then this is true.
Deadletter bool
}
ReceiverConfig configuration for an azure servicebus queue
type ReceiverOption ¶
type ReceiverOption func(*Receiver)
func WithHandlers ¶ added in v0.10.3
func WithHandlers(h ...Handler) ReceiverOption
WithHandlers adds individual message handlers to the receiver. Mutually exclusive with WithBatchHandler.
func WithRenewalTime ¶
func WithRenewalTime(t int) ReceiverOption
WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less than the peek lock timeout. For example: the default peek lock timeout is 60s and the default renewal time is 50s.
Note! Only use this if you know what you're doing and you require custom timeout behaviour.
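To make the relationship between renewal time and lock timeout concrete, here is a hedged sketch of a periodic renewal loop. It is not this package's implementation: `renewEvery`, `demo` and the `renew` callback are hypothetical, and the example uses millisecond intervals so it finishes quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// renewEvery calls renew on each tick until ctx is done or renew fails.
// It returns the number of successful renewals.
func renewEvery(ctx context.Context, every time.Duration, renew func() error) (int, error) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()

	renewals := 0
	for {
		select {
		case <-ctx.Done():
			// Processing finished (or was cancelled): stop renewing.
			return renewals, nil
		case <-ticker.C:
			if err := renew(); err != nil {
				return renewals, err
			}
			renewals++
		}
	}
}

// demo renews every everyMs milliseconds while simulated processing runs
// for totalMs milliseconds; fail makes every renewal attempt return an error.
func demo(everyMs, totalMs int, fail bool) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(totalMs)*time.Millisecond)
	defer cancel()
	return renewEvery(ctx, time.Duration(everyMs)*time.Millisecond, func() error {
		if fail {
			return errors.New("lock lost")
		}
		return nil
	})
}

func main() {
	n, err := demo(5, 200, false)
	fmt.Println(n > 0, err)
}
```

The interval must leave headroom below the lock timeout: with a 60s lock and a 50s renewal, a renewal delayed by more than about 10s would find the lock already gone.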
type Sender ¶
type Sender struct { Cfg SenderConfig // contains filtered or unexported fields }
Sender to send or receive messages on a queue or topic
func NewSender ¶
func NewSender(log Logger, cfg SenderConfig) *Sender
NewSender creates a new client
func (*Sender) BatchAddMessage ¶ added in v0.19.1
func (s *Sender) BatchAddMessage(batch *OutMessageBatch, m *OutMessage, options *azservicebus.AddMessageOptions) error
BatchAddMessage calls AddMessage on the batch. Note: this method is a direct pass-through and exists only to provide a mockable interface for adding messages to a batch.
func (*Sender) NewMessageBatch ¶ added in v0.19.1
func (s *Sender) NewMessageBatch(ctx context.Context) (*OutMessageBatch, error)
func (*Sender) Send ¶
func (s *Sender) Send(ctx context.Context, message *OutMessage) error
Send submits a message to the queue. Ignores cancellation.
type SenderConfig ¶
type SenderConfig struct {
ConnectionString string
// TopicOrQueueName is the name of the queue or topic to send to.
TopicOrQueueName string
}
SenderConfig configuration for an azure servicebus namespace and queue