internal

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 5, 2021 License: MIT Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SpanRecover       = "sb.recover"
	SpanRecoverLink   = "sb.recover.link"
	SpanRecoverClient = "sb.recover.client"
)

link/connection recovery spans

View Source
const (
	SpanProcessorLoop    = "sb.processor.main"
	SpanProcessorMessage = "sb.processor.message"
	SpanProcessorClose   = "sb.processor.close"
)

processor

View Source
const (
	SpanCompleteMessage = "sb.receiver.complete"
)

settlement

View Source
const (
	SpanNegotiateClaim = "sb.auth.negotiateClaim"
)

authentication

View Source
const (
	SpanSendMessageFmt string = "sb.SendMessage.%s"
)

sender spans

View Source
const Version = "v0.1.0"

TODO: this should move into a proper file. Need to resolve some interdependency issues between the public and internal packages first. Version is the semantic version number

Variables

This section is empty.

Functions

func ApplyComponentInfo

func ApplyComponentInfo(span tab.Spanner)

func ConstructAtomPath

func ConstructAtomPath(basePath string, skip int, top int) string

ConstructAtomPath adds the proper parameters for skip and top This is common for the list operations for queues, topics and subscriptions.

func IsCancelError

func IsCancelError(err error) bool

func IsDrainingError

func IsDrainingError(err error) bool

func IsErrNotFound

func IsErrNotFound(err error) bool

IsErrNotFound returns true if the error argument is an ErrNotFound type

func IsNonRetriable

func IsNonRetriable(err error) bool

IsNonRetriable indicates an error is fatal. Typically, this means the connection or link has been closed.

func ShouldRecover

func ShouldRecover(ctx context.Context, err error) bool

Types

type AMQPLinks interface {
	EntityPath() string
	ManagementPath() string

	Audience() string

	// Get will initialize a session and call its link.linkCreator function.
	// If this link has been closed via Close() it will return an non retriable error.
	Get(ctx context.Context) (AMQPSender, AMQPReceiver, MgmtClient, uint64, error)

	// RecoverIfNeeded will check if an error requires recovery, and will recover
	// the link or, possibly, the connection.
	RecoverIfNeeded(ctx context.Context, linksRevision uint64, err error) error

	// Close will close the the link.
	// If permanent is true the link will not be auto-recreated if Get/Recover
	// are called. All functions will return `ErrLinksClosed`
	Close(ctx context.Context, permanent bool) error

	// ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called.
	ClosedPermanently() bool
}

type AMQPReceiver

