Documentation ¶
Overview ¶
Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.
Index ¶
- Constants
- func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64) error
- func GetRecoveryKind(err error) recoveryKind
- func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]byte, error)
- func IsCancelError(err error) bool
- func IsDrainingError(err error) bool
- func IsErrNotFound(err error) bool
- func IsFatalSBError(err error) bool
- func NewRPCLink(conn *amqp.Client, address string, opts ...RPCLinkOption) (*rpcLink, error)
- func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64, ...) ([]*amqp.Message, error)
- func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode ReceiveMode, ...) ([]*amqp.Message, error)
- func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)
- func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (time.Time, error)
- func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Time, ...) ([]int64, error)
- func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, state Disposition, ...) error
- func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, state []byte) error
- type AMQPLinks
- type AMQPLinksImpl
- func (l *AMQPLinksImpl) Audience() string
- func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error
- func (l *AMQPLinksImpl) ClosedPermanently() bool
- func (l *AMQPLinksImpl) EntityPath() string
- func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error)
- func (links *AMQPLinksImpl) ManagementPath() string
- func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID, origErr error) error
- func (l *AMQPLinksImpl) Retry(ctx context.Context, name string, fn RetryWithLinksFn, o utils.RetryOptions) error
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type AMQPSession
- type AMQPSessionCloser
- type Closeable
- type CreateLinkFunc
- type Disposition
- type DispositionStatus
- type ErrAMQP
- type ErrConnectionClosed
- type ErrIncorrectType
- type ErrMalformedMessage
- type ErrMissingField
- type ErrNoMessages
- type ErrNonRetriable
- type ErrNotFound
- type FakeAMQPLinks
- func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error
- func (l *FakeAMQPLinks) ClosedPermanently() bool
- func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error)
- func (l *FakeAMQPLinks) Retry(ctx context.Context, name string, fn RetryWithLinksFn, o utils.RetryOptions) error
- type FakeAMQPReceiver
- func (r *FakeAMQPReceiver) Close(ctx context.Context) error
- func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error
- func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
- func (r *FakeAMQPReceiver) Prefetched(ctx context.Context) (*amqp.Message, error)
- func (r *FakeAMQPReceiver) Receive(ctx context.Context) (*amqp.Message, error)
- type FakeAMQPSender
- type FakeAMQPSession
- type FakeNS
- func (ns *FakeNS) CloseIfNeeded(ctx context.Context, clientRevision uint64) error
- func (ns *FakeNS) GetEntityAudience(entityPath string) string
- func (ns *FakeNS) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
- func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
- func (ns *FakeNS) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
- func (ns *FakeNS) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)
- func (ns *FakeNS) Recover(ctx context.Context, clientRevision uint64) (bool, error)
- type LinkID
- type LinksWithID
- type Namespace
- func (ns *Namespace) Close(ctx context.Context) error
- func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (*amqp.Client, uint64, error)
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
- func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)
- func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) (bool, error)
- type NamespaceForAMQPLinks
- type NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithRetryOptions(retryOptions utils.RetryOptions) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket(...) NamespaceOption
- type NamespaceWithNewAMQPLinks
- type NewWebSocketConnArgs
- type RPCLink
- type RPCLinkOption
- type RPCResponse
- type ReceiveMode
- type RetryWithLinksFn
- type SBErrInfo
Constants ¶
const ( // Link/connection creation EventConn = "azsb.Conn" // authentication/claims negotiation EventAuth = "azsb.Auth" // receiver operations EventReceiver = "azsb.Receiver" // mgmt link EventMgmtLink = "azsb.Mgmt" // internal operations EventRetry = utils.EventRetry )
const RecoveryKindConn recoveryKind = "connection"
const RecoveryKindFatal recoveryKind = "fatal"
const RecoveryKindLink recoveryKind = "link"
const RecoveryKindNone recoveryKind = ""
const Version = "v0.3.5"
TODO: this should move into a proper file. Need to resolve some interdependency issues between the public and internal packages first. Version is the semantic version number
Variables ¶
This section is empty.
Functions ¶
func CancelScheduledMessages ¶ added in v0.3.4
CancelScheduledMessages allows for removal of messages that have been handed to the Service Bus broker for later delivery, but have not yet ben enqueued.
func GetRecoveryKind ¶ added in v0.3.3
func GetRecoveryKind(err error) recoveryKind
func GetSessionState ¶ added in v0.3.4
GetSessionState retrieves state associated with the session.
func IsCancelError ¶
func IsDrainingError ¶
func IsErrNotFound ¶
IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsFatalSBError ¶ added in v0.3.4
func NewRPCLink ¶ added in v0.3.4
func NewRPCLink(conn *amqp.Client, address string, opts ...RPCLinkOption) (*rpcLink, error)
NewLink will build a new request response link
func PeekMessages ¶ added in v0.3.4
func ReceiveDeferred ¶ added in v0.3.4
func RenewLocks ¶ added in v0.3.4
func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)
RenewLocks renews the locks in a single 'com.microsoft:renew-lock' operation. NOTE: this function assumes all the messages received on the same link.
func RenewSessionLock ¶ added in v0.3.4
RenewSessionLocks renews a session lock.
func ScheduleMessages ¶ added in v0.3.4
func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error)
ScheduleMessages will send a batch of messages to a Queue, schedule them to be enqueued, and return the sequence numbers that can be used to cancel each message.
func SendDisposition ¶ added in v0.3.4
func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error
SendDisposition allows you settle a message using the management link, rather than via your *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated with a link (ex: deferred messages).
Types ¶
type AMQPLinks ¶
type AMQPLinks interface { EntityPath() string ManagementPath() string Audience() string // Get will initialize a session and call its link.linkCreator function. // If this link has been closed via Close() it will return an non retriable error. Get(ctx context.Context) (*LinksWithID, error) // Retry will run your callback, recovering links when necessary. Retry(ctx context.Context, name string, fn RetryWithLinksFn, o utils.RetryOptions) error // RecoverIfNeeded will check if an error requires recovery, and will recover // the link or, possibly, the connection. RecoverIfNeeded(ctx context.Context, linkID LinkID, err error) error // Close will close the the link. // If permanent is true the link will not be auto-recreated if Get/Recover // are called. All functions will return `ErrLinksClosed` Close(ctx context.Context, permanent bool) error // ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called. ClosedPermanently() bool }
func NewAMQPLinks ¶ added in v0.3.4
func NewAMQPLinks(ns NamespaceForAMQPLinks, entityPath string, createLink CreateLinkFunc) AMQPLinks
NewAMQPLinks creates a session, starts the claim refresher and creates an associated management link for a specific entity path.
type AMQPLinksImpl ¶ added in v0.3.4
type AMQPLinksImpl struct { // RPCLink lets you interact with the $management link for your entity. RPCLink RPCLink // these are populated by your `createLinkFunc` when you construct // the amqpLinks Sender AMQPSenderCloser Receiver AMQPReceiverCloser // contains filtered or unexported fields }
AMQPLinksImpl manages the set of AMQP links (and detritus) typically needed to work
within Service Bus:
- An *goamqp.Sender or *goamqp.Receiver AMQP link (could also be 'both' if needed) - A `$management` link - an *goamqp.Session
State management can be done through Recover (close and reopen), Close (close permanently, return failures) and Get() (retrieve latest version of all AMQPLinksImpl, or create if needed).
func (*AMQPLinksImpl) Audience ¶ added in v0.3.4
func (l *AMQPLinksImpl) Audience() string
EntityPath is the audience for the queue/topic/subscription.
func (*AMQPLinksImpl) Close ¶ added in v0.3.4
func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error
Close will close the the link permanently. Any further calls to Get()/Recover() to return ErrLinksClosed.
func (*AMQPLinksImpl) ClosedPermanently ¶ added in v0.3.4
func (l *AMQPLinksImpl) ClosedPermanently() bool
ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called.
func (*AMQPLinksImpl) EntityPath ¶ added in v0.3.4
func (l *AMQPLinksImpl) EntityPath() string
EntityPath is the full entity path for the queue/topic/subscription.
func (*AMQPLinksImpl) Get ¶ added in v0.3.4
func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error)
Get will initialize a session and call its link.linkCreator function. If this link has been closed via Close() it will return an non retriable error.
func (*AMQPLinksImpl) ManagementPath ¶ added in v0.3.4
func (links *AMQPLinksImpl) ManagementPath() string
ManagementPath is the management path for the associated entity.
func (*AMQPLinksImpl) RecoverIfNeeded ¶ added in v0.3.4
func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID, origErr error) error
Recover will recover the links or the connection, depending on the severity of the error.
func (*AMQPLinksImpl) Retry ¶ added in v0.3.4
func (l *AMQPLinksImpl) Retry(ctx context.Context, name string, fn RetryWithLinksFn, o utils.RetryOptions) error
type AMQPReceiver ¶
type AMQPReceiver interface { IssueCredit(credit uint32) error DrainCredit(ctx context.Context) error Receive(ctx context.Context) (*amqp.Message, error) Prefetched(ctx context.Context) (*amqp.Message, error) // settlement functions AcceptMessage(ctx context.Context, msg *amqp.Message) error RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error ReleaseMessage(ctx context.Context, msg *amqp.Message) error ModifyMessage(ctx context.Context, msg *amqp.Message, deliveryFailed, undeliverableHere bool, messageAnnotations amqp.Annotations) error LinkName() string LinkSourceFilterValue(name string) interface{} }
AMQPReceiver is implemented by *amqp.Receiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser interface { AMQPReceiver Close(ctx context.Context) error }
AMQPReceiver is implemented by *amqp.Receiver
type AMQPSender ¶
type AMQPSender interface { Send(ctx context.Context, msg *amqp.Message) error MaxMessageSize() uint64 }
AMQPSender is implemented by *amqp.Sender
type AMQPSenderCloser ¶
type AMQPSenderCloser interface { AMQPSender Close(ctx context.Context) error }
AMQPSenderCloser is implemented by *amqp.Sender
type AMQPSession ¶
type AMQPSession interface { NewReceiver(opts ...amqp.LinkOption) (*amqp.Receiver, error) NewSender(opts ...amqp.LinkOption) (*amqp.Sender, error) }
AMQPSession is implemented by *amqp.Session
type AMQPSessionCloser ¶
type AMQPSessionCloser interface { AMQPSession Close(ctx context.Context) error }
AMQPSessionCloser is implemented by *amqp.Session
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type CreateLinkFunc ¶
type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)
CreateLinkFunc creates the links, using the given session. Typically you'll only create either an *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.
type Disposition ¶
type Disposition struct { Status DispositionStatus LockTokens []*uuid.UUID DeadLetterReason *string DeadLetterDescription *string }
type DispositionStatus ¶
type DispositionStatus string
const ( CompletedDisposition DispositionStatus = "completed" AbandonedDisposition DispositionStatus = "abandoned" SuspendedDisposition DispositionStatus = "suspended" DeferredDisposition DispositionStatus = "defered" )
type ErrConnectionClosed ¶
type ErrConnectionClosed string
ErrConnectionClosed indicates that the connection has been closed.
func (ErrConnectionClosed) Error ¶
func (e ErrConnectionClosed) Error() string
type ErrIncorrectType ¶
ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func NewErrIncorrectType ¶
func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType
NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.
func (ErrIncorrectType) Error ¶
func (e ErrIncorrectType) Error() string
type ErrMalformedMessage ¶
type ErrMalformedMessage string
ErrMalformedMessage indicates that a message was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.
func (ErrMalformedMessage) Error ¶
func (e ErrMalformedMessage) Error() string
type ErrMissingField ¶
type ErrMissingField string
ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrMissingField) Error ¶
func (e ErrMissingField) Error() string
type ErrNoMessages ¶
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶
func (e ErrNoMessages) Error() string
type ErrNonRetriable ¶ added in v0.3.3
type ErrNonRetriable struct {
Message string
}
func (ErrNonRetriable) Error ¶ added in v0.3.3
func (e ErrNonRetriable) Error() string
type ErrNotFound ¶
type ErrNotFound struct {
EntityPath string
}
ErrNotFound is returned when an entity is not found (404)
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
type FakeAMQPLinks ¶
type FakeAMQPLinks struct { AMQPLinks Closed int // values to be returned for each `Get` call Revision LinkID Receiver AMQPReceiver Sender AMQPSender RPC RPCLink Err error // contains filtered or unexported fields }
func (*FakeAMQPLinks) Close ¶
func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error
func (*FakeAMQPLinks) ClosedPermanently ¶
func (l *FakeAMQPLinks) ClosedPermanently() bool
func (*FakeAMQPLinks) Get ¶
func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error)
func (*FakeAMQPLinks) Retry ¶ added in v0.3.4
func (l *FakeAMQPLinks) Retry(ctx context.Context, name string, fn RetryWithLinksFn, o utils.RetryOptions) error
type FakeAMQPReceiver ¶
type FakeAMQPReceiver struct { AMQPReceiver Closed int Drain int RequestedCredits uint32 ReceiveResults chan struct { M *amqp.Message E error } }
func (*FakeAMQPReceiver) DrainCredit ¶
func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error
func (*FakeAMQPReceiver) IssueCredit ¶ added in v0.3.4
func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
func (*FakeAMQPReceiver) Prefetched ¶ added in v0.3.4
func (r *FakeAMQPReceiver) Prefetched(ctx context.Context) (*amqp.Message, error)
type FakeAMQPSender ¶
type FakeAMQPSender struct { Closed int AMQPSender }
type FakeAMQPSession ¶
type FakeAMQPSession struct { AMQPSessionCloser // contains filtered or unexported fields }
type FakeNS ¶
type FakeNS struct { RPCLink RPCLink Session AMQPSessionCloser AMQPLinks *FakeAMQPLinks // contains filtered or unexported fields }
func (*FakeNS) CloseIfNeeded ¶ added in v0.3.4
func (*FakeNS) GetEntityAudience ¶
func (*FakeNS) NegotiateClaim ¶
func (*FakeNS) NewAMQPLinks ¶
func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
func (*FakeNS) NewAMQPSession ¶
func (*FakeNS) NewRPCLink ¶
type LinkID ¶ added in v0.3.4
type LinkID struct { // Conn is the ID of the connection we used to create our links. Conn uint64 // Link is the ID of our current link. Link uint64 }
LinkID is ID that represent our current link and the client used to create it. These are used when trying to determine what parts need to be recreated when an error occurs, to prevent recovering a connection/link repeatedly. See amqpLinks.RecoverIfNeeded() for usage.
type LinksWithID ¶ added in v0.3.4
type LinksWithID struct { Sender AMQPSender Receiver AMQPReceiver RPC RPCLink ID LinkID }
type Namespace ¶
type Namespace struct { FQDN string TokenProvider *sbauth.TokenProvider // contains filtered or unexported fields }
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per ServiceBusClient.
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) GetAMQPClientImpl ¶ added in v0.3.4
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
negotiateClaim performs initial authentication and starts periodic refresh of credentials. the returned func is to cancel() the refresh goroutine.
func (*Namespace) NewAMQPLinks ¶
func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
NewAMQPLinks creates an AMQPLinks struct, which groups together the commonly needed links for working with Service Bus.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.
func (*Namespace) NewRPCLink ¶
NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface { NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error) GetEntityAudience(entityPath string) string Recover(ctx context.Context, clientRevision uint64) (bool, error) }
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Service Bus namespace
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string
func NamespaceWithRetryOptions ¶ added in v0.3.4
func NamespaceWithRetryOptions(retryOptions utils.RetryOptions) NamespaceOption
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenCredential ¶ added in v0.3.4
func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespaceWithTokenCredential sets the token provider on the namespace fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net)
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error)) NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface {
NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
}
NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NewWebSocketConnArgs ¶ added in v0.3.2
type NewWebSocketConnArgs struct { // Host is the the `wss://<host>` to connect to Host string }
NewWebSocketConnArgs are the arguments to the NewWebSocketConn function you pass if you want to enable websockets.
type RPCLink ¶
type RPCLink interface { Close(ctx context.Context) error RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error) }
RPCLink is implemented by *rpc.Link
type RPCLinkOption ¶ added in v0.3.4
type RPCLinkOption func(link *rpcLink) error
RPCLinkOption provides a way to customize the construction of a Link
type RPCResponse ¶ added in v0.3.4
RPCResponse is the simplified response structure from an RPC like call
type ReceiveMode ¶
type ReceiveMode int
ReceiveMode represents the lock style to use for a reciever - either `PeekLock` or `ReceiveAndDelete`
const ( // PeekLock will lock messages as they are received and can be settled // using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message // functions. PeekLock ReceiveMode = 0 // ReceiveAndDelete will delete messages as they are received. ReceiveAndDelete ReceiveMode = 1 )
type RetryWithLinksFn ¶ added in v0.3.4
type RetryWithLinksFn func(ctx context.Context, lwid *LinksWithID, args *utils.RetryFnArgs) error
type SBErrInfo ¶ added in v0.3.4
type SBErrInfo struct { RecoveryKind recoveryKind // contains filtered or unexported fields }
func GetSBErrInfo ¶ added in v0.3.4
GetSBErrInfo wraps the passed in 'err' with a proper error with one of either:
- `fatalServiceBusError` if no recovery is possible.
- `serviceBusError` if the error is recoverable. The `recoveryKind` field contains the type of recovery needed.