azservicebus

package module
v1.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2024 License: MIT Imports: 17 Imported by: 61

README

Azure Service Bus client module for Go

Azure Service Bus is a highly reliable cloud messaging service for providing real-time and fault-tolerant communication between distributed senders and receivers.

Use the Service Bus client module github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus in your application to:

  • Send messages to a Service Bus queue or topic
  • Receive messages from a Service Bus queue or subscription

Key links: Source code | API reference documentation | REST API documentation | Product documentation | Samples

If you used the pre-release azure-service-bus-go module, see the Migration guide.

Getting started

Prerequisites
  • Go, version 1.18 or higher - Install Go
  • Azure subscription - Create a free account
  • Service Bus namespace - Create a namespace
  • A Service Bus queue, topic, or subscription - See the Azure Service Bus documentation to create an entity in your Service Bus namespace. For example, create a Service Bus queue using the Azure portal, the Azure CLI, or other tools.
Install the package

Install the Azure Service Bus client module for Go with go get:

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Authenticate the client

The Service Bus Client can be created using a credential from the Azure Identity package, such as DefaultAzureCredential, or using a Service Bus connection string.

Using a service principal
import (
  "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
  // For more information about the DefaultAzureCredential, see:
  // https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#NewDefaultAzureCredential
  credential, err := azidentity.NewDefaultAzureCredential(nil)

  if err != nil {
    panic(err)
  }

  // The service principal specified by the credential needs to be added to the appropriate Service Bus roles for your
  // resource. For more information about Service Bus roles, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/service-bus-managed-service-identity#azure-built-in-roles-for-azure-service-bus
  client, err := azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil)

  if err != nil {
    panic(err)
  }
}
Using a connection string
import (
  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
  // For instructions on how to get a Service Bus connection string, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/service-bus-quickstart-portal#get-the-connection-string
  client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)

  if err != nil {
    panic(err)
  }
}

Key concepts

Once you've created a Client, you can interact with resources within a Service Bus namespace:

  • Queue: Allows for sending and receiving messages. Often used for point-to-point communication.
  • Topic: Similar to a queue but splits the receiving and sending into separate entities. Messages are sent to a topic and are broadcast to subscriptions, where they can be consumed independently, and in parallel by receivers.
  • Subscription: Consumes messages from a topic. Each subscription is independent, and receives a copy of each message sent to the topic. Rules and filters can be used to tailor which messages are received by a specific subscription.

For more information about these resources, see What is Azure Service Bus?.

Using a Client you can do the following:

The queues, topics, and subscriptions should be created prior to using this library.

Examples

The following examples cover common tasks using Azure Service Bus:

Send messages

Once you've created a Client you can create a Sender, which will allow you to send messages.

NOTE: To create a azservicebus.Client, see the Authenticate the client section, using a service principal or a Service Bus connection string.

Send a single message:
err := sender.SendMessage(context.TODO(), &azservicebus.Message{
  Body: []byte("hello world!"),
}, nil)
Send multiple messages using a batch

You can also send messages in batches, which can be more efficient than sending them individually.

// Create a message batch. It will automatically be sized for the Service Bus
// namespace's maximum message size.
currentMessageBatch, err := sender.NewMessageBatch(context.TODO(), nil)

if err != nil {
  panic(err)
}

messagesToSend := []*azservicebus.Message{
  // any messages that you'd want to send would be here, or sourced from
  // somewhere else.
}

for i := 0; i < len(messagesToSend); i++ {
  // Add a message to our message batch. This can be called multiple times.
  err = currentMessageBatch.AddMessage(messagesToSend[i], nil)

  if errors.Is(err, azservicebus.ErrMessageTooLarge) {
    if currentMessageBatch.NumMessages() == 0 {
      // This means the message itself is too large to be sent, even on its own.
      // This will require intervention from the user.
      panic("Single message is too large to be sent in a batch.")
    }

    fmt.Printf("Message batch is full. Sending it and creating a new one.\n")

    // send what we have since the batch is full
    err := sender.SendMessageBatch(context.TODO(), currentMessageBatch, nil)

    if err != nil {
      panic(err)
    }

    // Create a new batch and retry adding this message to our batch.
    newBatch, err := sender.NewMessageBatch(context.TODO(), nil)

    if err != nil {
      panic(err)
    }

    currentMessageBatch = newBatch

    // rewind the counter and attempt to add the message again (this batch
    // was full so it didn't go out with the previous SendMessageBatch call).
    i--
  } else if err != nil {
    panic(err)
  }
}

// check if any messages are remaining to be sent.
if currentMessageBatch.NumMessages() > 0 {
  err := sender.SendMessageBatch(context.TODO(), currentMessageBatch, nil)

  if err != nil {
    panic(err)
  }
}
Receive messages

Once you've created a Client you can create a Receiver, which will allow you to receive messages.

NOTE: To create a azservicebus.Client, see the Authenticate the client section, using a service principal or a Service Bus connection string.

receiver, err := client.NewReceiverForQueue(
  "<queue>",
  nil,
)
// or
// client.NewReceiverForSubscription("<topic>", "<subscription>")

// ReceiveMessages respects the passed in context, and will gracefully stop
// receiving when 'ctx' is cancelled.
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

messages, err := receiver.ReceiveMessages(ctx,
  // The number of messages to receive. Note this is merely an upper
  // bound. It is possible to get fewer message (or zero), depending
  // on the contents of the remote queue or subscription and network
  // conditions.
  1,
  nil,
)

if err != nil {
  panic(err)
}

for _, message := range messages {
  // The message body is a []byte. For this example we're assuming that the body
  // is a string, converted to bytes, but any []byte payload is valid.
  var body []byte = message.Body
  fmt.Printf("Message received with body: %s\n", string(body))

  // For more information about settling messages, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
  err = receiver.CompleteMessage(context.TODO(), message, nil)

  if err != nil {
    var sbErr *azservicebus.Error

    if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
      // The message lock has expired. This isn't fatal for the client, but it does mean
      // that this message can be received by another Receiver (or potentially this one).
      fmt.Printf("Message lock expired\n")

      // You can extend the message lock by calling Receiver.RenewMessageLock(msg) before the
      // message lock has expired.
      continue
    }

    panic(err)
	}

  fmt.Printf("Received and completed the message\n")
}
Dead letter queue

The dead letter queue is a sub-queue. Each queue or subscription has its own dead letter queue. Dead letter queues store messages that have been explicitly dead lettered using the Receiver.DeadLetterMessage function.

Opening a dead letter queue is a configuration option when creating a Receiver.

NOTE: To create a azservicebus.Client, see the Authenticate the client section, using a service principal or a Service Bus connection string.

deadLetterReceiver, err := client.NewReceiverForQueue("<queue>",
  &azservicebus.ReceiverOptions{
    SubQueue: azservicebus.SubQueueDeadLetter,
  })
// or 
// client.NewReceiverForSubscription("<topic>", "<subscription>", 
//   &azservicebus.ReceiverOptions{
//     SubQueue: azservicebus.SubQueueDeadLetter,
//   })

To see some example code for receiving messages using the Receiver, see the Receive messages example.

Troubleshooting

Logging

This module uses the classification-based logging implementation in azcore. To enable console logging for all SDK modules, set the environment variable AZURE_SDK_GO_LOGGING to all.

Use the azcore/log package to control log event output or to enable logs for azservicebus only. For example:

import (
  "fmt"
  azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
)

// print log output to stdout
azlog.SetListener(func(event azlog.Event, s string) {
    fmt.Printf("[%s] %s\n", event, s)
})