type AMQPReceiver interface {
	IssueCredit(credit uint32) error
	DrainCredit(ctx context.Context) error
	Receive(ctx context.Context) (*amqp.Message, error)

	// settlement functions
	AcceptMessage(ctx context.Context, msg *amqp.Message) error
	RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
	ReleaseMessage(ctx context.Context, msg *amqp.Message) error
	ModifyMessage(ctx context.Context, msg *amqp.Message, deliveryFailed, undeliverableHere bool, messageAnnotations amqp.Annotations) error
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPReceiverCloser

type AMQPReceiverCloser interface {
	AMQPReceiver
	Close(ctx context.Context) error
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPSender

type AMQPSender interface {
	Send(ctx context.Context, msg *amqp.Message) error
	MaxMessageSize() uint64
}

AMQPSender is implemented by *amqp.Sender

type AMQPSenderCloser

type AMQPSenderCloser interface {
	AMQPSender
	Close(ctx context.Context) error
}

AMQPSenderCloser is implemented by *amqp.Sender

type AMQPSession

type AMQPSession interface {
	NewReceiver(opts ...amqp.LinkOption) (*amqp.Receiver, error)
	NewSender(opts ...amqp.LinkOption) (*amqp.Sender, error)
}

AMQPSession is implemented by *amqp.Session

type AMQPSessionCloser

type AMQPSessionCloser interface {
	AMQPSession
	Close(ctx context.Context) error
}

AMQPSessionCloser is implemented by *amqp.Session

type ActionDescriber

type ActionDescriber interface {
	ToActionDescription() ActionDescription
}

ActionDescriber can transform itself into a ActionDescription

type ActionDescription

type ActionDescription struct {
	Type                  string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"`
	SQLExpression         string `xml:"SqlExpression"`
	RequiresPreprocessing bool   `xml:"RequiresPreprocessing"`
	CompatibilityLevel    int    `xml:"CompatibilityLevel,omitempty"`
}

ActionDescription describes an action upon a message that matches a filter

With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.

type BackoffRetrierParams

type BackoffRetrierParams struct {
	// MaxRetries is the maximum number of tries (after the first attempt)
	// that are allowed.
	MaxRetries int
	// Factor is the multiplying factor for each increment step
	Factor float64
	// Jitter eases contention by randomizing backoff steps
	Jitter bool
	// Min and Max are the minimum and maximum values of the counter
	Min, Max time.Duration
}

BackoffRetrierParams are parameters for NewBackoffRetrier.

type BaseEntityDescription

type BaseEntityDescription struct {
	InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"`
	ServiceBusSchema       *string `xml:"xmlns,attr,omitempty"`
}

BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions

type Closeable

type Closeable interface {
	Close(ctx context.Context) error
}

Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.

type CorrelationFilter

type CorrelationFilter struct {
	CorrelationID    *string                `xml:"CorrelationId,omitempty"`
	MessageID        *string                `xml:"MessageId,omitempty"`
	To               *string                `xml:"To,omitempty"`
	ReplyTo          *string                `xml:"ReplyTo,omitempty"`
	Label            *string                `xml:"Label,omitempty"`
	SessionID        *string                `xml:"SessionId,omitempty"`
	ReplyToSessionID *string                `xml:"ReplyToSessionId,omitempty"`
	ContentType      *string                `xml:"ContentType,omitempty"`
	Properties       map[string]interface{} `xml:"Properties,omitempty"`
}

CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user and system properties. A common use is to match against the CorrelationId property, but the application can also choose to match against ContentType, Label, MessageId, ReplyTo, ReplyToSessionId, SessionId, To, and any user-defined properties. A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter. For string expressions, the comparison is case-sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition, meaning for the filter to match, all conditions must match.

func (CorrelationFilter) ToFilterDescription

func (cf CorrelationFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the CorrelationFilter into a FilterDescription

type CountDetails

type CountDetails struct {
	XMLName                        xml.Name `xml:"CountDetails"`
	ActiveMessageCount             *int32   `xml:"ActiveMessageCount,omitempty"`
	DeadLetterMessageCount         *int32   `xml:"DeadLetterMessageCount,omitempty"`
	ScheduledMessageCount          *int32   `xml:"ScheduledMessageCount,omitempty"`
	TransferDeadLetterMessageCount *int32   `xml:"TransferDeadLetterMessageCount,omitempty"`
	TransferMessageCount           *int32   `xml:"TransferMessageCount,omitempty"`
}

CountDetails has current active (and other) messages for queue/topic.

type CreateLinkFunc

type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)

CreateLinkFunc creates the links, using the given session. Typically you'll only create either an *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.

type DefaultRuleDescription

type DefaultRuleDescription struct {
	XMLName xml.Name          `xml:"DefaultRuleDescription"`
	Filter  FilterDescription `xml:"Filter"`
	Name    *string           `xml:"Name,omitempty"`
}

DefaultRuleDescription is the content type for Subscription Rule management requests

type Disposition

type Disposition struct {
	Status                DispositionStatus
	LockTokens            []*uuid.UUID
	DeadLetterReason      *string
	DeadLetterDescription *string
}

type DispositionStatus

type DispositionStatus string
const (
	CompletedDisposition DispositionStatus = "completed"
	AbandonedDisposition DispositionStatus = "abandoned"
	SuspendedDisposition DispositionStatus = "suspended"
	DeferredDisposition  DispositionStatus = "defered"
)

type Entity

type Entity struct {
	Name string
	ID   string
}

Entity is represents the most basic form of an Azure Service Bus entity.

func (Entity) TargetURI

func (e Entity) TargetURI() string

TargetURI provides an absolute address to a target entity

type EntityStatus

type EntityStatus string

EntityStatus enumerates the values for entity status.

const (
	// Active ...
	Active EntityStatus = "Active"
	// Creating ...
	Creating EntityStatus = "Creating"
	// Deleting ...
	Deleting EntityStatus = "Deleting"
	// Disabled ...
	Disabled EntityStatus = "Disabled"
	// ReceiveDisabled ...
	ReceiveDisabled EntityStatus = "ReceiveDisabled"
	// Renaming ...
	Renaming EntityStatus = "Renaming"
	// Restoring ...
	Restoring EntityStatus = "Restoring"
	// SendDisabled ...
	SendDisabled EntityStatus = "SendDisabled"
	// Unknown ...
	Unknown EntityStatus = "Unknown"
)

type ErrAMQP

type ErrAMQP rpc.Response

ErrAMQP indicates that the server communicated an AMQP error with a particular

func (ErrAMQP) Error

func (e ErrAMQP) Error() string

type ErrConnectionClosed

type ErrConnectionClosed string

ErrConnectionClosed indicates that the connection has been closed.

func (ErrConnectionClosed) Error

func (e ErrConnectionClosed) Error() string

type ErrIncorrectType

type ErrIncorrectType struct {
	Key          string
	ExpectedType reflect.Type
	ActualValue  interface{}
}

ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func NewErrIncorrectType

func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType

NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.

func (ErrIncorrectType) Error

func (e ErrIncorrectType) Error() string

type ErrMalformedMessage

type ErrMalformedMessage string

ErrMalformedMessage indicates that a message was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.

func (ErrMalformedMessage) Error

func (e ErrMalformedMessage) Error() string

type ErrMissingField

type ErrMissingField string

ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func (ErrMissingField) Error

func (e ErrMissingField) Error() string

type ErrNoMessages

type ErrNoMessages struct{}

ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.

func (ErrNoMessages) Error

func (e ErrNoMessages) Error() string

type ErrNotFound

type ErrNotFound struct {
	EntityPath string
}

ErrNotFound is returned when an entity is not found (404)

func (ErrNotFound) Error

func (e ErrNotFound) Error() string
type FakeAMQPLinks struct {
	AMQPLinks

	Closed int

	// values to be returned for each `Get` call
	Revision uint64
	Receiver AMQPReceiver
	Sender   AMQPSender
	Mgmt     MgmtClient
	Err      error
	// contains filtered or unexported fields
}

func (*FakeAMQPLinks) Close

func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error

func (*FakeAMQPLinks) ClosedPermanently

func (l *FakeAMQPLinks) ClosedPermanently() bool

func (*FakeAMQPLinks) Get

type FakeAMQPReceiver

type FakeAMQPReceiver struct {
	AMQPReceiver
	Closed int
	Drain  int
}

func (*FakeAMQPReceiver) Close

func (r *FakeAMQPReceiver) Close(ctx context.Context) error

func (*FakeAMQPReceiver) DrainCredit

func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error

type FakeAMQPSender

type FakeAMQPSender struct {
	Closed int
	AMQPSender
}

func (*FakeAMQPSender) Close

func (s *FakeAMQPSender) Close(ctx context.Context) error

type FakeAMQPSession

type FakeAMQPSession struct {
	AMQPSessionCloser
	// contains filtered or unexported fields
}

func (*FakeAMQPSession) Close

func (s *FakeAMQPSession) Close(ctx context.Context) error

type FakeNS

type FakeNS struct {
	MgmtClient MgmtClient
	RPCLink    *rpc.Link
	Session    AMQPSessionCloser
	AMQPLinks  *FakeAMQPLinks
	// contains filtered or unexported fields
}

func (*FakeNS) GetEntityAudience

func (ns *FakeNS) GetEntityAudience(entityPath string) string

func (*FakeNS) NegotiateClaim

func (ns *FakeNS) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks

func (*FakeNS) NewAMQPSession

func (ns *FakeNS) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)

func (*FakeNS) NewMgmtClient

func (ns *FakeNS) NewMgmtClient(ctx context.Context, links AMQPLinks) (MgmtClient, error)
func (ns *FakeNS) NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)

func (*FakeNS) Recover

func (ns *FakeNS) Recover(ctx context.Context, clientRevision uint64) error

type FalseFilter

type FalseFilter struct{}

FalseFilter represents a always false sql expression which will deny all messages

func (FalseFilter) ToFilterDescription

func (ff FalseFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the FalseFilter into a FilterDescription

type FilterDescriber

type FilterDescriber interface {
	ToFilterDescription() FilterDescription
}

FilterDescriber can transform itself into a FilterDescription

type FilterDescription

type FilterDescription struct {
	XMLName xml.Name `xml:"Filter"`
	CorrelationFilter
	Type               string  `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"`
	SQLExpression      *string `xml:"SqlExpression,omitempty"`
	CompatibilityLevel int     `xml:"CompatibilityLevel,omitempty"`
}

FilterDescription describes a filter which can be applied to a subscription to filter messages from the topic.

Subscribers can define which messages they want to receive from a topic. These messages are specified in the form of one or more named subscription rules. Each rule consists of a condition that selects particular messages and an action that annotates the selected message. For each matching rule condition, the subscription produces a copy of the message, which may be differently annotated for each matching rule.

Each newly created topic subscription has an initial default subscription rule. If you don't explicitly specify a filter condition for the rule, the applied filter is the true filter that enables all messages to be selected into the subscription. The default rule has no associated annotation action.

type ListQueuesOption

type ListQueuesOption func(*ListQueuesOptions) error

ListQueuesOption represents named options for listing topics

func ListQueuesWithSkip

func ListQueuesWithSkip(skip int) ListQueuesOption

ListQueuesWithSkip will skip the specified number of entities

func ListQueuesWithTop

func ListQueuesWithTop(top int) ListQueuesOption

ListQueuesWithTop will return at most `top` results

type ListQueuesOptions

type ListQueuesOptions struct {
	// contains filtered or unexported fields
}

ListQueuesOptions provides options for List() to control things like page size. NOTE: Use the ListQueuesWith* methods to specify this.

type ListSubscriptionsOption

type ListSubscriptionsOption func(*ListSubscriptionsOptions) error

ListSubscriptionsOption represents named options for listing topics

func ListSubscriptionsWithSkip

func ListSubscriptionsWithSkip(skip int) ListSubscriptionsOption

ListSubscriptionsWithSkip will skip the specified number of entities

func ListSubscriptionsWithTop

func ListSubscriptionsWithTop(top int) ListSubscriptionsOption

ListSubscriptionsWithTop will return at most `top` results

type ListSubscriptionsOptions

type ListSubscriptionsOptions struct {
	// contains filtered or unexported fields
}

ListSubscriptionsOptions provides options for List() to control things like page size. NOTE: Use the ListSubscriptionsWith* methods to specify this.

type ListTopicsOption

type ListTopicsOption func(*ListTopicsOptions) error

ListTopicsOption represents named options for listing topics

func ListTopicsWithSkip

func ListTopicsWithSkip(skip int) ListTopicsOption

ListTopicsWithSkip will skip the specified number of entities

func ListTopicsWithTop

func ListTopicsWithTop(top int) ListTopicsOption

ListTopicsWithTop will return at most `top` results

type ListTopicsOptions

type ListTopicsOptions struct {
	// contains filtered or unexported fields
}

ListTopicsOptions provides options for List() to control things like page size. NOTE: Use the ListTopicsWith* methods to specify this.

type MgmtClient

type MgmtClient interface {
	Close(ctx context.Context) error
	SendDisposition(ctx context.Context, lockToken *amqp.UUID, state Disposition) error
	ReceiveDeferred(ctx context.Context, mode ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error)
	PeekMessages(ctx context.Context, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error)
}

type MiddlewareFunc

type MiddlewareFunc func(next RestHandler) RestHandler

MiddlewareFunc allows a consumer of the entity manager to inject handlers within the request / response pipeline

The example below adds the atom xml content type to the request, calls the next middleware and returns the result.

addAtomXMLContentType MiddlewareFunc = func(next RestHandler) RestHandler {
		return func(ctx context.Context, req *http.Request) (res *http.Response, e error) {
			if req.Method != http.MethodGet && req.Method != http.MethodHead {
				req.Header.Add("content-Type", "application/atom+xml;type=entry;charset=utf-8")
			}
			return next(ctx, req)
		}
	}

func TraceReqAndResponseMiddleware

func TraceReqAndResponseMiddleware() MiddlewareFunc

TraceReqAndResponseMiddleware will print the dump of the management request and response.

This should only be used for debugging or educational purposes.

type Namespace

type Namespace struct {
	Name          string
	Suffix        string
	TokenProvider *tokenProvider
	Environment   azure.Environment
	// contains filtered or unexported fields
}

Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per ServiceBusClient.

func NewNamespace

func NewNamespace(opts ...NamespaceOption) (*Namespace, error)

NewNamespace creates a new namespace configured through NamespaceOption(s)

func (*Namespace) Close

func (ns *Namespace) Close(ctx context.Context) error

Close closes the current cached client.

func (*Namespace) GetEntityAudience

func (ns *Namespace) GetEntityAudience(entityPath string) string

func (*Namespace) GetHTTPSHostURI

func (ns *Namespace) GetHTTPSHostURI() string

func (*Namespace) GetHostname

func (ns *Namespace) GetHostname() string

func (*Namespace) NegotiateClaim

func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)

negotiateClaim performs initial authentication and starts periodic refresh of credentials. the returned func is to cancel() the refresh goroutine.

func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks

NewAMQPLinks creates an AMQPLinks struct, which groups together the commonly needed links for working with Service Bus.

func (*Namespace) NewAMQPSession

func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)

NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client.

func (*Namespace) NewMgmtClient

func (ns *Namespace) NewMgmtClient(ctx context.Context, l AMQPLinks) (MgmtClient, error)

NewMgmtClient creates a new management client with the internally cached *amqp.Client.

func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)

NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.

func (*Namespace) NewSubscriptionManager

func (ns *Namespace) NewSubscriptionManager(topicName string) (*SubscriptionManager, error)

NewSubscriptionManager creates a new SubscriptionManger for a Service Bus Namespace

func (*Namespace) NewTopicManager

func (ns *Namespace) NewTopicManager() *TopicManager

NewTopicManager creates a new TopicManager for a Service Bus Namespace

func (*Namespace) Recover

func (ns *Namespace) Recover(ctx context.Context, clientRevision uint64) error

Recover destroys the currently held client and recreates it. clientRevision being nil will recover without a revision check.

type NamespaceForAMQPLinks interface {
	NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
	NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
	NewMgmtClient(ctx context.Context, links AMQPLinks) (MgmtClient, error)
	GetEntityAudience(entityPath string) string
	Recover(ctx context.Context, clientRevision uint64) error
}

NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.

type NamespaceForMgmtClient

type NamespaceForMgmtClient interface {
	NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)
}

NamespaceForAMQPLinks is the Namespace surface needed for the *MgmtClient.

type NamespaceOption

type NamespaceOption func(h *Namespace) error

NamespaceOption provides structure for configuring a new Service Bus namespace

func NamespaceWithAzureEnvironment

func NamespaceWithAzureEnvironment(namespaceName, environmentName string) NamespaceOption

NamespaceWithAzureEnvironment sets the namespace's Environment, Suffix and ResourceURI parameters according to the Azure Environment defined in "github.com/Azure/go-autorest/autorest/azure" package. This allows to configure the library to be used in the different Azure clouds. environmentName is the name of the cloud as defined in autorest : https://github.com/Azure/go-autorest/blob/b076c1437d051bf4c328db428b70f4fe22ad38b0/autorest/azure/environments.go#L34-L39

func NamespaceWithConnectionString

func NamespaceWithConnectionString(connStr string) NamespaceOption

NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string

func NamespaceWithTLSConfig

func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption

NamespaceWithTLSConfig appends to the TLS config.

func NamespaceWithUserAgent

func NamespaceWithUserAgent(userAgent string) NamespaceOption

NamespaceWithUserAgent appends to the root user-agent value.

func NamespaceWithWebSocket

func NamespaceWithWebSocket() NamespaceOption

NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://

func NamespacesWithTokenCredential

func NamespacesWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption

NamespacesWithTokenCredential sets the token provider on the namespace fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net)

type NamespaceWithNewAMQPLinks interface {
	NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
}

NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.

type NonRetriable

type NonRetriable interface {
	error
	NonRetriable()
}

type QueueDescription

type QueueDescription struct {
	XMLName xml.Name `xml:"QueueDescription"`
	BaseEntityDescription
	LockDuration                        *string       `xml:"LockDuration,omitempty"`               // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
	MaxSizeInMegabytes                  *int32        `xml:"MaxSizeInMegabytes,omitempty"`         // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024.
	RequiresDuplicateDetection          *bool         `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
	RequiresSession                     *bool         `xml:"RequiresSession,omitempty"`
	DefaultMessageTimeToLive            *string       `xml:"DefaultMessageTimeToLive,omitempty"`            // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	DeadLetteringOnMessageExpiration    *bool         `xml:"DeadLetteringOnMessageExpiration,omitempty"`    // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires.
	DuplicateDetectionHistoryTimeWindow *string       `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes.
	MaxDeliveryCount                    *int32        `xml:"MaxDeliveryCount,omitempty"`                    // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10.
	EnableBatchedOperations             *bool         `xml:"EnableBatchedOperations,omitempty"`             // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	SizeInBytes                         *int64        `xml:"SizeInBytes,omitempty"`                         // SizeInBytes - The size of the queue, in bytes.
	MessageCount                        *int64        `xml:"MessageCount,omitempty"`                        // MessageCount - The number of messages in the queue.
	IsAnonymousAccessible               *bool         `xml:"IsAnonymousAccessible,omitempty"`
	Status                              *EntityStatus `xml:"Status,omitempty"`
	CreatedAt                           *date.Time    `xml:"CreatedAt,omitempty"`
	UpdatedAt                           *date.Time    `xml:"UpdatedAt,omitempty"`
	SupportOrdering                     *bool         `xml:"SupportOrdering,omitempty"`
	AutoDeleteOnIdle                    *string       `xml:"AutoDeleteOnIdle,omitempty"`
	EnablePartitioning                  *bool         `xml:"EnablePartitioning,omitempty"`
	EnableExpress                       *bool         `xml:"EnableExpress,omitempty"`
	CountDetails                        *CountDetails `xml:"CountDetails,omitempty"`
	ForwardTo                           *string       `xml:"ForwardTo,omitempty"`
	ForwardDeadLetteredMessagesTo       *string       `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
}

QueueDescription is the content type for Queue management requests

type QueueEntity

type QueueEntity struct {
	*QueueDescription
	*Entity
}

QueueEntity is the Azure Service Bus description of a Queue for management activities

type QueueManagementOption

type QueueManagementOption func(*QueueDescription) error

QueueManagementOption represents named configuration options for queue mutation

func QueueEntityWithAutoDeleteOnIdle

func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption

QueueEntityWithAutoDeleteOnIdle configures the queue to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func QueueEntityWithAutoForward

func QueueEntityWithAutoForward(target Targetable) QueueManagementOption

QueueEntityWithAutoForward configures the queue to automatically forward messages to the specified target.

The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func QueueEntityWithDeadLetteringOnMessageExpiration

func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption

QueueEntityWithDeadLetteringOnMessageExpiration will ensure the queue sends expired messages to the dead letter queue

func QueueEntityWithDuplicateDetection

func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption

QueueEntityWithDuplicateDetection configures the queue to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.

func QueueEntityWithForwardDeadLetteredMessagesTo

func QueueEntityWithForwardDeadLetteredMessagesTo(target Targetable) QueueManagementOption

QueueEntityWithForwardDeadLetteredMessagesTo configures the queue to automatically forward dead letter messages to the specified target.

The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func QueueEntityWithLockDuration

func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption

QueueEntityWithLockDuration configures the queue to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.

func QueueEntityWithMaxDeliveryCount

func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption

QueueEntityWithMaxDeliveryCount configures the queue to have a maximum number of delivery attempts before dead-lettering the message

func QueueEntityWithMaxSizeInMegabytes

func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption

QueueEntityWithMaxSizeInMegabytes configures the maximum size of the queue in megabytes (1 * 1024 - 5 * 1024), which is the size of the memory allocated for the queue. Default is 1 MB (1 * 1024).

size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku

func QueueEntityWithMessageTimeToLive

func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption

QueueEntityWithMessageTimeToLive configures the queue to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func QueueEntityWithPartitioning

func QueueEntityWithPartitioning() QueueManagementOption

QueueEntityWithPartitioning ensure the created queue will be a partitioned queue. Partitioned queues offer increased storage and availability compared to non-partitioned queues with the trade-off of requiring the following to ensure FIFO message retrieval:

SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of session states.

PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional messages. The partition key ensures that all messages that are sent within a transaction are handled by the same messaging broker.

MessageId. If the queue has the RequiresDuplicationDetection property set to true, then the MessageId property serves as the partition key if the SessionId or a PartitionKey properties are not set. This ensures that all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and eliminate duplicate messages

func QueueEntityWithRequiredSessions

func QueueEntityWithRequiredSessions() QueueManagementOption

QueueEntityWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager provides CRUD functionality for Service Bus Queues

func NewQueueManager

func NewQueueManager(httpsHostURI string, tokenProvider auth.TokenProvider) *QueueManager

NewQueueManager creates a new QueueManager for a Service Bus Namespace

func (*QueueManager) Delete

func (qm *QueueManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Queue entity by name

func (QueueManager) Execute

func (em QueueManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*QueueManager) Get

func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error)

Get fetches a Service Bus Queue entity by name

func (*QueueManager) List

func (qm *QueueManager) List(ctx context.Context, options ...ListQueuesOption) ([]*QueueEntity, error)

List fetches all of the queues for a Service Bus Namespace

func (QueueManager) Post

func (em QueueManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*QueueManager) Put

func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error)

Put creates or updates a Service Bus Queue

func (QueueManager) TokenProvider

func (em QueueManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (QueueManager) Use

func (em QueueManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware mwStack

type RPCLink interface {
	Close(ctx context.Context) error
	RetryableRPC(ctx context.Context, times int, delay time.Duration, msg *amqp.Message) (*rpc.Response, error)
}

RPCLink is implemented by *rpc.Link

type ReceiveMode

type ReceiveMode int

ReceiveMode represents the lock style to use for a reciever - either `PeekLock` or `ReceiveAndDelete`

const (
	// PeekLock will lock messages as they are received and can be settled
	// using the Receiver or Processor's (Complete|Abandon|DeadLetter|Defer)Message
	// functions.
	PeekLock ReceiveMode = 0
	// ReceiveAndDelete will delete messages as they are received.
	ReceiveAndDelete ReceiveMode = 1
)

type RestHandler

type RestHandler func(ctx context.Context, req *http.Request) (*http.Response, error)

RestHandler is used to transform a request and response within the http pipeline

type Retrier

type Retrier interface {
	// Copies the retrier. Retriers are stateful and must be copied
	// before starting a set of retries.
	Copy() Retrier

	// Exhausted is true if the retries were exhausted.
	Exhausted() bool

	// CurrentTry is the current try (0 for the first run before retries)
	CurrentTry() int

	// Try marks an attempt to call (first call to Try() does not sleep).
	// Will return false if the `ctx` is cancelled or if we exhaust our retries.
	//
	//    rp := RetryPolicy{Backoff:defaultBackoffPolicy, MaxRetries:5}
	//
	//    for rp.Try(ctx) {
	//       <your code>
	//    }
	//
	//    if rp.Cancelled() || rp.Exhausted() {
	//       // no more retries needed
	//    }
	//
	Try(ctx context.Context) bool
}

A retrier that allows you to do a basic for loop and get backoff and retry limits. See `Try` for more details on how to use it.

func NewBackoffRetrier

func NewBackoffRetrier(params BackoffRetrierParams) Retrier

NewBackoffRetrier creates a retrier that allows for configurable min/max times, jitter and maximum retries.

type RuleDescription

type RuleDescription struct {
	XMLName xml.Name `xml:"RuleDescription"`
	BaseEntityDescription
	CreatedAt *date.Time         `xml:"CreatedAt,omitempty"`
	Filter    FilterDescription  `xml:"Filter"`
	Action    *ActionDescription `xml:"Action,omitempty"`
}

RuleDescription is the content type for Subscription Rule management requests

type RuleEntity

type RuleEntity struct {
	*RuleDescription
	*Entity
}

RuleEntity is the Azure Service Bus description of a Subscription Rule for management activities

type SQLAction

type SQLAction struct {
	Expression string
}

SQLAction represents a SQL language-based action expression that is evaluated against a BrokeredMessage. A SQLAction supports a subset of the SQL-92 standard.

With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.

see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter

func (SQLAction) ToActionDescription

func (sf SQLAction) ToActionDescription() ActionDescription

ToActionDescription will transform the SqlAction into a ActionDescription

type SQLFilter

type SQLFilter struct {
	Expression string
}

SQLFilter represents a SQL language-based filter expression that is evaluated against a BrokeredMessage. A SQLFilter supports a subset of the SQL-92 standard.

see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter

func (SQLFilter) ToFilterDescription

func (sf SQLFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the SqlFilter into a FilterDescription

type SubscriptionDescription

type SubscriptionDescription struct {
	XMLName xml.Name `xml:"SubscriptionDescription"`
	BaseEntityDescription
	LockDuration                              *string                 `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
	RequiresSession                           *bool                   `xml:"RequiresSession,omitempty"`
	DefaultMessageTimeToLive                  *string                 `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	DefaultRuleDescription                    *DefaultRuleDescription `xml:"DefaultRuleDescription,omitempty"`
	DeadLetteringOnMessageExpiration          *bool                   `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires.
	DeadLetteringOnFilterEvaluationExceptions *bool                   `xml:"DeadLetteringOnFilterEvaluationExceptions,omitempty"`
	MessageCount                              *int64                  `xml:"MessageCount,omitempty"`            // MessageCount - The number of messages in the queue.
	MaxDeliveryCount                          *int32                  `xml:"MaxDeliveryCount,omitempty"`        // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10.
	EnableBatchedOperations                   *bool                   `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	Status                                    *EntityStatus           `xml:"Status,omitempty"`
	CreatedAt                                 *date.Time              `xml:"CreatedAt,omitempty"`
	UpdatedAt                                 *date.Time              `xml:"UpdatedAt,omitempty"`
	AccessedAt                                *date.Time              `xml:"AccessedAt,omitempty"`
	AutoDeleteOnIdle                          *string                 `xml:"AutoDeleteOnIdle,omitempty"`
	ForwardTo                                 *string                 `xml:"ForwardTo,omitempty"`                     // ForwardTo - absolute URI of the entity to forward messages
	ForwardDeadLetteredMessagesTo             *string                 `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
	CountDetails                              *CountDetails           `xml:"CountDetails,omitempty"`
}

SubscriptionDescription is the content type for Subscription management requests

type SubscriptionEntity

type SubscriptionEntity struct {
	*SubscriptionDescription
	*Entity
}

SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities

type SubscriptionManagementOption

type SubscriptionManagementOption func(*SubscriptionDescription) error

SubscriptionManagementOption represents named options for assisting Subscription creation

func SubscriptionWithAutoDeleteOnIdle

func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption

SubscriptionWithAutoDeleteOnIdle configures the subscription to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func SubscriptionWithAutoForward

func SubscriptionWithAutoForward(target Targetable) SubscriptionManagementOption

SubscriptionWithAutoForward configures the queue to automatically forward messages to the specified entity path

The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func SubscriptionWithBatchedOperations

func SubscriptionWithBatchedOperations() SubscriptionManagementOption

SubscriptionWithBatchedOperations configures the subscription to batch server-side operations.

func SubscriptionWithDeadLetteringOnMessageExpiration

func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption

SubscriptionWithDeadLetteringOnMessageExpiration will ensure the Subscription sends expired messages to the dead letter queue

func SubscriptionWithDefaultRuleDescription

func SubscriptionWithDefaultRuleDescription(filter FilterDescriber, name string) SubscriptionManagementOption

SubscriptionWithDefaultRuleDescription configures the subscription to set a default rule

func SubscriptionWithForwardDeadLetteredMessagesTo

func SubscriptionWithForwardDeadLetteredMessagesTo(target Targetable) SubscriptionManagementOption

SubscriptionWithForwardDeadLetteredMessagesTo configures the queue to automatically forward dead letter messages to the specified target entity.

The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.

func SubscriptionWithLockDuration

func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption

SubscriptionWithLockDuration configures the subscription to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.

func SubscriptionWithMessageTimeToLive

func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption

SubscriptionWithMessageTimeToLive configures the subscription to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func SubscriptionWithRequiredSessions

func SubscriptionWithRequiredSessions() SubscriptionManagementOption

SubscriptionWithRequiredSessions will ensure the subscription requires senders and receivers to have sessionIDs

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager provides CRUD functionality for Service Bus Subscription

func (*SubscriptionManager) Delete

func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Topic entity by name

func (*SubscriptionManager) DeleteRule

func (sm *SubscriptionManager) DeleteRule(ctx context.Context, subscriptionName, ruleName string) error

DeleteRule will delete a rule on the subscription

func (SubscriptionManager) Execute

func (em SubscriptionManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*SubscriptionManager) Get

Get fetches a Service Bus Topic entity by name

func (*SubscriptionManager) List

List fetches all of the Topics for a Service Bus Namespace

func (*SubscriptionManager) ListRules

func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName string) ([]*RuleEntity, error)

ListRules returns the slice of subscription filter rules

By default when the subscription is created, there exists a single "true" filter which matches all messages.

func (SubscriptionManager) Post

func (em SubscriptionManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*SubscriptionManager) Put

Put creates or updates a Service Bus Topic

func (*SubscriptionManager) PutRule

func (sm *SubscriptionManager) PutRule(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber) (*RuleEntity, error)

PutRule creates a new Subscription rule to filter messages from the topic

func (*SubscriptionManager) PutRuleWithAction

func (sm *SubscriptionManager) PutRuleWithAction(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber, action ActionDescriber) (*RuleEntity, error)

PutRuleWithAction creates a new Subscription rule to filter messages from the topic and then perform an action

func (SubscriptionManager) TokenProvider

func (em SubscriptionManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (SubscriptionManager) Use

func (em SubscriptionManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware mwStack

type Targetable

type Targetable interface {
	TargetURI() string
}

Targetable provides the ability to forward messages to the entity

type TopicDescription

type TopicDescription struct {
	XMLName xml.Name `xml:"TopicDescription"`
	BaseEntityDescription
	DefaultMessageTimeToLive            *string       `xml:"DefaultMessageTimeToLive,omitempty"`            // DefaultMessageTimeToLive - ISO 8601 default message time span to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	MaxSizeInMegabytes                  *int32        `xml:"MaxSizeInMegabytes,omitempty"`                  // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024.
	RequiresDuplicateDetection          *bool         `xml:"RequiresDuplicateDetection,omitempty"`          // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
	DuplicateDetectionHistoryTimeWindow *string       `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes.
	EnableBatchedOperations             *bool         `xml:"EnableBatchedOperations,omitempty"`             // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	SizeInBytes                         *int64        `xml:"SizeInBytes,omitempty"`                         // SizeInBytes - The size of the queue, in bytes.
	FilteringMessagesBeforePublishing   *bool         `xml:"FilteringMessagesBeforePublishing,omitempty"`
	IsAnonymousAccessible               *bool         `xml:"IsAnonymousAccessible,omitempty"`
	Status                              *EntityStatus `xml:"Status,omitempty"`
	CreatedAt                           *date.Time    `xml:"CreatedAt,omitempty"`
	UpdatedAt                           *date.Time    `xml:"UpdatedAt,omitempty"`
	SupportOrdering                     *bool         `xml:"SupportOrdering,omitempty"`
	AutoDeleteOnIdle                    *string       `xml:"AutoDeleteOnIdle,omitempty"`
	EnablePartitioning                  *bool         `xml:"EnablePartitioning,omitempty"`
	EnableSubscriptionPartitioning      *bool         `xml:"EnableSubscriptionPartitioning,omitempty"`
	EnableExpress                       *bool         `xml:"EnableExpress,omitempty"`
	CountDetails                        *CountDetails `xml:"CountDetails,omitempty"`
}

TopicDescription is the content type for Topic management requests

type TopicEntity

type TopicEntity struct {
	*TopicDescription
	*Entity
}

TopicEntity is the Azure Service Bus description of a Topic for management activities

type TopicManagementOption

type TopicManagementOption func(*TopicDescription) error

TopicManagementOption represents named options for assisting Topic creation

func TopicWithAutoDeleteOnIdle

func TopicWithAutoDeleteOnIdle(window *time.Duration) TopicManagementOption

TopicWithAutoDeleteOnIdle configures the topic to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func TopicWithBatchedOperations

func TopicWithBatchedOperations() TopicManagementOption

TopicWithBatchedOperations configures the topic to batch server-side operations.

func TopicWithDuplicateDetection

func TopicWithDuplicateDetection(window *time.Duration) TopicManagementOption

TopicWithDuplicateDetection configures the topic to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.

func TopicWithExpress

func TopicWithExpress() TopicManagementOption

TopicWithExpress configures the topic to hold a message in memory temporarily before writing it to persistent storage.

func TopicWithMaxSizeInMegabytes

func TopicWithMaxSizeInMegabytes(size int) TopicManagementOption

TopicWithMaxSizeInMegabytes configures the maximum size of the topic in megabytes (1 * 1024 - 5 * 1024), which is the size of the memory allocated for the topic. Default is 1 MB (1 * 1024).

size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku

func TopicWithMessageTimeToLive

func TopicWithMessageTimeToLive(window *time.Duration) TopicManagementOption

TopicWithMessageTimeToLive configures the topic to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func TopicWithOrdering

func TopicWithOrdering() TopicManagementOption

TopicWithOrdering configures the topic to support ordering of messages.

func TopicWithPartitioning

func TopicWithPartitioning() TopicManagementOption

TopicWithPartitioning configures the topic to be partitioned across multiple message brokers.

type TopicManager

type TopicManager struct {
	// contains filtered or unexported fields
}

TopicManager provides CRUD functionality for Service Bus Topics

func (*TopicManager) Delete

func (tm *TopicManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Topic entity by name

func (TopicManager) Execute

func (em TopicManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, mw ...MiddlewareFunc) (*http.Response, error)

func (*TopicManager) Get

func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error)

Get fetches a Service Bus Topic entity by name

func (*TopicManager) List

func (tm *TopicManager) List(ctx context.Context, options ...ListTopicsOption) ([]*TopicEntity, error)

List fetches all of the Topics for a Service Bus Namespace

func (TopicManager) Post

func (em TopicManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*TopicManager) Put

func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManagementOption) (*TopicEntity, error)

Put creates or updates a Service Bus Topic

func (TopicManager) TokenProvider

func (em TopicManager) TokenProvider() auth.TokenProvider

TokenProvider generates authorization tokens for communicating with the Service Bus management API

func (TopicManager) Use

func (em TopicManager) Use(mw ...MiddlewareFunc)

Use adds middleware to the middleware mwStack

type TrueFilter

type TrueFilter struct{}

TrueFilter represents a always true sql expression which will accept all messages

func (TrueFilter) ToFilterDescription

func (tf TrueFilter) ToFilterDescription() FilterDescription

ToFilterDescription will transform the TrueFilter into a FilterDescription

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL