Documentation ¶
Overview ¶
Package azservicebus provides clients for sending and receiving messages with Azure ServiceBus. NOTE: for creating and managing entities, use `admin.Client` instead.
Index ¶
- Variables
- type AbandonMessageOptions
- type Client
- func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName string, options *SessionReceiverOptions) (*SessionReceiver, error)
- func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, ...) (*SessionReceiver, error)
- func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName string, sessionID string, ...) (*SessionReceiver, error)
- func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, ...) (*SessionReceiver, error)
- func (client *Client) Close(ctx context.Context) error
- func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOptions) (*Receiver, error)
- func (client *Client) NewReceiverForSubscription(topicName string, subscriptionName string, options *ReceiverOptions) (*Receiver, error)
- func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions) (*Sender, error)
- type ClientOptions
- type DeadLetterOptions
- type DeferMessageOptions
- type Message
- type MessageBatch
- type MessageBatchOptions
- type NewSenderOptions
- type PeekMessagesOptions
- type Processor
- func (p *Processor) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error
- func (p *Processor) Close(ctx context.Context) error
- func (p *Processor) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
- func (p *Processor) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
- func (p *Processor) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error
- func (p *Processor) Start(ctx context.Context, handleMessage func(message *ReceivedMessage) error, ...) error
- type ProcessorOptions
- type ReceiveMessagesOptions
- type ReceiveMode
- type ReceivedMessage
- type Receiver
- func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error
- func (r *Receiver) Close(ctx context.Context) error
- func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
- func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
- func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error
- func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)
- func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64) ([]*ReceivedMessage, error)
- func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)
- func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error
- type ReceiverOptions
- type Sender
- func (s *Sender) CancelScheduledMessages(ctx context.Context, sequenceNumber []int64) error
- func (s *Sender) Close(ctx context.Context) error
- func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error)
- func (s *Sender) ScheduleMessages(ctx context.Context, messages []*Message, scheduledEnqueueTime time.Time) ([]int64, error)
- func (s *Sender) SendMessage(ctx context.Context, message *Message) error
- func (s *Sender) SendMessageBatch(ctx context.Context, batch *MessageBatch) error
- type SessionReceiver
- func (r *SessionReceiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error
- func (r *SessionReceiver) Close(ctx context.Context) error
- func (r *SessionReceiver) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
- func (r *SessionReceiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
- func (r *SessionReceiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error
- func (sr *SessionReceiver) GetSessionState(ctx context.Context) ([]byte, error)
- func (sr *SessionReceiver) LockedUntil() time.Time
- func (r *SessionReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)
- func (r *SessionReceiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64) ([]*ReceivedMessage, error)
- func (r *SessionReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)
- func (r *SessionReceiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error
- func (sr *SessionReceiver) RenewSessionLock(ctx context.Context) error
- func (sr *SessionReceiver) SessionID() string
- func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte) error
- type SessionReceiverOptions
- type SubQueue
Examples ¶
- Client.AcceptNextSessionForQueue
- Client.AcceptSessionForQueue
- Client.NewReceiverForQueue
- Client.NewReceiverForQueue (DeadLetterQueue)
- Client.NewReceiverForSubscription
- Client.NewReceiverForSubscription (DeadLetterQueue)
- Client.NewSender
- NewClient
- NewClientFromConnectionString
- Receiver.ReceiveMessages
- Sender.ScheduleMessages
- Sender.SendMessage (Message)
- Sender.SendMessage (MessageBatch)
Constants ¶
This section is empty.
Variables ¶
var ErrMessageTooLarge = errors.New("the message could not be added because it is too large for the batch")
ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.AddMessage().
Functions ¶
This section is empty.
Types ¶
type AbandonMessageOptions ¶
type AbandonMessageOptions struct { // PropertiesToModify specifies properties to modify in the message when it is abandoned. PropertiesToModify map[string]interface{} }
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client provides methods to create Sender and Receiver instances to send and receive messages from Service Bus.
func NewClient ¶
func NewClient(fullyQualifiedNamespace string, credential azcore.TokenCredential, options *ClientOptions) (*Client, error)
NewClient creates a new Client for a Service Bus namespace, using a TokenCredential. A Client allows you to create receivers (for queues or subscriptions) and senders (for queues and topics). fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net). credential is one of the credentials in the `github.com/Azure/azure-sdk-for-go/sdk/azidentity` package.
Example ¶
// NOTE: If you'd like to authenticate using a Service Bus connection string // look at `NewClientFromConnectionString` instead. credential, err := azidentity.NewDefaultAzureCredential(nil) exitOnError("Failed to create a DefaultAzureCredential", err) client, err = azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil) exitOnError("Failed to create ServiceBusClient in example", err)
Output:
func NewClientFromConnectionString ¶
func NewClientFromConnectionString(connectionString string, options *ClientOptions) (*Client, error)
NewClientFromConnectionString creates a new Client for a Service Bus namespace using a connection string. A Client allows you to create receivers (for queues or subscriptions) and senders (for queues and topics). connectionString is a Service Bus connection string for the namespace or for an entity.
Example ¶
// NOTE: If you'd like to authenticate via Azure Active Directory look at // the `NewClient` function instead. client, err = azservicebus.NewClientFromConnectionString(connectionString, nil) exitOnError("Failed to create ServiceBusClient in example", err)
Output:
func (*Client) AcceptNextSessionForQueue ¶
func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName string, options *SessionReceiverOptions) (*SessionReceiver, error)
AcceptNextSessionForQueue accepts the next available session from a queue. NOTE: this receiver is initialized immediately, not lazily.
Example ¶
sessionReceiver, err := client.AcceptNextSessionForQueue(context.TODO(), "exampleSessionQueue", nil) exitOnError("Failed to create session receiver", err) fmt.Printf("Session receiver was assigned session ID \"%s\"", sessionReceiver.SessionID())
Output:
func (*Client) AcceptNextSessionForSubscription ¶
func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, options *SessionReceiverOptions) (*SessionReceiver, error)
AcceptNextSessionForSubscription accepts the next available session from a subscription. NOTE: this receiver is initialized immediately, not lazily.
func (*Client) AcceptSessionForQueue ¶
func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error)
AcceptSessionForQueue accepts a session from a queue with a specific session ID. NOTE: this receiver is initialized immediately, not lazily.
Example ¶
sessionReceiver, err := client.AcceptSessionForQueue(context.TODO(), "exampleSessionQueue", "Example Session ID", nil) exitOnError("Failed to create session receiver", err) // session receivers function the same as any other receiver messages, err := sessionReceiver.ReceiveMessages(context.TODO(), 5, nil) exitOnError("Failed to receive a message", err) for _, message := range messages { err = sessionReceiver.CompleteMessage(context.TODO(), message) exitOnError("Failed to complete message", err) fmt.Printf("Received message from session ID \"%s\" and completed it", *message.SessionID) }
Output:
func (*Client) AcceptSessionForSubscription ¶
func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error)
AcceptSessionForSubscription accepts a session from a subscription with a specific session ID. NOTE: this receiver is initialized immediately, not lazily.
func (*Client) Close ¶
Close closes the current connection to Service Bus as well as any Senders or Receivers created using this client.
func (*Client) NewReceiverForQueue ¶
func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOptions) (*Receiver, error)
NewReceiverForQueue creates a Receiver for a queue. A receiver allows you to receive messages.
Example ¶
receiver, err = client.NewReceiverForQueue( "exampleQueue", &azservicebus.ReceiverOptions{ ReceiveMode: azservicebus.ReceiveModePeekLock, }, ) exitOnError("Failed to create Receiver", err)
Output:
Example (DeadLetterQueue) ¶
receiver, err = client.NewReceiverForQueue( "exampleQueue", &azservicebus.ReceiverOptions{ ReceiveMode: azservicebus.ReceiveModePeekLock, SubQueue: azservicebus.SubQueueDeadLetter, }, ) exitOnError("Failed to create Receiver for DeadLetterQueue", err)
Output:
func (*Client) NewReceiverForSubscription ¶
func (client *Client) NewReceiverForSubscription(topicName string, subscriptionName string, options *ReceiverOptions) (*Receiver, error)
NewReceiverForSubscription creates a Receiver for a subscription. A receiver allows you to receive messages.
Example ¶
receiver, err = client.NewReceiverForSubscription( "exampleTopic", "exampleSubscription", &azservicebus.ReceiverOptions{ ReceiveMode: azservicebus.ReceiveModePeekLock, }, ) exitOnError("Failed to create Receiver", err)
Output:
Example (DeadLetterQueue) ¶
receiver, err = client.NewReceiverForSubscription( "exampleTopic", "exampleSubscription", &azservicebus.ReceiverOptions{ ReceiveMode: azservicebus.ReceiveModePeekLock, SubQueue: azservicebus.SubQueueDeadLetter, }, ) exitOnError("Failed to create Receiver for DeadLetterQueue", err)
Output:
func (*Client) NewSender ¶
func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions) (*Sender, error)
NewSender creates a Sender, which allows you to send messages or schedule messages.
Example ¶
sender, err = client.NewSender("exampleQueue", nil) // or topicName exitOnError("Failed to create sender", err)
Output:
type ClientOptions ¶
type ClientOptions struct { // TLSConfig configures a client with a custom *tls.Config. TLSConfig *tls.Config }
ClientOptions contains options for the `NewClient` and `NewClientFromConnectionString` functions.
type DeadLetterOptions ¶
type DeadLetterOptions struct { // ErrorDescription that caused the dead lettering of the message. ErrorDescription *string // Reason for dead lettering the message. Reason *string // PropertiesToModify specifies properties to modify in the message when it is dead lettered. PropertiesToModify map[string]interface{} }
DeadLetterOptions describes the reason and error description for dead lettering a message using `Receiver.DeadLetterMessage()`.
type DeferMessageOptions ¶
type DeferMessageOptions struct { // PropertiesToModify specifies properties to modify in the message when it is deferred PropertiesToModify map[string]interface{} }
type Message ¶
type Message struct { MessageID string ContentType string CorrelationID string // Body corresponds to the first []byte array in the Data section of an AMQP message. Body []byte SessionID *string Subject string ReplyTo string ReplyToSessionID string To string TimeToLive *time.Duration PartitionKey *string TransactionPartitionKey *string ScheduledEnqueueTime *time.Time ApplicationProperties map[string]interface{} }
Message is a SendableMessage which can be sent using a Sender created with Client.NewSender().
type MessageBatch ¶
type MessageBatch struct {
// contains filtered or unexported fields
}
MessageBatch represents a batch of messages to send to Service Bus in a single message
func (*MessageBatch) AddMessage ¶
func (mb *MessageBatch) AddMessage(m *Message) error
AddMessage adds a message to the batch if the message will not exceed the max size of the batch. Returns: - ErrMessageTooLarge if the message cannot fit - a non-nil error for other failures - nil, otherwise
func (*MessageBatch) NumBytes ¶
func (mb *MessageBatch) NumBytes() uint64
NumBytes is the number of bytes in the message batch
func (*MessageBatch) NumMessages ¶
func (mb *MessageBatch) NumMessages() int32
NumMessages returns the number of messages in the batch.
type MessageBatchOptions ¶
type MessageBatchOptions struct { // MaxBytes overrides the max size (in bytes) for a batch. // By default NewMessageBatch will use the max message size provided by the service. MaxBytes uint64 }
MessageBatchOptions contains options for the `Sender.NewMessageBatch` function.
type NewSenderOptions ¶
type NewSenderOptions struct { }
type PeekMessagesOptions ¶
type PeekMessagesOptions struct { // FromSequenceNumber is the sequence number to start with when peeking messages. FromSequenceNumber *int64 }
PeekMessagesOptions contains options for the `Receiver.PeekMessages` function.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a push-based receiver for Service Bus.
func NewProcessorForQueue ¶
func NewProcessorForQueue(client *Client, queue string, options *ProcessorOptions) (*Processor, error)
NewProcessorForQueue creates a Processor for a queue.
func NewProcessorForSubscription ¶
func NewProcessorForSubscription(client *Client, topic string, subscription string, options *ProcessorOptions) (*Processor, error)
NewProcessorForSubscription creates a Processor for a subscription.
func (*Processor) AbandonMessage ¶
func (p *Processor) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error
AbandonMessage will cause a message to be returned to the queue or subscription. This will increment its delivery count, and potentially cause it to be dead lettered depending on your queue or subscription's configuration.
func (*Processor) Close ¶
Close will wait for any pending callbacks to complete. NOTE: Close() cannot be called synchronously in a message or error handler. You must run it asynchronously using `go processor.Close(ctx)` or similar.
func (*Processor) CompleteMessage ¶
func (p *Processor) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
CompleteMessage completes a message, deleting it from the queue or subscription.
func (*Processor) DeadLetterMessage ¶
func (p *Processor) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a processor with `NewProcessorForQueue()` or `NewProcessorForSubscription()` using the `ProcessorOptions.SubQueue` option.
func (*Processor) DeferMessage ¶
func (p *Processor) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error
DeferMessage will cause a message to be deferred. Deferred messages can be received using `Receiver.ReceiveDeferredMessages`.
func (*Processor) Start ¶
func (p *Processor) Start(ctx context.Context, handleMessage func(message *ReceivedMessage) error, handleError func(err error)) error
Start will start receiving messages from the queue or subscription.
if err := processor.Start(context.TODO(), messageHandler, errorHandler); err != nil { log.Fatalf("Processor failed to start: %s", err.Error()) }
Any errors that occur (such as network disconnects, failures in handleMessage) will be sent to your handleError function. The processor will retry and restart as needed - no user intervention is required.
type ProcessorOptions ¶
type ProcessorOptions struct { // ReceiveMode controls when a message is deleted from Service Bus. // // `azservicebus.PeekLock` is the default. The message is locked, preventing multiple // receivers from processing the message at once. You control the lock state of the message // using one of the message settlement functions like processor.CompleteMessage(), which removes // it from Service Bus, or processor.AbandonMessage(), which makes it available again. // // `azservicebus.ReceiveAndDelete` causes Service Bus to remove the message as soon // as it's received. // // More information about receive modes: // https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations ReceiveMode ReceiveMode // SubQueue should be set to connect to the sub queue (ex: dead letter queue) // of the queue or subscription. SubQueue SubQueue // DisableAutoComplete controls whether messages must be settled explicitly via the // settlement methods (ie, Complete, Abandon) or if the processor will automatically // settle messages. // // If true, no automatic settlement is done. // If false, the return value of your `handleMessage` function will control if the // message is abandoned (non-nil error return) or completed (nil error return). // // This option is false, by default. DisableAutoComplete bool // MaxConcurrentCalls controls the maximum number of message processing // goroutines that are active at any time. // Default is 1. MaxConcurrentCalls int }
ProcessorOptions contains options for the `NewProcessorForQueue` or `NewProcessorForSubscription` functions.
type ReceiveMessagesOptions ¶
type ReceiveMessagesOptions struct { }
ReceiveMessagesOptions are options for the ReceiveMessages function.
type ReceiveMode ¶
type ReceiveMode = internal.ReceiveMode
ReceiveMode represents the lock style to use for a receiver - either `ReceiveModePeekLock` or `ReceiveModeReceiveAndDelete`.
const ( // ReceiveModePeekLock will lock messages as they are received and can be settled // using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message // functions. ReceiveModePeekLock ReceiveMode = internal.PeekLock // ReceiveModeReceiveAndDelete will delete messages as they are received. ReceiveModeReceiveAndDelete ReceiveMode = internal.ReceiveAndDelete )
type ReceivedMessage ¶
type ReceivedMessage struct { MessageID string ContentType string CorrelationID string SessionID *string Subject string ReplyTo string ReplyToSessionID string To string TimeToLive *time.Duration PartitionKey *string TransactionPartitionKey *string ScheduledEnqueueTime *time.Time ApplicationProperties map[string]interface{} LockToken [16]byte DeliveryCount uint32 LockedUntil *time.Time SequenceNumber *int64 EnqueuedSequenceNumber *int64 EnqueuedTime *time.Time ExpiresAt *time.Time DeadLetterErrorDescription *string DeadLetterReason *string DeadLetterSource *string // contains filtered or unexported fields }
ReceivedMessage is a message received from a Receiver created using `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()`.
func (*ReceivedMessage) Body ¶
func (rm *ReceivedMessage) Body() ([]byte, error)
Body returns the body for this received message. If the body is not compatible with ReceivedMessage this function will return an error.
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver receives messages using pull based functions (ReceiveMessages).
func (*Receiver) AbandonMessage ¶
func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error
AbandonMessage will cause a message to be returned to the queue or subscription. This will increment its delivery count, and potentially cause it to be dead lettered depending on your queue or subscription's configuration.
func (*Receiver) CompleteMessage ¶
func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
CompleteMessage completes a message, deleting it from the queue or subscription.
func (*Receiver) DeadLetterMessage ¶
func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.
func (*Receiver) DeferMessage ¶
func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error
DeferMessage will cause a message to be deferred. Deferred messages can be received using `Receiver.ReceiveDeferredMessages`.
func (*Receiver) PeekMessages ¶
func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)
PeekMessages will peek messages without locking or deleting messages. Messages that are peeked do not have lock tokens, so settlement methods like CompleteMessage, AbandonMessage, DeferMessage or DeadLetterMessage will not work with them.
func (*Receiver) ReceiveDeferredMessages ¶
func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64) ([]*ReceivedMessage, error)
ReceiveDeferredMessages receives messages that were deferred using `Receiver.DeferMessage`.
func (*Receiver) ReceiveMessages ¶
func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)
ReceiveMessages receives a fixed number of messages, up to maxMessages. There are two ways to stop receiving messages:
- Cancelling the `ctx` parameter.
- An implicit timeout (default: 1 second) that starts after the first message has been received.
Example ¶
// ReceiveMessages respects the passed in context, and will gracefully stop // receiving when 'ctx' is cancelled. ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second) defer cancel() messages, err = receiver.ReceiveMessages(ctx, // The number of messages to receive. Note this is merely an upper // bound. It is possible to get fewer message (or zero), depending // on the contents of the remote queue or subscription and network // conditions. 1, nil, ) exitOnError("Failed to receive messages", err) for _, message := range messages { err = receiver.CompleteMessage(context.TODO(), message) fmt.Printf("Received and completed message\n") exitOnError("Failed to complete message", err) }
Output:
func (*Receiver) RenewMessageLock ¶
func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error
RenewMessageLock renews the lock on a message, updating the `LockedUntil` field on `msg`.
type ReceiverOptions ¶
type ReceiverOptions struct { // ReceiveMode controls when a message is deleted from Service Bus. // // `azservicebus.PeekLock` is the default. The message is locked, preventing multiple // receivers from processing the message at once. You control the lock state of the message // using one of the message settlement functions like Receiver.CompleteMessage(), which removes // it from Service Bus, or Receiver.AbandonMessage(), which makes it available again. // // `azservicebus.ReceiveAndDelete` causes Service Bus to remove the message as soon // as it's received. // // More information about receive modes: // https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations ReceiveMode ReceiveMode // SubQueue should be set to connect to the sub queue (ex: dead letter queue) // of the queue or subscription. SubQueue SubQueue }
ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription` functions.
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender is used to send messages as well as schedule them to be delivered at a later date.
func (*Sender) CancelScheduledMessages ¶
CancelScheduledMessages cancels multiple messages that were scheduled.
func (*Sender) NewMessageBatch ¶
func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error)
NewMessageBatch can be used to create a batch that contains multiple messages. Sending a batch of messages is more efficient than sending the messages one at a time.
func (*Sender) ScheduleMessages ¶
func (s *Sender) ScheduleMessages(ctx context.Context, messages []*Message, scheduledEnqueueTime time.Time) ([]int64, error)
ScheduleMessages schedules a slice of Messages to appear on Service Bus Queue/Subscription at a later time. Returns the sequence numbers of the messages that were scheduled. Messages that haven't been delivered can be cancelled using `Sender.CancelScheduledMessages`.
Example ¶
// there are two ways of scheduling messages: // 1. Using the `Sender.ScheduleMessages()` function. // 2. Setting the `Message.ScheduledEnqueueTime` field on a message. // schedule the message to be delivered in an hour. sequenceNumbers, err := sender.ScheduleMessages(context.TODO(), []*azservicebus.Message{ {Body: []byte("hello world")}, }, time.Now().Add(time.Hour)) exitOnError("Failed to schedule messages", err) err = sender.CancelScheduledMessages(context.TODO(), sequenceNumbers) exitOnError("Failed to cancel scheduled messages", err) // or you can set the `ScheduledEnqueueTime` field on a message when you send it future := time.Now().Add(time.Hour) err = sender.SendMessage(context.TODO(), &azservicebus.Message{ Body: []byte("hello world"), // schedule the message to be delivered in an hour. ScheduledEnqueueTime: &future, }) exitOnError("Failed to schedule messages using SendMessage", err)
Output:
func (*Sender) SendMessage ¶
SendMessage sends a Message to a queue or topic.
Example (Message) ¶
message := &azservicebus.Message{ Body: []byte("hello, this is a message"), } err = sender.SendMessage(context.TODO(), message) exitOnError("Failed to send message", err)
Output:
Example (MessageBatch) ¶
batch, err := sender.NewMessageBatch(context.TODO(), nil) exitOnError("Failed to create message batch", err) // By calling AddMessage multiple times you can add multiple messages into a // batch. This can help with message throughput, as you can send multiple // messages in a single send. err = batch.AddMessage(&azservicebus.Message{Body: []byte("hello world")}) if err != nil { switch err { case azservicebus.ErrMessageTooLarge: // At this point you can do a few things: // 1. Ignore this message // 2. Send this batch (it's full) and create a new batch. // // The batch can still be used after this error if you have // smaller messages you'd still like to add in. fmt.Printf("Failed to add message to batch\n") default: exitOnError("Error while trying to add message to batch", err) } } // After you add all the messages to the batch you send it using // Sender.SendMessageBatch() err = sender.SendMessageBatch(context.TODO(), batch) exitOnError("Failed to send message batch", err)
Output:
func (*Sender) SendMessageBatch ¶
func (s *Sender) SendMessageBatch(ctx context.Context, batch *MessageBatch) error
SendMessageBatch sends a MessageBatch to a queue or topic. Message batches can be created using `Sender.NewMessageBatch`.
type SessionReceiver ¶
type SessionReceiver struct {
// contains filtered or unexported fields
}
SessionReceiver is a Receiver that handles sessions.
func (*SessionReceiver) AbandonMessage ¶
func (r *SessionReceiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error
AbandonMessage will cause a message to be returned to the queue or subscription. This will increment its delivery count, and potentially cause it to be dead lettered depending on your queue or subscription's configuration.
func (*SessionReceiver) Close ¶
func (r *SessionReceiver) Close(ctx context.Context) error
Close permanently closes the receiver.
func (*SessionReceiver) CompleteMessage ¶
func (r *SessionReceiver) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
CompleteMessage completes a message, deleting it from the queue or subscription.
func (*SessionReceiver) DeadLetterMessage ¶
func (r *SessionReceiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.
func (*SessionReceiver) DeferMessage ¶
func (r *SessionReceiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error
DeferMessage will cause a message to be deferred. Deferred messages can be received using `Receiver.ReceiveDeferredMessages`.
func (*SessionReceiver) GetSessionState ¶
func (sr *SessionReceiver) GetSessionState(ctx context.Context) ([]byte, error)
GetSessionState retrieves state associated with the session.
func (*SessionReceiver) LockedUntil ¶
func (sr *SessionReceiver) LockedUntil() time.Time
LockedUntil is the time the lock on this session expires. The lock can be renewed using `SessionReceiver.RenewSessionLock`.
func (*SessionReceiver) PeekMessages ¶
func (r *SessionReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)
PeekMessages will peek messages without locking or deleting messages. Messages that are peeked do not have lock tokens, so settlement methods like CompleteMessage, AbandonMessage, DeferMessage or DeadLetterMessage will not work with them.
func (*SessionReceiver) ReceiveDeferredMessages ¶
func (r *SessionReceiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64) ([]*ReceivedMessage, error)
ReceiveDeferredMessages receives messages that were deferred using `Receiver.DeferMessage`.
func (*SessionReceiver) ReceiveMessages ¶
func (r *SessionReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)
ReceiveMessages receives a fixed number of messages, up to maxMessages. There are two ways to stop receiving messages:
- Cancelling the `ctx` parameter.
- An implicit timeout (default: 1 second) that starts after the first message has been received.
func (*SessionReceiver) RenewMessageLock ¶
func (r *SessionReceiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error
RenewMessageLock renews the lock on a message, updating the `LockedUntil` field on `msg`.
func (*SessionReceiver) RenewSessionLock ¶
func (sr *SessionReceiver) RenewSessionLock(ctx context.Context) error
RenewSessionLock renews this session's lock. The new expiration time is available using `LockedUntil`.
func (*SessionReceiver) SessionID ¶
func (sr *SessionReceiver) SessionID() string
SessionID is the session ID for this SessionReceiver.
func (*SessionReceiver) SetSessionState ¶
func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte) error
SetSessionState sets the state associated with the session.
type SessionReceiverOptions ¶
type SessionReceiverOptions struct { // ReceiveMode controls when a message is deleted from Service Bus. // // `azservicebus.PeekLock` is the default. The message is locked, preventing multiple // receivers from processing the message at once. You control the lock state of the message // using one of the message settlement functions like SessionReceiver.CompleteMessage(), which removes // it from Service Bus, or SessionReceiver.AbandonMessage(), which makes it available again. // // `azservicebus.ReceiveAndDelete` causes Service Bus to remove the message as soon // as it's received. // // More information about receive modes: // https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations ReceiveMode ReceiveMode }
SessionReceiverOptions contains options for the `Client.AcceptSessionForQueue/Subscription` or `Client.AcceptNextSessionForQueue/Subscription` functions.