Documentation ¶
Index ¶
- Constants
- func ApplyComponentInfo(span tab.Spanner)
- func ConstructAtomPath(basePath string, skip int, top int) string
- func IsCancelError(err error) bool
- func IsDrainingError(err error) bool
- func IsErrNotFound(err error) bool
- func IsNonRetriable(err error) bool
- func ShouldRecover(ctx context.Context, err error) bool
- type AMQPLinks
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type AMQPSession
- type AMQPSessionCloser
- type ActionDescriber
- type ActionDescription
- type BackoffRetrierParams
- type BaseEntityDescription
- type Closeable
- type CorrelationFilter
- type CountDetails
- type CreateLinkFunc
- type DefaultRuleDescription
- type Disposition
- type DispositionStatus
- type Entity
- type EntityStatus
- type ErrAMQP
- type ErrConnectionClosed
- type ErrIncorrectType
- type ErrMalformedMessage
- type ErrMissingField
- type ErrNoMessages
- type ErrNotFound
- type FakeAMQPLinks
- type FakeAMQPReceiver
- type FakeAMQPSender
- type FakeAMQPSession
- type FakeNS
- func (ns *FakeNS) GetEntityAudience(entityPath string) string
- func (ns *FakeNS) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
- func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
- func (ns *FakeNS) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
- func (ns *FakeNS) NewMgmtClient(ctx context.Context, links AMQPLinks) (MgmtClient, error)
- func (ns *FakeNS) NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)
- func (ns *FakeNS) Recover(ctx context.Context, clientRevision uint64) error
- type FalseFilter
- type FilterDescriber
- type FilterDescription
- type ListQueuesOption
- type ListQueuesOptions
- type ListSubscriptionsOption
- type ListSubscriptionsOptions
- type ListTopicsOption
- type ListTopicsOptions
- type MgmtClient
- type MiddlewareFunc
- type Namespace
- func (ns *Namespace) Close(ctx context.Context) error
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) GetHostname() string
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
- func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
- func (ns *Namespace) NewMgmtClient(ctx context.Context, l AMQPLinks) (MgmtClient, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)
- func (ns *Namespace) NewSubscriptionManager(topicName string) (*SubscriptionManager, error)
- func (ns *Namespace) NewTopicManager() *TopicManager
- func (ns *Namespace) Recover(ctx context.Context, clientRevision uint64) error
- type NamespaceForAMQPLinks
- type NamespaceForMgmtClient
- type NamespaceOption
- func NamespaceWithAzureEnvironment(namespaceName, environmentName string) NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket() NamespaceOption
- func NamespacesWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- type NamespaceWithNewAMQPLinks
- type NonRetriable
- type QueueDescription
- type QueueEntity
- type QueueManagementOption
- func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption
- func QueueEntityWithAutoForward(target Targetable) QueueManagementOption
- func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption
- func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption
- func QueueEntityWithForwardDeadLetteredMessagesTo(target Targetable) QueueManagementOption
- func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption
- func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption
- func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption
- func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption
- func QueueEntityWithPartitioning() QueueManagementOption
- func QueueEntityWithRequiredSessions() QueueManagementOption
- type QueueManager
- func (qm *QueueManager) Delete(ctx context.Context, name string) error
- func (em QueueManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, ...) (*http.Response, error)
- func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error)
- func (qm *QueueManager) List(ctx context.Context, options ...ListQueuesOption) ([]*QueueEntity, error)
- func (em QueueManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
- func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error)
- func (em QueueManager) TokenProvider() auth.TokenProvider
- func (em QueueManager) Use(mw ...MiddlewareFunc)
- type RPCLink
- type ReceiveMode
- type RestHandler
- type Retrier
- type RuleDescription
- type RuleEntity
- type SQLAction
- type SQLFilter
- type SubscriptionDescription
- type SubscriptionEntity
- type SubscriptionManagementOption
- func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption
- func SubscriptionWithAutoForward(target Targetable) SubscriptionManagementOption
- func SubscriptionWithBatchedOperations() SubscriptionManagementOption
- func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption
- func SubscriptionWithDefaultRuleDescription(filter FilterDescriber, name string) SubscriptionManagementOption
- func SubscriptionWithForwardDeadLetteredMessagesTo(target Targetable) SubscriptionManagementOption
- func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption
- func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption
- func SubscriptionWithRequiredSessions() SubscriptionManagementOption
- type SubscriptionManager
- func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error
- func (sm *SubscriptionManager) DeleteRule(ctx context.Context, subscriptionName, ruleName string) error
- func (em SubscriptionManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, ...) (*http.Response, error)
- func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntity, error)
- func (sm *SubscriptionManager) List(ctx context.Context, options ...ListSubscriptionsOption) ([]*SubscriptionEntity, error)
- func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName string) ([]*RuleEntity, error)
- func (em SubscriptionManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
- func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionManagementOption) (*SubscriptionEntity, error)
- func (sm *SubscriptionManager) PutRule(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber) (*RuleEntity, error)
- func (sm *SubscriptionManager) PutRuleWithAction(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber, ...) (*RuleEntity, error)
- func (em SubscriptionManager) TokenProvider() auth.TokenProvider
- func (em SubscriptionManager) Use(mw ...MiddlewareFunc)
- type Targetable
- type TopicDescription
- type TopicEntity
- type TopicManagementOption
- func TopicWithAutoDeleteOnIdle(window *time.Duration) TopicManagementOption
- func TopicWithBatchedOperations() TopicManagementOption
- func TopicWithDuplicateDetection(window *time.Duration) TopicManagementOption
- func TopicWithExpress() TopicManagementOption
- func TopicWithMaxSizeInMegabytes(size int) TopicManagementOption
- func TopicWithMessageTimeToLive(window *time.Duration) TopicManagementOption
- func TopicWithOrdering() TopicManagementOption
- func TopicWithPartitioning() TopicManagementOption
- type TopicManager
- func (tm *TopicManager) Delete(ctx context.Context, name string) error
- func (em TopicManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, ...) (*http.Response, error)
- func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error)
- func (tm *TopicManager) List(ctx context.Context, options ...ListTopicsOption) ([]*TopicEntity, error)
- func (em TopicManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
- func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManagementOption) (*TopicEntity, error)
- func (em TopicManager) TokenProvider() auth.TokenProvider
- func (em TopicManager) Use(mw ...MiddlewareFunc)
- type TrueFilter
Constants ¶
const ( SpanRecover = "sb.recover" SpanRecoverLink = "sb.recover.link" SpanRecoverClient = "sb.recover.client" )
link/connection recovery spans
const ( SpanProcessorLoop = "sb.processor.main" SpanProcessorMessage = "sb.processor.message" SpanProcessorClose = "sb.processor.close" )
processor
const (
SpanCompleteMessage = "sb.receiver.complete"
)
settlement
const (
SpanNegotiateClaim = "sb.auth.negotiateClaim"
)
authentication
const (
SpanSendMessageFmt string = "sb.SendMessage.%s"
)
sender spans
const Version = "v0.1.0"
TODO: this should move into a proper file. Need to resolve some interdependency issues between the public and internal packages first. Version is the semantic version number
Variables ¶
This section is empty.
Functions ¶
func ApplyComponentInfo ¶
func ConstructAtomPath ¶
ConstructAtomPath adds the proper parameters for skip and top. This is common for the list operations for queues, topics and subscriptions.
func IsCancelError ¶
func IsDrainingError ¶
func IsErrNotFound ¶
IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsNonRetriable ¶
IsNonRetriable indicates an error is fatal. Typically, this means the connection or link has been closed.
Types ¶
type AMQPLinks ¶
type AMQPLinks interface { EntityPath() string ManagementPath() string Audience() string // Get will initialize a session and call its link.linkCreator function. // If this link has been closed via Close() it will return a non-retriable error. Get(ctx context.Context) (AMQPSender, AMQPReceiver, MgmtClient, uint64, error) // RecoverIfNeeded will check if an error requires recovery, and will recover // the link or, possibly, the connection. RecoverIfNeeded(ctx context.Context, linksRevision uint64, err error) error // Close will close the link. // If permanent is true the link will not be auto-recreated if Get/Recover // are called. All functions will return `ErrLinksClosed` Close(ctx context.Context, permanent bool) error // ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called. ClosedPermanently() bool }
type AMQPReceiver ¶
type AMQPReceiver interface { IssueCredit(credit uint32) error DrainCredit(ctx context.Context) error Receive(ctx context.Context) (*amqp.Message, error) // settlement functions AcceptMessage(ctx context.Context, msg *amqp.Message) error RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error ReleaseMessage(ctx context.Context, msg *amqp.Message) error ModifyMessage(ctx context.Context, msg *amqp.Message, deliveryFailed, undeliverableHere bool, messageAnnotations amqp.Annotations) error }
AMQPReceiver is implemented by *amqp.Receiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser interface { AMQPReceiver Close(ctx context.Context) error }
AMQPReceiverCloser is implemented by *amqp.Receiver
type AMQPSender ¶
type AMQPSender interface { Send(ctx context.Context, msg *amqp.Message) error MaxMessageSize() uint64 }
AMQPSender is implemented by *amqp.Sender
type AMQPSenderCloser ¶
type AMQPSenderCloser interface { AMQPSender Close(ctx context.Context) error }
AMQPSenderCloser is implemented by *amqp.Sender
type AMQPSession ¶
type AMQPSession interface { NewReceiver(opts ...amqp.LinkOption) (*amqp.Receiver, error) NewSender(opts ...amqp.LinkOption) (*amqp.Sender, error) }
AMQPSession is implemented by *amqp.Session
type AMQPSessionCloser ¶
type AMQPSessionCloser interface { AMQPSession Close(ctx context.Context) error }
AMQPSessionCloser is implemented by *amqp.Session
type ActionDescriber ¶
type ActionDescriber interface {
ToActionDescription() ActionDescription
}
ActionDescriber can transform itself into an ActionDescription
type ActionDescription ¶
type ActionDescription struct { Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` SQLExpression string `xml:"SqlExpression"` RequiresPreprocessing bool `xml:"RequiresPreprocessing"` CompatibilityLevel int `xml:"CompatibilityLevel,omitempty"` }
ActionDescription describes an action upon a message that matches a filter
With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.
type BackoffRetrierParams ¶
type BackoffRetrierParams struct { // MaxRetries is the maximum number of tries (after the first attempt) // that are allowed. MaxRetries int // Factor is the multiplying factor for each increment step Factor float64 // Jitter eases contention by randomizing backoff steps Jitter bool // Min and Max are the minimum and maximum values of the counter Min, Max time.Duration }
BackoffRetrierParams are parameters for NewBackoffRetrier.
type BaseEntityDescription ¶
type BaseEntityDescription struct { InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"` ServiceBusSchema *string `xml:"xmlns,attr,omitempty"` }
BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type CorrelationFilter ¶
type CorrelationFilter struct { CorrelationID *string `xml:"CorrelationId,omitempty"` MessageID *string `xml:"MessageId,omitempty"` To *string `xml:"To,omitempty"` ReplyTo *string `xml:"ReplyTo,omitempty"` Label *string `xml:"Label,omitempty"` SessionID *string `xml:"SessionId,omitempty"` ReplyToSessionID *string `xml:"ReplyToSessionId,omitempty"` ContentType *string `xml:"ContentType,omitempty"` Properties map[string]interface{} `xml:"Properties,omitempty"` }
CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user and system properties. A common use is to match against the CorrelationId property, but the application can also choose to match against ContentType, Label, MessageId, ReplyTo, ReplyToSessionId, SessionId, To, and any user-defined properties. A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter. For string expressions, the comparison is case-sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition, meaning for the filter to match, all conditions must match.
func (CorrelationFilter) ToFilterDescription ¶
func (cf CorrelationFilter) ToFilterDescription() FilterDescription
ToFilterDescription will transform the CorrelationFilter into a FilterDescription
type CountDetails ¶
type CountDetails struct { XMLName xml.Name `xml:"CountDetails"` ActiveMessageCount *int32 `xml:"ActiveMessageCount,omitempty"` DeadLetterMessageCount *int32 `xml:"DeadLetterMessageCount,omitempty"` ScheduledMessageCount *int32 `xml:"ScheduledMessageCount,omitempty"` TransferDeadLetterMessageCount *int32 `xml:"TransferDeadLetterMessageCount,omitempty"` TransferMessageCount *int32 `xml:"TransferMessageCount,omitempty"` }
CountDetails has current active (and other) messages for queue/topic.
type CreateLinkFunc ¶
type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)
CreateLinkFunc creates the links, using the given session. Typically you'll only create either an *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.
type DefaultRuleDescription ¶
type DefaultRuleDescription struct { XMLName xml.Name `xml:"DefaultRuleDescription"` Filter FilterDescription `xml:"Filter"` Name *string `xml:"Name,omitempty"` }
DefaultRuleDescription is the content type for Subscription Rule management requests
type Disposition ¶
type Disposition struct { Status DispositionStatus LockTokens []*uuid.UUID DeadLetterReason *string DeadLetterDescription *string }
type DispositionStatus ¶
type DispositionStatus string
const ( CompletedDisposition DispositionStatus = "completed" AbandonedDisposition DispositionStatus = "abandoned" SuspendedDisposition DispositionStatus = "suspended" DeferredDisposition DispositionStatus = "defered" )
type EntityStatus ¶
type EntityStatus string
EntityStatus enumerates the values for entity status.
const ( // Active ... Active EntityStatus = "Active" // Creating ... Creating EntityStatus = "Creating" // Deleting ... Deleting EntityStatus = "Deleting" // Disabled ... Disabled EntityStatus = "Disabled" // ReceiveDisabled ... ReceiveDisabled EntityStatus = "ReceiveDisabled" // Renaming ... Renaming EntityStatus = "Renaming" // Restoring ... Restoring EntityStatus = "Restoring" // SendDisabled ... SendDisabled EntityStatus = "SendDisabled" // Unknown ... Unknown EntityStatus = "Unknown" )
type ErrConnectionClosed ¶
type ErrConnectionClosed string
ErrConnectionClosed indicates that the connection has been closed.
func (ErrConnectionClosed) Error ¶
func (e ErrConnectionClosed) Error() string
type ErrIncorrectType ¶
ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func NewErrIncorrectType ¶
func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType
NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.
func (ErrIncorrectType) Error ¶
func (e ErrIncorrectType) Error() string
type ErrMalformedMessage ¶
type ErrMalformedMessage string
ErrMalformedMessage indicates that a message that was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.
func (ErrMalformedMessage) Error ¶
func (e ErrMalformedMessage) Error() string
type ErrMissingField ¶
type ErrMissingField string
ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrMissingField) Error ¶
func (e ErrMissingField) Error() string
type ErrNoMessages ¶
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶
func (e ErrNoMessages) Error() string
type ErrNotFound ¶
type ErrNotFound struct {
EntityPath string
}
ErrNotFound is returned when an entity is not found (404)
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
type FakeAMQPLinks ¶
type FakeAMQPLinks struct { AMQPLinks Closed int // values to be returned for each `Get` call Revision uint64 Receiver AMQPReceiver Sender AMQPSender Mgmt MgmtClient Err error // contains filtered or unexported fields }
func (*FakeAMQPLinks) Close ¶
func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error
func (*FakeAMQPLinks) ClosedPermanently ¶
func (l *FakeAMQPLinks) ClosedPermanently() bool
func (*FakeAMQPLinks) Get ¶
func (l *FakeAMQPLinks) Get(ctx context.Context) (AMQPSender, AMQPReceiver, MgmtClient, uint64, error)
type FakeAMQPReceiver ¶
type FakeAMQPReceiver struct { AMQPReceiver Closed int Drain int }
func (*FakeAMQPReceiver) DrainCredit ¶
func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error
type FakeAMQPSender ¶
type FakeAMQPSender struct { Closed int AMQPSender }
type FakeAMQPSession ¶
type FakeAMQPSession struct { AMQPSessionCloser // contains filtered or unexported fields }
type FakeNS ¶
type FakeNS struct { MgmtClient MgmtClient RPCLink *rpc.Link Session AMQPSessionCloser AMQPLinks *FakeAMQPLinks // contains filtered or unexported fields }
func (*FakeNS) GetEntityAudience ¶
func (*FakeNS) NegotiateClaim ¶
func (*FakeNS) NewAMQPLinks ¶
func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
func (*FakeNS) NewAMQPSession ¶
func (*FakeNS) NewMgmtClient ¶
func (*FakeNS) NewRPCLink ¶
type FalseFilter ¶
type FalseFilter struct{}
FalseFilter represents an always-false SQL expression which will deny all messages
func (FalseFilter) ToFilterDescription ¶
func (ff FalseFilter) ToFilterDescription() FilterDescription
ToFilterDescription will transform the FalseFilter into a FilterDescription
type FilterDescriber ¶
type FilterDescriber interface {
ToFilterDescription() FilterDescription
}
FilterDescriber can transform itself into a FilterDescription
type FilterDescription ¶
type FilterDescription struct { XMLName xml.Name `xml:"Filter"` CorrelationFilter Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` SQLExpression *string `xml:"SqlExpression,omitempty"` CompatibilityLevel int `xml:"CompatibilityLevel,omitempty"` }
FilterDescription describes a filter which can be applied to a subscription to filter messages from the topic.
Subscribers can define which messages they want to receive from a topic. These messages are specified in the form of one or more named subscription rules. Each rule consists of a condition that selects particular messages and an action that annotates the selected message. For each matching rule condition, the subscription produces a copy of the message, which may be differently annotated for each matching rule.
Each newly created topic subscription has an initial default subscription rule. If you don't explicitly specify a filter condition for the rule, the applied filter is the true filter that enables all messages to be selected into the subscription. The default rule has no associated annotation action.
type ListQueuesOption ¶
type ListQueuesOption func(*ListQueuesOptions) error
ListQueuesOption represents named options for listing queues
func ListQueuesWithSkip ¶
func ListQueuesWithSkip(skip int) ListQueuesOption
ListQueuesWithSkip will skip the specified number of entities
func ListQueuesWithTop ¶
func ListQueuesWithTop(top int) ListQueuesOption
ListQueuesWithTop will return at most `top` results
type ListQueuesOptions ¶
type ListQueuesOptions struct {
// contains filtered or unexported fields
}
ListQueuesOptions provides options for List() to control things like page size. NOTE: Use the ListQueuesWith* methods to specify this.
type ListSubscriptionsOption ¶
type ListSubscriptionsOption func(*ListSubscriptionsOptions) error
ListSubscriptionsOption represents named options for listing subscriptions
func ListSubscriptionsWithSkip ¶
func ListSubscriptionsWithSkip(skip int) ListSubscriptionsOption
ListSubscriptionsWithSkip will skip the specified number of entities
func ListSubscriptionsWithTop ¶
func ListSubscriptionsWithTop(top int) ListSubscriptionsOption
ListSubscriptionsWithTop will return at most `top` results
type ListSubscriptionsOptions ¶
type ListSubscriptionsOptions struct {
// contains filtered or unexported fields
}
ListSubscriptionsOptions provides options for List() to control things like page size. NOTE: Use the ListSubscriptionsWith* methods to specify this.
type ListTopicsOption ¶
type ListTopicsOption func(*ListTopicsOptions) error
ListTopicsOption represents named options for listing topics
func ListTopicsWithSkip ¶
func ListTopicsWithSkip(skip int) ListTopicsOption
ListTopicsWithSkip will skip the specified number of entities
func ListTopicsWithTop ¶
func ListTopicsWithTop(top int) ListTopicsOption
ListTopicsWithTop will return at most `top` results
type ListTopicsOptions ¶
type ListTopicsOptions struct {
// contains filtered or unexported fields
}
ListTopicsOptions provides options for List() to control things like page size. NOTE: Use the ListTopicsWith* methods to specify this.
type MgmtClient ¶
type MgmtClient interface { Close(ctx context.Context) error SendDisposition(ctx context.Context, lockToken *amqp.UUID, state Disposition) error ReceiveDeferred(ctx context.Context, mode ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error) PeekMessages(ctx context.Context, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) }
type MiddlewareFunc ¶
type MiddlewareFunc func(next RestHandler) RestHandler
MiddlewareFunc allows a consumer of the entity manager to inject handlers within the request / response pipeline
The example below adds the atom xml content type to the request, calls the next middleware and returns the result.
addAtomXMLContentType MiddlewareFunc = func(next RestHandler) RestHandler { return func(ctx context.Context, req *http.Request) (res *http.Response, e error) { if req.Method != http.MethodGet && req.Method != http.MethodHead { req.Header.Add("content-Type", "application/atom+xml;type=entry;charset=utf-8") } return next(ctx, req) } }
func TraceReqAndResponseMiddleware ¶
func TraceReqAndResponseMiddleware() MiddlewareFunc
TraceReqAndResponseMiddleware will print the dump of the management request and response.
This should only be used for debugging or educational purposes.
type Namespace ¶
type Namespace struct { Name string Suffix string TokenProvider *tokenProvider Environment azure.Environment // contains filtered or unexported fields }
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per ServiceBusClient.
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) GetHostname ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
NegotiateClaim performs initial authentication and starts periodic refresh of credentials. The returned func is used to cancel the refresh goroutine.
func (*Namespace) NewAMQPLinks ¶
func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
NewAMQPLinks creates an AMQPLinks struct, which groups together the commonly needed links for working with Service Bus.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client.
func (*Namespace) NewMgmtClient ¶
NewMgmtClient creates a new management client with the internally cached *amqp.Client.
func (*Namespace) NewRPCLink ¶
NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.
func (*Namespace) NewSubscriptionManager ¶
func (ns *Namespace) NewSubscriptionManager(topicName string) (*SubscriptionManager, error)
NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Namespace
func (*Namespace) NewTopicManager ¶
func (ns *Namespace) NewTopicManager() *TopicManager
NewTopicManager creates a new TopicManager for a Service Bus Namespace
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface { NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error) NewMgmtClient(ctx context.Context, links AMQPLinks) (MgmtClient, error) GetEntityAudience(entityPath string) string Recover(ctx context.Context, clientRevision uint64) error }
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceForMgmtClient ¶
type NamespaceForMgmtClient interface {
NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)
}
NamespaceForMgmtClient is the Namespace surface needed for the *MgmtClient.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Service Bus namespace
func NamespaceWithAzureEnvironment ¶
func NamespaceWithAzureEnvironment(namespaceName, environmentName string) NamespaceOption
NamespaceWithAzureEnvironment sets the namespace's Environment, Suffix and ResourceURI parameters according to the Azure Environment defined in the "github.com/Azure/go-autorest/autorest/azure" package. This allows the library to be configured for use in the different Azure clouds. environmentName is the name of the cloud as defined in autorest: https://github.com/Azure/go-autorest/blob/b076c1437d051bf4c328db428b70f4fe22ad38b0/autorest/azure/environments.go#L34-L39
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket() NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
func NamespacesWithTokenCredential ¶
func NamespacesWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespacesWithTokenCredential sets the token provider on the namespace. fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net).
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface {
NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
}
NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NonRetriable ¶
type NonRetriable interface { error NonRetriable() }
type QueueDescription ¶
type QueueDescription struct { XMLName xml.Name `xml:"QueueDescription"` BaseEntityDescription LockDuration *string `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. MaxSizeInMegabytes *int32 `xml:"MaxSizeInMegabytes,omitempty"` // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024. RequiresDuplicateDetection *bool `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection. RequiresSession *bool `xml:"RequiresSession,omitempty"` DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. DeadLetteringOnMessageExpiration *bool `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires. DuplicateDetectionHistoryTimeWindow *string `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. MaxDeliveryCount *int32 `xml:"MaxDeliveryCount,omitempty"` // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10. EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. 
SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` // SizeInBytes - The size of the queue, in bytes. MessageCount *int64 `xml:"MessageCount,omitempty"` // MessageCount - The number of messages in the queue. IsAnonymousAccessible *bool `xml:"IsAnonymousAccessible,omitempty"` Status *EntityStatus `xml:"Status,omitempty"` CreatedAt *date.Time `xml:"CreatedAt,omitempty"` UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"` SupportOrdering *bool `xml:"SupportOrdering,omitempty"` AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` EnablePartitioning *bool `xml:"EnablePartitioning,omitempty"` EnableExpress *bool `xml:"EnableExpress,omitempty"` CountDetails *CountDetails `xml:"CountDetails,omitempty"` ForwardTo *string `xml:"ForwardTo,omitempty"` ForwardDeadLetteredMessagesTo *string `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages }
QueueDescription is the content type for Queue management requests
type QueueEntity ¶
type QueueEntity struct { *QueueDescription *Entity }
QueueEntity is the Azure Service Bus description of a Queue for management activities
type QueueManagementOption ¶
type QueueManagementOption func(*QueueDescription) error
QueueManagementOption represents named configuration options for queue mutation
func QueueEntityWithAutoDeleteOnIdle ¶
func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption
QueueEntityWithAutoDeleteOnIdle configures the queue to automatically delete after the specified idle interval. The minimum duration is 5 minutes.
func QueueEntityWithAutoForward ¶
func QueueEntityWithAutoForward(target Targetable) QueueManagementOption
QueueEntityWithAutoForward configures the queue to automatically forward messages to the specified target.
The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.
func QueueEntityWithDeadLetteringOnMessageExpiration ¶
func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption
QueueEntityWithDeadLetteringOnMessageExpiration will ensure the queue sends expired messages to the dead letter queue
func QueueEntityWithDuplicateDetection ¶
func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption
QueueEntityWithDuplicateDetection configures the queue to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.
func QueueEntityWithForwardDeadLetteredMessagesTo ¶
func QueueEntityWithForwardDeadLetteredMessagesTo(target Targetable) QueueManagementOption
QueueEntityWithForwardDeadLetteredMessagesTo configures the queue to automatically forward dead letter messages to the specified target.
The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.
func QueueEntityWithLockDuration ¶
func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption
QueueEntityWithLockDuration configures the queue to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
func QueueEntityWithMaxDeliveryCount ¶
func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption
QueueEntityWithMaxDeliveryCount configures the queue to have a maximum number of delivery attempts before dead-lettering the message
func QueueEntityWithMaxSizeInMegabytes ¶
func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption
QueueEntityWithMaxSizeInMegabytes configures the maximum size of the queue in megabytes (1 * 1024 to 5 * 1024), which is the size of the memory allocated for the queue. Default is 1 GB (1 * 1024 MB).
size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku
func QueueEntityWithMessageTimeToLive ¶
func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption
QueueEntityWithMessageTimeToLive configures the queue to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.
func QueueEntityWithPartitioning ¶
func QueueEntityWithPartitioning() QueueManagementOption
QueueEntityWithPartitioning ensures that the created queue will be a partitioned queue. Partitioned queues offer increased storage and availability compared to non-partitioned queues, with the trade-off of requiring the following to ensure FIFO message retrieval:
SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of session states.
PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional messages. The partition key ensures that all messages that are sent within a transaction are handled by the same messaging broker.
MessageId. If the queue has the RequiresDuplicateDetection property set to true, then the MessageId property serves as the partition key if neither the SessionId nor the PartitionKey property is set. This ensures that all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and eliminate duplicate messages.
func QueueEntityWithRequiredSessions ¶
func QueueEntityWithRequiredSessions() QueueManagementOption
QueueEntityWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager provides CRUD functionality for Service Bus Queues
func NewQueueManager ¶
func NewQueueManager(httpsHostURI string, tokenProvider auth.TokenProvider) *QueueManager
NewQueueManager creates a new QueueManager for a Service Bus Namespace
func (*QueueManager) Delete ¶
func (qm *QueueManager) Delete(ctx context.Context, name string) error
Delete deletes a Service Bus Queue entity by name
func (*QueueManager) Get ¶
func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error)
Get fetches a Service Bus Queue entity by name
func (*QueueManager) List ¶
func (qm *QueueManager) List(ctx context.Context, options ...ListQueuesOption) ([]*QueueEntity, error)
List fetches all of the queues for a Service Bus Namespace
func (QueueManager) Post ¶
func (em QueueManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
Post performs an HTTP POST for a given entity path and body
func (*QueueManager) Put ¶
func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error)
Put creates or updates a Service Bus Queue
func (QueueManager) TokenProvider ¶
func (em QueueManager) TokenProvider() auth.TokenProvider
TokenProvider generates authorization tokens for communicating with the Service Bus management API
func (QueueManager) Use ¶
func (em QueueManager) Use(mw ...MiddlewareFunc)
Use adds middleware to the middleware mwStack
type RPCLink ¶
type RPCLink interface { Close(ctx context.Context) error RetryableRPC(ctx context.Context, times int, delay time.Duration, msg *amqp.Message) (*rpc.Response, error) }
RPCLink is implemented by *rpc.Link
type ReceiveMode ¶
type ReceiveMode int
ReceiveMode represents the lock style to use for a receiver - either `PeekLock` or `ReceiveAndDelete`
const ( // PeekLock will lock messages as they are received and can be settled // using the Receiver or Processor's (Complete|Abandon|DeadLetter|Defer)Message // functions. PeekLock ReceiveMode = 0 // ReceiveAndDelete will delete messages as they are received. ReceiveAndDelete ReceiveMode = 1 )
type RestHandler ¶
RestHandler is used to transform a request and response within the http pipeline
type Retrier ¶
type Retrier interface { // Copies the retrier. Retriers are stateful and must be copied // before starting a set of retries. Copy() Retrier // Exhausted is true if the retries were exhausted. Exhausted() bool // CurrentTry is the current try (0 for the first run before retries) CurrentTry() int // Try marks an attempt to call (first call to Try() does not sleep). // Will return false if the `ctx` is cancelled or if we exhaust our retries. // // rp := RetryPolicy{Backoff:defaultBackoffPolicy, MaxRetries:5} // // for rp.Try(ctx) { // <your code> // } // // if rp.Cancelled() || rp.Exhausted() { // // no more retries needed // } // Try(ctx context.Context) bool }
Retrier allows you to write a basic for loop and get backoff and retry limits. See `Try` for more details on how to use it.
func NewBackoffRetrier ¶
func NewBackoffRetrier(params BackoffRetrierParams) Retrier
NewBackoffRetrier creates a retrier that allows for configurable min/max times, jitter and maximum retries.
type RuleDescription ¶
type RuleDescription struct { XMLName xml.Name `xml:"RuleDescription"` BaseEntityDescription CreatedAt *date.Time `xml:"CreatedAt,omitempty"` Filter FilterDescription `xml:"Filter"` Action *ActionDescription `xml:"Action,omitempty"` }
RuleDescription is the content type for Subscription Rule management requests
type RuleEntity ¶
type RuleEntity struct { *RuleDescription *Entity }
RuleEntity is the Azure Service Bus description of a Subscription Rule for management activities
type SQLAction ¶
type SQLAction struct {
Expression string
}
SQLAction represents a SQL language-based action expression that is evaluated against a BrokeredMessage. A SQLAction supports a subset of the SQL-92 standard.
With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.
see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter
func (SQLAction) ToActionDescription ¶
func (sf SQLAction) ToActionDescription() ActionDescription
ToActionDescription will transform the SQLAction into an ActionDescription
type SQLFilter ¶
type SQLFilter struct {
Expression string
}
SQLFilter represents a SQL language-based filter expression that is evaluated against a BrokeredMessage. A SQLFilter supports a subset of the SQL-92 standard.
see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter
func (SQLFilter) ToFilterDescription ¶
func (sf SQLFilter) ToFilterDescription() FilterDescription
ToFilterDescription will transform the SQLFilter into a FilterDescription
type SubscriptionDescription ¶
type SubscriptionDescription struct { XMLName xml.Name `xml:"SubscriptionDescription"` BaseEntityDescription LockDuration *string `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. RequiresSession *bool `xml:"RequiresSession,omitempty"` DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. DefaultRuleDescription *DefaultRuleDescription `xml:"DefaultRuleDescription,omitempty"` DeadLetteringOnMessageExpiration *bool `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires. DeadLetteringOnFilterEvaluationExceptions *bool `xml:"DeadLetteringOnFilterEvaluationExceptions,omitempty"` MessageCount *int64 `xml:"MessageCount,omitempty"` // MessageCount - The number of messages in the queue. MaxDeliveryCount *int32 `xml:"MaxDeliveryCount,omitempty"` // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10. EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. 
Status *EntityStatus `xml:"Status,omitempty"` CreatedAt *date.Time `xml:"CreatedAt,omitempty"` UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"` AccessedAt *date.Time `xml:"AccessedAt,omitempty"` AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` ForwardTo *string `xml:"ForwardTo,omitempty"` // ForwardTo - absolute URI of the entity to forward messages ForwardDeadLetteredMessagesTo *string `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages CountDetails *CountDetails `xml:"CountDetails,omitempty"` }
SubscriptionDescription is the content type for Subscription management requests
type SubscriptionEntity ¶
type SubscriptionEntity struct { *SubscriptionDescription *Entity }
SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities
type SubscriptionManagementOption ¶
type SubscriptionManagementOption func(*SubscriptionDescription) error
SubscriptionManagementOption represents named options for assisting Subscription creation
func SubscriptionWithAutoDeleteOnIdle ¶
func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption
SubscriptionWithAutoDeleteOnIdle configures the subscription to automatically delete after the specified idle interval. The minimum duration is 5 minutes.
func SubscriptionWithAutoForward ¶
func SubscriptionWithAutoForward(target Targetable) SubscriptionManagementOption
SubscriptionWithAutoForward configures the subscription to automatically forward messages to the specified target.
The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.
func SubscriptionWithBatchedOperations ¶
func SubscriptionWithBatchedOperations() SubscriptionManagementOption
SubscriptionWithBatchedOperations configures the subscription to batch server-side operations.
func SubscriptionWithDeadLetteringOnMessageExpiration ¶
func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption
SubscriptionWithDeadLetteringOnMessageExpiration will ensure the Subscription sends expired messages to the dead letter queue
func SubscriptionWithDefaultRuleDescription ¶
func SubscriptionWithDefaultRuleDescription(filter FilterDescriber, name string) SubscriptionManagementOption
SubscriptionWithDefaultRuleDescription configures the subscription to set a default rule
func SubscriptionWithForwardDeadLetteredMessagesTo ¶
func SubscriptionWithForwardDeadLetteredMessagesTo(target Targetable) SubscriptionManagementOption
SubscriptionWithForwardDeadLetteredMessagesTo configures the subscription to automatically forward dead letter messages to the specified target entity.
The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.
func SubscriptionWithLockDuration ¶
func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption
SubscriptionWithLockDuration configures the subscription to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
func SubscriptionWithMessageTimeToLive ¶
func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption
SubscriptionWithMessageTimeToLive configures the subscription to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.
func SubscriptionWithRequiredSessions ¶
func SubscriptionWithRequiredSessions() SubscriptionManagementOption
SubscriptionWithRequiredSessions will ensure the subscription requires senders and receivers to have sessionIDs
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
SubscriptionManager provides CRUD functionality for Service Bus Subscription
func (*SubscriptionManager) Delete ¶
func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error
Delete deletes a Service Bus Subscription entity by name
func (*SubscriptionManager) DeleteRule ¶
func (sm *SubscriptionManager) DeleteRule(ctx context.Context, subscriptionName, ruleName string) error
DeleteRule will delete a rule on the subscription
func (*SubscriptionManager) Get ¶
func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntity, error)
Get fetches a Service Bus Subscription entity by name
func (*SubscriptionManager) List ¶
func (sm *SubscriptionManager) List(ctx context.Context, options ...ListSubscriptionsOption) ([]*SubscriptionEntity, error)
List fetches all of the Service Bus Subscription entities
func (*SubscriptionManager) ListRules ¶
func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName string) ([]*RuleEntity, error)
ListRules returns the slice of subscription filter rules
By default when the subscription is created, there exists a single "true" filter which matches all messages.
func (SubscriptionManager) Post ¶
func (em SubscriptionManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
Post performs an HTTP POST for a given entity path and body
func (*SubscriptionManager) Put ¶
func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionManagementOption) (*SubscriptionEntity, error)
Put creates or updates a Service Bus Subscription
func (*SubscriptionManager) PutRule ¶
func (sm *SubscriptionManager) PutRule(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber) (*RuleEntity, error)
PutRule creates a new Subscription rule to filter messages from the topic
func (*SubscriptionManager) PutRuleWithAction ¶
func (sm *SubscriptionManager) PutRuleWithAction(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber, action ActionDescriber) (*RuleEntity, error)
PutRuleWithAction creates a new Subscription rule to filter messages from the topic and then perform an action
func (SubscriptionManager) TokenProvider ¶
func (em SubscriptionManager) TokenProvider() auth.TokenProvider
TokenProvider generates authorization tokens for communicating with the Service Bus management API
func (SubscriptionManager) Use ¶
func (em SubscriptionManager) Use(mw ...MiddlewareFunc)
Use adds middleware to the middleware mwStack
type Targetable ¶
type Targetable interface {
TargetURI() string
}
Targetable provides the ability to forward messages to the entity
type TopicDescription ¶
type TopicDescription struct { XMLName xml.Name `xml:"TopicDescription"` BaseEntityDescription DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message time span to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. MaxSizeInMegabytes *int32 `xml:"MaxSizeInMegabytes,omitempty"` // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024. RequiresDuplicateDetection *bool `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection. DuplicateDetectionHistoryTimeWindow *string `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` // SizeInBytes - The size of the queue, in bytes. 
FilteringMessagesBeforePublishing *bool `xml:"FilteringMessagesBeforePublishing,omitempty"` IsAnonymousAccessible *bool `xml:"IsAnonymousAccessible,omitempty"` Status *EntityStatus `xml:"Status,omitempty"` CreatedAt *date.Time `xml:"CreatedAt,omitempty"` UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"` SupportOrdering *bool `xml:"SupportOrdering,omitempty"` AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` EnablePartitioning *bool `xml:"EnablePartitioning,omitempty"` EnableSubscriptionPartitioning *bool `xml:"EnableSubscriptionPartitioning,omitempty"` EnableExpress *bool `xml:"EnableExpress,omitempty"` CountDetails *CountDetails `xml:"CountDetails,omitempty"` }
TopicDescription is the content type for Topic management requests
type TopicEntity ¶
type TopicEntity struct { *TopicDescription *Entity }
TopicEntity is the Azure Service Bus description of a Topic for management activities
type TopicManagementOption ¶
type TopicManagementOption func(*TopicDescription) error
TopicManagementOption represents named options for assisting Topic creation
func TopicWithAutoDeleteOnIdle ¶
func TopicWithAutoDeleteOnIdle(window *time.Duration) TopicManagementOption
TopicWithAutoDeleteOnIdle configures the topic to automatically delete after the specified idle interval. The minimum duration is 5 minutes.
func TopicWithBatchedOperations ¶
func TopicWithBatchedOperations() TopicManagementOption
TopicWithBatchedOperations configures the topic to batch server-side operations.
func TopicWithDuplicateDetection ¶
func TopicWithDuplicateDetection(window *time.Duration) TopicManagementOption
TopicWithDuplicateDetection configures the topic to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.
func TopicWithExpress ¶
func TopicWithExpress() TopicManagementOption
TopicWithExpress configures the topic to hold a message in memory temporarily before writing it to persistent storage.
func TopicWithMaxSizeInMegabytes ¶
func TopicWithMaxSizeInMegabytes(size int) TopicManagementOption
TopicWithMaxSizeInMegabytes configures the maximum size of the topic in megabytes (1 * 1024 to 5 * 1024), which is the size of the memory allocated for the topic. Default is 1 GB (1 * 1024 MB).
size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku
func TopicWithMessageTimeToLive ¶
func TopicWithMessageTimeToLive(window *time.Duration) TopicManagementOption
TopicWithMessageTimeToLive configures the topic to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.
func TopicWithOrdering ¶
func TopicWithOrdering() TopicManagementOption
TopicWithOrdering configures the topic to support ordering of messages.
func TopicWithPartitioning ¶
func TopicWithPartitioning() TopicManagementOption
TopicWithPartitioning configures the topic to be partitioned across multiple message brokers.
type TopicManager ¶
type TopicManager struct {
// contains filtered or unexported fields
}
TopicManager provides CRUD functionality for Service Bus Topics
func (*TopicManager) Delete ¶
func (tm *TopicManager) Delete(ctx context.Context, name string) error
Delete deletes a Service Bus Topic entity by name
func (*TopicManager) Get ¶
func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error)
Get fetches a Service Bus Topic entity by name
func (*TopicManager) List ¶
func (tm *TopicManager) List(ctx context.Context, options ...ListTopicsOption) ([]*TopicEntity, error)
List fetches all of the Topics for a Service Bus Namespace
func (TopicManager) Post ¶
func (em TopicManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
Post performs an HTTP POST for a given entity path and body
func (*TopicManager) Put ¶
func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManagementOption) (*TopicEntity, error)
Put creates or updates a Service Bus Topic
func (TopicManager) TokenProvider ¶
func (em TopicManager) TokenProvider() auth.TokenProvider
TokenProvider generates authorization tokens for communicating with the Service Bus management API
func (TopicManager) Use ¶
func (em TopicManager) Use(mw ...MiddlewareFunc)
Use adds middleware to the middleware mwStack
type TrueFilter ¶
type TrueFilter struct{}
TrueFilter represents an always-true SQL expression which will accept all messages
func (TrueFilter) ToFilterDescription ¶
func (tf TrueFilter) ToFilterDescription() FilterDescription
ToFilterDescription will transform the TrueFilter into a FilterDescription