// pick the set of events to log
azlog.SetEvents(
  // EventConn is used whenever we create a connection or any links (that is, receivers or senders).
  azservicebus.EventConn,
  // EventAuth is used when we're doing authentication/claims negotiation.
  azservicebus.EventAuth,
  // EventReceiver represents operations that happen on receivers.
  azservicebus.EventReceiver,
  // EventSender represents operations that happen on senders.
  azservicebus.EventSender,
  // EventAdmin is used for operations in the azservicebus/admin.Client
  azservicebus.EventAdmin,
)

Next steps

See the examples for using this library to send and receive messages to or from Service Bus queues, topics, and subscriptions.

Contributing

If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.

Impressions

Documentation

Overview

Package azservicebus provides clients for sending and receiving messages with Azure Service Bus. NOTE: for creating and managing entities, use the `Client` in the `github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin` package.

Example (AutoRenewLocks)
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// autoRenewMessageLock runs a loop calling [azservicebus.Receiver.RenewLock] until the message has been completed, the lock
// has expired or the passed in context has been cancelled.
//
// This function returns nil if the message's lock has expired.
//
// - receiver - the receiver that received 'msg'
// - msg - the received message
// - configuredLockDuration - the lock duration, configured at the queue or subscription level.
func autoRenewMessageLock(ctx context.Context, receiver *azservicebus.Receiver, msg *azservicebus.ReceivedMessage, configuredLockDuration time.Duration) error {
	for {
		log.Printf("Renewing lock for %s", msg.MessageID)
		if err := receiver.RenewMessageLock(ctx, msg, nil); err != nil {
			// You get this error if your lock has expired _or_ if you've settled the message.
			// We can ignore it here because the error will also be returned by the settlement
			// method (CompleteMessage, AbandonMessage, etc..)
			if sbErr := (*azservicebus.Error)(nil); errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
				log.Printf("Lock was lost for message with ID %s", msg.MessageID)
				return nil
			}

			return err
		}
		log.Printf("Renewed lock for %s", msg.MessageID)

		select {
		case <-ctx.Done():
			log.Printf("Cancelling lock renewal for %s", msg.MessageID)
			return ctx.Err()
		case <-time.After(configuredLockDuration / 2):
		}
	}
}

func main() {
	// This is configurable, on the service. Change this value to match since it's used
	// to figure out how often to renew a lock.
	const queueOrSubscriptionLockDuration = 30 * time.Second

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)

	if err != nil {
		//  TODO: Update the following line with your application specific error handling logic
		log.Printf("ERROR: %s", err)
		return
	}

	for _, msg := range messages {
		go func(msg *azservicebus.ReceivedMessage) {
			//
			// Each goroutine will _eventually_ stop once its message has been settled.
			//
			// If you want to be more proactive you can create a separate context for each message
			// and cancel that when you settle.
			//
			err := autoRenewMessageLock(context.TODO(), receiver, msg, queueOrSubscriptionLockDuration)

			if err != nil {
				//  TODO: Update the following line with your application specific error handling logic
				log.Printf("ERROR: %s", err)
				return
			}
		}(msg)
	}

	// TODO: Process the messages.
	//
	// Our lock renewal goroutines will stop once RenewLock returns that the lock has been lost. This happens
	// if the lock has expired or if we've settled the message.
}
Output:

Example (EnablingLogging)
package main

import (
	"fmt"

	azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
	// Required import:
	//   import azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"

	// print log output to stdout
	azlog.SetListener(func(event azlog.Event, s string) {
		fmt.Printf("[%s] %s\n", event, s)
	})

	// pick the set of events to log
	azlog.SetEvents(
		// EventConn is used whenever we create a connection or any links (ie: receivers, senders).
		azservicebus.EventConn,
		// EventAuth is used when we're doing authentication/claims negotiation.
		azservicebus.EventAuth,
		// EventReceiver represents operations that happen on Receivers.
		azservicebus.EventReceiver,
		// EventAdmin is used for operations in the azservicebus/admin.Client
		azservicebus.EventAdmin,
	)
}
Output:

Index

Examples

Constants

View Source
const (
	// CodeUnauthorizedAccess means the credentials provided are not valid for use with
	// a particular entity, or have expired.
	CodeUnauthorizedAccess = exported.CodeUnauthorizedAccess

	// CodeConnectionLost means our connection was lost and all retry attempts failed.
	// This typically reflects an extended outage or connection disruption and may
	// require manual intervention.
	CodeConnectionLost = exported.CodeConnectionLost

	// CodeLockLost means that the lock token you have for a message has expired.
	// This message will be available again after the lock period expires, or, potentially
	// go to the dead letter queue if delivery attempts have been exceeded.
	CodeLockLost = exported.CodeLockLost

	// CodeTimeout means the service timed out during an operation.
	// For instance, if you use ServiceBusClient.AcceptNextSessionForQueue() and there aren't
	// any available sessions it will eventually time out and return an *azservicebus.Error
	// with this code.
	CodeTimeout = exported.CodeTimeout
)
View Source
const (
	// EventConn is used whenever we create a connection or any links (ie: receivers, senders).
	EventConn = exported.EventConn

	// EventAuth is used when we're doing authentication/claims negotiation.
	EventAuth = exported.EventAuth

	// EventReceiver represents operations that happen on Receivers.
	EventReceiver = exported.EventReceiver

	// EventSender represents operations that happen on Senders.
	EventSender = exported.EventSender

	// EventAdmin is used for operations in the azservicebus/admin.Client
	EventAdmin = exported.EventAdmin
)

Variables

View Source
var ErrMessageTooLarge = errors.New("the message is too large")

ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.Add() or if the message is being sent on its own and is too large for the link.

Functions

This section is empty.

Types

type AMQPAnnotatedMessage added in v1.1.0

type AMQPAnnotatedMessage struct {
	// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	ApplicationProperties map[string]any

	// Body represents the body of an AMQP message.
	Body AMQPAnnotatedMessageBody

	// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	DeliveryAnnotations map[any]any

	// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
	// for this message.
	DeliveryTag []byte

	// Footer is the transport footers for this AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	Footer map[any]any

	// Header is the transport headers for this AMQP message.
	Header *AMQPAnnotatedMessageHeader

	// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	MessageAnnotations map[any]any

	// Properties corresponds to the properties section of an AMQP message.
	Properties *AMQPAnnotatedMessageProperties
	// contains filtered or unexported fields
}

AMQPAnnotatedMessage represents the AMQP message, as received from Service Bus. For details about these properties, refer to the AMQP specification:

https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format

Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some cases slices and maps.

AMQP simple types include: - int (any size), uint (any size) - float (any size) - string - bool - time.Time

type AMQPAnnotatedMessageBody added in v1.1.0

type AMQPAnnotatedMessageBody struct {
	// Data is encoded/decoded as multiple data sections in the body.
	Data [][]byte

	// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
	//
	// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	Sequence [][]any

	// Value is encoded/decoded as the amqp-value section in the body.
	//
	// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
	// as well as slices or maps of AMQP simple types.
	Value any
}

AMQPAnnotatedMessageBody represents the body of an AMQP message. Only one of these fields can be used a a time. They are mutually exclusive.

type AMQPAnnotatedMessageHeader added in v1.1.0

type AMQPAnnotatedMessageHeader struct {
	// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
	// It corresponds to the 'delivery-count' property.
	DeliveryCount uint32

	// Durable corresponds to the 'durable' property.
	Durable bool

	// FirstAcquirer corresponds to the 'first-acquirer' property.
	FirstAcquirer bool

	// Priority corresponds to the 'priority' property.
	Priority uint8

	// TTL corresponds to the 'ttl' property.
	TTL time.Duration
}

AMQPAnnotatedMessageHeader carries standard delivery details about the transfer of a message. See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header for more details.

type AMQPAnnotatedMessageProperties added in v1.1.0

type AMQPAnnotatedMessageProperties struct {
	// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
	AbsoluteExpiryTime *time.Time

	// ContentEncoding corresponds to the 'content-encoding' property.
	ContentEncoding *string

	// ContentType corresponds to the 'content-type' property
	ContentType *string

	// CorrelationID corresponds to the 'correlation-id' property.
	// The type of CorrelationID can be a uint64, UUID, []byte, or a string
	CorrelationID any

	// CreationTime corresponds to the 'creation-time' property.
	CreationTime *time.Time

	// GroupID corresponds to the 'group-id' property.
	GroupID *string

	// GroupSequence corresponds to the 'group-sequence' property.
	GroupSequence *uint32

	// MessageID corresponds to the 'message-id' property.
	// The type of MessageID can be a uint64, UUID, []byte, or string
	MessageID any

	// ReplyTo corresponds to the 'reply-to' property.
	ReplyTo *string

	// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
	ReplyToGroupID *string

	// Subject corresponds to the 'subject' property.
	Subject *string

	// To corresponds to the 'to' property.
	To *string

	// UserID corresponds to the 'user-id' property.
	UserID []byte
}

AMQPAnnotatedMessageProperties represents the properties of an AMQP message. See here for more details: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties

type AbandonMessageOptions added in v0.3.0

type AbandonMessageOptions struct {
	// PropertiesToModify specifies properties to modify in the message when it is abandoned.
	PropertiesToModify map[string]any
}

AbandonMessageOptions contains optional parameters for Client.AbandonMessage

type AddAMQPAnnotatedMessageOptions added in v1.1.0

type AddAMQPAnnotatedMessageOptions struct {
}

AddAMQPAnnotatedMessageOptions contains optional parameters for the AddAMQPAnnotatedMessage function.

type AddMessageOptions added in v0.4.0

type AddMessageOptions struct {
}

AddMessageOptions contains optional parameters for the AddMessage function.

type CancelScheduledMessagesOptions added in v0.4.0

type CancelScheduledMessagesOptions struct {
}

CancelScheduledMessagesOptions contains optional parameters for the CancelScheduledMessages function.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides methods to create Sender and Receiver instances to send and receive messages from Service Bus.

func NewClient

func NewClient(fullyQualifiedNamespace string, credential azcore.TokenCredential, options *ClientOptions) (*Client, error)

NewClient creates a new Client for a Service Bus namespace, using a TokenCredential. A Client allows you create receivers (for queues or subscriptions) and senders (for queues and topics). fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net) credential is one of the credentials in the `github.com/Azure/azure-sdk-for-go/sdk/azidentity` package.

Example
// NOTE: If you'd like to authenticate using a Service Bus connection string
// look at `NewClientFromConnectionString` instead.

// `DefaultAzureCredential` tries several common credential types. For more credential types
// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
credential, err := azidentity.NewDefaultAzureCredential(nil)

if err != nil {
	panic(err)
}

client, err = azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil)

if err != nil {
	panic(err)
}
Output:

Example (ConfiguringRetries)
// NOTE: If you'd like to authenticate using a Service Bus connection string
// look at `NewClientFromConnectionString` instead.
client, err = azservicebus.NewClient(endpoint, tokenCredential, &azservicebus.ClientOptions{
	// NOTE: you don't need to configure these explicitly if you like the defaults.
	// For more information see:
	//  https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus#RetryOptions
	RetryOptions: azservicebus.RetryOptions{
		// MaxRetries specifies the maximum number of attempts a failed operation will be retried
		// before producing an error.
		MaxRetries: 3,
		// RetryDelay specifies the initial amount of delay to use before retrying an operation.
		// The delay increases exponentially with each retry up to the maximum specified by MaxRetryDelay.
		RetryDelay: 4 * time.Second,
		// MaxRetryDelay specifies the maximum delay allowed before retrying an operation.
		// Typically the value is greater than or equal to the value specified in RetryDelay.
		MaxRetryDelay: 120 * time.Second,
	},
})

if err != nil {
	panic(err)
}
Output:

Example (UsingWebsocketsAndProxies)
// You can use an HTTP proxy, with websockets, by setting the appropriate HTTP(s)_PROXY
// variable in your environment, as described in the https://pkg.go.dev/net/http#ProxyFromEnvironment
// function.
//
// A proxy is NOT required to use websockets.
newWebSocketConnFn := func(ctx context.Context, args azservicebus.NewWebSocketConnArgs) (net.Conn, error) {
	opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}}
	wssConn, _, err := websocket.Dial(ctx, args.Host, opts)

	if err != nil {
		return nil, err
	}

	return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil
}

// NOTE: If you'd like to authenticate using a Service Bus connection string
// look at `NewClientFromConnectionString` instead.
client, err = azservicebus.NewClient(endpoint, tokenCredential, &azservicebus.ClientOptions{
	NewWebSocketConn: newWebSocketConnFn,
})

if err != nil {
	panic(err)
}

// NOTE: For users of `nhooyr.io/websocket` there's an open discussion here:
//   https://github.com/nhooyr/websocket/discussions/380
//
// An error ("failed to read frame header: EOF") can be returned when the
// websocket connection is closed. This error will be returned from the
// `Client.Close` function and can be ignored, as the websocket "close handshake"
// has already completed.
defer client.Close(context.TODO())
Output:

func NewClientFromConnectionString added in v0.2.0

func NewClientFromConnectionString(connectionString string, options *ClientOptions) (*Client, error)

NewClientFromConnectionString creates a new Client for a Service Bus namespace using a connection string. A Client allows you create receivers (for queues or subscriptions) and senders (for queues and topics). connectionString can be a Service Bus connection string for the namespace or for an entity, which contains a SharedAccessKeyName and SharedAccessKey properties (for instance, from the Azure Portal):

Endpoint=sb://<sb>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>

Or it can be a connection string with a SharedAccessSignature:

Endpoint=sb://<sb>.servicebus.windows.net;SharedAccessSignature=SharedAccessSignature sr=<sb>.servicebus.windows.net&sig=<base64-sig>&se=<expiry>&skn=<keyname>
Example
// NOTE: If you'd like to authenticate via Azure Active Directory look at
// the `NewClient` function instead.

client, err = azservicebus.NewClientFromConnectionString(connectionString, nil)

if err != nil {
	panic(err)
}
Output:

func (*Client) AcceptNextSessionForQueue added in v0.2.0

func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName string, options *SessionReceiverOptions) (*SessionReceiver, error)

AcceptNextSessionForQueue accepts the next available session from a queue. NOTE: this receiver is initialized immediately, not lazily.

If the operation fails and the failure is actionable this function will return an *azservicebus.Error. If, for example, the operation times out because there are no available sessions it will return an *azservicebus.Error where the Code is CodeTimeout.

Example
for {
	func() {
		sessionReceiver, err := client.AcceptNextSessionForQueue(context.TODO(), "exampleSessionQueue", nil)

		if err != nil {
			var sbErr *azservicebus.Error

			if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
				// there are no sessions available. This isn't fatal - we can use the client and
				// try to AcceptNextSessionForQueue() again.
				fmt.Printf("No session available\n")
				return
			} else {
				panic(err)
			}
		}

		defer sessionReceiver.Close(context.TODO())

		fmt.Printf("Session receiver was assigned session ID \"%s\"", sessionReceiver.SessionID())

		// see ExampleClient_AcceptSessionForQueue() for usage of the SessionReceiver.
	}()
}
Output:

Example (Roundrobin)
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
	var sessionEnabledQueueName = "exampleSessionQueue"

	for {
		// You can have multiple active session receivers, provided they're each receiving
		// from different sessions.
		//
		// AcceptNextSessionForQueue (or AcceptNextSessionForSubscription) makes it simple to implement
		// this pattern, consuming multiple session receivers in parallel.
		sessionReceiver, err := client.AcceptNextSessionForQueue(context.TODO(), sessionEnabledQueueName, nil)

		if err != nil {
			var sbErr *azservicebus.Error

			if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
				fmt.Printf("No sessions available\n")

				// NOTE: you could also continue here, which will block and wait again for a
				// session to become available.
				break
			}

			panic(err)
		}

		fmt.Printf("Got receiving for session '%s'\n", sessionReceiver.SessionID())

		// consume the session
		go func() {
			defer func() {
				fmt.Printf("Closing receiver for session '%s'\n", sessionReceiver.SessionID())
				err := sessionReceiver.Close(context.TODO())

				if err != nil {
					panic(err)
				}
			}()

			// we're only reading a few messages here, but you can also receive in a loop
			messages, err := sessionReceiver.ReceiveMessages(context.TODO(), 10, nil)

			if err != nil {
				panic(err)
			}

			fmt.Printf("Received %d messages from session '%s'\n", len(messages), sessionReceiver.SessionID())

			for _, m := range messages {
				if err := processMessageFromSession(context.TODO(), sessionReceiver, m); err != nil {
					panic(err)
				}
			}
		}()
	}
}

func processMessageFromSession(ctx context.Context, receiver *azservicebus.SessionReceiver, receivedMsg *azservicebus.ReceivedMessage) error {
	fmt.Printf("Received message for session %s, message ID %s\n", *receivedMsg.SessionID, receivedMsg.MessageID)
	return receiver.CompleteMessage(ctx, receivedMsg, nil)
}
Output:

func (*Client) AcceptNextSessionForSubscription added in v0.2.0

func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, options *SessionReceiverOptions) (*SessionReceiver, error)

AcceptNextSessionForSubscription accepts the next available session from a subscription. NOTE: this receiver is initialized immediately, not lazily.

If the operation fails and the failure is actionable this function will return an *azservicebus.Error. If, for example, the operation times out because there are no available sessions it will return an *azservicebus.Error where the Code is CodeTimeout.

Example
for {
	func() {
		sessionReceiver, err := client.AcceptNextSessionForSubscription(context.TODO(), "exampleTopicName", "exampleSubscriptionName", nil)

		if err != nil {
			var sbErr *azservicebus.Error

			if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
				// there are no sessions available. This isn't fatal - we can use the client and
				// try to AcceptNextSessionForSubscription() again.
				fmt.Printf("No session available\n")
				return
			} else {
				panic(err)
			}
		}

		defer sessionReceiver.Close(context.TODO())

		fmt.Printf("Session receiver was assigned session ID \"%s\"", sessionReceiver.SessionID())

		// see AcceptSessionForSubscription() for some usage of the SessionReceiver itself.
	}()
}
Output:

func (*Client) AcceptSessionForQueue added in v0.2.0

func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error)

AcceptSessionForQueue accepts a session from a queue with a specific session ID. NOTE: this receiver is initialized immediately, not lazily. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

Example
sessionReceiver, err := client.AcceptSessionForQueue(context.TODO(), "exampleSessionQueue", "exampleSessionId", nil)

if err != nil {
	panic(err)
}

defer sessionReceiver.Close(context.TODO())

// session receivers work just like non-session receivers
// with one difference - instead of a lock per message there is a lock
// for the session itself.

// Like a message lock, you'll want to periodically renew your session lock.
err = sessionReceiver.RenewSessionLock(context.TODO(), nil)

if err != nil {
	panic(err)
}

messages, err := sessionReceiver.ReceiveMessages(context.TODO(), 5, nil)

if err != nil {
	panic(err)
}

for _, message := range messages {
	err = sessionReceiver.CompleteMessage(context.TODO(), message, nil)

	if err != nil {
		panic(err)
	}

	fmt.Printf("Received message from session ID \"%s\" and completed it", *message.SessionID)
}
Output:

func (*Client) AcceptSessionForSubscription added in v0.2.0

func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error)

AcceptSessionForSubscription accepts a session from a subscription with a specific session ID. NOTE: this receiver is initialized immediately, not lazily. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

Example
sessionReceiver, err := client.AcceptSessionForSubscription(context.TODO(), "exampleTopic", "exampleSubscription", "exampleSessionId", nil)

if err != nil {
	panic(err)
}

defer sessionReceiver.Close(context.TODO())

// see ExampleClient_AcceptSessionForQueue() for usage of the SessionReceiver.
Output:

func (*Client) Close

func (client *Client) Close(ctx context.Context) error

Close closes the current connection Service Bus as well as any Senders or Receivers created using this client.

func (*Client) NewReceiverForQueue

func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOptions) (*Receiver, error)

NewReceiverForQueue creates a Receiver for a queue. A receiver allows you to receive messages.

Example
receiver, err = client.NewReceiverForQueue(
	"exampleQueue",
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
	},
)
exitOnError("Failed to create Receiver", err)

// close the receiver when it's no longer needed
defer receiver.Close(context.TODO())
Output:

Example (DeadLetterQueue)
receiver, err = client.NewReceiverForQueue(
	"exampleQueue",
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
		SubQueue:    azservicebus.SubQueueDeadLetter,
	},
)
exitOnError("Failed to create Receiver for DeadLetterQueue", err)

// close the receiver when it's no longer needed
defer receiver.Close(context.TODO())
Output:

func (*Client) NewReceiverForSubscription

func (client *Client) NewReceiverForSubscription(topicName string, subscriptionName string, options *ReceiverOptions) (*Receiver, error)

NewReceiverForSubscription creates a Receiver for a subscription. A receiver allows you to receive messages.

Example
receiver, err = client.NewReceiverForSubscription(
	"exampleTopic",
	"exampleSubscription",
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
	},
)
exitOnError("Failed to create Receiver", err)

// close the receiver when it's no longer needed
defer receiver.Close(context.TODO())
Output:

Example (DeadLetterQueue)
receiver, err = client.NewReceiverForSubscription(
	"exampleTopic",
	"exampleSubscription",
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
		SubQueue:    azservicebus.SubQueueDeadLetter,
	},
)
exitOnError("Failed to create Receiver for DeadLetterQueue", err)

// close the receiver when it's no longer needed
defer receiver.Close(context.TODO())
Output:

func (*Client) NewSender

func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions) (*Sender, error)

NewSender creates a Sender, which allows you to send messages or schedule messages.

Example
sender, err = client.NewSender("<queue or topic>", nil)

if err != nil {
	panic(err)
}

// close the sender when it's no longer needed
defer sender.Close(context.TODO())
Output:

type ClientOptions

type ClientOptions struct {
	// TLSConfig configures a client with a custom *tls.Config.
	TLSConfig *tls.Config

	// Application ID that will be passed to the namespace.
	ApplicationID string

	// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
	// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
	NewWebSocketConn func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error)

	// RetryOptions controls how often operations are retried from this client and any
	// Receivers and Senders created from this client.
	RetryOptions RetryOptions
}

ClientOptions contains options for the `NewClient` and `NewClientFromConnectionString` functions.

type Code added in v0.4.1

type Code = exported.Code

Code is an error code, usable by consuming code to work with programatically.

type CompleteMessageOptions added in v0.4.0

type CompleteMessageOptions struct {
}

CompleteMessageOptions contains optional parameters for the CompleteMessage function.

type DeadLetterOptions

type DeadLetterOptions struct {
	// ErrorDescription that caused the dead lettering of the message.
	ErrorDescription *string

	// Reason for dead lettering the message.
	Reason *string

	// PropertiesToModify specifies properties to modify in the message when it is dead lettered.
	PropertiesToModify map[string]any
}

DeadLetterOptions describe the reason and error description for dead lettering a message using the `Receiver.DeadLetterMessage()`

type DeferMessageOptions added in v0.3.0

type DeferMessageOptions struct {
	// PropertiesToModify specifies properties to modify in the message when it is deferred
	PropertiesToModify map[string]any
}

DeferMessageOptions contains optional parameters for Client.DeferMessage

type Error added in v0.4.1

type Error = exported.Error

Error represents a Service Bus specific error. NOTE: the Code is considered part of the published API but the message that comes back from Error(), as well as the underlying wrapped error, are NOT and are subject to change.

type GetSessionStateOptions added in v0.4.0

type GetSessionStateOptions struct {
}

GetSessionStateOptions contains optional parameters for the GetSessionState function.

type Message

type Message struct {
	// ApplicationProperties can be used to store custom metadata for a message.
	ApplicationProperties map[string]any

	// Body corresponds to the first []byte array in the Data section of an AMQP message.
	Body []byte

	// ContentType describes the payload of the message, with a descriptor following
	// the format of Content-Type, specified by RFC2045 (ex: "application/json").
	ContentType *string

	// CorrelationID allows an application to specify a context for the message for the purposes of
	// correlation, for example reflecting the MessageID of a message that is being
	// replied to.
	CorrelationID *string

	// MessageID is an application-defined value that uniquely identifies
	// the message and its payload. The identifier is a free-form string.
	//
	// If enabled, the duplicate detection feature identifies and removes further submissions
	// of messages with the same MessageId.
	MessageID *string

	// PartitionKey is used with a partitioned entity and enables assigning related messages
	// to the same internal partition. This ensures that the submission sequence order is correctly
	// recorded. The partition is chosen by a hash function in Service Bus and cannot be chosen
	// directly.
	//
	// For session-aware entities, the ReceivedMessage.SessionID overrides this value.
	PartitionKey *string

	// ReplyTo is an application-defined value specify a reply path to the receiver of the message. When
	// a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic
	// it expects the reply to be sent to.
	ReplyTo *string

	// ReplyToSessionID augments the ReplyTo information and specifies which SessionId should
	// be set for the reply when sent to the reply entity.
	ReplyToSessionID *string

	// ScheduledEnqueueTime specifies a time when a message will be enqueued. The message is transferred
	// to the broker but will not available until the scheduled time.
	ScheduledEnqueueTime *time.Time

	// SessionID is used with session-aware entities and associates a message with an application-defined
	// session ID. Note that an empty string is a valid session identifier.
	// Messages with the same session identifier are subject to summary locking and enable
	// exact in-order processing and demultiplexing. For session-unaware entities, this value is ignored.
	SessionID *string

	// Subject enables an application to indicate the purpose of the message, similar to an email subject line.
	Subject *string

	// TimeToLive is the duration after which the message expires, starting from the instant the
	// message has been accepted and stored by the broker, found in the ReceivedMessage.EnqueuedTime
	// property.
	//
	// When not set explicitly, the assumed value is the DefaultTimeToLive for the queue or topic.
	// A message's TimeToLive cannot be longer than the entity's DefaultTimeToLive is silently
	// adjusted if it does.
	TimeToLive *time.Duration

	// To is reserved for future use in routing scenarios but is not currently used by Service Bus.
	// Applications can use this value to indicate the logical destination of the message.
	To *string
}

Message is a message with a body and commonly used properties. Properties that are pointers are optional.

type MessageBatch

type MessageBatch struct {
	// contains filtered or unexported fields
}

MessageBatch represents a batch of messages to send to Service Bus in a single message

func (*MessageBatch) AddAMQPAnnotatedMessage added in v1.1.0

func (mb *MessageBatch) AddAMQPAnnotatedMessage(m *AMQPAnnotatedMessage, options *AddAMQPAnnotatedMessageOptions) error

AddAMQPAnnotatedMessage adds a message to the batch if the message will not exceed the max size of the batch Returns: - ErrMessageTooLarge if the message cannot fit - a non-nil error for other failures - nil, otherwise

func (*MessageBatch) AddMessage added in v0.3.0

func (mb *MessageBatch) AddMessage(m *Message, options *AddMessageOptions) error

AddMessage adds a message to the batch if the message will not exceed the max size of the batch Returns: - ErrMessageTooLarge if the message cannot fit - a non-nil error for other failures - nil, otherwise

func (*MessageBatch) NumBytes added in v0.3.0

func (mb *MessageBatch) NumBytes() uint64

NumBytes is the number of bytes in the message batch

func (*MessageBatch) NumMessages added in v0.3.0

func (mb *MessageBatch) NumMessages() int32

NumMessages returns the # of messages in the batch.

type MessageBatchOptions

type MessageBatchOptions struct {
	// MaxBytes overrides the max size (in bytes) for a batch.
	// By default NewMessageBatch will use the max message size provided by the service.
	MaxBytes uint64
}

MessageBatchOptions contains options for the `Sender.NewMessageBatch` function.

type MessageState added in v0.3.4

type MessageState int32

MessageState represents the current state of a message (Active, Scheduled, Deferred).

const (
	// MessageStateActive indicates the message is active.
	MessageStateActive MessageState = 0
	// MessageStateDeferred indicates the message is deferred.
	MessageStateDeferred MessageState = 1
	// MessageStateScheduled indicates the message is scheduled.
	MessageStateScheduled MessageState = 2
)

type NewSenderOptions added in v0.3.0

type NewSenderOptions struct {
}

NewSenderOptions contains optional parameters for Client.NewSender

type NewWebSocketConnArgs added in v0.3.2

type NewWebSocketConnArgs = exported.NewWebSocketConnArgs

NewWebSocketConnArgs are passed to your web socket creation function (ClientOptions.NewWebSocketConn)

type PeekMessagesOptions added in v0.2.0

type PeekMessagesOptions struct {
	// FromSequenceNumber is the sequence number to start with when peeking messages.
	FromSequenceNumber *int64
}

PeekMessagesOptions contains options for the `Receiver.PeekMessages` function.

type ReceiveDeferredMessagesOptions added in v0.4.0

type ReceiveDeferredMessagesOptions struct {
}

ReceiveDeferredMessagesOptions contains optional parameters for the ReceiveDeferredMessages function.

type ReceiveMessagesOptions added in v0.3.0

type ReceiveMessagesOptions struct {
	// TimeAfterFirstMessage controls how long, after a message has been received, before we return the
	// accumulated batch of messages.
	//
	// Default value depends on the receive mode:
	// - 20ms when the receiver is in ReceiveModePeekLock
	// - 1s when the receiver is in ReceiveModeReceiveAndDelete
	TimeAfterFirstMessage time.Duration
}

ReceiveMessagesOptions are options for the ReceiveMessages function.

type ReceiveMode

type ReceiveMode = exported.ReceiveMode

ReceiveMode represents the lock style to use for a receiver - either `PeekLock` or `ReceiveAndDelete`

const (
	// ReceiveModePeekLock will lock messages as they are received and can be settled
	// using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message
	// functions.
	ReceiveModePeekLock ReceiveMode = exported.PeekLock
	// ReceiveModeReceiveAndDelete will delete messages as they are received.
	ReceiveModeReceiveAndDelete ReceiveMode = exported.ReceiveAndDelete
)

type ReceivedMessage

type ReceivedMessage struct {
	// ApplicationProperties can be used to store custom metadata for a message.
	ApplicationProperties map[string]any

	// Body is the payload for a message.
	Body []byte

	// ContentType describes the payload of the message, with a descriptor following
	// the format of Content-Type, specified by RFC2045 (ex: "application/json").
	ContentType *string

	// CorrelationID allows an application to specify a context for the message for the purposes of
	// correlation, for example reflecting the MessageID of a message that is being
	// replied to.
	CorrelationID *string

	// DeadLetterErrorDescription is the description set when the message was dead-lettered.
	DeadLetterErrorDescription *string

	// DeadLetterReason is the reason set when the message was dead-lettered.
	DeadLetterReason *string

	// DeadLetterSource is the name of the queue or subscription this message was enqueued on
	// before it was dead-lettered.
	DeadLetterSource *string

	// DeliveryCount is number of times this message has been delivered.
	// This number is incremented when a message lock expires or if the message is explicitly abandoned
	// with Receiver.AbandonMessage.
	DeliveryCount uint32

	// EnqueuedSequenceNumber is the original sequence number assigned to a message, before it
	// was auto-forwarded.
	EnqueuedSequenceNumber *int64

	// EnqueuedTime is the UTC time when the message was accepted and stored by Service Bus.
	EnqueuedTime *time.Time

	// ExpiresAt is the time when this message will expire.
	//
	// This time is calculated by adding the TimeToLive property, set in the message that was sent, along  with the
	// EnqueuedTime of the message.
	ExpiresAt *time.Time

	// LockedUntil is the time when the lock expires for this message.
	// This can be extended by using Receiver.RenewMessageLock.
	LockedUntil *time.Time

	// LockToken is the lock token for a message received from a Receiver created with a receive mode of ReceiveModePeekLock.
	LockToken [16]byte

	// MessageID is an application-defined value that uniquely identifies
	// the message and its payload. The identifier is a free-form string.
	//
	// If enabled, the duplicate detection feature identifies and removes further submissions
	// of messages with the same MessageId.
	MessageID string

	// PartitionKey is used with a partitioned entity and enables assigning related messages
	// to the same internal partition. This ensures that the submission sequence order is correctly
	// recorded. The partition is chosen by a hash function in Service Bus and cannot be chosen
	// directly.
	//
	// For session-aware entities, the ReceivedMessage.SessionID overrides this value.
	PartitionKey *string

	// ReplyTo is an application-defined value specify a reply path to the receiver of the message. When
	// a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic
	// it expects the reply to be sent to.
	ReplyTo *string

	// ReplyToSessionID augments the ReplyTo information and specifies which SessionId should
	// be set for the reply when sent to the reply entity.
	ReplyToSessionID *string

	// ScheduledEnqueueTime specifies a time when a message will be enqueued. The message is transferred
	// to the broker but will not available until the scheduled time.
	ScheduledEnqueueTime *time.Time

	// SequenceNumber is a unique number assigned to a message by Service Bus.
	SequenceNumber *int64

	// SessionID is used with session-aware entities and associates a message with an application-defined
	// session ID. Note that an empty string is a valid session identifier.
	// Messages with the same session identifier are subject to summary locking and enable
	// exact in-order processing and demultiplexing. For session-unaware entities, this value is ignored.
	SessionID *string

	// State represents the current state of the message (Active, Scheduled, Deferred).
	State MessageState

	// Subject enables an application to indicate the purpose of the message, similar to an email subject line.
	Subject *string

	// TimeToLive is the duration after which the message expires, starting from the instant the
	// message has been accepted and stored by the broker, found in the ReceivedMessage.EnqueuedTime
	// property.
	//
	// When not set explicitly, the assumed value is the DefaultTimeToLive for the queue or topic.
	// A message's TimeToLive cannot be longer than the entity's DefaultTimeToLive, and is silently
	// adjusted if it is.
	TimeToLive *time.Duration

	// To is reserved for future use in routing scenarios but is not currently used by Service Bus.
	// Applications can use this value to indicate the logical destination of the message.
	To *string

	// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
	// to properties that are not exposed by ReceivedMessage such as payloads encoded into the
	// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
	// and Header fields.
	RawAMQPMessage *AMQPAnnotatedMessage
	// contains filtered or unexported fields
}

ReceivedMessage is a received message from a Client.NewReceiver().

func (*ReceivedMessage) Message added in v1.4.1

func (rm *ReceivedMessage) Message() *Message

Message creates a shallow copy of the fields from this message to an instance of Message.

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver receives messages using pull based functions (ReceiveMessages).

func (*Receiver) AbandonMessage

func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error

AbandonMessage will cause a message to be available again from the queue or subscription. This will increment its delivery count, and potentially cause it to be dead-lettered depending on your queue or subscription's configuration. This function can only be used when the Receiver has been opened with `ReceiveModePeekLock`. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*Receiver) Close

func (r *Receiver) Close(ctx context.Context) error

Close permanently closes the receiver.

func (*Receiver) CompleteMessage

func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error

CompleteMessage completes a message, deleting it from the queue or subscription. This function can only be used when the Receiver has been opened with ReceiveModePeekLock. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*Receiver) DeadLetterMessage

func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error

DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option. This function can only be used when the Receiver has been opened with `ReceiveModePeekLock`. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

Example
// Send a message to a queue
sbMessage := &azservicebus.Message{
	Body: []byte("body of message"),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
	panic(err)
}
// Create a receiver
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
	panic(err)
}
defer receiver.Close(context.TODO())
// Get the message from a queue
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
	panic(err)
}
// Send a message to the dead letter queue
for _, message := range messages {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}
	err := receiver.DeadLetterMessage(context.TODO(), message, deadLetterOptions)
	if err != nil {
		panic(err)
	}
}
Output:

func (*Receiver) DeferMessage

func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error

DeferMessage will cause a message to be deferred. Deferred messages can be received using `Receiver.ReceiveDeferredMessages`. This function can only be used when the Receiver has been opened with `ReceiveModePeekLock`. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*Receiver) PeekMessages

func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)

PeekMessages will peek messages without locking or deleting messages.

The Receiver stores the last peeked sequence number internally, and will use it as the start location for the next PeekMessages() call. You can override this behavior by passing an explicit sequence number in azservicebus.PeekMessagesOptions.FromSequenceNumber.

Messages that are peeked are not locked, so settlement methods like Receiver.CompleteMessage, Receiver.AbandonMessage, Receiver.DeferMessage or Receiver.DeadLetterMessage will not work with them.

If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

For more information about peeking/message-browsing see https://aka.ms/azsdk/servicebus/message-browsing

func (*Receiver) ReceiveDeferredMessages

func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error)

ReceiveDeferredMessages receives messages that were deferred using `Receiver.DeferMessage`. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*Receiver) ReceiveMessages

func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)

ReceiveMessages receives a fixed number of messages, up to numMessages. This function will block until at least one message is received or until the ctx is cancelled. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

Example
// ReceiveMessages respects the passed in context, and will gracefully stop
// receiving when 'ctx' is cancelled.
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

messages, err = receiver.ReceiveMessages(ctx,
	// The number of messages to receive. Note this is merely an upper
	// bound. It is possible to get fewer message (or zero), depending
	// on the contents of the remote queue or subscription and network
	// conditions.
	1,
	nil,
)

if err != nil {
	panic(err)
}

for _, message := range messages {
	// The message body is a []byte. For this example we're just assuming that the body
	// was a string, converted to bytes but any []byte payload is valid.
	var body []byte = message.Body
	fmt.Printf("Message received with body: %s\n", string(body))

	// For more information about settling messages:
	// https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
	err = receiver.CompleteMessage(context.TODO(), message, nil)

	if err != nil {
		var sbErr *azservicebus.Error

		if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
			// The message lock has expired. This isn't fatal for the client, but it does mean
			// that this message can be received by another Receiver (or potentially this one!).
			fmt.Printf("Message lock expired\n")

			// You can extend the message lock by calling receiver.RenewMessageLock(msg) before the
			// message lock has expired.
			continue
		}

		panic(err)
	}

	fmt.Printf("Received and completed the message\n")
}
Output:

Example (AmqpMessage)
// AMQP is the underlying protocol for all interaction with Service Bus.
// You can, if needed, send and receive messages that have a 1:1 correspondence
// with an AMQP message. This gives you full control over details that are not
// exposed via the azservicebus.ReceivedMessage type.

messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)

if err != nil {
	panic(err)
}

// NOTE: For this example we'll assume we received at least one message.

// Every received message carries a RawAMQPMessage.
var rawAMQPMessage *azservicebus.AMQPAnnotatedMessage = messages[0].RawAMQPMessage

// All the various body encodings available for AMQP messages are exposed via Body
_ = rawAMQPMessage.Body.Data
_ = rawAMQPMessage.Body.Value
_ = rawAMQPMessage.Body.Sequence

// delivery and message annotations
_ = rawAMQPMessage.DeliveryAnnotations
_ = rawAMQPMessage.MessageAnnotations

// headers and footers
_ = rawAMQPMessage.Header
_ = rawAMQPMessage.Footer

// Settlement (if in azservicebus.ReceiveModePeekLockMode) stil works on the ReceivedMessage.
err = receiver.CompleteMessage(context.TODO(), messages[0], nil)

if err != nil {
	panic(err)
}
Output:

Example (Second)
// Create a dead letter receiver
deadLetterReceiver, err := client.NewReceiverForQueue(
	"myqueue",
	&azservicebus.ReceiverOptions{
		SubQueue: azservicebus.SubQueueDeadLetter,
	},
)
if err != nil {
	panic(err)
}
defer deadLetterReceiver.Close(context.TODO())
// Get messages from the dead letter queue
deadLetterMessages, err := deadLetterReceiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
	panic(err)
}
// Make messages in the dead letter queue as complete
for _, deadLetterMessage := range deadLetterMessages {
	fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *deadLetterMessage.DeadLetterReason, *deadLetterMessage.DeadLetterErrorDescription)
	err := deadLetterReceiver.CompleteMessage(context.TODO(), deadLetterMessage, nil)
	if err != nil {
		panic(err)
	}
}
Output:

func (*Receiver) RenewMessageLock added in v0.2.0

func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage, options *RenewMessageLockOptions) error

RenewMessageLock renews the lock on a message, updating the `LockedUntil` field on `msg`. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

type ReceiverOptions

type ReceiverOptions struct {
	// ReceiveMode controls when a message is deleted from Service Bus.
	//
	// ReceiveModePeekLock is the default. The message is locked, preventing multiple
	// receivers from processing the message at once. You control the lock state of the message
	// using one of the message settlement functions like Receiver.CompleteMessage(), which removes
	// it from Service Bus, or Receiver.AbandonMessage(), which makes it available again.
	//
	// ReceiveModeReceiveAndDelete causes Service Bus to remove the message as soon
	// as it's received.
	//
	// More information about receive modes:
	// https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
	ReceiveMode ReceiveMode

	// SubQueue should be set to connect to the sub queue (ex: dead letter queue)
	// of the queue or subscription.
	SubQueue SubQueue
}

ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription` functions.

type RenewMessageLockOptions added in v0.4.0

type RenewMessageLockOptions struct {
}

RenewMessageLockOptions contains optional parameters for the RenewMessageLock function.

type RenewSessionLockOptions added in v0.4.0

type RenewSessionLockOptions struct {
}

RenewSessionLockOptions contains optional parameters for the RenewSessionLock function.

type RetryOptions added in v0.3.4

type RetryOptions = exported.RetryOptions

RetryOptions controls how often operations are retried from this client and any Receivers and Senders created from this client.

type ScheduleAMQPAnnotatedMessagesOptions added in v1.1.0

type ScheduleAMQPAnnotatedMessagesOptions struct {
}

ScheduleAMQPAnnotatedMessagesOptions contains optional parameters for the ScheduleAMQPAnnotatedMessages function.

type ScheduleMessagesOptions added in v0.4.0

type ScheduleMessagesOptions struct {
}

ScheduleMessagesOptions contains optional parameters for the ScheduleMessages function.

type SendAMQPAnnotatedMessageOptions added in v1.1.0

type SendAMQPAnnotatedMessageOptions struct {
}

SendAMQPAnnotatedMessageOptions contains optional parameters for the SendAMQPAnnotatedMessage function.

type SendMessageBatchOptions added in v0.4.0

type SendMessageBatchOptions struct {
}

SendMessageBatchOptions contains optional parameters for the SendMessageBatch function.

type SendMessageOptions added in v0.4.0

type SendMessageOptions struct {
}

SendMessageOptions contains optional parameters for the SendMessage function.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender is used to send messages as well as schedule them to be delivered at a later date.

func (*Sender) CancelScheduledMessages added in v0.2.0

func (s *Sender) CancelScheduledMessages(ctx context.Context, sequenceNumbers []int64, options *CancelScheduledMessagesOptions) error

CancelScheduledMessages cancels multiple messages that were scheduled. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*Sender) Close

func (s *Sender) Close(ctx context.Context) error

Close permanently closes the Sender.

func (*Sender) NewMessageBatch

func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error)

NewMessageBatch can be used to create a batch that contain multiple messages. Sending a batch of messages is more efficient than sending the messages one at a time. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*Sender) ScheduleAMQPAnnotatedMessages added in v1.1.0

func (s *Sender) ScheduleAMQPAnnotatedMessages(ctx context.Context, messages []*AMQPAnnotatedMessage, scheduledEnqueueTime time.Time, options *ScheduleAMQPAnnotatedMessagesOptions) ([]int64, error)

ScheduleAMQPAnnotatedMessages schedules a slice of Messages to appear on Service Bus Queue/Subscription at a later time. Returns the sequence numbers of the messages that were scheduled. Messages that haven't been delivered can be cancelled using `Receiver.CancelScheduleMessage(s)` If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*Sender) ScheduleMessages added in v0.2.0

func (s *Sender) ScheduleMessages(ctx context.Context, messages []*Message, scheduledEnqueueTime time.Time, options *ScheduleMessagesOptions) ([]int64, error)

ScheduleMessages schedules a slice of Messages to appear on Service Bus Queue/Subscription at a later time. Returns the sequence numbers of the messages that were scheduled. Messages that haven't been delivered can be cancelled using `Receiver.CancelScheduleMessage(s)` If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

Example
// there are two ways of scheduling messages:
// 1. Using the `Sender.ScheduleMessages()` function.
// 2. Setting the `Message.ScheduledEnqueueTime` field on a message.

// schedule the message to be delivered in an hour.
sequenceNumbers, err := sender.ScheduleMessages(context.TODO(),
	[]*azservicebus.Message{
		{Body: []byte("hello world")},
	}, time.Now().Add(time.Hour), nil)
exitOnError("Failed to schedule messages", err)

err = sender.CancelScheduledMessages(context.TODO(), sequenceNumbers, nil)
exitOnError("Failed to cancel scheduled messages", err)

// or you can set the `ScheduledEnqueueTime` field on a message when you send it
future := time.Now().Add(time.Hour)

err = sender.SendMessage(context.TODO(),
	&azservicebus.Message{
		Body: []byte("hello world"),
		// schedule the message to be delivered in an hour.
		ScheduledEnqueueTime: &future,
	}, nil)
exitOnError("Failed to schedule messages using SendMessage", err)
Output:

func (*Sender) SendAMQPAnnotatedMessage added in v1.1.0

func (s *Sender) SendAMQPAnnotatedMessage(ctx context.Context, message *AMQPAnnotatedMessage, options *SendAMQPAnnotatedMessageOptions) error

SendAMQPAnnotatedMessage sends an AMQPMessage to a queue or topic. Using an AMQPMessage allows for advanced use cases, like payload encoding, as well as better interoperability with pure AMQP clients. If the operation fails it can return:

Example
// AMQP is the underlying protocol for all interaction with Service Bus.
// You can, if needed, send and receive messages that have a 1:1 correspondence
// with an AMQP message. This gives you full control over details that are not
// exposed via the azservicebus.ReceivedMessage type.

message := &azservicebus.AMQPAnnotatedMessage{
	Body: azservicebus.AMQPAnnotatedMessageBody{
		// there are three kinds of different body encodings
		// Data, Value and Sequence. See the azservicebus.AMQPMessageBody
		// documentation for more details.
		Value: "hello",
	},
	// full access to fields not normally exposed in azservicebus.Message, like
	// the delivery and message annotations.
	MessageAnnotations: map[any]any{
		"x-opt-message-test": "test-annotation-value",
	},
	DeliveryAnnotations: map[any]any{
		"x-opt-delivery-test": "test-annotation-value",
	},
}

err := sender.SendAMQPAnnotatedMessage(context.TODO(), message, nil)

if err != nil {
	panic(err)
}
Output:

func (*Sender) SendMessage

func (s *Sender) SendMessage(ctx context.Context, message *Message, options *SendMessageOptions) error

SendMessage sends a Message to a queue or topic. If the operation fails it can return:

Example (Message)
message := &azservicebus.Message{
	Body: []byte("hello, this is a message"),
}

err = sender.SendMessage(context.TODO(), message, nil)
exitOnError("Failed to send message", err)
Output:

Example (MessageBatch)
batch, err := sender.NewMessageBatch(context.TODO(), nil)
exitOnError("Failed to create message batch", err)

// By calling AddMessage multiple times you can add multiple messages into a
// batch. This can help with message throughput, as you can send multiple
// messages in a single send.
err = batch.AddMessage(&azservicebus.Message{Body: []byte("hello world")}, nil)

// We also support adding AMQPMessages directly to a batch as well
// batch.AddAMQPMessage(&azservicebus.AMQPMessage{})

if err != nil {
	switch err {
	case azservicebus.ErrMessageTooLarge:
		// At this point you can do a few things:
		// 1. Ignore this message
		// 2. Send this batch (it's full) and create a new batch.
		//
		// The batch can still be used after this error if you have
		// smaller messages you'd still like to add in.
		fmt.Printf("Failed to add message to batch\n")
	default:
		exitOnError("Error while trying to add message to batch", err)
	}
}

// After you add all the messages to the batch you send it using
// Sender.SendMessageBatch()
err = sender.SendMessageBatch(context.TODO(), batch, nil)
exitOnError("Failed to send message batch", err)
Output:

func (*Sender) SendMessageBatch added in v0.2.0

func (s *Sender) SendMessageBatch(ctx context.Context, batch *MessageBatch, options *SendMessageBatchOptions) error

SendMessageBatch sends a MessageBatch to a queue or topic. Message batches can be created using Sender.NewMessageBatch. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

type SessionReceiver added in v0.2.0

type SessionReceiver struct {
	// contains filtered or unexported fields
}

SessionReceiver is a Receiver that handles sessions.

func (*SessionReceiver) AbandonMessage added in v0.2.0

func (r *SessionReceiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error

AbandonMessage will cause a message to be returned to the queue or subscription. This will increment its delivery count, and potentially cause it to be dead lettered depending on your queue or subscription's configuration. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*SessionReceiver) Close added in v0.2.0

func (r *SessionReceiver) Close(ctx context.Context) error

Close permanently closes the receiver.

func (*SessionReceiver) CompleteMessage added in v0.2.0

func (r *SessionReceiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error

CompleteMessage completes a message, deleting it from the queue or subscription. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*SessionReceiver) DeadLetterMessage added in v0.2.0

func (r *SessionReceiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error

DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*SessionReceiver) DeferMessage added in v0.2.0

func (r *SessionReceiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error

DeferMessage will cause a message to be deferred. Deferred messages can be received using `Receiver.ReceiveDeferredMessages`. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*SessionReceiver) GetSessionState added in v0.2.0

func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSessionStateOptions) ([]byte, error)

GetSessionState retrieves state associated with the session. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*SessionReceiver) LockedUntil added in v0.2.0

func (sr *SessionReceiver) LockedUntil() time.Time

LockedUntil is the time the lock on this session expires. The lock can be renewed using `SessionReceiver.RenewSessionLock`.

func (*SessionReceiver) PeekMessages added in v0.2.0

func (r *SessionReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)

PeekMessages will peek messages without locking or deleting messages.

The SessionReceiver stores the last peeked sequence number internally, and will use it as the start location for the next PeekMessages() call. You can override this behavior by passing an explicit sequence number in azservicebus.PeekMessagesOptions.FromSequenceNumber.

Messages that are peeked are not locked, so settlement methods like SessionReceiver.CompleteMessage, SessionReceiver.AbandonMessage, SessionReceiver.DeferMessage or SessionReceiver.DeadLetterMessage will not work with them.

If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

For more information about peeking/message-browsing see https://aka.ms/azsdk/servicebus/message-browsing

func (*SessionReceiver) ReceiveDeferredMessages added in v0.2.0

func (r *SessionReceiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error)

ReceiveDeferredMessages receives messages that were deferred using `Receiver.DeferMessage`. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*SessionReceiver) ReceiveMessages added in v0.2.0

func (r *SessionReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)

ReceiveMessages receives a fixed number of messages, up to maxMessages. This function will block until at least one message is received or until the ctx is cancelled. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*SessionReceiver) RenewSessionLock added in v0.2.0

func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewSessionLockOptions) error

RenewSessionLock renews this session's lock. The new expiration time is available using `LockedUntil`. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

func (*SessionReceiver) SessionID added in v0.2.0

func (sr *SessionReceiver) SessionID() string

SessionID is the session ID for this SessionReceiver.

func (*SessionReceiver) SetSessionState added in v0.2.0

func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) error

SetSessionState sets the state associated with the session. Pass nil for the state parameter to clear the stored session state. If the operation fails it can return an *azservicebus.Error type if the failure is actionable.

type SessionReceiverOptions added in v0.2.0

type SessionReceiverOptions struct {
	// ReceiveMode controls when a message is deleted from Service Bus.
	//
	// ReceiveModePeekLock is the default. The message is locked, preventing multiple
	// receivers from processing the message at once. You control the lock state of the message
	// using one of the message settlement functions like SessionReceiver.CompleteMessage(), which removes
	// it from Service Bus, or SessionReceiver.AbandonMessage(), which makes it available again.
	//
	// ReceiveModeReceiveAndDelete causes Service Bus to remove the message as soon
	// as it's received.
	//
	// More information about receive modes:
	// https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
	ReceiveMode ReceiveMode
}

SessionReceiverOptions contains options for the `Client.AcceptSessionForQueue/Subscription` or `Client.AcceptNextSessionForQueue/Subscription` functions.

type SetSessionStateOptions added in v0.4.0

type SetSessionStateOptions struct {
}

SetSessionStateOptions contains optional parameters for the SetSessionState function.

type SubQueue

type SubQueue int

SubQueue allows you to target a subqueue of a queue or subscription. Ex: the dead letter queue (SubQueueDeadLetter).

const (
	// SubQueueDeadLetter targets the dead letter queue for a queue or subscription.
	SubQueueDeadLetter SubQueue = 1
	// SubQueueTransfer targets the transfer dead letter queue for a queue or subscription.
	SubQueueTransfer SubQueue = 2
)

Directories

Path Synopsis
Package admin provides `Client`, which can create and manage Queues, Topics and Subscriptions.
Package admin provides `Client`, which can create and manage Queues, Topics and Subscriptions.
amqpwrap
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
auth
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
mock
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.
sas

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL