Documentation ¶
Overview ¶
Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.
Index ¶
- Constants
- Variables
- func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64) error
- func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]byte, error)
- func IsCancelError(err error) bool
- func IsDetachError(err error) bool
- func IsDrainingError(err error) bool
- func IsErrNotFound(err error) bool
- func IsFatalSBError(err error) bool
- func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, ...) error
- func NewErrNonRetriable(message string) error
- func NewRPCLink(args RPCLinkArgs) (*rpcLink, error)
- func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64, ...) ([]*amqp.Message, error)
- func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.ReceiveMode, ...) ([]*amqp.Message, error)
- func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)
- func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (time.Time, error)
- func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Time, ...) ([]int64, error)
- func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, state Disposition, ...) error
- func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, state []byte) error
- func TransformError(err error) error
- type AMQPLinks
- type AMQPLinksImpl
- func (l *AMQPLinksImpl) Audience() string
- func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error
- func (l *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) RecoveryKind
- func (l *AMQPLinksImpl) ClosedPermanently() bool
- func (l *AMQPLinksImpl) EntityPath() string
- func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error)
- func (links *AMQPLinksImpl) ManagementPath() string
- func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID, origErr error) error
- func (l *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operation string, ...) error
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type AMQPSession
- type AMQPSessionCloser
- type Closeable
- type CreateLinkFunc
- type Disposition
- type DispositionStatus
- type ErrAMQP
- type ErrConnectionClosed
- type ErrIncorrectType
- type ErrMalformedMessage
- type ErrMissingField
- type ErrNoMessages
- type ErrNotFound
- type FakeAMQPLinks
- func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error
- func (l *FakeAMQPLinks) CloseIfNeeded(ctx context.Context, err error) RecoveryKind
- func (l *FakeAMQPLinks) ClosedPermanently() bool
- func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error)
- func (l *FakeAMQPLinks) Retry(ctx context.Context, eventName log.Event, operation string, ...) error
- type FakeAMQPReceiver
- func (r *FakeAMQPReceiver) Close(ctx context.Context) error
- func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error
- func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
- func (r *FakeAMQPReceiver) Prefetched(ctx context.Context) (*amqp.Message, error)
- func (r *FakeAMQPReceiver) Receive(ctx context.Context) (*amqp.Message, error)
- type FakeAMQPSender
- type FakeAMQPSession
- type FakeNS
- func (ns *FakeNS) Check() error
- func (ns *FakeNS) Close(ctx context.Context, permanently bool) error
- func (ns *FakeNS) GetEntityAudience(entityPath string) string
- func (ns *FakeNS) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc, ...) AMQPLinks
- func (ns *FakeNS) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *FakeNS) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)
- func (ns *FakeNS) Recover(ctx context.Context, clientRevision uint64) (bool, error)
- type FakeRPCLink
- type LinkID
- type LinksWithID
- type Namespace
- func (ns *Namespace) Check() error
- func (ns *Namespace) Close(ctx context.Context, permanently bool) error
- func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (*amqp.Client, uint64, error)
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc, ...) AMQPLinks
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)
- func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) (bool, error)
- type NamespaceForAMQPLinks
- type NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket(...) NamespaceOption
- type NamespaceWithNewAMQPLinks
- type NewAMQPLinksArgs
- type RPCError
- type RPCLink
- type RPCLinkArgs
- type RPCLinkOption
- type RPCResponse
- type RecoveryKind
- type RetryWithLinksFn
Constants ¶
const ( // RPCResponseCodeLockLost comes back if you lose a message lock _or_ a session lock. // (NOTE: this is the one HTTP code that doesn't make intuitive sense. For all others I've just // used the HTTP status code instead. RPCResponseCodeLockLost = http.StatusGone )
Response codes that come back over the "RPC" style links like cbs or management.
const Version = "v1.0.0"
Version is the semantic version number
Variables ¶
var ErrClientClosed = NewErrNonRetriable("client has been closed by user")
Functions ¶
func CancelScheduledMessages ¶ added in v0.3.4
CancelScheduledMessages allows for removal of messages that have been handed to the Service Bus broker for later delivery, but have not yet ben enqueued.
func GetSessionState ¶ added in v0.3.4
GetSessionState retrieves state associated with the session.
func IsCancelError ¶
func IsDetachError ¶ added in v0.3.6
func IsDrainingError ¶
func IsErrNotFound ¶
IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsFatalSBError ¶ added in v0.3.4
func NegotiateClaim ¶ added in v0.4.0
func NegotiateClaim(ctx context.Context, audience string, conn *amqp.Client, provider auth.TokenProvider) error
NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NewErrNonRetriable ¶ added in v0.3.6
func NewRPCLink ¶ added in v0.3.4
func NewRPCLink(args RPCLinkArgs) (*rpcLink, error)
NewRPCLink will build a new request response link
func PeekMessages ¶ added in v0.3.4
func ReceiveDeferred ¶ added in v0.3.4
func RenewLocks ¶ added in v0.3.4
func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)
RenewLocks renews the locks in a single 'com.microsoft:renew-lock' operation. NOTE: this function assumes all the messages received on the same link.
func RenewSessionLock ¶ added in v0.3.4
RenewSessionLocks renews a session lock.
func ScheduleMessages ¶ added in v0.3.4
func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error)
ScheduleMessages will send a batch of messages to a Queue, schedule them to be enqueued, and return the sequence numbers that can be used to cancel each message.
func SendDisposition ¶ added in v0.3.4
func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error
SendDisposition allows you settle a message using the management link, rather than via your *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated with a link (ex: deferred messages).
func SetSessionState ¶ added in v0.3.4
SetSessionState sets the state associated with the session.
func TransformError ¶ added in v0.4.1
TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error which has a 'Code' field that can be used programatically. If it's not actionable or if it's nil it'll just be returned.
Types ¶
type AMQPLinks ¶
type AMQPLinks interface { EntityPath() string ManagementPath() string Audience() string // Get will initialize a session and call its link.linkCreator function. // If this link has been closed via Close() it will return an non retriable error. Get(ctx context.Context) (*LinksWithID, error) // Retry will run your callback, recovering links when necessary. Retry(ctx context.Context, name log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error // RecoverIfNeeded will check if an error requires recovery, and will recover // the link or, possibly, the connection. RecoverIfNeeded(ctx context.Context, linkID LinkID, err error) error // Close will close the the link. // If permanent is true the link will not be auto-recreated if Get/Recover // are called. All functions will return `ErrLinksClosed` Close(ctx context.Context, permanent bool) error // CloseIfNeeded closes the links or connection if the error is recoverable. // Use this if you don't want to recreate the connection/links at this point. CloseIfNeeded(ctx context.Context, err error) RecoveryKind // ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called. ClosedPermanently() bool }
func NewAMQPLinks ¶ added in v0.3.4
func NewAMQPLinks(args NewAMQPLinksArgs) AMQPLinks
NewAMQPLinks creates a session, starts the claim refresher and creates an associated management link for a specific entity path.
type AMQPLinksImpl ¶ added in v0.3.4
type AMQPLinksImpl struct { // RPCLink lets you interact with the $management link for your entity. RPCLink RPCLink // these are populated by your `createLinkFunc` when you construct // the amqpLinks Sender AMQPSenderCloser Receiver AMQPReceiverCloser // contains filtered or unexported fields }
AMQPLinksImpl manages the set of AMQP links (and detritus) typically needed to work
within Service Bus:
- An *goamqp.Sender or *goamqp.Receiver AMQP link (could also be 'both' if needed) - A `$management` link - an *goamqp.Session
State management can be done through Recover (close and reopen), Close (close permanently, return failures) and Get() (retrieve latest version of all AMQPLinksImpl, or create if needed).
func (*AMQPLinksImpl) Audience ¶ added in v0.3.4
func (l *AMQPLinksImpl) Audience() string
EntityPath is the audience for the queue/topic/subscription.
func (*AMQPLinksImpl) Close ¶ added in v0.3.4
func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error
Close will close the the link permanently. Any further calls to Get()/Recover() to return ErrLinksClosed.
func (*AMQPLinksImpl) CloseIfNeeded ¶ added in v0.3.6
func (l *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) RecoveryKind
CloseIfNeeded closes the links or connection if the error is recoverable. Use this if you want to make it so the _next_ call on your Sender/Receiver eats the cost of recovery, instead of doing it immediately. This is useful if you're trying to exit out of a function quickly but still need to react to a returned error.
func (*AMQPLinksImpl) ClosedPermanently ¶ added in v0.3.4
func (l *AMQPLinksImpl) ClosedPermanently() bool
ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called.
func (*AMQPLinksImpl) EntityPath ¶ added in v0.3.4
func (l *AMQPLinksImpl) EntityPath() string
EntityPath is the full entity path for the queue/topic/subscription.
func (*AMQPLinksImpl) Get ¶ added in v0.3.4
func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error)
Get will initialize a session and call its link.linkCreator function. If this link has been closed via Close() it will return an non retriable error.
func (*AMQPLinksImpl) ManagementPath ¶ added in v0.3.4
func (links *AMQPLinksImpl) ManagementPath() string
ManagementPath is the management path for the associated entity.
func (*AMQPLinksImpl) RecoverIfNeeded ¶ added in v0.3.4
func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID, origErr error) error
Recover will recover the links or the connection, depending on the severity of the error.
func (*AMQPLinksImpl) Retry ¶ added in v0.3.4
func (l *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error
type AMQPReceiver ¶
type AMQPReceiver = amqpwrap.AMQPReceiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser
type AMQPSender ¶
type AMQPSender = amqpwrap.AMQPSender
type AMQPSenderCloser ¶
type AMQPSenderCloser = amqpwrap.AMQPSenderCloser
type AMQPSession ¶
type AMQPSession interface { NewReceiver(opts ...amqp.LinkOption) (*amqp.Receiver, error) NewSender(opts ...amqp.LinkOption) (*amqp.Sender, error) }
AMQPSession is implemented by *amqp.Session
type AMQPSessionCloser ¶
type AMQPSessionCloser interface { AMQPSession Close(ctx context.Context) error }
AMQPSessionCloser is implemented by *amqp.Session
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type CreateLinkFunc ¶
type CreateLinkFunc func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)
CreateLinkFunc creates the links, using the given session. Typically you'll only create either an *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.
type Disposition ¶
type Disposition struct { Status DispositionStatus LockTokens []*uuid.UUID DeadLetterReason *string DeadLetterDescription *string }
type DispositionStatus ¶
type DispositionStatus string
const ( CompletedDisposition DispositionStatus = "completed" AbandonedDisposition DispositionStatus = "abandoned" SuspendedDisposition DispositionStatus = "suspended" DeferredDisposition DispositionStatus = "defered" )
type ErrAMQP ¶
type ErrAMQP RPCResponse
ErrAMQP indicates that the server communicated an AMQP error with a particular
type ErrConnectionClosed ¶
type ErrConnectionClosed string
ErrConnectionClosed indicates that the connection has been closed.
func (ErrConnectionClosed) Error ¶
func (e ErrConnectionClosed) Error() string
type ErrIncorrectType ¶
ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func NewErrIncorrectType ¶
func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType
NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.
func (ErrIncorrectType) Error ¶
func (e ErrIncorrectType) Error() string
type ErrMalformedMessage ¶
type ErrMalformedMessage string
ErrMalformedMessage indicates that a message was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.
func (ErrMalformedMessage) Error ¶
func (e ErrMalformedMessage) Error() string
type ErrMissingField ¶
type ErrMissingField string
ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrMissingField) Error ¶
func (e ErrMissingField) Error() string
type ErrNoMessages ¶
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶
func (e ErrNoMessages) Error() string
type ErrNotFound ¶
type ErrNotFound struct {
EntityPath string
}
ErrNotFound is returned when an entity is not found (404)
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
type FakeAMQPLinks ¶
type FakeAMQPLinks struct { AMQPLinks Closed int CloseIfNeededCalled int // values to be returned for each `Get` call Revision LinkID Receiver AMQPReceiver Sender AMQPSender RPC RPCLink // Err is the error returned as part of Get() Err error // contains filtered or unexported fields }
func (*FakeAMQPLinks) Close ¶
func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error
func (*FakeAMQPLinks) CloseIfNeeded ¶ added in v0.3.6
func (l *FakeAMQPLinks) CloseIfNeeded(ctx context.Context, err error) RecoveryKind
func (*FakeAMQPLinks) ClosedPermanently ¶
func (l *FakeAMQPLinks) ClosedPermanently() bool
func (*FakeAMQPLinks) Get ¶
func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error)
func (*FakeAMQPLinks) Retry ¶ added in v0.3.4
func (l *FakeAMQPLinks) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error
type FakeAMQPReceiver ¶
type FakeAMQPReceiver struct { AMQPReceiver Closed int CloseFn func(ctx context.Context) error DrainCalled int DrainCreditImpl func(ctx context.Context) error IssueCreditErr error RequestedCredits uint32 PrefetchedCalled int ReceiveCalled int ReceiveFn func(ctx context.Context) (*amqp.Message, error) ReceiveResults []struct { M *amqp.Message E error } PrefetchResults []struct { M *amqp.Message E error } }
func (*FakeAMQPReceiver) DrainCredit ¶
func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error
func (*FakeAMQPReceiver) IssueCredit ¶ added in v0.3.4
func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
func (*FakeAMQPReceiver) Prefetched ¶ added in v0.3.4
func (r *FakeAMQPReceiver) Prefetched(ctx context.Context) (*amqp.Message, error)
Prefetched will return the next reuslt from PrefetchedResults or, if the PrefetchedResults is empty will return nil, nil.
type FakeAMQPSender ¶
type FakeAMQPSender struct { Closed int AMQPSender }
type FakeAMQPSession ¶
type FakeAMQPSession struct { amqpwrap.AMQPSession NewReceiverFn func(opts ...amqp.LinkOption) (AMQPReceiverCloser, error) // contains filtered or unexported fields }
func (*FakeAMQPSession) NewReceiver ¶ added in v0.4.1
func (s *FakeAMQPSession) NewReceiver(opts ...amqp.LinkOption) (AMQPReceiverCloser, error)
type FakeNS ¶
type FakeNS struct { RPCLink RPCLink Session amqpwrap.AMQPSession AMQPLinks *FakeAMQPLinks CloseCalled int // contains filtered or unexported fields }
func (*FakeNS) GetEntityAudience ¶
func (*FakeNS) NegotiateClaim ¶
func (*FakeNS) NewAMQPLinks ¶
func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc, getRecoveryKindFunc func(err error) RecoveryKind) AMQPLinks
func (*FakeNS) NewAMQPSession ¶
func (*FakeNS) NewRPCLink ¶
type FakeRPCLink ¶ added in v0.4.1
type FakeRPCLink struct { Resp *RPCResponse Error error }
func (*FakeRPCLink) RPC ¶ added in v0.4.1
func (r *FakeRPCLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
type LinkID ¶ added in v0.3.4
type LinkID struct { // Conn is the ID of the connection we used to create our links. Conn uint64 // Link is the ID of our current link. Link uint64 }
LinkID is ID that represent our current link and the client used to create it. These are used when trying to determine what parts need to be recreated when an error occurs, to prevent recovering a connection/link repeatedly. See amqpLinks.RecoverIfNeeded() for usage.
type LinksWithID ¶ added in v0.3.4
type LinksWithID struct { Sender AMQPSender Receiver AMQPReceiver RPC RPCLink ID LinkID }
type Namespace ¶
type Namespace struct { FQDN string TokenProvider *sbauth.TokenProvider // NOTE: exported only so it can be checked in a test RetryOptions exported.RetryOptions // contains filtered or unexported fields }
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per ServiceBusClient.
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) Check ¶ added in v0.4.0
Check returns an error if the namespace cannot be used (ie, closed permanently), or nil otherwise.
func (*Namespace) GetAMQPClientImpl ¶ added in v0.3.4
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
negotiateClaim performs initial authentication and starts periodic refresh of credentials. the returned func is to cancel() the refresh goroutine.
func (*Namespace) NewAMQPLinks ¶
func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc, getRecoveryKindFunc func(err error) RecoveryKind) AMQPLinks
NewAMQPLinks creates an AMQPLinks struct, which groups together the commonly needed links for working with Service Bus.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.
func (*Namespace) NewRPCLink ¶
NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface { NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error) GetEntityAudience(entityPath string) string Recover(ctx context.Context, clientRevision uint64) (bool, error) Close(ctx context.Context, permanently bool) error }
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Service Bus namespace
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string
func NamespaceWithRetryOptions ¶ added in v0.3.4
func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenCredential ¶ added in v0.3.4
func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespaceWithTokenCredential sets the token provider on the namespace fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net)
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.NewWebSocketConnArgs) (net.Conn, error)) NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface { NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc, getRecoveryKindFunc func(err error) RecoveryKind) AMQPLinks Check() error }
NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NewAMQPLinksArgs ¶ added in v0.4.0
type NewAMQPLinksArgs struct { NS NamespaceForAMQPLinks EntityPath string CreateLinkFunc CreateLinkFunc GetRecoveryKindFunc func(err error) RecoveryKind }
type RPCError ¶ added in v0.4.1
type RPCError struct { Resp *RPCResponse Message string }
RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.
type RPCLink ¶
type RPCLink interface { Close(ctx context.Context) error RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error) }
RPCLink is implemented by *rpc.Link
type RPCLinkArgs ¶ added in v0.4.0
type RPCLinkArgs struct { Client amqpwrap.AMQPClient Address string LogEvent azlog.Event }
type RPCLinkOption ¶ added in v0.3.4
type RPCLinkOption func(link *rpcLink) error
RPCLinkOption provides a way to customize the construction of a Link
type RPCResponse ¶ added in v0.3.4
type RPCResponse struct { // Code is the response code - these originate from Service Bus. Some // common values are called out below, with the RPCResponseCode* constants. Code int Description string Message *amqp.Message }
RPCResponse is the simplified response structure from an RPC like call
type RecoveryKind ¶ added in v0.4.0
type RecoveryKind string
RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().
const ( RecoveryKindNone RecoveryKind = "" RecoveryKindFatal RecoveryKind = "fatal" RecoveryKindLink RecoveryKind = "link" RecoveryKindConn RecoveryKind = "connection" )
func GetRecoveryKind ¶ added in v0.3.3
func GetRecoveryKind(err error) RecoveryKind
GetRecoveryKind determines the recovery type for non-session based links.
func GetRecoveryKindForSession ¶ added in v0.4.0
func GetRecoveryKindForSession(err error) RecoveryKind
GetRecoveryKindForSession determines the recovery type for session-based links.
type RetryWithLinksFn ¶ added in v0.3.4
type RetryWithLinksFn func(ctx context.Context, lwid *LinksWithID, args *utils.RetryFnArgs) error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
|
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types. |
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
|
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus. |
Package sas provides SAS token functionality which implements TokenProvider from package auth for use with Azure Event Hubs and Service Bus.
|
Package sas provides SAS token functionality which implements TokenProvider from package auth for use with Azure Event Hubs and Service Bus. |