Documentation ¶
Overview ¶
Package azuresb provides an implementation of pubsub using Azure Service Bus Topic and Subscription. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview for an overview.
URLs ¶
For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers for the scheme "azuresb". The default URL opener will use a Service Bus Connection String based on AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables. SERVICEBUS_CONNECTION_STRING takes precedence. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.
Message Delivery Semantics ¶
Azure ServiceBus supports at-least-once semantics in the default Peek-Lock mode; messages will be redelivered if they are not Acked, or if they are explicitly Nacked.
ServiceBus also supports a Receive-Delete mode, which essentially auto-acks a message when it is delivered, resulting in at-most-once semantics. Set SubscriberOptions.ReceiveAndDelete to true to tell azuresb.Subscription that you've enabled Receive-Delete mode. When enabled, pubsub.Message.Ack is a no-op, pubsub.Message.Nackable will return false, and pubsub.Message.Nack will panic.
See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
As ¶
azuresb exposes the following types for As:
- Topic: *servicebus.Topic
- Subscription: *servicebus.Subscription
- Message.BeforeSend: *servicebus.Message
- Message.AfterSend: None
- Message: *servicebus.Message
- Error: common.Retryable, *amqp.Error, *amqp.LinkError
Example (OpenSubscriptionFromURL) ¶
package main import ( "context" "log" "gocloud.dev/pubsub" ) func main() { // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/azuresb" // PRAGMA: On gocloud.dev, hide lines until the next blank line. ctx := context.Background() // pubsub.OpenSubscription creates a *pubsub.Subscription from a URL. // This URL will open the subscription "mysubscription" for the topic // "mytopic" using a connection string from the environment variable // SERVICEBUS_CONNECTION_STRING. subscription, err := pubsub.OpenSubscription(ctx, "azuresb://mytopic?subscription=mysubscription") if err != nil { log.Fatal(err) } defer subscription.Shutdown(ctx) }
Output:
Example (OpenTopicFromURL) ¶
package main import ( "context" "log" "gocloud.dev/pubsub" ) func main() { // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/azuresb" // PRAGMA: On gocloud.dev, hide lines until the next blank line. ctx := context.Background() // pubsub.OpenTopic creates a *pubsub.Topic from a URL. // This URL will open the topic "mytopic" using a connection string // from the environment variable SERVICEBUS_CONNECTION_STRING. topic, err := pubsub.OpenTopic(ctx, "azuresb://mytopic") if err != nil { log.Fatal(err) } defer topic.Shutdown(ctx) }
Output:
Index ¶
- Constants
- func NewClientFromConnectionString(connectionString string, opts *servicebus.ClientOptions) (*servicebus.Client, error)
- func NewClientFromServiceBusHostname(serviceBusHostname string, opts *servicebus.ClientOptions) (*servicebus.Client, error)
- func NewReceiver(sbClient *servicebus.Client, topicName, subscriptionName string, ...) (*servicebus.Receiver, error)
- func NewSender(sbClient *servicebus.Client, topicName string, ...) (*servicebus.Sender, error)
- func OpenSubscription(ctx context.Context, sbClient *servicebus.Client, ...) (*pubsub.Subscription, error)
- func OpenTopic(ctx context.Context, sbSender *servicebus.Sender, opts *TopicOptions) (*pubsub.Topic, error)
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Examples ¶
Constants ¶
const Scheme = "azuresb"
Scheme is the URL scheme azuresb registers its URLOpeners under on pubsub.DefaultMux.
Variables ¶
This section is empty.
Functions ¶
func NewClientFromConnectionString ¶ added in v0.27.0
func NewClientFromConnectionString(connectionString string, opts *servicebus.ClientOptions) (*servicebus.Client, error)
NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string, using shared key for auth. https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
func NewClientFromServiceBusHostname ¶ added in v0.36.0
func NewClientFromServiceBusHostname(serviceBusHostname string, opts *servicebus.ClientOptions) (*servicebus.Client, error)
NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string, using shared key for auth. for example you can use workload identity autorization. https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash
func NewReceiver ¶ added in v0.27.0
func NewReceiver(sbClient *servicebus.Client, topicName, subscriptionName string, opts *servicebus.ReceiverOptions) (*servicebus.Receiver, error)
NewReceiver returns a *servicebus.Receiver associated with a Service Bus Topic.
func NewSender ¶ added in v0.27.0
func NewSender(sbClient *servicebus.Client, topicName string, opts *servicebus.NewSenderOptions) (*servicebus.Sender, error)
NewSender returns a *servicebus.Sender associated with a Service Bus Client.
func OpenSubscription ¶
func OpenSubscription(ctx context.Context, sbClient *servicebus.Client, sbReceiver *servicebus.Receiver, opts *SubscriptionOptions) (*pubsub.Subscription, error)
OpenSubscription initializes a pubsub Subscription on a given Service Bus Subscription and its parent Service Bus Topic.
Example ¶
package main import ( "context" "log" "os" "gocloud.dev/pubsub/azuresb" ) func main() { // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On gocloud.dev, hide lines until the next blank line. ctx := context.Background() // Change these as needed for your application. serviceBusConnString := os.Getenv("SERVICEBUS_CONNECTION_STRING") const topicName = "test-topic" const subscriptionName = "test-subscription" // Connect to Azure Service Bus for the given subscription. sbClient, err := azuresb.NewClientFromConnectionString(serviceBusConnString, nil) if err != nil { log.Fatal(err) } sbReceiver, err := azuresb.NewReceiver(sbClient, topicName, subscriptionName, nil) if err != nil { log.Fatal(err) } defer sbReceiver.Close(ctx) // Construct a *pubsub.Subscription. subscription, err := azuresb.OpenSubscription(ctx, sbClient, sbReceiver, nil) if err != nil { log.Fatal(err) } defer subscription.Shutdown(ctx) }
Output:
func OpenTopic ¶
func OpenTopic(ctx context.Context, sbSender *servicebus.Sender, opts *TopicOptions) (*pubsub.Topic, error)
OpenTopic initializes a pubsub Topic on a given Service Bus Sender.
Example ¶
package main import ( "context" "log" "os" "gocloud.dev/pubsub/azuresb" ) func main() { // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On gocloud.dev, hide lines until the next blank line. ctx := context.Background() // Change these as needed for your application. connString := os.Getenv("SERVICEBUS_CONNECTION_STRING") topicName := "test-topic" if connString == "" { log.Fatal("Service Bus ConnectionString is not set") } // Connect to Azure Service Bus for the given topic. sbClient, err := azuresb.NewClientFromConnectionString(connString, nil) if err != nil { log.Fatal(err) } sbSender, err := azuresb.NewSender(sbClient, topicName, nil) if err != nil { log.Fatal(err) } defer sbSender.Close(ctx) // Construct a *pubsub.Topic. topic, err := azuresb.OpenTopic(ctx, sbSender, nil) if err != nil { log.Fatal(err) } defer topic.Shutdown(ctx) }
Output:
Types ¶
type SubscriptionOptions ¶
type SubscriptionOptions struct { // If false, the serviceBus.Subscription MUST be in the default Peek-Lock mode. // If true, the serviceBus.Subscription MUST be in Receive-and-Delete mode. // When true: pubsub.Message.Ack will be a no-op, pubsub.Message.Nackable // will return true, and pubsub.Message.Nack will panic. ReceiveAndDelete bool // ReceiveBatcherOptions adds constraints to the default batching done for receives. ReceiveBatcherOptions batcher.Options // AckBatcherOptions adds constraints to the default batching done for acks. // Only used when ReceiveAndDelete is false. AckBatcherOptions batcher.Options // ListenerTimeout is the amount of time to wait before timing out the // ReceiveMessages RPC call. This is used to ensure the receive operation is // non-blocking as the RPC blocks if there are no messages. // Defaults to 2 seconds. ListenerTimeout time.Duration }
SubscriptionOptions will contain configuration for subscriptions.
type TopicOptions ¶
type TopicOptions struct { // BatcherOptions adds constraints to the default batching done for sends. BatcherOptions batcher.Options }
TopicOptions provides configuration options for an Azure SB Topic.
type URLOpener ¶
type URLOpener struct { // ConnectionString is the Service Bus connection string (required if ServiceBusHostname is not defined). // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues ConnectionString string // Azure ServiceBus hostname. // https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash ServiceBusHostname string // ClientOptions are options when creating the Client. ServiceBusClientOptions *servicebus.ClientOptions // Options passed when creating the ServiceBus Topic/Subscription. ServiceBusSenderOptions *servicebus.NewSenderOptions ServiceBusReceiverOptions *servicebus.ReceiverOptions // TopicOptions specifies the options to pass to OpenTopic. TopicOptions TopicOptions // SubscriptionOptions specifies the options to pass to OpenSubscription. SubscriptionOptions SubscriptionOptions }
URLOpener opens Azure Service Bus URLs like "azuresb://mytopic" for topics or "azuresb://mytopic?subscription=mysubscription" for subscriptions.
- The URL's host+path is used as the topic name.
- For subscriptions, the subscription name must be provided in the "subscription" query parameter.
- For subscriptions, the ListenerTimeout can be overridden with time.Duration parseable values in "listener_timeout".
No other query parameters are supported.
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.