Documentation ¶
Index ¶
- Constants
- Variables
- func CancelScheduledMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, seq []int64) error
- func GetSessionState(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, ...) ([]byte, error)
- func IsCancelError(err error) bool
- func IsDrainingError(err error) bool
- func IsErrNotFound(err error) bool
- func IsFatalSBError(err error) bool
- func IsLinkError(err error) bool
- func IsNotAllowedError(err error) bool
- func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, ...) error
- func NewErrNonRetriable(message string) error
- func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error)
- func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, ...) ([]*amqp.Message, error)
- func ReceiveDeferred(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, ...) ([]*amqp.Message, error)
- func RenewLocks(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, ...) ([]time.Time, error)
- func RenewSessionLock(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, ...) (time.Time, error)
- func ScheduleMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, ...) ([]int64, error)
- func SetSessionState(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, ...) error
- func SettleOnMgmtLink(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, ...) error
- func TransformError(err error) error
- type AMQPLinks
- type AMQPLinksImpl
- func (l *AMQPLinksImpl) Audience() string
- func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error
- func (links *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) RecoveryKind
- func (l *AMQPLinksImpl) ClosedPermanently() bool
- func (l *AMQPLinksImpl) EntityPath() string
- func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error)
- func (links *AMQPLinksImpl) ManagementPath() string
- func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID, origErr error) error
- func (links *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operation string, ...) error
- type CreateLinkFunc
- type Disposition
- type DispositionStatus
- type ErrAMQP
- type ErrConnectionClosed
- type ErrIncorrectType
- type ErrMalformedMessage
- type ErrMissingField
- type ErrNoMessages
- type ErrNotFound
- type FakeAMQPLinks
- func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error
- func (l *FakeAMQPLinks) CloseIfNeeded(ctx context.Context, err error) RecoveryKind
- func (l *FakeAMQPLinks) ClosedPermanently() bool
- func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error)
- func (l *FakeAMQPLinks) Prefix() string
- func (l *FakeAMQPLinks) Retry(ctx context.Context, eventName log.Event, operation string, ...) error
- func (l *FakeAMQPLinks) Writef(evt azlog.Event, format string, args ...any)
- type FakeAMQPReceiver
- func (r *FakeAMQPReceiver) Close(ctx context.Context) error
- func (r *FakeAMQPReceiver) Credits() uint32
- func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
- func (r *FakeAMQPReceiver) LinkName() string
- func (r *FakeAMQPReceiver) Prefetched() *amqp.Message
- func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
- func (r *FakeAMQPReceiver) ReleaseMessage(ctx context.Context, msg *amqp.Message) error
- type FakeAMQPSender
- type FakeAMQPSession
- type FakeNS
- func (ns *FakeNS) Check() error
- func (ns *FakeNS) Close(permanently bool) error
- func (ns *FakeNS) GetEntityAudience(entityPath string) string
- func (ns *FakeNS) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc, ...) AMQPLinks
- func (ns *FakeNS) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *FakeNS) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, error)
- func (ns *FakeNS) Recover(ctx context.Context, clientRevision uint64) (bool, error)
- type FakeRPCLink
- type LinkID
- type LinksWithID
- type Namespace
- func (ns *Namespace) Check() error
- func (ns *Namespace) Close(permanently bool) error
- func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (amqpwrap.AMQPClient, uint64, error)
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, error)
- func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) (bool, error)
- type NamespaceForAMQPLinks
- type NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithNewClientFn(fn func(ctx context.Context) (amqpwrap.AMQPClient, error)) NamespaceOption
- func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket(...) NamespaceOption
- type NewAMQPLinksArgs
- type RPCError
- type RPCLinkArgs
- type RPCLinkOption
- type RecoveryKind
- type RetryWithLinksFn
Constants ¶
const ( // RPCResponseCodeLockLost comes back if you lose a message lock _or_ a session lock. // (NOTE: this is the one HTTP code that doesn't make intuitive sense. For all others I've just // used the HTTP status code instead. RPCResponseCodeLockLost = http.StatusGone )
Response codes that come back over the "RPC" style links like cbs or management.
const Version = "v1.7.0"
Version is the semantic version number
Variables ¶
var ErrClientClosed = NewErrNonRetriable("client has been closed by user")
Functions ¶
func CancelScheduledMessages ¶ added in v0.3.4
func CancelScheduledMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, seq []int64) error
CancelScheduledMessages allows for removal of messages that have been handed to the Service Bus broker for later delivery, but have not yet been enqueued.
func GetSessionState ¶ added in v0.3.4
func GetSessionState(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, sessionID string) ([]byte, error)
GetSessionState retrieves state associated with the session.
func IsCancelError ¶
func IsDrainingError ¶
func IsErrNotFound ¶
IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsFatalSBError ¶ added in v0.3.4
func IsLinkError ¶ added in v1.3.0
func IsNotAllowedError ¶ added in v1.2.1
func NegotiateClaim ¶ added in v0.4.0
func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider, contextWithTimeoutFn contextWithTimeoutFn) error
NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NewErrNonRetriable ¶ added in v0.3.6
func NewRPCLink ¶ added in v0.3.4
NewRPCLink will build a new request response link
func PeekMessages ¶ added in v0.3.4
func ReceiveDeferred ¶ added in v0.3.4
func RenewLocks ¶ added in v0.3.4
func RenewLocks(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)
RenewLocks renews the locks in a single 'com.microsoft:renew-lock' operation. NOTE: this function assumes all the messages were received on the same link.
func RenewSessionLock ¶ added in v0.3.4
func RenewSessionLock(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, sessionID string) (time.Time, error)
RenewSessionLock renews a session lock.
func ScheduleMessages ¶ added in v0.3.4
func ScheduleMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error)
ScheduleMessages will send a batch of messages to a Queue, schedule them to be enqueued, and return the sequence numbers that can be used to cancel each message.
func SetSessionState ¶ added in v0.3.4
func SetSessionState(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, sessionID string, state []byte) error
SetSessionState sets the state associated with the session.
func SettleOnMgmtLink ¶ added in v1.6.1
func SettleOnMgmtLink(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error
SettleOnMgmtLink allows you to settle a message using the management link, rather than via your *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated with a link (ex: deferred messages).
func TransformError ¶ added in v0.4.1
TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error, which has a 'Code' field that can be used programmatically. If it's not actionable or if it's nil it'll just be returned.
Types ¶
type AMQPLinks ¶
type AMQPLinks interface { EntityPath() string ManagementPath() string Audience() string // Get will initialize a session and call its link.linkCreator function. // If this link has been closed via Close() it will return an non retriable error. Get(ctx context.Context) (*LinksWithID, error) // Retry will run your callback, recovering links when necessary. Retry(ctx context.Context, name log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error // RecoverIfNeeded will check if an error requires recovery, and will recover // the link or, possibly, the connection. RecoverIfNeeded(ctx context.Context, linkID LinkID, err error) error // Close will close the the link. // If permanent is true the link will not be auto-recreated if Get/Recover // are called. All functions will return `ErrLinksClosed` Close(ctx context.Context, permanent bool) error // CloseIfNeeded closes the links or connection if the error is recoverable. // Use this if you don't want to recreate the connection/links at this point. CloseIfNeeded(ctx context.Context, err error) RecoveryKind // ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called. ClosedPermanently() bool // Writef logs a message, with a prefix that represents the AMQPLinks instance // for better traceability. Writef(evt azlog.Event, format string, args ...any) // Prefix is the current logging prefix, usable for logging and continuity. Prefix() string }
func NewAMQPLinks ¶ added in v0.3.4
func NewAMQPLinks(args NewAMQPLinksArgs) AMQPLinks
NewAMQPLinks creates a session, starts the claim refresher and creates an associated management link for a specific entity path.
type AMQPLinksImpl ¶ added in v0.3.4
type AMQPLinksImpl struct { // RPCLink lets you interact with the $management link for your entity. RPCLink amqpwrap.RPCLink // these are populated by your `createLinkFunc` when you construct // the amqpLinks Sender amqpwrap.AMQPSenderCloser Receiver amqpwrap.AMQPReceiverCloser utils.Logger // contains filtered or unexported fields }
AMQPLinksImpl manages the set of AMQP links (and detritus) typically needed to work within Service Bus:
- A *goamqp.Sender or *goamqp.Receiver AMQP link (could also be 'both' if needed)
- A `$management` link
- A *goamqp.Session
State management can be done through Recover (close and reopen), Close (close permanently, return failures) and Get() (retrieve latest version of all AMQPLinksImpl, or create if needed).
func (*AMQPLinksImpl) Audience ¶ added in v0.3.4
func (l *AMQPLinksImpl) Audience() string
Audience is the audience for the queue/topic/subscription.
func (*AMQPLinksImpl) Close ¶ added in v0.3.4
func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error
Close will close the link permanently. Any further calls to Get()/Recover() will return ErrLinksClosed.
func (*AMQPLinksImpl) CloseIfNeeded ¶ added in v0.3.6
func (links *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) RecoveryKind
CloseIfNeeded closes the links or connection if the error is recoverable. Use this if you want to make it so the _next_ call on your Sender/Receiver eats the cost of recovery, instead of doing it immediately. This is useful if you're trying to exit out of a function quickly but still need to react to a returned error.
func (*AMQPLinksImpl) ClosedPermanently ¶ added in v0.3.4
func (l *AMQPLinksImpl) ClosedPermanently() bool
ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called.
func (*AMQPLinksImpl) EntityPath ¶ added in v0.3.4
func (l *AMQPLinksImpl) EntityPath() string
EntityPath is the full entity path for the queue/topic/subscription.
func (*AMQPLinksImpl) Get ¶ added in v0.3.4
func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error)
Get will initialize a session and call its link.linkCreator function. If this link has been closed via Close() it will return a non-retriable error.
func (*AMQPLinksImpl) ManagementPath ¶ added in v0.3.4
func (links *AMQPLinksImpl) ManagementPath() string
ManagementPath is the management path for the associated entity.
func (*AMQPLinksImpl) RecoverIfNeeded ¶ added in v0.3.4
func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID, origErr error) error
RecoverIfNeeded will recover the links or the connection, depending on the severity of the error.
func (*AMQPLinksImpl) Retry ¶ added in v0.3.4
func (links *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error
type CreateLinkFunc ¶
type CreateLinkFunc func(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error)
CreateLinkFunc creates the links, using the given session. Typically you'll only create either an *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.
type Disposition ¶
type Disposition struct { Status DispositionStatus LockTokens []*uuid.UUID DeadLetterReason *string DeadLetterDescription *string }
type DispositionStatus ¶
type DispositionStatus string
const ( CompletedDisposition DispositionStatus = "completed" AbandonedDisposition DispositionStatus = "abandoned" SuspendedDisposition DispositionStatus = "suspended" DeferredDisposition DispositionStatus = "defered" )
type ErrAMQP ¶
type ErrAMQP amqpwrap.RPCResponse
ErrAMQP indicates that the server communicated an AMQP error with a particular response code.
type ErrConnectionClosed ¶
type ErrConnectionClosed string
ErrConnectionClosed indicates that the connection has been closed.
func (ErrConnectionClosed) Error ¶
func (e ErrConnectionClosed) Error() string
type ErrIncorrectType ¶
ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func NewErrIncorrectType ¶
func NewErrIncorrectType(key string, expected, actual any) ErrIncorrectType
NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.
func (ErrIncorrectType) Error ¶
func (e ErrIncorrectType) Error() string
type ErrMalformedMessage ¶
type ErrMalformedMessage string
ErrMalformedMessage indicates that a message that was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.
func (ErrMalformedMessage) Error ¶
func (e ErrMalformedMessage) Error() string
type ErrMissingField ¶
type ErrMissingField string
ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrMissingField) Error ¶
func (e ErrMissingField) Error() string
type ErrNoMessages ¶
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶
func (e ErrNoMessages) Error() string
type ErrNotFound ¶
type ErrNotFound struct {
EntityPath string
}
ErrNotFound is returned when an entity is not found (404)
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
type FakeAMQPLinks ¶
type FakeAMQPLinks struct { AMQPLinks Closed int CloseIfNeededCalled int // values to be returned for each `Get` call Revision LinkID Receiver amqpwrap.AMQPReceiver Sender amqpwrap.AMQPSender RPC amqpwrap.RPCLink // Err is the error returned as part of Get() Err error // contains filtered or unexported fields }
func (*FakeAMQPLinks) Close ¶
func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error
func (*FakeAMQPLinks) CloseIfNeeded ¶ added in v0.3.6
func (l *FakeAMQPLinks) CloseIfNeeded(ctx context.Context, err error) RecoveryKind
func (*FakeAMQPLinks) ClosedPermanently ¶
func (l *FakeAMQPLinks) ClosedPermanently() bool
func (*FakeAMQPLinks) Get ¶
func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error)
func (*FakeAMQPLinks) Prefix ¶ added in v1.3.0
func (l *FakeAMQPLinks) Prefix() string
func (*FakeAMQPLinks) Retry ¶ added in v0.3.4
func (l *FakeAMQPLinks) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error
type FakeAMQPReceiver ¶
type FakeAMQPReceiver struct { amqpwrap.AMQPReceiver Closed int CloseFn func(ctx context.Context) error CreditsCalled int CreditsImpl func() uint32 IssueCreditErr error RequestedCredits uint32 PrefetchedCalled int ReceiveCalled int ReceiveFn func(ctx context.Context) (*amqp.Message, error) ReleaseMessageCalled int ReleaseMessageFn func(ctx context.Context, msg *amqp.Message) error ReceiveResults []struct { M *amqp.Message E error } PrefetchedResults []*amqp.Message }
func (*FakeAMQPReceiver) Credits ¶ added in v1.1.0
func (r *FakeAMQPReceiver) Credits() uint32
func (*FakeAMQPReceiver) IssueCredit ¶ added in v0.3.4
func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
func (*FakeAMQPReceiver) LinkName ¶ added in v1.0.1
func (r *FakeAMQPReceiver) LinkName() string
func (*FakeAMQPReceiver) Prefetched ¶ added in v0.3.4
func (r *FakeAMQPReceiver) Prefetched() *amqp.Message
func (*FakeAMQPReceiver) Receive ¶ added in v0.3.4
func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
Receive returns the next result from ReceiveResults or, if the ReceiveResults is empty, will block on ctx.Done().
func (*FakeAMQPReceiver) ReleaseMessage ¶ added in v1.1.0
func (r *FakeAMQPReceiver) ReleaseMessage(ctx context.Context, msg *amqp.Message) error
type FakeAMQPSender ¶
type FakeAMQPSender struct { Closed int amqpwrap.AMQPSender }
func (*FakeAMQPSender) LinkName ¶ added in v1.3.0
func (s *FakeAMQPSender) LinkName() string
type FakeAMQPSession ¶
type FakeAMQPSession struct { amqpwrap.AMQPSession NewReceiverFn func(ctx context.Context, source string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error) // contains filtered or unexported fields }
func (*FakeAMQPSession) NewReceiver ¶ added in v0.4.1
func (s *FakeAMQPSession) NewReceiver(ctx context.Context, source string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error)
type FakeNS ¶
type FakeNS struct { RPCLink amqpwrap.RPCLink Session amqpwrap.AMQPSession AMQPLinks *FakeAMQPLinks CloseCalled int // contains filtered or unexported fields }
func (*FakeNS) GetEntityAudience ¶
func (*FakeNS) NegotiateClaim ¶
func (*FakeNS) NewAMQPLinks ¶
func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc, getRecoveryKindFunc func(err error) RecoveryKind) AMQPLinks
func (*FakeNS) NewAMQPSession ¶
func (*FakeNS) NewRPCLink ¶
type FakeRPCLink ¶ added in v0.4.1
type FakeRPCLink struct { Resp *amqpwrap.RPCResponse Error error }
func (*FakeRPCLink) RPC ¶ added in v0.4.1
func (r *FakeRPCLink) RPC(ctx context.Context, msg *amqp.Message) (*amqpwrap.RPCResponse, error)
type LinkID ¶ added in v0.3.4
type LinkID struct { // Conn is the ID of the connection we used to create our links. Conn uint64 // Link is the ID of our current link. Link uint64 }
LinkID is the ID that represents our current link and the client used to create it. These are used when trying to determine what parts need to be recreated when an error occurs, to prevent recovering a connection/link repeatedly. See amqpLinks.RecoverIfNeeded() for usage.
type LinksWithID ¶ added in v0.3.4
type LinksWithID struct { Sender amqpwrap.AMQPSender Receiver amqpwrap.AMQPReceiver RPC amqpwrap.RPCLink ID LinkID }
type Namespace ¶
type Namespace struct { FQDN string TokenProvider *sbauth.TokenProvider // NOTE: exported only so it can be checked in a test RetryOptions exported.RetryOptions // contains filtered or unexported fields }
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per ServiceBusClient.
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) Check ¶ added in v0.4.0
Check returns an error if the namespace cannot be used (ie, closed permanently), or nil otherwise.
func (*Namespace) GetAMQPClientImpl ¶ added in v0.3.4
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
NegotiateClaim performs initial authentication and starts periodic refresh of credentials. The returned func is used to cancel() the refresh goroutine.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.
func (*Namespace) NewRPCLink ¶
func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, error)
NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.
func (*Namespace) Recover ¶
Recover destroys the currently held AMQP connection and recreates it, if needed.
If a new client is actually created (rather than just cached) then the returned bool will be true. Any links that were created from the original connection will need to be recreated.
NOTE: cancelling the context only cancels the initialization of a new AMQP connection - the previous connection is always closed.
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface { Check() error NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, error) GetEntityAudience(entityPath string) string // Recover destroys the currently held AMQP connection and recreates it, if needed. // // If a new client is actually created (rather than just cached) then the returned bool // will be true. Any links that were created from the original connection will need to // be recreated. // // NOTE: cancelling the context only cancels the initialization of a new AMQP // connection - the previous connection is always closed. Recover(ctx context.Context, clientRevision uint64) (bool, error) Close(permanently bool) error }
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Service Bus namespace
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string
func NamespaceWithNewClientFn ¶ added in v1.2.0
func NamespaceWithNewClientFn(fn func(ctx context.Context) (amqpwrap.AMQPClient, error)) NamespaceOption
NamespaceWithNewClientFn lets you inject a construction function to create new AMQP clients. Useful for tests.
func NamespaceWithRetryOptions ¶ added in v0.3.4
func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenCredential ¶ added in v0.3.4
func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespaceWithTokenCredential sets the token provider on the namespace. fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net).
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.NewWebSocketConnArgs) (net.Conn, error)) NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
type NewAMQPLinksArgs ¶ added in v0.4.0
type NewAMQPLinksArgs struct { NS NamespaceForAMQPLinks EntityPath string CreateLinkFunc CreateLinkFunc GetRecoveryKindFunc func(err error) RecoveryKind }
type RPCError ¶ added in v0.4.1
type RPCError struct { Resp *amqpwrap.RPCResponse Message string }
RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.
type RPCLinkArgs ¶ added in v0.4.0
type RPCLinkArgs struct { Client amqpwrap.AMQPClient Address string LogEvent azlog.Event }
type RPCLinkOption ¶ added in v0.3.4
type RPCLinkOption func(link *rpcLink) error
RPCLinkOption provides a way to customize the construction of a Link
type RecoveryKind ¶ added in v0.4.0
type RecoveryKind string
RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().
const ( RecoveryKindNone RecoveryKind = "" RecoveryKindFatal RecoveryKind = "fatal" RecoveryKindLink RecoveryKind = "link" RecoveryKindConn RecoveryKind = "connection" )
func GetRecoveryKind ¶ added in v0.3.3
func GetRecoveryKind(err error) RecoveryKind
GetRecoveryKind determines the recovery type for non-session based links.
func GetRecoveryKindForSession ¶ added in v0.4.0
func GetRecoveryKindForSession(err error) RecoveryKind
GetRecoveryKindForSession determines the recovery type for session-based links.
type RetryWithLinksFn ¶ added in v0.3.4
type RetryWithLinksFn func(ctx context.Context, lwid *LinksWithID, args *utils.RetryFnArgs) error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
|
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types. |
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
|
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus. |
Package mock is a generated GoMock package.
|
Package mock is a generated GoMock package. |