azuresb

package
v0.30.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2023 License: Apache-2.0 Imports: 16 Imported by: 26

Documentation

Overview

Package azuresb provides an implementation of pubsub using Azure Service Bus Topic and Subscription. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview for an overview.

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers for the scheme "azuresb". The default URL opener will use a Service Bus Connection String based on the environment variable "SERVICEBUS_CONNECTION_STRING". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

Message Delivery Semantics

Azure ServiceBus supports at-least-once semantics in the default Peek-Lock mode; messages will be redelivered if they are not Acked, or if they are explicitly Nacked.

ServiceBus also supports a Receive-Delete mode, which essentially auto-acks a message when it is delivered, resulting in at-most-once semantics. Set SubscriberOptions.ReceiveAndDelete to true to tell azuresb.Subscription that you've enabled Receive-Delete mode. When enabled, pubsub.Message.Ack is a no-op, pubsub.Message.Nackable will return false, and pubsub.Message.Nack will panic.

See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.

As

azuresb exposes the following types for As:

  • Topic: *servicebus.Topic
  • Subscription: *servicebus.Subscription
  • Message.BeforeSend: *servicebus.Message
  • Message.AfterSend: None
  • Message: *servicebus.Message
  • Error: common.Retryable, *amqp.Error, *amqp.LinkError
Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/azuresb"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will open the subscription "mysubscription" for the topic
	// "mytopic" using a connection string from the environment variable
	// SERVICEBUS_CONNECTION_STRING.
	subscription, err := pubsub.OpenSubscription(ctx,
		"azuresb://mytopic?subscription=mysubscription")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Output:

Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/azuresb"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will open the topic "mytopic" using a connection string
	// from the environment variable SERVICEBUS_CONNECTION_STRING.
	topic, err := pubsub.OpenTopic(ctx, "azuresb://mytopic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Output:

Index

Examples

Constants

View Source
const Scheme = "azuresb"

Scheme is the URL scheme azuresb registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func NewClientFromConnectionString added in v0.27.0

func NewClientFromConnectionString(connectionString string, opts *servicebus.ClientOptions) (*servicebus.Client, error)

NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string. https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

func NewReceiver added in v0.27.0

func NewReceiver(sbClient *servicebus.Client, topicName, subscriptionName string, opts *servicebus.ReceiverOptions) (*servicebus.Receiver, error)

NewReceiver returns a *servicebus.Receiver associated with a Service Bus Topic.

func NewSender added in v0.27.0

func NewSender(sbClient *servicebus.Client, topicName string, opts *servicebus.NewSenderOptions) (*servicebus.Sender, error)

NewSender returns a *servicebus.Sender associated with a Service Bus Client.

func OpenSubscription

func OpenSubscription(ctx context.Context, sbClient *servicebus.Client, sbReceiver *servicebus.Receiver, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription initializes a pubsub Subscription on a given Service Bus Subscription and its parent Service Bus Topic.

Example
package main

import (
	"context"
	"log"
	"os"

	"gocloud.dev/pubsub/azuresb"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Change these as needed for your application.
	serviceBusConnString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	const topicName = "test-topic"
	const subscriptionName = "test-subscription"

	// Connect to Azure Service Bus for the given subscription.
	sbClient, err := azuresb.NewClientFromConnectionString(serviceBusConnString, nil)
	if err != nil {
		log.Fatal(err)
	}
	sbReceiver, err := azuresb.NewReceiver(sbClient, topicName, subscriptionName, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer sbReceiver.Close(ctx)

	// Construct a *pubsub.Subscription.
	subscription, err := azuresb.OpenSubscription(ctx, sbClient, sbReceiver, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Output:

func OpenTopic

func OpenTopic(ctx context.Context, sbSender *servicebus.Sender, opts *TopicOptions) (*pubsub.Topic, error)

OpenTopic initializes a pubsub Topic on a given Service Bus Sender.

Example
package main

import (
	"context"
	"log"
	"os"

	"gocloud.dev/pubsub/azuresb"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Change these as needed for your application.
	connString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	topicName := "test-topic"

	if connString == "" {
		log.Fatal("Service Bus ConnectionString is not set")
	}

	// Connect to Azure Service Bus for the given topic.
	sbClient, err := azuresb.NewClientFromConnectionString(connString, nil)
	if err != nil {
		log.Fatal(err)
	}
	sbSender, err := azuresb.NewSender(sbClient, topicName, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer sbSender.Close(ctx)

	// Construct a *pubsub.Topic.
	topic, err := azuresb.OpenTopic(ctx, sbSender, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Output:

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	// If false, the serviceBus.Subscription MUST be in the default Peek-Lock mode.
	// If true, the serviceBus.Subscription MUST be in Receive-and-Delete mode.
	// When true: pubsub.Message.Ack will be a no-op, pubsub.Message.Nackable
	// will return true, and pubsub.Message.Nack will panic.
	ReceiveAndDelete bool

	// ReceiveBatcherOptions adds constraints to the default batching done for receives.
	ReceiveBatcherOptions batcher.Options

	// AckBatcherOptions adds constraints to the default batching done for acks.
	// Only used when ReceiveAndDelete is false.
	AckBatcherOptions batcher.Options

	// ListenerTimeout is the amount of time to wait before timing out the
	// ReceiveMessages RPC call. This is used to ensure the receive operation is
	// non-blocking as the RPC blocks if there are no messages.
	// Defaults to 2 seconds.
	ListenerTimeout time.Duration
}

SubscriptionOptions will contain configuration for subscriptions.

type TopicOptions

type TopicOptions struct {
	// BatcherOptions adds constraints to the default batching done for sends.
	BatcherOptions batcher.Options
}

TopicOptions provides configuration options for an Azure SB Topic.

type URLOpener

type URLOpener struct {
	// ConnectionString is the Service Bus connection string (required).
	// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
	ConnectionString string

	// ClientOptions are options when creating the Client.
	ServiceBusClientOptions *servicebus.ClientOptions

	// Options passed when creating the ServiceBus Topic/Subscription.
	ServiceBusSenderOptions   *servicebus.NewSenderOptions
	ServiceBusReceiverOptions *servicebus.ReceiverOptions

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens Azure Service Bus URLs like "azuresb://mytopic" for topics or "azuresb://mytopic?subscription=mysubscription" for subscriptions.

  • The URL's host+path is used as the topic name.
  • For subscriptions, the subscription name must be provided in the "subscription" query parameter.
  • For subscriptions, the ListenerTimeout can be overridden with time.Duration parseable values in "listener_timeout".

No other query parameters are supported.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL