servicebus

package module
v0.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2021 License: MIT Imports: 35 Imported by: 116

README

Microsoft Azure Service Bus Client for Golang

Go Report Card godoc Build Status Coverage Status

Microsoft Azure Service Bus is a reliable cloud messaging service (MaaS) which simplifies enterprise cloud messaging. It enables developers to build scalable cloud solutions and implement complex messaging workflows over an efficient binary protocol called AMQP.

This library provides a simple interface for sending, receiving and managing Service Bus entities such as Queues, Topics and Subscriptions.

For more information about Service Bus, check out the Azure documentation.

This library is a pure Golang implementation of Azure Service Bus over AMQP.

Preview of Service Bus for Golang

This library is currently a preview. There may be breaking interface changes until it reaches semantic version v1.0.0. If you run into an issue, please don't hesitate to log a new issue or open a pull request.

Install using Go modules

go get -u github.com/Azure/azure-service-bus-go

If you need to install Go, follow the official instructions

Examples

Find up-to-date examples and documentation on godoc.org.

Running tests

Most tests require a properly configured service bus in Azure. The easiest way to set this up is to use the Terraform deployment script. Running the integration tests will take longer than the default 10 minutes, so please use a larger timeout: go test -timeout 30m.

Have questions?

The developers of this library are all active on the Gopher Slack; it is likely easiest to get our attention in the Microsoft Channel. We'll also find your issue if you ask on Stack Overflow with the tags azure and go.

Code of Conduct

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Documentation

Overview

Example (AutoForward)
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

// MessagePrinter is a field-less message handler; its Handle method prints each message body and completes the message.
type MessagePrinter struct{}

// Handle writes the message payload to stdout as a string on its own line and
// then completes the message, returning whatever error Complete reports.
func (MessagePrinter) Handle(ctx context.Context, msg *servicebus.Message) error {
	body := string(msg.Data)
	fmt.Println(body)
	return msg.Complete(ctx)
}

// main demonstrates auto-forwarding: it creates a target queue and a source queue configured to
// auto-forward to the target, sends one message to the source, and receives it from the target.
// Every failure is printed and ends the example early.
func main() {
	// Bound the whole example to 40 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	// The target queue must exist first because the source queue's auto-forward option refers to it.
	qm := ns.NewQueueManager()
	target, err := ensureQueue(ctx, qm, "AutoForwardTargetQueue")
	if err != nil {
		fmt.Println(err)
		return
	}

	source, err := ensureQueue(ctx, qm, "AutoForwardSourceQueue", servicebus.QueueEntityWithAutoForward(target))
	if err != nil {
		fmt.Println(err)
		return
	}

	// Client for the source queue; closed when main returns.
	sourceQueue, err := ns.NewQueue(source.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		_ = sourceQueue.Close(ctx)
	}()

	if err := sourceQueue.Send(ctx, servicebus.NewMessageFromString("forward me to target!")); err != nil {
		fmt.Println(err)
		return
	}

	// Client for the target queue; closed when main returns.
	targetQueue, err := ns.NewQueue(target.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		_ = targetQueue.Close(ctx)
	}()

	// Receive a single message from the target queue and print it.
	if err := targetQueue.ReceiveOne(ctx, MessagePrinter{}); err != nil {
		fmt.Println(err)
		return
	}

}

// ensureQueue deletes the queue called name when a lookup for it succeeds and then
// creates it with the supplied management options.
//
// The delete is best effort: its error is discarded. A failed Put is printed and
// returned to the caller together with a nil entity.
func ensureQueue(ctx context.Context, qm *servicebus.QueueManager, name string, opts ...servicebus.QueueManagementOption) (*servicebus.QueueEntity, error) {
	// A nil error from Get means the queue is already there.
	if _, getErr := qm.Get(ctx, name); getErr == nil {
		_ = qm.Delete(ctx, name)
	}

	created, putErr := qm.Put(ctx, name, opts...)
	if putErr != nil {
		fmt.Println(putErr)
		return nil, putErr
	}
	return created, nil
}
Output:

forward me to target!
Example (BatchingMessages)
// Example body: send ten messages to a queue as a batch, then receive and print them one at a time.
// Bound the whole example to 40 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

// Recreate the example queue through the ensureQueue helper.
qm := ns.NewQueueManager()
qe, err := ensureQueue(ctx, qm, "MessageBatchingExample")
if err != nil {
	fmt.Println(err)
	return
}

// Queue client used for both sending and receiving; closed when the example returns.
q, err := ns.NewQueue(qe.Name)
if err != nil {
	fmt.Println(err)
	return
}
defer func() {
	_ = q.Close(ctx)
}()

// Build ten messages with bodies "foo 0" through "foo 9".
msgs := make([]*servicebus.Message, 10)
for i := 0; i < 10; i++ {
	msgs[i] = servicebus.NewMessageFromString(fmt.Sprintf("foo %d", i))
}

// Wrap the messages in a batch iterator limited to the standard maximum message size and send them.
batcher := servicebus.NewMessageBatchIterator(servicebus.StandardMaxMessageSizeInBytes, msgs...)
if err := q.SendBatch(ctx, batcher); err != nil {
	fmt.Println(err)
	return
}

// Receive the ten messages one by one; MessagePrinter prints and completes each.
for i := 0; i < 10; i++ {
	err := q.ReceiveOne(ctx, MessagePrinter{})
	if err != nil {
		fmt.Println(err)
		return
	}
}
Output:

foo 0
foo 1
foo 2
foo 3
foo 4
foo 5
foo 6
foo 7
foo 8
foo 9
Example (DeadletterQueues)
// Example body: abandon a message ten times, then read it back from the queue's dead-letter queue.
// Bound the whole example to 40 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

qm := ns.NewQueueManager()
qe, err := ensureQueue(ctx, qm, "DeadletterExample")
if err != nil {
	fmt.Println(err)
	return
}

q, err := ns.NewQueue(qe.Name)
if err != nil {
	fmt.Println(err)
	return
}
defer func() {
	_ = q.Close(ctx)
}()

if err := q.Send(ctx, servicebus.NewMessageFromString("foo")); err != nil {
	fmt.Println(err)
	return
}

// Abandon the message 10 times simulating attempting to process the message 10 times. After the 10th time, the
// message will be placed in the Deadletter Queue.
for count := 0; count < 10; count++ {
	err = q.ReceiveOne(ctx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		fmt.Printf("count: %d\n", count+1)
		return msg.Abandon(ctx)
	}))
	if err != nil {
		fmt.Println(err)
		return
	}
}

// receive one from the queue's deadletter queue. It should be the foo message.
qdl := q.NewDeadLetter()
// Register the Close before receiving so the dead-letter receiver is released on the error path too.
defer func() {
	_ = qdl.Close(ctx)
}()
if err := qdl.ReceiveOne(ctx, MessagePrinter{}); err != nil {
	fmt.Println(err)
	return
}
Output:

count: 1
count: 2
count: 3
count: 4
count: 5
count: 6
count: 7
count: 8
count: 9
count: 10
foo
Example (DeferMessages)
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

// RecipeStep is the JSON payload carried by each message in this example.
type RecipeStep struct {
	Step  int    `json:"step,omitempty"`  // step number; encoded as "step", omitted when zero
	Title string `json:"title,omitempty"` // step title; encoded as "title", omitted when empty
}

// main sends five recipe steps to a queue from concurrent goroutines with random delays,
// defers every received message while recording its sequence number by step number, and
// finally receives the deferred messages in step order, printing and completing each.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Namespace client built from the connection string.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	qe, err := ensureQueue(ctx, ns.NewQueueManager(), "DeferExample")
	if err != nil {
		fmt.Println(err)
		return
	}

	q, err := ns.NewQueue(qe.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		_ = q.Close(ctx)
	}()

	steps := []RecipeStep{
		{Step: 1, Title: "Shop"},
		{Step: 2, Title: "Unpack"},
		{Step: 3, Title: "Prepare"},
		{Step: 4, Title: "Cook"},
		{Step: 5, Title: "Eat"},
	}

	// One sender goroutine per step; each sleeps a random time (under two seconds) before
	// sending, so the messages reach the queue out of order.
	for _, step := range steps {
		go func(s RecipeStep) {
			payload, err := json.Marshal(s)
			if err != nil {
				fmt.Println(err)
				return
			}

			outgoing := &servicebus.Message{
				Data:        payload,
				ContentType: "application/json",
				Label:       "RecipeStep",
			}

			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
			if err := q.Send(ctx, outgoing); err != nil {
				fmt.Println(err)
				return
			}
		}(step)
	}

	// Remember the sequence number of every step and defer the message for later.
	sequenceByStepNumber := map[int]int64{}
	deferStep := servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		var received RecipeStep
		if err := json.Unmarshal(msg.Data, &received); err != nil {
			return err
		}
		sequenceByStepNumber[received.Step] = *msg.SystemProperties.SequenceNumber
		return msg.Defer(ctx)
	})
	for range steps {
		err = q.ReceiveOne(ctx, deferStep)
		if err != nil {
			fmt.Println(err)
			return
		}
	}

	// Fetch the deferred messages by sequence number, walking the step numbers 1..len(steps).
	printStep := servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		var received RecipeStep
		if err := json.Unmarshal(msg.Data, &received); err != nil {
			return err
		}
		fmt.Printf("step: %d, %s\n", received.Step, received.Title)
		return msg.Complete(ctx)
	})
	for stepNumber := 1; stepNumber <= len(steps); stepNumber++ {
		if err := q.ReceiveDeferred(ctx, printStep, sequenceByStepNumber[stepNumber]); err != nil {
			fmt.Println(err)
			return
		}
	}

}
Output:

step: 1, Shop
step: 2, Unpack
step: 3, Prepare
step: 4, Cook
step: 5, Eat
Example (DuplicateMessageDetection)
// Example body: send the same message ID twice to a queue created with duplicate detection and
// show that only one copy is received.
// Bound the whole example to 40 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

// Create the queue with duplicate detection enabled, passing a 30 second window.
window := 30 * time.Second
qm := ns.NewQueueManager()
qe, err := ensureQueue(ctx, qm, "DuplicateDetectionExample", servicebus.QueueEntityWithDuplicateDetection(&window))
if err != nil {
	fmt.Println(err)
	return
}

q, err := ns.NewQueue(qe.Name)
if err != nil {
	fmt.Println(err)
	return
}
defer func() {
	_ = q.Close(ctx)
}()

// Generate a fresh ID that both sends will share.
guid, err := uuid.NewV4()
if err != nil {
	fmt.Println(err)
	return
}

msg := servicebus.NewMessageFromString("foo")
msg.ID = guid.String()

// send the message twice with the same ID
for i := 0; i < 2; i++ {
	if err := q.Send(ctx, msg); err != nil {
		fmt.Println(err)
		return
	}
}

// there should be only 1 message received from the queue
go func() {
	// Receive runs in the background; a "context canceled" error is treated as a normal stop and not printed.
	if err := q.Receive(ctx, MessagePrinter{}); err != nil {
		if err.Error() != "context canceled" {
			fmt.Println(err)
			return
		}
	}
}()

// Give the background receiver two seconds to print what it receives before the example returns.
time.Sleep(2 * time.Second)
Output:

foo
Example (MessageBrowse)
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

type (
	// Scientist is the JSON payload sent and peeked in this example.
	Scientist struct {
		Surname   string `json:"surname,omitempty"`   // encoded as "surname", omitted when empty
		FirstName string `json:"firstname,omitempty"` // encoded as "firstname", omitted when empty
	}
)

// main demonstrates message browsing: it starts a sender goroutine, waits one second, starts
// a goroutine that peeks the queue, and then waits for the 10 second send/peek context to end.
// Every setup failure is printed and ends the example early.
func main() {
	// Bound the whole example to 40 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	qm := ns.NewQueueManager()
	qEntity, err := ensureQueue(ctx, qm, "MessageBrowseExample")
	if err != nil {
		fmt.Println(err)
		return
	}

	q, err := ns.NewQueue(qEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the queue client on exit. The close error is ignored because the example is finishing anyway.
	defer func() {
		_ = q.Close(ctx)
	}()

	// Separate, shorter-lived context for the send and peek goroutines. It gets its own cancel
	// variable so the outer cancel is not silently reassigned.
	txRxCtx, cancelTxRx := context.WithTimeout(ctx, 10*time.Second)
	defer cancelTxRx()
	go sendMessages(txRxCtx, q)
	time.Sleep(1 * time.Second) // wait a second to ensure a message has landed in the queue
	go peekMessages(txRxCtx, q)

	<-txRxCtx.Done() // wait for the context to finish

}

// sendMessages JSON-encodes a fixed list of ten scientists and sends each one to q as an
// "application/json" message with a two minute TTL. The first marshal or send error is
// printed and stops the function.
func sendMessages(ctx context.Context, q *servicebus.Queue) {
	scientists := []Scientist{
		{Surname: "Einstein", FirstName: "Albert"},
		{Surname: "Heisenberg", FirstName: "Werner"},
		{Surname: "Curie", FirstName: "Marie"},
		{Surname: "Hawking", FirstName: "Steven"},
		{Surname: "Newton", FirstName: "Isaac"},
		{Surname: "Bohr", FirstName: "Niels"},
		{Surname: "Faraday", FirstName: "Michael"},
		{Surname: "Galilei", FirstName: "Galileo"},
		{Surname: "Kepler", FirstName: "Johannes"},
		{Surname: "Kopernikus", FirstName: "Nikolaus"},
	}

	for _, person := range scientists {
		payload, err := json.Marshal(person)
		if err != nil {
			fmt.Println(err)
			return
		}

		// Each message gets its own TTL variable, so no two messages share a pointer.
		timeToLive := 2 * time.Minute
		outgoing := servicebus.NewMessage(payload)
		outgoing.ContentType = "application/json"
		outgoing.TTL = &timeToLive

		err = q.Send(ctx, outgoing)
		if err != nil {
			fmt.Println(err)
			return
		}
	}
}

// peekMessages browses q without consuming messages. It peeks one message at a time,
// decodes it as a Scientist and prints it, until ctx is done, the queue reports
// ErrNoMessages, or an error occurs (which is printed).
func peekMessages(ctx context.Context, q *servicebus.Queue) {
	// The first peek uses no options; later peeks carry an explicit starting sequence number.
	var opts []servicebus.PeekOption
	for {
		select {
		case <-ctx.Done():
			return
		default:
			msg, err := q.PeekOne(ctx, opts...)
			if err != nil {
				// ErrNoMessages means every message has been browsed; anything else is reported.
				if _, done := err.(servicebus.ErrNoMessages); !done {
					fmt.Println(err)
				}
				return
			}

			var scientist Scientist
			if err := json.Unmarshal(msg.Data, &scientist); err != nil {
				fmt.Println(err)
				return
			}

			// Start the next peek one past the message just read. Peeking from the same sequence
			// number would return this message again, so the loop would never advance and the
			// output would not be one line per scientist.
			opts = []servicebus.PeekOption{servicebus.PeekFromSequenceNumber(*msg.SystemProperties.SequenceNumber + 1)}
			fmt.Printf("Firstname: %s, Surname: %s\n", scientist.FirstName, scientist.Surname)
		}
	}
}
Output:

Firstname: Albert, Surname: Einstein
Firstname: Werner, Surname: Heisenberg
Firstname: Marie, Surname: Curie
Firstname: Steven, Surname: Hawking
Firstname: Isaac, Surname: Newton
Firstname: Niels, Surname: Bohr
Firstname: Michael, Surname: Faraday
Firstname: Galileo, Surname: Galilei
Firstname: Johannes, Surname: Kepler
Firstname: Nikolaus, Surname: Kopernikus
Example (MessageSessions)
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

// StepSessionHandler handles the messages of one session and prints them as recipe steps.
type StepSessionHandler struct {
	messageSession *servicebus.MessageSession // set by Start; used by Handle and End
}

// Start is called when a new session is started
// Start is invoked when a new session begins. It stores the session on the handler
// and prints the session ID; it always returns nil.
func (ssh *StepSessionHandler) Start(ms *servicebus.MessageSession) error {
	ssh.messageSession = ms
	sessionID := *ms.SessionID()
	fmt.Println("Begin session: ", sessionID)
	return nil
}

// Handle is called when a new session message is received
// Handle is invoked for every message received on the session. It decodes the body as
// a RecipeStep, prints it, closes the session once step 5 arrives, and completes the
// message. A decode failure is printed and returned without completing the message.
func (ssh *StepSessionHandler) Handle(ctx context.Context, msg *servicebus.Message) error {
	var current RecipeStep
	err := json.Unmarshal(msg.Data, &current)
	if err != nil {
		fmt.Println(err)
		return err
	}

	fmt.Printf("  Step: %d, %s\n", current.Step, current.Title)

	// Step 5 is the last step each session is sent, so the handler ends the session itself.
	lastStep := current.Step == 5
	if lastStep {
		ssh.messageSession.Close()
	}
	return msg.Complete(ctx)
}

// End is called when the message session is closed. Service Bus will not automatically end your message session. Be
// sure to know when to terminate your own session.
// End is invoked when the message session is closed. Service Bus does not end a session
// on its own, so the handler has to decide when to close it. End prints the session ID
// followed by an empty line.
func (ssh *StepSessionHandler) End() {
	sessionID := *ssh.messageSession.SessionID()
	fmt.Println("End session: ", sessionID)
	fmt.Println("")
}

// main demonstrates message sessions: it creates a queue that requires sessions, sends the
// recipe steps once per session ID, and then receives each session in turn with a
// StepSessionHandler. Every failure is printed and ends the example early.
func main() {
	// Bound the whole example to 40 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	// Create a Service Bus Queue with required sessions enabled. This will ensure that all messages sent and received
	// are bound to a session.
	qm := ns.NewQueueManager()
	qEntity, err := ensureQueue(ctx, qm, "MessageSessionsExample", servicebus.QueueEntityWithRequiredSessions())
	if err != nil {
		fmt.Println(err)
		return
	}

	q, err := ns.NewQueue(qEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the queue client on exit. The close error is ignored because the example is finishing anyway.
	defer func() {
		_ = q.Close(ctx)
	}()

	sessions := []string{"foo", "bar", "bazz", "buzz"}
	for _, session := range sessions {
		// send recipe steps
		// note that order is preserved within a given session
		sendSessionRecipeSteps(ctx, session, q)
	}

	// receive messages for each session
	// you can also call q.NewSession(nil) to receive from any available session
	for _, session := range sessions {
		// Take a per-iteration copy so the pointer handed to NewSession never aliases the loop
		// variable, which is shared across iterations before Go 1.22.
		sessionID := session
		queueSession := q.NewSession(&sessionID)
		err := queueSession.ReceiveOne(ctx, new(StepSessionHandler))
		if err != nil {
			fmt.Println(err)
			return
		}

		if err := queueSession.Close(ctx); err != nil {
			fmt.Println(err)
			return
		}
	}

}

// sendSessionRecipeSteps sends the five recipe steps, in order, to q as "application/json"
// messages tagged with sessionID. The first marshal or send error is printed and stops
// the function.
func sendSessionRecipeSteps(ctx context.Context, sessionID string, q *servicebus.Queue) {
	steps := []RecipeStep{
		{Step: 1, Title: "Shop"},
		{Step: 2, Title: "Unpack"},
		{Step: 3, Title: "Prepare"},
		{Step: 4, Title: "Cook"},
		{Step: 5, Title: "Eat"},
	}

	for _, current := range steps {
		payload, err := json.Marshal(current)
		if err != nil {
			fmt.Println(err)
			return
		}

		outgoing := servicebus.NewMessage(payload)
		outgoing.ContentType = "application/json"
		outgoing.SessionID = &sessionID

		err = q.Send(ctx, outgoing)
		if err != nil {
			fmt.Println(err)
			return
		}
	}
}
Output:

Begin session:  foo
  Step: 1, Shop
  Step: 2, Unpack
  Step: 3, Prepare
  Step: 4, Cook
  Step: 5, Eat
End session:  foo

Begin session:  bar
  Step: 1, Shop
  Step: 2, Unpack
  Step: 3, Prepare
  Step: 4, Cook
  Step: 5, Eat
End session:  bar

Begin session:  bazz
  Step: 1, Shop
  Step: 2, Unpack
  Step: 3, Prepare
  Step: 4, Cook
  Step: 5, Eat
End session:  bazz

Begin session:  buzz
  Step: 1, Shop
  Step: 2, Unpack
  Step: 3, Prepare
  Step: 4, Cook
  Step: 5, Eat
End session:  buzz
Example (Prefetch)
// Example body: send and receive 200 messages on two queues concurrently, one receiver with a
// prefetch count of 1 and one with 1000, and compare how long each run took.
// Bound the whole example to 60 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

qm := ns.NewQueueManager()
prefetch1, err := ensureQueue(ctx, qm, "Prefetch1Example")
if err != nil {
	fmt.Println(err)
	return
}

prefetch1000, err := ensureQueue(ctx, qm, "Prefetch1000Example")
if err != nil {
	fmt.Println(err)
	return
}

// sendAndReceive will send to the queue and read from the queue
sendAndReceive := func(ctx context.Context, name string, opt servicebus.QueueOption) error {
	messageCount := 200
	q, err := ns.NewQueue(name, opt, servicebus.QueueWithReceiveAndDelete())
	if err != nil {
		return err
	}
	// Release the queue client when this run finishes; the close error is not interesting here.
	defer func() {
		_ = q.Close(ctx)
	}()

	// Every message carries the same 1000 byte random payload.
	buffer := make([]byte, 1000)
	if _, err := rand.Read(buffer); err != nil {
		return err
	}

	for i := 0; i < messageCount; i++ {
		if err := q.Send(ctx, servicebus.NewMessage(buffer)); err != nil {
			return err
		}
	}

	innerCtx, cancel := context.WithCancel(ctx)
	// Always release the inner context, including when Receive returns before the handler cancels it.
	defer cancel()
	count := 0
	err = q.Receive(innerCtx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		count++
		// NOTE(review): the receive is cancelled once count reaches messageCount-1, not messageCount;
		// kept as in the original, confirm this is intended.
		if count == messageCount-1 {
			defer cancel()
		}
		return msg.Complete(ctx)
	}))
	if err != nil {
		// Cancelling innerCtx is how the receive loop is stopped, so that error is expected.
		if err.Error() != "context canceled" {
			return err
		}
	}
	return nil
}

// run send and receive concurrently and compare the times
//
// Each result channel is buffered and closed by its goroutine. A failed run then shows up as a
// closed channel instead of blocking the receives below forever, and a successful run can always
// deliver its duration even if the main flow has already returned.
totalPrefetch1 := make(chan time.Duration, 1)
go func() {
	defer close(totalPrefetch1)
	start := time.Now()
	if err := sendAndReceive(ctx, prefetch1.Name, servicebus.QueueWithPrefetchCount(1)); err != nil {
		fmt.Println(err)
		return
	}
	totalPrefetch1 <- time.Since(start)
}()

totalPrefetch1000 := make(chan time.Duration, 1)
go func() {
	defer close(totalPrefetch1000)
	start := time.Now()
	if err := sendAndReceive(ctx, prefetch1000.Name, servicebus.QueueWithPrefetchCount(1000)); err != nil {
		fmt.Println(err)
		return
	}
	totalPrefetch1000 <- time.Since(start)
}()

tp1, ok := <-totalPrefetch1
if !ok {
	// The prefetch-1 run failed and has already printed its error.
	return
}
tp2, ok := <-totalPrefetch1000
if !ok {
	// The prefetch-1000 run failed and has already printed its error.
	return
}

if tp1 > tp2 {
	fmt.Println("prefetch of 1000 took less time!")
}
Output:

prefetch of 1000 took less time!
Example (PrioritySubscriptions)
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

// PrioritySubscription describes one subscription the example creates on the topic.
type PrioritySubscription struct {
	Name         string // subscription name; also the prefix of its rule name
	Expression   string // expression for the subscription's SQL filter rule
	MessageCount int    // number of messages the example receives from this subscription
}

// PriorityMessage is a message body together with the priority sent as its "Priority" user property.
type PriorityMessage struct {
	Body     string
	Priority int
}

// PriorityPrinter is a message handler that prints each message prefixed with a subscription name.
type PriorityPrinter struct {
	SubName string // prefix printed before the message body and priority
}

// Handle prints "<SubName>_<body>_<priority>" for the message and then completes it.
// If the "Priority" user property is not an int64, a notice is printed and the priority
// is shown as 0.
func (pp PriorityPrinter) Handle(ctx context.Context, msg *servicebus.Message) error {
	priority, isInt64 := msg.UserProperties["Priority"].(int64)
	if !isInt64 {
		fmt.Println("Priority is not an int64")
	}

	parts := []string{pp.SubName, string(msg.Data), strconv.Itoa(int(priority))}
	fmt.Println(strings.Join(parts, "_"))
	return msg.Complete(ctx)
}

// main demonstrates priority subscriptions: it creates a topic with three subscriptions whose
// SQL filter rules select on the "Priority" user property, sends four messages with priorities
// 1 to 4, and receives the expected number of messages from each subscription. Every failure
// is printed and ends the example early.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Namespace client built from the connection string.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	// Topic that the priority messages are published to.
	topicEntity, err := ensureTopic(ctx, ns.NewTopicManager(), "PrioritySubscriptionsTopic")
	if err != nil {
		fmt.Println(err)
		return
	}

	sm, err := ns.NewSubscriptionManager(topicEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}

	// One subscription per priority band, each with a SQL-like filter expression.
	prioritySubs := []PrioritySubscription{
		{Name: "Priority1", Expression: "user.Priority=1", MessageCount: 1},
		{Name: "Priority2", Expression: "user.Priority=2", MessageCount: 1},
		{Name: "PriorityGreaterThan2", Expression: "user.Priority>2", MessageCount: 2},
	}
	for _, sub := range prioritySubs {
		subEntity, err := ensureSubscription(ctx, sm, sub.Name)
		if err != nil {
			fmt.Println(err)
			return
		}

		// Drop the default rule (the "TrueFilter" that accepts all messages) so only the SQL rule applies.
		if err := sm.DeleteRule(ctx, subEntity.Name, "$Default"); err != nil {
			fmt.Println(err)
			return
		}

		rule := servicebus.SQLFilter{Expression: sub.Expression}
		if _, err := sm.PutRule(ctx, subEntity.Name, sub.Name+"Rule", rule); err != nil {
			fmt.Println(err)
			return
		}
	}

	priorityMessages := []PriorityMessage{
		{Body: "foo", Priority: 1},
		{Body: "bar", Priority: 2},
		{Body: "bazz", Priority: 3},
		{Body: "buzz", Priority: 4},
	}
	topic, err := ns.NewTopic(topicEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		_ = topic.Close(ctx)
	}()

	// Publish every message with its priority attached as a user property.
	for _, pm := range priorityMessages {
		outgoing := servicebus.NewMessageFromString(pm.Body)
		outgoing.UserProperties = map[string]interface{}{"Priority": pm.Priority}
		if err := topic.Send(ctx, outgoing); err != nil {
			fmt.Println(err)
			return
		}
	}

	// Drain the expected number of messages from each subscription, then close its receiver.
	for _, sub := range prioritySubs {
		receiver, err := topic.NewSubscription(sub.Name)
		if err != nil {
			fmt.Println(err)
			return
		}

		printer := PriorityPrinter{SubName: receiver.Name}
		for remaining := sub.MessageCount; remaining > 0; remaining-- {
			if err := receiver.ReceiveOne(ctx, printer); err != nil {
				fmt.Println(err)
				return
			}
		}
		if err := receiver.Close(ctx); err != nil {
			fmt.Println(err)
			return
		}
	}

}

// ensureTopic deletes the topic called name when a lookup for it succeeds and then
// creates it with the supplied management options.
//
// The delete is best effort: its error is discarded. A failed Put is printed and
// returned to the caller together with a nil entity.
func ensureTopic(ctx context.Context, tm *servicebus.TopicManager, name string, opts ...servicebus.TopicManagementOption) (*servicebus.TopicEntity, error) {
	// A nil error from Get means the topic is already there.
	if _, getErr := tm.Get(ctx, name); getErr == nil {
		_ = tm.Delete(ctx, name)
	}

	created, putErr := tm.Put(ctx, name, opts...)
	if putErr != nil {
		fmt.Println(putErr)
		return nil, putErr
	}
	return created, nil
}

// ensureSubscription deletes the subscription called name when a lookup for it succeeds
// and then creates it with the supplied management options.
//
// The delete is best effort: its error is discarded. A failed Put is printed and
// returned to the caller together with a nil entity.
func ensureSubscription(ctx context.Context, sm *servicebus.SubscriptionManager, name string, opts ...servicebus.SubscriptionManagementOption) (*servicebus.SubscriptionEntity, error) {
	// A nil error from Get means the subscription is already there.
	if _, getErr := sm.Get(ctx, name); getErr == nil {
		_ = sm.Delete(ctx, name)
	}

	created, putErr := sm.Put(ctx, name, opts...)
	if putErr != nil {
		fmt.Println(putErr)
		return nil, putErr
	}
	return created, nil
}
Output:

Priority1_foo_1
Priority2_bar_2
PriorityGreaterThan2_bazz_3
PriorityGreaterThan2_buzz_4
Example (QueueSendAndReceive)
// Example body: send one message to the existing "helloworld" queue and receive it back.
// Bound the whole example to 10 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

// Create a client to communicate with the queue. (The queue must have already been created, see `QueueManager`)
q, err := ns.NewQueue("helloworld")
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}
// Release the queue client on exit. The close error is ignored because the example is finishing anyway.
defer func() {
	_ = q.Close(ctx)
}()

err = q.Send(ctx, servicebus.NewMessageFromString("Hello, World!!!"))
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}

// Receive a single message, print its body and complete it.
err = q.ReceiveOne(
	ctx,
	servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
		fmt.Println(string(message.Data))
		return message.Complete(ctx)
	}))
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}
Output:

Hello, World!!!
Example (ScheduledMessage)
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

// main demonstrates scheduled messages: it purges the "scheduledmessages" queue, sends a
// message scheduled one minute ahead, and reports whether it was received within 20 seconds
// of the expected time. Every failure is printed and ends the example early.
func main() {
	// Two minutes leaves room for the one minute delay plus the purge.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}

	// Create a client to communicate with the queue. (The queue must have already been created, see `QueueManager`)
	client, err := ns.NewQueue("scheduledmessages")
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}
	// Release the queue client on exit. The close error is ignored because the example is finishing anyway.
	defer func() {
		_ = client.Close(ctx)
	}()

	// purge all of the existing messages in the queue
	purgeMessages(ns)

	// The delay that we should schedule a message for.
	const waitTime = 1 * time.Minute
	// Service Bus guarantees roughly a one minute window. So that our tests aren't flaky, we'll buffer our expectations
	// on either side.
	const buffer = 20 * time.Second

	expectedTime := time.Now().Add(waitTime)
	msg := servicebus.NewMessageFromString("to the future!!")
	msg.ScheduleAt(expectedTime)

	err = client.Send(ctx, msg)
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}

	err = client.ReceiveOne(
		ctx,
		servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
			// Accept the delivery when it falls within expectedTime ± buffer.
			received := time.Now()
			if received.Before(expectedTime.Add(buffer)) && received.After(expectedTime.Add(-buffer)) {
				fmt.Println("Received when expected!")
			} else {
				fmt.Println("Received outside the expected window.")
			}
			return msg.Complete(ctx)
		}))
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}

}

// purgeMessages drains the "scheduledmessages" queue by completing every message it
// receives during a 10 second window.
//
// A failure to create the queue client is printed and the purge is skipped; without that
// check a nil client would be used. The Receive error is deliberately ignored because the
// purge is best effort.
func purgeMessages(ns *servicebus.Namespace) {
	client, err := ns.NewQueue("scheduledmessages")
	if err != nil {
		fmt.Println(err)
		return
	}
	// Close with a fresh context. By the time the deferred close runs, purgeCtx has been
	// cancelled (deferred calls run last-in, first-out) and has presumably already timed out,
	// so it is not usable for an orderly close.
	defer func() {
		closeCtx, closeCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer closeCancel()
		_ = client.Close(closeCtx)
	}()

	purgeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	_ = client.Receive(purgeCtx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		return msg.Complete(ctx)
	}))
}
Output:

Received when expected!

Index

Examples

Constants

View Source
// Receive modes and the names used to address dead letter queues.
const (
	// PeekLockMode causes a Receiver to peek at a message, lock it so no others can consume and have the queue wait for
	// the DispositionAction
	PeekLockMode ReceiveMode = 0
	// ReceiveAndDeleteMode causes a Receiver to pop messages off of the queue without waiting for DispositionAction
	ReceiveAndDeleteMode ReceiveMode = 1

	// DeadLetterQueueName is the name of the dead letter queue to be appended to the entity path
	DeadLetterQueueName = "$DeadLetterQueue"

	// TransferDeadLetterQueueName is the name of the transfer dead letter queue which is appended to the entity name to
	// build the full address of the transfer dead letter queue.
	TransferDeadLetterQueueName = "$Transfer/" + DeadLetterQueueName
)
View Source
const (

	// Version is the semantic version number
	// NOTE(review): this page documents module version v0.11.1 while the constant reads 0.10.16 — confirm
	// whether the constant was meant to be bumped with the release.
	Version = "0.10.16"
)

Variables

This section is empty.

Functions

func IsErrNotFound added in v0.3.0

func IsErrNotFound(err error) bool

IsErrNotFound returns true if the error argument is an ErrNotFound type

Types

type ActionDescriber added in v0.2.0

// ActionDescriber can transform itself into an ActionDescription.
type ActionDescriber interface {
	// ToActionDescription returns the ActionDescription form of the receiver.
	ToActionDescription() ActionDescription
}

ActionDescriber can transform itself into an ActionDescription

type ActionDescription added in v0.2.0

// ActionDescription describes an action upon a message that matches a filter.
// The struct tags control how each field is encoded as XML.
type ActionDescription struct {
	Type                  string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` // encoded as the "type" attribute in the XMLSchema-instance namespace
	SQLExpression         string `xml:"SqlExpression"`                                       // the action's SQL-like expression
	RequiresPreprocessing bool   `xml:"RequiresPreprocessing"`
	CompatibilityLevel    int    `xml:"CompatibilityLevel,omitempty"` // omitted from the XML when zero
}

ActionDescription describes an action upon a message that matches a filter

With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.

type BaseEntityDescription

// BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions.
// Both fields are encoded as XML namespace attributes and omitted when nil.
type BaseEntityDescription struct {
	InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"` // value of the "xmlns:i" attribute
	ServiceBusSchema       *string `xml:"xmlns,attr,omitempty"`   // value of the "xmlns" attribute
}

BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions

type BatchDispositionError added in v0.7.0

// BatchDispositionError is an error which returns a collection of DispositionError.
type BatchDispositionError struct {
	Errors []DispositionError // the collected DispositionError values
}

BatchDispositionError is an error which returns a collection of DispositionError.

func (BatchDispositionError) Error added in v0.7.0

func (bde BatchDispositionError) Error() string

type BatchDispositionIterator added in v0.3.0

// BatchDispositionIterator provides an iterator over LockTokenIDs.
type BatchDispositionIterator struct {
	LockTokenIDs []*uuid.UUID  // the lock tokens that are iterated over
	Status       MessageStatus // NOTE(review): presumably the status applied to each lock token — confirm
	// contains filtered or unexported fields
}

BatchDispositionIterator provides an iterator over LockTokenIDs

func (*BatchDispositionIterator) Done added in v0.3.0

func (bdi *BatchDispositionIterator) Done() bool

Done communicates whether there are more messages remaining to be iterated over.

func (*BatchDispositionIterator) Next added in v0.3.0

func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID)

Next iterates to the next LockToken

type BatchIterator added in v0.3.0

// BatchIterator offers a simple mechanism for batching a list of messages.
type BatchIterator interface {
	// Done reports whether the iterator is finished.
	// NOTE(review): presumably true once no messages remain to be batched — confirm against the implementations.
	Done() bool
	// Next returns the next MessageBatch, or an error, given a message ID and optional batch options.
	Next(messageID string, opts *BatchOptions) (*MessageBatch, error)
}

BatchIterator offers a simple mechanism for batching a list of messages

type BatchOptions added in v0.3.0

// BatchOptions are optional information to add to a batch of messages.
type BatchOptions struct {
	SessionID *string // optional session ID; nil when not set
}

BatchOptions are optional information to add to a batch of messages

type Closer added in v0.2.0

// Closer provides the ability to close an entity.
type Closer interface {
	// Close closes the entity, taking a context and returning any error encountered.
	Close(ctx context.Context) error
}

Closer provides the ability to close an entity

type CorrelationFilter added in v0.2.0

type CorrelationFilter struct {
	CorrelationID    *string                `xml:"CorrelationId,omitempty"`
	MessageID        *string                `xml:"MessageId,omitempty"`
	To               *string                `xml:"To,omitempty"`
	ReplyTo          *string                `xml:"ReplyTo,omitempty"`
	Label            *string                `xml:"Label,omitempty"`
	SessionID        *string                `xml:"SessionId,omitempty"`
	ReplyToSessionID *string                `xml:"ReplyToSessionId,omitempty"`
	ContentType      *string                `xml:"ContentType,omitempty"`
	Properties       map[string]interface{} `xml:"Properties,omitempty"`
}

CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user and system properties. A common use is to match against the CorrelationId property, but the application can also choose to match against ContentType, Label, MessageId, ReplyTo, ReplyToSessionId, SessionId, To, and any user-defined properties. A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter. For string expressions, the comparison is case-sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition, meaning for the filter to match, all conditions must match.

func (CorrelationFilter) ToFilterDescription added in v0.2.0

func (cf CorrelationFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the CorrelationFilter into a FilterDescription

type CountDetails

type CountDetails struct {
	XMLName                        xml.Name `xml:"CountDetails"`
	ActiveMessageCount             *int32   `xml:"ActiveMessageCount,omitempty"`
	DeadLetterMessageCount         *int32   `xml:"DeadLetterMessageCount,omitempty"`
	ScheduledMessageCount          *int32   `xml:"ScheduledMessageCount,omitempty"`
	TransferDeadLetterMessageCount *int32   `xml:"TransferDeadLetterMessageCount,omitempty"`
	TransferMessageCount           *int32   `xml:"TransferMessageCount,omitempty"`
}

CountDetails has current active (and other) messages for queue/topic.

type DeadLetter added in v0.2.0

type DeadLetter struct {
	// contains filtered or unexported fields
}

DeadLetter represents a dead letter queue in Azure Service Bus.

Azure Service Bus queues, topics and subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.

The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.

From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.

Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.

func NewDeadLetter added in v0.2.0

func NewDeadLetter(builder DeadLetterBuilder) *DeadLetter

NewDeadLetter constructs an instance of DeadLetter which represents a dead letter queue in Azure Service Bus

func (*DeadLetter) Close added in v0.2.0

func (dl *DeadLetter) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*DeadLetter) ReceiveOne added in v0.2.0

func (dl *DeadLetter) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will receive one message from the dead letter queue

type DeadLetterBuilder added in v0.2.0

type DeadLetterBuilder interface {
	NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
}

DeadLetterBuilder provides the ability to create a new receiver addressed to a given entity's dead letter queue.

type DefaultRuleDescription added in v0.10.3

type DefaultRuleDescription struct {
	XMLName xml.Name          `xml:"DefaultRuleDescription"`
	Filter  FilterDescription `xml:"Filter"`
	Name    *string           `xml:"Name,omitempty"`
}

DefaultRuleDescription is the content type for Subscription Rule management requests

type DispositionAction

type DispositionAction func(ctx context.Context) error

DispositionAction represents the action to notify Azure Service Bus of the Message's disposition

type DispositionError added in v0.7.0

type DispositionError struct {
	LockTokenID *uuid.UUID
	// contains filtered or unexported fields
}

DispositionError is an error associated with a LockTokenID.

func (DispositionError) Error added in v0.7.0

func (de DispositionError) Error() string

func (DispositionError) UnWrap added in v0.7.0

func (de DispositionError) UnWrap() error

UnWrap will return the private error.

type Entity added in v0.2.0

type Entity struct {
	Name string
	ID   string
}

Entity represents the most basic form of an Azure Service Bus entity.

func (Entity) TargetURI added in v0.2.0

func (e Entity) TargetURI() string

TargetURI provides an absolute address to a target entity

type EntityManagementAddresser added in v0.2.0

type EntityManagementAddresser interface {
	ManagementPath() string
}

EntityManagementAddresser describes the ability of an entity to provide an addressable path to its management endpoint

type EntityStatus added in v0.2.0

type EntityStatus string

EntityStatus enumerates the values for entity status.

const (
	// Active ...
	Active EntityStatus = "Active"
	// Creating ...
	Creating EntityStatus = "Creating"
	// Deleting ...
	Deleting EntityStatus = "Deleting"
	// Disabled ...
	Disabled EntityStatus = "Disabled"
	// ReceiveDisabled ...
	ReceiveDisabled EntityStatus = "ReceiveDisabled"
	// Renaming ...
	Renaming EntityStatus = "Renaming"
	// Restoring ...
	Restoring EntityStatus = "Restoring"
	// SendDisabled ...
	SendDisabled EntityStatus = "SendDisabled"
	// Unknown ...
	Unknown EntityStatus = "Unknown"
)

type ErrAMQP added in v0.2.0

type ErrAMQP rpc.Response

ErrAMQP indicates that the server communicated an AMQP error with a particular response; its underlying type is rpc.Response.

func (ErrAMQP) Error added in v0.2.0

func (e ErrAMQP) Error() string

type ErrConnectionClosed added in v0.10.1

type ErrConnectionClosed string

ErrConnectionClosed indicates that the connection has been closed.

func (ErrConnectionClosed) Error added in v0.10.1

func (e ErrConnectionClosed) Error() string

type ErrIncorrectType added in v0.2.0

type ErrIncorrectType struct {
	Key          string
	ExpectedType reflect.Type
	ActualValue  interface{}
}

ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func (ErrIncorrectType) Error added in v0.2.0

func (e ErrIncorrectType) Error() string

type ErrMalformedMessage added in v0.2.0

type ErrMalformedMessage string

ErrMalformedMessage indicates that a message that was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.

func (ErrMalformedMessage) Error added in v0.2.0

func (e ErrMalformedMessage) Error() string

type ErrMissingField added in v0.2.0

type ErrMissingField string

ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func (ErrMissingField) Error added in v0.2.0

func (e ErrMissingField) Error() string

type ErrNoMessages added in v0.2.0

type ErrNoMessages struct{}

ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.

func (ErrNoMessages) Error added in v0.2.0

func (e ErrNoMessages) Error() string

type ErrNotFound added in v0.3.0

type ErrNotFound struct {
	EntityPath string
}

ErrNotFound is returned when an entity is not found (404)

func (ErrNotFound) Error added in v0.3.0

func (e ErrNotFound) Error() string

type FalseFilter added in v0.2.0

type FalseFilter struct{}

FalseFilter represents an always-false SQL expression which will deny all messages

func (FalseFilter) ToFilterDescription added in v0.2.0

func (ff FalseFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the FalseFilter into a FilterDescription

type FilterDescriber added in v0.2.0

type FilterDescriber interface {
	ToFilterDescription() FilterDescription
}

FilterDescriber can transform itself into a FilterDescription

type FilterDescription added in v0.2.0

type FilterDescription struct {
	XMLName xml.Name `xml:"Filter"`
	CorrelationFilter
	Type               string  `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"`
	SQLExpression      *string `xml:"SqlExpression,omitempty"`
	CompatibilityLevel int     `xml:"CompatibilityLevel,omitempty"`
}

FilterDescription describes a filter which can be applied to a subscription to filter messages from the topic.

Subscribers can define which messages they want to receive from a topic. These messages are specified in the form of one or more named subscription rules. Each rule consists of a condition that selects particular messages and an action that annotates the selected message. For each matching rule condition, the subscription produces a copy of the message, which may be differently annotated for each matching rule.

Each newly created topic subscription has an initial default subscription rule. If you don't explicitly specify a filter condition for the rule, the applied filter is the true filter that enables all messages to be selected into the subscription. The default rule has no associated annotation action.

type Handler

type Handler interface {
	Handle(context.Context, *Message) error
}

Handler exposes the functionality required to process a Service Bus message.

type HandlerFunc added in v0.2.0

type HandlerFunc func(context.Context, *Message) error

HandlerFunc is a type converter that allows a func to be used as a `Handler`

func (HandlerFunc) Handle added in v0.2.0

func (hf HandlerFunc) Handle(ctx context.Context, msg *Message) error

Handle redirects this call to the func that was provided.

type ListQueuesOption added in v0.10.14

type ListQueuesOption func(*ListQueuesOptions) error

ListQueuesOption represents named options for listing queues

func ListQueuesWithSkip added in v0.10.14

func ListQueuesWithSkip(skip int) ListQueuesOption

ListQueuesWithSkip will skip the specified number of entities

func ListQueuesWithTop added in v0.10.14

func ListQueuesWithTop(top int) ListQueuesOption

ListQueuesWithTop will return at most `top` results

type ListQueuesOptions added in v0.10.14

type ListQueuesOptions struct {
	// contains filtered or unexported fields
}

ListQueuesOptions provides options for List() to control things like page size. NOTE: Use the ListQueuesWith* methods to specify this.

type ListSubscriptionsOption added in v0.10.14

type ListSubscriptionsOption func(*ListSubscriptionsOptions) error

ListSubscriptionsOption represents named options for listing subscriptions

func ListSubscriptionsWithSkip added in v0.10.14

func ListSubscriptionsWithSkip(skip int) ListSubscriptionsOption

ListSubscriptionsWithSkip will skip the specified number of entities

func ListSubscriptionsWithTop added in v0.10.14

func ListSubscriptionsWithTop(top int) ListSubscriptionsOption

ListSubscriptionsWithTop will return at most `top` results

type ListSubscriptionsOptions added in v0.10.14

type ListSubscriptionsOptions struct {
	// contains filtered or unexported fields
}

ListSubscriptionsOptions provides options for List() to control things like page size. NOTE: Use the ListSubscriptionsWith* methods to specify this.

type ListTopicsOption added in v0.10.14

type ListTopicsOption func(*ListTopicsOptions) error

ListTopicsOption represents named options for listing topics

func ListTopicsWithSkip added in v0.10.14

func ListTopicsWithSkip(skip int) ListTopicsOption

ListTopicsWithSkip will skip the specified number of entities

func ListTopicsWithTop added in v0.10.14

func ListTopicsWithTop(top int) ListTopicsOption

ListTopicsWithTop will return at most `top` results

type ListTopicsOptions added in v0.10.14

type ListTopicsOptions struct {
	// contains filtered or unexported fields
}

ListTopicsOptions provides options for List() to control things like page size. NOTE: Use the ListTopicsWith* methods to specify this.

type ListenerHandle

type ListenerHandle struct {
	// contains filtered or unexported fields
}

ListenerHandle provides the ability to close or listen to the close of a Receiver

func (*ListenerHandle) Close

func (lc *ListenerHandle) Close(ctx context.Context) error

Close will close the listener

func (*ListenerHandle) Done

func (lc *ListenerHandle) Done() <-chan struct{}

Done will close the channel when the listener has stopped

func (*ListenerHandle) Err

func (lc *ListenerHandle) Err() error

Err will return the last error encountered

type MaxMessageSizeInBytes added in v0.3.0

type MaxMessageSizeInBytes int

MaxMessageSizeInBytes is the max number of bytes allowed by Azure Service Bus

const (
	// StandardMaxMessageSizeInBytes is the maximum number of bytes in a message for the Standard tier
	StandardMaxMessageSizeInBytes MaxMessageSizeInBytes = 256000
	// PremiumMaxMessageSizeInBytes is the maximum number of bytes in a message for the Premium tier
	PremiumMaxMessageSizeInBytes MaxMessageSizeInBytes = 1000000
)

type Message

type Message struct {
	ContentType      string
	CorrelationID    string
	Data             []byte
	DeliveryCount    uint32
	SessionID        *string
	GroupSequence    *uint32
	ID               string
	Label            string
	ReplyTo          string
	ReplyToGroupID   string
	To               string
	TTL              *time.Duration
	LockToken        *uuid.UUID
	SystemProperties *SystemProperties
	UserProperties   map[string]interface{}
	Format           uint32
	// contains filtered or unexported fields
}

Message is a Service Bus message to be sent or received

func NewMessage

func NewMessage(data []byte) *Message

NewMessage builds a Message from a slice of data

func NewMessageFromString

func NewMessageFromString(message string) *Message

NewMessageFromString builds a Message from a string message

func (*Message) Abandon

func (m *Message) Abandon(ctx context.Context) error

Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.

func (*Message) AbandonAction added in v0.2.0

func (m *Message) AbandonAction() DispositionAction

AbandonAction will notify Azure Service Bus the message failed but should be re-queued for delivery.

func (*Message) Complete

func (m *Message) Complete(ctx context.Context) error

Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue

func (*Message) CompleteAction added in v0.2.0

func (m *Message) CompleteAction() DispositionAction

CompleteAction will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue

func (*Message) DeadLetter

func (m *Message) DeadLetter(ctx context.Context, err error) error

DeadLetter will notify Azure Service Bus the message failed and should not be re-queued

func (*Message) DeadLetterAction added in v0.2.0

func (m *Message) DeadLetterAction(err error) DispositionAction

DeadLetterAction will notify Azure Service Bus the message failed and should not be re-queued

func (*Message) DeadLetterWithInfo

func (m *Message) DeadLetterWithInfo(ctx context.Context, err error, condition MessageErrorCondition, additionalData map[string]string) error

DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context

func (*Message) DeadLetterWithInfoAction added in v0.2.0

func (m *Message) DeadLetterWithInfoAction(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction

DeadLetterWithInfoAction will notify Azure Service Bus the message failed and should not be re-queued with additional context

func (*Message) Defer added in v0.2.0

func (m *Message) Defer(ctx context.Context) error

Defer will set aside the message for later processing

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (*Message) GetKeyValues added in v0.8.0

func (m *Message) GetKeyValues() map[string]interface{}

GetKeyValues implements tab.Carrier

func (*Message) ScheduleAt added in v0.2.0

func (m *Message) ScheduleAt(t time.Time)

ScheduleAt will ensure Azure Service Bus delivers the message after the time specified (usually within 1 minute after the specified time)

func (*Message) Set

func (m *Message) Set(key string, value interface{})

Set implements tab.Carrier

type MessageBatch added in v0.3.0

type MessageBatch struct {
	*Message

	MaxSize MaxMessageSizeInBytes
	// contains filtered or unexported fields
}

MessageBatch represents a batch of messages to send to Service Bus in a single message

func NewMessageBatch added in v0.3.0

func NewMessageBatch(maxSize MaxMessageSizeInBytes, messageID string, opts *BatchOptions) *MessageBatch

NewMessageBatch builds a new message batch with the specified max message size

func (*MessageBatch) Add added in v0.3.0

func (mb *MessageBatch) Add(m *Message) (bool, error)

Add adds a message to the batch if the message will not exceed the max size of the batch

func (*MessageBatch) Clear added in v0.3.0

func (mb *MessageBatch) Clear()

Clear will zero out the batch size and clear the buffered messages

func (*MessageBatch) Size added in v0.3.0

func (mb *MessageBatch) Size() int

Size is the number of bytes in the message batch

type MessageBatchIterator added in v0.3.0

type MessageBatchIterator struct {
	Messages []*Message
	Cursor   int
	MaxSize  MaxMessageSizeInBytes
}

MessageBatchIterator provides an easy way to iterate over a slice of messages to reliably create batches

func NewMessageBatchIterator added in v0.3.0

func NewMessageBatchIterator(maxBatchSize MaxMessageSizeInBytes, msgs ...*Message) *MessageBatchIterator

NewMessageBatchIterator wraps a slice of Message pointers to allow it to be made into a BatchIterator.

func (*MessageBatchIterator) Done added in v0.3.0

func (mbi *MessageBatchIterator) Done() bool

Done communicates whether there are more messages remaining to be iterated over.

func (*MessageBatchIterator) Next added in v0.3.0

func (mbi *MessageBatchIterator) Next(messageID string, opts *BatchOptions) (*MessageBatch, error)

Next fetches the batch of messages in the message slice at a position one larger than the last one accessed.

type MessageErrorCondition

type MessageErrorCondition string

MessageErrorCondition represents a well-known collection of AMQP errors

const (
	ErrorInternalError         MessageErrorCondition = "amqp:internal-error"
	ErrorNotFound              MessageErrorCondition = "amqp:not-found"
	ErrorUnauthorizedAccess    MessageErrorCondition = "amqp:unauthorized-access"
	ErrorDecodeError           MessageErrorCondition = "amqp:decode-error"
	ErrorResourceLimitExceeded MessageErrorCondition = "amqp:resource-limit-exceeded"
	ErrorNotAllowed            MessageErrorCondition = "amqp:not-allowed"
	ErrorInvalidField          MessageErrorCondition = "amqp:invalid-field"
	ErrorNotImplemented        MessageErrorCondition = "amqp:not-implemented"
	ErrorResourceLocked        MessageErrorCondition = "amqp:resource-locked"
	ErrorPreconditionFailed    MessageErrorCondition = "amqp:precondition-failed"
	ErrorResourceDeleted       MessageErrorCondition = "amqp:resource-deleted"
	ErrorIllegalState          MessageErrorCondition = "amqp:illegal-state"
)

Error Conditions

type MessageIterator added in v0.2.0

type MessageIterator interface {
	Done() bool
	Next(context.Context) (*Message, error)
}

MessageIterator offers a simple mechanism for iterating over a list of messages.

Example
subject := servicebus.AsMessageSliceIterator([]*servicebus.Message{
	servicebus.NewMessageFromString("hello"),
	servicebus.NewMessageFromString("world"),
})

for !subject.Done() {
	cursor, err := subject.Next(context.Background())
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(cursor.Data))
}
Output:

hello
world

type MessageSession added in v0.2.0

type MessageSession struct {
	*Receiver
	// contains filtered or unexported fields
}

MessageSession represents and allows for interaction with a Service Bus Session.

func (*MessageSession) Close added in v0.2.0

func (ms *MessageSession) Close()

Close communicates that Handler receiving messages should no longer continue to be executed. This can happen when:

  • A Handler recognizes that no further messages will come to this session.
  • A Handler has given up on receiving more messages before a session. Future messages should be delegated to the next available session client.

func (*MessageSession) ListSessions added in v0.2.0

func (ms *MessageSession) ListSessions(ctx context.Context) ([]byte, error)

ListSessions will list all of the sessions available

func (*MessageSession) LockedUntil added in v0.2.0

func (ms *MessageSession) LockedUntil() time.Time

LockedUntil fetches the moment in time when the Session lock held by this Receiver will expire.

func (*MessageSession) RenewLock added in v0.2.0

func (ms *MessageSession) RenewLock(ctx context.Context) error

RenewLock requests that the Service Bus Server renews this client's lock on an existing Session.

func (*MessageSession) SessionID added in v0.2.0

func (ms *MessageSession) SessionID() *string

SessionID gets the unique identifier of the session being interacted with by this MessageSession.

func (*MessageSession) SetState added in v0.2.0

func (ms *MessageSession) SetState(ctx context.Context, state []byte) error

SetState updates the current State associated with this Session.

func (*MessageSession) State added in v0.2.0

func (ms *MessageSession) State(ctx context.Context) ([]byte, error)

State retrieves the current State associated with this Session. https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#get-session-state

type MessageSliceIterator added in v0.2.0

type MessageSliceIterator struct {
	Target []*Message
	Cursor int
}

MessageSliceIterator is a wrapper, which lets any slice of Message pointers be used as a MessageIterator.

func AsMessageSliceIterator added in v0.2.0

func AsMessageSliceIterator(target []*Message) *MessageSliceIterator

AsMessageSliceIterator wraps a slice of Message pointers to allow it to be made into a MessageIterator.

func (MessageSliceIterator) Done added in v0.2.0

func (ms MessageSliceIterator) Done() bool

Done communicates whether there are more messages remaining to be iterated over.

func (*MessageSliceIterator) Next added in v0.2.0

func (ms *MessageSliceIterator) Next(_ context.Context) (*Message, error)

Next fetches the Message in the slice at a position one larger than the last one accessed.

type MessageStatus added in v0.3.0

type MessageStatus dispositionStatus

MessageStatus defines an acceptable Message disposition status.

const (
	// Complete exposes completedDisposition
	Complete MessageStatus = MessageStatus(completedDisposition)
	// Abort exposes abandonedDisposition
	Abort MessageStatus = MessageStatus(abandonedDisposition)
)

type MiddlewareFunc added in v0.2.0

type MiddlewareFunc func(next RestHandler) RestHandler

MiddlewareFunc allows a consumer of the entity manager to inject handlers within the request / response pipeline

The example below adds the atom xml content type to the request, calls the next middleware and returns the result.

addAtomXMLContentType MiddlewareFunc = func(next RestHandler) RestHandler {
		return func(ctx context.Context, req *http.Request) (res *http.Response, e error) {
			if req.Method != http.MethodGet && req.Method != http.MethodHead {
				req.Header.Add("content-Type", "application/atom+xml;type=entry;charset=utf-8")
			}
			return next(ctx, req)
		}
	}

func TraceReqAndResponseMiddleware added in v0.2.0

func TraceReqAndResponseMiddleware() MiddlewareFunc

TraceReqAndResponseMiddleware will print the dump of the management request and response.

This should only be used for debugging or educational purposes.

type Namespace

type Namespace struct {
	Name          string
	Suffix        string
	TokenProvider auth.TokenProvider
	Environment   azure.Environment
	// contains filtered or unexported fields
}

Namespace provides a simplified facade over the AMQP implementation of Azure Service Bus and is the entry point for using Queues, Topics and Subscriptions

func NewNamespace

func NewNamespace(opts ...NamespaceOption) (*Namespace, error)

NewNamespace creates a new namespace configured through NamespaceOption(s)

func (*Namespace) NewQueue

func (ns *Namespace) NewQueue(name string, opts ...QueueOption) (*Queue, error)

NewQueue creates a new Queue Sender / Receiver

func (*Namespace) NewQueueManager

func (ns *Namespace) NewQueueManager() *QueueManager

NewQueueManager creates a new QueueManager for a Service Bus Namespace

func (*Namespace) NewReceiver added in v0.2.0

func (ns *Namespace) NewReceiver(ctx context.Context, entityPath string, opts ...ReceiverOption) (*Receiver, error)

NewReceiver creates a new Service Bus message listener given an AMQP client and an entity path

func (*Namespace) NewSender added in v0.2.0

func (ns *Namespace) NewSender(ctx context.Context, entityPath string, opts ...SenderOption) (*Sender, error)

NewSender creates a new Service Bus message Sender given an AMQP client and entity path

func (*Namespace) NewSubscriptionManager

func (ns *Namespace) NewSubscriptionManager(topicName string) (*SubscriptionManager, error)

NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Namespace

func (*Namespace) NewTopic

func (ns *Namespace) NewTopic(name string, opts ...TopicOption) (*Topic, error)

NewTopic creates a new Topic Sender

func (*Namespace) NewTopicManager

func (ns *Namespace) NewTopicManager() *TopicManager

NewTopicManager creates a new TopicManager for a Service Bus Namespace

type NamespaceOption

type NamespaceOption func(h *Namespace) error

NamespaceOption provides structure for configuring a new Service Bus namespace

func NamespaceWithAzureEnvironment added in v0.10.7

func NamespaceWithAzureEnvironment(namespaceName, environmentName string) NamespaceOption

NamespaceWithAzureEnvironment sets the namespace's Environment, Suffix and ResourceURI parameters according to the Azure Environment defined in the "github.com/Azure/go-autorest/autorest/azure" package. This allows the library to be configured for use in the different Azure clouds. environmentName is the name of the cloud as defined in autorest: https://github.com/Azure/go-autorest/blob/b076c1437d051bf4c328db428b70f4fe22ad38b0/autorest/azure/environments.go#L34-L39

func NamespaceWithConnectionString

func NamespaceWithConnectionString(connStr string) NamespaceOption

NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string

func NamespaceWithEnvironmentBinding added in v0.10.0

func NamespaceWithEnvironmentBinding(name string) NamespaceOption

NamespaceWithEnvironmentBinding configures a namespace using the environment details. It uses one of the following methods:

  1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
  2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"

3. Managed Identity (MI): attempt to authenticate via the MI assigned to the Azure resource

The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.

func NamespaceWithTLSConfig added in v0.6.0

func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption

NamespaceWithTLSConfig appends to the TLS config.

func NamespaceWithTokenProvider added in v0.10.7

func NamespaceWithTokenProvider(provider auth.TokenProvider) NamespaceOption

NamespaceWithTokenProvider sets the token provider on the namespace

func NamespaceWithUserAgent added in v0.3.0

func NamespaceWithUserAgent(userAgent string) NamespaceOption

NamespaceWithUserAgent appends to the root user-agent value.

func NamespaceWithWebSocket added in v0.5.0

func NamespaceWithWebSocket() NamespaceOption

NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://

Example
const queueName = "wssQueue"

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a Service Bus Namespace using a connection string over wss:// on port 443
ns, err := servicebus.NewNamespace(
	servicebus.NamespaceWithConnectionString(connStr),
	servicebus.NamespaceWithWebSocket(),
)
if err != nil {
	fmt.Println(err)
	return
}

// Create a context to limit how long we will try to send, then push the message over the wire.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

qm := ns.NewQueueManager()
if _, err := ensureQueue(ctx, qm, queueName); err != nil {
	fmt.Println(err)
	return
}

client, err := ns.NewQueue(queueName)
if err != nil {
	fmt.Println(err)
	return
}

// Send a message to the queue
if err := client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!")); err != nil {
	fmt.Println(err)
}

// Receive the message from the queue
if err := client.ReceiveOne(ctx, MessagePrinter{}); err != nil {
	fmt.Println(err)
}
Output:

Hello World!!!

type PeekOption added in v0.2.0

type PeekOption func(*peekIterator) error

PeekOption allows customization of parameters when querying a Service Bus entity for messages without committing to processing them.

func PeekFromSequenceNumber added in v0.2.0

func PeekFromSequenceNumber(seq int64) PeekOption

PeekFromSequenceNumber adds a filter to the Peek operation, so that no messages with a Sequence Number less than 'seq' are returned.

func PeekWithPageSize added in v0.2.0

func PeekWithPageSize(pageSize int) PeekOption

PeekWithPageSize adjusts how many messages are fetched at once while peeking from the server.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a Service Bus Queue entity, which offers First In, First Out (FIFO) message delivery to one or more competing consumers. That is, messages are typically expected to be received and processed by the receivers in the order in which they were added to the queue, and each message is received and processed by only one message consumer.

Example (GetOrBuildQueue)
const queueName = "myqueue"

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

qm := ns.NewQueueManager()
qe, err := qm.Get(ctx, queueName)
if err != nil && !servicebus.IsErrNotFound(err) {
	fmt.Println(err)
	return
}

if qe == nil {
	_, err := qm.Put(ctx, queueName)
	if err != nil {
		fmt.Println(err)
		return
	}
}

q, err := ns.NewQueue(queueName)
if err != nil {
	fmt.Println(err)
	return
}

fmt.Println(q.Name)
Output:

myqueue
Example (ScheduleAndCancelMessages)
// Allow one minute and forty seconds for the whole schedule-then-cancel round trip.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

connectionString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connectionString == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Build the namespace client, then a client for the queue the message is scheduled on.
namespace, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}

queue, err := namespace.NewQueue("schedulewithqueue")
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}

// Schedule a single message for delivery one minute from now.
const delay = time.Minute
deliverAt := time.Now().Add(delay)
message := servicebus.NewMessageFromString("to the future!!")

scheduled, err := queue.ScheduleAt(ctx, deliverAt, message)
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}

// Cancel everything that was just scheduled, before the delay elapses.
if err := queue.CancelScheduled(ctx, scheduled...); err != nil {
	fmt.Println("FATAL: ", err)
	return
}

fmt.Println("All Messages Scheduled and Cancelled")
Output:

All Messages Scheduled and Cancelled
Example (SessionsRoundTrip)
// The whole round trip (publishing and receiving) has to finish within thirty seconds.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// --- Set up the clients needed to talk to Service Bus. ---
connectionString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connectionString == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

namespace, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}

client, err := namespace.NewQueue("receivesession")
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}

// --- Publish five sessions' worth of data. ---
// Each session gets an adjective, then a noun, then a numeric suffix. The three passes are run one
// after another so that the sessions are deliberately interleaved on the queue.
const numSessions = 5
adjectives := []string{"Doltish", "Foolish", "Juvenile"}
nouns := []string{"Automaton", "Luddite", "Monkey", "Neanderthal"}

// seed chosen arbitrarily, see https://en.wikipedia.org/wiki/Taxicab_number
generator := rand.New(rand.NewSource(1729))

// Establish a set of sessions, one freshly generated ID per session.
sessionIDs := make([]string, numSessions)
for i := range sessionIDs {
	rawSessionID, err := uuid.NewV4()
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}
	sessionIDs[i] = rawSessionID.String()
}

// publish sends body as one message belonging to the given session.
publish := func(body string, sessionID *string) error {
	msg := servicebus.NewMessageFromString(body)
	msg.SessionID = sessionID
	return client.Send(ctx, msg)
}

// Pass 1: an adjective for each session.
for i := range sessionIDs {
	if err := publish(adjectives[generator.Intn(len(adjectives))], &sessionIDs[i]); err != nil {
		fmt.Println("FATAL: ", err)
		return
	}
}

// Pass 2: a noun for each session.
for i := range sessionIDs {
	if err := publish(nouns[generator.Intn(len(nouns))], &sessionIDs[i]); err != nil {
		fmt.Println("FATAL: ", err)
		return
	}
}

// Pass 3: a two-digit numeric suffix for each session.
for i := range sessionIDs {
	if err := publish(fmt.Sprintf("%02d", generator.Intn(100)), &sessionIDs[i]); err != nil {
		fmt.Println("FATAL: ", err)
		return
	}
}

// --- Receive and process the previously published sessions. ---
// The order the sessions are received in is not guaranteed, so the expected output must be
// "Unordered output". A nil session ID is passed, so no particular session is requested.
for received := 0; received < numSessions; received++ {
	printer := &SessionPrinter{}
	session := client.NewSession(nil)
	if err := session.ReceiveOne(ctx, printer); err != nil {
		fmt.Println("FATAL: ", err)
		return
	}
}
Output:

FoolishMonkey63
FoolishLuddite05
JuvenileMonkey80
JuvenileLuddite84
FoolishLuddite68

func (*Queue) Close

func (q *Queue) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Queue) NewDeadLetter added in v0.2.0

func (q *Queue) NewDeadLetter() *DeadLetter

NewDeadLetter creates an entity that represents the dead letter sub queue of the queue

Azure Service Bus queues and topic subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.

The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.

From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.

Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.

func (*Queue) NewDeadLetterReceiver added in v0.2.0

func (q *Queue) NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewDeadLetterReceiver builds a receiver for the Queue's dead letter queue

func (*Queue) NewReceiver added in v0.2.0

func (q *Queue) NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)

NewReceiver will create a new Receiver for receiving messages off of a queue

func (*Queue) NewSender added in v0.2.0

func (q *Queue) NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)

NewSender will create a new Sender for sending messages to the queue

func (*Queue) NewSession added in v0.2.0

func (q *Queue) NewSession(sessionID *string) *QueueSession

NewSession will create a new session based receiver and sender for the queue

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*Queue) NewTransferDeadLetter added in v0.2.0

func (q *Queue) NewTransferDeadLetter() *TransferDeadLetter

NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the queue

Messages will be sent to the transfer dead-letter queue under the following conditions:

  • A message passes through more than 3 queues or topics that are chained together.
  • The destination queue or topic is disabled or deleted.
  • The destination queue or topic exceeds the maximum entity size.

func (*Queue) NewTransferDeadLetterReceiver added in v0.2.0

func (q *Queue) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewTransferDeadLetterReceiver builds a receiver for the Queue's transfer dead letter queue

Messages will be sent to the transfer dead-letter queue under the following conditions:

  • A message passes through more than 3 queues or topics that are chained together.
  • The destination queue or topic is disabled or deleted.
  • The destination queue or topic exceeds the maximum entity size.

func (*Queue) Receive

func (q *Queue) Receive(ctx context.Context, handler Handler) error

Receive subscribes for messages sent to the Queue. If the messages are not within a session, they will arrive unordered.

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

If the handler returns an error, the receive loop will be terminated.

Example
// Define a function that should be executed when a message is received.
var printMessage servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
	fmt.Println(string(msg.Data))
	return msg.Complete(ctx)
}

// Instantiate the clients needed to communicate with a Service Bus Queue.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
	return
}

client, err := ns.NewQueue("myqueue")
if err != nil {
	return
}

// Define a context to limit how long we will block to receive messages, then start serving our function.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

if err := client.Receive(ctx, printMessage); err != nil {
	fmt.Println("FATAL: ", err)
}
Output:

Example (Second)
// Number of worker goroutines; also used as the channel buffer size and the prefetch count.
const concurrentNum = 5
// Buffered channel that hands received messages from the handler to the workers.
msgChan := make(chan *servicebus.Message, concurrentNum)
// Handler executed when a message is received. It only enqueues the message; printing and
// completing it is left to the workers.
var concurrentHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
	msgChan <- msg
	return nil
}

// processMessage prints one message and completes it under its own short-lived context.
// Because this is a separate function, the deferred cancel runs once per message instead of
// accumulating until the worker goroutine exits.
processMessage := func(msg *servicebus.Message) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()

	fmt.Println(string(msg.Data))
	// Report a failed completion rather than silently dropping the error.
	if err := msg.Complete(ctx); err != nil {
		fmt.Println("FATAL: ", err)
	}
}

// Start the workers; each one drains msgChan until the channel is closed.
for i := 0; i < concurrentNum; i++ {
	go func() {
		for msg := range msgChan {
			processMessage(msg)
		}
	}()
}

// Instantiate the clients needed to communicate with a Service Bus Queue.
// On failure the channel is closed so the workers started above can exit.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
	close(msgChan)
	return
}

// Init queue client with prefetch count
client, err := ns.NewQueue("myqueue", servicebus.QueueWithPrefetchCount(concurrentNum))
if err != nil {
	close(msgChan)
	return
}

// Define a context to limit how long we will block to receive messages, then start serving our function.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

if err := client.Receive(ctx, concurrentHandler); err != nil {
	fmt.Println("FATAL: ", err)
}
// Close the message chan so the workers stop once the remaining messages are drained.
close(msgChan)
Output:

func (*Queue) ReceiveOne

func (q *Queue) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

func (*Queue) Send

func (q *Queue) Send(ctx context.Context, msg *Message) error

Send sends a message to the Queue

Example
// Instantiate the clients needed to communicate with a Service Bus Queue.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
	return
}

client, err := ns.NewQueue("myqueue")
if err != nil {
	return
}

// Create a context to limit how long we will try to send, then push the message over the wire.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!")); err != nil {
	fmt.Println("FATAL: ", err)
}
Output:

func (*Queue) SendBatch added in v0.3.0

func (q *Queue) SendBatch(ctx context.Context, iterator BatchIterator) error

SendBatch sends a batch of messages to the Queue

type QueueDescription

type QueueDescription struct {
	XMLName xml.Name `xml:"QueueDescription"`
	BaseEntityDescription
	LockDuration                        *string       `xml:"LockDuration,omitempty"`               // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
	MaxSizeInMegabytes                  *int32        `xml:"MaxSizeInMegabytes,omitempty"`         // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024.
	RequiresDuplicateDetection          *bool         `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
	RequiresSession                     *bool         `xml:"RequiresSession,omitempty"` // RequiresSession - A value indicating if this queue requires sessions (see QueueEntityWithRequiredSessions).
	DefaultMessageTimeToLive            *string       `xml:"DefaultMessageTimeToLive,omitempty"`            // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	DeadLetteringOnMessageExpiration    *bool         `xml:"DeadLetteringOnMessageExpiration,omitempty"`    // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires.
	DuplicateDetectionHistoryTimeWindow *string       `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes.
	MaxDeliveryCount                    *int32        `xml:"MaxDeliveryCount,omitempty"`                    // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. The default value is 10.
	EnableBatchedOperations             *bool         `xml:"EnableBatchedOperations,omitempty"`             // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	SizeInBytes                         *int64        `xml:"SizeInBytes,omitempty"`                         // SizeInBytes - The size of the queue, in bytes.
	MessageCount                        *int64        `xml:"MessageCount,omitempty"`                        // MessageCount - The number of messages in the queue.
	IsAnonymousAccessible               *bool         `xml:"IsAnonymousAccessible,omitempty"`
	Status                              *EntityStatus `xml:"Status,omitempty"`
	CreatedAt                           *date.Time    `xml:"CreatedAt,omitempty"`
	UpdatedAt                           *date.Time    `xml:"UpdatedAt,omitempty"`
	SupportOrdering                     *bool         `xml:"SupportOrdering,omitempty"`
	AutoDeleteOnIdle                    *string       `xml:"AutoDeleteOnIdle,omitempty"` // AutoDeleteOnIdle - idle interval after which the queue is automatically deleted (see QueueEntityWithAutoDeleteOnIdle); presumably an ISO 8601 duration like the other timespan fields - TODO confirm.
	EnablePartitioning                  *bool         `xml:"EnablePartitioning,omitempty"` // EnablePartitioning - A value indicating if this queue is a partitioned queue (see QueueEntityWithPartitioning).
	EnableExpress                       *bool         `xml:"EnableExpress,omitempty"`
	CountDetails                        *CountDetails `xml:"CountDetails,omitempty"`
	ForwardTo                           *string       `xml:"ForwardTo,omitempty"` // ForwardTo - entity to which messages are automatically forwarded (see QueueEntityWithAutoForward).
	ForwardDeadLetteredMessagesTo       *string       `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
}

QueueDescription is the content type for Queue management requests

type QueueEntity

type QueueEntity struct {
	*QueueDescription
	*Entity
}

QueueEntity is the Azure Service Bus description of a Queue for management activities

type QueueManagementOption

type QueueManagementOption func(*QueueDescription) error

QueueManagementOption represents named configuration options for queue mutation

func QueueEntityWithAutoDeleteOnIdle

func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption

QueueEntityWithAutoDeleteOnIdle configures the queue to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func QueueEntityWithAutoForward added in v0.2.0

func QueueEntityWithAutoForward(target Targetable) QueueManagementOption

QueueEntityWithAutoForward configures the queue to automatically forward messages to the specified target.

The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func QueueEntityWithDeadLetteringOnMessageExpiration

func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption

QueueEntityWithDeadLetteringOnMessageExpiration will ensure the queue sends expired messages to the dead letter queue

func QueueEntityWithDuplicateDetection

func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption

QueueEntityWithDuplicateDetection configures the queue to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.

func QueueEntityWithForwardDeadLetteredMessagesTo added in v0.2.0

func QueueEntityWithForwardDeadLetteredMessagesTo(target Targetable) QueueManagementOption

QueueEntityWithForwardDeadLetteredMessagesTo configures the queue to automatically forward dead letter messages to the specified target.

The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func QueueEntityWithLockDuration

func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption

QueueEntityWithLockDuration configures the queue to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.

func QueueEntityWithMaxDeliveryCount

func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption

QueueEntityWithMaxDeliveryCount configures the queue to have a maximum number of delivery attempts before dead-lettering the message

func QueueEntityWithMaxSizeInMegabytes

func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption

QueueEntityWithMaxSizeInMegabytes configures the maximum size of the queue in megabytes (1 * 1024 - 5 * 1024), which is the size of the memory allocated for the queue. Default is 1 GB (1 * 1024 MB).

size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku

func QueueEntityWithMessageTimeToLive

func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption

QueueEntityWithMessageTimeToLive configures the queue to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func QueueEntityWithPartitioning

func QueueEntityWithPartitioning() QueueManagementOption

QueueEntityWithPartitioning ensures the created queue will be a partitioned queue. Partitioned queues offer increased storage and availability compared to non-partitioned queues, with the trade-off of requiring one of the following to ensure FIFO message retrieval:

SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of session states.

PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional messages. The partition key ensures that all messages that are sent within a transaction are handled by the same messaging broker.

MessageId. If the queue has the RequiresDuplicateDetection property set to true, then the MessageId property serves as the partition key if neither the SessionId nor the PartitionKey property is set. This ensures that all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and eliminate duplicate messages.

func QueueEntityWithRequiredSessions

func QueueEntityWithRequiredSessions() QueueManagementOption

QueueEntityWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager provides CRUD functionality for Service Bus Queues

func (*QueueManager) Delete

func (qm *QueueManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Queue entity by name

func (QueueManager) Execute

func (em QueueManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*QueueManager) Get

func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error)

Get fetches a Service Bus Queue entity by name

func (*QueueManager) List

func (qm *QueueManager) List(ctx context.Context, options ...ListQueuesOption) ([]*QueueEntity, error)

List fetches all of the queues for a Service Bus Namespace

func (QueueManager) Post

func (em QueueManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*QueueManager) Put

func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error)

Put creates or updates a Service Bus Queue

func (QueueManager) TokenProvider added in v0.2.0

func (em QueueManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (QueueManager) Use added in v0.2.0

func (em QueueManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware mwStack

type QueueOption

type QueueOption func(*Queue) error

QueueOption represents named options for assisting Queue message handling

func QueueWithPrefetchCount added in v0.2.0

func QueueWithPrefetchCount(prefetch uint32) QueueOption

QueueWithPrefetchCount configures the queue to attempt to fetch the number of messages specified by the prefetch count at one time.

The default is 1 message at a time.

Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail and will be re-queued and processed again.

func QueueWithReceiveAndDelete

func QueueWithReceiveAndDelete() QueueOption

QueueWithReceiveAndDelete configures a queue to pop and delete messages off of the queue upon receiving the message. This differs from the default, PeekLock, where PeekLock receives a message, locks it for a period of time, then sends a disposition to the broker when the message has been processed.

type QueueSession added in v0.2.0

type QueueSession struct {
	// contains filtered or unexported fields
}

QueueSession wraps Service Bus session functionality over a Queue

func NewQueueSession added in v0.2.0

func NewQueueSession(builder SendAndReceiveBuilder, sessionID *string) *QueueSession

NewQueueSession creates a new session sender and receiver to communicate with a Service Bus queue.

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*QueueSession) Close added in v0.2.0

func (qs *QueueSession) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*QueueSession) ManagementPath added in v0.9.0

func (qs *QueueSession) ManagementPath() string

ManagementPath provides an addressable path to the Entity management endpoint

func (*QueueSession) ReceiveDeferred added in v0.9.0

func (qs *QueueSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error

ReceiveDeferred will receive and handle a set of deferred messages

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (*QueueSession) ReceiveOne added in v0.2.0

func (qs *QueueSession) ReceiveOne(ctx context.Context, handler SessionHandler) error

ReceiveOne waits for the lock on a particular session to become available, takes it, then processes the session. The session can contain multiple messages. ReceiveOne will receive all messages within that session.

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

If the handler returns an error, the receive loop will be terminated.

func (*QueueSession) Send added in v0.2.0

func (qs *QueueSession) Send(ctx context.Context, msg *Message) error

Send the message to the queue within a session

func (*QueueSession) SessionID added in v0.2.0

func (qs *QueueSession) SessionID() *string

SessionID is the identifier for the Service Bus session

type ReceiveBuilder added in v0.2.0

type ReceiveBuilder interface {
	ReceiverBuilder
	// contains filtered or unexported methods
}

ReceiveBuilder is a ReceiverBuilder and EntityManagementAddresser

type ReceiveMode

type ReceiveMode int

ReceiveMode represents the behavior when consuming a message from a queue

type ReceiveOner added in v0.2.0

type ReceiveOner interface {
	Closer
	ReceiveOne(ctx context.Context, handler Handler) error
}

ReceiveOner provides the ability to receive and handle events

type Receiver added in v0.2.0

type Receiver struct {
	Name string

	DefaultDisposition DispositionAction
	Closed             bool
	// contains filtered or unexported fields
}

Receiver provides connection, session and link handling for receiving from an entity path

func (*Receiver) Close added in v0.2.0

func (r *Receiver) Close(ctx context.Context) error

Close will close the AMQP session and link of the Receiver

func (*Receiver) Listen added in v0.2.0

func (r *Receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle

Listen starts a listener for messages sent to the entity path

func (*Receiver) ReceiveOne added in v0.2.0

func (r *Receiver) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will receive one message from the link

func (*Receiver) Recover added in v0.2.0

func (r *Receiver) Recover(ctx context.Context) error

Recover will attempt to close the current session and link, then rebuild them

type ReceiverBuilder added in v0.2.0

type ReceiverBuilder interface {
	NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)
}

ReceiverBuilder describes the ability of an entity to build receiver links

type ReceiverOption added in v0.2.0

type ReceiverOption func(receiver *Receiver) error

ReceiverOption provides a structure for configuring receivers

func ReceiverWithPrefetchCount added in v0.2.0

func ReceiverWithPrefetchCount(prefetch uint32) ReceiverOption

ReceiverWithPrefetchCount configures the receiver to attempt to fetch the number of messages specified by the prefetch count at one time.

The default is 1 message at a time.

Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail and will be re-queued and processed again.

func ReceiverWithReceiveMode added in v0.2.0

func ReceiverWithReceiveMode(mode ReceiveMode) ReceiverOption

ReceiverWithReceiveMode configures a Receiver to use the specified receive mode

func ReceiverWithSession added in v0.2.0

func ReceiverWithSession(sessionID *string) ReceiverOption

ReceiverWithSession configures a Receiver to use a session

type RestHandler added in v0.2.0

type RestHandler func(ctx context.Context, req *http.Request) (*http.Response, error)

RestHandler is used to transform a request and response within the http pipeline

type RuleDescription added in v0.2.0

type RuleDescription struct {
	XMLName xml.Name `xml:"RuleDescription"`
	BaseEntityDescription
	CreatedAt *date.Time         `xml:"CreatedAt,omitempty"`
	Filter    FilterDescription  `xml:"Filter"`
	Action    *ActionDescription `xml:"Action,omitempty"`
}

RuleDescription is the content type for Subscription Rule management requests

type RuleEntity added in v0.2.0

type RuleEntity struct {
	*RuleDescription
	*Entity
}

RuleEntity is the Azure Service Bus description of a Subscription Rule for management activities

type SQLAction added in v0.2.0

type SQLAction struct {
	Expression string
}

SQLAction represents a SQL language-based action expression that is evaluated against a BrokeredMessage. A SQLAction supports a subset of the SQL-92 standard.

With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.

see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter

func (SQLAction) ToActionDescription added in v0.2.0

func (sf SQLAction) ToActionDescription() ActionDescription

ToActionDescription will transform the SQLAction into an ActionDescription

type SQLFilter added in v0.2.0

type SQLFilter struct {
	Expression string
}

SQLFilter represents a SQL language-based filter expression that is evaluated against a BrokeredMessage. A SQLFilter supports a subset of the SQL-92 standard.

see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter

func (SQLFilter) ToFilterDescription added in v0.2.0

func (sf SQLFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the SQLFilter into a FilterDescription

type SendAndReceiveBuilder added in v0.2.0

type SendAndReceiveBuilder interface {
	ReceiveBuilder
	SenderBuilder
}

SendAndReceiveBuilder is a ReceiverBuilder, SenderBuilder and EntityManagementAddresser

type SendOption

type SendOption func(event *Message) error

SendOption provides a way to customize a message on sending

type Sender added in v0.2.0

type Sender struct {
	Name string
	// contains filtered or unexported fields
}

Sender provides connection, session and link handling for sending to an entity path

func (*Sender) Close added in v0.2.0

func (s *Sender) Close(ctx context.Context) error

Close will close the AMQP connection, session and link of the Sender

func (*Sender) Recover added in v0.2.0

func (s *Sender) Recover(ctx context.Context) error

Recover will attempt to close the current session and link, then rebuild them

func (*Sender) Send added in v0.2.0

func (s *Sender) Send(ctx context.Context, msg *Message, opts ...SendOption) error

Send will send a message to the entity path with options

This will retry sending the message if the server responds with a busy error.

func (*Sender) String added in v0.2.0

func (s *Sender) String() string

type SenderBuilder added in v0.2.0

type SenderBuilder interface {
	NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)
}

SenderBuilder describes the ability of an entity to build sender links

type SenderOption added in v0.2.0

type SenderOption func(*Sender) error

SenderOption provides a way to customize a Sender

func SenderWithSession added in v0.2.0

func SenderWithSession(sessionID *string) SenderOption

SenderWithSession configures the message to send with a specific session and sequence. By default, a Sender has a default session (uuid.NewV4()) and sequence generator.

type SessionHandler added in v0.2.0

type SessionHandler interface {
	Handler

	// Start is called when a Receiver is informed that it has acquired a lock on a Service Bus Session.
	Start(*MessageSession) error

	// End is called when a Receiver is informed that the last message of a Session has been passed to it.
	End()
}

SessionHandler exposes a manner of handling a group of messages together. Instances of SessionHandler should be passed to a Receiver such as a Queue or Subscription.

func NewSessionHandler added in v0.2.0

func NewSessionHandler(base Handler, start func(*MessageSession) error, end func()) SessionHandler

NewSessionHandler is a type converter that allows three funcs to be tied together into a type that fulfills the SessionHandler interface.

type Subscription

type Subscription struct {
	Topic *Topic
	// contains filtered or unexported fields
}

Subscription represents a Service Bus Subscription entity, which is used to receive topic messages. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.

func (*Subscription) Close

func (s *Subscription) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Subscription) NewDeadLetter added in v0.2.0

func (s *Subscription) NewDeadLetter() *DeadLetter

NewDeadLetter creates an entity that represents the dead letter sub queue of the subscription

Azure Service Bus queues and topic subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.

The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.

From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.

Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.

func (*Subscription) NewDeadLetterReceiver added in v0.2.0

func (s *Subscription) NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewDeadLetterReceiver builds a receiver for the Subscription's dead letter queue

func (*Subscription) NewReceiver added in v0.2.0

func (s *Subscription) NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)

NewReceiver will create a new Receiver for receiving messages from the subscription

func (*Subscription) NewSession added in v0.2.0

func (s *Subscription) NewSession(sessionID *string) *SubscriptionSession

NewSession will create a new session based receiver for the subscription

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*Subscription) NewTransferDeadLetter added in v0.2.0

func (s *Subscription) NewTransferDeadLetter() *TransferDeadLetter

NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the subscription

Messages will be sent to the transfer dead-letter queue under the following conditions:

  • A message passes through more than 3 queues or topics that are chained together.
  • The destination queue or topic is disabled or deleted.
  • The destination queue or topic exceeds the maximum entity size.

func (*Subscription) NewTransferDeadLetterReceiver added in v0.2.0

func (s *Subscription) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewTransferDeadLetterReceiver builds a receiver for the Subscription's transfer dead letter queue

Messages will be sent to the transfer dead-letter queue under the following conditions:

  • A message passes through more than 3 queues or topics that are chained together.
  • The destination queue or topic is disabled or deleted.
  • The destination queue or topic exceeds the maximum entity size.

func (Subscription) Peek added in v0.2.0

func (re Subscription) Peek(ctx context.Context, options ...PeekOption) (MessageIterator, error)

Peek fetches a list of Messages from the Service Bus broker without acquiring a lock or committing to a disposition. The messages are delivered as close to sequence order as possible.

The MessageIterator that is returned has the following properties:

  • Messages are fetched from the server in pages. Page size is configurable with PeekOptions.
  • The MessageIterator will always return "false" for Done().
  • When Next() is called, it will return either: a slice of messages and no error, nil with an error related to being unable to complete the operation, or an empty slice of messages and an instance of "ErrNoMessages" signifying that there are currently no messages in the queue with a sequence ID larger than previously viewed ones.

func (Subscription) PeekOne added in v0.2.0

func (re Subscription) PeekOne(ctx context.Context, options ...PeekOption) (*Message, error)

PeekOne fetches a single Message from the Service Bus broker without acquiring a lock or committing to a disposition.

func (*Subscription) Receive

func (s *Subscription) Receive(ctx context.Context, handler Handler) error

Receive subscribes for messages sent to the Subscription

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

If the handler returns an error, the receive loop will be terminated.

func (Subscription) ReceiveDeferred added in v0.2.0

func (re Subscription) ReceiveDeferred(ctx context.Context, handler Handler, sequenceNumbers ...int64) error

ReceiveDeferred will receive and handle a set of deferred messages

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (Subscription) ReceiveDeferredWithMode added in v0.9.0

func (re Subscription) ReceiveDeferredWithMode(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error

ReceiveDeferredWithMode will receive and handle a set of deferred messages

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (*Subscription) ReceiveOne

func (s *Subscription) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

func (Subscription) RenewLocks added in v0.2.0

func (re Subscription) RenewLocks(ctx context.Context, messages ...*Message) error

RenewLocks renews the locks on messages provided

func (Subscription) SendBatchDisposition added in v0.3.0

func (re Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error

SendBatchDisposition updates the LockTokenIDs to the disposition status.

type SubscriptionDescription

type SubscriptionDescription struct {
	XMLName xml.Name `xml:"SubscriptionDescription"`
	BaseEntityDescription
	LockDuration                              *string                 `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
	RequiresSession                           *bool                   `xml:"RequiresSession,omitempty"`
	DefaultMessageTimeToLive                  *string                 `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	DefaultRuleDescription                    *DefaultRuleDescription `xml:"DefaultRuleDescription,omitempty"`
	DeadLetteringOnMessageExpiration          *bool                   `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires.
	DeadLetteringOnFilterEvaluationExceptions *bool                   `xml:"DeadLetteringOnFilterEvaluationExceptions,omitempty"`
	MessageCount                              *int64                  `xml:"MessageCount,omitempty"`            // MessageCount - The number of messages in the queue.
	MaxDeliveryCount                          *int32                  `xml:"MaxDeliveryCount,omitempty"`        // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10.
	EnableBatchedOperations                   *bool                   `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	Status                                    *EntityStatus           `xml:"Status,omitempty"`
	CreatedAt                                 *date.Time              `xml:"CreatedAt,omitempty"`
	UpdatedAt                                 *date.Time              `xml:"UpdatedAt,omitempty"`
	AccessedAt                                *date.Time              `xml:"AccessedAt,omitempty"`
	AutoDeleteOnIdle                          *string                 `xml:"AutoDeleteOnIdle,omitempty"`
	ForwardTo                                 *string                 `xml:"ForwardTo,omitempty"`                     // ForwardTo - absolute URI of the entity to forward messages
	ForwardDeadLetteredMessagesTo             *string                 `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
	CountDetails                              *CountDetails           `xml:"CountDetails,omitempty"`
}

SubscriptionDescription is the content type for Subscription management requests

type SubscriptionEntity

type SubscriptionEntity struct {
	*SubscriptionDescription
	*Entity
}

SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities

type SubscriptionManagementOption

type SubscriptionManagementOption func(*SubscriptionDescription) error

SubscriptionManagementOption represents named options for assisting Subscription creation

func SubscriptionWithAutoDeleteOnIdle

func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption

SubscriptionWithAutoDeleteOnIdle configures the subscription to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func SubscriptionWithAutoForward added in v0.2.0

func SubscriptionWithAutoForward(target Targetable) SubscriptionManagementOption

SubscriptionWithAutoForward configures the subscription to automatically forward messages to the specified entity path

The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func SubscriptionWithBatchedOperations

func SubscriptionWithBatchedOperations() SubscriptionManagementOption

SubscriptionWithBatchedOperations configures the subscription to batch server-side operations.

func SubscriptionWithDeadLetteringOnMessageExpiration

func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption

SubscriptionWithDeadLetteringOnMessageExpiration will ensure the Subscription sends expired messages to the dead letter queue

func SubscriptionWithDefaultRuleDescription added in v0.10.3

func SubscriptionWithDefaultRuleDescription(filter FilterDescriber, name string) SubscriptionManagementOption

SubscriptionWithDefaultRuleDescription configures the subscription to set a default rule

func SubscriptionWithForwardDeadLetteredMessagesTo added in v0.2.0

func SubscriptionWithForwardDeadLetteredMessagesTo(target Targetable) SubscriptionManagementOption

SubscriptionWithForwardDeadLetteredMessagesTo configures the subscription to automatically forward dead letter messages to the specified target entity.

The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func SubscriptionWithLockDuration

func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption

SubscriptionWithLockDuration configures the subscription to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.

func SubscriptionWithMessageTimeToLive

func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption

SubscriptionWithMessageTimeToLive configures the subscription to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func SubscriptionWithRequiredSessions

func SubscriptionWithRequiredSessions() SubscriptionManagementOption

SubscriptionWithRequiredSessions will ensure the subscription requires senders and receivers to have sessionIDs

type SubscriptionManager

type SubscriptionManager struct {
	Topic *Topic
	// contains filtered or unexported fields
}

SubscriptionManager provides CRUD functionality for Service Bus Subscription

func (*SubscriptionManager) Delete

func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Subscription entity by name

func (*SubscriptionManager) DeleteRule added in v0.2.0

func (sm *SubscriptionManager) DeleteRule(ctx context.Context, subscriptionName, ruleName string) error

DeleteRule will delete a rule on the subscription

func (SubscriptionManager) Execute

func (em SubscriptionManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*SubscriptionManager) Get

Get fetches a Service Bus Subscription entity by name

func (*SubscriptionManager) List

List fetches all of the Subscriptions for a Service Bus Topic

func (*SubscriptionManager) ListRules added in v0.2.0

func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName string) ([]*RuleEntity, error)

ListRules returns the slice of subscription filter rules

By default when the subscription is created, there exists a single "true" filter which matches all messages.

func (SubscriptionManager) Post

func (em SubscriptionManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*SubscriptionManager) Put

Put creates or updates a Service Bus Subscription

func (*SubscriptionManager) PutRule added in v0.2.0

func (sm *SubscriptionManager) PutRule(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber) (*RuleEntity, error)

PutRule creates a new Subscription rule to filter messages from the topic

func (*SubscriptionManager) PutRuleWithAction added in v0.2.0

func (sm *SubscriptionManager) PutRuleWithAction(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber, action ActionDescriber) (*RuleEntity, error)

PutRuleWithAction creates a new Subscription rule to filter messages from the topic and then perform an action

func (SubscriptionManager) TokenProvider added in v0.2.0

func (em SubscriptionManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (SubscriptionManager) Use added in v0.2.0

func (em SubscriptionManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware stack

type SubscriptionOption

type SubscriptionOption func(*Subscription) error

SubscriptionOption configures the Subscription Azure Service Bus client

func SubscriptionWithPrefetchCount added in v0.2.0

func SubscriptionWithPrefetchCount(prefetch uint32) SubscriptionOption

SubscriptionWithPrefetchCount configures the subscription to attempt to fetch the number of messages specified by the prefetch count at one time.

The default is 1 message at a time.

Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail and will be re-queued and processed again.

func SubscriptionWithReceiveAndDelete

func SubscriptionWithReceiveAndDelete() SubscriptionOption

SubscriptionWithReceiveAndDelete configures a subscription to pop and delete messages off of the queue upon receiving the message. This differs from the default, PeekLock, where PeekLock receives a message, locks it for a period of time, then sends a disposition to the broker when the message has been processed.

type SubscriptionSession added in v0.2.0

type SubscriptionSession struct {
	// contains filtered or unexported fields
}

SubscriptionSession wraps Service Bus session functionality over a Subscription

func NewSubscriptionSession added in v0.2.0

func NewSubscriptionSession(builder ReceiveBuilder, sessionID *string) *SubscriptionSession

NewSubscriptionSession creates a new session receiver to receive from a Service Bus subscription.

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*SubscriptionSession) Close added in v0.2.0

func (ss *SubscriptionSession) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*SubscriptionSession) ManagementPath added in v0.9.0

func (ss *SubscriptionSession) ManagementPath() string

ManagementPath provides an addressable path to the Entity management endpoint

func (*SubscriptionSession) ReceiveDeferred added in v0.9.0

func (ss *SubscriptionSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error

ReceiveDeferred will receive and handle a set of deferred messages

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.

A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.

Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.

func (*SubscriptionSession) ReceiveOne added in v0.2.0

func (ss *SubscriptionSession) ReceiveOne(ctx context.Context, handler SessionHandler) error

ReceiveOne waits for the lock on a particular session to become available, takes it, then processes the session. The session can contain multiple messages. ReceiveOne will receive all messages within that session.

Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.

If the handler returns an error, the receive loop will be terminated.

func (*SubscriptionSession) SessionID added in v0.2.0

func (ss *SubscriptionSession) SessionID() *string

SessionID is the identifier for the Service Bus session

type SystemProperties

type SystemProperties struct {
	LockedUntil            *time.Time             `mapstructure:"x-opt-locked-until"`
	SequenceNumber         *int64                 `mapstructure:"x-opt-sequence-number"`
	PartitionID            *int16                 `mapstructure:"x-opt-partition-id"`
	PartitionKey           *string                `mapstructure:"x-opt-partition-key"`
	EnqueuedTime           *time.Time             `mapstructure:"x-opt-enqueued-time"`
	DeadLetterSource       *string                `mapstructure:"x-opt-deadletter-source"`
	ScheduledEnqueueTime   *time.Time             `mapstructure:"x-opt-scheduled-enqueue-time"`
	EnqueuedSequenceNumber *int64                 `mapstructure:"x-opt-enqueue-sequence-number"`
	ViaPartitionKey        *string                `mapstructure:"x-opt-via-partition-key"`
	Annotations            map[string]interface{} `mapstructure:"-"`
}

SystemProperties are used to store properties that are set by the system.

type Targetable added in v0.2.0

type Targetable interface {
	TargetURI() string
}

Targetable provides the ability to forward messages to the entity

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic in contrast to queues, in which each message is processed by a single consumer, topics and subscriptions provide a one-to-many form of communication, in a publish/subscribe pattern. Useful for scaling to very large numbers of recipients, each published message is made available to each subscription registered with the topic. Messages are sent to a topic and delivered to one or more associated subscriptions, depending on filter rules that can be set on a per-subscription basis. The subscriptions can use additional filters to restrict the messages that they want to receive. Messages are sent to a topic in the same way they are sent to a queue, but messages are not received from the topic directly. Instead, they are received from subscriptions. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.

func (Topic) CancelScheduled added in v0.9.0

func (se Topic) CancelScheduled(ctx context.Context, seq ...int64) error

CancelScheduled allows for removal of messages that have been handed to the Service Bus broker for later delivery, but have not yet been enqueued.

func (*Topic) Close

func (t *Topic) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Topic) NewSender added in v0.2.0

func (t *Topic) NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)

NewSender will create a new Sender for sending messages to the topic

func (*Topic) NewSession added in v0.2.0

func (t *Topic) NewSession(sessionID *string) *TopicSession

NewSession will create a new session based sender for the topic

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*Topic) NewSubscription

func (t *Topic) NewSubscription(name string, opts ...SubscriptionOption) (*Subscription, error)

NewSubscription creates a new Topic Subscription client

func (*Topic) NewSubscriptionManager

func (t *Topic) NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Topic

func (*Topic) NewTransferDeadLetter added in v0.2.0

func (t *Topic) NewTransferDeadLetter() *TransferDeadLetter

NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the topic

Messages will be sent to the transfer dead-letter queue under the following conditions:

  • A message passes through more than 3 queues or topics that are chained together.
  • The destination queue or topic is disabled or deleted.
  • The destination queue or topic exceeds the maximum entity size.

func (*Topic) NewTransferDeadLetterReceiver added in v0.2.0

func (t *Topic) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)

NewTransferDeadLetterReceiver builds a receiver for the Topic's transfer dead letter queue

Messages will be sent to the transfer dead-letter queue under the following conditions:

  • A message passes through more than 3 queues or topics that are chained together.
  • The destination queue or topic is disabled or deleted.
  • The destination queue or topic exceeds the maximum entity size.

func (Topic) ScheduleAt added in v0.9.0

func (se Topic) ScheduleAt(ctx context.Context, enqueueTime time.Time, messages ...*Message) ([]int64, error)

ScheduleAt will send a batch of messages to a Topic, schedule them to be enqueued, and return the sequence numbers that can be used to cancel each message.

func (*Topic) Send

func (t *Topic) Send(ctx context.Context, event *Message, opts ...SendOption) error

Send sends a message to the Topic

func (*Topic) SendBatch added in v0.3.0

func (t *Topic) SendBatch(ctx context.Context, iterator BatchIterator) error

SendBatch sends a batch of messages to the Topic

type TopicDescription

type TopicDescription struct {
	XMLName xml.Name `xml:"TopicDescription"`
	BaseEntityDescription
	DefaultMessageTimeToLive            *string       `xml:"DefaultMessageTimeToLive,omitempty"`            // DefaultMessageTimeToLive - ISO 8601 default message time span to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	MaxSizeInMegabytes                  *int32        `xml:"MaxSizeInMegabytes,omitempty"`                  // MaxSizeInMegabytes - The maximum size of the topic in megabytes, which is the size of memory allocated for the topic. Default is 1024.
	RequiresDuplicateDetection          *bool         `xml:"RequiresDuplicateDetection,omitempty"`          // RequiresDuplicateDetection - A value indicating if this topic requires duplicate detection.
	DuplicateDetectionHistoryTimeWindow *string       `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes.
	EnableBatchedOperations             *bool         `xml:"EnableBatchedOperations,omitempty"`             // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	SizeInBytes                         *int64        `xml:"SizeInBytes,omitempty"`                         // SizeInBytes - The size of the topic, in bytes.
	FilteringMessagesBeforePublishing   *bool         `xml:"FilteringMessagesBeforePublishing,omitempty"`
	IsAnonymousAccessible               *bool         `xml:"IsAnonymousAccessible,omitempty"`
	Status                              *EntityStatus `xml:"Status,omitempty"`
	CreatedAt                           *date.Time    `xml:"CreatedAt,omitempty"`
	UpdatedAt                           *date.Time    `xml:"UpdatedAt,omitempty"`
	SupportOrdering                     *bool         `xml:"SupportOrdering,omitempty"`
	AutoDeleteOnIdle                    *string       `xml:"AutoDeleteOnIdle,omitempty"`
	EnablePartitioning                  *bool         `xml:"EnablePartitioning,omitempty"`
	EnableSubscriptionPartitioning      *bool         `xml:"EnableSubscriptionPartitioning,omitempty"`
	EnableExpress                       *bool         `xml:"EnableExpress,omitempty"`
	CountDetails                        *CountDetails `xml:"CountDetails,omitempty"`
}

TopicDescription is the content type for Topic management requests

type TopicEntity

type TopicEntity struct {
	*TopicDescription
	*Entity
}

TopicEntity is the Azure Service Bus description of a Topic for management activities

type TopicManagementOption

type TopicManagementOption func(*TopicDescription) error

TopicManagementOption represents named options for assisting Topic creation

func TopicWithAutoDeleteOnIdle

func TopicWithAutoDeleteOnIdle(window *time.Duration) TopicManagementOption

TopicWithAutoDeleteOnIdle configures the topic to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func TopicWithBatchedOperations

func TopicWithBatchedOperations() TopicManagementOption

TopicWithBatchedOperations configures the topic to batch server-side operations.

func TopicWithDuplicateDetection

func TopicWithDuplicateDetection(window *time.Duration) TopicManagementOption

TopicWithDuplicateDetection configures the topic to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.

func TopicWithExpress

func TopicWithExpress() TopicManagementOption

TopicWithExpress configures the topic to hold a message in memory temporarily before writing it to persistent storage.

func TopicWithMaxSizeInMegabytes

func TopicWithMaxSizeInMegabytes(size int) TopicManagementOption

TopicWithMaxSizeInMegabytes configures the maximum size of the topic in megabytes (1 * 1024 - 5 * 1024), which is the size of the memory allocated for the topic. Default is 1 GB (1 * 1024 megabytes).

size must be between 1024 and 5 * 1024 for the Standard SKU and up to 80 * 1024 for the Premium SKU

func TopicWithMessageTimeToLive

func TopicWithMessageTimeToLive(window *time.Duration) TopicManagementOption

TopicWithMessageTimeToLive configures the topic to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func TopicWithOrdering

func TopicWithOrdering() TopicManagementOption

TopicWithOrdering configures the topic to support ordering of messages.

func TopicWithPartitioning

func TopicWithPartitioning() TopicManagementOption

TopicWithPartitioning configures the topic to be partitioned across multiple message brokers.

type TopicManager

type TopicManager struct {
	// contains filtered or unexported fields
}

TopicManager provides CRUD functionality for Service Bus Topics

func (*TopicManager) Delete

func (tm *TopicManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Topic entity by name

func (TopicManager) Execute

func (em TopicManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*TopicManager) Get

func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error)

Get fetches a Service Bus Topic entity by name

func (*TopicManager) List

func (tm *TopicManager) List(ctx context.Context, options ...ListTopicsOption) ([]*TopicEntity, error)

List fetches all of the Topics for a Service Bus Namespace

func (TopicManager) Post

func (em TopicManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*TopicManager) Put

func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManagementOption) (*TopicEntity, error)

Put creates or updates a Service Bus Topic

func (TopicManager) TokenProvider added in v0.2.0

func (em TopicManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (TopicManager) Use added in v0.2.0

func (em TopicManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware stack

type TopicOption

type TopicOption func(*Topic) error

TopicOption represents named options for assisting Topic message handling

type TopicSession added in v0.2.0

type TopicSession struct {
	// contains filtered or unexported fields
}

TopicSession wraps Service Bus session functionality over a Topic

func NewTopicSession added in v0.2.0

func NewTopicSession(builder SenderBuilder, sessionID *string) *TopicSession

NewTopicSession creates a new session sender to send to a Service Bus topic.

Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.

func (*TopicSession) Close added in v0.2.0

func (ts *TopicSession) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*TopicSession) Send added in v0.2.0

func (ts *TopicSession) Send(ctx context.Context, msg *Message) error

Send the message to the topic within a session

func (*TopicSession) SessionID added in v0.2.0

func (ts *TopicSession) SessionID() *string

SessionID is the identifier for the Service Bus session

type TransferDeadLetter added in v0.2.0

type TransferDeadLetter struct {
	// contains filtered or unexported fields
}

TransferDeadLetter represents a transfer dead letter queue in Azure Service Bus.

Messages will be sent to the transfer dead-letter queue under the following conditions:

  • A message passes through more than 3 queues or topics that are chained together.
  • The destination queue or topic is disabled or deleted.
  • The destination queue or topic exceeds the maximum entity size.

func NewTransferDeadLetter added in v0.2.0

func NewTransferDeadLetter(builder TransferDeadLetterBuilder) *TransferDeadLetter

NewTransferDeadLetter constructs an instance of TransferDeadLetter which represents a transfer dead letter queue in Azure Service Bus

func (*TransferDeadLetter) Close added in v0.2.0

func (dl *TransferDeadLetter) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*TransferDeadLetter) ReceiveOne added in v0.2.0

func (dl *TransferDeadLetter) ReceiveOne(ctx context.Context, handler Handler) error

ReceiveOne will receive one message from the transfer dead letter queue

type TransferDeadLetterBuilder added in v0.2.0

type TransferDeadLetterBuilder interface {
	NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
}

TransferDeadLetterBuilder provides the ability to create a new receiver addressed to a given entity's transfer dead letter queue.

type TrueFilter added in v0.2.0

type TrueFilter struct{}

TrueFilter represents an always-true SQL expression which will accept all messages

func (TrueFilter) ToFilterDescription added in v0.2.0

func (tf TrueFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the TrueFilter into a FilterDescription

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL